diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 30be24eba6..f2faccf13a 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -38,7 +38,7 @@ func NewEventHubConnector( return nil, err } - hubManager := NewEventHubManager(ctx, defaultAzureCreds, config) + hubManager := NewEventHubManager(defaultAzureCreds, config) metadataSchemaName := "peerdb_eventhub_metadata" // #nosec G101 pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(), metadataSchemaName) @@ -124,7 +124,6 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error func (c *EventHubConnector) processBatch( flowJobName string, batch *model.CDCRecordStream, - eventsPerBatch int, maxParallelism int64, ) (uint32, error) { ctx := context.Background() @@ -133,6 +132,18 @@ func (c *EventHubConnector) processBatch( batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) + flushBatch := func() error { + err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping) + if err != nil { + log.WithFields(log.Fields{ + "flowName": flowJobName, + }).Infof("failed to send event batch: %v", err) + return err + } + batchPerTopic.Clear() + return nil + } + numRecords := 0 for record := range batch.GetRecords() { numRecords++ @@ -144,18 +155,6 @@ func (c *EventHubConnector) processBatch( return 0, err } - flushBatch := func() error { - err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping) - if err != nil { - log.WithFields(log.Fields{ - "flowName": flowJobName, - }).Infof("failed to send event batch: %v", err) - return err - } - batchPerTopic.Clear() - return nil - } - topicName, err := NewScopedEventhub(record.GetTableName()) if err != nil { log.WithFields(log.Fields{ @@ -172,19 +171,20 @@ func (c *EventHubConnector) processBatch( return 0, err } - if (numRecords)%eventsPerBatch == 0 { - err := flushBatch() - if err != nil { - return 0, err - } + if numRecords%1000 == 0 { + log.WithFields(log.Fields{ + "flowName": flowJobName, + }).Infof("processed %d records for sending", numRecords) } } - if batchPerTopic.Len() > 0 { - err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping) - if err != nil { - return 0, err - } + log.WithFields(log.Fields{ + "flowName": flowJobName, + }).Infof("processed %d records for sending", numRecords) + + err := flushBatch() + if err != nil { + return 0, err } log.WithFields(log.Fields{ @@ -203,10 +203,6 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S shutdown <- true }() - eventsPerBatch := int(req.PushBatchSize) - if eventsPerBatch <= 0 { - eventsPerBatch = 10000 - } maxParallelism := req.PushParallelism if maxParallelism <= 0 { maxParallelism = 10 @@ -221,13 +217,13 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S // otherwise, we block until processBatch is done. if utils.GetEnvBool("PEERDB_BETA_EVENTHUB_PUSH_ASYNC", false) { go func() { - numRecords, err = c.processBatch(req.FlowJobName, batch, eventsPerBatch, maxParallelism) + numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism) if err != nil { log.Errorf("[async] failed to process batch: %v", err) } }() } else { - numRecords, err = c.processBatch(req.FlowJobName, batch, eventsPerBatch, maxParallelism) + numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism) if err != nil { log.Errorf("failed to process batch: %v", err) return nil, err @@ -316,7 +312,7 @@ func (c *EventHubConnector) sendEventBatch( return firstErr } - log.Infof("successfully sent %d events to event hub", numEventsPushed) + log.Infof("[sendEventBatch] successfully sent %d events to event hub", numEventsPushed) return nil } @@ -328,7 +324,7 @@ func (c *EventHubConnector) sendBatch( subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() - hub, err := c.hubManager.GetOrCreateHubClient(tblName) + hub, err := c.hubManager.GetOrCreateHubClient(subCtx, tblName) if err != nil { return err } diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 1241c0ec7a..a30f94d162 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" "sync" + "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" @@ -24,7 +25,6 @@ type EventHubManager struct { } func NewEventHubManager( - ctx context.Context, creds *azidentity.DefaultAzureCredential, groupConfig *protos.EventHubGroupConfig, ) *EventHubManager { @@ -40,7 +40,8 @@ func NewEventHubManager( } } -func (m *EventHubManager) GetOrCreateHubClient(name ScopedEventhub) (*azeventhubs.ProducerClient, error) { +func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) ( + *azeventhubs.ProducerClient, error) { ehConfig, ok := m.peerConfig.Get(name.PeerName) if !ok { return nil, fmt.Errorf("eventhub '%s' not configured", name) @@ -53,9 +54,27 @@ func (m *EventHubManager) GetOrCreateHubClient(name ScopedEventhub) (*azeventhub namespace = fmt.Sprintf("%s.servicebus.windows.net", namespace) } - hub, ok := m.hubs.Load(name) - if !ok { - opts := &azeventhubs.ProducerClientOptions{} + var hubConnectOK bool + var hub any + hub, hubConnectOK = m.hubs.Load(name) + if hubConnectOK { + hubTmp := hub.(*azeventhubs.ProducerClient) + _, err := hubTmp.GetEventHubProperties(ctx, nil) + if err != nil { + log.Infof("eventhub %s not reachable. Will re-establish connection and re-create it. Err: %v", name, err) + m.hubs.Delete(name) + hubConnectOK = false + } + } + + if !hubConnectOK { + opts := &azeventhubs.ProducerClientOptions{ + RetryOptions: azeventhubs.RetryOptions{ + MaxRetries: 32, + RetryDelay: 2 * time.Second, + MaxRetryDelay: 16 * time.Second, + }, + } hub, err := azeventhubs.NewProducerClient(namespace, name.Eventhub, m.creds, opts) if err != nil { return nil, fmt.Errorf("failed to create eventhub client: %v", err) @@ -67,8 +86,9 @@ func (m *EventHubManager) GetOrCreateHubClient(name ScopedEventhub) (*azeventhub return hub.(*azeventhubs.ProducerClient), nil } -func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) (*azeventhubs.EventDataBatch, error) { - hub, err := m.GetOrCreateHubClient(name) +func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) ( + *azeventhubs.EventDataBatch, error) { + hub, err := m.GetOrCreateHubClient(ctx, name) if err != nil { return nil, err } diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 533b4ee0c5..278f0ed863 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -191,6 +191,7 @@ func (p *PostgresCDCSource) consumeStream( records.SignalAsEmpty() } records.RelationMessageMapping <- &p.relationMessageMapping + log.Infof("[finished] PullRecords streamed %d records", len(localRecords)) }() shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string {