Skip to content

Commit

Permalink
Send the events as soon as we have hit the batch size limit
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 20, 2023
1 parent b4cdb12 commit 88c7277
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 44 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (c *EventHubConnector) processBatch(
return 0, err
}

err = batchPerTopic.AddEvent(ctx, topicName, json)
err = batchPerTopic.AddEvent(ctx, topicName, json, false)
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
Expand Down
84 changes: 41 additions & 43 deletions flow/connectors/eventhub/hub_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package conneventhub

import (
"context"
"errors"
"fmt"
"strings"
"sync/atomic"
"time"

Expand All @@ -14,72 +14,70 @@ import (

// multimap from ScopedEventhub to *azeventhubs.EventDataBatch
type HubBatches struct {
batches map[ScopedEventhub][]*azeventhubs.EventDataBatch
batch map[ScopedEventhub]*azeventhubs.EventDataBatch
manager *EventHubManager
}

func NewHubBatches(manager *EventHubManager) *HubBatches {
return &HubBatches{
batches: make(map[ScopedEventhub][]*azeventhubs.EventDataBatch),
batch: make(map[ScopedEventhub]*azeventhubs.EventDataBatch),
manager: manager,
}
}

func (h *HubBatches) AddEvent(ctx context.Context, name ScopedEventhub, event string) error {
batches, ok := h.batches[name]
if !ok {
batches = []*azeventhubs.EventDataBatch{}
}

if len(batches) == 0 {
func (h *HubBatches) AddEvent(
ctx context.Context,
name ScopedEventhub,
event string,
// this is true when we are retrying to send the event after the batch size exceeded
// this should initially be false, and then true when we are retrying.
retryForBatchSizeExceed bool,
) error {
batch, ok := h.batch[name]
if !ok || batch == nil {
newBatch, err := h.manager.CreateEventDataBatch(ctx, name)
if err != nil {
return err
return fmt.Errorf("failed to create event data batch: %v", err)
}
batches = append(batches, newBatch)
batch = newBatch
h.batch[name] = batch
}

if err := tryAddEventToBatch(event, batches[len(batches)-1]); err != nil {
if strings.Contains(err.Error(), "too large for the batch") {
overflowBatch, err := h.handleBatchOverflow(ctx, name, event)
if err != nil {
return fmt.Errorf("failed to handle batch overflow: %v", err)
}
batches = append(batches, overflowBatch)
} else {
return fmt.Errorf("failed to add event data: %v", err)
}
err := tryAddEventToBatch(event, batch)
if err == nil {
// we successfully added the event to the batch, so we're done.
return nil
}

h.batches[name] = batches
return nil
}
if errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
if retryForBatchSizeExceed {
// if we are already retrying, then we should just return the error
// as we have already tried to send the event to the batch.
return err
}

func (h *HubBatches) handleBatchOverflow(
ctx context.Context,
name ScopedEventhub,
event string,
) (*azeventhubs.EventDataBatch, error) {
newBatch, err := h.manager.CreateEventDataBatch(ctx, name)
if err != nil {
return nil, err
}
if err := tryAddEventToBatch(event, newBatch); err != nil {
return nil, fmt.Errorf("failed to add event data to new batch: %v", err)
// if the event is too large, send the current batch and
// delete it from the map, so that a new batch can be created
// for the event next time.
if err := h.sendBatch(ctx, name, batch); err != nil {
return fmt.Errorf("failed to send batch: %v", err)
}
delete(h.batch, name)

return h.AddEvent(ctx, name, event, true)
} else {
return fmt.Errorf("failed to add event to batch: %v", err)
}
return newBatch, nil
}

func (h *HubBatches) Len() int {
return len(h.batches)
return len(h.batch)
}

// ForEach calls the given function for each ScopedEventhub and batch pair
func (h *HubBatches) ForEach(fn func(ScopedEventhub, *azeventhubs.EventDataBatch)) {
for name, batches := range h.batches {
for _, batch := range batches {
fn(name, batch)
}
for name, batch := range h.batch {
fn(name, batch)
}
}

Expand Down Expand Up @@ -144,7 +142,7 @@ func (h *HubBatches) flushAllBatches(

// Clear removes all batches from the HubBatches
func (h *HubBatches) Clear() {
h.batches = make(map[ScopedEventhub][]*azeventhubs.EventDataBatch)
h.batch = make(map[ScopedEventhub]*azeventhubs.EventDataBatch)
}

func tryAddEventToBatch(event string, batch *azeventhubs.EventDataBatch) error {
Expand Down

0 comments on commit 88c7277

Please sign in to comment.