Skip to content

Commit

Permalink
fix scoped_eventhub
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Feb 2, 2024
1 parent f50fc0f commit 0955e76
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions flow/connectors/eventhub/scoped_eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
type ScopedEventhub struct {
EventhubNamespace string
Eventhub string
DestinationTable string
PartitionKeyColumn string
PartitionKeyValue string
}
Expand All @@ -24,15 +23,17 @@ func NewScopedEventhub(dstTableName string) (ScopedEventhub, error) {
return ScopedEventhub{}, fmt.Errorf("invalid scoped eventhub '%s'", dstTableName)
}

// support eventhub namespace, eventhub name, table name, partition key with hyphens etc.
// support eventhub namespace, eventhub name, partition key with hyphens etc.
// part[2] will be some table identifier.
// It's just so that we have distinct destination table names
// in create mirror's table mapping.
// We can ignore it.
eventhubPart := strings.Trim(parts[0], `"`)
namespacePart := strings.Trim(parts[1], `"`)
destTablePart := strings.Trim(parts[2], `"`)
partitionPart := strings.Trim(parts[3], `"`)
return ScopedEventhub{
EventhubNamespace: namespacePart,
Eventhub: eventhubPart,
DestinationTable: destTablePart,
PartitionKeyColumn: partitionPart,
}, nil
}
Expand All @@ -43,5 +44,5 @@ func (s *ScopedEventhub) SetPartitionValue(value string) {

// ToString returns the string representation of the ScopedEventhub
func (s ScopedEventhub) ToString() string {
return fmt.Sprintf("%s.%s.%s.%s.%s", s.EventhubNamespace, s.Eventhub, s.DestinationTable, s.PartitionKeyColumn, s.PartitionKeyValue)
return fmt.Sprintf("%s.%s.%s.%s", s.EventhubNamespace, s.Eventhub, s.PartitionKeyColumn, s.PartitionKeyValue)
}

0 comments on commit 0955e76

Please sign in to comment.