Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

event more eventhub improvements #461

Merged
merged 2 commits into from
Sep 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 12 additions & 66 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -106,7 +105,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
maxParallelism = 10
}

batchPerTopic := make(map[ScopedEventhub]*azeventhubs.EventDataBatch)
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

startTime := time.Now()
Expand All @@ -128,20 +127,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}).Infof("failed to send event batch: %v", err)
return err
}
batchPerTopic = make(map[ScopedEventhub]*azeventhubs.EventDataBatch)
return nil
}

flushTopic := func(topic ScopedEventhub) error {
err := c.sendBatch(topic, batchPerTopic[topic])
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to send event batch - %s: %v", topic, err)
return err
}

delete(batchPerTopic, topic)
batchPerTopic.Clear()
return nil
}

Expand All @@ -153,46 +139,12 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
return nil, err
}

addRecord := func() error {
if _, ok := batchPerTopic[topicName]; !ok {
batch, err := c.hubManager.CreateEventDataBatch(topicName)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to create event data batch: %v", err)
return err
}
batchPerTopic[topicName] = batch
}

opts := &azeventhubs.AddEventDataOptions{}
eventData := eventDataFromString(json)
return batchPerTopic[topicName].AddEventData(eventData, opts)
}

err = addRecord()
err = batchPerTopic.AddEvent(topicName, json)
if err != nil {
// if the error contains `EventData could not be added because it is too large for the batch`
// then flush the batch and try again.
if strings.Contains(err.Error(), "too large for the batch") {
err := flushTopic(topicName)
if err != nil {
return nil, err
}

err = addRecord()
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to add event data to batch (retried): %v", err)
return nil, err
}
} else {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to add event data to batch: %v", err)
return nil, err
}
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to add event to batch: %v", err)
return nil, err
}

if i%eventsPerHeartBeat == 0 {
Expand All @@ -208,7 +160,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

// send the remaining events.
if len(batchPerTopic) > 0 {
if batchPerTopic.Len() > 0 {
err := c.sendEventBatch(batchPerTopic, maxParallelism,
req.FlowJobName, tableNameRowsMapping)
if err != nil {
Expand Down Expand Up @@ -243,11 +195,11 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

func (c *EventHubConnector) sendEventBatch(
events map[ScopedEventhub]*azeventhubs.EventDataBatch,
events *HubBatches,
maxParallelism int64,
flowName string,
tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error {
if len(events) == 0 {
if events.Len() == 0 {
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("no events to send")
Expand All @@ -261,7 +213,7 @@ func (c *EventHubConnector) sendEventBatch(
// Limiting concurrent sends
guard := make(chan struct{}, maxParallelism)

for tblName, eventBatch := range events {
events.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) {
guard <- struct{}{}
wg.Add(1)
go func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) {
Expand All @@ -288,7 +240,7 @@ func (c *EventHubConnector) sendEventBatch(
rowCount += uint32(numEvents)
tableNameRowsMapping.Set(tblName.ToString(), rowCount)
}(tblName, eventBatch)
}
})

wg.Wait()

Expand Down Expand Up @@ -359,9 +311,3 @@ func (c *EventHubConnector) SetupNormalizedTables(
TableExistsMapping: nil,
}, nil
}

func eventDataFromString(s string) *azeventhubs.EventData {
return &azeventhubs.EventData{
Body: []byte(s),
}
}
95 changes: 95 additions & 0 deletions flow/connectors/eventhub/hub_batches.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package conneventhub

import (
"fmt"
"strings"

azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

// multimap from ScopedEventhub to *azeventhubs.EventDataBatch
type HubBatches struct {
batches map[ScopedEventhub][]*azeventhubs.EventDataBatch
manager *EventHubManager
}

func NewHubBatches(manager *EventHubManager) *HubBatches {
return &HubBatches{
batches: make(map[ScopedEventhub][]*azeventhubs.EventDataBatch),
manager: manager,
}
}

func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error {
batches, ok := h.batches[name]
if !ok {
batches = []*azeventhubs.EventDataBatch{}
}

if len(batches) == 0 {
newBatch, err := h.manager.CreateEventDataBatch(name)
if err != nil {
return err
}
batches = append(batches, newBatch)
}

if err := tryAddEventToBatch(event, batches[len(batches)-1]); err != nil {
if strings.Contains(err.Error(), "too large for the batch") {
overflowBatch, err := h.handleBatchOverflow(name, event)
if err != nil {
return fmt.Errorf("failed to handle batch overflow: %v", err)
}
batches = append(batches, overflowBatch)
} else {
return fmt.Errorf("failed to add event data: %v", err)
}
}

h.batches[name] = batches
return nil
}

func (h *HubBatches) handleBatchOverflow(
name ScopedEventhub,
event string,
) (*azeventhubs.EventDataBatch, error) {
newBatch, err := h.manager.CreateEventDataBatch(name)
if err != nil {
return nil, err
}
if err := tryAddEventToBatch(event, newBatch); err != nil {
return nil, fmt.Errorf("failed to add event data to new batch: %v", err)
}
return newBatch, nil
}

func (h *HubBatches) Len() int {
return len(h.batches)
}

// ForEach calls the given function for each ScopedEventhub and batch pair
func (h *HubBatches) ForEach(fn func(ScopedEventhub, *azeventhubs.EventDataBatch)) {
for name, batches := range h.batches {
for _, batch := range batches {
fn(name, batch)
}
}
}

// Clear removes all batches from the HubBatches
func (h *HubBatches) Clear() {
h.batches = make(map[ScopedEventhub][]*azeventhubs.EventDataBatch)
}

func tryAddEventToBatch(event string, batch *azeventhubs.EventDataBatch) error {
eventData := eventDataFromString(event)
opts := &azeventhubs.AddEventDataOptions{}
return batch.AddEventData(eventData, opts)
}

func eventDataFromString(s string) *azeventhubs.EventData {
return &azeventhubs.EventData{
Body: []byte(s),
}
}
Loading