From f50fc0f1fa6e978d7221cad21ab955410fa2f74f Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 2 Feb 2024 19:50:53 +0530 Subject: [PATCH] refactor destination semantics --- flow/connectors/eventhub/eventhub.go | 14 ++++++++--- flow/connectors/eventhub/hubmanager.go | 14 ++++------- flow/connectors/eventhub/scoped_eventhub.go | 27 +++++++++------------ 3 files changed, 28 insertions(+), 27 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 7a7ce63ba6..1baf893092 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -267,11 +267,18 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S }, nil } +func (c *EventHubConnector) GetPeerOfEventhubNamespace(eventhubNamespace string) string { + for peerName, config := range c.config.Eventhubs { + if config.Namespace == eventhubNamespace { + return peerName + } + } + return "" +} + func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { // create topics for each table - // key is the source table and value is the "eh_peer.eh_topic" that ought to be used. tableMap := req.GetTableNameMapping() - for _, destinationTable := range tableMap { // parse peer name and topic name. name, err := NewScopedEventhub(destinationTable) @@ -281,7 +288,8 @@ func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr return nil, err } - err = c.hubManager.EnsureEventHubExists(c.ctx, name) + peerName := c.GetPeerOfEventhubNamespace(name.EventhubNamespace) + err = c.hubManager.EnsureEventHubExists(c.ctx, name, peerName) if err != nil { c.logger.Error("failed to ensure eventhub exists", slog.Any("error", err), slog.String("destinationTable", destinationTable)) diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 48a518b3d3..5a8c701c7a 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -68,12 +68,7 @@ func (m *EventHubManager) GetNumPartitions(ctx context.Context, name ScopedEvent func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) ( *azeventhubs.ProducerClient, error, ) { - ehConfig, ok := m.peerConfig.Get(name.PeerName) - if !ok { - return nil, fmt.Errorf("eventhub '%s' not configured", name.Eventhub) - } - - namespace := ehConfig.Namespace + namespace := name.EventhubNamespace // if the namespace isn't fully qualified, add the `.servicebus.windows.net` // check by counting the number of '.' in the namespace if strings.Count(namespace, ".") < 2 { @@ -130,6 +125,7 @@ func (m *EventHubManager) Close(ctx context.Context) error { m.hubs.Range(func(key any, value any) bool { name := key.(ScopedEventhub) + slog.Info(fmt.Sprintf("closing eventhub client for %v", name)) hub := value.(*azeventhubs.ProducerClient) err := m.closeProducerClient(ctx, hub) if err != nil { @@ -164,10 +160,10 @@ func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, destination } // EnsureEventHubExists ensures that the eventhub exists. -func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedEventhub) error { - cfg, ok := m.peerConfig.Get(name.PeerName) +func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedEventhub, peerName string) error { + cfg, ok := m.peerConfig.Get(peerName) if !ok { - return fmt.Errorf("eventhub peer '%s' not configured", name.PeerName) + return fmt.Errorf("eventhub peer '%s' not configured", peerName) } hubClient, err := m.getEventHubMgmtClient(cfg.SubscriptionId) diff --git a/flow/connectors/eventhub/scoped_eventhub.go b/flow/connectors/eventhub/scoped_eventhub.go index f3b5d46d4c..5af66e1713 100644 --- a/flow/connectors/eventhub/scoped_eventhub.go +++ b/flow/connectors/eventhub/scoped_eventhub.go @@ -9,26 +9,30 @@ import ( // partition_column is the column in the table that is used to determine // the partition key for the eventhub. Partition value is one such value of that column. type ScopedEventhub struct { - PeerName string + EventhubNamespace string Eventhub string + DestinationTable string PartitionKeyColumn string PartitionKeyValue string } func NewScopedEventhub(dstTableName string) (ScopedEventhub, error) { - // split by dot, the model is peername.eventhub.partition_key_column + // split by dot, the model is eventhub.eventhub_namespace.table_name.partition_key_column parts := strings.Split(dstTableName, ".") - if len(parts) != 3 { + if len(parts) != 4 { return ScopedEventhub{}, fmt.Errorf("invalid scoped eventhub '%s'", dstTableName) } - // support eventhub name and partition key with hyphens etc. - eventhubPart := strings.Trim(parts[1], `"`) - partitionPart := strings.Trim(parts[2], `"`) + // support eventhub namespace, eventhub name, table name, partition key with hyphens etc. + eventhubPart := strings.Trim(parts[0], `"`) + namespacePart := strings.Trim(parts[1], `"`) + destTablePart := strings.Trim(parts[2], `"`) + partitionPart := strings.Trim(parts[3], `"`) return ScopedEventhub{ - PeerName: parts[0], + EventhubNamespace: namespacePart, Eventhub: eventhubPart, + DestinationTable: destTablePart, PartitionKeyColumn: partitionPart, }, nil } @@ -37,14 +41,7 @@ func (s *ScopedEventhub) SetPartitionValue(value string) { s.PartitionKeyValue = value } -func (s ScopedEventhub) Equals(other ScopedEventhub) bool { - return s.PeerName == other.PeerName && - s.Eventhub == other.Eventhub && - s.PartitionKeyColumn == other.PartitionKeyColumn && - s.PartitionKeyValue == other.PartitionKeyValue -} - // ToString returns the string representation of the ScopedEventhub func (s ScopedEventhub) ToString() string { - return fmt.Sprintf("%s.%s.%s.%s", s.PeerName, s.Eventhub, s.PartitionKeyColumn, s.PartitionKeyValue) + return fmt.Sprintf("%s.%s.%s.%s.%s", s.EventhubNamespace, s.Eventhub, s.DestinationTable, s.PartitionKeyColumn, s.PartitionKeyValue) }