Skip to content

Commit

Permalink
grafana metrics for eh
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Sep 13, 2023
1 parent 7e2309c commit 0e0296c
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -125,6 +126,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

batchPerTopic := make(map[string][]*eventhub.Event)
startTime := time.Now()
for i, record := range batch.Records {
json, err := record.GetItems().ToJSON()
if err != nil {
Expand Down Expand Up @@ -166,17 +168,20 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
return nil, err
}
}

rowsSynced := len(batch.Records)
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("[total] successfully sent %d records to event hub", len(batch.Records))
}).Infof("[total] successfully sent %d records to event hub", rowsSynced)

err := c.UpdateLastOffset(req.FlowJobName, batch.LastCheckPointID)
if err != nil {
log.Errorf("failed to update last offset: %v", err)
return nil, err
}

metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(rowsSynced), time.Since(startTime))
metrics.LogNormalizeMetrics(c.ctx, req.FlowJobName, int64(rowsSynced),
time.Since(startTime), int64(rowsSynced))
return &model.SyncResponse{
FirstSyncedCheckPointID: batch.FirstCheckPointID,
LastSyncedCheckPointID: batch.LastCheckPointID,
Expand Down

0 comments on commit 0e0296c

Please sign in to comment.