diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 163d61447e..6aba3994f9 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -150,7 +150,6 @@ func (s *PeerFlowE2ETestSuiteSF) TearDownSuite() { func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -219,7 +218,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -290,7 +288,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC(t *testin func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -379,7 +376,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -449,7 +445,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -514,7 +509,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -591,7 +585,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -661,7 +654,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -731,7 +723,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -812,7 +803,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -893,7 +883,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -954,7 +943,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1113,7 +1101,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1189,7 +1176,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1268,7 +1254,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1340,3 +1325,64 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { env.AssertExpectations(s.T()) } + +func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env) + + srcTableName := s.attachSchemaSuffix("test_exclude_sf") + dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_exclude_sf") + + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT GENERATED ALWAYS AS IDENTITY, + c1 INT GENERATED BY DEFAULT AS IDENTITY, + c2 INT, + t TEXT, + t2 TEXT, + PRIMARY KEY(id,t) + ); + `, srcTableName)) + require.NoError(t, err) + + c := e2e.FlowConnectionGenerationConfig{} + + config := &protos.FlowConnectionConfigs{ + FlowJobName: s.attachSuffix("test_exclude_flow"), + Destination: s.sfHelper.Peer, + TableMappings: []*protos.TableMapping{ + { + SourceTableIdentifier: srcTableName, + DestinationTableIdentifier: dstTableName, + Exclude: []string{"c2"}, + }, + }, + Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + CdcSyncMode: c.CDCSyncMode, + CdcStagingPath: c.CdcStagingPath, + } + + limits := peerflow.CDCFlowLimits{ + TotalSyncFlows: 2, + MaxBatchSize: 100, + } + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, &config, &limits, nil) + require.True(t, env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + s.Error(err) + s.Contains(err.Error(), "continue as new") + + // TODO insert rows, wait + + query := fmt.Sprintf("SELECT * FROM %s.%s.test_exclude_sf ORDER BY id", + s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName) + sfRows, err := s.sfHelper.ExecuteAndProcessQuery(query) + require.NoError(t, err) + + for _, field := range sfRows.Schema.Fields { + require.NotEqual(t, field.Name, "c2") + } + require.Equal(t, len(sfRows.Schema.Fields), 4) +}