Skip to content

Commit

Permalink
one-sync
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 15, 2024
1 parent 790d305 commit 021b19b
Show file tree
Hide file tree
Showing 17 changed files with 399 additions and 356 deletions.
139 changes: 115 additions & 24 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
Expand All @@ -45,6 +46,16 @@ type SlotSnapshotSignal struct {
type FlowableActivity struct {
CatalogPool *pgxpool.Pool
Alerter *alerting.Alerter
CdcCacheRw sync.RWMutex
CdcCache map[string]connectors.CDCPullConnector
}

func (a *FlowableActivity) recordSlotSize(

Check failure on line 53 in flow/activities/flowable.go

View workflow job for this annotation

GitHub Actions / lint

func `(*FlowableActivity).recordSlotSize` is unused (unused)
ctx context.Context,
srcConn connectors.CDCPullConnector,
slotName string,
peerName string,
) {
}

func (a *FlowableActivity) CheckConnection(
Expand Down Expand Up @@ -207,7 +218,93 @@ func (a *FlowableActivity) CreateNormalizedTable(
}, nil
}

func (a *FlowableActivity) StartFlow(ctx context.Context,
func (a *FlowableActivity) MaintainPull(
ctx context.Context,
config *protos.FlowConnectionConfigs,
sessionID string,
) error {
srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source)
if err != nil {
return err
}
defer connectors.CloseConnector(ctx, srcConn)

if err := srcConn.SetupReplConn(ctx); err != nil {
return err
}

a.CdcCacheRw.Lock()
a.CdcCache[sessionID] = srcConn
a.CdcCacheRw.Unlock()

ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()

slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", config.FlowJobName)
if config.ReplicationSlotName != "" {
slotNameForMetrics = config.ReplicationSlotName
}

go func() {
err := srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotNameForMetrics, config.Source.Name)
if err != nil {
logger.LoggerFromCtx(ctx).Error("failed to record slot size", "error", err)
}

// separate goroutine so recordSlotSize doesn't block heartbeats
// meanwhile recordSlotSize synchronously for ticker backpressure
slotSizeTicker := time.NewTicker(5 * time.Minute)
defer slotSizeTicker.Stop()
for {
select {
case <-slotSizeTicker.C:
err := srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotNameForMetrics, config.Source.Name)
if err != nil {
logger.LoggerFromCtx(ctx).Error("failed to record slot size", "error", err)
}
case <-ctx.Done():
return
}
}
}()

for {
select {
case <-ticker.C:
activity.RecordHeartbeat(ctx, "keep session alive")
if err := srcConn.ReplPing(ctx); err != nil {
activity.GetLogger(ctx).Error("Failed to send keep alive ping to replication connection", "Error", err)
}
case <-ctx.Done():
a.CdcCacheRw.Lock()
delete(a.CdcCache, sessionID)
a.CdcCacheRw.Unlock()
return nil
}
}
}

func (a *FlowableActivity) WaitForSourceConnector(
ctx context.Context,
sessionID string,
) error {
for {
a.CdcCacheRw.RLock()
_, ok := a.CdcCache[sessionID]
a.CdcCacheRw.RUnlock()
if ok {
return nil
}
activity.RecordHeartbeat(ctx, "wait another second for source connector")
if err := ctx.Err(); err != nil {
return err
}
time.Sleep(time.Second)
}
}

func (a *FlowableActivity) StartFlow(
ctx context.Context,
input *protos.StartFlowInput,
) (*model.SyncResponse, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, input.FlowConnectionConfigs.FlowJobName)
Expand All @@ -226,15 +323,14 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source)
if err != nil {
return nil, fmt.Errorf("failed to get source connector: %w", err)
a.CdcCacheRw.RLock()
srcConn, ok := a.CdcCache[input.SessionId]
a.CdcCacheRw.RUnlock()
if !ok {
return nil, errors.New("source connector missing from CdcCache")
}
defer connectors.CloseConnector(ctx, srcConn)

slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", input.FlowConnectionConfigs.FlowJobName)
if input.FlowConnectionConfigs.ReplicationSlotName != "" {
slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName
if err := srcConn.ConnectionActive(ctx); err != nil {
return nil, err
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
Expand All @@ -243,23 +339,22 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
})
defer shutdown()

errGroup, errCtx := errgroup.WithContext(ctx)
go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)

