push sequentially and fix processBatch message heartbeats
iskakaushik committed Dec 4, 2023
1 parent 79bbbeb commit 1f53658
Showing 2 changed files with 42 additions and 41 deletions.
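
This commit makes two changes: per-topic batches are now pushed to Event Hubs sequentially (the errgroup-based parallel flush and the maxParallelism knob are removed), and the progress heartbeat moves from SyncRecords into processBatch, with the record counter switched to go.uber.org/atomic so the heartbeat goroutine can read it while the batch loop increments it. The snippet below is a minimal standalone sketch of that heartbeat pattern; heartbeatRoutine is a hypothetical stand-in for PeerDB's utils.HeartbeatRoutine, written only from how it is used in this diff.

package main

import (
	"context"
	"fmt"
	"time"

	"go.uber.org/atomic"
)

// heartbeatRoutine is a hypothetical stand-in for utils.HeartbeatRoutine: it
// prints the status message on every tick until a value is sent on the
// returned channel or the context is cancelled.
func heartbeatRoutine(ctx context.Context, interval time.Duration, status func() string) chan<- bool {
	shutdown := make(chan bool, 1)
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-shutdown:
				return
			case <-ctx.Done():
				return
			case <-ticker.C:
				fmt.Println(status())
			}
		}
	}()
	return shutdown
}

func main() {
	// The batch loop increments the counter; the heartbeat goroutine only reads it.
	numRecords := atomic.NewUint32(0)
	shutdown := heartbeatRoutine(context.Background(), time.Second, func() string {
		return fmt.Sprintf("processed %d records", numRecords.Load())
	})
	defer func() { shutdown <- true }()

	for i := 0; i < 3000; i++ {
		numRecords.Inc()
		time.Sleep(time.Millisecond)
	}
	fmt.Printf("done, %d records processed\n", numRecords.Load())
}
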
50 changes: 23 additions & 27 deletions flow/connectors/eventhub/eventhub.go
@@ -12,6 +12,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
)

type EventHubConnector struct {
@@ -120,7 +121,6 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error
func (c *EventHubConnector) processBatch(
flowJobName string,
batch *model.CDCRecordStream,
maxParallelism int64,
) (uint32, error) {
ctx := context.Background()
batchPerTopic := NewHubBatches(c.hubManager)
@@ -136,23 +136,34 @@ func (c *EventHubConnector) processBatch(
lastSeenLSN := int64(0)
lastUpdatedOffset := int64(0)

numRecords := 0
numRecords := atomic.NewUint32(0)
shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf(
"processed %d records for flow %s",
numRecords.Load(), flowJobName,
)
})
defer func() {
shutdown <- true
}()

for {
select {
case record, ok := <-batch.GetRecords():
if !ok {
err := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName)
err := batchPerTopic.flushAllBatches(ctx, flowJobName)
if err != nil {
return 0, err
}

log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("[total] successfully sent %d records to event hub", numRecords)
return uint32(numRecords), nil
}).Infof("[total] successfully sent %d records to event hub",
numRecords.Load())
return numRecords.Load(), nil
}

numRecords++
numRecords.Inc()

recordLSN := record.GetCheckPointID()
if recordLSN > lastSeenLSN {
@@ -183,14 +194,15 @@ func (c *EventHubConnector) processBatch(
return 0, err
}

if numRecords%1000 == 0 {
curNumRecords := numRecords.Load()
if curNumRecords%1000 == 0 {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("processed %d records for sending", numRecords)
}).Infof("processed %d records for sending", curNumRecords)
}

case <-ticker.C:
err := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName)
err := batchPerTopic.flushAllBatches(ctx, flowJobName)
if err != nil {
return 0, err
}
@@ -211,37 +223,21 @@ }
}

func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
maxParallelism := req.PushParallelism
if maxParallelism <= 0 {
maxParallelism = 10
}

var err error
batch := req.Records
var numRecords uint32

shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf(
"processed %d records for flow %s",
numRecords, req.FlowJobName,
)
})
defer func() {
shutdown <- true
}()

// if env var PEERDB_BETA_EVENTHUB_PUSH_ASYNC=true
// we kick off processBatch in a goroutine and return immediately.
// otherwise, we block until processBatch is done.
if utils.GetEnvBool("PEERDB_BETA_EVENTHUB_PUSH_ASYNC", false) {
go func() {
numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
numRecords, err = c.processBatch(req.FlowJobName, batch)
if err != nil {
log.Errorf("[async] failed to process batch: %v", err)
}
}()
} else {
numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
numRecords, err = c.processBatch(req.FlowJobName, batch)
if err != nil {
log.Errorf("failed to process batch: %v", err)
return nil, err
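
SyncRecords (above) gates async pushing on the PEERDB_BETA_EVENTHUB_PUSH_ASYNC environment variable via utils.GetEnvBool: when true, processBatch runs in a goroutine and SyncRecords returns immediately; otherwise it blocks until the batch is flushed. Below is a minimal sketch of such an env-var gate; getEnvBool is a hypothetical stand-in, not PeerDB's actual helper.

package main

import (
	"fmt"
	"os"
	"strconv"
)

// getEnvBool is a hypothetical stand-in for utils.GetEnvBool: parse the named
// environment variable as a bool, falling back to def when unset or unparsable.
func getEnvBool(name string, def bool) bool {
	v, ok := os.LookupEnv(name)
	if !ok {
		return def
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		return def
	}
	return b
}

func main() {
	if getEnvBool("PEERDB_BETA_EVENTHUB_PUSH_ASYNC", false) {
		fmt.Println("async push: processBatch runs in a goroutine, SyncRecords returns immediately")
	} else {
		fmt.Println("sync push: SyncRecords blocks until processBatch finishes")
	}
}
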
33 changes: 19 additions & 14 deletions flow/connectors/eventhub/hub_batches.go
@@ -9,7 +9,6 @@ import (

azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

// multimap from ScopedEventhub to *azeventhubs.EventDataBatch
@@ -75,10 +74,14 @@ func (h *HubBatches) Len() int {
}

// ForEach calls the given function for each ScopedEventhub and batch pair
func (h *HubBatches) ForEach(fn func(ScopedEventhub, *azeventhubs.EventDataBatch)) {
func (h *HubBatches) ForEach(fn func(ScopedEventhub, *azeventhubs.EventDataBatch) error) error {
for name, batch := range h.batch {
fn(name, batch)
err := fn(name, batch)
if err != nil {
return err
}
}
return nil
}

func (h *HubBatches) sendBatch(
@@ -106,7 +109,6 @@ func (h *HubBatches) sendBatch(

func (h *HubBatches) flushAllBatches(
ctx context.Context,
maxParallelism int64,
flowName string,
) error {
if h.Len() == 0 {
@@ -117,12 +119,13 @@
}

var numEventsPushed int32
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(int(maxParallelism))
h.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) {
g.Go(func() error {
err := h.ForEach(
func(
tblName ScopedEventhub,
eventBatch *azeventhubs.EventDataBatch,
) error {
numEvents := eventBatch.NumEvents()
err := h.sendBatch(gCtx, tblName, eventBatch)
err := h.sendBatch(ctx, tblName, eventBatch)
if err != nil {
return err
}
@@ -133,15 +136,17 @@
}).Infof("pushed %d events to event hub: %s", numEvents, tblName)
return nil
})
})

err := g.Wait()
log.Infof("[flush] successfully sent %d events in total to event hub",
numEventsPushed)

// clear the batches after flushing them.
h.Clear()

if err != nil {
return fmt.Errorf("failed to flushAllBatches: %v", err)
}

log.Infof("[flush] successfully sent %d events in total to event hub",
numEventsPushed)

return err
}

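
With the errgroup import gone, flushAllBatches now walks the batches with the new error-returning ForEach and sends them one at a time on the caller's context, stopping at the first failure; this trades flush throughput for simpler error handling and predictable ordering. Below is a minimal standalone sketch of that sequential-ForEach pattern, using simplified stand-in types in place of ScopedEventhub and azeventhubs.EventDataBatch.

package main

import (
	"errors"
	"fmt"
)

// batches is a simplified stand-in for HubBatches: a map from a destination
// name to the events queued for it.
type batches map[string][]string

// forEach calls fn for every destination/batch pair and stops at the first
// error, mirroring the error-returning ForEach introduced in this commit.
func (b batches) forEach(fn func(dest string, events []string) error) error {
	for dest, events := range b {
		if err := fn(dest, events); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	b := batches{
		"hub-a": {"e1", "e2"},
		"hub-b": {"e3"},
	}
	var pushed int
	err := b.forEach(func(dest string, events []string) error {
		if len(events) == 0 {
			return errors.New("empty batch")
		}
		pushed += len(events) // the sequential send to the event hub would happen here
		fmt.Printf("pushed %d events to %s\n", len(events), dest)
		return nil
	})
	if err != nil {
		fmt.Println("flush failed:", err)
		return
	}
	fmt.Printf("flushed %d events in total\n", pushed)
}
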
