Skip to content

Commit

Permalink
refactor destination semantics
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Feb 2, 2024
1 parent b4418e9 commit f50fc0f
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 27 deletions.
14 changes: 11 additions & 3 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,18 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}, nil
}

func (c *EventHubConnector) GetPeerOfEventhubNamespace(eventhubNamespace string) string {
for peerName, config := range c.config.Eventhubs {
if config.Namespace == eventhubNamespace {
return peerName
}
}
return ""
}

func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
// create topics for each table
// key is the source table and value is the "eh_peer.eh_topic" that ought to be used.
tableMap := req.GetTableNameMapping()

for _, destinationTable := range tableMap {
// parse peer name and topic name.
name, err := NewScopedEventhub(destinationTable)
Expand All @@ -281,7 +288,8 @@ func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
return nil, err
}

err = c.hubManager.EnsureEventHubExists(c.ctx, name)
peerName := c.GetPeerOfEventhubNamespace(name.EventhubNamespace)
err = c.hubManager.EnsureEventHubExists(c.ctx, name, peerName)
if err != nil {
c.logger.Error("failed to ensure eventhub exists",
slog.Any("error", err), slog.String("destinationTable", destinationTable))
Expand Down
14 changes: 5 additions & 9 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,7 @@ func (m *EventHubManager) GetNumPartitions(ctx context.Context, name ScopedEvent
func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) (
*azeventhubs.ProducerClient, error,
) {
ehConfig, ok := m.peerConfig.Get(name.PeerName)
if !ok {
return nil, fmt.Errorf("eventhub '%s' not configured", name.Eventhub)
}

namespace := ehConfig.Namespace
namespace := name.EventhubNamespace
// if the namespace isn't fully qualified, add the `.servicebus.windows.net`
// check by counting the number of '.' in the namespace
if strings.Count(namespace, ".") < 2 {
Expand Down Expand Up @@ -130,6 +125,7 @@ func (m *EventHubManager) Close(ctx context.Context) error {

m.hubs.Range(func(key any, value any) bool {
name := key.(ScopedEventhub)
slog.Info(fmt.Sprintf("closing eventhub client for %v", name))
hub := value.(*azeventhubs.ProducerClient)
err := m.closeProducerClient(ctx, hub)
if err != nil {
Expand Down Expand Up @@ -164,10 +160,10 @@ func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, destination
}

// EnsureEventHubExists ensures that the eventhub exists.
func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedEventhub) error {
cfg, ok := m.peerConfig.Get(name.PeerName)
func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedEventhub, peerName string) error {
cfg, ok := m.peerConfig.Get(peerName)
if !ok {
return fmt.Errorf("eventhub peer '%s' not configured", name.PeerName)
return fmt.Errorf("eventhub peer '%s' not configured", peerName)
}

hubClient, err := m.getEventHubMgmtClient(cfg.SubscriptionId)
Expand Down
27 changes: 12 additions & 15 deletions flow/connectors/eventhub/scoped_eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,30 @@ import (
// partition_column is the column in the table that is used to determine
// the partition key for the eventhub. Partition value is one such value of that column.
type ScopedEventhub struct {
PeerName string
EventhubNamespace string
Eventhub string
DestinationTable string
PartitionKeyColumn string
PartitionKeyValue string
}

func NewScopedEventhub(dstTableName string) (ScopedEventhub, error) {
// split by dot, the model is peername.eventhub.partition_key_column
// split by dot, the model is eventhub.eventhub_namespace.table_name.partition_key_column
parts := strings.Split(dstTableName, ".")

if len(parts) != 3 {
if len(parts) != 4 {
return ScopedEventhub{}, fmt.Errorf("invalid scoped eventhub '%s'", dstTableName)
}

// support eventhub name and partition key with hyphens etc.
eventhubPart := strings.Trim(parts[1], `"`)
partitionPart := strings.Trim(parts[2], `"`)
// support eventhub namespace, eventhub name, table name, partition key with hyphens etc.
eventhubPart := strings.Trim(parts[0], `"`)
namespacePart := strings.Trim(parts[1], `"`)
destTablePart := strings.Trim(parts[2], `"`)
partitionPart := strings.Trim(parts[3], `"`)
return ScopedEventhub{
PeerName: parts[0],
EventhubNamespace: namespacePart,
Eventhub: eventhubPart,
DestinationTable: destTablePart,
PartitionKeyColumn: partitionPart,
}, nil
}
Expand All @@ -37,14 +41,7 @@ func (s *ScopedEventhub) SetPartitionValue(value string) {
s.PartitionKeyValue = value
}

func (s ScopedEventhub) Equals(other ScopedEventhub) bool {
return s.PeerName == other.PeerName &&
s.Eventhub == other.Eventhub &&
s.PartitionKeyColumn == other.PartitionKeyColumn &&
s.PartitionKeyValue == other.PartitionKeyValue
}

// ToString returns the string representation of the ScopedEventhub
func (s ScopedEventhub) ToString() string {
return fmt.Sprintf("%s.%s.%s.%s", s.PeerName, s.Eventhub, s.PartitionKeyColumn, s.PartitionKeyValue)
return fmt.Sprintf("%s.%s.%s.%s.%s", s.EventhubNamespace, s.Eventhub, s.DestinationTable, s.PartitionKeyColumn, s.PartitionKeyValue)
}

0 comments on commit f50fc0f

Please sign in to comment.