Skip to content

Commit

Permalink
azure eventhubs use the new api (#412)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Sep 24, 2023
1 parent 07e36c5 commit e7748b9
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 613 deletions.
118 changes: 56 additions & 62 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import (
"sync/atomic"
"time"

"github.com/Azure/azure-amqp-common-go/v4/aad"
"github.com/Azure/azure-amqp-common-go/v4/auth"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
Expand All @@ -23,13 +21,12 @@ import (
)

type EventHubConnector struct {
ctx context.Context
config *protos.EventHubConfig
pgMetadata *PostgresMetadataStore
tableSchemas map[string]*protos.TableSchema
creds *azidentity.DefaultAzureCredential
tokenProvider auth.TokenProvider
hubs cmap.ConcurrentMap[string, *eventhub.Hub]
ctx context.Context
config *protos.EventHubConfig
pgMetadata *PostgresMetadataStore
tableSchemas map[string]*protos.TableSchema
creds *azidentity.DefaultAzureCredential
hubManager *EventHubManager
}

// NewEventHubConnector creates a new EventHubConnector.
Expand All @@ -43,11 +40,7 @@ func NewEventHubConnector(
return nil, err
}

jwtTokenProvider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
log.Errorf("failed to get jwt token provider: %v", err)
return nil, err
}
hubManager := NewEventHubManager(ctx, defaultAzureCreds, config.GetNamespace())

pgMetadata, err := NewPostgresMetadataStore(ctx, config.GetMetadataDb())
if err != nil {
Expand All @@ -56,36 +49,26 @@ func NewEventHubConnector(
}

return &EventHubConnector{
ctx: ctx,
config: config,
pgMetadata: pgMetadata,
creds: defaultAzureCreds,
tokenProvider: jwtTokenProvider,
hubs: cmap.New[*eventhub.Hub](),
ctx: ctx,
config: config,
pgMetadata: pgMetadata,
creds: defaultAzureCreds,
hubManager: hubManager,
}, nil
}

func (c *EventHubConnector) Close() error {
var allErrors error

// close all the event hub connections.
for _, hubName := range c.hubs.Keys() {
hub, ok := c.hubs.Get(hubName)
if !ok {
log.Errorf("failed to get event hub connection: %v", hubName)
allErrors = errors.Join(allErrors, fmt.Errorf("failed to get event hub connection: %v", hubName))
continue
}

err := hub.Close(c.ctx)
if err != nil {
log.Errorf("failed to close event hub connection: %v", err)
allErrors = errors.Join(allErrors, err)
}
// close all the eventhub connections.
err := c.hubManager.Close()
if err != nil {
log.Errorf("failed to close eventhub connections: %v", err)
allErrors = errors.Join(allErrors, err)
}

// close the postgres metadata store.
err := c.pgMetadata.Close()
err = c.pgMetadata.Close()
if err != nil {
log.Errorf("failed to close postgres metadata store: %v", err)
allErrors = errors.Join(allErrors, err)
Expand Down Expand Up @@ -124,7 +107,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
maxParallelism = 10
}

batchPerTopic := make(map[string][]*eventhub.Event)
batchPerTopic := make(map[string]*azeventhubs.EventDataBatch)
startTime := time.Now()
for i, record := range batch.Records {
json, err := record.GetItems().ToJSON()
Expand All @@ -139,10 +122,25 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
topicName := record.GetTableName()

if _, ok := batchPerTopic[topicName]; !ok {
batchPerTopic[topicName] = make([]*eventhub.Event, 0)
batch, err := c.hubManager.CreateEventDataBatch(topicName)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to create event data batch: %v", err)
return nil, err
}
batchPerTopic[topicName] = batch
}

batchPerTopic[topicName] = append(batchPerTopic[topicName], eventhub.NewEventFromString(json))
opts := &azeventhubs.AddEventDataOptions{}

err = batchPerTopic[topicName].AddEventData(eventDataFromString(json), opts)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to add event data to batch: %v", err)
return nil, err
}

if i%eventsPerHeartBeat == 0 {
activity.RecordHeartbeat(c.ctx, fmt.Sprintf("sent %d records to hub: %s", i, topicName))
Expand All @@ -155,7 +153,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
return nil, err
}

batchPerTopic = make(map[string][]*eventhub.Event)
batchPerTopic = make(map[string]*azeventhubs.EventDataBatch)
}
}

Expand Down Expand Up @@ -194,7 +192,8 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}, nil
}

