Skip to content

Commit

Permalink
minor change
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Sep 30, 2023
1 parent d1128b5 commit e89df75
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions flow/connectors/eventhub/hub_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error {
batches = append(batches, newBatch)
}

if err := h.tryAddEventToBatch(event, batches[len(batches)-1]); err != nil {
if err := tryAddEventToBatch(event, batches[len(batches)-1]); err != nil {
if strings.Contains(err.Error(), "too large for the batch") {
overflowBatch, err := h.handleBatchOverflow(name, event)
if err != nil {
Expand All @@ -50,12 +50,6 @@ func (h *HubBatches) AddEvent(name ScopedEventhub, event string) error {
return nil
}

func (h *HubBatches) tryAddEventToBatch(event string, batch *azeventhubs.EventDataBatch) error {
eventData := eventDataFromString(event)
opts := &azeventhubs.AddEventDataOptions{}
return batch.AddEventData(eventData, opts)
}

func (h *HubBatches) handleBatchOverflow(
name ScopedEventhub,
event string,
Expand All @@ -64,7 +58,7 @@ func (h *HubBatches) handleBatchOverflow(
if err != nil {
return nil, err
}
if err := h.tryAddEventToBatch(event, newBatch); err != nil {
if err := tryAddEventToBatch(event, newBatch); err != nil {
return nil, fmt.Errorf("failed to add event data to new batch: %v", err)
}
return newBatch, nil
Expand All @@ -88,6 +82,12 @@ func (h *HubBatches) Clear() {
h.batches = make(map[ScopedEventhub][]*azeventhubs.EventDataBatch)
}

func tryAddEventToBatch(event string, batch *azeventhubs.EventDataBatch) error {
eventData := eventDataFromString(event)
opts := &azeventhubs.AddEventDataOptions{}
return batch.AddEventData(eventData, opts)
}

func eventDataFromString(s string) *azeventhubs.EventData {
return &azeventhubs.EventData{
Body: []byte(s),
Expand Down

0 comments on commit e89df75

Please sign in to comment.