diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 1a6c026fff..2eaf9f1d3f 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -315,9 +315,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, numRecords := res.NumRecordsSynced syncDuration := time.Since(syncStartTime) - slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds\n", - numRecords, int(syncDuration.Seconds())), - ) + slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds()))) lastCheckpoint, err := recordBatch.GetLastCheckpoint() if err != nil { @@ -426,7 +424,7 @@ func (a *FlowableActivity) StartNormalize( // log the number of batches normalized if res != nil { - slog.InfoContext(ctx, fmt.Sprintf("normalized records from batch %d to batch %d\n", + slog.InfoContext(ctx, fmt.Sprintf("normalized records from batch %d to batch %d", res.StartBatchID, res.EndBatchID)) } @@ -505,11 +503,11 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, numPartitions := len(partitions.Partitions) - slog.InfoContext(ctx, fmt.Sprintf("replicating partitions for batch %d - size: %d\n", + slog.InfoContext(ctx, fmt.Sprintf("replicating partitions for batch %d - size: %d", partitions.BatchId, numPartitions), ) for i, p := range partitions.Partitions { - slog.InfoContext(ctx, fmt.Sprintf("batch-%d - replicating partition - %s\n", partitions.BatchId, p.PartitionId)) + slog.InfoContext(ctx, fmt.Sprintf("batch-%d - replicating partition - %s", partitions.BatchId, p.PartitionId)) err := a.replicateQRepPartition(ctx, config, i+1, numPartitions, p, runUUID) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) @@ -551,7 +549,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } defer connectors.CloseConnector(dstConn) - slog.InfoContext(ctx, fmt.Sprintf("replicating partition %s\n", partition.PartitionId)) + slog.InfoContext(ctx, fmt.Sprintf("replicating partition %s", partition.PartitionId)) shutdown := utils.HeartbeatRoutine(ctx, func() string { return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total) }) @@ -590,7 +588,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return fmt.Errorf("failed to pull qrep records: %w", err) } - slog.InfoContext(ctx, fmt.Sprintf("pulled %d records\n", len(recordBatch.Records))) + slog.InfoContext(ctx, fmt.Sprintf("pulled %d records", len(recordBatch.Records))) err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, int64(len(recordBatch.Records))) if err != nil { @@ -611,7 +609,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } if rowsSynced == 0 { - slog.InfoContext(ctx, fmt.Sprintf("no records to push for partition %s\n", partition.PartitionId)) + slog.InfoContext(ctx, fmt.Sprintf("no records to push for partition %s", partition.PartitionId)) pullCancel() } else { wg.Wait() @@ -625,7 +623,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, return err } - slog.InfoContext(ctx, fmt.Sprintf("pushed %d records\n", rowsSynced)) + slog.InfoContext(ctx, fmt.Sprintf("pushed %d records", rowsSynced)) } err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) @@ -732,6 +730,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { sendTimeout := 10 * time.Minute ticker := time.NewTicker(sendTimeout) defer ticker.Stop() + activity.RecordHeartbeat(ctx, "sending walheartbeat every 10 minutes") for { select { @@ -798,7 +797,7 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context, } defer connectors.CloseConnector(srcConn) pgSrcConn := srcConn.(*connpostgres.PostgresConnector) - slog.InfoContext(ctx, fmt.Sprintf("current last partition value is %v\n", last)) + slog.InfoContext(ctx, fmt.Sprintf("current last partition value is %v", last)) attemptCount := 1 for { activity.RecordHeartbeat(ctx, fmt.Sprintf("no new rows yet, attempt #%d", attemptCount)) @@ -914,7 +913,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, } defer connectors.CloseConnector(dstConn) - slog.InfoContext(ctx, "replicating xmin\n") + slog.InfoContext(ctx, "replicating xmin") bufferSize := shared.FetchAndChannelSize errGroup, errCtx := errgroup.WithContext(ctx) @@ -980,7 +979,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, } if rowsSynced == 0 { - slog.InfoContext(ctx, "no records to push for xmin\n") + slog.InfoContext(ctx, "no records to push for xmin") } else { err := errGroup.Wait() if err != nil { @@ -993,7 +992,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, return 0, err } - slog.InfoContext(ctx, fmt.Sprintf("pushed %d records\n", rowsSynced)) + slog.InfoContext(ctx, fmt.Sprintf("pushed %d records", rowsSynced)) } err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) diff --git a/flow/activities/slot.go b/flow/activities/slot.go index 8e8bb9aea0..b748e67e45 100644 --- a/flow/activities/slot.go +++ b/flow/activities/slot.go @@ -77,8 +77,8 @@ func (a *FlowableActivity) recordSlotSizePeriodically( timeout := 5 * time.Minute ticker := time.NewTicker(timeout) - defer ticker.Stop() + for { select { case <-ticker.C: diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index f14ca4ac1c..66a9131676 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -202,6 +202,9 @@ func (c *EventHubConnector) processBatch( c.logger.Info("processBatch", slog.Int("number of records processed for sending", int(curNumRecords))) } + case <-c.ctx.Done(): + return 0, fmt.Errorf("[eventhub] context cancelled %w", c.ctx.Err()) + case <-ticker.C: err := batchPerTopic.flushAllBatches(ctx, flowJobName) if err != nil { @@ -213,7 +216,7 @@ func (c *EventHubConnector) processBatch( lastUpdatedOffset = lastSeenLSN c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeenLSN)) if err != nil { - return 0, fmt.Errorf("failed to update last offset: %v", err) + return 0, fmt.Errorf("failed to update last offset: %w", err) } } diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 7f6d206728..0caa6106fa 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -7,6 +7,7 @@ import ( "log/slog" "strings" "sync" + "sync/atomic" "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" @@ -103,8 +104,13 @@ func (m *EventHubManager) closeProducerClient(ctx context.Context, pc *azeventhu } func (m *EventHubManager) Close(ctx context.Context) error { - var allErrors error + numHubsClosed := atomic.Uint32{} + shutdown := utils.HeartbeatRoutine(ctx, func() string { + return fmt.Sprintf("closed %d eventhub clients", numHubsClosed.Load()) + }) + defer shutdown() + var allErrors error m.hubs.Range(func(key any, value any) bool { name := key.(ScopedEventhub) hub := value.(*azeventhubs.ProducerClient) @@ -113,6 +119,7 @@ func (m *EventHubManager) Close(ctx context.Context) error { slog.Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err)) allErrors = errors.Join(allErrors, err) } + numHubsClosed.Add(1) return true })