Skip to content

Commit

Permalink
postmerge fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 27, 2023
1 parent 2af4c60 commit e552b5d
Show file tree
Hide file tree
Showing 12 changed files with 143 additions and 157 deletions.
5 changes: 0 additions & 5 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,6 @@ func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {
return true
}

func isUniqueError(err error) bool {
var pgerr *pgconn.PgError
return errors.As(err, &pgerr) && pgerr.Code == pgerrcode.UniqueViolation
}

func (p *PostgresMetadataStore) SetupMetadata() error {
// start a transaction
tx, err := p.pool.Begin(p.ctx)
Expand Down
95 changes: 47 additions & 48 deletions flow/connectors/postgres/postgres_repl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/stretchr/testify/require"
"github.com/ysmood/got"
)

type PostgresReplicationSnapshotTestSuite struct {
Expand All @@ -24,6 +23,8 @@ type PostgresReplicationSnapshotTestSuite struct {
}

func setupSuite(t *testing.T) PostgresReplicationSnapshotTestSuite {
t.Helper()

connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{
Host: "localhost",
Port: 7132,
Expand Down Expand Up @@ -71,52 +72,6 @@ func setupSuite(t *testing.T) PostgresReplicationSnapshotTestSuite {
}
}

func (suite PostgresReplicationSnapshotTestSuite) TearDownSuite() {
teardownTx, err := suite.connector.pool.Begin(context.Background())
require.NoError(suite.t, err)
defer func() {
err := teardownTx.Rollback(context.Background())
if err != pgx.ErrTxClosed {
require.NoError(suite.t, err)
}
}()

_, err = teardownTx.Exec(context.Background(),
fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", suite.schema))
require.NoError(suite.t, err)

// Fetch all the publications
rows, err := teardownTx.Query(context.Background(),
"SELECT pubname FROM pg_publication WHERE pubname LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation"))
require.NoError(suite.t, err)

// Iterate over the publications and drop them
for rows.Next() {
var pubname pgtype.Text
err := rows.Scan(&pubname)
require.NoError(suite.t, err)

// Drop the publication in a new transaction
_, err = suite.connector.pool.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubname.String))
require.NoError(suite.t, err)
}

_, err = teardownTx.Exec(context.Background(),
"SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1",
fmt.Sprintf("%%%s", "test_simple_slot_creation"))
require.NoError(suite.t, err)

err = teardownTx.Commit(context.Background())
require.NoError(suite.t, err)

require.True(suite.t, suite.connector.ConnectionActive() == nil)

err = suite.connector.Close()
require.NoError(suite.t, err)

require.False(suite.t, suite.connector.ConnectionActive() == nil)
}

