diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 1baf893092..7a7ce63ba6 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -267,18 +267,11 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S }, nil } -func (c *EventHubConnector) GetPeerOfEventhubNamespace(eventhubNamespace string) string { - for peerName, config := range c.config.Eventhubs { - if config.Namespace == eventhubNamespace { - return peerName - } - } - return "" -} - func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { // create topics for each table + // key is the source table and value is the "eh_peer.eh_topic" that ought to be used. tableMap := req.GetTableNameMapping() + for _, destinationTable := range tableMap { // parse peer name and topic name. name, err := NewScopedEventhub(destinationTable) @@ -288,8 +281,7 @@ func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr return nil, err } - peerName := c.GetPeerOfEventhubNamespace(name.EventhubNamespace) - err = c.hubManager.EnsureEventHubExists(c.ctx, name, peerName) + err = c.hubManager.EnsureEventHubExists(c.ctx, name) if err != nil { c.logger.Error("failed to ensure eventhub exists", slog.Any("error", err), slog.String("destinationTable", destinationTable)) diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 6302eb5ad3..48a518b3d3 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -68,7 +68,12 @@ func (m *EventHubManager) GetNumPartitions(ctx context.Context, name ScopedEvent func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) ( *azeventhubs.ProducerClient, error, ) { - namespace := name.EventhubNamespace + ehConfig, ok := m.peerConfig.Get(name.PeerName) + if !ok { + return nil, fmt.Errorf("eventhub '%s' not configured", name.Eventhub) + } + + namespace := ehConfig.Namespace // if the namespace isn't fully qualified, add the `.servicebus.windows.net` // check by counting the number of '.' in the namespace if strings.Count(namespace, ".") < 2 { @@ -159,10 +164,10 @@ func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, destination } // EnsureEventHubExists ensures that the eventhub exists. -func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedEventhub, peerName string) error { - cfg, ok := m.peerConfig.Get(peerName) +func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedEventhub) error { + cfg, ok := m.peerConfig.Get(name.PeerName) if !ok { - return fmt.Errorf("eventhub peer '%s' not configured", peerName) + return fmt.Errorf("eventhub peer '%s' not configured", name.PeerName) } hubClient, err := m.getEventHubMgmtClient(cfg.SubscriptionId) diff --git a/flow/connectors/eventhub/scoped_eventhub.go b/flow/connectors/eventhub/scoped_eventhub.go index 76c647d99e..75c48d90e4 100644 --- a/flow/connectors/eventhub/scoped_eventhub.go +++ b/flow/connectors/eventhub/scoped_eventhub.go @@ -9,30 +9,26 @@ import ( // partition_column is the column in the table that is used to determine // the partition key for the eventhub. Partition value is one such value of that column. type ScopedEventhub struct { - EventhubNamespace string + PeerName string Eventhub string PartitionKeyColumn string PartitionKeyValue string } func NewScopedEventhub(dstTableName string) (ScopedEventhub, error) { - // split by dot, the model is eventhub_namespace.eventhub_name.table_name.partition_key_column + // split by dot, the model is peername.eventhub.uniquekey.partition_key_column parts := strings.Split(dstTableName, ".") if len(parts) != 4 { return ScopedEventhub{}, fmt.Errorf("invalid scoped eventhub '%s'", dstTableName) } - // support eventhub namespace, eventhub name, partition key with hyphens etc. - // part[2] will be some table identifier. - // It's just so that we have distinct destination table names - // in create mirror's table mapping. - // We can ignore it. - namespacePart := strings.Trim(parts[0], `"`) + // support eventhub name and partition key with hyphens etc. + // unique key is unused. eventhubPart := strings.Trim(parts[1], `"`) partitionPart := strings.Trim(parts[3], `"`) return ScopedEventhub{ - EventhubNamespace: namespacePart, + PeerName: parts[0], Eventhub: eventhubPart, PartitionKeyColumn: partitionPart, }, nil @@ -42,7 +38,14 @@ func (s *ScopedEventhub) SetPartitionValue(value string) { s.PartitionKeyValue = value } +func (s ScopedEventhub) Equals(other ScopedEventhub) bool { + return s.PeerName == other.PeerName && + s.Eventhub == other.Eventhub && + s.PartitionKeyColumn == other.PartitionKeyColumn && + s.PartitionKeyValue == other.PartitionKeyValue +} + // ToString returns the string representation of the ScopedEventhub func (s ScopedEventhub) ToString() string { - return fmt.Sprintf("%s.%s.%s.%s", s.EventhubNamespace, s.Eventhub, s.PartitionKeyColumn, s.PartitionKeyValue) + return fmt.Sprintf("%s.%s.%s.%s", s.PeerName, s.Eventhub, s.PartitionKeyColumn, s.PartitionKeyValue) }