Skip to content

Commit

Permalink
initial sketch for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Nov 14, 2023
1 parent 40382e4 commit 8754c01
Showing 1 changed file with 61 additions and 15 deletions.
76 changes: 61 additions & 15 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ func (s *PeerFlowE2ETestSuiteSF) TearDownSuite() {

func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -228,7 +227,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -299,7 +297,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC(t *testin

func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -388,7 +385,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -458,7 +454,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -523,7 +518,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -600,7 +594,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -670,7 +663,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -740,7 +732,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -821,7 +812,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -902,7 +892,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -963,7 +952,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -1126,7 +1114,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -1202,7 +1189,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -1281,7 +1267,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) {

func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) {
t.Parallel()

env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

Expand Down Expand Up @@ -1353,3 +1338,64 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) {

env.AssertExpectations(s.T())
}

func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) {
t.Parallel()
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

srcTableName := s.attachSchemaSuffix("test_exclude_sf")
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_exclude_sf")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT GENERATED ALWAYS AS IDENTITY,
c1 INT GENERATED BY DEFAULT AS IDENTITY,
c2 INT,
t TEXT,
t2 TEXT,
PRIMARY KEY(id,t)
);
`, srcTableName))
require.NoError(t, err)

c := e2e.FlowConnectionGenerationConfig{}

config := &protos.FlowConnectionConfigs{
FlowJobName: s.attachSuffix("test_exclude_flow"),
Destination: s.sfHelper.Peer,
TableMappings: []*protos.TableMapping{
{
SourceTableIdentifier: srcTableName,
DestinationTableIdentifier: dstTableName,
Exclude: []string{"c2"},
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
CdcSyncMode: c.CDCSyncMode,
CdcStagingPath: c.CdcStagingPath,
}

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
}

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, &config, &limits, nil)
require.True(t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()
s.Error(err)
s.Contains(err.Error(), "continue as new")

// TODO insert rows, wait

query := fmt.Sprintf("SELECT * FROM %s.%s.test_exclude_sf ORDER BY id",
s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName)
sfRows, err := s.sfHelper.ExecuteAndProcessQuery(query)
require.NoError(t, err)

for _, field := range sfRows.Schema.Fields {
require.NotEqual(t, field.Name, "c2")
}
require.Equal(t, len(sfRows.Schema.Fields), 4)
}

0 comments on commit 8754c01

Please sign in to comment.