From 68800a9327d7b91ecb82faffe994b912378dd70d Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 6 Feb 2024 14:25:57 +0530 Subject: [PATCH 1/5] Revert "change comment" This reverts commit 2333e262361fd2df87b41b9a39b91eed6eed7719. --- flow/connectors/eventhub/scoped_eventhub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/eventhub/scoped_eventhub.go b/flow/connectors/eventhub/scoped_eventhub.go index 76c647d99e..d2309a88e8 100644 --- a/flow/connectors/eventhub/scoped_eventhub.go +++ b/flow/connectors/eventhub/scoped_eventhub.go @@ -16,7 +16,7 @@ type ScopedEventhub struct { } func NewScopedEventhub(dstTableName string) (ScopedEventhub, error) { - // split by dot, the model is eventhub_namespace.eventhub_name.table_name.partition_key_column + // split by dot, the model is eventhub.eventhub_namespace.table_name.partition_key_column parts := strings.Split(dstTableName, ".") if len(parts) != 4 { From e38077c48327b71a09d34ae0daa9f2a765594726 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 6 Feb 2024 14:26:11 +0530 Subject: [PATCH 2/5] Revert "change the order of destination" This reverts commit 521956f60100f3c0108fc1c7e3486da558115264. --- flow/connectors/eventhub/scoped_eventhub.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/connectors/eventhub/scoped_eventhub.go b/flow/connectors/eventhub/scoped_eventhub.go index d2309a88e8..cc18211278 100644 --- a/flow/connectors/eventhub/scoped_eventhub.go +++ b/flow/connectors/eventhub/scoped_eventhub.go @@ -28,8 +28,8 @@ func NewScopedEventhub(dstTableName string) (ScopedEventhub, error) { // It's just so that we have distinct destination table names // in create mirror's table mapping. // We can ignore it. - namespacePart := strings.Trim(parts[0], `"`) - eventhubPart := strings.Trim(parts[1], `"`) + eventhubPart := strings.Trim(parts[0], `"`) + namespacePart := strings.Trim(parts[1], `"`) partitionPart := strings.Trim(parts[3], `"`) return ScopedEventhub{ EventhubNamespace: namespacePart, From 9dca0bb8bdd2bd219a7393b41fb74dfdc3727451 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 6 Feb 2024 14:26:21 +0530 Subject: [PATCH 3/5] Revert "Eventhubs: Refactor Destination Table Semantics (#1200)" This reverts commit f9fa222ea2bda151d2ca52940c5e90416af9e0bd. --- flow/connectors/eventhub/eventhub.go | 14 +++-------- flow/connectors/eventhub/hubmanager.go | 13 +++++++--- flow/connectors/eventhub/scoped_eventhub.go | 28 +++++++++++---------- 3 files changed, 27 insertions(+), 28 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 1baf893092..7a7ce63ba6 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -267,18 +267,11 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S }, nil } -func (c *EventHubConnector) GetPeerOfEventhubNamespace(eventhubNamespace string) string { - for peerName, config := range c.config.Eventhubs { - if config.Namespace == eventhubNamespace { - return peerName - } - } - return "" -} - func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { // create topics for each table + // key is the source table and value is the "eh_peer.eh_topic" that ought to be used. tableMap := req.GetTableNameMapping() + for _, destinationTable := range tableMap { // parse peer name and topic name. name, err := NewScopedEventhub(destinationTable) @@ -288,8 +281,7 @@ func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr return nil, err } - peerName := c.GetPeerOfEventhubNamespace(name.EventhubNamespace) - err = c.hubManager.EnsureEventHubExists(c.ctx, name, peerName) + err = c.hubManager.EnsureEventHubExists(c.ctx, name) if err != nil { c.logger.Error("failed to ensure eventhub exists", slog.Any("error", err), slog.String("destinationTable", destinationTable)) diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 6302eb5ad3..48a518b3d3 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -68,7 +68,12 @@ func (m *EventHubManager) GetNumPartitions(ctx context.Context, name ScopedEvent func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) ( *azeventhubs.ProducerClient, error, ) { - namespace := name.EventhubNamespace + ehConfig, ok := m.peerConfig.Get(name.PeerName) + if !ok { + return nil, fmt.Errorf("eventhub '%s' not configured", name.Eventhub) + } + + namespace := ehConfig.Namespace // if the namespace isn't fully qualified, add the `.servicebus.windows.net` // check by counting the number of '.' in the namespace if strings.Count(namespace, ".") < 2 { @@ -159,10 +164,10 @@ func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, destination } // EnsureEventHubExists ensures that the eventhub exists. -func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedEventhub, peerName string) error { - cfg, ok := m.peerConfig.Get(peerName) +func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedEventhub) error { + cfg, ok := m.peerConfig.Get(name.PeerName) if !ok { - return fmt.Errorf("eventhub peer '%s' not configured", peerName) + return fmt.Errorf("eventhub peer '%s' not configured", name.PeerName) } hubClient, err := m.getEventHubMgmtClient(cfg.SubscriptionId) diff --git a/flow/connectors/eventhub/scoped_eventhub.go b/flow/connectors/eventhub/scoped_eventhub.go index cc18211278..f3b5d46d4c 100644 --- a/flow/connectors/eventhub/scoped_eventhub.go +++ b/flow/connectors/eventhub/scoped_eventhub.go @@ -9,30 +9,25 @@ import ( // partition_column is the column in the table that is used to determine // the partition key for the eventhub. Partition value is one such value of that column. type ScopedEventhub struct { - EventhubNamespace string + PeerName string Eventhub string PartitionKeyColumn string PartitionKeyValue string } func NewScopedEventhub(dstTableName string) (ScopedEventhub, error) { - // split by dot, the model is eventhub.eventhub_namespace.table_name.partition_key_column + // split by dot, the model is peername.eventhub.partition_key_column parts := strings.Split(dstTableName, ".") - if len(parts) != 4 { + if len(parts) != 3 { return ScopedEventhub{}, fmt.Errorf("invalid scoped eventhub '%s'", dstTableName) } - // support eventhub namespace, eventhub name, partition key with hyphens etc. - // part[2] will be some table identifier. - // It's just so that we have distinct destination table names - // in create mirror's table mapping. - // We can ignore it. - eventhubPart := strings.Trim(parts[0], `"`) - namespacePart := strings.Trim(parts[1], `"`) - partitionPart := strings.Trim(parts[3], `"`) + // support eventhub name and partition key with hyphens etc. + eventhubPart := strings.Trim(parts[1], `"`) + partitionPart := strings.Trim(parts[2], `"`) return ScopedEventhub{ - EventhubNamespace: namespacePart, + PeerName: parts[0], Eventhub: eventhubPart, PartitionKeyColumn: partitionPart, }, nil @@ -42,7 +37,14 @@ func (s *ScopedEventhub) SetPartitionValue(value string) { s.PartitionKeyValue = value } +func (s ScopedEventhub) Equals(other ScopedEventhub) bool { + return s.PeerName == other.PeerName && + s.Eventhub == other.Eventhub && + s.PartitionKeyColumn == other.PartitionKeyColumn && + s.PartitionKeyValue == other.PartitionKeyValue +} + // ToString returns the string representation of the ScopedEventhub func (s ScopedEventhub) ToString() string { - return fmt.Sprintf("%s.%s.%s.%s", s.EventhubNamespace, s.Eventhub, s.PartitionKeyColumn, s.PartitionKeyValue) + return fmt.Sprintf("%s.%s.%s.%s", s.PeerName, s.Eventhub, s.PartitionKeyColumn, s.PartitionKeyValue) } From a118a6cdb4119a6876f743329bd3f285f656ecea Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 6 Feb 2024 14:28:34 +0530 Subject: [PATCH 4/5] accomodate unique key --- flow/connectors/eventhub/scoped_eventhub.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flow/connectors/eventhub/scoped_eventhub.go b/flow/connectors/eventhub/scoped_eventhub.go index f3b5d46d4c..4bfc1f2c16 100644 --- a/flow/connectors/eventhub/scoped_eventhub.go +++ b/flow/connectors/eventhub/scoped_eventhub.go @@ -16,7 +16,7 @@ type ScopedEventhub struct { } func NewScopedEventhub(dstTableName string) (ScopedEventhub, error) { - // split by dot, the model is peername.eventhub.partition_key_column + // split by dot, the model is peername.eventhub.uniquekey.partition_key_column parts := strings.Split(dstTableName, ".") if len(parts) != 3 { @@ -24,8 +24,9 @@ func NewScopedEventhub(dstTableName string) (ScopedEventhub, error) { } // support eventhub name and partition key with hyphens etc. + // unique key is unused. eventhubPart := strings.Trim(parts[1], `"`) - partitionPart := strings.Trim(parts[2], `"`) + partitionPart := strings.Trim(parts[3], `"`) return ScopedEventhub{ PeerName: parts[0], Eventhub: eventhubPart, From 47f69ea92f0d27eef578cb86629b86429bbf1423 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 6 Feb 2024 14:37:15 +0530 Subject: [PATCH 5/5] minor change --- flow/connectors/eventhub/scoped_eventhub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/eventhub/scoped_eventhub.go b/flow/connectors/eventhub/scoped_eventhub.go index 4bfc1f2c16..75c48d90e4 100644 --- a/flow/connectors/eventhub/scoped_eventhub.go +++ b/flow/connectors/eventhub/scoped_eventhub.go @@ -19,7 +19,7 @@ func NewScopedEventhub(dstTableName string) (ScopedEventhub, error) { // split by dot, the model is peername.eventhub.uniquekey.partition_key_column parts := strings.Split(dstTableName, ".") - if len(parts) != 3 { + if len(parts) != 4 { return ScopedEventhub{}, fmt.Errorf("invalid scoped eventhub '%s'", dstTableName) }