From d38e27a7173dd351bac5885ad00ee9940e4cb355 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sun, 19 Nov 2023 12:56:56 -0500 Subject: [PATCH] better context errors --- flow/connectors/eventhub/eventhub.go | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 47783fab72..e67618a57b 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -57,15 +57,21 @@ func (c *EventHubConnector) Close() error { // create a subcontext with a 1 minute timeout. ctx, cancel := context.WithTimeout(c.ctx, 1*time.Minute) - defer cancel() // close all the eventhub clients. err := c.hubManager.Close(ctx) + cancel() + if err != nil { log.Errorf("failed to close eventhub clients: %v", err) allErrors = errors.Join(allErrors, err) } + if ctx.Err() != nil { + log.Errorf("failed to close eventhub clients: %v", ctx.Err()) + allErrors = errors.Join(allErrors, ctx.Err()) + } + // close the postgres metadata store. err = c.pgMetadata.Close() if err != nil { @@ -156,9 +162,9 @@ func (c *EventHubConnector) processBatch( } ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second) - defer cancel() - err = batchPerTopic.AddEvent(ctx, topicName, json) + cancel() + if err != nil { log.WithFields(log.Fields{ "flowName": flowJobName, @@ -166,6 +172,13 @@ func (c *EventHubConnector) processBatch( return 0, err } + if ctx.Err() != nil { + log.WithFields(log.Fields{ + "flowName": flowJobName, + }).Infof("failed to add event to batch: %v", err) + return 0, ctx.Err() + } + if numRecords%1000 == 0 { log.WithFields(log.Fields{ "flowName": flowJobName, @@ -178,12 +191,18 @@ func (c *EventHubConnector) processBatch( }).Infof("processed %d records for sending", numRecords) ctx, cancel := context.WithTimeout(c.ctx, 1*time.Minute) - defer cancel() flushErr := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName) + cancel() + if flushErr != nil { return 0, flushErr } + + if ctx.Err() != nil { + return 0, ctx.Err() + } + batchPerTopic.Clear() log.WithFields(log.Fields{