Skip to content

Commit

Permalink
Flush if record size exceeds max bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Sep 24, 2023
1 parent f1ce7c9 commit fe469c3
Showing 1 changed file with 50 additions and 18 deletions.
68 changes: 50 additions & 18 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -118,42 +119,73 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
return nil, err
}

// TODO (kaushik): this is a hack to get the table name.
topicName := record.GetTableName()

if _, ok := batchPerTopic[topicName]; !ok {
batch, err := c.hubManager.CreateEventDataBatch(topicName)
flushBatch := func() error {
err := c.sendEventBatch(batchPerTopic, maxParallelism,
req.FlowJobName, tableNameRowsMapping)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to create event data batch: %v", err)
return nil, err
}).Infof("failed to send event batch: %v", err)
return err
}
batchPerTopic[topicName] = batch
batchPerTopic = make(map[string]*azeventhubs.EventDataBatch)
return nil
}

opts := &azeventhubs.AddEventDataOptions{}
// TODO (kaushik): this is a hack to get the table name.
topicName := record.GetTableName()

addRecord := func() error {
if _, ok := batchPerTopic[topicName]; !ok {
batch, err := c.hubManager.CreateEventDataBatch(topicName)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to create event data batch: %v", err)
return err
}
batchPerTopic[topicName] = batch
}

opts := &azeventhubs.AddEventDataOptions{}
eventData := eventDataFromString(json)
return batchPerTopic[topicName].AddEventData(eventData, opts)
}

err = batchPerTopic[topicName].AddEventData(eventDataFromString(json), opts)
err = addRecord()
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to add event data to batch: %v", err)
return nil, err
// if the error contains `EventData could not be added because it is too large for the batch`
// then flush the batch and try again.
if strings.Contains(err.Error(), "too large for the batch") {
err := flushBatch()
if err != nil {
return nil, err
}

err = addRecord()
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to add event data to batch (retried): %v", err)
return nil, err
}
} else {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("failed to add event data to batch: %v", err)
return nil, err
}
}

if i%eventsPerHeartBeat == 0 {
activity.RecordHeartbeat(c.ctx, fmt.Sprintf("sent %d records to hub: %s", i, topicName))
}

if (i+1)%eventsPerBatch == 0 {
err := c.sendEventBatch(batchPerTopic, maxParallelism,
req.FlowJobName, tableNameRowsMapping)
err := flushBatch()
if err != nil {
return nil, err
}

batchPerTopic = make(map[string]*azeventhubs.EventDataBatch)
}
}

Expand Down

0 comments on commit fe469c3

Please sign in to comment.