Skip to content

Commit

Permalink
use local ctxes
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 19, 2023
1 parent 36969a9 commit 668f96e
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
13 changes: 11 additions & 2 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@ func NewEventHubConnector(
func (c *EventHubConnector) Close() error {
var allErrors error

// create a subcontext with a 1 minute timeout.
ctx, cancel := context.WithTimeout(c.ctx, 1*time.Minute)
defer cancel()

// close all the eventhub clients.
err := c.hubManager.Close(context.Background())
err := c.hubManager.Close(ctx)
if err != nil {
log.Errorf("failed to close eventhub clients: %v", err)
allErrors = errors.Join(allErrors, err)
Expand Down Expand Up @@ -129,7 +133,6 @@ func (c *EventHubConnector) processBatch(
batch *model.CDCRecordStream,
maxParallelism int64,
) (uint32, error) {
ctx := context.Background()
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

Expand All @@ -152,6 +155,9 @@ func (c *EventHubConnector) processBatch(
return 0, err
}

ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
defer cancel()

err = batchPerTopic.AddEvent(ctx, topicName, json)
if err != nil {
log.WithFields(log.Fields{
Expand All @@ -171,6 +177,9 @@ func (c *EventHubConnector) processBatch(
"flowName": flowJobName,
}).Infof("processed %d records for sending", numRecords)

ctx, cancel := context.WithTimeout(c.ctx, 1*time.Minute)
defer cancel()

flushErr := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName)
if flushErr != nil {
return 0, flushErr
Expand Down
5 changes: 4 additions & 1 deletion flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewEventHubManager(
}
}

func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) (
func (m *EventHubManager) GetOrCreateHubClient(parCtx context.Context, name ScopedEventhub) (
*azeventhubs.ProducerClient, error) {
ehConfig, ok := m.peerConfig.Get(name.PeerName)
if !ok {
Expand All @@ -55,6 +55,9 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE
namespace = fmt.Sprintf("%s.servicebus.windows.net", namespace)
}

ctx, cancel := context.WithTimeout(parCtx, 30*time.Second)
defer cancel()

var hubConnectOK bool
var hub any
hub, hubConnectOK = m.hubs.Load(name)
Expand Down

0 comments on commit 668f96e

Please sign in to comment.