diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index b0b248a488..7a7ce63ba6 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -175,13 +175,12 @@ func (c *EventHubConnector) processBatch( return 0, err } - ehConfig, ok := c.hubManager.peerConfig.Get(destination.PeerName) - if !ok { - c.logger.Error("failed to get eventhub config", slog.Any("error", err)) + numPartitions, err := c.hubManager.GetNumPartitions(ctx, destination) + if err != nil { + c.logger.Error("failed to get number of partitions", slog.Any("error", err)) return 0, err } - numPartitions := ehConfig.PartitionCount // Scoped eventhub is of the form peer_name.eventhub_name.partition_column // partition_column is the column in the table that is used to determine // the partition key for the eventhub. @@ -193,7 +192,7 @@ func (c *EventHubConnector) processBatch( } else { partitionKey = fmt.Sprintf("%v", partitionValue) } - partitionKey = utils.HashedPartitionKey(partitionKey, numPartitions) + partitionKey = utils.HashedPartitionKey(partitionKey, uint32(numPartitions)) destination.SetPartitionValue(partitionKey) err = batchPerTopic.AddEvent(ctx, destination, json, false) if err != nil { diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 19f608c4bf..48a518b3d3 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -23,6 +23,8 @@ type EventHubManager struct { peerConfig cmap.ConcurrentMap[string, *protos.EventHubConfig] // eventhub name -> client hubs sync.Map + // eventhub name -> number of partitions + partitionCount sync.Map } func NewEventHubManager( @@ -41,6 +43,28 @@ func NewEventHubManager( } } +func (m *EventHubManager) GetNumPartitions(ctx context.Context, name ScopedEventhub) (int, error) { + partitionCount, ok := m.partitionCount.Load(name) + if ok { + return partitionCount.(int), nil + } + + hub, err := m.GetOrCreateHubClient(ctx, name) + if err != nil { + return 0, err + } + + props, err := hub.GetEventHubProperties(ctx, nil) + if err != nil { + return 0, fmt.Errorf("failed to get eventhub properties: %v", err) + } + + numPartitions := len(props.PartitionIDs) + m.partitionCount.Store(name, numPartitions) + + return numPartitions, nil +} + func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) ( *azeventhubs.ProducerClient, error, ) {