Skip to content

Commit

Permalink
add table mapping for metrics in eventhub
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Sep 12, 2023
1 parent d767df0 commit 7e2309c
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
defer func() {
shutdown <- true
}()

tableNameRowsMapping := make(map[string]uint32)
batch := req.Records
eventsPerHeartBeat := 1000
eventsPerBatch := int(req.PushBatchSize)
Expand Down Expand Up @@ -149,7 +149,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S

if (i+1)%eventsPerBatch == 0 {
err := c.sendEventBatch(batchPerTopic, maxParallelism,
req.FlowJobName)
req.FlowJobName, tableNameRowsMapping)
if err != nil {
return nil, err
}
Expand All @@ -161,7 +161,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
// send the remaining events.
if len(batchPerTopic) > 0 {
err := c.sendEventBatch(batchPerTopic, maxParallelism,
req.FlowJobName)
req.FlowJobName, tableNameRowsMapping)
if err != nil {
return nil, err
}
Expand All @@ -186,7 +186,8 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S

func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
maxParallelism int64,
flowName string) error {
flowName string,
tableNameRowsMapping map[string]uint32) error {
if len(events) == 0 {
log.WithFields(log.Fields{
"flowName": flowName,
Expand Down Expand Up @@ -233,6 +234,7 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
"flowName": flowName,
}).Infof("pushed %d events to event hub: %s",
numEventsPushed, tblName)
tableNameRowsMapping[tblName] += uint32(len(eventBatch))
}(tblName, eventBatch)
}

Expand Down

0 comments on commit 7e2309c

Please sign in to comment.