diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 2b50239607..51824f6c7c 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -66,12 +66,12 @@ func (m *EventHubManager) GetNumPartitions(ctx context.Context, name ScopedEvent return numPartitions, nil } -func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) ( +func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, scopedHub ScopedEventhub) ( *azeventhubs.ProducerClient, error, ) { - ehConfig, ok := m.peerConfig.Get(name.PeerName) + ehConfig, ok := m.peerConfig.Get(scopedHub.PeerName) if !ok { - return nil, fmt.Errorf("eventhub '%s' not configured", name.Eventhub) + return nil, fmt.Errorf("eventhub '%s' not configured", scopedHub.Eventhub) } namespace := ehConfig.Namespace @@ -81,21 +81,22 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE namespace = fmt.Sprintf("%s.servicebus.windows.net", namespace) } + hubName := scopedHub.Eventhub var hubConnectOK bool var hub any - hub, hubConnectOK = m.hubs.Load(name) + hub, hubConnectOK = m.hubs.Load(hubName) if hubConnectOK { hubTmp := hub.(*azeventhubs.ProducerClient) _, err := hubTmp.GetEventHubProperties(ctx, nil) if err != nil { - slog.Info(fmt.Sprintf("eventhub %s", name)+ + slog.Info(fmt.Sprintf("eventhub %s", hubName)+ "not reachable. Will re-establish connection and re-create it.", slog.Any("error", err)) closeError := m.closeProducerClient(ctx, hubTmp) if closeError != nil { slog.Error("failed to close producer client", slog.Any("error", closeError)) } - m.hubs.Delete(name) + m.hubs.Delete(hubName) hubConnectOK = false } } @@ -108,11 +109,11 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE MaxRetryDelay: 16 * time.Second, }, } - hub, err := azeventhubs.NewProducerClient(namespace, name.Eventhub, m.creds, opts) + hub, err := azeventhubs.NewProducerClient(namespace, hubName, m.creds, opts) if err != nil { return nil, fmt.Errorf("failed to create eventhub client: %v", err) } - m.hubs.Store(name, hub) + m.hubs.Store(hubName, hub) return hub, nil } @@ -136,11 +137,10 @@ func (m *EventHubManager) Close(ctx context.Context) error { var allErrors error m.hubs.Range(func(key any, value any) bool { - name := key.(ScopedEventhub) hub := value.(*azeventhubs.ProducerClient) err := m.closeProducerClient(ctx, hub) if err != nil { - slog.Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err)) + slog.Error(fmt.Sprintf("failed to close eventhub client for %v", key), slog.Any("error", err)) allErrors = errors.Join(allErrors, err) } numHubsClosed.Add(1)