diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 48e1544f9f..35dfe16def 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -142,6 +142,7 @@ func (c *EventHubConnector) syncRecordBatchAsync(req *model.SyncRecordsRequest) maxParallelism = 10 } + ctx := context.Background() batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) @@ -155,7 +156,8 @@ func (c *EventHubConnector) syncRecordBatchAsync(req *model.SyncRecordsRequest) } flushBatch := func() error { - err := c.sendEventBatch(batchPerTopic, maxParallelism, + err := c.sendEventBatch( + ctx, batchPerTopic, maxParallelism, req.FlowJobName, tableNameRowsMapping) if err != nil { log.WithFields(log.Fields{ @@ -175,7 +177,7 @@ func (c *EventHubConnector) syncRecordBatchAsync(req *model.SyncRecordsRequest) return err } - err = batchPerTopic.AddEvent(context.Background(), topicName, json) + err = batchPerTopic.AddEvent(ctx, topicName, json) if err != nil { log.WithFields(log.Fields{ "flowName": req.FlowJobName, @@ -197,7 +199,8 @@ func (c *EventHubConnector) syncRecordBatchAsync(req *model.SyncRecordsRequest) // send the remaining events. if batchPerTopic.Len() > 0 { - err := c.sendEventBatch(batchPerTopic, maxParallelism, + err := c.sendEventBatch( + ctx, batchPerTopic, maxParallelism, req.FlowJobName, tableNameRowsMapping) if err != nil { return err @@ -291,6 +294,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S } func (c *EventHubConnector) sendEventBatch( + ctx context.Context, events *HubBatches, maxParallelism int64, flowName string, @@ -319,7 +323,7 @@ func (c *EventHubConnector) sendEventBatch( }() numEvents := eventBatch.NumEvents() - err := c.sendBatch(tblName, eventBatch) + err := c.sendBatch(ctx, tblName, eventBatch) if err != nil { once.Do(func() { firstErr = err }) return @@ -349,8 +353,8 @@ func (c *EventHubConnector) sendEventBatch( return nil } -func (c *EventHubConnector) sendBatch(tblName ScopedEventhub, events *azeventhubs.EventDataBatch) error { - subCtx, cancel := context.WithTimeout(c.ctx, 5*time.Minute) +func (c *EventHubConnector) sendBatch(ctx context.Context, tblName ScopedEventhub, events *azeventhubs.EventDataBatch) error { + subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() hub, err := c.hubManager.GetOrCreateHubClient(tblName)