Skip to content

Commit

Permalink
fix ctx 2
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Oct 15, 2023
1 parent 7cd2525 commit 10fed29
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func (c *EventHubConnector) syncRecordBatchAsync(req *model.SyncRecordsRequest)
maxParallelism = 10
}

ctx := context.Background()
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

Expand All @@ -155,7 +156,8 @@ func (c *EventHubConnector) syncRecordBatchAsync(req *model.SyncRecordsRequest)
}

flushBatch := func() error {
err := c.sendEventBatch(batchPerTopic, maxParallelism,
err := c.sendEventBatch(
ctx, batchPerTopic, maxParallelism,
req.FlowJobName, tableNameRowsMapping)
if err != nil {
log.WithFields(log.Fields{
Expand All @@ -175,7 +177,7 @@ func (c *EventHubConnector) syncRecordBatchAsync(req *model.SyncRecordsRequest)
return err
}

err = batchPerTopic.AddEvent(context.Background(), topicName, json)
err = batchPerTopic.AddEvent(ctx, topicName, json)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
Expand All @@ -197,7 +199,8 @@ func (c *EventHubConnector) syncRecordBatchAsync(req *model.SyncRecordsRequest)

// send the remaining events.
if batchPerTopic.Len() > 0 {
err := c.sendEventBatch(batchPerTopic, maxParallelism,
err := c.sendEventBatch(
ctx, batchPerTopic, maxParallelism,
req.FlowJobName, tableNameRowsMapping)
if err != nil {
return err
Expand Down Expand Up @@ -291,6 +294,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

func (c *EventHubConnector) sendEventBatch(
ctx context.Context,
events *HubBatches,
maxParallelism int64,
flowName string,
Expand Down Expand Up @@ -319,7 +323,7 @@ func (c *EventHubConnector) sendEventBatch(
}()

numEvents := eventBatch.NumEvents()
err := c.sendBatch(tblName, eventBatch)
err := c.sendBatch(ctx, tblName, eventBatch)
if err != nil {
once.Do(func() { firstErr = err })
return
Expand Down Expand Up @@ -349,8 +353,8 @@ func (c *EventHubConnector) sendEventBatch(
return nil
}

func (c *EventHubConnector) sendBatch(tblName ScopedEventhub, events *azeventhubs.EventDataBatch) error {
subCtx, cancel := context.WithTimeout(c.ctx, 5*time.Minute)
func (c *EventHubConnector) sendBatch(ctx context.Context, tblName ScopedEventhub, events *azeventhubs.EventDataBatch) error {
subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()

hub, err := c.hubManager.GetOrCreateHubClient(tblName)
Expand Down

0 comments on commit 10fed29

Please sign in to comment.