diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index f2faccf13a..fdcc4caf40 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -4,12 +4,9 @@ import ( "context" "errors" "fmt" - "sync" - "sync/atomic" "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" - azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -132,18 +129,6 @@ func (c *EventHubConnector) processBatch( batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) - flushBatch := func() error { - err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping) - if err != nil { - log.WithFields(log.Fields{ - "flowName": flowJobName, - }).Infof("failed to send event batch: %v", err) - return err - } - batchPerTopic.Clear() - return nil - } - numRecords := 0 for record := range batch.GetRecords() { numRecords++ @@ -182,10 +167,11 @@ func (c *EventHubConnector) processBatch( "flowName": flowJobName, }).Infof("processed %d records for sending", numRecords) - err := flushBatch() + err := batchPerTopic.flushAllBatches(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping) if err != nil { return 0, err } + batchPerTopic.Clear() log.WithFields(log.Fields{ "flowName": flowJobName, @@ -256,101 +242,18 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S }, nil } -func (c *EventHubConnector) sendEventBatch( - ctx context.Context, - events *HubBatches, - maxParallelism int64, - flowName string, - tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error { - if events.Len() == 0 { - log.WithFields(log.Fields{ - "flowName": flowName, - }).Infof("no events to send") - return nil - } - - var numEventsPushed int32 - var wg sync.WaitGroup - var once sync.Once - var firstErr error - // Limiting concurrent sends - guard := make(chan struct{}, maxParallelism) - - events.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { - guard <- struct{}{} - wg.Add(1) - go func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { - defer func() { - <-guard - wg.Done() - }() - - numEvents := eventBatch.NumEvents() - err := c.sendBatch(ctx, tblName, eventBatch) - if err != nil { - once.Do(func() { firstErr = err }) - return - } - - atomic.AddInt32(&numEventsPushed, numEvents) - log.WithFields(log.Fields{ - "flowName": flowName, - }).Infof("pushed %d events to event hub: %s", numEvents, tblName) - rowCount, ok := tableNameRowsMapping.Get(tblName.ToString()) - if !ok { - rowCount = uint32(0) - } - rowCount += uint32(numEvents) - tableNameRowsMapping.Set(tblName.ToString(), rowCount) - }(tblName, eventBatch) - }) - - wg.Wait() - - if firstErr != nil { - log.Error(firstErr) - return firstErr - } - - log.Infof("[sendEventBatch] successfully sent %d events to event hub", numEventsPushed) - return nil -} - -func (c *EventHubConnector) sendBatch( - ctx context.Context, - tblName ScopedEventhub, - events *azeventhubs.EventDataBatch, -) error { - subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) - defer cancel() - - hub, err := c.hubManager.GetOrCreateHubClient(subCtx, tblName) - if err != nil { - return err - } - - opts := &azeventhubs.SendEventDataBatchOptions{} - err = hub.SendEventDataBatch(subCtx, events, opts) - if err != nil { - return err - } - - log.Infof("successfully sent %d events to event hub topic - %s", events.NumEvents(), tblName.ToString()) - return nil -} - func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { // create topics for each table // key is the source table and value is the "eh_peer.eh_topic" that ought to be used. tableMap := req.GetTableNameMapping() - for _, table := range tableMap { + for _, destinationTable := range tableMap { // parse peer name and topic name. - name, err := NewScopedEventhub(table) + name, err := NewScopedEventhub(destinationTable) if err != nil { log.WithFields(log.Fields{ "flowName": req.FlowJobName, - "table": table, + "table": destinationTable, }).Errorf("failed to parse peer and topic name: %v", err) return nil, err } @@ -359,7 +262,7 @@ func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr if err != nil { log.WithFields(log.Fields{ "flowName": req.FlowJobName, - "table": table, + "table": destinationTable, }).Errorf("failed to ensure event hub exists: %v", err) return nil, err } diff --git a/flow/connectors/eventhub/hub_batches.go b/flow/connectors/eventhub/hub_batches.go index 652e10d45f..d05c07d9f6 100644 --- a/flow/connectors/eventhub/hub_batches.go +++ b/flow/connectors/eventhub/hub_batches.go @@ -4,8 +4,13 @@ import ( "context" "fmt" "strings" + "sync" + "sync/atomic" + "time" azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + cmap "github.com/orcaman/concurrent-map/v2" + log "github.com/sirupsen/logrus" ) // multimap from ScopedEventhub to *azeventhubs.EventDataBatch @@ -79,6 +84,89 @@ func (h *HubBatches) ForEach(fn func(ScopedEventhub, *azeventhubs.EventDataBatch } } +func (h *HubBatches) sendBatch( + ctx context.Context, + tblName ScopedEventhub, + events *azeventhubs.EventDataBatch, +) error { + subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + + hub, err := h.manager.GetOrCreateHubClient(subCtx, tblName) + if err != nil { + return err + } + + opts := &azeventhubs.SendEventDataBatchOptions{} + err = hub.SendEventDataBatch(subCtx, events, opts) + if err != nil { + return err + } + + log.Infof("successfully sent %d events to event hub topic - %s", events.NumEvents(), tblName.ToString()) + return nil +} + +func (h *HubBatches) flushAllBatches( + ctx context.Context, + events *HubBatches, + maxParallelism int64, + flowName string, + tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error { + if events.Len() == 0 { + log.WithFields(log.Fields{ + "flowName": flowName, + }).Infof("no events to send") + return nil + } + + var numEventsPushed int32 + var wg sync.WaitGroup + var once sync.Once + var firstErr error + // Limiting concurrent sends + guard := make(chan struct{}, maxParallelism) + + events.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { + guard <- struct{}{} + wg.Add(1) + go func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { + defer func() { + <-guard + wg.Done() + }() + + numEvents := eventBatch.NumEvents() + err := h.sendBatch(ctx, tblName, eventBatch) + if err != nil { + once.Do(func() { firstErr = err }) + return + } + + atomic.AddInt32(&numEventsPushed, numEvents) + log.WithFields(log.Fields{ + "flowName": flowName, + }).Infof("pushed %d events to event hub: %s", numEvents, tblName) + rowCount, ok := tableNameRowsMapping.Get(tblName.ToString()) + if !ok { + rowCount = uint32(0) + } + rowCount += uint32(numEvents) + tableNameRowsMapping.Set(tblName.ToString(), rowCount) + }(tblName, eventBatch) + }) + + wg.Wait() + + if firstErr != nil { + log.Error(firstErr) + return firstErr + } + + log.Infof("[sendEventBatch] successfully sent %d events to event hub", numEventsPushed) + return nil +} + // Clear removes all batches from the HubBatches func (h *HubBatches) Clear() { h.batches = make(map[ScopedEventhub][]*azeventhubs.EventDataBatch)