Skip to content

Commit

Permalink
test for column exclusion with schema changes (#1519)
Browse files Browse the repository at this point in the history
should have caught #1512
  • Loading branch information
heavycrystal authored Mar 21, 2024
1 parent 970232d commit 15c1a68
Showing 1 changed file with 84 additions and 0 deletions.
84 changes: 84 additions & 0 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1143,3 +1143,87 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() {

e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion_With_Schema_Changes() {
tc := e2e.NewTemporalClient(s.t)

tableName := "test_exclude_schema_changes_sf"
srcTableName := s.attachSchemaSuffix(tableName)
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName)

_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT GENERATED ALWAYS AS IDENTITY,
c1 INT GENERATED BY DEFAULT AS IDENTITY,
c2 INT,
t TEXT,
PRIMARY KEY(id,t)
);
`, srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix(tableName),
}

config := &protos.FlowConnectionConfigs{
FlowJobName: connectionGen.FlowJobName,
Destination: s.sfHelper.Peer,
TableMappings: []*protos.TableMapping{
{
SourceTableIdentifier: srcTableName,
DestinationTableIdentifier: dstTableName,
Exclude: []string{"c2"},
},
},
Source: e2e.GeneratePostgresPeer(),
CdcStagingPath: connectionGen.CdcStagingPath,
SyncedAtColName: "_PEERDB_SYNCED_AT",
MaxBatchSize: 100,
}

// wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, config, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)

// insert 10 rows into the source table
for i := range 10 {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t) VALUES ($1,$2)
`, srcTableName), i, testValue)
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 10 rows into the source table")

e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,c1,t")
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf("ALTER TABLE %s ADD COLUMN t2 TEXT", srcTableName))
e2e.EnvNoError(s.t, env, err)
// insert 10 more rows into the source table
for i := range 10 {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(100))
`, srcTableName), i, testValue)
e2e.EnvNoError(s.t, env, err)
}
_, err = s.Conn().Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=0`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitForEqualTables(env, s, "normalize update/delete", tableName, "id,c1,t,t2")

env.Cancel()

e2e.RequireEnvCanceled(s.t, env)

sfRows, err := s.GetRows(tableName, "*")
require.NoError(s.t, err)

for _, field := range sfRows.Schema.Fields {
require.NotEqual(s.t, "c2", field.Name)
}
require.Len(s.t, sfRows.Schema.Fields, 5)
}

0 comments on commit 15c1a68

Please sign in to comment.