batchSize := input.SyncFlowOptions.BatchSize
if batchSize <= 0 {
batchSize = 1_000_000
}

lastOffset, err := dstConn.GetLastOffset(ctx, input.FlowConnectionConfigs.FlowJobName)
if err != nil {
return nil, err
}

// start a goroutine to pull records from the source
recordBatch := model.NewCDCRecordStream()
startTime := time.Now()
flowName := input.FlowConnectionConfigs.FlowJobName

lastOffset, err := dstConn.GetLastOffset(ctx, flowName)
if err != nil {
return nil, err
}

errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return srcConn.PullRecords(errCtx, a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: flowName,
Expand Down Expand Up @@ -352,19 +447,15 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

logger.Info(fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds())))

lastCheckpoint, err := recordBatch.GetLastCheckpoint()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}
lastCheckpoint := pglogrepl.LSN(recordBatch.GetLastCheckpoint())

err = monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
res.CurrentSyncBatchID,
uint32(numRecords),
pglogrepl.LSN(lastCheckpoint),
lastCheckpoint,
)
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
Expand All @@ -375,7 +466,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
pglogrepl.LSN(lastCheckpoint),
lastCheckpoint,
)
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
Expand Down
36 changes: 0 additions & 36 deletions flow/activities/slot.go

This file was deleted.

7 changes: 5 additions & 2 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.temporal.io/sdk/worker"

"github.com/PeerDB-io/peer-flow/activities"
"github.com/PeerDB-io/peer-flow/connectors"
utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/shared"
Expand Down Expand Up @@ -127,9 +128,10 @@ func WorkerMain(opts *WorkerOptions) error {
return queueErr
}

w := worker.New(c, taskQueue, worker.Options{})
w := worker.New(c, taskQueue, worker.Options{
EnableSessionWorker: true,
})
w.RegisterWorkflow(peerflow.CDCFlowWorkflowWithConfig)
w.RegisterWorkflow(peerflow.SyncFlowWorkflow)
w.RegisterWorkflow(peerflow.SetupFlowWorkflow)
w.RegisterWorkflow(peerflow.NormalizeFlowWorkflow)
w.RegisterWorkflow(peerflow.QRepFlowWorkflow)
Expand All @@ -146,6 +148,7 @@ func WorkerMain(opts *WorkerOptions) error {
w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerter,
CdcCache: make(map[string]connectors.CDCPullConnector),
})

err = w.Run(worker.InterruptCh())
Expand Down
5 changes: 1 addition & 4 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT * FROM `%s`;",
rawTableName, stagingTable)

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}
lastCP := req.Records.GetLastCheckpoint()

activity.RecordHeartbeat(ctx,
fmt.Sprintf("Flow job %s: performing insert and update transaction"+
Expand Down
14 changes: 2 additions & 12 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,8 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

lastCheckpoint, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, err
}

return &model.SyncResponse{
LastSyncedCheckpointID: lastCheckpoint,
LastSyncedCheckpointID: req.Records.GetLastCheckpoint(),
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
Expand All @@ -130,12 +125,7 @@ func (c *ClickhouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
return nil, err
}

lastCheckpoint, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}

err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint)
err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, req.Records.GetLastCheckpoint())
if err != nil {
c.logger.Error("failed to increment id", slog.Any("error", err))
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type CDCPullConnector interface {
*protos.EnsurePullabilityBatchOutput, error)

// Methods related to retrieving and pushing records for this connector as a source and destination.
SetupReplConn(context.Context) error

// Ping source to keep connection alive. Can be called concurrently with PullRecords; skips ping in that case.
ReplPing(context.Context) error

// PullRecords pulls records from the source, and returns a RecordBatch.
// This method should be idempotent, and should be able to be called multiple times with the same request.
Expand Down
6 changes: 1 addition & 5 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,7 @@ func (c *EventHubConnector) SyncRecords(ctx context.Context, req *model.SyncReco
return nil, err
}

lastCheckpoint, err := req.Records.GetLastCheckpoint()
if err != nil {
c.logger.Error("failed to get last checkpoint", slog.Any("error", err))
return nil, err
}
lastCheckpoint := req.Records.GetLastCheckpoint()

err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint)
if err != nil {
Expand Down
Loading

0 comments on commit 021b19b

Please sign in to comment.