Skip to content

Commit

Permalink
lesser flaky pls (#1906)
Browse files Browse the repository at this point in the history
Co-authored-by: Philip Dubé <[email protected]>
  • Loading branch information
heavycrystal and serprex authored Jul 5, 2024
1 parent a21141b commit eb7c91c
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,15 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
return flowStatus == protos.FlowStatus_STATUS_PAUSED
})

_, err = s.Conn().Exec(context.Background(),
`SELECT pg_terminate_backend(pid) FROM pg_stat_activity
WHERE query LIKE '%START_REPLICATION%' AND query LIKE '%dynconfig%' AND backend_type='walsender'`)
require.NoError(s.t, err)
time.Sleep(5 * time.Second)

// add rows to both tables before resuming - should handle
addRows(18)

e2e.SignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{
IdleTimeout: 14,
BatchSize: 12,
Expand All @@ -959,15 +968,10 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
},
})

// add rows to both tables before resuming - should handle
addRows(18)

e2e.SignalWorkflow(env, model.FlowSignal, model.NoopSignal)

e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
e2e.EnvWaitFor(s.t, env, 2*time.Minute, "normalize 18 records - first table", func() bool {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
})
e2e.EnvWaitFor(s.t, env, 2*time.Minute, "initial load + normalize 18 records - second table", func() bool {
Expand Down

0 comments on commit eb7c91c

Please sign in to comment.