Skip to content

Commit

Permalink
Support EventHubGroup as an alias over multiple eventhub peers (#450)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Sep 28, 2023
1 parent bdba9ae commit b7589fd
Show file tree
Hide file tree
Showing 18 changed files with 950 additions and 252 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
build:
strategy:
matrix:
runner: [ubuntu-latest, ubicloud]
runner: [ubicloud-standard-8]
runs-on: ${{ matrix.runner }}
timeout-minutes: 30
services:
Expand Down Expand Up @@ -46,15 +46,15 @@ jobs:
with:
name: "bq_service_account.json"
json: ${{ secrets.GCP_GH_CI_PKEY }}
dir: 'nexus/server/tests/assets/'
dir: "nexus/server/tests/assets/"

- name: setup snowflake credentials
id: sf-credentials
uses: jsdaniell/[email protected]
with:
name: "snowflake_creds.json"
json: ${{ secrets.SNOWFLAKE_GH_CI_PKEY }}
dir: 'nexus/server/tests/assets/'
dir: "nexus/server/tests/assets/"

- name: cargo check
run: cargo check
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne
case *protos.Peer_SnowflakeConfig:
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig())
case *protos.Peer_EventhubConfig:
return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig())
return nil, fmt.Errorf("use eventhub group config instead")
case *protos.Peer_EventhubGroupConfig:
return conneventhub.NewEventHubConnector(ctx, config.GetEventhubGroupConfig())
default:
return nil, ErrUnsupportedFunctionality
}
Expand Down
103 changes: 32 additions & 71 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
Expand All @@ -23,7 +22,7 @@ import (

type EventHubConnector struct {
ctx context.Context
config *protos.EventHubConfig
config *protos.EventHubGroupConfig
pgMetadata *PostgresMetadataStore
tableSchemas map[string]*protos.TableSchema
creds *azidentity.DefaultAzureCredential
Expand All @@ -33,16 +32,15 @@ type EventHubConnector struct {
// NewEventHubConnector creates a new EventHubConnector.
func NewEventHubConnector(
ctx context.Context,
config *protos.EventHubConfig,
config *protos.EventHubGroupConfig,
) (*EventHubConnector, error) {
defaultAzureCreds, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
log.Errorf("failed to get default azure credentials: %v", err)
return nil, err
}

hubManager := NewEventHubManager(ctx, defaultAzureCreds, config.GetNamespace())

hubManager := NewEventHubManager(ctx, defaultAzureCreds, config)
pgMetadata, err := NewPostgresMetadataStore(ctx, config.GetMetadataDb())
if err != nil {
log.Errorf("failed to create postgres metadata store: %v", err)
Expand Down Expand Up @@ -108,7 +106,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
maxParallelism = 10
}

batchPerTopic := make(map[string]*azeventhubs.EventDataBatch)
batchPerTopic := make(map[ScopedEventhub]*azeventhubs.EventDataBatch)
startTime := time.Now()
for i, record := range batch.Records {
json, err := record.GetItems().ToJSON()
Expand All @@ -128,12 +126,17 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}).Infof("failed to send event batch: %v", err)
return err
}
batchPerTopic = make(map[string]*azeventhubs.EventDataBatch)
batchPerTopic = make(map[ScopedEventhub]*azeventhubs.EventDataBatch)
return nil
}

// TODO (kaushik): this is a hack to get the table name.
topicName := record.GetTableName()
topicName, err := NewScopedEventhub(record.GetTableName())
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to get topic name: %v", err)
return nil, err
}

