Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

azure eventhubs use the new api #412

Merged
merged 3 commits into from
Sep 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 56 additions & 62 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import (
"sync/atomic"
"time"

"github.com/Azure/azure-amqp-common-go/v4/aad"
"github.com/Azure/azure-amqp-common-go/v4/auth"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
Expand All @@ -23,13 +21,12 @@ import (
)

type EventHubConnector struct {
ctx context.Context
config *protos.EventHubConfig
pgMetadata *PostgresMetadataStore
tableSchemas map[string]*protos.TableSchema
creds *azidentity.DefaultAzureCredential
tokenProvider auth.TokenProvider
hubs cmap.ConcurrentMap[string, *eventhub.Hub]
ctx context.Context
config *protos.EventHubConfig
pgMetadata *PostgresMetadataStore
tableSchemas map[string]*protos.TableSchema
creds *azidentity.DefaultAzureCredential
hubManager *EventHubManager
}

// NewEventHubConnector creates a new EventHubConnector.
Expand All @@ -43,11 +40,7 @@ func NewEventHubConnector(
return nil, err
}

jwtTokenProvider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
log.Errorf("failed to get jwt token provider: %v", err)
return nil, err
}
hubManager := NewEventHubManager(ctx, defaultAzureCreds, config.GetNamespace())

pgMetadata, err := NewPostgresMetadataStore(ctx, config.GetMetadataDb())
if err != nil {
Expand All @@ -56,36 +49,26 @@ func NewEventHubConnector(
}

return &EventHubConnector{
ctx: ctx,
config: config,
pgMetadata: pgMetadata,
creds: defaultAzureCreds,
tokenProvider: jwtTokenProvider,
hubs: cmap.New[*eventhub.Hub](),
ctx: ctx,
config: config,
pgMetadata: pgMetadata,
creds: defaultAzureCreds,
hubManager: hubManager,
}, nil
}

func (c *EventHubConnector) Close() error {
var allErrors error

// close all the event hub connections.
for _, hubName := range c.hubs.Keys() {
hub, ok := c.hubs.Get(hubName)
if !ok {
log.Errorf("failed to get event hub connection: %v", hubName)
allErrors = errors.Join(allErrors, fmt.Errorf("failed to get event hub connection: %v", hubName))
continue
}

err := hub.Close(c.ctx)
if err != nil {
log.Errorf("failed to close event hub connection: %v", err)
allErrors = errors.Join(allErrors, err)
}
// close all the eventhub connections.
err := c.hubManager.Close()
if err != nil {
log.Errorf("failed to close eventhub connections: %v", err)
allErrors = errors.Join(allErrors, err)
}

// close the postgres metadata store.
err := c.pgMetadata.Close()
err = c.pgMetadata.Close()
if err != nil {
log.Errorf("failed to close postgres metadata store: %v", err)
allErrors = errors.Join(allErrors, err)
Expand Down Expand Up @@ -124,7 +107,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
maxParallelism = 10
}

batchPerTopic := make(map[string][]*eventhub.Event)
batchPerTopic := make(map[string]*azeventhubs.EventDataBatch)
startTime := time.Now()
for i, record := range batch.Records {
json, err := record.GetItems().ToJSON()
Expand All @@ -139,10 +122,25 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
topicName := record.GetTableName()

if _, ok := batchPerTopic[topicName]; !ok {
batchPerTopic[topicName] = make([]*eventhub.Event, 0)
batch, err := c.hubManager.CreateEventDataBatch(topicName)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to create event data batch: %v", err)
return nil, err
}
batchPerTopic[topicName] = batch
}

batchPerTopic[topicName] = append(batchPerTopic[topicName], eventhub.NewEventFromString(json))
opts := &azeventhubs.AddEventDataOptions{}

err = batchPerTopic[topicName].AddEventData(eventDataFromString(json), opts)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to add event data to batch: %v", err)
return nil, err
}

if i%eventsPerHeartBeat == 0 {
activity.RecordHeartbeat(c.ctx, fmt.Sprintf("sent %d records to hub: %s", i, topicName))
Expand All @@ -155,7 +153,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
return nil, err
}

batchPerTopic = make(map[string][]*eventhub.Event)
batchPerTopic = make(map[string]*azeventhubs.EventDataBatch)
}
}

Expand Down Expand Up @@ -194,7 +192,8 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}, nil
}

