Skip to content

Commit

Permalink
fix error reporting, fix flow names
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 5, 2024
1 parent 703eb5d commit 1d31e45
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 6 deletions.
3 changes: 1 addition & 2 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() {
}

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 4,
ExitAfterRecords: -1,
MaxBatchSize: 100,
}

Expand All @@ -777,7 +777,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() {
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)

e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background()))

e2e.EnvWaitFor(s.t, env, time.Minute, "normalize transaction", func(ctx context.Context) bool {
Expand Down
8 changes: 4 additions & 4 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_simple_flow"),
FlowJobName: srcTableName,
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.sfHelper.Peer,
Expand Down Expand Up @@ -171,7 +171,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 20 rows into the source table")
e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,c1")
e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,key,value")

env.CancelWorkflow()
}()
Expand Down Expand Up @@ -208,7 +208,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() {
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_simple_flow"),
FlowJobName: srcTableName,
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.sfHelper.Peer,
Expand Down Expand Up @@ -269,7 +269,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_invalid_geo_sf_avro_cdc"),
FlowJobName: srcTableName,
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.sfHelper.Peer,
Expand Down
4 changes: 4 additions & 0 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func EnvWaitForEqualTables(
table string,
cols string,
) {
suite.T().Helper()

EnvWaitForEqualTablesWithNames(env, suite, reason, table, table, cols)
}

Expand All @@ -155,6 +157,8 @@ func EnvWaitForEqualTablesWithNames(
cols string,
) {
t := suite.T()
t.Helper()

EnvWaitFor(t, env, time.Minute, reason, func(ctx context.Context) bool {
suffix := suite.Suffix()
pool := suite.Pool()
Expand Down

0 comments on commit 1d31e45

Please sign in to comment.