Skip to content

Commit

Permalink
Support EventHubGroup as an alias over multiple eventhub peers
Browse files Browse the repository at this point in the history
To support multiple subscriptions and namespaces we will allow a new peer type called EventHubGroup.

```sql
CREATE PEER customer_1 FROM EVENTHUB WITH (
  subscription_id = 'my-sub-id',
  resource_group = 'my-rg',
  namespace = 'my-ns',
  location = 'eastus'
);

CREATE PEER customer_2 FROM EVENTHUB WITH (
  ...
);

CREATE PEER eventhub_group_1 FROM EVENTHUBGROUP WITH (
  metadata_db = '...',
  customer_1 = true,
  customer_2 = true
)
```

After this change mirrors ought to be created with this event hub group
peer.

Fixes #441
  • Loading branch information
iskakaushik committed Sep 28, 2023
1 parent bb69450 commit 5aa6be7
Show file tree
Hide file tree
Showing 15 changed files with 887 additions and 225 deletions.
5 changes: 4 additions & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connectors
import (
"context"
"errors"
"fmt"

connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub"
Expand Down Expand Up @@ -141,7 +142,9 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne
case *protos.Peer_SnowflakeConfig:
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig())
case *protos.Peer_EventhubConfig:
return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig())
return nil, fmt.Errorf("use eventhub group config instead")
case *protos.Peer_EventhubGroupConfig:
return conneventhub.NewEventHubConnector(ctx, config.GetEventhubGroupConfig())
default:
return nil, ErrUnsupportedFunctionality
}
Expand Down
85 changes: 25 additions & 60 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
Expand All @@ -23,7 +22,7 @@ import (

type EventHubConnector struct {
ctx context.Context
config *protos.EventHubConfig
config *protos.EventHubGroupConfig
pgMetadata *PostgresMetadataStore
tableSchemas map[string]*protos.TableSchema
creds *azidentity.DefaultAzureCredential
Expand All @@ -33,16 +32,15 @@ type EventHubConnector struct {
// NewEventHubConnector creates a new EventHubConnector.
func NewEventHubConnector(
ctx context.Context,
config *protos.EventHubConfig,
config *protos.EventHubGroupConfig,
) (*EventHubConnector, error) {
defaultAzureCreds, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
log.Errorf("failed to get default azure credentials: %v", err)
return nil, err
}

hubManager := NewEventHubManager(ctx, defaultAzureCreds, config.GetNamespace())

hubManager := NewEventHubManager(ctx, defaultAzureCreds, config)
pgMetadata, err := NewPostgresMetadataStore(ctx, config.GetMetadataDb())
if err != nil {
log.Errorf("failed to create postgres metadata store: %v", err)
Expand Down Expand Up @@ -255,7 +253,7 @@ func (c *EventHubConnector) sendEventBatch(
wg.Done()
}()

hub, err := c.hubManager.GetOrCreateHub(tblName)
hub, err := c.hubManager.GetOrCreateHubClient(tblName)
if err != nil {
once.Do(func() { firstErr = err })
return
Expand Down Expand Up @@ -301,74 +299,33 @@ func (c *EventHubConnector) sendEventBatch(

func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
// create topics for each table
// key is the source table and value is the destination topic name.
// key is the source table and value is the "eh_peer.eh_topic" that ought to be used.
tableMap := req.GetTableNameMapping()

for _, table := range tableMap {
err := c.ensureEventHub(c.ctx, table, req.FlowJobName)
// parse peer name and topic name.
peerName, topicName, err := parsePeerAndTopicName(table)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
"table": table,
}).Errorf("failed to get event hub properties: %v", err)
}).Errorf("failed to parse peer and topic name: %v", err)
return nil, err
}
}

return nil, nil
}

func (c *EventHubConnector) ensureEventHub(ctx context.Context, name string, flowName string) error {
hubClient, err := c.getEventHubMgmtClient()
if err != nil {
return err
}

namespace := c.config.GetNamespace()
resourceGroup := c.config.GetResourceGroup()
_, err = hubClient.Get(ctx, resourceGroup, namespace, name, nil)

// TODO (kaushik): make these configurable.
partitionCount := int64(3)
retention := int64(1)
if err != nil {
opts := armeventhub.Eventhub{
Properties: &armeventhub.Properties{
PartitionCount: &partitionCount,
MessageRetentionInDays: &retention,
},
}

_, err := hubClient.CreateOrUpdate(ctx, resourceGroup, namespace, name, opts, nil)
err = c.hubManager.EnsureEventHubExists(c.ctx, peerName, topicName)
if err != nil {
log.Errorf("failed to create event hub: %v", err)
return err
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
"table": table,
}).Errorf("failed to ensure event hub exists: %v", err)
return nil, err
}

log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("event hub %s created", name)
} else {
log.Infof("event hub %s already exists", name)
}

return nil
}

func (c *EventHubConnector) getEventHubMgmtClient() (*armeventhub.EventHubsClient, error) {
subID, err := utils.GetAzureSubscriptionID()
if err != nil {
log.Errorf("failed to get azure subscription id: %v", err)
return nil, err
}

hubClient, err := armeventhub.NewEventHubsClient(subID, c.creds, nil)
if err != nil {
log.Errorf("failed to get event hub client: %v", err)
return nil, err
}

return hubClient, nil
return &protos.CreateRawTableOutput{
TableIdentifier: "n/a",
}, nil
}

