diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 01fd941a17..6364dd7c66 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strings" "sync" "sync/atomic" "time" @@ -118,28 +119,62 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S return nil, err } - // TODO (kaushik): this is a hack to get the table name. - topicName := record.GetTableName() - - if _, ok := batchPerTopic[topicName]; !ok { - batch, err := c.hubManager.CreateEventDataBatch(topicName) + flushBatch := func() error { + err := c.sendEventBatch(batchPerTopic, maxParallelism, + req.FlowJobName, tableNameRowsMapping) if err != nil { log.WithFields(log.Fields{ "flowName": req.FlowJobName, - }).Infof("failed to create event data batch: %v", err) - return nil, err + }).Infof("failed to send event batch: %v", err) + return err } - batchPerTopic[topicName] = batch + batchPerTopic = make(map[string]*azeventhubs.EventDataBatch) + return nil } - opts := &azeventhubs.AddEventDataOptions{} + // TODO (kaushik): this is a hack to get the table name. + topicName := record.GetTableName() + + addRecord := func() error { + if _, ok := batchPerTopic[topicName]; !ok { + batch, err := c.hubManager.CreateEventDataBatch(topicName) + if err != nil { + log.WithFields(log.Fields{ + "flowName": req.FlowJobName, + }).Infof("failed to create event data batch: %v", err) + return err + } + batchPerTopic[topicName] = batch + } + + opts := &azeventhubs.AddEventDataOptions{} + eventData := eventDataFromString(json) + return batchPerTopic[topicName].AddEventData(eventData, opts) + } - err = batchPerTopic[topicName].AddEventData(eventDataFromString(json), opts) + err = addRecord() if err != nil { - log.WithFields(log.Fields{ - "flowName": req.FlowJobName, - }).Infof("failed to add event data to batch: %v", err) - return nil, err + // if the error contains `EventData could not be added because it is too large for the batch` + // then flush the batch and try again. + if strings.Contains(err.Error(), "too large for the batch") { + err := flushBatch() + if err != nil { + return nil, err + } + + err = addRecord() + if err != nil { + log.WithFields(log.Fields{ + "flowName": req.FlowJobName, + }).Infof("failed to add event data to batch (retried): %v", err) + return nil, err + } + } else { + log.WithFields(log.Fields{ + "flowName": req.FlowJobName, + }).Infof("failed to add event data to batch: %v", err) + return nil, err + } } if i%eventsPerHeartBeat == 0 { @@ -147,13 +182,10 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S } if (i+1)%eventsPerBatch == 0 { - err := c.sendEventBatch(batchPerTopic, maxParallelism, - req.FlowJobName, tableNameRowsMapping) + err := flushBatch() if err != nil { return nil, err } - - batchPerTopic = make(map[string]*azeventhubs.EventDataBatch) } }