diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 90b16be522..27e6156245 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -1143,3 +1143,87 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { e2e.RequireEnvCanceled(s.t, env) } + +func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion_With_Schema_Changes() { + tc := e2e.NewTemporalClient(s.t) + + tableName := "test_exclude_schema_changes_sf" + srcTableName := s.attachSchemaSuffix(tableName) + dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) + + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT GENERATED ALWAYS AS IDENTITY, + c1 INT GENERATED BY DEFAULT AS IDENTITY, + c2 INT, + t TEXT, + PRIMARY KEY(id,t) + ); + `, srcTableName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix(tableName), + } + + config := &protos.FlowConnectionConfigs{ + FlowJobName: connectionGen.FlowJobName, + Destination: s.sfHelper.Peer, + TableMappings: []*protos.TableMapping{ + { + SourceTableIdentifier: srcTableName, + DestinationTableIdentifier: dstTableName, + Exclude: []string{"c2"}, + }, + }, + Source: e2e.GeneratePostgresPeer(), + CdcStagingPath: connectionGen.CdcStagingPath, + SyncedAtColName: "_PEERDB_SYNCED_AT", + MaxBatchSize: 100, + } + + // wait for PeerFlowStatusQuery to finish setup + // and then insert, update and delete rows in the table. + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, config, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) + + // insert 10 rows into the source table + for i := range 10 { + testValue := fmt.Sprintf("test_value_%d", i) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c2,t) VALUES ($1,$2) + `, srcTableName), i, testValue) + e2e.EnvNoError(s.t, env, err) + } + s.t.Log("Inserted 10 rows into the source table") + + e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,c1,t") + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf("ALTER TABLE %s ADD COLUMN t2 TEXT", srcTableName)) + e2e.EnvNoError(s.t, env, err) + // insert 10 more rows into the source table + for i := range 10 { + testValue := fmt.Sprintf("test_value_%d", i) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(100)) + `, srcTableName), i, testValue) + e2e.EnvNoError(s.t, env, err) + } + _, err = s.Conn().Exec(context.Background(), + fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=1`, srcTableName)) + e2e.EnvNoError(s.t, env, err) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=0`, srcTableName)) + e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitForEqualTables(env, s, "normalize update/delete", tableName, "id,c1,t,t2") + + env.Cancel() + + e2e.RequireEnvCanceled(s.t, env) + + sfRows, err := s.GetRows(tableName, "*") + require.NoError(s.t, err) + + for _, field := range sfRows.Schema.Fields { + require.NotEqual(s.t, "c2", field.Name) + } + require.Len(s.t, sfRows.Schema.Fields, 5) +}