From 1c71d71bc03af676630f1e242761ba167fc7b8d7 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 25 Jan 2024 07:50:51 -0500 Subject: [PATCH 1/3] handle context cancellation eventhub --- flow/activities/flowable.go | 5 +++++ flow/connectors/eventhub/eventhub.go | 15 ++++++++++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 34ce7eb70e..1414cdeff1 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -318,9 +318,11 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds\n", numRecords, int(syncDuration.Seconds())), ) + activity.RecordHeartbeat(ctx, fmt.Sprintf("pushed %d records", numRecords)) lastCheckpoint, err := recordBatch.GetLastCheckpoint() if err != nil { + slog.ErrorContext(ctx, "failed to get last checkpoint", slog.Any("error", err)) a.Alerter.LogFlowError(ctx, flowName, err) return nil, fmt.Errorf("failed to get last checkpoint: %w", err) } @@ -334,6 +336,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, pglogrepl.LSN(lastCheckpoint), ) if err != nil { + slog.ErrorContext(ctx, "failed to update num rows and end lsn for cdc batch", slog.Any("error", err)) a.Alerter.LogFlowError(ctx, flowName, err) return nil, err } @@ -345,6 +348,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, pglogrepl.LSN(lastCheckpoint), ) if err != nil { + slog.ErrorContext(ctx, "failed to update latest lsn at target for cdc flow", slog.Any("error", err)) a.Alerter.LogFlowError(ctx, flowName, err) return nil, err } @@ -356,6 +360,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, } } if err != nil { + slog.ErrorContext(ctx, "failed to update latest lsn at target for cdc flow", slog.Any("error", err)) a.Alerter.LogFlowError(ctx, flowName, err) return nil, err } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 7a7ce63ba6..a7e3c330a5 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -119,7 +119,6 @@ func (c *EventHubConnector) processBatch( flowJobName string, batch *model.CDCRecordStream, ) (uint32, error) { - ctx := context.Background() batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) @@ -145,7 +144,7 @@ func (c *EventHubConnector) processBatch( case record, ok := <-batch.GetRecords(): if !ok { c.logger.Info("flushing batches because no more records") - err := batchPerTopic.flushAllBatches(ctx, flowJobName) + err := batchPerTopic.flushAllBatches(c.ctx, flowJobName) if err != nil { return 0, err } @@ -175,7 +174,7 @@ func (c *EventHubConnector) processBatch( return 0, err } - numPartitions, err := c.hubManager.GetNumPartitions(ctx, destination) + numPartitions, err := c.hubManager.GetNumPartitions(c.ctx, destination) if err != nil { c.logger.Error("failed to get number of partitions", slog.Any("error", err)) return 0, err @@ -194,7 +193,7 @@ func (c *EventHubConnector) processBatch( } partitionKey = utils.HashedPartitionKey(partitionKey, uint32(numPartitions)) destination.SetPartitionValue(partitionKey) - err = batchPerTopic.AddEvent(ctx, destination, json, false) + err = batchPerTopic.AddEvent(c.ctx, destination, json, false) if err != nil { c.logger.Error("failed to add event to batch", slog.Any("error", err)) return 0, err @@ -205,8 +204,13 @@ func (c *EventHubConnector) processBatch( c.logger.Info("processBatch", slog.Int("number of records processed for sending", int(curNumRecords))) } + case <-c.ctx.Done(): + c.logger.Error("context cancelled", slog.Any("error", c.ctx.Err())) + return 0, fmt.Errorf("[eventhub] context cancelled %v", c.ctx.Err()) + case <-ticker.C: - err := batchPerTopic.flushAllBatches(ctx, flowJobName) + c.logger.Info("flushing batches because of timeout") + err := batchPerTopic.flushAllBatches(c.ctx, flowJobName) if err != nil { return 0, err } @@ -245,6 +249,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S c.logger.Error("failed to update last offset", slog.Any("error", err)) return nil, err } + err = c.pgMetadata.IncrementID(req.FlowJobName) if err != nil { c.logger.Error("failed to increment id", slog.Any("error", err)) From ca68b64e781bdfec11ea59c255b06f4b18f0db22 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 25 Jan 2024 08:07:31 -0500 Subject: [PATCH 2/3] Revert "tickers: use Reset over Stop/NewTicker (#985)" This reverts commit 5b0c8b3b712ef8f561afb470acddab24769f8598. --- flow/activities/flowable.go | 3 ++- flow/activities/slot.go | 3 ++- flow/connectors/eventhub/eventhub.go | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 1414cdeff1..7c1b7e4656 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -784,7 +784,8 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { slog.InfoContext(ctx, fmt.Sprintf("sent walheartbeat to peer %v", pgPeer.Name)) } } - ticker.Reset(sendTimeout) + ticker.Stop() + ticker = time.NewTicker(sendTimeout) } } diff --git a/flow/activities/slot.go b/flow/activities/slot.go index 8e8bb9aea0..4a846e77c7 100644 --- a/flow/activities/slot.go +++ b/flow/activities/slot.go @@ -89,6 +89,7 @@ func (a *FlowableActivity) recordSlotSizePeriodically( case <-ctx.Done(): return } - ticker.Reset(timeout) + ticker.Stop() + ticker = time.NewTicker(timeout) } } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index a7e3c330a5..0948eef6ac 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -224,7 +224,8 @@ func (c *EventHubConnector) processBatch( } } - ticker.Reset(eventHubFlushTimeout) + ticker.Stop() + ticker = time.NewTicker(eventHubFlushTimeout) } } } From b919a1a3caffa7f71396b7ce5367443760471159 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 25 Jan 2024 08:40:24 -0500 Subject: [PATCH 3/3] fix closing hubs --- flow/activities/flowable.go | 6 ++---- flow/connectors/eventhub/eventhub.go | 2 +- flow/connectors/eventhub/hubmanager.go | 8 ++++++++ 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 7c1b7e4656..91e1f74ba9 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -315,9 +315,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, numRecords := res.NumRecordsSynced syncDuration := time.Since(syncStartTime) - slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds\n", - numRecords, int(syncDuration.Seconds())), - ) + slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds()))) activity.RecordHeartbeat(ctx, fmt.Sprintf("pushed %d records", numRecords)) lastCheckpoint, err := recordBatch.GetLastCheckpoint() @@ -632,7 +630,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, return err } - slog.InfoContext(ctx, fmt.Sprintf("pushed %d records\n", rowsSynced)) + slog.InfoContext(ctx, fmt.Sprintf("pushed %d records", rowsSynced)) } err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 0948eef6ac..48faa76466 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -69,7 +69,7 @@ func (c *EventHubConnector) Close() error { allErrors = errors.Join(allErrors, err) } - err = c.hubManager.Close(context.Background()) + err = c.hubManager.Close(c.ctx) if err != nil { c.logger.Error("failed to close event hub manager", slog.Any("error", err)) allErrors = errors.Join(allErrors, err) diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 48a518b3d3..2b50239607 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -7,6 +7,7 @@ import ( "log/slog" "strings" "sync" + "sync/atomic" "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" @@ -126,6 +127,12 @@ func (m *EventHubManager) closeProducerClient(ctx context.Context, pc *azeventhu } func (m *EventHubManager) Close(ctx context.Context) error { + numHubsClosed := atomic.Uint32{} + shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string { + return fmt.Sprintf("closed %d eventhub clients", numHubsClosed.Load()) + }) + defer shutdown() + var allErrors error m.hubs.Range(func(key any, value any) bool { @@ -136,6 +143,7 @@ func (m *EventHubManager) Close(ctx context.Context) error { slog.Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err)) allErrors = errors.Join(allErrors, err) } + numHubsClosed.Add(1) return true })