Skip to content

Commit

Permalink
better context errors
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 19, 2023
1 parent 668f96e commit d38e27a
Showing 1 changed file with 23 additions and 4 deletions.
27 changes: 23 additions & 4 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,21 @@ func (c *EventHubConnector) Close() error {

// create a subcontext with a 1 minute timeout.
ctx, cancel := context.WithTimeout(c.ctx, 1*time.Minute)
defer cancel()

// close all the eventhub clients.
err := c.hubManager.Close(ctx)
cancel()

if err != nil {
log.Errorf("failed to close eventhub clients: %v", err)
allErrors = errors.Join(allErrors, err)
}

if ctx.Err() != nil {
log.Errorf("failed to close eventhub clients: %v", ctx.Err())
allErrors = errors.Join(allErrors, ctx.Err())
}

// close the postgres metadata store.
err = c.pgMetadata.Close()
if err != nil {
Expand Down Expand Up @@ -156,16 +162,23 @@ func (c *EventHubConnector) processBatch(
}

ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
defer cancel()

err = batchPerTopic.AddEvent(ctx, topicName, json)
cancel()

if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to add event to batch: %v", err)
return 0, err
}

if ctx.Err() != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to add event to batch: %v", err)
return 0, ctx.Err()
}

if numRecords%1000 == 0 {
log.WithFields(log.Fields{
"flowName": flowJobName,
Expand All @@ -178,12 +191,18 @@ func (c *EventHubConnector) processBatch(
}).Infof("processed %d records for sending", numRecords)

ctx, cancel := context.WithTimeout(c.ctx, 1*time.Minute)
defer cancel()

flushErr := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName)
cancel()

if flushErr != nil {
return 0, flushErr
}

if ctx.Err() != nil {
return 0, ctx.Err()
}

batchPerTopic.Clear()

log.WithFields(log.Fields{
Expand Down

0 comments on commit d38e27a

Please sign in to comment.