Skip to content

Commit

Permalink
Eventhub: Revert to earlier order with unique key (#1208)
Browse files Browse the repository at this point in the history
Destination for eventhub cdc mirror is now of the form
`peername.eventhub_name.unique_key.column_name`
  • Loading branch information
Amogh-Bharadwaj authored Feb 6, 2024
1 parent 2333e26 commit f6a7372
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 25 deletions.
14 changes: 3 additions & 11 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,18 +267,11 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}, nil
}

// GetPeerOfEventhubNamespace looks through c.config.Eventhubs and returns the
// name of the first entry encountered whose Namespace equals eventhubNamespace.
// It returns the empty string when no entry matches.
func (c *EventHubConnector) GetPeerOfEventhubNamespace(eventhubNamespace string) string {
	for peer, ehCfg := range c.config.Eventhubs {
		// Skip entries that belong to a different namespace.
		if ehCfg.Namespace != eventhubNamespace {
			continue
		}
		return peer
	}

	// No configured peer uses this namespace.
	return ""
}

func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
// create topics for each table
// key is the source table and value is the "peername.eventhub_name.unique_key.column_name" destination that ought to be used.
tableMap := req.GetTableNameMapping()

for _, destinationTable := range tableMap {
// parse peer name and topic name.
name, err := NewScopedEventhub(destinationTable)
Expand All @@ -288,8 +281,7 @@ func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
return nil, err
}

peerName := c.GetPeerOfEventhubNamespace(name.EventhubNamespace)
err = c.hubManager.EnsureEventHubExists(c.ctx, name, peerName)
err = c.hubManager.EnsureEventHubExists(c.ctx, name)
if err != nil {
c.logger.Error("failed to ensure eventhub exists",
slog.Any("error", err), slog.String("destinationTable", destinationTable))
Expand Down
13 changes: 9 additions & 4 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ func (m *EventHubManager) GetNumPartitions(ctx context.Context, name ScopedEvent
func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) (
*azeventhubs.ProducerClient, error,
) {
namespace := name.EventhubNamespace
ehConfig, ok := m.peerConfig.Get(name.PeerName)
if !ok {
return nil, fmt.Errorf("eventhub '%s' not configured", name.Eventhub)
}

namespace := ehConfig.Namespace
// if the namespace isn't fully qualified, add the `.servicebus.windows.net`
// check by counting the number of '.' in the namespace
if strings.Count(namespace, ".") < 2 {
Expand Down Expand Up @@ -159,10 +164,10 @@ func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, destination
}

// EnsureEventHubExists ensures that the eventhub exists.
func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedEventhub, peerName string) error {
cfg, ok := m.peerConfig.Get(peerName)
func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedEventhub) error {
cfg, ok := m.peerConfig.Get(name.PeerName)
if !ok {
return fmt.Errorf("eventhub peer '%s' not configured", peerName)
return fmt.Errorf("eventhub peer '%s' not configured", name.PeerName)
}

hubClient, err := m.getEventHubMgmtClient(cfg.SubscriptionId)
Expand Down
23 changes: 13 additions & 10 deletions flow/connectors/eventhub/scoped_eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,26 @@ import (
// partition_column is the column in the table that is used to determine
// the partition key for the eventhub. Partition value is one such value of that column.
type ScopedEventhub struct {
EventhubNamespace string
PeerName string
Eventhub string
PartitionKeyColumn string
PartitionKeyValue string
}

func NewScopedEventhub(dstTableName string) (ScopedEventhub, error) {
// split by dot, the model is eventhub_namespace.eventhub_name.table_name.partition_key_column
// split by dot, the model is peername.eventhub.uniquekey.partition_key_column
parts := strings.Split(dstTableName, ".")

if len(parts) != 4 {
return ScopedEventhub{}, fmt.Errorf("invalid scoped eventhub '%s'", dstTableName)
}

// support eventhub namespace, eventhub name, partition key with hyphens etc.
// part[2] will be some table identifier.
// It's just so that we have distinct destination table names
// in create mirror's table mapping.
// We can ignore it.
namespacePart := strings.Trim(parts[0], `"`)
// support eventhub name and partition key with hyphens etc.
// unique key is unused.
eventhubPart := strings.Trim(parts[1], `"`)
partitionPart := strings.Trim(parts[3], `"`)
return ScopedEventhub{
EventhubNamespace: namespacePart,
PeerName: parts[0],
Eventhub: eventhubPart,
PartitionKeyColumn: partitionPart,
}, nil
Expand All @@ -42,7 +38,14 @@ func (s *ScopedEventhub) SetPartitionValue(value string) {
s.PartitionKeyValue = value
}

// Equals reports whether s and other have the same PeerName, Eventhub,
// PartitionKeyColumn and PartitionKeyValue.
func (s ScopedEventhub) Equals(other ScopedEventhub) bool {
	// Different peer or eventhub: cannot be the same destination.
	if s.PeerName != other.PeerName || s.Eventhub != other.Eventhub {
		return false
	}

	// Same hub; the partition key column and value must match as well.
	sameColumn := s.PartitionKeyColumn == other.PartitionKeyColumn
	sameValue := s.PartitionKeyValue == other.PartitionKeyValue
	return sameColumn && sameValue
}

// ToString returns the string representation of the ScopedEventhub
func (s ScopedEventhub) ToString() string {
return fmt.Sprintf("%s.%s.%s.%s", s.EventhubNamespace, s.Eventhub, s.PartitionKeyColumn, s.PartitionKeyValue)
return fmt.Sprintf("%s.%s.%s.%s", s.PeerName, s.Eventhub, s.PartitionKeyColumn, s.PartitionKeyValue)
}

0 comments on commit f6a7372

Please sign in to comment.