From 1b09e555676a24edd986fac91e591dee4d68fa99 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Mon, 12 Feb 2024 23:18:00 +0530 Subject: [PATCH] fixing review comments pt.2 --- flow/e2e/postgres/peer_flow_pg_test.go | 87 ++++++++++++++------------ 1 file changed, 46 insertions(+), 41 deletions(-) diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 694bce3832..9138fa8117 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -1233,6 +1233,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { // add before to test initial load too. addRows(18) go func() { + defer env.CancelWorkflow() e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 18 rows into the source tables, exactly 3 batches addRows(18) @@ -1244,56 +1245,60 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { workflowState := getWorkFlowState() assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds) assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize) - assert.EqualValues(s.t, 1, len(workflowState.TableMappings)) - assert.EqualValues(s.t, 1, len(workflowState.SrcTableIdNameMapping)) - assert.EqualValues(s.t, 1, len(workflowState.TableNameSchemaMapping)) + assert.Len(s.t, workflowState.TableMappings, 2) + assert.Len(s.t, workflowState.SrcTableIdNameMapping, 1) + assert.Len(s.t, workflowState.TableNameSchemaMapping, 1) // we have limited batch size to 6, so atleast 3 syncs needed assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3) - // wait for first RegisterDelayedCallback to hit. - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool { - return sentPause - }) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool { - // keep adding 1 more row - guarantee finishing another sync - addRows(1) - // isPaused - set from the WaitFor that sends update signal - return isPaused - }) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool { - return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil - }) + if !s.t.Failed() { + // wait for first RegisterDelayedCallback to hit. + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool { + return sentPause + }) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool { + // keep adding 1 more row - guarantee finishing another sync + addRows(1) + // isPaused - set from the WaitFor that sends update signal + return isPaused + }) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool { + return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil + }) - // we have a paused mirror, wait for second signal to hit. - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool { - return sentUpdate - }) + // we have a paused mirror, wait for second signal to hit. + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool { + return sentUpdate + }) + } // add rows to both tables before resuming - should handle addRows(18) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool { - return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING - }) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool { - return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil - }) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool { - return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil - }) - - workflowState = getWorkFlowState() - assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds) - assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize) - assert.EqualValues(s.t, 2, len(workflowState.TableMappings)) - assert.EqualValues(s.t, 2, len(workflowState.SrcTableIdNameMapping)) - assert.EqualValues(s.t, 2, len(workflowState.TableNameSchemaMapping)) - // 3 from first insert of 18 rows in 1 table - // 1 from pre-pause - // 3 from second insert of 18 rows in 2 tables, batch size updated - assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3) + if !s.t.Failed() { + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool { + return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING + }) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool { + return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil + }) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool { + return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil + }) + } - env.CancelWorkflow() + if !s.t.Failed() { + workflowState = getWorkFlowState() + assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds) + assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize) + assert.Len(s.t, workflowState.TableMappings, 2) + assert.Len(s.t, workflowState.SrcTableIdNameMapping, 2) + assert.Len(s.t, workflowState.TableNameSchemaMapping, 2) + // 3 from first insert of 18 rows in 1 table + // 1 from pre-pause + // 3 from second insert of 18 rows in 2 tables, batch size updated + assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3) + } }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)