diff --git a/flow/connectors/eventhub/hub_batches.go b/flow/connectors/eventhub/hub_batches.go index a9ff879b24..97d1e568a4 100644 --- a/flow/connectors/eventhub/hub_batches.go +++ b/flow/connectors/eventhub/hub_batches.go @@ -34,7 +34,7 @@ func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error { batches = append(batches, newBatch) } - if err := h.tryAddEventToBatch(event, batches[len(batches)-1]); err != nil { + if err := tryAddEventToBatch(event, batches[len(batches)-1]); err != nil { if strings.Contains(err.Error(), "too large for the batch") { overflowBatch, err := h.handleBatchOverflow(name, event) if err != nil { @@ -50,12 +50,6 @@ func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error { return nil } -func (h *HubBatches) tryAddEventToBatch(event string, batch *azeventhubs.EventDataBatch) error { - eventData := eventDataFromString(event) - opts := &azeventhubs.AddEventDataOptions{} - return batch.AddEventData(eventData, opts) -} - func (h *HubBatches) handleBatchOverflow( name ScopedEventhub, event string, @@ -64,7 +58,7 @@ func (h *HubBatches) handleBatchOverflow( if err != nil { return nil, err } - if err := h.tryAddEventToBatch(event, newBatch); err != nil { + if err := tryAddEventToBatch(event, newBatch); err != nil { return nil, fmt.Errorf("failed to add event data to new batch: %v", err) } return newBatch, nil @@ -88,6 +82,12 @@ func (h *HubBatches) Clear() { h.batches = make(map[ScopedEventhub][]*azeventhubs.EventDataBatch) } +func tryAddEventToBatch(event string, batch *azeventhubs.EventDataBatch) error { + eventData := eventDataFromString(event) + opts := &azeventhubs.AddEventDataOptions{} + return batch.AddEventData(eventData, opts) +} + func eventDataFromString(s string) *azeventhubs.EventData { return &azeventhubs.EventData{ Body: []byte(s),