Queues: communicate flushes back to source in order to keep slot size down (#1626)

This is particularly important for queue connectors, where long sync batches are desirable to reduce reconnections.
serprex authored Apr 19, 2024 · 1 parent f6a5655 · commit 7fb283a
Showing 6 changed files with 38 additions and 38 deletions.
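
The core of the change: syncCore seeds an atomic.Int64 (consumedOffset) with the last offset and hands a pointer to both the pull and sync sides. Destination connectors raise it as they flush, and the Postgres puller reports it back to the source in standby status updates, so the replication slot can discard WAL without waiting for the whole batch to finish. The diff calls a shared.AtomicInt64Max helper whose body is not part of this commit; a minimal sketch of the pattern, assuming a plain compare-and-swap loop:

package main

import (
    "fmt"
    "sync/atomic"
)

// atomicInt64Max raises a to val unless another writer already stored
// something higher. A plausible stand-in for the shared.AtomicInt64Max
// helper the diff calls; its real body is not shown in this commit.
func atomicInt64Max(a *atomic.Int64, val int64) {
    for {
        old := a.Load()
        if old >= val || a.CompareAndSwap(old, val) {
            return
        }
    }
}

func main() {
    consumedOffset := atomic.Int64{}
    consumedOffset.Store(100) // seeded with lastOffset, as in syncCore

    // confirmations can arrive out of order; the max wins
    atomicInt64Max(&consumedOffset, 250)
    atomicInt64Max(&consumedOffset, 200)

    fmt.Println(consumedOffset.Load()) // 250
}

A plain Store would only be safe if confirmations arrived in order; the CAS max keeps a late, lower confirmation from dragging the reported LSN backwards.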
flow/activities/flowable_core.go: 11 additions & 7 deletions

@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"log/slog"
 	"reflect"
+	"sync/atomic"
 	"time"

 	"github.com/jackc/pgx/v5"

@@ -111,8 +112,9 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncConnectorCore, Items model.Items](
 		return nil, err
 	}
 	logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))
+	consumedOffset := atomic.Int64{}
+	consumedOffset.Store(lastOffset)

 	// start a goroutine to pull records from the source
 	recordBatch := model.NewCDCStream[Items]()
 	startTime := time.Now()

@@ -123,6 +125,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncConnectorCore, Items model.Items](
 		SrcTableIDNameMapping: options.SrcTableIdNameMapping,
 		TableNameMapping:      tblNameMapping,
 		LastOffset:            lastOffset,
+		ConsumedOffset:        &consumedOffset,
 		MaxBatchSize:          batchSize,
 		IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(
 			int(options.IdleTimeoutSeconds),

@@ -184,12 +187,13 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncConnectorCore, Items model.Items](

 	syncStartTime = time.Now()
 	res, err = sync(dstConn, errCtx, &model.SyncRecordsRequest[Items]{
-		SyncBatchID:   syncBatchID,
-		Records:       recordBatch,
-		FlowJobName:   flowName,
-		TableMappings: options.TableMappings,
-		StagingPath:   config.CdcStagingPath,
-		Script:        config.Script,
+		SyncBatchID:    syncBatchID,
+		Records:        recordBatch,
+		ConsumedOffset: &consumedOffset,
+		FlowJobName:    flowName,
+		TableMappings:  options.TableMappings,
+		StagingPath:    config.CdcStagingPath,
+		Script:         config.Script,
 	})
 	if err != nil {
 		a.Alerter.LogFlowError(ctx, flowName, err)
flow/connectors/eventhub/eventhub.go: 14 additions & 19 deletions

@@ -180,9 +180,7 @@ type ScopedEventhubData struct {
 // returns the number of records synced
 func (c *EventHubConnector) processBatch(
 	ctx context.Context,
-	flowJobName string,
-	batch *model.CDCStream[model.RecordItems],
-	script string,
+	req *model.SyncRecordsRequest[model.RecordItems],
 ) (uint32, error) {
 	batchPerTopic := NewHubBatches(c.hubManager)
 	toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns, false)

@@ -191,21 +189,20 @@ func (c *EventHubConnector) processBatch(
 	defer ticker.Stop()

 	lastSeenLSN := int64(0)
-	lastUpdatedOffset := int64(0)

 	numRecords := atomic.Uint32{}

 	var ls *lua.LState
 	var fn *lua.LFunction
-	if script != "" {
+	if req.Script != "" {
 		var err error
-		ls, err = utils.LoadScript(ctx, script, func(ls *lua.LState) int {
+		ls, err = utils.LoadScript(ctx, req.Script, func(ls *lua.LState) int {
 			top := ls.GetTop()
 			ss := make([]string, top)
 			for i := range top {
 				ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String()
 			}
-			_ = c.LogFlowInfo(ctx, flowJobName, strings.Join(ss, "\t"))
+			_ = c.LogFlowInfo(ctx, req.FlowJobName, strings.Join(ss, "\t"))
 			return 0
 		})
 		if err != nil {

@@ -223,10 +220,10 @@ func (c *EventHubConnector) processBatch(
 	for {
 		select {
-		case record, ok := <-batch.GetRecords():
+		case record, ok := <-req.Records.GetRecords():
 			if !ok {
 				c.logger.Info("flushing batches because no more records")
-				err := batchPerTopic.flushAllBatches(ctx, flowJobName)
+				err := batchPerTopic.flushAllBatches(ctx, req.FlowJobName)
 				if err != nil {
 					return 0, err
 				}

@@ -322,25 +319,23 @@ func (c *EventHubConnector) processBatch(
 			return 0, fmt.Errorf("[eventhub] context cancelled %w", ctx.Err())

 		case <-ticker.C:
-			err := batchPerTopic.flushAllBatches(ctx, flowJobName)
+			err := batchPerTopic.flushAllBatches(ctx, req.FlowJobName)
 			if err != nil {
 				return 0, err
-			}
-
-			if lastSeenLSN > lastUpdatedOffset {
-				err = c.SetLastOffset(ctx, flowJobName, lastSeenLSN)
-				lastUpdatedOffset = lastSeenLSN
-				c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeenLSN))
-				if err != nil {
-					return 0, fmt.Errorf("failed to update last offset: %w", err)
+			} else if lastSeenLSN > req.ConsumedOffset.Load() {
+				if err := c.SetLastOffset(ctx, req.FlowJobName, lastSeenLSN); err != nil {
+					c.logger.Warn("[eventhubs] SetLastOffset error", slog.Any("error", err))
+				} else {
+					shared.AtomicInt64Max(req.ConsumedOffset, lastSeenLSN)
+					c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeenLSN))
 				}
 			}
 		}
 	}
 }

 func (c *EventHubConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
-	numRecords, err := c.processBatch(ctx, req.FlowJobName, req.Records, req.Script)
+	numRecords, err := c.processBatch(ctx, req)
 	if err != nil {
 		c.logger.Error("failed to process batch", slog.Any("error", err))
 		return nil, err
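
The kafka and pubsub hunks below repeat the same ticker-driven shape as the eventhub flush loop above. A condensed, self-contained skeleton of that loop; flushLoop, flush, and setLastOffset are hypothetical stand-ins for the connector-specific pieces, and atomicInt64Max repeats the CAS idea from the earlier sketch:

package sketch

import (
    "context"
    "sync/atomic"
    "time"
)

// atomicInt64Max: same monotonic-max CAS loop as the sketch above.
func atomicInt64Max(a *atomic.Int64, v int64) {
    for o := a.Load(); o < v && !a.CompareAndSwap(o, v); o = a.Load() {
    }
}

// flushLoop condenses the shape shared by the queue connectors: each tick,
// flush pending messages, then confirm the newest flushed LSN to the catalog
// (setLastOffset) and to the source (by raising the shared consumedOffset).
func flushLoop(
    ctx context.Context,
    lastSeenLSN, consumedOffset *atomic.Int64,
    flush func(context.Context) error,
    setLastOffset func(context.Context, int64) error,
) error {
    // illustrative interval; the real code uses peerdbenv.PeerDBQueueFlushTimeoutSeconds()
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            if err := flush(ctx); err != nil {
                continue // flush failed; try again on the next tick
            }
            if lastSeen := lastSeenLSN.Load(); lastSeen > consumedOffset.Load() {
                if err := setLastOffset(ctx, lastSeen); err == nil {
                    atomicInt64Max(consumedOffset, lastSeen)
                }
            }
        }
    }
}

Note the ordering: consumedOffset only advances after SetLastOffset succeeds, so the source never acknowledges an LSN the catalog has not durably recorded.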
flow/connectors/kafka/kafka.go: 2 additions & 3 deletions

@@ -213,7 +213,6 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
 	ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds())
 	defer ticker.Stop()

-	lastUpdatedOffset := int64(0)
 	for {
 		select {
 		case <-ctx.Done():

@@ -226,11 +225,11 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
 			if err := c.client.Flush(ctx); err != nil {
 				c.logger.Warn("[kafka] flush error", slog.Any("error", err))
 				continue
-			} else if lastSeen > lastUpdatedOffset {
+			} else if lastSeen > req.ConsumedOffset.Load() {
 				if err := c.SetLastOffset(ctx, req.FlowJobName, lastSeen); err != nil {
 					c.logger.Warn("[kafka] SetLastOffset error", slog.Any("error", err))
 				} else {
-					lastUpdatedOffset = lastSeen
+					shared.AtomicInt64Max(req.ConsumedOffset, lastSeen)
 					c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeen))
 				}
 			}
flow/connectors/postgres/cdc.go: 3 additions & 5 deletions

@@ -299,14 +299,12 @@ func PullCdcRecords[Items model.Items](
 	records := req.RecordStream
 	// clientXLogPos is the last checkpoint id, we need to ack that we have processed
 	// until clientXLogPos each time we send a standby status update.
-	// consumedXLogPos is the lsn that has been committed on the destination.
-	var clientXLogPos, consumedXLogPos pglogrepl.LSN
+	var clientXLogPos pglogrepl.LSN
 	if req.LastOffset > 0 {
 		clientXLogPos = pglogrepl.LSN(req.LastOffset)
-		consumedXLogPos = clientXLogPos

 		err := pglogrepl.SendStandbyStatusUpdate(ctx, conn,
-			pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
+			pglogrepl.StandbyStatusUpdate{WALWritePosition: pglogrepl.LSN(req.ConsumedOffset.Load())})
 		if err != nil {
 			return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err)
 		}

@@ -357,7 +355,7 @@ func PullCdcRecords[Items model.Items](
 	for {
 		if pkmRequiresResponse {
 			err := pglogrepl.SendStandbyStatusUpdate(ctx, conn,
-				pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
+				pglogrepl.StandbyStatusUpdate{WALWritePosition: pglogrepl.LSN(req.ConsumedOffset.Load())})
 			if err != nil {
 				return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
 			}
flow/connectors/pubsub/pubsub.go: 3 additions & 4 deletions

@@ -241,7 +241,6 @@ func (c *PubSubConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
 	ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds())
 	defer ticker.Stop()

-	lastUpdatedOffset := int64(0)
 	for {
 		select {
 		case <-ctx.Done():

@@ -251,12 +250,12 @@ func (c *PubSubConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
 		// flush loop doesn't block processing new messages
 		case <-ticker.C:
 			lastSeen := lastSeenLSN.Load()
-			if lastSeen > lastUpdatedOffset {
+			if lastSeen > req.ConsumedOffset.Load() {
 				if err := c.SetLastOffset(ctx, req.FlowJobName, lastSeen); err != nil {
 					c.logger.Warn("[pubsub] SetLastOffset error", slog.Any("error", err))
 				} else {
-					lastUpdatedOffset = lastSeen
-					c.logger.Info("processBatch", slog.Int64("updated last offset", lastUpdatedOffset))
+					shared.AtomicInt64Max(req.ConsumedOffset, lastSeen)
+					c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeen))
 				}
 			}
 		}
flow/model/model.go: 5 additions & 0 deletions

@@ -1,6 +1,7 @@
 package model

 import (
+	"sync/atomic"
 	"time"

 	"github.com/jackc/pglogrepl"

@@ -27,6 +28,8 @@ func NewNameAndExclude(name string, exclude []string) NameAndExclude {
 type PullRecordsRequest[T Items] struct {
 	// record batch for pushing changes into
 	RecordStream *CDCStream[T]
+	// ConsumedOffset can be reported as committed to reduce slot size
+	ConsumedOffset *atomic.Int64
 	// FlowJobName is the name of the flow job.
 	FlowJobName string
 	// relId to name Mapping

@@ -74,6 +77,8 @@ type TableWithPkey struct {

 type SyncRecordsRequest[T Items] struct {
 	Records *CDCStream[T]
+	// ConsumedOffset allows destination to confirm lsn for slot
+	ConsumedOffset *atomic.Int64
 	// FlowJobName is the name of the flow job.
 	FlowJobName string
 	// Staging path for AVRO files in CDC
