diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 34ce7eb70e..91e1f74ba9 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -315,12 +315,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, numRecords := res.NumRecordsSynced syncDuration := time.Since(syncStartTime) - slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds\n", - numRecords, int(syncDuration.Seconds())), - ) + slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds()))) + activity.RecordHeartbeat(ctx, fmt.Sprintf("pushed %d records", numRecords)) lastCheckpoint, err := recordBatch.GetLastCheckpoint() if err != nil { + slog.ErrorContext(ctx, "failed to get last checkpoint", slog.Any("error", err)) a.Alerter.LogFlowError(ctx, flowName, err) return nil, fmt.Errorf("failed to get last checkpoint: %w", err) } @@ -334,6 +334,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, pglogrepl.LSN(lastCheckpoint), ) if err != nil { + slog.ErrorContext(ctx, "failed to update num rows and end lsn for cdc batch", slog.Any("error", err)) a.Alerter.LogFlowError(ctx, flowName, err) return nil, err } @@ -345,6 +346,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, pglogrepl.LSN(lastCheckpoint), ) if err != nil { + slog.ErrorContext(ctx, "failed to update latest lsn at target for cdc flow", slog.Any("error", err)) a.Alerter.LogFlowError(ctx, flowName, err) return nil, err } @@ -356,6 +358,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, } } if err != nil { + slog.ErrorContext(ctx, "failed to update latest lsn at target for cdc flow", slog.Any("error", err)) a.Alerter.LogFlowError(ctx, flowName, err) return nil, err } @@ -627,7 +630,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, return err } - slog.InfoContext(ctx, fmt.Sprintf("pushed %d records\n", rowsSynced)) + slog.InfoContext(ctx, fmt.Sprintf("pushed %d records", rowsSynced)) } err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) @@ -779,7 +782,8 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { slog.InfoContext(ctx, fmt.Sprintf("sent walheartbeat to peer %v", pgPeer.Name)) } } - ticker.Reset(sendTimeout) + ticker.Stop() + ticker = time.NewTicker(sendTimeout) } } diff --git a/flow/activities/slot.go b/flow/activities/slot.go index 8e8bb9aea0..4a846e77c7 100644 --- a/flow/activities/slot.go +++ b/flow/activities/slot.go @@ -89,6 +89,7 @@ func (a *FlowableActivity) recordSlotSizePeriodically( case <-ctx.Done(): return } - ticker.Reset(timeout) + ticker.Stop() + ticker = time.NewTicker(timeout) } } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 7a7ce63ba6..48faa76466 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -69,7 +69,7 @@ func (c *EventHubConnector) Close() error { allErrors = errors.Join(allErrors, err) } - err = c.hubManager.Close(context.Background()) + err = c.hubManager.Close(c.ctx) if err != nil { c.logger.Error("failed to close event hub manager", slog.Any("error", err)) allErrors = errors.Join(allErrors, err) @@ -119,7 +119,6 @@ func (c *EventHubConnector) processBatch( flowJobName string, batch *model.CDCRecordStream, ) (uint32, error) { - ctx := context.Background() batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) @@ -145,7 +144,7 @@ func (c *EventHubConnector) processBatch( case record, ok := <-batch.GetRecords(): if !ok { c.logger.Info("flushing batches because no more records") - err := batchPerTopic.flushAllBatches(ctx, flowJobName) + err := batchPerTopic.flushAllBatches(c.ctx, flowJobName) if err != nil { return 0, err } @@ -175,7 +174,7 @@ func (c *EventHubConnector) processBatch( return 0, err } - numPartitions, err := c.hubManager.GetNumPartitions(ctx, destination) + numPartitions, err := c.hubManager.GetNumPartitions(c.ctx, destination) if err != nil { c.logger.Error("failed to get number of partitions", slog.Any("error", err)) return 0, err @@ -194,7 +193,7 @@ func (c *EventHubConnector) processBatch( } partitionKey = utils.HashedPartitionKey(partitionKey, uint32(numPartitions)) destination.SetPartitionValue(partitionKey) - err = batchPerTopic.AddEvent(ctx, destination, json, false) + err = batchPerTopic.AddEvent(c.ctx, destination, json, false) if err != nil { c.logger.Error("failed to add event to batch", slog.Any("error", err)) return 0, err @@ -205,8 +204,13 @@ func (c *EventHubConnector) processBatch( c.logger.Info("processBatch", slog.Int("number of records processed for sending", int(curNumRecords))) } + case <-c.ctx.Done(): + c.logger.Error("context cancelled", slog.Any("error", c.ctx.Err())) + return 0, fmt.Errorf("[eventhub] context cancelled %v", c.ctx.Err()) + case <-ticker.C: - err := batchPerTopic.flushAllBatches(ctx, flowJobName) + c.logger.Info("flushing batches because of timeout") + err := batchPerTopic.flushAllBatches(c.ctx, flowJobName) if err != nil { return 0, err } @@ -220,7 +224,8 @@ func (c *EventHubConnector) processBatch( } } - ticker.Reset(eventHubFlushTimeout) + ticker.Stop() + ticker = time.NewTicker(eventHubFlushTimeout) } } } @@ -245,6 +250,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S c.logger.Error("failed to update last offset", slog.Any("error", err)) return nil, err } + err = c.pgMetadata.IncrementID(req.FlowJobName) if err != nil { c.logger.Error("failed to increment id", slog.Any("error", err)) diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 48a518b3d3..2b50239607 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -7,6 +7,7 @@ import ( "log/slog" "strings" "sync" + "sync/atomic" "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" @@ -126,6 +127,12 @@ func (m *EventHubManager) closeProducerClient(ctx context.Context, pc *azeventhu } func (m *EventHubManager) Close(ctx context.Context) error { + numHubsClosed := atomic.Uint32{} + shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string { + return fmt.Sprintf("closed %d eventhub clients", numHubsClosed.Load()) + }) + defer shutdown() + var allErrors error m.hubs.Range(func(key any, value any) bool { @@ -136,6 +143,7 @@ func (m *EventHubManager) Close(ctx context.Context) error { slog.Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err)) allErrors = errors.Join(allErrors, err) } + numHubsClosed.Add(1) return true })