Skip to content

Commit

Permalink
[eventhubs] Add more logs and retries
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 7, 2023
1 parent 498c2c5 commit e7893aa
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 39 deletions.
56 changes: 24 additions & 32 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewEventHubConnector(
return nil, err
}

hubManager := NewEventHubManager(ctx, defaultAzureCreds, config)
hubManager := NewEventHubManager(defaultAzureCreds, config)
metadataSchemaName := "peerdb_eventhub_metadata" // #nosec G101
pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(),
metadataSchemaName)
Expand Down Expand Up @@ -124,7 +124,6 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error
func (c *EventHubConnector) processBatch(
flowJobName string,
batch *model.CDCRecordStream,
eventsPerBatch int,
maxParallelism int64,
) (uint32, error) {
ctx := context.Background()
Expand All @@ -133,6 +132,18 @@ func (c *EventHubConnector) processBatch(
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

flushBatch := func() error {
err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping)
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to send event batch: %v", err)
return err
}
batchPerTopic.Clear()
return nil
}

numRecords := 0
for record := range batch.GetRecords() {
numRecords++
Expand All @@ -144,18 +155,6 @@ func (c *EventHubConnector) processBatch(
return 0, err
}

flushBatch := func() error {
err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping)
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to send event batch: %v", err)
return err
}
batchPerTopic.Clear()
return nil
}

topicName, err := NewScopedEventhub(record.GetTableName())
if err != nil {
log.WithFields(log.Fields{
Expand All @@ -172,19 +171,16 @@ func (c *EventHubConnector) processBatch(
return 0, err
}

if (numRecords)%eventsPerBatch == 0 {
err := flushBatch()
if err != nil {
return 0, err
}
if numRecords%1000 == 0 {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("processed %d records for sending", numRecords)
}
}

if batchPerTopic.Len() > 0 {
err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping)
if err != nil {
return 0, err
}
err := flushBatch()
if err != nil {
return 0, err
}

log.WithFields(log.Fields{
Expand All @@ -203,10 +199,6 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
shutdown <- true
}()

eventsPerBatch := int(req.PushBatchSize)
if eventsPerBatch <= 0 {
eventsPerBatch = 10000
}
maxParallelism := req.PushParallelism
if maxParallelism <= 0 {
maxParallelism = 10
Expand All @@ -221,13 +213,13 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
// otherwise, we block until processBatch is done.
if utils.GetEnvBool("PEERDB_BETA_EVENTHUB_PUSH_ASYNC", false) {
go func() {
numRecords, err = c.processBatch(req.FlowJobName, batch, eventsPerBatch, maxParallelism)
numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
if err != nil {
log.Errorf("[async] failed to process batch: %v", err)
}
}()
} else {
numRecords, err = c.processBatch(req.FlowJobName, batch, eventsPerBatch, maxParallelism)
numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
if err != nil {
log.Errorf("failed to process batch: %v", err)
return nil, err
Expand Down Expand Up @@ -316,7 +308,7 @@ func (c *EventHubConnector) sendEventBatch(
return firstErr
}

log.Infof("successfully sent %d events to event hub", numEventsPushed)
log.Infof("[sendEventBatch] successfully sent %d events to event hub", numEventsPushed)
return nil
}

Expand All @@ -328,7 +320,7 @@ func (c *EventHubConnector) sendBatch(
subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()

hub, err := c.hubManager.GetOrCreateHubClient(tblName)
hub, err := c.hubManager.GetOrCreateHubClient(subCtx, tblName)
if err != nil {
return err
}
Expand Down
34 changes: 27 additions & 7 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
Expand All @@ -24,7 +25,6 @@ type EventHubManager struct {
}

func NewEventHubManager(
ctx context.Context,
creds *azidentity.DefaultAzureCredential,
groupConfig *protos.EventHubGroupConfig,
) *EventHubManager {
Expand All @@ -40,7 +40,8 @@ func NewEventHubManager(
}
}

func (m *EventHubManager) GetOrCreateHubClient(name ScopedEventhub) (*azeventhubs.ProducerClient, error) {
func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) (
*azeventhubs.ProducerClient, error) {
ehConfig, ok := m.peerConfig.Get(name.PeerName)
if !ok {
return nil, fmt.Errorf("eventhub '%s' not configured", name)
Expand All @@ -53,9 +54,27 @@ func (m *EventHubManager) GetOrCreateHubClient(name ScopedEventhub) (*azeventhub
namespace = fmt.Sprintf("%s.servicebus.windows.net", namespace)
}

hub, ok := m.hubs.Load(name)
if !ok {
opts := &azeventhubs.ProducerClientOptions{}
var hubConnectOK bool
var hub any
hub, hubConnectOK = m.hubs.Load(name)
if hubConnectOK {
hubTmp := hub.(*azeventhubs.ProducerClient)
_, err := hubTmp.GetEventHubProperties(ctx, nil)
if err != nil {
log.Infof("eventhub %s not reachable. Will re-establish connection and re-create it. Err: %v", name, err)
m.hubs.Delete(name)
hubConnectOK = false
}
}

if !hubConnectOK {
opts := &azeventhubs.ProducerClientOptions{
RetryOptions: azeventhubs.RetryOptions{
MaxRetries: 32,
RetryDelay: 2 * time.Second,
MaxRetryDelay: 16 * time.Second,
},
}
hub, err := azeventhubs.NewProducerClient(namespace, name.Eventhub, m.creds, opts)
if err != nil {
return nil, fmt.Errorf("failed to create eventhub client: %v", err)
Expand All @@ -67,8 +86,9 @@ func (m *EventHubManager) GetOrCreateHubClient(name ScopedEventhub) (*azeventhub
return hub.(*azeventhubs.ProducerClient), nil
}

func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) (*azeventhubs.EventDataBatch, error) {
hub, err := m.GetOrCreateHubClient(name)
func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) (
*azeventhubs.EventDataBatch, error) {
hub, err := m.GetOrCreateHubClient(ctx, name)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (p *PostgresCDCSource) consumeStream(
records.SignalAsEmpty()
}
records.RelationMessageMapping <- &p.relationMessageMapping
log.Infof("[finished] PullRecords streamed %d records", len(localRecords))
}()

shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string {
Expand Down

0 comments on commit e7893aa

Please sign in to comment.