Skip to content

Commit

Permalink
flush with ticker
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 20, 2023
1 parent 53a342b commit 3aee3e2
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 43 deletions.
98 changes: 57 additions & 41 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,54 +126,70 @@ func (c *EventHubConnector) processBatch(
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

eventHubFlushTimeout :=
time.Duration(utils.GetEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10)) *
time.Second

ticker := time.NewTicker(eventHubFlushTimeout)
defer ticker.Stop()

numRecords := 0
for record := range batch.GetRecords() {
numRecords++
json, err := record.GetItems().ToJSONWithOpts(toJSONOpts)
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to convert record to json: %v", err)
return 0, err
}
for {
select {
case record, ok := <-batch.GetRecords():
if !ok {
err := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName)
if err != nil {
return 0, err
}

log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("[total] successfully sent %d records to event hub", numRecords)
return uint32(numRecords), nil
}

topicName, err := NewScopedEventhub(record.GetTableName())
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to get topic name: %v", err)
return 0, err
}
numRecords++
json, err := record.GetItems().ToJSONWithOpts(toJSONOpts)
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to convert record to json: %v", err)
return 0, err
}

err = batchPerTopic.AddEvent(ctx, topicName, json, false)
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to add event to batch: %v", err)
return 0, err
}
topicName, err := NewScopedEventhub(record.GetTableName())
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to get topic name: %v", err)
return 0, err
}

if numRecords%1000 == 0 {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("processed %d records for sending", numRecords)
}
}
err = batchPerTopic.AddEvent(ctx, topicName, json, false)
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to add event to batch: %v", err)
return 0, err
}

log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("processed %d records for sending", numRecords)
if numRecords%1000 == 0 {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("processed %d records for sending", numRecords)
}

flushErr := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName)
if flushErr != nil {
return 0, flushErr
}
batchPerTopic.Clear()
case <-ticker.C:
err := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName)
if err != nil {
return 0, err
}

log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("[total] successfully flushed %d records to event hub", numRecords)
return uint32(numRecords), nil
ticker.Stop()
ticker = time.NewTicker(eventHubFlushTimeout)
}
}
}

func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
Expand Down
9 changes: 7 additions & 2 deletions flow/connectors/eventhub/hub_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,14 @@ func (h *HubBatches) flushAllBatches(
})
})

log.Infof("[sendEventBatch] successfully sent %d events in total to event hub",
log.Infof("[flush] successfully sent %d events in total to event hub",
numEventsPushed)
return g.Wait()
err := g.Wait()

// clear the batches after flushing them.
h.Clear()

return err
}

// Clear removes all batches from the HubBatches
Expand Down

0 comments on commit 3aee3e2

Please sign in to comment.