Skip to content

Commit 2a60ff3

Browse files
authored
Send the events as soon as we have hit the batch size limit (#686)
This is for performance: we want to keep the `START_REPLICATION` connection alive for as long as we can. This was observed to significantly reduce overall latency.
1 parent b4cdb12 commit 2a60ff3

File tree

2 files changed

+105
-86
lines changed

2 files changed

+105
-86
lines changed

flow/connectors/eventhub/eventhub.go

+57-41
Original file line numberDiff line numberDiff line change
@@ -126,54 +126,70 @@ func (c *EventHubConnector) processBatch(
126126
batchPerTopic := NewHubBatches(c.hubManager)
127127
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)
128128

129+
eventHubFlushTimeout :=
130+
time.Duration(utils.GetEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10)) *
131+
time.Second
132+
133+
ticker := time.NewTicker(eventHubFlushTimeout)
134+
defer ticker.Stop()
135+
129136
numRecords := 0
130-
for record := range batch.GetRecords() {
131-
numRecords++
132-
json, err := record.GetItems().ToJSONWithOpts(toJSONOpts)
133-
if err != nil {
134-
log.WithFields(log.Fields{
135-
"flowName": flowJobName,
136-
}).Infof("failed to convert record to json: %v", err)
137-
return 0, err
138-
}
137+
for {
138+
select {
139+
case record, ok := <-batch.GetRecords():
140+
if !ok {
141+
err := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName)
142+
if err != nil {
143+
return 0, err
144+
}
145+
146+
log.WithFields(log.Fields{
147+
"flowName": flowJobName,
148+
}).Infof("[total] successfully sent %d records to event hub", numRecords)
149+
return uint32(numRecords), nil
150+
}
139151

140-
topicName, err := NewScopedEventhub(record.GetTableName())
141-
if err != nil {
142-
log.WithFields(log.Fields{
143-
"flowName": flowJobName,
144-
}).Infof("failed to get topic name: %v", err)
145-
return 0, err
146-
}
152+
numRecords++
153+
json, err := record.GetItems().ToJSONWithOpts(toJSONOpts)
154+
if err != nil {
155+
log.WithFields(log.Fields{
156+
"flowName": flowJobName,
157+
}).Infof("failed to convert record to json: %v", err)
158+
return 0, err
159+
}
147160

148-
err = batchPerTopic.AddEvent(ctx, topicName, json)
149-
if err != nil {
150-
log.WithFields(log.Fields{
151-
"flowName": flowJobName,
152-
}).Infof("failed to add event to batch: %v", err)
153-
return 0, err
154-
}
161+
topicName, err := NewScopedEventhub(record.GetTableName())
162+
if err != nil {
163+
log.WithFields(log.Fields{
164+
"flowName": flowJobName,
165+
}).Infof("failed to get topic name: %v", err)
166+
return 0, err
167+
}
155168

156-
if numRecords%1000 == 0 {
157-
log.WithFields(log.Fields{
158-
"flowName": flowJobName,
159-
}).Infof("processed %d records for sending", numRecords)
160-
}
161-
}
169+
err = batchPerTopic.AddEvent(ctx, topicName, json, false)
170+
if err != nil {
171+
log.WithFields(log.Fields{
172+
"flowName": flowJobName,
173+
}).Infof("failed to add event to batch: %v", err)
174+
return 0, err
175+
}
162176

163-
log.WithFields(log.Fields{
164-
"flowName": flowJobName,
165-
}).Infof("processed %d records for sending", numRecords)
177+
if numRecords%1000 == 0 {
178+
log.WithFields(log.Fields{
179+
"flowName": flowJobName,
180+
}).Infof("processed %d records for sending", numRecords)
181+
}
166182

167-
flushErr := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName)
168-
if flushErr != nil {
169-
return 0, flushErr
170-
}
171-
batchPerTopic.Clear()
183+
case <-ticker.C:
184+
err := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName)
185+
if err != nil {
186+
return 0, err
187+
}
172188

173-
log.WithFields(log.Fields{
174-
"flowName": flowJobName,
175-
}).Infof("[total] successfully flushed %d records to event hub", numRecords)
176-
return uint32(numRecords), nil
189+
ticker.Stop()
190+
ticker = time.NewTicker(eventHubFlushTimeout)
191+
}
192+
}
177193
}
178194

