From 957b279e2a7eda445c5a3c437273af2991924ecd Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 17 Nov 2023 01:51:39 +0530 Subject: [PATCH 1/8] refactors sendeventbatch --- flow/connectors/eventhub/eventhub.go | 109 ++---------------------- flow/connectors/eventhub/hub_batches.go | 88 +++++++++++++++++++ 2 files changed, 94 insertions(+), 103 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index f2faccf13a..fdcc4caf40 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -4,12 +4,9 @@ import ( "context" "errors" "fmt" - "sync" - "sync/atomic" "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" - azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -132,18 +129,6 @@ func (c *EventHubConnector) processBatch( batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) - flushBatch := func() error { - err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping) - if err != nil { - log.WithFields(log.Fields{ - "flowName": flowJobName, - }).Infof("failed to send event batch: %v", err) - return err - } - batchPerTopic.Clear() - return nil - } - numRecords := 0 for record := range batch.GetRecords() { numRecords++ @@ -182,10 +167,11 @@ func (c *EventHubConnector) processBatch( "flowName": flowJobName, }).Infof("processed %d records for sending", numRecords) - err := flushBatch() + err := batchPerTopic.flushAllBatches(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping) if err != nil { return 0, err } + batchPerTopic.Clear() log.WithFields(log.Fields{ "flowName": flowJobName, @@ -256,101 +242,18 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S }, nil } -func (c *EventHubConnector) sendEventBatch( - ctx context.Context, - events *HubBatches, - maxParallelism int64, - flowName string, - tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error { - if events.Len() == 0 { - log.WithFields(log.Fields{ - "flowName": flowName, - }).Infof("no events to send") - return nil - } - - var numEventsPushed int32 - var wg sync.WaitGroup - var once sync.Once - var firstErr error - // Limiting concurrent sends - guard := make(chan struct{}, maxParallelism) - - events.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { - guard <- struct{}{} - wg.Add(1) - go func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { - defer func() { - <-guard - wg.Done() - }() - - numEvents := eventBatch.NumEvents() - err := c.sendBatch(ctx, tblName, eventBatch) - if err != nil { - once.Do(func() { firstErr = err }) - return - } - - atomic.AddInt32(&numEventsPushed, numEvents) - log.WithFields(log.Fields{ - "flowName": flowName, - }).Infof("pushed %d events to event hub: %s", numEvents, tblName) - rowCount, ok := tableNameRowsMapping.Get(tblName.ToString()) - if !ok { - rowCount = uint32(0) - } - rowCount += uint32(numEvents) - tableNameRowsMapping.Set(tblName.ToString(), rowCount) - }(tblName, eventBatch) - }) - - wg.Wait() - - if firstErr != nil { - log.Error(firstErr) - return firstErr - } - - log.Infof("[sendEventBatch] successfully sent %d events to event hub", numEventsPushed) - return nil -} - -func (c *EventHubConnector) sendBatch( - ctx context.Context, - tblName ScopedEventhub, - events *azeventhubs.EventDataBatch, -) error { - subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) - defer cancel() - - hub, err := c.hubManager.GetOrCreateHubClient(subCtx, tblName) - if err != nil { - return err - } - - opts := &azeventhubs.SendEventDataBatchOptions{} - err = hub.SendEventDataBatch(subCtx, events, opts) - if err != nil { - return err - } - - log.Infof("successfully sent %d events to event hub topic - %s", events.NumEvents(), tblName.ToString()) - return nil -} - func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { // create topics for each table // key is the source table and value is the "eh_peer.eh_topic" that ought to be used. tableMap := req.GetTableNameMapping() - for _, table := range tableMap { + for _, destinationTable := range tableMap { // parse peer name and topic name. - name, err := NewScopedEventhub(table) + name, err := NewScopedEventhub(destinationTable) if err != nil { log.WithFields(log.Fields{ "flowName": req.FlowJobName, - "table": table, + "table": destinationTable, }).Errorf("failed to parse peer and topic name: %v", err) return nil, err } @@ -359,7 +262,7 @@ func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr if err != nil { log.WithFields(log.Fields{ "flowName": req.FlowJobName, - "table": table, + "table": destinationTable, }).Errorf("failed to ensure event hub exists: %v", err) return nil, err } diff --git a/flow/connectors/eventhub/hub_batches.go b/flow/connectors/eventhub/hub_batches.go index 652e10d45f..d05c07d9f6 100644 --- a/flow/connectors/eventhub/hub_batches.go +++ b/flow/connectors/eventhub/hub_batches.go @@ -4,8 +4,13 @@ import ( "context" "fmt" "strings" + "sync" + "sync/atomic" + "time" azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + cmap "github.com/orcaman/concurrent-map/v2" + log "github.com/sirupsen/logrus" ) // multimap from ScopedEventhub to *azeventhubs.EventDataBatch @@ -79,6 +84,89 @@ func (h *HubBatches) ForEach(fn func(ScopedEventhub, *azeventhubs.EventDataBatch } } +func (h *HubBatches) sendBatch( + ctx context.Context, + tblName ScopedEventhub, + events *azeventhubs.EventDataBatch, +) error { + subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + + hub, err := h.manager.GetOrCreateHubClient(subCtx, tblName) + if err != nil { + return err + } + + opts := &azeventhubs.SendEventDataBatchOptions{} + err = hub.SendEventDataBatch(subCtx, events, opts) + if err != nil { + return err + } + + log.Infof("successfully sent %d events to event hub topic - %s", events.NumEvents(), tblName.ToString()) + return nil +} + +func (h *HubBatches) flushAllBatches( + ctx context.Context, + events *HubBatches, + maxParallelism int64, + flowName string, + tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error { + if events.Len() == 0 { + log.WithFields(log.Fields{ + "flowName": flowName, + }).Infof("no events to send") + return nil + } + + var numEventsPushed int32 + var wg sync.WaitGroup + var once sync.Once + var firstErr error + // Limiting concurrent sends + guard := make(chan struct{}, maxParallelism) + + events.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { + guard <- struct{}{} + wg.Add(1) + go func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { + defer func() { + <-guard + wg.Done() + }() + + numEvents := eventBatch.NumEvents() + err := h.sendBatch(ctx, tblName, eventBatch) + if err != nil { + once.Do(func() { firstErr = err }) + return + } + + atomic.AddInt32(&numEventsPushed, numEvents) + log.WithFields(log.Fields{ + "flowName": flowName, + }).Infof("pushed %d events to event hub: %s", numEvents, tblName) + rowCount, ok := tableNameRowsMapping.Get(tblName.ToString()) + if !ok { + rowCount = uint32(0) + } + rowCount += uint32(numEvents) + tableNameRowsMapping.Set(tblName.ToString(), rowCount) + }(tblName, eventBatch) + }) + + wg.Wait() + + if firstErr != nil { + log.Error(firstErr) + return firstErr + } + + log.Infof("[sendEventBatch] successfully sent %d events to event hub", numEventsPushed) + return nil +} + // Clear removes all batches from the HubBatches func (h *HubBatches) Clear() { h.batches = make(map[ScopedEventhub][]*azeventhubs.EventDataBatch) From 18cf975f3527333d3f0e72a255bc23a788ba283e Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 17 Nov 2023 02:24:41 +0530 Subject: [PATCH 2/8] refactor wait group and tablemappings --- flow/connectors/eventhub/eventhub.go | 11 +++---- flow/connectors/eventhub/hub_batches.go | 44 ++++++++----------------- 2 files changed, 18 insertions(+), 37 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index fdcc4caf40..a3ebb49a9a 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -11,7 +11,6 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" - cmap "github.com/orcaman/concurrent-map/v2" log "github.com/sirupsen/logrus" ) @@ -124,8 +123,6 @@ func (c *EventHubConnector) processBatch( maxParallelism int64, ) (uint32, error) { ctx := context.Background() - - tableNameRowsMapping := cmap.New[uint32]() batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) @@ -167,9 +164,11 @@ func (c *EventHubConnector) processBatch( "flowName": flowJobName, }).Infof("processed %d records for sending", numRecords) - err := batchPerTopic.flushAllBatches(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping) - if err != nil { - return 0, err + var flushErr error + + flushErr = batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName) + if flushErr != nil { + return 0, flushErr } batchPerTopic.Clear() diff --git a/flow/connectors/eventhub/hub_batches.go b/flow/connectors/eventhub/hub_batches.go index d05c07d9f6..72db12efa7 100644 --- a/flow/connectors/eventhub/hub_batches.go +++ b/flow/connectors/eventhub/hub_batches.go @@ -4,13 +4,13 @@ import ( "context" "fmt" "strings" - "sync" "sync/atomic" "time" azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" cmap "github.com/orcaman/concurrent-map/v2" log "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" ) // multimap from ScopedEventhub to *azeventhubs.EventDataBatch @@ -109,11 +109,10 @@ func (h *HubBatches) sendBatch( func (h *HubBatches) flushAllBatches( ctx context.Context, - events *HubBatches, maxParallelism int64, flowName string, - tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error { - if events.Len() == 0 { +) error { + if h.Len() == 0 { log.WithFields(log.Fields{ "flowName": flowName, }).Infof("no events to send") @@ -121,26 +120,15 @@ func (h *HubBatches) flushAllBatches( } var numEventsPushed int32 - var wg sync.WaitGroup - var once sync.Once - var firstErr error - // Limiting concurrent sends - guard := make(chan struct{}, maxParallelism) - - events.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { - guard <- struct{}{} - wg.Add(1) - go func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { - defer func() { - <-guard - wg.Done() - }() - + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(int(maxParallelism)) // limit parallel merges to 8 + tableNameRowsMapping := cmap.New[uint32]() + h.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { + g.Go(func() error { numEvents := eventBatch.NumEvents() - err := h.sendBatch(ctx, tblName, eventBatch) + err := h.sendBatch(gCtx, tblName, eventBatch) if err != nil { - once.Do(func() { firstErr = err }) - return + return err } atomic.AddInt32(&numEventsPushed, numEvents) @@ -153,18 +141,12 @@ func (h *HubBatches) flushAllBatches( } rowCount += uint32(numEvents) tableNameRowsMapping.Set(tblName.ToString(), rowCount) - }(tblName, eventBatch) + return nil + }) }) - wg.Wait() - - if firstErr != nil { - log.Error(firstErr) - return firstErr - } - log.Infof("[sendEventBatch] successfully sent %d events to event hub", numEventsPushed) - return nil + return g.Wait() } // Clear removes all batches from the HubBatches From 0a310b3422e21f929b644950c981e9f44c3de592 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 17 Nov 2023 02:30:03 +0530 Subject: [PATCH 3/8] minor change --- flow/connectors/eventhub/eventhub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index a3ebb49a9a..5821755cc0 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -174,7 +174,7 @@ func (c *EventHubConnector) processBatch( log.WithFields(log.Fields{ "flowName": flowJobName, - }).Infof("[total] successfully sent %d records to event hub", numRecords) + }).Infof("[total] successfully flushed %d records to event hub", numRecords) return uint32(numRecords), nil } From ea2178bd83842c3b8f320ba24998fd75853618f1 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 17 Nov 2023 02:48:34 +0530 Subject: [PATCH 4/8] minor change --- flow/connectors/eventhub/eventhub.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 5821755cc0..e81a0c018c 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -164,9 +164,7 @@ func (c *EventHubConnector) processBatch( "flowName": flowJobName, }).Infof("processed %d records for sending", numRecords) - var flushErr error - - flushErr = batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName) + flushErr := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName) if flushErr != nil { return 0, flushErr } From 00f6ab63c9f763e0479c3aab9a046cb8ec7476bb Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 17 Nov 2023 10:58:20 +0530 Subject: [PATCH 5/8] uses upsert and removes comment --- flow/connectors/eventhub/hub_batches.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/flow/connectors/eventhub/hub_batches.go b/flow/connectors/eventhub/hub_batches.go index 72db12efa7..9f757f40df 100644 --- a/flow/connectors/eventhub/hub_batches.go +++ b/flow/connectors/eventhub/hub_batches.go @@ -121,7 +121,7 @@ func (h *HubBatches) flushAllBatches( var numEventsPushed int32 g, gCtx := errgroup.WithContext(ctx) - g.SetLimit(int(maxParallelism)) // limit parallel merges to 8 + g.SetLimit(int(maxParallelism)) tableNameRowsMapping := cmap.New[uint32]() h.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { g.Go(func() error { @@ -140,7 +140,12 @@ func (h *HubBatches) flushAllBatches( rowCount = uint32(0) } rowCount += uint32(numEvents) - tableNameRowsMapping.Set(tblName.ToString(), rowCount) + tableNameRowsMapping.Upsert(tblName.ToString(), rowCount, func(exist bool, valueInMap, newValue uint32) uint32 { + if exist { + return valueInMap + } + return newValue + }) return nil }) }) From e4a1981cf8db3125c5e8678189b88ee0a7fcaf3b Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 17 Nov 2023 11:03:05 +0530 Subject: [PATCH 6/8] lint --- flow/connectors/eventhub/hub_batches.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/flow/connectors/eventhub/hub_batches.go b/flow/connectors/eventhub/hub_batches.go index 9f757f40df..ce4d53d96f 100644 --- a/flow/connectors/eventhub/hub_batches.go +++ b/flow/connectors/eventhub/hub_batches.go @@ -140,12 +140,13 @@ func (h *HubBatches) flushAllBatches( rowCount = uint32(0) } rowCount += uint32(numEvents) - tableNameRowsMapping.Upsert(tblName.ToString(), rowCount, func(exist bool, valueInMap, newValue uint32) uint32 { - if exist { - return valueInMap - } - return newValue - }) + tableNameRowsMapping.Upsert(tblName.ToString(), rowCount, + func(exist bool, valueInMap, newValue uint32) uint32 { + if exist { + return valueInMap + } + return newValue + }) return nil }) }) From a0b3292e74683e5624a529ead4a1576c60a80e74 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 17 Nov 2023 20:03:01 +0530 Subject: [PATCH 7/8] corrects upsert --- flow/connectors/eventhub/hub_batches.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/flow/connectors/eventhub/hub_batches.go b/flow/connectors/eventhub/hub_batches.go index ce4d53d96f..662f7ce2c7 100644 --- a/flow/connectors/eventhub/hub_batches.go +++ b/flow/connectors/eventhub/hub_batches.go @@ -135,17 +135,12 @@ func (h *HubBatches) flushAllBatches( log.WithFields(log.Fields{ "flowName": flowName, }).Infof("pushed %d events to event hub: %s", numEvents, tblName) - rowCount, ok := tableNameRowsMapping.Get(tblName.ToString()) - if !ok { - rowCount = uint32(0) - } - rowCount += uint32(numEvents) - tableNameRowsMapping.Upsert(tblName.ToString(), rowCount, - func(exist bool, valueInMap, newValue uint32) uint32 { - if exist { - return valueInMap + tableNameRowsMapping.Upsert(tblName.ToString(), uint32(numEvents), + func(exist bool, rowCount uint32, newRows uint32) uint32 { + if !exist { + return newRows } - return newValue + return rowCount + newRows }) return nil }) From c88d96026c32dc974626ddae4491ab406ab3d986 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 17 Nov 2023 20:16:25 +0530 Subject: [PATCH 8/8] removes tablerows map --- flow/connectors/eventhub/hub_batches.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/flow/connectors/eventhub/hub_batches.go b/flow/connectors/eventhub/hub_batches.go index 662f7ce2c7..c945fb4d3c 100644 --- a/flow/connectors/eventhub/hub_batches.go +++ b/flow/connectors/eventhub/hub_batches.go @@ -8,7 +8,6 @@ import ( "time" azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" - cmap "github.com/orcaman/concurrent-map/v2" log "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" ) @@ -122,7 +121,6 @@ func (h *HubBatches) flushAllBatches( var numEventsPushed int32 g, gCtx := errgroup.WithContext(ctx) g.SetLimit(int(maxParallelism)) - tableNameRowsMapping := cmap.New[uint32]() h.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { g.Go(func() error { numEvents := eventBatch.NumEvents() @@ -135,18 +133,12 @@ func (h *HubBatches) flushAllBatches( log.WithFields(log.Fields{ "flowName": flowName, }).Infof("pushed %d events to event hub: %s", numEvents, tblName) - tableNameRowsMapping.Upsert(tblName.ToString(), uint32(numEvents), - func(exist bool, rowCount uint32, newRows uint32) uint32 { - if !exist { - return newRows - } - return rowCount + newRows - }) return nil }) }) - log.Infof("[sendEventBatch] successfully sent %d events to event hub", numEventsPushed) + log.Infof("[sendEventBatch] successfully sent %d events in total to event hub", + numEventsPushed) return g.Wait() }