diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 6610fdc8a7..5ceaabdd87 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -948,6 +948,15 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { return flowStatus == protos.FlowStatus_STATUS_PAUSED }) + _, err = s.Conn().Exec(context.Background(), + `SELECT pg_terminate_backend(pid) FROM pg_stat_activity + WHERE query LIKE '%START_REPLICATION%' AND query LIKE '%dynconfig%' AND backend_type='walsender'`) + require.NoError(s.t, err) + time.Sleep(5 * time.Second) + + // add rows to both tables before resuming - should handle + addRows(18) + e2e.SignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{ IdleTimeout: 14, BatchSize: 12, @@ -959,15 +968,10 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { }, }) - // add rows to both tables before resuming - should handle - addRows(18) - - e2e.SignalWorkflow(env, model.FlowSignal, model.NoopSignal) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool { return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING }) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool { + e2e.EnvWaitFor(s.t, env, 2*time.Minute, "normalize 18 records - first table", func() bool { return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil }) e2e.EnvWaitFor(s.t, env, 2*time.Minute, "initial load + normalize 18 records - second table", func() bool {