Skip to content

Commit

Permalink
handle context cancellation eventhub
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Jan 25, 2024
1 parent 6b2583c commit 7cd26c3
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ func (c *EventHubConnector) processBatch(
flowJobName string,
batch *model.CDCRecordStream,
) (uint32, error) {
ctx := context.Background()
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

Expand All @@ -145,7 +144,7 @@ func (c *EventHubConnector) processBatch(
case record, ok := <-batch.GetRecords():
if !ok {
c.logger.Info("flushing batches because no more records")
err := batchPerTopic.flushAllBatches(ctx, flowJobName)
err := batchPerTopic.flushAllBatches(c.ctx, flowJobName)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -175,7 +174,7 @@ func (c *EventHubConnector) processBatch(
return 0, err
}

numPartitions, err := c.hubManager.GetNumPartitions(ctx, destination)
numPartitions, err := c.hubManager.GetNumPartitions(c.ctx, destination)
if err != nil {
c.logger.Error("failed to get number of partitions", slog.Any("error", err))
return 0, err
Expand All @@ -194,7 +193,7 @@ func (c *EventHubConnector) processBatch(
}
partitionKey = utils.HashedPartitionKey(partitionKey, uint32(numPartitions))
destination.SetPartitionValue(partitionKey)
err = batchPerTopic.AddEvent(ctx, destination, json, false)
err = batchPerTopic.AddEvent(c.ctx, destination, json, false)
if err != nil {
c.logger.Error("failed to add event to batch", slog.Any("error", err))
return 0, err
Expand All @@ -205,8 +204,12 @@ func (c *EventHubConnector) processBatch(
c.logger.Info("processBatch", slog.Int("number of records processed for sending", int(curNumRecords)))
}

case <-c.ctx.Done():
return 0, fmt.Errorf("[eventhub] context cancelled %v", c.ctx.Err())

case <-ticker.C:
err := batchPerTopic.flushAllBatches(ctx, flowJobName)
c.logger.Info("flushing batches because of timeout")
err := batchPerTopic.flushAllBatches(c.ctx, flowJobName)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -245,6 +248,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
c.logger.Error("failed to update last offset", slog.Any("error", err))
return nil, err
}

err = c.pgMetadata.IncrementID(req.FlowJobName)
if err != nil {
c.logger.Error("failed to increment id", slog.Any("error", err))
Expand Down

0 comments on commit 7cd26c3

Please sign in to comment.