Split cdc_flow into cdc_flow / sync_flow (#1365)
Benefits:
1. sync_flow / normalize_flow restart when canceled, making it easy to restart broken flows (see the sketch below)
2. doesn't keep a connection open while the mirror is paused
3. the pause loop moves to the start of the workflow, so upgrades that don't change state values can avoid breaking mirrors: pausing them first sidesteps determinism errors in replay
4. moves toward a world where sync_flow history size can be reduced to allow more syncs between reconnects (or where `RecreateSession` gets worked out)
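
To make the split concrete, the sketch below shows the shape of the new arrangement: the CDC parent launches sync as a child workflow and talks to it over signals. It is illustrative only, not the actual implementation; the workflow ID, the "pause" signal name, and the "SyncFlowWorkflow" child name are hypothetical, and only the "sync-stop" name comes from the signals added in flow/model/signals.go further down.

```go
package example

import "go.temporal.io/sdk/workflow"

// parentCDCSketch is a hypothetical illustration of the cdc_flow / sync_flow
// split: sync runs as a child workflow, and on pause the parent tells it to
// stop instead of holding a connection open itself.
func parentCDCSketch(ctx workflow.Context) error {
	childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
		WorkflowID: "sync-flow-sketch", // hypothetical ID
	})
	// Start syncing in a child workflow so a broken or canceled sync can be
	// restarted without replaying the parent's history.
	syncFuture := workflow.ExecuteChildWorkflow(childCtx, "SyncFlowWorkflow")

	// Block until an operator pauses the mirror ("pause" stands in for the
	// real model.FlowSignal name).
	pauseCh := workflow.GetSignalChannel(ctx, "pause")
	pauseCh.Receive(ctx, nil)

	// "sync-stop" matches SyncStopSignal added in flow/model/signals.go below;
	// the child shuts down, so no connection stays open during the pause.
	if err := syncFuture.SignalChildWorkflow(ctx, "sync-stop", nil).Get(ctx, nil); err != nil {
		return err
	}
	return syncFuture.Get(ctx, nil)
}
```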
serprex authored Feb 28, 2024
1 parent 69c6f34 commit c7a14e2
Showing 7 changed files with 567 additions and 328 deletions.
146 changes: 80 additions & 66 deletions flow/e2e/postgres/peer_flow_pg_test.go
@@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/e2e"
@@ -1111,7 +1112,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
dstTable1Name := s.attachSchemaSuffix("test_dynconfig_1_dst")
dstTable2Name := s.attachSchemaSuffix("test_dynconfig_2_dst")
sentPause := false
isPaused := false
sentUpdate := false

_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
@@ -1160,13 +1160,13 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
}

getWorkflowState := func() peerflow.CDCFlowWorkflowState {
var workflowState peerflow.CDCFlowWorkflowState
var state peerflow.CDCFlowWorkflowState
val, err := env.QueryWorkflow(shared.CDCFlowStateQuery)
e2e.EnvNoError(s.t, env, err)
err = val.Get(&workflowState)
err = val.Get(&state)
e2e.EnvNoError(s.t, env, err)

return workflowState
return state
}

getFlowStatus := func() protos.FlowStatus {
@@ -1179,23 +1179,68 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
return flowStatus
}

var workflowState peerflow.CDCFlowWorkflowState

// signals in tests are weird: you need to register them before starting the workflow,
// otherwise (you guessed it) it errors out. really don't like this.
// too short a gap between signals also causes issues,
// which might have something to do with how test workflows handle fast-forwarding time.
env.RegisterDelayedCallback(func() {
workflowState = getWorkflowState()
e2e.EnvSignalWorkflow(env, model.FlowSignal, model.PauseSignal)
s.t.Log("Sent pause signal")
sentPause = true
}, 28*time.Second)

// add before to test initial load too.
addRows(18)
go func() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 18 rows into the source tables, exactly 3 batches
addRows(18)

e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
})

workflowState = getWorkflowState()
assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 1)
assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 1)
assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 1)
// we have limited batch size to 6, so at least 3 syncs needed
assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)

if !s.t.Failed() {
// wait for first RegisterDelayedCallback to hit.
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool {
// adding 1 more row while pausing - guarantee finishing another sync
addRows(1)

return sentPause
})
} else {
env.CancelWorkflow()
}
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, nil)
err = env.GetWorkflowError()
if !workflow.IsContinueAsNewError(err) {
require.NoError(s.t, err)
require.Error(s.t, err)
}
workflowState.ActiveSignal = model.PauseSignal
env = e2e.NewTemporalTestWorkflowEnvironment(s.t)

// this signal being sent also unblocks another WaitFor
env.RegisterDelayedCallback(func() {
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send update signal after pause confirmed", func() bool {
flowStatus := getFlowStatus()
if flowStatus != protos.FlowStatus_STATUS_PAUSED {
return false
}
isPaused = true
e2e.EnvSignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{
IdleTimeout: 14,
BatchSize: 12,
@@ -1210,7 +1255,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
sentUpdate = true
return true
})
}, 56*time.Second)
}, 28*time.Second)
env.RegisterDelayedCallback(func() {
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send resume signal after update confirmed", func() bool {
if !sentUpdate {
@@ -1220,75 +1265,44 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
s.t.Log("Sent resume signal")
return true
})
}, 84*time.Second)
}, 56*time.Second)

// add before to test initial load too.
addRows(18)
go func() {
defer env.CancelWorkflow()
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 18 rows into the source tables, exactly 3 batches
addRows(18)

e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
})

workflowState := getWorkflowState()
assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 1)
assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 1)
assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 1)
// we have limited batch size to 6, so at least 3 syncs needed
assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)

if !s.t.Failed() {
// wait for first RegisterDelayedCallback to hit.
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool {
return sentPause
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
// keep adding 1 more row - guarantee finishing another sync
addRows(1)
// isPaused - set from the WaitFor that sends update signal
return isPaused
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
})

// we have a paused mirror, wait for second signal to hit.
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool {
return sentUpdate
})
// we have a paused mirror, wait for second signal to hit.
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool {
return sentUpdate
})

// add rows to both tables before resuming - should handle
addRows(18)
// add rows to both tables before resuming - should handle
addRows(18)

e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool {
return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool {
return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
})

workflowState = getWorkflowState()
assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 2)
assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 2)
assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 2)
// 3 from first insert of 18 rows in 1 table
// 1 from pre-pause
// 3 from second insert of 18 rows in 2 tables, batch size updated
assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3)
}
workflowState = getWorkflowState()
assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 2)
assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 2)
assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 2)
// 3 from first insert of 18 rows in 1 table
// 1 from pre-pause
// 3 from second insert of 18 rows in 2 tables, batch size updated
assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3)
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, &workflowState)
e2e.RequireEnvCanceled(s.t, env)
}
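
The rewritten test above leans on one new behavior: the first ExecuteWorkflow run now ends in continue-as-new when the mirror is paused, so the test checks workflow.IsContinueAsNewError, builds a fresh test environment, and re-executes the workflow with the carried state. Below is a minimal, self-contained sketch of that re-run pattern; MyWorkflow and MyState are hypothetical stand-ins, not the PeerDB types.

```go
package example

import (
	"testing"

	"github.com/stretchr/testify/require"
	"go.temporal.io/sdk/testsuite"
	"go.temporal.io/sdk/workflow"
)

// MyState and MyWorkflow are hypothetical stand-ins for
// CDCFlowWorkflowState / CDCFlowWorkflow.
type MyState struct{ Counter int }

func MyWorkflow(ctx workflow.Context, state *MyState) error {
	if state == nil {
		state = &MyState{}
	}
	state.Counter++
	// Hand the state to the next run, as a paused mirror now does.
	return workflow.NewContinueAsNewError(ctx, MyWorkflow, state)
}

func TestContinueAsNewRerun(t *testing.T) {
	var ts testsuite.WorkflowTestSuite

	// First run: the test environment surfaces continue-as-new as the
	// workflow "error" rather than following it automatically.
	env := ts.NewTestWorkflowEnvironment()
	env.RegisterWorkflow(MyWorkflow)
	env.ExecuteWorkflow(MyWorkflow, nil)
	require.True(t, workflow.IsContinueAsNewError(env.GetWorkflowError()))

	// Second run: feed the carried state into a fresh environment, which is
	// what the e2e test above does after the pause signal.
	env = ts.NewTestWorkflowEnvironment()
	env.RegisterWorkflow(MyWorkflow)
	env.ExecuteWorkflow(MyWorkflow, &MyState{Counter: 1})
	require.True(t, workflow.IsContinueAsNewError(env.GetWorkflowError()))
}
```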
16 changes: 16 additions & 0 deletions flow/model/signals.go
@@ -130,6 +130,22 @@ var CDCDynamicPropertiesSignal = TypedSignal[*protos.CDCFlowConfigUpdate]{
Name: "cdc-dynamic-properties",
}

var SyncStopSignal = TypedSignal[struct{}]{
Name: "sync-stop",
}

var SyncErrorSignal = TypedSignal[string]{
Name: "sync-error",
}

var SyncResultSignal = TypedSignal[SyncResponse]{
Name: "sync-result",
}

var SyncOptionsSignal = TypedSignal[*protos.SyncFlowOptions]{
Name: "sync-options",
}

var NormalizeSignal = TypedSignal[NormalizePayload]{
Name: "normalize",
}
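
For context on how the sync side might consume these, here is a hypothetical sketch of a signal loop keyed on the Name values added above. Only those names come from this diff; the loop body, function name, and import paths are assumptions about the repository layout, not the actual sync_flow implementation.

```go
package example

import (
	"go.temporal.io/sdk/workflow"

	// import paths assumed from the repository layout
	"github.com/PeerDB-io/peer-flow/generated/protos"
	"github.com/PeerDB-io/peer-flow/model"
)

// syncSignalLoop is a hypothetical sketch showing how a long-lived sync
// workflow can block on the signals above by Name, letting the parent push
// option updates and request a stop.
func syncSignalLoop(ctx workflow.Context, opts *protos.SyncFlowOptions) error {
	stopCh := workflow.GetSignalChannel(ctx, model.SyncStopSignal.Name)
	optsCh := workflow.GetSignalChannel(ctx, model.SyncOptionsSignal.Name)

	for {
		stopped := false
		selector := workflow.NewSelector(ctx)
		selector.AddReceive(stopCh, func(c workflow.ReceiveChannel, _ bool) {
			c.Receive(ctx, nil)
			stopped = true
		})
		selector.AddReceive(optsCh, func(c workflow.ReceiveChannel, _ bool) {
			// Updated batch size / idle timeout pushed by the parent.
			c.Receive(ctx, &opts)
		})
		selector.Select(ctx)
		if stopped {
			return nil
		}
	}
}
```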