Skip to content

Commit

Permalink
postgres_repl_test: avoid concurrency errors
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 22, 2023
1 parent 8b43066 commit 4fd1636
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
21 changes: 14 additions & 7 deletions flow/connectors/postgres/postgres_repl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type PostgresReplicationSnapshotTestSuite struct {
t *testing.T

connector *PostgresConnector
schema string
}

func setupSuite(t *testing.T, g got.G) PostgresReplicationSnapshotTestSuite {
Expand All @@ -42,18 +43,22 @@ func setupSuite(t *testing.T, g got.G) PostgresReplicationSnapshotTestSuite {
}
}()

_, err = setupTx.Exec(context.Background(), "DROP SCHEMA IF EXISTS pgpeer_repl_test CASCADE")
schema := "repltest_" + e2eshared.RandString(8)

_, err = setupTx.Exec(context.Background(),
fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schema))
require.NoError(t, err)

_, err = setupTx.Exec(context.Background(), "CREATE SCHEMA pgpeer_repl_test")
_, err = setupTx.Exec(context.Background(),
fmt.Sprintf("CREATE SCHEMA %s", schema))
require.NoError(t, err)

// setup 3 tables in pgpeer_repl_test schema
// test_1, test_2, test_3, all have 5 columns all text, c1, c2, c3, c4, c5
// setup 3 tables test_1, test_2, test_3
// all have 5 text columns c1, c2, c3, c4, c5
tables := []string{"test_1", "test_2", "test_3"}
for _, table := range tables {
_, err = setupTx.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE pgpeer_repl_test.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", table))
fmt.Sprintf("CREATE TABLE %s.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", schema, table))
require.NoError(t, err)
}

Expand All @@ -64,6 +69,7 @@ func setupSuite(t *testing.T, g got.G) PostgresReplicationSnapshotTestSuite {
G: g,
t: t,
connector: connector,
schema: schema,
}
}

Expand All @@ -77,7 +83,8 @@ func (suite PostgresReplicationSnapshotTestSuite) TearDownSuite() {
}
}()

_, err = teardownTx.Exec(context.Background(), "DROP SCHEMA IF EXISTS pgpeer_test CASCADE")
_, err = teardownTx.Exec(context.Background(),
fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", suite.schema))
require.NoError(suite.t, err)

// Fetch all the publications
Expand Down Expand Up @@ -114,7 +121,7 @@ func (suite PostgresReplicationSnapshotTestSuite) TearDownSuite() {

func (suite PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() {
tables := map[string]string{
"pgpeer_repl_test.test_1": "test_1_dst",
suite.schema + ".test_1": "test_1_dst",
}

flowJobName := "test_simple_slot_creation"
Expand Down
9 changes: 9 additions & 0 deletions flow/e2e/e2eshared/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package e2eshared
import (
"fmt"
"io"
"math/rand"
"os"
"testing"

Expand All @@ -23,6 +24,14 @@ func GotSuite[T interface{ TearDownSuite() }](setup func(t *testing.T, g got.G)
}
}

func RandString(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = byte('a' + rand.Intn('z'-'a'))
}
return string(b)
}

// ReadFileToBytes reads a file to a byte array.
func ReadFileToBytes(path string) ([]byte, error) {
var ret []byte
Expand Down

0 comments on commit 4fd1636

Please sign in to comment.