From ed3bd4b1fb79a1df3e324ff5c0b84c4f46304493 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Mon, 11 Dec 2023 11:53:35 -0500 Subject: [PATCH] producer client closing --- flow/connectors/eventhub/eventhub.go | 6 ++++++ flow/connectors/eventhub/hubmanager.go | 29 ++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 896b7fd9a7..259fe9b103 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -62,6 +62,12 @@ func (c *EventHubConnector) Close() error { allErrors = errors.Join(allErrors, err) } + err = c.hubManager.Close(context.Background()) + if err != nil { + log.Errorf("failed to close event hub manager: %v", err) + allErrors = errors.Join(allErrors, err) + } + return allErrors } diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index a30f94d162..f56c7217ab 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -2,6 +2,7 @@ package conneventhub import ( "context" + "errors" "fmt" "strings" "sync" @@ -62,6 +63,10 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE _, err := hubTmp.GetEventHubProperties(ctx, nil) if err != nil { log.Infof("eventhub %s not reachable. Will re-establish connection and re-create it. Err: %v", name, err) + closeError := m.closeProducerClient(ctx, hubTmp) + if closeError != nil { + log.Errorf("failed to close producer client: %v", closeError) + } m.hubs.Delete(name) hubConnectOK = false } @@ -86,6 +91,30 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE return hub.(*azeventhubs.ProducerClient), nil } +func (m *EventHubManager) closeProducerClient(ctx context.Context, pc *azeventhubs.ProducerClient) error { + if pc != nil { + return pc.Close(ctx) + } + return nil +} + +func (m *EventHubManager) Close(ctx context.Context) error { + var allErrors error + + m.hubs.Range(func(key any, value any) bool { + name := key.(ScopedEventhub) + hub := value.(*azeventhubs.ProducerClient) + err := m.closeProducerClient(ctx, hub) + if err != nil { + log.Errorf("failed to close eventhub client for %s: %v", name, err) + allErrors = errors.Join(allErrors, err) + } + return true + }) + + return allErrors +} + func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) ( *azeventhubs.EventDataBatch, error) { hub, err := m.GetOrCreateHubClient(ctx, name)