Skip to content

Commit

Permalink
fixed tests and added tests for cpkeys
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Oct 11, 2023
1 parent bc4ebc8 commit bcc7c00
Show file tree
Hide file tree
Showing 4 changed files with 582 additions and 266 deletions.
158 changes: 158 additions & 0 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,161 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {

env.AssertExpectations(s.T())
}

func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

srcTableName := s.attachSchemaSuffix("test_simple_cpkey")
dstTableName := s.attachSchemaSuffix("test_simple_cpkey_dst")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT GENERATED ALWAYS AS IDENTITY,
c1 INT GENERATED BY DEFAULT AS IDENTITY,
c2 INT,
t TEXT,
PRIMARY KEY(id,t)
);
`, srcTableName))
s.NoError(err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_cpkey_flow"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.peer,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 4,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
// insert 10 rows into the source table
for i := 0; i < 10; i++ {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t) VALUES ($1,$2)
`, srcTableName), i, testValue)
s.NoError(err)
}
fmt.Println("Inserted 10 rows into the source table")

// verify we got our 10 rows
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t")
s.NoError(err)

_, err := s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
s.NoError(err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
s.NoError(err)

// verify our updates and delete happened
e2e.NormalizeFlowCountQuery(env, connectionGen, 4)
err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t")
s.NoError(err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
s.Error(err)
s.Contains(err.Error(), "continue as new")

env.AssertExpectations(s.T())
}

func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

srcTableName := s.attachSchemaSuffix("test_cpkey_toast1")
dstTableName := s.attachSchemaSuffix("test_cpkey_toast1_dst")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT GENERATED ALWAYS AS IDENTITY,
c1 INT GENERATED BY DEFAULT AS IDENTITY,
c2 INT,
t TEXT,
t2 TEXT,
PRIMARY KEY(id,t)
);CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$
SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz',
round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
$$ language sql;
`, srcTableName))
s.NoError(err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.peer,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 4,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
// insert 10 rows into the source table
for i := 0; i < 10; i++ {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
`, srcTableName), i, testValue)
s.NoError(err)
}
fmt.Println("Inserted 10 rows into the source table")

// verify we got our 10 rows
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2")
s.NoError(err)

_, err := s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
s.NoError(err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
s.NoError(err)

// verify our updates and delete happened
e2e.NormalizeFlowCountQuery(env, connectionGen, 4)
err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2")
s.NoError(err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
s.Error(err)
s.Contains(err.Error(), "continue as new")

env.AssertExpectations(s.T())
}
6 changes: 5 additions & 1 deletion flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,15 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {

numRows := 10

//nolint:gosec
srcTable := "test_qrep_flow_avro_pg_1"
s.setupSourceTable(srcTable, numRows)

//nolint:gosec
dstTable := "test_qrep_flow_avro_pg_2"
e2e.CreateSourceTableQRep(s.pool, postgresSuffix, dstTable) // the name is misleading, but this is the destination table
// the name is misleading, but this is the destination table
err := e2e.CreateSourceTableQRep(s.pool, postgresSuffix, dstTable)
s.NoError(err)

srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, srcTable)
dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, dstTable)
Expand Down
154 changes: 154 additions & 0 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,3 +902,157 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {

env.AssertExpectations(s.T())
}

func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

srcTableName := s.attachSchemaSuffix("test_simple_cpkey")
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_cpkey")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT GENERATED ALWAYS AS IDENTITY,
c1 INT GENERATED BY DEFAULT AS IDENTITY,
c2 INT,
t TEXT,
PRIMARY KEY(id,t)
);
`, srcTableName))
s.NoError(err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_cpkey_flow"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.sfHelper.Peer,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 5,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
// insert 10 rows into the source table
for i := 0; i < 10; i++ {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t) VALUES ($1,$2)
`, srcTableName), i, testValue)
s.NoError(err)
}
fmt.Println("Inserted 10 rows into the source table")

// verify we got our 10 rows
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
s.compareTableContentsSF("test_simple_cpkey", "id,c1,c2,t", false)

_, err := s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
s.NoError(err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
s.NoError(err)

// verify our updates and delete happened
e2e.NormalizeFlowCountQuery(env, connectionGen, 4)
s.compareTableContentsSF("test_simple_cpkey", "id,c1,c2,t", false)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
s.Error(err)
s.Contains(err.Error(), "continue as new")

env.AssertExpectations(s.T())
}

func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

srcTableName := s.attachSchemaSuffix("test_cpkey_toast_1")
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_cpkey_toast_1")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT GENERATED ALWAYS AS IDENTITY,
c1 INT GENERATED BY DEFAULT AS IDENTITY,
c2 INT,
t TEXT,
t2 TEXT,
PRIMARY KEY(id,t)
);CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$
SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz',
round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
$$ language sql;
`, srcTableName))
s.NoError(err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.sfHelper.Peer,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 5,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
// insert 10 rows into the source table
for i := 0; i < 10; i++ {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
`, srcTableName), i, testValue)
s.NoError(err)
}
fmt.Println("Inserted 10 rows into the source table")

// verify we got our 10 rows
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
s.compareTableContentsSF("test_cpkey_toast_1", "id,c1,c2,t,t2", false)

_, err := s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
s.NoError(err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
s.NoError(err)

// verify our updates and delete happened
e2e.NormalizeFlowCountQuery(env, connectionGen, 4)
s.compareTableContentsSF("test_cpkey_toast_1", "id,c1,c2,t,t2", false)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
s.Error(err)
s.Contains(err.Error(), "continue as new")

env.AssertExpectations(s.T())
}
Loading

0 comments on commit bcc7c00

Please sign in to comment.