diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 7c44aa762d..b614ec3295 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "strings" "sync" "sync/atomic" "time" @@ -106,7 +105,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S maxParallelism = 10 } - batchPerTopic := make(map[ScopedEventhub]*azeventhubs.EventDataBatch) + batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) startTime := time.Now() @@ -128,20 +127,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S }).Infof("failed to send event batch: %v", err) return err } - batchPerTopic = make(map[ScopedEventhub]*azeventhubs.EventDataBatch) - return nil - } - - flushTopic := func(topic ScopedEventhub) error { - err := c.sendBatch(topic, batchPerTopic[topic]) - if err != nil { - log.WithFields(log.Fields{ - "flowName": req.FlowJobName, - }).Infof("failed to send event batch - %s: %v", topic, err) - return err - } - - delete(batchPerTopic, topic) + batchPerTopic.Clear() return nil } @@ -153,46 +139,12 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S return nil, err } - addRecord := func() error { - if _, ok := batchPerTopic[topicName]; !ok { - batch, err := c.hubManager.CreateEventDataBatch(topicName) - if err != nil { - log.WithFields(log.Fields{ - "flowName": req.FlowJobName, - }).Infof("failed to create event data batch: %v", err) - return err - } - batchPerTopic[topicName] = batch - } - - opts := &azeventhubs.AddEventDataOptions{} - eventData := eventDataFromString(json) - return batchPerTopic[topicName].AddEventData(eventData, opts) - } - - err = addRecord() + err = batchPerTopic.AddEvent(topicName, json) if err != nil { - // if the error contains `EventData could not be added because it is too large for the batch` - // then flush the batch and try again. - if strings.Contains(err.Error(), "too large for the batch") { - err := flushTopic(topicName) - if err != nil { - return nil, err - } - - err = addRecord() - if err != nil { - log.WithFields(log.Fields{ - "flowName": req.FlowJobName, - }).Infof("failed to add event data to batch (retried): %v", err) - return nil, err - } - } else { - log.WithFields(log.Fields{ - "flowName": req.FlowJobName, - }).Infof("failed to add event data to batch: %v", err) - return nil, err - } + log.WithFields(log.Fields{ + "flowName": req.FlowJobName, + }).Infof("failed to add event to batch: %v", err) + return nil, err } if i%eventsPerHeartBeat == 0 { @@ -208,7 +160,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S } // send the remaining events. - if len(batchPerTopic) > 0 { + if batchPerTopic.Len() > 0 { err := c.sendEventBatch(batchPerTopic, maxParallelism, req.FlowJobName, tableNameRowsMapping) if err != nil { @@ -243,11 +195,11 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S } func (c *EventHubConnector) sendEventBatch( - events map[ScopedEventhub]*azeventhubs.EventDataBatch, + events *HubBatches, maxParallelism int64, flowName string, tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error { - if len(events) == 0 { + if events.Len() == 0 { log.WithFields(log.Fields{ "flowName": flowName, }).Infof("no events to send") @@ -261,7 +213,7 @@ func (c *EventHubConnector) sendEventBatch( // Limiting concurrent sends guard := make(chan struct{}, maxParallelism) - for tblName, eventBatch := range events { + events.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { guard <- struct{}{} wg.Add(1) go func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) { @@ -288,7 +240,7 @@ func (c *EventHubConnector) sendEventBatch( rowCount += uint32(numEvents) tableNameRowsMapping.Set(tblName.ToString(), rowCount) }(tblName, eventBatch) - } + }) wg.Wait() @@ -359,9 +311,3 @@ func (c *EventHubConnector) SetupNormalizedTables( TableExistsMapping: nil, }, nil } - -func eventDataFromString(s string) *azeventhubs.EventData { - return &azeventhubs.EventData{ - Body: []byte(s), - } -} diff --git a/flow/connectors/eventhub/hub_batches.go b/flow/connectors/eventhub/hub_batches.go new file mode 100644 index 0000000000..97d1e568a4 --- /dev/null +++ b/flow/connectors/eventhub/hub_batches.go @@ -0,0 +1,95 @@ +package conneventhub + +import ( + "fmt" + "strings" + + azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" +) + +// multimap from ScopedEventhub to *azeventhubs.EventDataBatch +type HubBatches struct { + batches map[ScopedEventhub][]*azeventhubs.EventDataBatch + manager *EventHubManager +} + +func NewHubBatches(manager *EventHubManager) *HubBatches { + return &HubBatches{ + batches: make(map[ScopedEventhub][]*azeventhubs.EventDataBatch), + manager: manager, + } +} + +func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error { + batches, ok := h.batches[name] + if !ok { + batches = []*azeventhubs.EventDataBatch{} + } + + if len(batches) == 0 { + newBatch, err := h.manager.CreateEventDataBatch(name) + if err != nil { + return err + } + batches = append(batches, newBatch) + } + + if err := tryAddEventToBatch(event, batches[len(batches)-1]); err != nil { + if strings.Contains(err.Error(), "too large for the batch") { + overflowBatch, err := h.handleBatchOverflow(name, event) + if err != nil { + return fmt.Errorf("failed to handle batch overflow: %v", err) + } + batches = append(batches, overflowBatch) + } else { + return fmt.Errorf("failed to add event data: %v", err) + } + } + + h.batches[name] = batches + return nil +} + +func (h *HubBatches) handleBatchOverflow( + name ScopedEventhub, + event string, +) (*azeventhubs.EventDataBatch, error) { + newBatch, err := h.manager.CreateEventDataBatch(name) + if err != nil { + return nil, err + } + if err := tryAddEventToBatch(event, newBatch); err != nil { + return nil, fmt.Errorf("failed to add event data to new batch: %v", err) + } + return newBatch, nil +} + +func (h *HubBatches) Len() int { + return len(h.batches) +} + +// ForEach calls the given function for each ScopedEventhub and batch pair +func (h *HubBatches) ForEach(fn func(ScopedEventhub, *azeventhubs.EventDataBatch)) { + for name, batches := range h.batches { + for _, batch := range batches { + fn(name, batch) + } + } +} + +// Clear removes all batches from the HubBatches +func (h *HubBatches) Clear() { + h.batches = make(map[ScopedEventhub][]*azeventhubs.EventDataBatch) +} + +func tryAddEventToBatch(event string, batch *azeventhubs.EventDataBatch) error { + eventData := eventDataFromString(event) + opts := &azeventhubs.AddEventDataOptions{} + return batch.AddEventData(eventData, opts) +} + +func eventDataFromString(s string) *azeventhubs.EventData { + return &azeventhubs.EventData{ + Body: []byte(s), + } +}