Skip to content

Commit

Permalink
avoid CREATE OR REPLACE FUNCTION failing on concurrent tuple updates
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 30, 2023
1 parent 543e4e3 commit 5fb6166
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_cpkey_toast1")
randomString := s.attachSchemaSuffix("random_string")
dstTableName := s.attachSchemaSuffix("test_cpkey_toast1_dst")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand All @@ -352,11 +353,11 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
t TEXT,
t2 TEXT,
PRIMARY KEY(id,t)
);CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$
);CREATE OR REPLACE FUNCTION %s( int ) RETURNS TEXT as $$
SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz',
round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
$$ language sql;
`, srcTableName))
`, srcTableName, randomString))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
Expand Down Expand Up @@ -385,8 +386,8 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
for i := 0; i < 10; i++ {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
`, srcTableName), i, testValue)
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,%s(9000))
`, srcTableName, randomString), i, testValue)
require.NoError(s.t, err)
}
s.t.Log("Inserted 10 rows into the source table")
Expand Down Expand Up @@ -422,6 +423,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_cpkey_toast2")
randomString := s.attachSchemaSuffix("random_string")
dstTableName := s.attachSchemaSuffix("test_cpkey_toast2_dst")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand All @@ -432,11 +434,11 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
t TEXT,
t2 TEXT,
PRIMARY KEY(id,t)
);CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$
);CREATE OR REPLACE FUNCTION %s( int ) RETURNS TEXT as $$
SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz',
round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
$$ language sql;
`, srcTableName))
`, srcTableName, randomString))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
Expand All @@ -463,8 +465,8 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
for i := 0; i < 10; i++ {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
`, srcTableName), i, testValue)
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,%s(9000))
`, srcTableName, randomString), i, testValue)
require.NoError(s.t, err)
}
s.t.Log("Inserted 10 rows into the source table")
Expand Down

0 comments on commit 5fb6166

Please sign in to comment.