func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
func (c *EventHubConnector) sendEventBatch(
events map[string]*azeventhubs.EventDataBatch,
maxParallelism int64,
flowName string,
tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error {
Expand All @@ -218,28 +217,32 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
for tblName, eventBatch := range events {
guard <- struct{}{}
wg.Add(1)
go func(tblName string, eventBatch []*eventhub.Event) {
go func(tblName string, eventBatch *azeventhubs.EventDataBatch) {
defer func() {
<-guard
wg.Done()
}()

hub, err := c.getOrCreateHubConnection(tblName)
hub, err := c.hubManager.GetOrCreateHub(tblName)
if err != nil {
once.Do(func() { firstErr = err })
return
}

numEvents := eventBatch.NumEvents()
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("obtained hub connection and now sending %d events to event hub: %s",
len(eventBatch), tblName)
err = hub.SendBatch(subCtx, eventhub.NewEventBatchIterator(eventBatch...))
numEvents, tblName)

opts := &azeventhubs.SendEventDataBatchOptions{}
err = hub.SendEventDataBatch(subCtx, eventBatch, opts)
if err != nil {
once.Do(func() { firstErr = err })
return
}

atomic.AddInt32(&numEventsPushed, int32(len(eventBatch)))
atomic.AddInt32(&numEventsPushed, numEvents)
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("pushed %d events to event hub: %s",
Expand All @@ -248,7 +251,7 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
if !ok {
rowCount = uint32(0)
}
rowCount += uint32(len(eventBatch))
rowCount += uint32(numEvents)
tableNameRowsMapping.Set(tblName, rowCount)
}(tblName, eventBatch)
}
Expand All @@ -264,21 +267,6 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
return nil
}

func (c *EventHubConnector) getOrCreateHubConnection(name string) (*eventhub.Hub, error) {
hub, ok := c.hubs.Get(name)
if !ok {
hub, err := eventhub.NewHub(c.config.GetNamespace(), name, c.tokenProvider)
if err != nil {
log.Errorf("failed to create event hub connection: %v", err)
return nil, err
}
c.hubs.Set(name, hub)
return hub, nil
}

return hub, nil
}

func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
// create topics for each table
// key is the source table and value is the destination topic name.
Expand Down Expand Up @@ -365,3 +353,9 @@ func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {
metadataSchema))
return err
}

func eventDataFromString(s string) *azeventhubs.EventData {
return &azeventhubs.EventData{
Body: []byte(s),
}
}
72 changes: 72 additions & 0 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package conneventhub

import (
"context"
"fmt"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
cmap "github.com/orcaman/concurrent-map/v2"
)

type EventHubManager struct {
ctx context.Context
creds *azidentity.DefaultAzureCredential
namespace string
hubs cmap.ConcurrentMap[string, *azeventhubs.ProducerClient]
}

func NewEventHubManager(
ctx context.Context,
creds *azidentity.DefaultAzureCredential,
namespace string,
) *EventHubManager {
hubs := cmap.New[*azeventhubs.ProducerClient]()
return &EventHubManager{
ctx: ctx,
creds: creds,
namespace: namespace,
hubs: hubs,
}
}

func (m *EventHubManager) GetOrCreateHub(name string) (*azeventhubs.ProducerClient, error) {
hub, ok := m.hubs.Get(name)

if !ok {
opts := &azeventhubs.ProducerClientOptions{}
hub, err := azeventhubs.NewProducerClient(m.namespace, name, m.creds, opts)
if err != nil {
return nil, fmt.Errorf("failed to create eventhub client: %v", err)
}
m.hubs.Set(name, hub)
return hub, nil
}

return hub, nil
}

func (m *EventHubManager) Close() error {
for hub := range m.hubs.IterBuffered() {
err := hub.Val.Close(m.ctx)
if err != nil {
return fmt.Errorf("failed to close eventhub client: %v", err)
}
}
return nil
}

func (m *EventHubManager) CreateEventDataBatch(name string) (*azeventhubs.EventDataBatch, error) {
hub, err := m.GetOrCreateHub(name)
if err != nil {
return nil, err
}

opts := &azeventhubs.EventDataBatchOptions{}
batch, err := hub.NewEventDataBatch(m.ctx, opts)
if err != nil {
return nil, fmt.Errorf("failed to create event data batch: %v", err)
}

return batch, nil
}
Loading
Loading