Skip to content

Commit

Permalink
Eventhub template (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
rokatyy authored Feb 22, 2024
1 parent ed3b133 commit d55a74d
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 0 deletions.
117 changes: 117 additions & 0 deletions eventhub/eventhub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
Copyright 2023 The Nuclio Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
ctx "context"
"encoding/json"
)

type metric struct {
ID string `json:"id"`
Latitude string `json:"latitude"`
Longitude string `json:"longitude"`
TirePressure float32 `json:"tirePressure"`
FuelEfficiencyPercentage float32 `json:"fuelEfficiencyPercentage"`
Temperature int `json:"temperature"`
WeatherCondition string `json:"weatherCondition"`
}

type alarm struct {
ID string
Type string
}

type weather struct {
Temperature int `json:"temperature"`
WeatherCondition string `json:"weatherCondition"`
}

func SensorHandler(context *nuclio.Context, event nuclio.Event) (interface{}, error) {

// get alarms eventhub
alarmsEventhub := context.DataBinding["alarmsEventhub"].(*amqp.Sender)

// get enriched fleet eventhub
enrichedFleetEventhub := context.DataBinding["enrichedFleetEventhub"].(*amqp.Sender)

// unmarshal the eventhub metric
eventHubMetric := metric{}
if err := json.Unmarshal(event.GetBody(), &eventHubMetric); err != nil {
return nil, err
}

// send alarm if tire pressure < threshold
var MinTirePressureThreshold float32 = 2
if eventHubMetric.TirePressure < MinTirePressureThreshold {
marshaledAlarm, err := json.Marshal(alarm{ID: eventHubMetric.ID, Type: "LowTirePressue"})
if err != nil {
return nil, err
}

// send alarm to event hub
if err := sendToEventHub(context, marshaledAlarm, alarmsEventhub); err != nil {
return nil, err
}
}

// prepare to send to spark via eventhub
// call weather station for little enrichment
temperature, weatherCondtion, err := getWeather(context, eventHubMetric)
if err != nil {
return nil, err
}

context.Logger.DebugWith("Got weather", "temp", temperature, "weather", weatherCondtion)

// assign return values
eventHubMetric.Temperature = temperature
eventHubMetric.WeatherCondition = weatherCondtion

// send to spark
marshaledMetric, err := json.Marshal(eventHubMetric)
if err != nil {
return nil, err
}

if err := sendToEventHub(context, marshaledMetric, enrichedFleetEventhub); err != nil {
return nil, err
}

return nil, nil
}

func sendToEventHub(context *nuclio.Context, data []byte, hub *amqp.Sender) error {

// create an amqp message with the body
message := amqp.Message{
Data: [][]byte{data},
}

// send the metric
if err := hub.Send(ctx.Background(), &message); err != nil {
context.Logger.WarnWith("Failed to send message to eventhub", "err", err)

return err
}

return nil
}

func getWeather(context *nuclio.Context, m metric) (int, string, error) {
return 30, "stormy", nil
}
48 changes: 48 additions & 0 deletions eventhub/function.yaml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
apiVersion: "nuclio.io/v1beta1"
kind: "NuclioFunction"
metadata:
name: eventhub
spec:
description: >
An Azure Event Hub triggered function with a configuration that connects to an Azure Event Hub.
The function reads messages from two partitions, process the messages, invokes another function,
and sends the processed payload to another Azure Event Hub.

runtime: "golang"
handler: main:SensorHandler
minReplicas: 1
maxReplicas: 1
disableDefaultHTTPTrigger: false
build:
functionSourceCode: {{ .SourceCode }}
commands:
- apk --update --no-cache add ca-certificates
triggers:
eventhub:
kind: eventhub
attributes:
sharedAccessKeyName: {{ .SharedAccessKeyName }}
sharedAccessKeyValue: {{ .SharedAccessKeyValue }}
namespace: {{ .Namespace }}
eventHubName: {{ .EventHubName }}
consumerGroup: {{ .ConsumerGroup }}
# Match event hub partition number
partitions:
- 0
- 1
# Output to another event hub
dataBindings:
alarmsEventhub:
class: eventhub
attributes:
sharedAccessKeyName: {{ .SharedAccessKeyName }}
sharedAccessKeyValue: {{ .SharedAccessKeyValue }}
eventHubName: alarms
namespace: {{ .Namespace }}
enrichedFleetEventhub:
class: eventhub
attributes:
sharedAccessKeyName: {{ .SharedAccessKeyName }}
sharedAccessKeyValue: {{ .SharedAccessKeyValue }}
eventHubName: enrichedfleet
namespace: {{ .Namespace }}
28 changes: 28 additions & 0 deletions eventhub/function.yaml.values
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
SharedAccessKeyName:
displayName: Shared Access Key Name
kind: string
description: "Shared Access Key Name"
required: true
attributes:
defaultValue: ""
SharedAccessKeyValue:
displayName: Shared Access Key Value
kind: string
description: "Shared Access Key Value"
required: true
attributes:
defaultValue: ""
Namespace:
displayName: Namespace
kind: string
description: "Eventhub namespace"
required: true
attributes:
defaultValue: ""
ConsumerGroup:
displayName: Consumer Group
kind: string
description: "Consumer Group"
required: true
attributes:
defaultValue: ""

0 comments on commit d55a74d

Please sign in to comment.