diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 36aaa089cf..773a8eb52b 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -10,6 +10,7 @@ import ( "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.temporal.io/sdk/testsuite" "go.temporal.io/sdk/worker" @@ -1241,13 +1242,13 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { }) workflowState := getWorkFlowState() - require.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds) - require.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize) - require.EqualValues(s.t, 1, len(workflowState.TableMappings)) - require.EqualValues(s.t, 1, len(workflowState.SrcTableIdNameMapping)) - require.EqualValues(s.t, 1, len(workflowState.TableNameSchemaMapping)) + assert.EqualValues(s.t, 5, workflowState.SyncFlowOptions.IdleTimeoutSeconds) + assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize) + assert.EqualValues(s.t, 1, len(workflowState.TableMappings)) + assert.EqualValues(s.t, 1, len(workflowState.SrcTableIdNameMapping)) + assert.EqualValues(s.t, 1, len(workflowState.TableNameSchemaMapping)) // we have limited batch size to 6, so atleast 3 syncs needed - require.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3) + assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3) // wait for first RegisterDelayedCallback to hit. e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool { @@ -1272,8 +1273,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { addRows(18) e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool { - flowStatus := getFlowStatus() - return flowStatus == protos.FlowStatus_STATUS_RUNNING + return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING }) e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool { return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil @@ -1283,18 +1283,19 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { }) workflowState = getWorkFlowState() - require.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds) - require.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize) - require.EqualValues(s.t, 2, len(workflowState.TableMappings)) - require.EqualValues(s.t, 2, len(workflowState.SrcTableIdNameMapping)) - require.EqualValues(s.t, 2, len(workflowState.TableNameSchemaMapping)) + assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds) + assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize) + assert.EqualValues(s.t, 2, len(workflowState.TableMappings)) + assert.EqualValues(s.t, 2, len(workflowState.SrcTableIdNameMapping)) + assert.EqualValues(s.t, 2, len(workflowState.TableNameSchemaMapping)) // 3 from first insert of 18 rows in 1 table // 1 from pre-pause // 3 from second insert of 18 rows in 2 tables, batch size updated - require.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3) + assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3) env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil) + e2e.RequireEnvCanceled(s.t, env) }