diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index f2faccf13a..e81a0c018c 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -4,17 +4,13 @@ import ( "context" "errors" "fmt" - "sync" - "sync/atomic" "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" - azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" - cmap "github.com/orcaman/concurrent-map/v2" log "github.com/sirupsen/logrus" ) @@ -127,23 +123,9 @@ func (c *EventHubConnector) processBatch( maxParallelism int64, ) (uint32, error) { ctx := context.Background() - - tableNameRowsMapping := cmap.New[uint32]() batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) - flushBatch := func() error { - err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping) - if err != nil { - log.WithFields(log.Fields{ - "flowName": flowJobName, - }).Infof("failed to send event batch: %v", err) - return err - } - batchPerTopic.Clear() - return nil - } - numRecords := 0 for record := range batch.GetRecords() { numRecords++ @@ -182,14 +164,15 @@ func (c *EventHubConnector) processBatch( "flowName": flowJobName, }).Infof("processed %d records for sending", numRecords) - err := flushBatch() - if err != nil { - return 0, err + flushErr := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName) + if flushErr != nil { + return 0, flushErr } + batchPerTopic.Clear() log.WithFields(log.Fields{ "flowName": flowJobName, - }).Infof("[total] successfully sent %d records to event hub", numRecords) + }).Infof("[total] successfully flushed %d records to event hub", numRecords) return uint32(numRecords), nil } @@ -256,101 +239,18 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S }, nil } -func (c *EventHubConnector) sendEventBatch( - ctx context.Context, - events *HubBatches, - maxParallelism int64, - flowName string, - tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error { - if events.Len() == 0 { - log.WithFields(log.Fields{ - "flowName": flowName, - }).Infof("no events to send") - return nil - } - - var numEventsPushed int32 - var wg sync.WaitGroup - var once sync.Once - var firstErr error - // Limiting concurrent sends - guard := make(chan struct{}, maxParallelism) - - events.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { - guard <- struct{}{} - wg.Add(1) - go func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { - defer func() { - <-guard - wg.Done() - }() - - numEvents := eventBatch.NumEvents() - err := c.sendBatch(ctx, tblName, eventBatch) - if err != nil { - once.Do(func() { firstErr = err }) - return - } - - atomic.AddInt32(&numEventsPushed, numEvents) - log.WithFields(log.Fields{ - "flowName": flowName, - }).Infof("pushed %d events to event hub: %s", numEvents, tblName) - rowCount, ok := tableNameRowsMapping.Get(tblName.ToString()) - if !ok { - rowCount = uint32(0) - } - rowCount += uint32(numEvents) - tableNameRowsMapping.Set(tblName.ToString(), rowCount) - }(tblName, eventBatch) - }) - - wg.Wait() - - if firstErr != nil { - log.Error(firstErr) - return firstErr - } - - log.Infof("[sendEventBatch] successfully sent %d events to event hub", numEventsPushed) - return nil -} - -func (c *EventHubConnector) sendBatch( - ctx context.Context, - tblName ScopedEventhub, - events *azeventhubs.EventDataBatch, -) error { - subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) - defer cancel() - - hub, err := c.hubManager.GetOrCreateHubClient(subCtx, tblName) - if err != nil { - return err - } - - opts := &azeventhubs.SendEventDataBatchOptions{} - err = hub.SendEventDataBatch(subCtx, events, opts) - if err != nil { - return err - } - - log.Infof("successfully sent %d events to event hub topic - %s", events.NumEvents(), tblName.ToString()) - return nil -} - func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { // create topics for each table // key is the source table and value is the "eh_peer.eh_topic" that ought to be used. tableMap := req.GetTableNameMapping() - for _, table := range tableMap { + for _, destinationTable := range tableMap { // parse peer name and topic name. - name, err := NewScopedEventhub(table) + name, err := NewScopedEventhub(destinationTable) if err != nil { log.WithFields(log.Fields{ "flowName": req.FlowJobName, - "table": table, + "table": destinationTable, }).Errorf("failed to parse peer and topic name: %v", err) return nil, err } @@ -359,7 +259,7 @@ func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr if err != nil { log.WithFields(log.Fields{ "flowName": req.FlowJobName, - "table": table, + "table": destinationTable, }).Errorf("failed to ensure event hub exists: %v", err) return nil, err } diff --git a/flow/connectors/eventhub/hub_batches.go b/flow/connectors/eventhub/hub_batches.go index 652e10d45f..c945fb4d3c 100644 --- a/flow/connectors/eventhub/hub_batches.go +++ b/flow/connectors/eventhub/hub_batches.go @@ -4,8 +4,12 @@ import ( "context" "fmt" "strings" + "sync/atomic" + "time" azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + log "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" ) // multimap from ScopedEventhub to *azeventhubs.EventDataBatch @@ -79,6 +83,65 @@ func (h *HubBatches) ForEach(fn func(ScopedEventhub, *azeventhubs.EventDataBatch } } +func (h *HubBatches) sendBatch( + ctx context.Context, + tblName ScopedEventhub, + events *azeventhubs.EventDataBatch, +) error { + subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + + hub, err := h.manager.GetOrCreateHubClient(subCtx, tblName) + if err != nil { + return err + } + + opts := &azeventhubs.SendEventDataBatchOptions{} + err = hub.SendEventDataBatch(subCtx, events, opts) + if err != nil { + return err + } + + log.Infof("successfully sent %d events to event hub topic - %s", events.NumEvents(), tblName.ToString()) + return nil +} + +func (h *HubBatches) flushAllBatches( + ctx context.Context, + maxParallelism int64, + flowName string, +) error { + if h.Len() == 0 { + log.WithFields(log.Fields{ + "flowName": flowName, + }).Infof("no events to send") + return nil + } + + var numEventsPushed int32 + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(int(maxParallelism)) + h.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { + g.Go(func() error { + numEvents := eventBatch.NumEvents() + err := h.sendBatch(gCtx, tblName, eventBatch) + if err != nil { + return err + } + + atomic.AddInt32(&numEventsPushed, numEvents) + log.WithFields(log.Fields{ + "flowName": flowName, + }).Infof("pushed %d events to event hub: %s", numEvents, tblName) + return nil + }) + }) + + log.Infof("[sendEventBatch] successfully sent %d events in total to event hub", + numEventsPushed) + return g.Wait() +} + // Clear removes all batches from the HubBatches func (h *HubBatches) Clear() { h.batches = make(map[ScopedEventhub][]*azeventhubs.EventDataBatch)