Skip to content

Commit

Permalink
Get number of partitions for a eventhub dynamically (#1143)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Jan 24, 2024
1 parent 50fb5e5 commit 6b2583c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
9 changes: 4 additions & 5 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,12 @@ func (c *EventHubConnector) processBatch(
return 0, err
}

ehConfig, ok := c.hubManager.peerConfig.Get(destination.PeerName)
if !ok {
c.logger.Error("failed to get eventhub config", slog.Any("error", err))
numPartitions, err := c.hubManager.GetNumPartitions(ctx, destination)
if err != nil {
c.logger.Error("failed to get number of partitions", slog.Any("error", err))
return 0, err
}

numPartitions := ehConfig.PartitionCount
// Scoped eventhub is of the form peer_name.eventhub_name.partition_column
// partition_column is the column in the table that is used to determine
// the partition key for the eventhub.
Expand All @@ -193,7 +192,7 @@ func (c *EventHubConnector) processBatch(
} else {
partitionKey = fmt.Sprintf("%v", partitionValue)
}
partitionKey = utils.HashedPartitionKey(partitionKey, numPartitions)
partitionKey = utils.HashedPartitionKey(partitionKey, uint32(numPartitions))
destination.SetPartitionValue(partitionKey)
err = batchPerTopic.AddEvent(ctx, destination, json, false)
if err != nil {
Expand Down
24 changes: 24 additions & 0 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type EventHubManager struct {
peerConfig cmap.ConcurrentMap[string, *protos.EventHubConfig]
// eventhub name -> client
hubs sync.Map
// eventhub name -> number of partitions
partitionCount sync.Map
}

func NewEventHubManager(
Expand All @@ -41,6 +43,28 @@ func NewEventHubManager(
}
}

func (m *EventHubManager) GetNumPartitions(ctx context.Context, name ScopedEventhub) (int, error) {
partitionCount, ok := m.partitionCount.Load(name)
if ok {
return partitionCount.(int), nil
}

hub, err := m.GetOrCreateHubClient(ctx, name)
if err != nil {
return 0, err
}

props, err := hub.GetEventHubProperties(ctx, nil)
if err != nil {
return 0, fmt.Errorf("failed to get eventhub properties: %v", err)
}

numPartitions := len(props.PartitionIDs)
m.partitionCount.Store(name, numPartitions)

return numPartitions, nil
}

func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) (
*azeventhubs.ProducerClient, error,
) {
Expand Down

0 comments on commit 6b2583c

Please sign in to comment.