Skip to content

Commit

Permalink
Support EventHubGroup as an alias over multiple eventhub peers
Browse files Browse the repository at this point in the history
To support multiple subscriptions and namespaces, we will allow a new peer type called EventHubGroup, which acts as an alias over multiple EventHub peers.

```sql
CREATE PEER customer_1 FROM EVENTHUB WITH (
  subscription_id = 'my-sub-id',
  resource_group = 'my-rg',
  namespace = 'my-ns',
  location = 'eastus'
);

CREATE PEER customer_2 FROM EVENTHUB WITH (
  ...
);

CREATE PEER eventhub_group_1 FROM EVENTHUBGROUP WITH (
  metadata_db = '...',
  customer_1 = true,
  customer_2 = true
);
```

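After this change, mirrors that target Event Hubs must be created with an
event hub group peer; using a plain EVENTHUB peer as the destination now
fails with "use eventhub group config instead".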

Fixes #441
  • Loading branch information
iskakaushik committed Sep 28, 2023
1 parent bdba9ae commit 0ed16b4
Show file tree
Hide file tree
Showing 18 changed files with 950 additions and 252 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
build:
strategy:
matrix:
runner: [ubuntu-latest, ubicloud]
runner: [ubicloud-standard-8]
runs-on: ${{ matrix.runner }}
timeout-minutes: 30
services:
Expand Down Expand Up @@ -46,15 +46,15 @@ jobs:
with:
name: "bq_service_account.json"
json: ${{ secrets.GCP_GH_CI_PKEY }}
dir: 'nexus/server/tests/assets/'
dir: "nexus/server/tests/assets/"

- name: setup snowflake credentials
id: sf-credentials
uses: jsdaniell/[email protected]
with:
name: "snowflake_creds.json"
json: ${{ secrets.SNOWFLAKE_GH_CI_PKEY }}
dir: 'nexus/server/tests/assets/'
dir: "nexus/server/tests/assets/"

- name: cargo check
run: cargo check
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne
case *protos.Peer_SnowflakeConfig:
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig())
case *protos.Peer_EventhubConfig:
return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig())
return nil, fmt.Errorf("use eventhub group config instead")
case *protos.Peer_EventhubGroupConfig:
return conneventhub.NewEventHubConnector(ctx, config.GetEventhubGroupConfig())
default:
return nil, ErrUnsupportedFunctionality
}
Expand Down
103 changes: 32 additions & 71 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
Expand All @@ -23,7 +22,7 @@ import (

type EventHubConnector struct {
ctx context.Context
config *protos.EventHubConfig
config *protos.EventHubGroupConfig
pgMetadata *PostgresMetadataStore
tableSchemas map[string]*protos.TableSchema
creds *azidentity.DefaultAzureCredential
Expand All @@ -33,16 +32,15 @@ type EventHubConnector struct {
// NewEventHubConnector creates a new EventHubConnector.
func NewEventHubConnector(
ctx context.Context,
config *protos.EventHubConfig,
config *protos.EventHubGroupConfig,
) (*EventHubConnector, error) {
defaultAzureCreds, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
log.Errorf("failed to get default azure credentials: %v", err)
return nil, err
}

hubManager := NewEventHubManager(ctx, defaultAzureCreds, config.GetNamespace())

hubManager := NewEventHubManager(ctx, defaultAzureCreds, config)
pgMetadata, err := NewPostgresMetadataStore(ctx, config.GetMetadataDb())
if err != nil {
log.Errorf("failed to create postgres metadata store: %v", err)
Expand Down Expand Up @@ -108,7 +106,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
maxParallelism = 10
}

batchPerTopic := make(map[string]*azeventhubs.EventDataBatch)
batchPerTopic := make(map[ScopedEventhub]*azeventhubs.EventDataBatch)
startTime := time.Now()
for i, record := range batch.Records {
json, err := record.GetItems().ToJSON()
Expand All @@ -128,12 +126,17 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}).Infof("failed to send event batch: %v", err)
return err
}
batchPerTopic = make(map[string]*azeventhubs.EventDataBatch)
batchPerTopic = make(map[ScopedEventhub]*azeventhubs.EventDataBatch)
return nil
}

// TODO (kaushik): this is a hack to get the table name.
topicName := record.GetTableName()
topicName, err := NewScopedEventhub(record.GetTableName())
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to get topic name: %v", err)
return nil, err
}

addRecord := func() error {
if _, ok := batchPerTopic[topicName]; !ok {
Expand Down Expand Up @@ -178,7 +181,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

if i%eventsPerHeartBeat == 0 {
activity.RecordHeartbeat(c.ctx, fmt.Sprintf("sent %d records to hub: %s", i, topicName))
activity.RecordHeartbeat(c.ctx, fmt.Sprintf("sent %d records to hub: %s", i, topicName.ToString()))
}

if (i+1)%eventsPerBatch == 0 {
Expand Down Expand Up @@ -225,7 +228,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

func (c *EventHubConnector) sendEventBatch(
events map[string]*azeventhubs.EventDataBatch,
events map[ScopedEventhub]*azeventhubs.EventDataBatch,
maxParallelism int64,
flowName string,
tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error {
Expand All @@ -249,13 +252,13 @@ func (c *EventHubConnector) sendEventBatch(
for tblName, eventBatch := range events {
guard <- struct{}{}
wg.Add(1)
go func(tblName string, eventBatch *azeventhubs.EventDataBatch) {
go func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) {
defer func() {
<-guard
wg.Done()
}()

hub, err := c.hubManager.GetOrCreateHub(tblName)
hub, err := c.hubManager.GetOrCreateHubClient(tblName)
if err != nil {
once.Do(func() { firstErr = err })
return
Expand All @@ -277,14 +280,13 @@ func (c *EventHubConnector) sendEventBatch(
atomic.AddInt32(&numEventsPushed, numEvents)
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("pushed %d events to event hub: %s",
numEventsPushed, tblName)
rowCount, ok := tableNameRowsMapping.Get(tblName)
}).Infof("pushed %d events to event hub: %s", numEvents, tblName)
rowCount, ok := tableNameRowsMapping.Get(tblName.ToString())
if !ok {
rowCount = uint32(0)
}
rowCount += uint32(numEvents)
tableNameRowsMapping.Set(tblName, rowCount)
tableNameRowsMapping.Set(tblName.ToString(), rowCount)
}(tblName, eventBatch)
}

Expand All @@ -301,74 +303,33 @@ func (c *EventHubConnector) sendEventBatch(

func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
// create topics for each table
// key is the source table and value is the destination topic name.
// key is the source table and value is the "eh_peer.eh_topic" that ought to be used.
tableMap := req.GetTableNameMapping()

for _, table := range tableMap {
err := c.ensureEventHub(c.ctx, table, req.FlowJobName)
// parse peer name and topic name.
name, err := NewScopedEventhub(table)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
"table": table,
}).Errorf("failed to get event hub properties: %v", err)
}).Errorf("failed to parse peer and topic name: %v", err)
return nil, err
}
}

return nil, nil
}

func (c *EventHubConnector) ensureEventHub(ctx context.Context, name string, flowName string) error {
hubClient, err := c.getEventHubMgmtClient()
if err != nil {
return err
}

namespace := c.config.GetNamespace()
resourceGroup := c.config.GetResourceGroup()
_, err = hubClient.Get(ctx, resourceGroup, namespace, name, nil)

// TODO (kaushik): make these configurable.
partitionCount := int64(3)
retention := int64(1)
if err != nil {
opts := armeventhub.Eventhub{
Properties: &armeventhub.Properties{
PartitionCount: &partitionCount,
MessageRetentionInDays: &retention,
},
}

_, err := hubClient.CreateOrUpdate(ctx, resourceGroup, namespace, name, opts, nil)
err = c.hubManager.EnsureEventHubExists(c.ctx, name)
if err != nil {
log.Errorf("failed to create event hub: %v", err)
return err
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
"table": table,
}).Errorf("failed to ensure event hub exists: %v", err)
return nil, err
}

log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("event hub %s created", name)
} else {
log.Infof("event hub %s already exists", name)
}

return nil
}

func (c *EventHubConnector) getEventHubMgmtClient() (*armeventhub.EventHubsClient, error) {
subID, err := utils.GetAzureSubscriptionID()
if err != nil {
log.Errorf("failed to get azure subscription id: %v", err)
return nil, err
}

hubClient, err := armeventhub.NewEventHubsClient(subID, c.creds, nil)
if err != nil {
log.Errorf("failed to get event hub client: %v", err)
return nil, err
}

return hubClient, nil
return &protos.CreateRawTableOutput{
TableIdentifier: "n/a",
}, nil
}

func (c *EventHubConnector) SetupNormalizedTables(
Expand Down
Loading

0 comments on commit 0ed16b4

Please sign in to comment.