Skip to content

Commit

Permalink
Remove GetLastSyncedID (#1251)
Browse files Browse the repository at this point in the history
StartFlow can call GetLastOffset itself
  • Loading branch information
serprex authored Feb 11, 2024
1 parent 6dc797b commit 3b91bce
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 49 deletions.
26 changes: 6 additions & 20 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,25 +81,6 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot
return nil
}

func (a *FlowableActivity) GetLastSyncedID(
ctx context.Context,
config *protos.GetLastSyncedIDInput,
) (*protos.LastSyncState, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

var lastOffset int64
lastOffset, err = dstConn.GetLastOffset(ctx, config.FlowJobName)
if err != nil {
return nil, err
}
return &protos.LastSyncState{Checkpoint: lastOffset}, nil
}

func (a *FlowableActivity) EnsurePullability(
ctx context.Context,
config *protos.EnsurePullabilityBatchInput,
Expand Down Expand Up @@ -225,6 +206,11 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
batchSize = 1_000_000
}

lastOffset, err := dstConn.GetLastOffset(ctx, input.FlowConnectionConfigs.FlowJobName)
if err != nil {
return nil, err
}

// start a goroutine to pull records from the source
recordBatch := model.NewCDCRecordStream()
startTime := time.Now()
Expand All @@ -234,7 +220,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
FlowJobName: flowName,
SrcTableIDNameMapping: input.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastOffset: input.LastSyncState.Checkpoint,
LastOffset: lastOffset,
MaxBatchSize: batchSize,
IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(
int(input.SyncFlowOptions.IdleTimeoutSeconds),
Expand Down
24 changes: 0 additions & 24 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,6 @@ func (s *SyncFlowExecution) executeSyncFlow(
) (*model.SyncResponse, error) {
s.logger.Info("executing sync flow", slog.String("flowName", s.CDCFlowName))

syncMetaCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
})

// execute GetLastSyncedID on destination peer
lastSyncInput := &protos.GetLastSyncedIDInput{
PeerConnectionConfig: config.Destination,
FlowJobName: s.CDCFlowName,
}

lastSyncFuture := workflow.ExecuteLocalActivity(syncMetaCtx, flowable.GetLastSyncedID, lastSyncInput)
var dstSyncState *protos.LastSyncState
if err := lastSyncFuture.Get(syncMetaCtx, &dstSyncState); err != nil {
return nil, fmt.Errorf("failed to get last synced ID from destination peer: %w", err)
}

if dstSyncState != nil {
msg := fmt.Sprintf("last synced ID from destination peer - %d\n", dstSyncState.Checkpoint)
s.logger.Info(msg)
} else {
s.logger.Info("no last synced ID from destination peer")
}

startFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 72 * time.Hour,
HeartbeatTimeout: time.Minute,
Expand All @@ -73,7 +50,6 @@ func (s *SyncFlowExecution) executeSyncFlow(
// execute StartFlow on the peers to start the flow
startFlowInput := &protos.StartFlowInput{
FlowConnectionConfigs: config,
LastSyncState: dstSyncState,
SyncFlowOptions: opts,
RelationMessageMapping: relationMessageMapping,
SrcTableIdNameMapping: opts.SrcTableIdNameMapping,
Expand Down
7 changes: 2 additions & 5 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,13 @@ message SyncFlowOptions {
repeated TableMapping table_mappings = 6;
}

// deprecated, unused
message LastSyncState {
int64 checkpoint = 1;
}

message StartFlowInput {
// deprecated, unused
LastSyncState last_sync_state = 1;
FlowConnectionConfigs flow_connection_configs = 2;
SyncFlowOptions sync_flow_options = 3;
Expand All @@ -125,11 +127,6 @@ message StartNormalizeInput {
int64 SyncBatchID = 3;
}

message GetLastSyncedIDInput {
peerdb_peers.Peer peer_connection_config = 1;
string flow_job_name = 2;
}

message EnsurePullabilityBatchInput {
peerdb_peers.Peer peer_connection_config = 1;
string flow_job_name = 2;
Expand Down

0 comments on commit 3b91bce

Please sign in to comment.