From e0b35460fd3372068f28d0a2735d54dc8697e98d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 5 Jan 2024 22:22:06 +0000 Subject: [PATCH] fix error reporting, fix flow names --- flow/e2e/postgres/peer_flow_pg_test.go | 3 +-- flow/e2e/snowflake/peer_flow_sf_test.go | 8 ++++---- flow/e2e/test_utils.go | 4 ++++ 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 1cc897638f..73da88436f 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -750,7 +750,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { } limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 4, + ExitAfterRecords: -1, MaxBatchSize: 100, } @@ -777,7 +777,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background())) e2e.EnvWaitFor(s.t, env, time.Minute, "normalize transaction", func(ctx context.Context) bool { diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 16473f5b95..095efa2612 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -143,7 +143,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_simple_flow"), + FlowJobName: srcTableName, TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, @@ -171,7 +171,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 20 rows into the source table") - e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,c1") + e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,key,value") env.CancelWorkflow() }() @@ -208,7 +208,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_simple_flow"), + FlowJobName: srcTableName, TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, @@ -269,7 +269,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_invalid_geo_sf_avro_cdc"), + FlowJobName: srcTableName, TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index ffac338d6b..3c4f7b476e 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -143,6 +143,8 @@ func EnvWaitForEqualTables( table string, cols string, ) { + suite.T().Helper() + EnvWaitForEqualTablesWithNames(env, suite, reason, table, table, cols) } @@ -155,6 +157,8 @@ func EnvWaitForEqualTablesWithNames( cols string, ) { t := suite.T() + t.Helper() + EnvWaitFor(t, env, time.Minute, reason, func(ctx context.Context) bool { suffix := suite.Suffix() pool := suite.Pool()