From 8754c01b1d5948dcea4bbb214232d4bff857fe63 Mon Sep 17 00:00:00 2001 From: Demur Rumed Date: Mon, 13 Nov 2023 23:04:53 +0000 Subject: [PATCH] initial sketch for testing --- flow/e2e/snowflake/peer_flow_sf_test.go | 76 ++++++++++++++++++++----- 1 file changed, 61 insertions(+), 15 deletions(-) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 13efa18e75..a49ad4340b 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -150,7 +150,6 @@ func (s *PeerFlowE2ETestSuiteSF) TearDownSuite() { func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -228,7 +227,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -299,7 +297,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC(t *testin func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -388,7 +385,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -458,7 +454,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -523,7 +518,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -600,7 +594,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -670,7 +663,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -740,7 +732,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -821,7 +812,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -902,7 +892,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -963,7 +952,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1126,7 +1114,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1202,7 +1189,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1281,7 +1267,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { t.Parallel() - env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1353,3 +1338,64 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { env.AssertExpectations(s.T()) } + +func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env) + + srcTableName := s.attachSchemaSuffix("test_exclude_sf") + dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_exclude_sf") + + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT GENERATED ALWAYS AS IDENTITY, + c1 INT GENERATED BY DEFAULT AS IDENTITY, + c2 INT, + t TEXT, + t2 TEXT, + PRIMARY KEY(id,t) + ); + `, srcTableName)) + require.NoError(t, err) + + c := e2e.FlowConnectionGenerationConfig{} + + config := &protos.FlowConnectionConfigs{ + FlowJobName: s.attachSuffix("test_exclude_flow"), + Destination: s.sfHelper.Peer, + TableMappings: []*protos.TableMapping{ + { + SourceTableIdentifier: srcTableName, + DestinationTableIdentifier: dstTableName, + Exclude: []string{"c2"}, + }, + }, + Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + CdcSyncMode: c.CDCSyncMode, + CdcStagingPath: c.CdcStagingPath, + } + + limits := peerflow.CDCFlowLimits{ + TotalSyncFlows: 2, + MaxBatchSize: 100, + } + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, &config, &limits, nil) + require.True(t, env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + s.Error(err) + s.Contains(err.Error(), "continue as new") + + // TODO insert rows, wait + + query := fmt.Sprintf("SELECT * FROM %s.%s.test_exclude_sf ORDER BY id", + s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName) + sfRows, err := s.sfHelper.ExecuteAndProcessQuery(query) + require.NoError(t, err) + + for _, field := range sfRows.Schema.Fields { + require.NotEqual(t, field.Name, "c2") + } + require.Equal(t, len(sfRows.Schema.Fields), 4) +}