diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 34ce7eb70e..1414cdeff1 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -318,9 +318,11 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds\n", numRecords, int(syncDuration.Seconds())), ) + activity.RecordHeartbeat(ctx, fmt.Sprintf("pushed %d records", numRecords)) lastCheckpoint, err := recordBatch.GetLastCheckpoint() if err != nil { + slog.ErrorContext(ctx, "failed to get last checkpoint", slog.Any("error", err)) a.Alerter.LogFlowError(ctx, flowName, err) return nil, fmt.Errorf("failed to get last checkpoint: %w", err) } @@ -334,6 +336,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, pglogrepl.LSN(lastCheckpoint), ) if err != nil { + slog.ErrorContext(ctx, "failed to update num rows and end lsn for cdc batch", slog.Any("error", err)) a.Alerter.LogFlowError(ctx, flowName, err) return nil, err } @@ -345,6 +348,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, pglogrepl.LSN(lastCheckpoint), ) if err != nil { + slog.ErrorContext(ctx, "failed to update latest lsn at target for cdc flow", slog.Any("error", err)) a.Alerter.LogFlowError(ctx, flowName, err) return nil, err } @@ -356,6 +360,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, } } if err != nil { + slog.ErrorContext(ctx, "failed to update latest lsn at target for cdc flow", slog.Any("error", err)) a.Alerter.LogFlowError(ctx, flowName, err) return nil, err } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 7a7ce63ba6..a7e3c330a5 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -119,7 +119,6 @@ func (c *EventHubConnector) processBatch( flowJobName string, batch *model.CDCRecordStream, ) (uint32, error) { - ctx := context.Background() batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) @@ -145,7 +144,7 @@ func (c *EventHubConnector) processBatch( case record, ok := <-batch.GetRecords(): if !ok { c.logger.Info("flushing batches because no more records") - err := batchPerTopic.flushAllBatches(ctx, flowJobName) + err := batchPerTopic.flushAllBatches(c.ctx, flowJobName) if err != nil { return 0, err } @@ -175,7 +174,7 @@ func (c *EventHubConnector) processBatch( return 0, err } - numPartitions, err := c.hubManager.GetNumPartitions(ctx, destination) + numPartitions, err := c.hubManager.GetNumPartitions(c.ctx, destination) if err != nil { c.logger.Error("failed to get number of partitions", slog.Any("error", err)) return 0, err @@ -194,7 +193,7 @@ func (c *EventHubConnector) processBatch( } partitionKey = utils.HashedPartitionKey(partitionKey, uint32(numPartitions)) destination.SetPartitionValue(partitionKey) - err = batchPerTopic.AddEvent(ctx, destination, json, false) + err = batchPerTopic.AddEvent(c.ctx, destination, json, false) if err != nil { c.logger.Error("failed to add event to batch", slog.Any("error", err)) return 0, err @@ -205,8 +204,13 @@ func (c *EventHubConnector) processBatch( c.logger.Info("processBatch", slog.Int("number of records processed for sending", int(curNumRecords))) } + case <-c.ctx.Done(): + c.logger.Error("context cancelled", slog.Any("error", c.ctx.Err())) + return 0, fmt.Errorf("[eventhub] context cancelled %v", c.ctx.Err()) + case <-ticker.C: - err := batchPerTopic.flushAllBatches(ctx, flowJobName) + c.logger.Info("flushing batches because of timeout") + err := batchPerTopic.flushAllBatches(c.ctx, flowJobName) if err != nil { return 0, err } @@ -245,6 +249,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S c.logger.Error("failed to update last offset", slog.Any("error", err)) return nil, err } + err = c.pgMetadata.IncrementID(req.FlowJobName) if err != nil { c.logger.Error("failed to increment id", slog.Any("error", err))