addRecord := func() error {
if _, ok := batchPerTopic[topicName]; !ok {
Expand Down Expand Up @@ -178,7 +181,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

if i%eventsPerHeartBeat == 0 {
activity.RecordHeartbeat(c.ctx, fmt.Sprintf("sent %d records to hub: %s", i, topicName))
activity.RecordHeartbeat(c.ctx, fmt.Sprintf("sent %d records to hub: %s", i, topicName.ToString()))
}

if (i+1)%eventsPerBatch == 0 {
Expand Down Expand Up @@ -225,7 +228,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

func (c *EventHubConnector) sendEventBatch(
events map[string]*azeventhubs.EventDataBatch,
events map[ScopedEventhub]*azeventhubs.EventDataBatch,
maxParallelism int64,
flowName string,
tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error {
Expand All @@ -249,13 +252,13 @@ func (c *EventHubConnector) sendEventBatch(
for tblName, eventBatch := range events {
guard <- struct{}{}
wg.Add(1)
go func(tblName string, eventBatch *azeventhubs.EventDataBatch) {
go func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) {
defer func() {
<-guard
wg.Done()
}()

hub, err := c.hubManager.GetOrCreateHub(tblName)
hub, err := c.hubManager.GetOrCreateHubClient(tblName)
if err != nil {
once.Do(func() { firstErr = err })
return
Expand All @@ -277,14 +280,13 @@ func (c *EventHubConnector) sendEventBatch(
atomic.AddInt32(&numEventsPushed, numEvents)
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("pushed %d events to event hub: %s",
numEventsPushed, tblName)
rowCount, ok := tableNameRowsMapping.Get(tblName)
}).Infof("pushed %d events to event hub: %s", numEvents, tblName)
rowCount, ok := tableNameRowsMapping.Get(tblName.ToString())
if !ok {
rowCount = uint32(0)
}
rowCount += uint32(numEvents)
tableNameRowsMapping.Set(tblName, rowCount)
tableNameRowsMapping.Set(tblName.ToString(), rowCount)
}(tblName, eventBatch)
}

Expand All @@ -301,74 +303,33 @@ func (c *EventHubConnector) sendEventBatch(

func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
// create topics for each table
// key is the source table and value is the destination topic name.
// key is the source table and value is the "eh_peer.eh_topic" that ought to be used.
tableMap := req.GetTableNameMapping()

for _, table := range tableMap {
err := c.ensureEventHub(c.ctx, table, req.FlowJobName)
// parse peer name and topic name.
name, err := NewScopedEventhub(table)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
"table": table,
}).Errorf("failed to get event hub properties: %v", err)
}).Errorf("failed to parse peer and topic name: %v", err)
return nil, err
}
}

return nil, nil
}

func (c *EventHubConnector) ensureEventHub(ctx context.Context, name string, flowName string) error {
hubClient, err := c.getEventHubMgmtClient()
if err != nil {
return err
}

namespace := c.config.GetNamespace()
resourceGroup := c.config.GetResourceGroup()
_, err = hubClient.Get(ctx, resourceGroup, namespace, name, nil)

// TODO (kaushik): make these configurable.
partitionCount := int64(3)
retention := int64(1)
if err != nil {
opts := armeventhub.Eventhub{
Properties: &armeventhub.Properties{
PartitionCount: &partitionCount,
MessageRetentionInDays: &retention,
},
}

_, err := hubClient.CreateOrUpdate(ctx, resourceGroup, namespace, name, opts, nil)
err = c.hubManager.EnsureEventHubExists(c.ctx, name)
if err != nil {
log.Errorf("failed to create event hub: %v", err)
return err
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
"table": table,
}).Errorf("failed to ensure event hub exists: %v", err)
return nil, err
}

log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("event hub %s created", name)
} else {
log.Infof("event hub %s already exists", name)
}

return nil
}

func (c *EventHubConnector) getEventHubMgmtClient() (*armeventhub.EventHubsClient, error) {
subID, err := utils.GetAzureSubscriptionID()
if err != nil {
log.Errorf("failed to get azure subscription id: %v", err)
return nil, err
}

hubClient, err := armeventhub.NewEventHubsClient(subID, c.creds, nil)
if err != nil {
log.Errorf("failed to get event hub client: %v", err)
return nil, err
}

return hubClient, nil
return &protos.CreateRawTableOutput{
TableIdentifier: "n/a",
}, nil
}

func (c *EventHubConnector) SetupNormalizedTables(
Expand Down
Loading

0 comments on commit b7589fd

Please sign in to comment.