Skip to content

Commit

Permalink
equaltables already handles schema
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 5, 2024
1 parent 7f2ff94 commit 5477827
Showing 1 changed file with 21 additions and 19 deletions.
40 changes: 21 additions & 19 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_exclude_sf")
tableName := "test_exclude_sf"
srcTableName := s.attachSchemaSuffix(tableName)
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_exclude_sf")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand Down Expand Up @@ -1212,7 +1213,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {
}

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 10,
ExitAfterRecords: -1,
MaxBatchSize: 100,
}

Expand All @@ -1231,22 +1232,20 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {
}
s.t.Log("Inserted 10 rows into the source table")

e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize table", srcTableName, dstTableName, "id,t,t2")
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize table", tableName, dstTableName, "id,t,t2")
_, err = s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
e2e.EnvNoError(s.t, env, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize update/delete", srcTableName, dstTableName, "id,t,t2")
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize update/delete", tableName, dstTableName, "id,t,t2")

env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)

query := fmt.Sprintf("SELECT * FROM %s.%s.test_exclude_sf ORDER BY id",
s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName)
sfRows, err := s.sfHelper.ExecuteAndProcessQuery(query)
sfRows, err := s.GetRows("*", "test_exclude_sf")
require.NoError(s.t, err)

for _, field := range sfRows.Schema.Fields {
Expand All @@ -1260,7 +1259,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_softdel_src")
tableName := "test_softdel_src"
srcTableName := s.attachSchemaSuffix(tableName)
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand Down Expand Up @@ -1306,19 +1306,19 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize row", srcTableName, dstTableName, "id,c1,c2,t")
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize row", tableName, dstTableName, "id,c1,c2,t")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize update", srcTableName, dstTableName, "id,c1,c2,t")
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize update", tableName, dstTableName, "id,c1,c2,t")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitForEqualTablesWithNames(
env,
s,
"normalize delete",
srcTableName,
tableName,
dstTableName+" WHERE NOT _PEERDB_IS_DELETED",
"id,c1,c2,t",
)
Expand Down Expand Up @@ -1422,7 +1422,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_softdel_ud_src")
tableName := "test_softdel_ud_src"
srcTableName := s.attachSchemaSuffix(tableName)
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel_ud")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand Down Expand Up @@ -1468,7 +1469,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize insert", srcTableName, dstTableName, "id,c1,c2,t")
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize insert", tableName, dstTableName, "id,c1,c2,t")

insertTx, err := s.pool.Begin(context.Background())
e2e.EnvNoError(s.t, env, err)
Expand All @@ -1487,7 +1488,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() {
env,
s,
"normalize transaction",
srcTableName,
tableName,
dstTableName+" WHERE NOT _PEERDB_IS_DELETED",
"id,c1,c2,t",
)
Expand All @@ -1508,7 +1509,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_softdel_iad")
tableName := "test_softdel_iad"
srcTableName := s.attachSchemaSuffix(tableName)
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel_iad")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand All @@ -1522,7 +1524,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() {
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_softdel_iad"),
FlowJobName: srcTableName,
}

config := &protos.FlowConnectionConfigs{
Expand Down Expand Up @@ -1554,23 +1556,23 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize row", srcTableName, dstTableName, "id,c1,c2,t")
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize row", tableName, dstTableName, "id,c1,c2,t")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitForEqualTablesWithNames(
env,
s,
"normalize delete",
srcTableName,
tableName,
dstTableName+" WHERE NOT _PEERDB_IS_DELETED",
"id,c1,c2,t",
)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize reinsert", srcTableName, dstTableName, "id,c1,c2,t")
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize reinsert", tableName, dstTableName, "id,c1,c2,t")

env.CancelWorkflow()
}()
Expand Down

0 comments on commit 5477827

Please sign in to comment.