From 668f96e6bfd2813e0ed1d386981a0bbae1e77c47 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sun, 19 Nov 2023 12:29:15 -0500 Subject: [PATCH] use local ctxes --- flow/connectors/eventhub/eventhub.go | 13 +++++++++++-- flow/connectors/eventhub/hubmanager.go | 5 ++++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 0d144737f4..47783fab72 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -55,8 +55,12 @@ func NewEventHubConnector( func (c *EventHubConnector) Close() error { var allErrors error + // create a subcontext with a 1 minute timeout. + ctx, cancel := context.WithTimeout(c.ctx, 1*time.Minute) + defer cancel() + // close all the eventhub clients. - err := c.hubManager.Close(context.Background()) + err := c.hubManager.Close(ctx) if err != nil { log.Errorf("failed to close eventhub clients: %v", err) allErrors = errors.Join(allErrors, err) @@ -129,7 +133,6 @@ func (c *EventHubConnector) processBatch( batch *model.CDCRecordStream, maxParallelism int64, ) (uint32, error) { - ctx := context.Background() batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) @@ -152,6 +155,9 @@ func (c *EventHubConnector) processBatch( return 0, err } + ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second) + defer cancel() + err = batchPerTopic.AddEvent(ctx, topicName, json) if err != nil { log.WithFields(log.Fields{ @@ -171,6 +177,9 @@ func (c *EventHubConnector) processBatch( "flowName": flowJobName, }).Infof("processed %d records for sending", numRecords) + ctx, cancel := context.WithTimeout(c.ctx, 1*time.Minute) + defer cancel() + flushErr := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName) if flushErr != nil { return 0, flushErr diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index d0ed29f804..a8f5bd0a16 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -41,7 +41,7 @@ func NewEventHubManager( } } -func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) ( +func (m *EventHubManager) GetOrCreateHubClient(parCtx context.Context, name ScopedEventhub) ( *azeventhubs.ProducerClient, error) { ehConfig, ok := m.peerConfig.Get(name.PeerName) if !ok { @@ -55,6 +55,9 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE namespace = fmt.Sprintf("%s.servicebus.windows.net", namespace) } + ctx, cancel := context.WithTimeout(parCtx, 30*time.Second) + defer cancel() + var hubConnectOK bool var hub any hub, hubConnectOK = m.hubs.Load(name)