179195
func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {

flow/connectors/eventhub/hub_batches.go

+48-45
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package conneventhub
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
6-
"strings"
77
"sync/atomic"
88
"time"
99

@@ -14,72 +14,70 @@ import (
1414

1515
// multimap from ScopedEventhub to *azeventhubs.EventDataBatch
1616
type HubBatches struct {
17-
batches map[ScopedEventhub][]*azeventhubs.EventDataBatch
17+
batch map[ScopedEventhub]*azeventhubs.EventDataBatch
1818
manager *EventHubManager
1919
}
2020

2121
func NewHubBatches(manager *EventHubManager) *HubBatches {
2222
return &HubBatches{
23-
batches: make(map[ScopedEventhub][]*azeventhubs.EventDataBatch),
23+
batch: make(map[ScopedEventhub]*azeventhubs.EventDataBatch),
2424
manager: manager,
2525
}
2626
}
2727

28-
func (h *HubBatches) AddEvent(ctx context.Context, name ScopedEventhub, event string) error {
29-
batches, ok := h.batches[name]
30-
if !ok {
31-
batches = []*azeventhubs.EventDataBatch{}
32-
}
33-
34-
if len(batches) == 0 {
28+
func (h *HubBatches) AddEvent(
29+
ctx context.Context,
30+
name ScopedEventhub,
31+
event string,
32+
// this is true when we are retrying to send the event after the batch size exceeded
33+
// this should initially be false, and then true when we are retrying.
34+
retryForBatchSizeExceed bool,
35+
) error {
36+
batch, ok := h.batch[name]
37+
if !ok || batch == nil {
3538
newBatch, err := h.manager.CreateEventDataBatch(ctx, name)
3639
if err != nil {
37-
return err
40+
return fmt.Errorf("failed to create event data batch: %v", err)
3841
}
39-
batches = append(batches, newBatch)
42+
batch = newBatch
43+
h.batch[name] = batch
4044
}
4145

42-
if err := tryAddEventToBatch(event, batches[len(batches)-1]); err != nil {
43-
if strings.Contains(err.Error(), "too large for the batch") {
44-
overflowBatch, err := h.handleBatchOverflow(ctx, name, event)
45-
if err != nil {
46-
return fmt.Errorf("failed to handle batch overflow: %v", err)
47-
}
48-
batches = append(batches, overflowBatch)
49-
} else {
50-
return fmt.Errorf("failed to add event data: %v", err)
51-
}
46+
err := tryAddEventToBatch(event, batch)
47+
if err == nil {
48+
// we successfully added the event to the batch, so we're done.
49+
return nil
5250
}
5351

54-
h.batches[name] = batches
55-
return nil
56-
}
52+
if errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
53+
if retryForBatchSizeExceed {
54+
// if we are already retrying, then we should just return the error
55+
// as we have already tried to send the event to the batch.
56+
return fmt.Errorf("[retry-failed] event too large to add to batch: %v", err)
57+
}
5758

58-
func (h *HubBatches) handleBatchOverflow(
59-
ctx context.Context,
60-
name ScopedEventhub,
61-
event string,
62-
) (*azeventhubs.EventDataBatch, error) {
63-
newBatch, err := h.manager.CreateEventDataBatch(ctx, name)
64-
if err != nil {
65-
return nil, err
66-
}
67-
if err := tryAddEventToBatch(event, newBatch); err != nil {
68-
return nil, fmt.Errorf("failed to add event data to new batch: %v", err)
59+
// if the event is too large, send the current batch and
60+
// delete it from the map, so that a new batch can be created
61+
// for the event next time.
62+
if err := h.sendBatch(ctx, name, batch); err != nil {
63+
return fmt.Errorf("failed to send batch: %v", err)
64+
}
65+
delete(h.batch, name)
66+
67+
return h.AddEvent(ctx, name, event, true)
68+
} else {
69+
return fmt.Errorf("failed to add event to batch: %v", err)
6970
}
70-
return newBatch, nil
7171
}
7272

7373
func (h *HubBatches) Len() int {
74-
return len(h.batches)
74+
return len(h.batch)
7575
}
7676

7777
// ForEach calls the given function for each ScopedEventhub and batch pair
7878
func (h *HubBatches) ForEach(fn func(ScopedEventhub, *azeventhubs.EventDataBatch)) {
79-
for name, batches := range h.batches {
80-
for _, batch := range batches {
81-
fn(name, batch)
82-
}
79+
for name, batch := range h.batch {
80+
fn(name, batch)
8381
}
8482
}
8583

@@ -137,14 +135,19 @@ func (h *HubBatches) flushAllBatches(
137135
})
138136
})
139137

140-
log.Infof("[sendEventBatch] successfully sent %d events in total to event hub",
138+
err := g.Wait()
139+
log.Infof("[flush] successfully sent %d events in total to event hub",
141140
numEventsPushed)
142-
return g.Wait()
141+
142+
// clear the batches after flushing them.
143+
h.Clear()
144+
145+
return err
143146
}
144147

145148
// Clear removes all batches from the HubBatches
146149
func (h *HubBatches) Clear() {
147-
h.batches = make(map[ScopedEventhub][]*azeventhubs.EventDataBatch)
150+
h.batch = make(map[ScopedEventhub]*azeventhubs.EventDataBatch)
148151
}
149152

150153
func tryAddEventToBatch(event string, batch *azeventhubs.EventDataBatch) error {

0 commit comments

Comments
 (0)