func (suite PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() {
tables := map[string]string{
suite.schema + ".test_1": "test_1_dst",
Expand Down Expand Up @@ -149,5 +104,49 @@ func (suite PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() {
}

func TestPostgresReplTestSuite(t *testing.T) {
got.Each(t, e2eshared.GotSuite(setupSuite))
e2eshared.GotSuite(t, setupSuite, func(suite PostgresReplicationSnapshotTestSuite) {
teardownTx, err := suite.connector.pool.Begin(context.Background())
require.NoError(suite.t, err)
defer func() {
err := teardownTx.Rollback(context.Background())
if err != pgx.ErrTxClosed {
require.NoError(suite.t, err)
}
}()

_, err = teardownTx.Exec(context.Background(),
fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", suite.schema))
require.NoError(suite.t, err)

// Fetch all the publications
rows, err := teardownTx.Query(context.Background(),
"SELECT pubname FROM pg_publication WHERE pubname LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation"))
require.NoError(suite.t, err)

// Iterate over the publications and drop them
for rows.Next() {
var pubname pgtype.Text
err := rows.Scan(&pubname)
require.NoError(suite.t, err)

// Drop the publication in a new transaction
_, err = suite.connector.pool.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubname.String))
require.NoError(suite.t, err)
}

_, err = teardownTx.Exec(context.Background(),
"SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1",
fmt.Sprintf("%%%s", "test_simple_slot_creation"))
require.NoError(suite.t, err)

err = teardownTx.Commit(context.Background())
require.NoError(suite.t, err)

require.True(suite.t, suite.connector.ConnectionActive() == nil)

err = suite.connector.Close()
require.NoError(suite.t, err)

require.False(suite.t, suite.connector.ConnectionActive() == nil)
})
}
45 changes: 22 additions & 23 deletions flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/require"
"github.com/ysmood/got"
)

type PostgresSchemaDeltaTestSuite struct {
Expand All @@ -22,6 +21,8 @@ type PostgresSchemaDeltaTestSuite struct {
const schemaDeltaTestSchemaName = "pgschema_delta_test"

func setupSchemaDeltaSuite(t *testing.T) PostgresSchemaDeltaTestSuite {
t.Helper()

connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{
Host: "localhost",
Port: 7132,
Expand Down Expand Up @@ -52,27 +53,6 @@ func setupSchemaDeltaSuite(t *testing.T) PostgresSchemaDeltaTestSuite {
}
}

func (suite PostgresSchemaDeltaTestSuite) TearDownSuite() {
teardownTx, err := suite.connector.pool.Begin(context.Background())
require.NoError(suite.t, err)
defer func() {
err := teardownTx.Rollback(context.Background())
if err != pgx.ErrTxClosed {
require.NoError(suite.t, err)
}
}()
_, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
schemaDeltaTestSchemaName))
require.NoError(suite.t, err)
err = teardownTx.Commit(context.Background())
require.NoError(suite.t, err)

require.True(suite.t, suite.connector.ConnectionActive() == nil)
err = suite.connector.Close()
require.NoError(suite.t, err)
require.False(suite.t, suite.connector.ConnectionActive() == nil)
}

func (suite PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
tableName := fmt.Sprintf("%s.simple_add_column", schemaDeltaTestSchemaName)
_, err := suite.connector.pool.Exec(context.Background(),
Expand Down Expand Up @@ -244,5 +224,24 @@ func (suite PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
}

func TestPostgresSchemaDeltaTestSuite(t *testing.T) {
got.Each(t, e2eshared.GotSuite(setupSchemaDeltaSuite))
e2eshared.GotSuite(t, setupSchemaDeltaSuite, func(suite PostgresSchemaDeltaTestSuite) {
teardownTx, err := suite.connector.pool.Begin(context.Background())
require.NoError(suite.t, err)
defer func() {
err := teardownTx.Rollback(context.Background())
if err != pgx.ErrTxClosed {
require.NoError(suite.t, err)
}
}()
_, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
schemaDeltaTestSchemaName))
require.NoError(suite.t, err)
err = teardownTx.Commit(context.Background())
require.NoError(suite.t, err)

require.True(suite.t, suite.connector.ConnectionActive() == nil)
err = suite.connector.Close()
require.NoError(suite.t, err)
require.False(suite.t, suite.connector.ConnectionActive() == nil)
})
}
37 changes: 18 additions & 19 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/joho/godotenv"
"github.com/stretchr/testify/require"
"github.com/ysmood/got"
)

type PeerFlowE2ETestSuiteBQ struct {
Expand All @@ -34,13 +33,13 @@ func TestPeerFlowE2ETestSuiteBQ(t *testing.T) {
err := e2e.TearDownPostgres(s.pool, s.bqSuffix)
if err != nil {
slog.Error("failed to tear down postgres", slog.Any("error", err))
s.FailNow()
s.t.FailNow()
}

err = s.bqHelper.DropDataset(s.bqHelper.datasetName)
if err != nil {
slog.Error("failed to tear down bigquery", slog.Any("error", err))
s.FailNow()
s.t.FailNow()
}
})
}
Expand Down Expand Up @@ -148,7 +147,7 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ {

func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() {
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)
e2e.RegisterWorkflowsAndActivities(s.t, env)

// TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores.
limits := peerflow.CDCFlowLimits{
Expand Down Expand Up @@ -1194,7 +1193,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTable1Name := s.attachSchemaSuffix("test1_bq")
Expand Down Expand Up @@ -1252,8 +1251,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
count2, err := s.bqHelper.countRowsWithDataset(secondDataset, dstTable2Name)
require.NoError(s.t, err)

s.Equal(1, count1)
s.Equal(1, count2)
require.Equal(s.t, 1, count1)
require.Equal(s.t, 1, count2)

err = s.bqHelper.DropDataset(secondDataset)
require.NoError(s.t, err)
Expand All @@ -1262,7 +1261,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(s.t, env)

cmpTableName := s.attachSchemaSuffix("test_softdel")
Expand Down Expand Up @@ -1332,7 +1331,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
s.True(env.IsWorkflowCompleted())
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")

Expand All @@ -1346,11 +1345,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
s.bqHelper.datasetName, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
s.Eq(1, numNewRows)
require.Equal(s.t, int64(1), numNewRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(s.t, env)

cmpTableName := s.attachSchemaSuffix("test_softdel_iud")
Expand Down Expand Up @@ -1418,7 +1417,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
s.True(env.IsWorkflowCompleted())
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")

Expand All @@ -1430,11 +1429,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
s.bqHelper.datasetName, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
s.Eq(1, numNewRows)
require.Equal(s.t, int64(1), numNewRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(s.t, env)

cmpTableName := s.attachSchemaSuffix("test_softdel_ud")
Expand Down Expand Up @@ -1506,7 +1505,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
s.True(env.IsWorkflowCompleted())
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")

Expand All @@ -1518,11 +1517,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
s.bqHelper.datasetName, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
s.Eq(1, numNewRows)
require.Equal(s.t, int64(1), numNewRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_softdel_iad")
Expand Down Expand Up @@ -1582,7 +1581,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
s.True(env.IsWorkflowCompleted())
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")

Expand All @@ -1594,5 +1593,5 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
s.bqHelper.datasetName, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
s.Eq(0, numNewRows)
require.Equal(s.t, numNewRows, int64(0))
}
8 changes: 4 additions & 4 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
TableIdentifiers: []string{dstTableName},
})
require.NoError(s.t, err)
s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
err = s.comparePGTables(srcTableName, dstTableName, "id,c1")
require.NoError(s.t, err)

Expand Down Expand Up @@ -187,7 +187,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
TableIdentifiers: []string{dstTableName},
})
require.NoError(s.t, err)
s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2")
require.NoError(s.t, err)

Expand Down Expand Up @@ -217,7 +217,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
TableIdentifiers: []string{dstTableName},
})
require.NoError(s.t, err)
s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c3")
require.NoError(s.t, err)

Expand Down Expand Up @@ -247,7 +247,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
TableIdentifiers: []string{dstTableName},
})
require.NoError(s.t, err)
s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
err = s.comparePGTables(srcTableName, dstTableName, "id,c1")
require.NoError(s.t, err)
}()
Expand Down
Loading

0 comments on commit e552b5d

Please sign in to comment.