Refactors Sendeventbatch #674

Merged
merged 8 commits on Nov 17, 2023
Changes from 7 commits
118 changes: 9 additions & 109 deletions flow/connectors/eventhub/eventhub.go
@@ -4,17 +4,13 @@ import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
cmap "github.com/orcaman/concurrent-map/v2"
log "github.com/sirupsen/logrus"
)

@@ -127,23 +123,9 @@ func (c *EventHubConnector) processBatch(
maxParallelism int64,
) (uint32, error) {
ctx := context.Background()

tableNameRowsMapping := cmap.New[uint32]()
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

flushBatch := func() error {
err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping)
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to send event batch: %v", err)
return err
}
batchPerTopic.Clear()
return nil
}

numRecords := 0
for record := range batch.GetRecords() {
numRecords++
@@ -182,14 +164,15 @@
"flowName": flowJobName,
}).Infof("processed %d records for sending", numRecords)

err := flushBatch()
if err != nil {
return 0, err
flushErr := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName)
if flushErr != nil {
return 0, flushErr
}
batchPerTopic.Clear()

log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("[total] successfully sent %d records to event hub", numRecords)
}).Infof("[total] successfully flushed %d records to event hub", numRecords)
return uint32(numRecords), nil
}

@@ -256,101 +239,18 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}, nil
}

func (c *EventHubConnector) sendEventBatch(
ctx context.Context,
events *HubBatches,
maxParallelism int64,
flowName string,
tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error {
if events.Len() == 0 {
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("no events to send")
return nil
}

var numEventsPushed int32
var wg sync.WaitGroup
var once sync.Once
var firstErr error
// Limiting concurrent sends
guard := make(chan struct{}, maxParallelism)

events.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) {
guard <- struct{}{}
wg.Add(1)
go func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) {
defer func() {
<-guard
wg.Done()
}()

numEvents := eventBatch.NumEvents()
err := c.sendBatch(ctx, tblName, eventBatch)
if err != nil {
once.Do(func() { firstErr = err })
return
}

atomic.AddInt32(&numEventsPushed, numEvents)
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("pushed %d events to event hub: %s", numEvents, tblName)
rowCount, ok := tableNameRowsMapping.Get(tblName.ToString())
if !ok {
rowCount = uint32(0)
}
rowCount += uint32(numEvents)
tableNameRowsMapping.Set(tblName.ToString(), rowCount)
}(tblName, eventBatch)
})

wg.Wait()

if firstErr != nil {
log.Error(firstErr)
return firstErr
}

log.Infof("[sendEventBatch] successfully sent %d events to event hub", numEventsPushed)
return nil
}

func (c *EventHubConnector) sendBatch(
ctx context.Context,
tblName ScopedEventhub,
events *azeventhubs.EventDataBatch,
) error {
subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()

hub, err := c.hubManager.GetOrCreateHubClient(subCtx, tblName)
if err != nil {
return err
}

opts := &azeventhubs.SendEventDataBatchOptions{}
err = hub.SendEventDataBatch(subCtx, events, opts)
if err != nil {
return err
}

log.Infof("successfully sent %d events to event hub topic - %s", events.NumEvents(), tblName.ToString())
return nil
}

func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
// create topics for each table
// key is the source table and value is the "eh_peer.eh_topic" that ought to be used.
tableMap := req.GetTableNameMapping()

for _, table := range tableMap {
for _, destinationTable := range tableMap {
// parse peer name and topic name.
name, err := NewScopedEventhub(table)
name, err := NewScopedEventhub(destinationTable)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
"table": table,
"table": destinationTable,
}).Errorf("failed to parse peer and topic name: %v", err)
return nil, err
}
@@ -359,7 +259,7 @@ func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
"table": table,
"table": destinationTable,
}).Errorf("failed to ensure event hub exists: %v", err)
return nil, err
}
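The sendEventBatch removed above coordinated its goroutines by hand: a buffered channel as a semaphore bounded by maxParallelism, a sync.WaitGroup to join them, and a sync.Once to capture the first error. The flushAllBatches added in hub_batches.go below gets the same behavior from errgroup. A minimal, self-contained sketch of that pattern, assuming only golang.org/x/sync/errgroup; the names sendAll, send, and items are illustrative and not part of this PR:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// sendAll pushes each item concurrently, with at most maxParallelism
// in-flight sends, and returns the first error encountered (if any).
func sendAll(ctx context.Context, items []string, maxParallelism int) error {
	g, gCtx := errgroup.WithContext(ctx)
	// SetLimit replaces the hand-rolled `guard := make(chan struct{}, maxParallelism)` semaphore.
	g.SetLimit(maxParallelism)

	for _, item := range items {
		item := item // capture loop variable for the goroutine (pre-Go 1.22 semantics)
		g.Go(func() error {
			// If an earlier send failed, errgroup has cancelled gCtx, so bail out early.
			if gCtx.Err() != nil {
				return gCtx.Err()
			}
			fmt.Println("sent", item) // stand-in for the real per-batch send
			return nil
		})
	}

	// Wait joins all goroutines (the old sync.WaitGroup) and returns the
	// first non-nil error (the old sync.Once + firstErr).
	return g.Wait()
}

func main() {
	if err := sendAll(context.Background(), []string{"a", "b", "c"}, 2); err != nil {
		fmt.Println("flush failed:", err)
	}
}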
71 changes: 71 additions & 0 deletions flow/connectors/eventhub/hub_batches.go
@@ -4,8 +4,13 @@ import (
"context"
"fmt"
"strings"
"sync/atomic"
"time"

azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
cmap "github.com/orcaman/concurrent-map/v2"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

// multimap from ScopedEventhub to *azeventhubs.EventDataBatch
@@ -79,6 +84,72 @@ func (h *HubBatches) ForEach(fn func(ScopedEventhub, *azeventhubs.EventDataBatch
}
}

func (h *HubBatches) sendBatch(
ctx context.Context,
tblName ScopedEventhub,
events *azeventhubs.EventDataBatch,
) error {
subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()

hub, err := h.manager.GetOrCreateHubClient(subCtx, tblName)
if err != nil {
return err
}

opts := &azeventhubs.SendEventDataBatchOptions{}
err = hub.SendEventDataBatch(subCtx, events, opts)
if err != nil {
return err
}

log.Infof("successfully sent %d events to event hub topic - %s", events.NumEvents(), tblName.ToString())
return nil
}

func (h *HubBatches) flushAllBatches(
ctx context.Context,
maxParallelism int64,
flowName string,
) error {
if h.Len() == 0 {
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("no events to send")
return nil
}

var numEventsPushed int32
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(int(maxParallelism))
tableNameRowsMapping := cmap.New[uint32]()
h.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) {
g.Go(func() error {
numEvents := eventBatch.NumEvents()
err := h.sendBatch(gCtx, tblName, eventBatch)
if err != nil {
return err
}

atomic.AddInt32(&numEventsPushed, numEvents)
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("pushed %d events to event hub: %s", numEvents, tblName)
tableNameRowsMapping.Upsert(tblName.ToString(), uint32(numEvents),
func(exist bool, rowCount uint32, newRows uint32) uint32 {
if !exist {
return newRows
}
return rowCount + newRows
})
return nil
})
})

log.Infof("[sendEventBatch] successfully sent %d events to event hub", numEventsPushed)
return g.Wait()
}

// Clear removes all batches from the HubBatches
func (h *HubBatches) Clear() {
h.batches = make(map[ScopedEventhub][]*azeventhubs.EventDataBatch)
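flushAllBatches also swaps the old Get-then-Set row-count bookkeeping for a single concurrent-map Upsert, so concurrent goroutines cannot lose an update between the read and the write. A small sketch of that accumulation pattern, assuming github.com/orcaman/concurrent-map/v2; the table name and addRows helper are hypothetical:

package main

import (
	"fmt"

	cmap "github.com/orcaman/concurrent-map/v2"
)

func main() {
	rowCounts := cmap.New[uint32]()

	// addRows accumulates per-table counts atomically; the callback sees
	// whether the key already exists and the value currently stored.
	addRows := func(table string, rows uint32) {
		rowCounts.Upsert(table, rows,
			func(exist bool, current uint32, incoming uint32) uint32 {
				if !exist {
					return incoming // first batch for this table
				}
				return current + incoming // later batches add to the running total
			})
	}

	addRows("public.users", 3) // hypothetical destination table
	addRows("public.users", 2)

	total, _ := rowCounts.Get("public.users")
	fmt.Println(total) // 5
}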