Skip to content

Commit

Permalink
initial sketch for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Nov 13, 2023
1 parent 543524c commit 934852e
Showing 1 changed file with 61 additions and 15 deletions.
76 changes: 61 additions & 15 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ func (s *PeerFlowE2ETestSuiteSF) TearDownSuite() {

func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -219,7 +218,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -290,7 +288,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC(t *testin

func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -379,7 +376,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -449,7 +445,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -514,7 +509,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -591,7 +585,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -661,7 +654,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -731,7 +723,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -812,7 +803,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -893,7 +883,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -954,7 +943,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -1113,7 +1101,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -1189,7 +1176,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -1268,7 +1254,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -1340,3 +1325,64 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) {

env.AssertExpectations(s.T())
}

func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) {
t.Parallel()
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

srcTableName := s.attachSchemaSuffix("test_exclude_sf")
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_exclude_sf")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT GENERATED ALWAYS AS IDENTITY,
c1 INT GENERATED BY DEFAULT AS IDENTITY,
c2 INT,
t TEXT,
t2 TEXT,
PRIMARY KEY(id,t)
);
`, srcTableName))
require.NoError(t, err)

c := e2e.FlowConnectionGenerationConfig{}

config := &protos.FlowConnectionConfigs{
FlowJobName: s.attachSuffix("test_exclude_flow"),
Destination: s.sfHelper.Peer,
TableMappings: []*protos.TableMapping{
{
SourceTableIdentifier: srcTableName,
DestinationTableIdentifier: dstTableName,
Exclude: []string{"c2"},
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
CdcSyncMode: c.CDCSyncMode,
CdcStagingPath: c.CdcStagingPath,
}

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
}

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, &config, &limits, nil)
require.True(t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()
s.Error(err)
s.Contains(err.Error(), "continue as new")

// TODO insert rows, wait

query := fmt.Sprintf("SELECT * FROM %s.%s.test_exclude_sf ORDER BY id",
s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName)
sfRows, err := s.sfHelper.ExecuteAndProcessQuery(query)
require.NoError(t, err)

for _, field := range sfRows.Schema.Fields {
require.NotEqual(t, field.Name, "c2")
}
require.Equal(t, len(sfRows.Schema.Fields), 4)
}

0 comments on commit 934852e

Please sign in to comment.