func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
func (c *EventHubConnector) sendEventBatch(
events map[string]*azeventhubs.EventDataBatch,
maxParallelism int64,
flowName string,
tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error {
Expand All @@ -218,28 +217,32 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
for tblName, eventBatch := range events {
guard <- struct{}{}
wg.Add(1)
go func(tblName string, eventBatch []*eventhub.Event) {
go func(tblName string, eventBatch *azeventhubs.EventDataBatch) {
defer func() {
<-guard
wg.Done()
}()

hub, err := c.getOrCreateHubConnection(tblName)
hub, err := c.hubManager.GetOrCreateHub(tblName)
if err != nil {
once.Do(func() { firstErr = err })
return
}

numEvents := eventBatch.NumEvents()
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("obtained hub connection and now sending %d events to event hub: %s",
len(eventBatch), tblName)
err = hub.SendBatch(subCtx, eventhub.NewEventBatchIterator(eventBatch...))
numEvents, tblName)

opts := &azeventhubs.SendEventDataBatchOptions{}
err = hub.SendEventDataBatch(subCtx, eventBatch, opts)
if err != nil {
once.Do(func() { firstErr = err })
return
}

atomic.AddInt32(&numEventsPushed, int32(len(eventBatch)))
atomic.AddInt32(&numEventsPushed, numEvents)
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("pushed %d events to event hub: %s",
Expand All @@ -248,7 +251,7 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
if !ok {
rowCount = uint32(0)
}
rowCount += uint32(len(eventBatch))
rowCount += uint32(numEvents)
tableNameRowsMapping.Set(tblName, rowCount)
}(tblName, eventBatch)
}
Expand All @@ -264,21 +267,6 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
return nil
}

func (c *EventHubConnector) getOrCreateHubConnection(name string) (*eventhub.Hub, error) {
hub, ok := c.hubs.Get(name)
if !ok {
hub, err := eventhub.NewHub(c.config.GetNamespace(), name, c.tokenProvider)
if err != nil {
log.Errorf("failed to create event hub connection: %v", err)
return nil, err
}
c.hubs.Set(name, hub)
return hub, nil
}

return hub, nil
}

func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
// create topics for each table
// key is the source table and value is the destination topic name.
Expand Down Expand Up @@ -365,3 +353,9 @@ func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {
metadataSchema))
return err
}

func eventDataFromString(s string) *azeventhubs.EventData {
return &azeventhubs.EventData{
Body: []byte(s),
}
}
72 changes: 72 additions & 0 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package conneventhub

import (
"context"
"fmt"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
cmap "github.com/orcaman/concurrent-map/v2"
)

type EventHubManager struct {
ctx context.Context
creds *azidentity.DefaultAzureCredential
namespace string
hubs cmap.ConcurrentMap[string, *azeventhubs.ProducerClient]
}

func NewEventHubManager(
ctx context.Context,
creds *azidentity.DefaultAzureCredential,
namespace string,
) *EventHubManager {
hubs := cmap.New[*azeventhubs.ProducerClient]()
return &EventHubManager{
ctx: ctx,
creds: creds,
namespace: namespace,
hubs: hubs,
}
}

func (m *EventHubManager) GetOrCreateHub(name string) (*azeventhubs.ProducerClient, error) {
hub, ok := m.hubs.Get(name)

if !ok {
opts := &azeventhubs.ProducerClientOptions{}
hub, err := azeventhubs.NewProducerClient(m.namespace, name, m.creds, opts)
if err != nil {
return nil, fmt.Errorf("failed to create eventhub client: %v", err)
}
m.hubs.Set(name, hub)
return hub, nil
}

return hub, nil
}

func (m *EventHubManager) Close() error {
for hub := range m.hubs.IterBuffered() {
err := hub.Val.Close(m.ctx)
if err != nil {
return fmt.Errorf("failed to close eventhub client: %v", err)
}
}
return nil
}

func (m *EventHubManager) CreateEventDataBatch(name string) (*azeventhubs.EventDataBatch, error) {
hub, err := m.GetOrCreateHub(name)
if err != nil {
return nil, err
}

opts := &azeventhubs.EventDataBatchOptions{}
batch, err := hub.NewEventDataBatch(m.ctx, opts)
if err != nil {
return nil, fmt.Errorf("failed to create event data batch: %v", err)
}

return batch, nil
}
Loading

0 comments on commit e7748b9

Please sign in to comment.