From 7e2309cb4a8d7eb12abbcddb355b6bea37fe1f44 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 12 Sep 2023 18:47:25 +0530 Subject: [PATCH] add table mapping for metrics in eventhub --- flow/connectors/eventhub/eventhub.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 0a9263c6ae..1f85c5cf85 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -112,7 +112,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S defer func() { shutdown <- true }() - + tableNameRowsMapping := make(map[string]uint32) batch := req.Records eventsPerHeartBeat := 1000 eventsPerBatch := int(req.PushBatchSize) @@ -149,7 +149,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S if (i+1)%eventsPerBatch == 0 { err := c.sendEventBatch(batchPerTopic, maxParallelism, - req.FlowJobName) + req.FlowJobName, tableNameRowsMapping) if err != nil { return nil, err } @@ -161,7 +161,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S // send the remaining events. if len(batchPerTopic) > 0 { err := c.sendEventBatch(batchPerTopic, maxParallelism, - req.FlowJobName) + req.FlowJobName, tableNameRowsMapping) if err != nil { return nil, err } @@ -186,7 +186,8 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, maxParallelism int64, - flowName string) error { + flowName string, + tableNameRowsMapping map[string]uint32) error { if len(events) == 0 { log.WithFields(log.Fields{ "flowName": flowName, @@ -233,6 +234,7 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, "flowName": flowName, }).Infof("pushed %d events to event hub: %s", numEventsPushed, tblName) + tableNameRowsMapping[tblName] += uint32(len(eventBatch)) }(tblName, eventBatch) }