Skip to content

Commit

Permalink
Revert "Remove use of testify/suite" (#939)
Browse files Browse the repository at this point in the history
Accidentally merged #871 (for some reason automerge triggered after merge with 0/6 CI steps complete)
  • Loading branch information
serprex authored Dec 30, 2023
1 parent 0287c21 commit 1543705
Show file tree
Hide file tree
Showing 15 changed files with 593 additions and 586 deletions.
142 changes: 65 additions & 77 deletions flow/connectors/postgres/postgres_repl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,74 +7,106 @@ import (
"testing"
"time"

"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

type PostgresReplicationSnapshotTestSuite struct {
t *testing.T

suite.Suite
connector *PostgresConnector
schema string
}

func setupSuite(t *testing.T) PostgresReplicationSnapshotTestSuite {
t.Helper()

connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{
func (suite *PostgresReplicationSnapshotTestSuite) SetupSuite() {
var err error
suite.connector, err = NewPostgresConnector(context.Background(), &protos.PostgresConfig{
Host: "localhost",
Port: 7132,
User: "postgres",
Password: "postgres",
Database: "postgres",
}, true)
require.NoError(t, err)
require.NoError(suite.T(), err)

setupTx, err := connector.pool.Begin(context.Background())
require.NoError(t, err)
setupTx, err := suite.connector.pool.Begin(context.Background())
require.NoError(suite.T(), err)
defer func() {
err := setupTx.Rollback(context.Background())
if err != pgx.ErrTxClosed {
require.NoError(t, err)
require.NoError(suite.T(), err)
}
}()

schema := "repltest_" + shared.RandomString(8)

_, err = setupTx.Exec(context.Background(),
fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schema))
require.NoError(t, err)
_, err = setupTx.Exec(context.Background(), "DROP SCHEMA IF EXISTS pgpeer_repl_test CASCADE")
require.NoError(suite.T(), err)

_, err = setupTx.Exec(context.Background(),
fmt.Sprintf("CREATE SCHEMA %s", schema))
require.NoError(t, err)
_, err = setupTx.Exec(context.Background(), "CREATE SCHEMA pgpeer_repl_test")
require.NoError(suite.T(), err)

// setup 3 tables test_1, test_2, test_3
// all have 5 text columns c1, c2, c3, c4, c5
// setup 3 tables in pgpeer_repl_test schema
// test_1, test_2, test_3, all have 5 columns all text, c1, c2, c3, c4, c5
tables := []string{"test_1", "test_2", "test_3"}
for _, table := range tables {
_, err = setupTx.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", schema, table))
require.NoError(t, err)
fmt.Sprintf("CREATE TABLE pgpeer_repl_test.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", table))
require.NoError(suite.T(), err)
}

err = setupTx.Commit(context.Background())
require.NoError(t, err)
require.NoError(suite.T(), err)
}

func (suite *PostgresReplicationSnapshotTestSuite) TearDownSuite() {
teardownTx, err := suite.connector.pool.Begin(context.Background())
require.NoError(suite.T(), err)
defer func() {
err := teardownTx.Rollback(context.Background())
if err != pgx.ErrTxClosed {
require.NoError(suite.T(), err)
}
}()

_, err = teardownTx.Exec(context.Background(), "DROP SCHEMA IF EXISTS pgpeer_test CASCADE")
require.NoError(suite.T(), err)

// Fetch all the publications
rows, err := teardownTx.Query(context.Background(),
"SELECT pubname FROM pg_publication WHERE pubname LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation"))
require.NoError(suite.T(), err)

// Iterate over the publications and drop them
for rows.Next() {
var pubname pgtype.Text
err := rows.Scan(&pubname)
require.NoError(suite.T(), err)

return PostgresReplicationSnapshotTestSuite{
t: t,
connector: connector,
schema: schema,
// Drop the publication in a new transaction
_, err = suite.connector.pool.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubname.String))
require.NoError(suite.T(), err)
}

_, err = teardownTx.Exec(context.Background(),
"SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1",
fmt.Sprintf("%%%s", "test_simple_slot_creation"))
require.NoError(suite.T(), err)

err = teardownTx.Commit(context.Background())
require.NoError(suite.T(), err)

suite.True(suite.connector.ConnectionActive() == nil)

err = suite.connector.Close()
require.NoError(suite.T(), err)

suite.False(suite.connector.ConnectionActive() == nil)
}

func (suite PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() {
func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() {
tables := map[string]string{
suite.schema + ".test_1": "test_1_dst",
"pgpeer_repl_test.test_1": "test_1_dst",
}

flowJobName := "test_simple_slot_creation"
Expand All @@ -89,7 +121,7 @@ func (suite PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() {
// Moved to a go routine
go func() {
err := suite.connector.SetupReplication(signal, setupReplicationInput)
require.NoError(suite.t, err)
require.NoError(suite.T(), err)
}()

slog.Info("waiting for slot creation to complete", flowLog)
Expand All @@ -104,49 +136,5 @@ func (suite PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() {
}

func TestPostgresReplTestSuite(t *testing.T) {
e2eshared.GotSuite(t, setupSuite, func(suite PostgresReplicationSnapshotTestSuite) {
teardownTx, err := suite.connector.pool.Begin(context.Background())
require.NoError(suite.t, err)
defer func() {
err := teardownTx.Rollback(context.Background())
if err != pgx.ErrTxClosed {
require.NoError(suite.t, err)
}
}()

_, err = teardownTx.Exec(context.Background(),
fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", suite.schema))
require.NoError(suite.t, err)

// Fetch all the publications
rows, err := teardownTx.Query(context.Background(),
"SELECT pubname FROM pg_publication WHERE pubname LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation"))
require.NoError(suite.t, err)

// Iterate over the publications and drop them
for rows.Next() {
var pubname pgtype.Text
err := rows.Scan(&pubname)
require.NoError(suite.t, err)

// Drop the publication in a new transaction
_, err = suite.connector.pool.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubname.String))
require.NoError(suite.t, err)
}

_, err = teardownTx.Exec(context.Background(),
"SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1",
fmt.Sprintf("%%%s", "test_simple_slot_creation"))
require.NoError(suite.t, err)

err = teardownTx.Commit(context.Background())
require.NoError(suite.t, err)

require.True(suite.t, suite.connector.ConnectionActive() == nil)

err = suite.connector.Close()
require.NoError(suite.t, err)

require.False(suite.t, suite.connector.ConnectionActive() == nil)
})
suite.Run(t, new(PostgresReplicationSnapshotTestSuite))
}
Loading

0 comments on commit 1543705

Please sign in to comment.