Skip to content

Commit

Permalink
event more eventhub improvements (#461)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Sep 30, 2023
1 parent 548bbb0 commit 43b4a16
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 66 deletions.
78 changes: 12 additions & 66 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -106,7 +105,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
maxParallelism = 10
}

batchPerTopic := make(map[ScopedEventhub]*azeventhubs.EventDataBatch)
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

startTime := time.Now()
Expand All @@ -128,20 +127,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}).Infof("failed to send event batch: %v", err)
return err
}
batchPerTopic = make(map[ScopedEventhub]*azeventhubs.EventDataBatch)
return nil
}

flushTopic := func(topic ScopedEventhub) error {
err := c.sendBatch(topic, batchPerTopic[topic])
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to send event batch - %s: %v", topic, err)
return err
}

delete(batchPerTopic, topic)
batchPerTopic.Clear()
return nil
}

Expand All @@ -153,46 +139,12 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
return nil, err
}

addRecord := func() error {
if _, ok := batchPerTopic[topicName]; !ok {
batch, err := c.hubManager.CreateEventDataBatch(topicName)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to create event data batch: %v", err)
return err
}
batchPerTopic[topicName] = batch
}

opts := &azeventhubs.AddEventDataOptions{}
eventData := eventDataFromString(json)
return batchPerTopic[topicName].AddEventData(eventData, opts)
}

err = addRecord()
err = batchPerTopic.AddEvent(topicName, json)
if err != nil {
// if the error contains `EventData could not be added because it is too large for the batch`
// then flush the batch and try again.
if strings.Contains(err.Error(), "too large for the batch") {
err := flushTopic(topicName)
if err != nil {
return nil, err
}

err = addRecord()
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to add event data to batch (retried): %v", err)
return nil, err
}
} else {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to add event data to batch: %v", err)
return nil, err
}
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to add event to batch: %v", err)
return nil, err
}

if i%eventsPerHeartBeat == 0 {
Expand All @@ -208,7 +160,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

// send the remaining events.
if len(batchPerTopic) > 0 {
if batchPerTopic.Len() > 0 {
err := c.sendEventBatch(batchPerTopic, maxParallelism,
req.FlowJobName, tableNameRowsMapping)
if err != nil {
Expand Down Expand Up @@ -243,11 +195,11 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

func (c *EventHubConnector) sendEventBatch(
events map[ScopedEventhub]*azeventhubs.EventDataBatch,
events *HubBatches,
maxParallelism int64,
flowName string,
tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error {
if len(events) == 0 {
if events.Len() == 0 {
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("no events to send")
Expand All @@ -261,7 +213,7 @@ func (c *EventHubConnector) sendEventBatch(
// Limiting concurrent sends
guard := make(chan struct{}, maxParallelism)

for tblName, eventBatch := range events {
events.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) {
guard <- struct{}{}
wg.Add(1)
go func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) {
Expand All @@ -288,7 +240,7 @@ func (c *EventHubConnector) sendEventBatch(
rowCount += uint32(numEvents)
tableNameRowsMapping.Set(tblName.ToString(), rowCount)
}(tblName, eventBatch)
}
})

wg.Wait()

Expand Down Expand Up @@ -359,9 +311,3 @@ func (c *EventHubConnector) SetupNormalizedTables(
TableExistsMapping: nil,
}, nil
}

func eventDataFromString(s string) *azeventhubs.EventData {
return &azeventhubs.EventData{
Body: []byte(s),
}
}
95 changes: 95 additions & 0 deletions flow/connectors/eventhub/hub_batches.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package conneventhub

import (
"fmt"
"strings"

azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

// multimap from ScopedEventhub to *azeventhubs.EventDataBatch
type HubBatches struct {
batches map[ScopedEventhub][]*azeventhubs.EventDataBatch
manager *EventHubManager
}

func NewHubBatches(manager *EventHubManager) *HubBatches {
return &HubBatches{
batches: make(map[ScopedEventhub][]*azeventhubs.EventDataBatch),
manager: manager,
}
}

func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error {
batches, ok := h.batches[name]
if !ok {
batches = []*azeventhubs.EventDataBatch{}
}

if len(batches) == 0 {
newBatch, err := h.manager.CreateEventDataBatch(name)
if err != nil {
return err
}
batches = append(batches, newBatch)
}

if err := tryAddEventToBatch(event, batches[len(batches)-1]); err != nil {
if strings.Contains(err.Error(), "too large for the batch") {
overflowBatch, err := h.handleBatchOverflow(name, event)
if err != nil {
return fmt.Errorf("failed to handle batch overflow: %v", err)
}
batches = append(batches, overflowBatch)
} else {
return fmt.Errorf("failed to add event data: %v", err)
}
}

h.batches[name] = batches
return nil
}

func (h *HubBatches) handleBatchOverflow(
name ScopedEventhub,
event string,
) (*azeventhubs.EventDataBatch, error) {
newBatch, err := h.manager.CreateEventDataBatch(name)
if err != nil {
return nil, err
}
if err := tryAddEventToBatch(event, newBatch); err != nil {
return nil, fmt.Errorf("failed to add event data to new batch: %v", err)
}
return newBatch, nil
}

func (h *HubBatches) Len() int {
return len(h.batches)
}

// ForEach calls the given function for each ScopedEventhub and batch pair
func (h *HubBatches) ForEach(fn func(ScopedEventhub, *azeventhubs.EventDataBatch)) {
for name, batches := range h.batches {
for _, batch := range batches {
fn(name, batch)
}
}
}

// Clear removes all batches from the HubBatches
func (h *HubBatches) Clear() {
h.batches = make(map[ScopedEventhub][]*azeventhubs.EventDataBatch)
}

func tryAddEventToBatch(event string, batch *azeventhubs.EventDataBatch) error {
eventData := eventDataFromString(event)
opts := &azeventhubs.AddEventDataOptions{}
return batch.AddEventData(eventData, opts)
}

func eventDataFromString(s string) *azeventhubs.EventData {
return &azeventhubs.EventData{
Body: []byte(s),
}
}

0 comments on commit 43b4a16

Please sign in to comment.