Skip to content

Commit

Permalink
handle context cancellation eventhub
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Jan 25, 2024
1 parent 6b2583c commit 1c71d71
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
5 changes: 5 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,11 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds\n",
numRecords, int(syncDuration.Seconds())),
)
activity.RecordHeartbeat(ctx, fmt.Sprintf("pushed %d records", numRecords))

lastCheckpoint, err := recordBatch.GetLastCheckpoint()
if err != nil {
slog.ErrorContext(ctx, "failed to get last checkpoint", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}
Expand All @@ -334,6 +336,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
pglogrepl.LSN(lastCheckpoint),
)
if err != nil {
slog.ErrorContext(ctx, "failed to update num rows and end lsn for cdc batch", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}
Expand All @@ -345,6 +348,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
pglogrepl.LSN(lastCheckpoint),
)
if err != nil {
slog.ErrorContext(ctx, "failed to update latest lsn at target for cdc flow", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}
Expand All @@ -356,6 +360,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}
}
if err != nil {
slog.ErrorContext(ctx, "failed to update latest lsn at target for cdc flow", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}
Expand Down
15 changes: 10 additions & 5 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ func (c *EventHubConnector) processBatch(
flowJobName string,
batch *model.CDCRecordStream,
) (uint32, error) {
ctx := context.Background()
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

Expand All @@ -145,7 +144,7 @@ func (c *EventHubConnector) processBatch(
case record, ok := <-batch.GetRecords():
if !ok {
c.logger.Info("flushing batches because no more records")
err := batchPerTopic.flushAllBatches(ctx, flowJobName)
err := batchPerTopic.flushAllBatches(c.ctx, flowJobName)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -175,7 +174,7 @@ func (c *EventHubConnector) processBatch(
return 0, err
}

numPartitions, err := c.hubManager.GetNumPartitions(ctx, destination)
numPartitions, err := c.hubManager.GetNumPartitions(c.ctx, destination)
if err != nil {
c.logger.Error("failed to get number of partitions", slog.Any("error", err))
return 0, err
Expand All @@ -194,7 +193,7 @@ func (c *EventHubConnector) processBatch(
}
partitionKey = utils.HashedPartitionKey(partitionKey, uint32(numPartitions))
destination.SetPartitionValue(partitionKey)
err = batchPerTopic.AddEvent(ctx, destination, json, false)
err = batchPerTopic.AddEvent(c.ctx, destination, json, false)
if err != nil {
c.logger.Error("failed to add event to batch", slog.Any("error", err))
return 0, err
Expand All @@ -205,8 +204,13 @@ func (c *EventHubConnector) processBatch(
c.logger.Info("processBatch", slog.Int("number of records processed for sending", int(curNumRecords)))
}

case <-c.ctx.Done():
c.logger.Error("context cancelled", slog.Any("error", c.ctx.Err()))
return 0, fmt.Errorf("[eventhub] context cancelled %v", c.ctx.Err())

case <-ticker.C:
err := batchPerTopic.flushAllBatches(ctx, flowJobName)
c.logger.Info("flushing batches because of timeout")
err := batchPerTopic.flushAllBatches(c.ctx, flowJobName)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -245,6 +249,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
c.logger.Error("failed to update last offset", slog.Any("error", err))
return nil, err
}

err = c.pgMetadata.IncrementID(req.FlowJobName)
if err != nil {
c.logger.Error("failed to increment id", slog.Any("error", err))
Expand Down

0 comments on commit 1c71d71

Please sign in to comment.