Skip to content

Commit

Permalink
more tablename cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 5, 2024
1 parent 5477827 commit 625c691
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_simple_flow_sf")
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_flow_sf")
tableName := "test_simple_flow_sf"
srcTableName := s.attachSchemaSuffix(tableName)
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName)

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
Expand Down Expand Up @@ -170,7 +171,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 20 rows into the source table")
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize table", srcTableName, dstTableName, "id,c1")
e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,c1")

env.CancelWorkflow()
}()
Expand Down Expand Up @@ -1114,8 +1115,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_cpkey_toast2")
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_cpkey_toast2")
tableName := "test_cpkey_toast2"
srcTableName := s.attachSchemaSuffix(tableName)
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName)

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
Expand All @@ -1130,7 +1132,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() {
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"),
FlowJobName: srcTableName,
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.sfHelper.Peer,
Expand Down Expand Up @@ -1159,13 +1161,13 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() {
}
s.t.Log("Inserted 10 rows into the source table")

e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize table", srcTableName, dstTableName, "id,c2,t,t2")
e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,c2,t,t2")
_, err = s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
e2e.EnvNoError(s.t, env, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize update/delete", srcTableName, dstTableName, "id,c2,t,t2")
e2e.EnvWaitForEqualTables(env, s, "normalize update/delete", tableName, "id,c2,t,t2")

env.CancelWorkflow()
}()
Expand Down

0 comments on commit 625c691

Please sign in to comment.