func (c *EventHubConnector) SetupNormalizedTables(
Expand All @@ -380,6 +337,14 @@ func (c *EventHubConnector) SetupNormalizedTables(
}, nil
}

func parsePeerAndTopicName(s string) (string, string, error) {
parts := strings.Split(s, ".")
if len(parts) != 2 {
return "", "", fmt.Errorf("invalid peer and topic name %s", s)
}
return parts[0], parts[1], nil
}

func eventDataFromString(s string) *azeventhubs.EventData {
return &azeventhubs.EventData{
Body: []byte(s),
Expand Down
112 changes: 99 additions & 13 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,62 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
cmap "github.com/orcaman/concurrent-map/v2"
log "github.com/sirupsen/logrus"
)

type EventHubManager struct {
ctx context.Context
creds *azidentity.DefaultAzureCredential
namespace string
hubs cmap.ConcurrentMap[string, *azeventhubs.ProducerClient]
ctx context.Context
creds *azidentity.DefaultAzureCredential
// eventhub peer name -> config
peerConfig cmap.ConcurrentMap[string, *protos.EventHubConfig]
// eventhub name -> client
hubs cmap.ConcurrentMap[string, *azeventhubs.ProducerClient]
// eventhub name -> peer name
peerNames cmap.ConcurrentMap[string, string]
}

func NewEventHubManager(
ctx context.Context,
creds *azidentity.DefaultAzureCredential,
namespace string,
groupConfig *protos.EventHubGroupConfig,
) *EventHubManager {
hubs := cmap.New[*azeventhubs.ProducerClient]()
peerConfig := cmap.New[*protos.EventHubConfig]()

for name, config := range groupConfig.Eventhubs {
peerConfig.Set(name, config)
}

return &EventHubManager{
ctx: ctx,
creds: creds,
namespace: namespace,
hubs: hubs,
ctx: ctx,
creds: creds,
hubs: hubs,
}
}

func (m *EventHubManager) GetOrCreateHub(name string) (*azeventhubs.ProducerClient, error) {
hub, ok := m.hubs.Get(name)
func (m *EventHubManager) GetOrCreateHubClient(name string) (*azeventhubs.ProducerClient, error) {
peerName, ok := m.peerNames.Get(name)
if !ok {
return nil, fmt.Errorf("eventhub '%s' has not configured", name)
}

namespace := m.namespace
ehConfig, ok := m.peerConfig.Get(peerName)
if !ok {
return nil, fmt.Errorf("eventhub '%s' not configured", name)
}

namespace := ehConfig.Namespace
// if the namespace isn't fully qualified, add the `.servicebus.windows.net`
// check by counting the number of '.' in the namespace
if strings.Count(namespace, ".") < 2 {
namespace = fmt.Sprintf("%s.servicebus.windows.net", namespace)
}

hub, ok := m.hubs.Get(name)
if !ok {
opts := &azeventhubs.ProducerClientOptions{}
hub, err := azeventhubs.NewProducerClient(namespace, name, m.creds, opts)
Expand All @@ -65,7 +87,7 @@ func (m *EventHubManager) Close() error {
}

func (m *EventHubManager) CreateEventDataBatch(name string) (*azeventhubs.EventDataBatch, error) {
hub, err := m.GetOrCreateHub(name)
hub, err := m.GetOrCreateHubClient(name)
if err != nil {
return nil, err
}
Expand All @@ -78,3 +100,67 @@ func (m *EventHubManager) CreateEventDataBatch(name string) (*azeventhubs.EventD

return batch, nil
}

// EnsureEventHubExists ensures that the eventhub exists.
func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, peerName string, eventhub string) error {
cfg, ok := m.peerConfig.Get(peerName)
if !ok {
return fmt.Errorf("eventhub peer '%s' not configured", peerName)
}

hubClient, err := m.getEventHubMgmtClient(cfg.SubscriptionId)
if err != nil {
return fmt.Errorf("failed to get event hub client: %v", err)
}

namespace := cfg.Namespace
resourceGroup := cfg.ResourceGroup

name := eventhub
_, err = hubClient.Get(ctx, resourceGroup, namespace, name, nil)

m.peerNames.Set(name, peerName)

// TODO (kaushik): make these configurable.
partitionCount := int64(3)
retention := int64(1)
if err != nil {
opts := armeventhub.Eventhub{
Properties: &armeventhub.Properties{
PartitionCount: &partitionCount,
MessageRetentionInDays: &retention,
},
}

_, err := hubClient.CreateOrUpdate(ctx, resourceGroup, namespace, name, opts, nil)
if err != nil {
log.Errorf("failed to create event hub: %v", err)
return err
}

log.Infof("event hub %s created", name)
} else {
log.Infof("event hub %s already exists", name)
}

return nil
}

func (m *EventHubManager) getEventHubMgmtClient(subID string) (*armeventhub.EventHubsClient, error) {
if subID == "" {
envSubID, err := utils.GetAzureSubscriptionID()
if err != nil {
log.Errorf("failed to get azure subscription id: %v", err)
return nil, err
}
subID = envSubID
}

hubClient, err := armeventhub.NewEventHubsClient(subID, m.creds, nil)
if err != nil {
log.Errorf("failed to get event hub client: %v", err)
return nil, err
}

return hubClient, nil
}
Loading

0 comments on commit 5aa6be7

Please sign in to comment.