Skip to content

Commit

Permalink
producer client closing (#794)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Dec 11, 2023
1 parent 1f53658 commit a1ac3e8
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 0 deletions.
6 changes: 6 additions & 0 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ func (c *EventHubConnector) Close() error {
allErrors = errors.Join(allErrors, err)
}

err = c.hubManager.Close(context.Background())
if err != nil {
log.Errorf("failed to close event hub manager: %v", err)
allErrors = errors.Join(allErrors, err)
}

return allErrors
}

Expand Down
29 changes: 29 additions & 0 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package conneventhub

import (
"context"
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -62,6 +63,10 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE
_, err := hubTmp.GetEventHubProperties(ctx, nil)
if err != nil {
log.Infof("eventhub %s not reachable. Will re-establish connection and re-create it. Err: %v", name, err)
closeError := m.closeProducerClient(ctx, hubTmp)
if closeError != nil {
log.Errorf("failed to close producer client: %v", closeError)
}
m.hubs.Delete(name)
hubConnectOK = false
}
Expand All @@ -86,6 +91,30 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE
return hub.(*azeventhubs.ProducerClient), nil
}

func (m *EventHubManager) closeProducerClient(ctx context.Context, pc *azeventhubs.ProducerClient) error {
if pc != nil {
return pc.Close(ctx)
}
return nil
}

func (m *EventHubManager) Close(ctx context.Context) error {
var allErrors error

m.hubs.Range(func(key any, value any) bool {
name := key.(ScopedEventhub)
hub := value.(*azeventhubs.ProducerClient)
err := m.closeProducerClient(ctx, hub)
if err != nil {
log.Errorf("failed to close eventhub client for %s: %v", name, err)
allErrors = errors.Join(allErrors, err)
}
return true
})

return allErrors
}

func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) (
*azeventhubs.EventDataBatch, error) {
hub, err := m.GetOrCreateHubClient(ctx, name)
Expand Down

0 comments on commit a1ac3e8

Please sign in to comment.