Skip to content

Commit

Permalink
multiple partitions same client
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Jan 25, 2024
1 parent 3d97a16 commit bdca21f
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ func (m *EventHubManager) GetNumPartitions(ctx context.Context, name ScopedEvent
return numPartitions, nil
}

func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) (
func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, scopedHub ScopedEventhub) (
*azeventhubs.ProducerClient, error,
) {
ehConfig, ok := m.peerConfig.Get(name.PeerName)
ehConfig, ok := m.peerConfig.Get(scopedHub.PeerName)
if !ok {
return nil, fmt.Errorf("eventhub '%s' not configured", name.Eventhub)
return nil, fmt.Errorf("eventhub '%s' not configured", scopedHub.Eventhub)
}

namespace := ehConfig.Namespace
Expand All @@ -81,21 +81,22 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE
namespace = fmt.Sprintf("%s.servicebus.windows.net", namespace)
}

hubName := scopedHub.Eventhub
var hubConnectOK bool
var hub any
hub, hubConnectOK = m.hubs.Load(name)
hub, hubConnectOK = m.hubs.Load(hubName)
if hubConnectOK {
hubTmp := hub.(*azeventhubs.ProducerClient)
_, err := hubTmp.GetEventHubProperties(ctx, nil)
if err != nil {
slog.Info(fmt.Sprintf("eventhub %s", name)+
slog.Info(fmt.Sprintf("eventhub %s", hubName)+
"not reachable. Will re-establish connection and re-create it.",
slog.Any("error", err))
closeError := m.closeProducerClient(ctx, hubTmp)
if closeError != nil {
slog.Error("failed to close producer client", slog.Any("error", closeError))
}
m.hubs.Delete(name)
m.hubs.Delete(hubName)
hubConnectOK = false
}
}
Expand All @@ -108,11 +109,11 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE
MaxRetryDelay: 16 * time.Second,
},
}
hub, err := azeventhubs.NewProducerClient(namespace, name.Eventhub, m.creds, opts)
hub, err := azeventhubs.NewProducerClient(namespace, hubName, m.creds, opts)
if err != nil {
return nil, fmt.Errorf("failed to create eventhub client: %v", err)
}
m.hubs.Store(name, hub)
m.hubs.Store(hubName, hub)
return hub, nil
}

Expand All @@ -136,11 +137,10 @@ func (m *EventHubManager) Close(ctx context.Context) error {
var allErrors error

m.hubs.Range(func(key any, value any) bool {
name := key.(ScopedEventhub)
hub := value.(*azeventhubs.ProducerClient)
err := m.closeProducerClient(ctx, hub)
if err != nil {
slog.Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err))
slog.Error(fmt.Sprintf("failed to close eventhub client for %v", key), slog.Any("error", err))
allErrors = errors.Join(allErrors, err)
}
numHubsClosed.Add(1)
Expand Down

0 comments on commit bdca21f

Please sign in to comment.