Skip to content

Commit

Permalink
fixing review comments pt.2
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Feb 12, 2024
1 parent 72ba829 commit 1b09e55
Showing 1 changed file with 46 additions and 41 deletions.
87 changes: 46 additions & 41 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1233,6 +1233,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
// add before to test initial load too.
addRows(18)
go func() {
defer env.CancelWorkflow()
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 18 rows into the source tables, exactly 3 batches
addRows(18)
Expand All @@ -1244,56 +1245,60 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
workflowState := getWorkFlowState()
assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
assert.EqualValues(s.t, 1, len(workflowState.TableMappings))
assert.EqualValues(s.t, 1, len(workflowState.SrcTableIdNameMapping))
assert.EqualValues(s.t, 1, len(workflowState.TableNameSchemaMapping))
assert.Len(s.t, workflowState.TableMappings, 2)
assert.Len(s.t, workflowState.SrcTableIdNameMapping, 1)
assert.Len(s.t, workflowState.TableNameSchemaMapping, 1)
// we have limited batch size to 6, so atleast 3 syncs needed
assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)

// wait for first RegisterDelayedCallback to hit.
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool {
return sentPause
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
// keep adding 1 more row - guarantee finishing another sync
addRows(1)
// isPaused - set from the WaitFor that sends update signal
return isPaused
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
})
if !s.t.Failed() {
// wait for first RegisterDelayedCallback to hit.
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool {
return sentPause
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
// keep adding 1 more row - guarantee finishing another sync
addRows(1)
// isPaused - set from the WaitFor that sends update signal
return isPaused
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
})

// we have a paused mirror, wait for second signal to hit.
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool {
return sentUpdate
})
// we have a paused mirror, wait for second signal to hit.
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool {
return sentUpdate
})
}

// add rows to both tables before resuming - should handle
addRows(18)

e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool {
return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
})

workflowState = getWorkFlowState()
assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
assert.EqualValues(s.t, 2, len(workflowState.TableMappings))
assert.EqualValues(s.t, 2, len(workflowState.SrcTableIdNameMapping))
assert.EqualValues(s.t, 2, len(workflowState.TableNameSchemaMapping))
// 3 from first insert of 18 rows in 1 table
// 1 from pre-pause
// 3 from second insert of 18 rows in 2 tables, batch size updated
assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3)
if !s.t.Failed() {
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool {
return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
})
}

env.CancelWorkflow()
if !s.t.Failed() {
workflowState = getWorkFlowState()
assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
assert.Len(s.t, workflowState.TableMappings, 2)
assert.Len(s.t, workflowState.SrcTableIdNameMapping, 2)
assert.Len(s.t, workflowState.TableNameSchemaMapping, 2)
// 3 from first insert of 18 rows in 1 table
// 1 from pre-pause
// 3 from second insert of 18 rows in 2 tables, batch size updated
assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3)
}
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
Expand Down

0 comments on commit 1b09e55

Please sign in to comment.