diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index e9c77f544b..9205c73634 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -102,8 +102,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { err = s.comparePGTables(srcTableName, dstTableName, "id,key,value") require.NoError(s.t, err) - - env.AssertExpectations(s.t) } func WaitFuncSchema( @@ -268,7 +266,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) wg.Wait() - env.AssertExpectations(s.t) } func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { @@ -337,9 +334,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - time.Sleep(10 * time.Second) wg.Wait() - env.AssertExpectations(s.t) } func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { @@ -419,8 +414,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { // verify our updates and delete happened err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") require.NoError(s.t, err) - - env.AssertExpectations(s.t) } func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { @@ -457,13 +450,16 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 10, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. + wg := sync.WaitGroup{} + wg.Add(1) go func() { + defer wg.Done() e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table @@ -476,28 +472,24 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { } s.t.Log("Inserted 10 rows into the source table") - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + e2e.EnvWaitFor(s.t, env, time.Minute, "normalize 10 rows", func(ctx context.Context) bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") == nil + }) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) e2e.EnvNoError(s.t, env, err) - }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - require.Contains(s.t, err.Error(), "continue as new") + e2e.EnvWaitFor(s.t, env, time.Minute, "normalize update", func(ctx context.Context) bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") == nil + }) - // verify our updates and delete happened - err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") - require.NoError(s.t, err) + env.CancelWorkflow() + }() - env.AssertExpectations(s.t) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + wg.Wait() } func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { @@ -560,7 +552,6 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { require.Contains(s.t, err.Error(), "continue as new") checkErr := s.checkPeerdbColumns(dstTableName, 1) require.NoError(s.t, checkErr) - env.AssertExpectations(s.t) } func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { @@ -602,7 +593,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { } limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 3, + ExitAfterRecords: -1, MaxBatchSize: 100, } @@ -618,11 +609,15 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + e2e.EnvWaitFor(s.t, env, time.Minute, "normalize row", func(ctx context.Context) bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil + }) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + e2e.EnvWaitFor(s.t, env, time.Minute, "normalize update", func(ctx context.Context) bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil + }) // since we delete stuff, create another table to compare with _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) @@ -630,23 +625,26 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) + + e2e.EnvWaitFor(s.t, env, time.Minute, "normalize delete", func(ctx context.Context) bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil + }) + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - require.Contains(s.t, err.Error(), "continue as new") - wg.Wait() // verify our updates and delete happened err = s.comparePGTables(cmpTableName, dstTableName, "id,c1,c2,t") require.NoError(s.t, err) - softDeleteQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"=TRUE`, - dstTableName) - numRows, err := s.countRowsInQuery(softDeleteQuery) + softDeleteQuery := fmt.Sprintf( + `SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"`, + dstTableName, + ) + numRows, err := s.RunInt64Query(softDeleteQuery) require.NoError(s.t, err) require.Equal(s.t, int64(1), numRows) } @@ -729,9 +727,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() { require.NoError(s.t, err) softDeleteQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"=TRUE`, + SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"`, dstTableName) - numRows, err := s.countRowsInQuery(softDeleteQuery) + numRows, err := s.RunInt64Query(softDeleteQuery) require.NoError(s.t, err) require.Equal(s.t, int64(1), numRows) } @@ -781,13 +779,18 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. + wg := sync.WaitGroup{} + wg.Add(1) go func() { + defer wg.Done() e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + e2e.EnvWaitFor(s.t, env, time.Minute, "normalize row", func(ctx context.Context) bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil + }) insertTx, err := s.pool.Begin(context.Background()) e2e.EnvNoError(s.t, env, err) @@ -806,21 +809,24 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { e2e.EnvNoError(s.t, env, err) e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background())) + + e2e.EnvWaitFor(s.t, env, time.Minute, "normalize transaction", func(ctx context.Context) bool { + return s.comparePGTables(srcTableName, dstTableName+` WHERE NOT "_PEERDB_IS_DELETED"`, "id,c1,c2,t") == nil + }) + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - require.Contains(s.t, err.Error(), "continue as new") + wg.Wait() // verify our updates and delete happened - err = s.comparePGTables(cmpTableName, dstTableName, "id,c1,c2,t") require.NoError(s.t, err) softDeleteQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"=TRUE`, + SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"`, dstTableName) - numRows, err := s.countRowsInQuery(softDeleteQuery) + numRows, err := s.RunInt64Query(softDeleteQuery) require.NoError(s.t, err) require.Equal(s.t, int64(1), numRows) } @@ -863,41 +869,47 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() { } limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 3, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and delete rows in the table. + wg := sync.WaitGroup{} + wg.Add(1) go func() { + defer wg.Done() e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + e2e.EnvWaitFor(s.t, env, time.Minute, "normalize row", func(ctx context.Context) bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil + }) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + e2e.EnvWaitFor(s.t, env, time.Minute, "normalize delete", func(ctx context.Context) bool { + return s.comparePGTables(srcTableName, dstTableName+` WHERE NOT "_PEERDB_IS_DELETED"`, "id,c1,c2,t") == nil + }) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitFor(s.t, env, time.Minute, "normalize reinsert", func(ctx context.Context) bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil + }) + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - require.Contains(s.t, err.Error(), "continue as new") - - // verify our updates and delete happened - err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") - require.NoError(s.t, err) + wg.Wait() softDeleteQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"=TRUE`, + SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"`, dstTableName) - numRows, err := s.countRowsInQuery(softDeleteQuery) + numRows, err := s.RunInt64Query(softDeleteQuery) require.NoError(s.t, err) require.Equal(s.t, int64(0), numRows) } diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index cf0eda3b03..8c32b69696 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -165,7 +165,7 @@ func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { return rows.Err() } -func (s PeerFlowE2ETestSuitePG) countRowsInQuery(query string) (int64, error) { +func (s PeerFlowE2ETestSuitePG) RunInt64Query(query string) (int64, error) { var count pgtype.Int8 err := s.pool.QueryRow(context.Background(), query).Scan(&count) return count.Int64, err