Skip to content

Commit

Permalink
postgres_schema_delta: remove use of testify/suite
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 30, 2023
1 parent 1543705 commit d463b3d
Showing 1 changed file with 83 additions and 78 deletions.
161 changes: 83 additions & 78 deletions flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,97 +3,83 @@ package connpostgres
import (
"context"
"fmt"
"strings"
"testing"

"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/suite"
"github.com/stretchr/testify/require"
"github.com/ysmood/got"
)

type PostgresSchemaDeltaTestSuite struct {
suite.Suite
got.G
t *testing.T
connector *PostgresConnector
schema string
}

const schemaDeltaTestSchemaName = "pgschema_delta_test"
func SetupSuite(t *testing.T, g got.G) PostgresSchemaDeltaTestSuite {
t.Helper()

func (suite *PostgresSchemaDeltaTestSuite) failTestError(err error) {
if err != nil {
suite.FailNow(err.Error())
}
}

func (suite *PostgresSchemaDeltaTestSuite) SetupSuite() {
var err error
suite.connector, err = NewPostgresConnector(context.Background(), &protos.PostgresConfig{
connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{
Host: "localhost",
Port: 7132,
User: "postgres",
Password: "postgres",
Database: "postgres",
}, false)
suite.failTestError(err)
require.NoError(t, err)

setupTx, err := suite.connector.pool.Begin(context.Background())
suite.failTestError(err)
setupTx, err := connector.pool.Begin(context.Background())
require.NoError(t, err)
defer func() {
err := setupTx.Rollback(context.Background())
if err != pgx.ErrTxClosed {
suite.failTestError(err)
require.NoError(t, err)
}
}()
schema := "pgdelta_" + strings.ToLower(shared.RandomString(8))
_, err = setupTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
schemaDeltaTestSchemaName))
suite.failTestError(err)
_, err = setupTx.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA %s", schemaDeltaTestSchemaName))
suite.failTestError(err)
schema))
require.NoError(t, err)
_, err = setupTx.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA %s", schema))
require.NoError(t, err)
err = setupTx.Commit(context.Background())
suite.failTestError(err)
}
require.NoError(t, err)

func (suite *PostgresSchemaDeltaTestSuite) TearDownSuite() {
teardownTx, err := suite.connector.pool.Begin(context.Background())
suite.failTestError(err)
defer func() {
err := teardownTx.Rollback(context.Background())
if err != pgx.ErrTxClosed {
suite.failTestError(err)
}
}()
_, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
schemaDeltaTestSchemaName))
suite.failTestError(err)
err = teardownTx.Commit(context.Background())
suite.failTestError(err)

suite.True(suite.connector.ConnectionActive() == nil)
err = suite.connector.Close()
suite.failTestError(err)
suite.False(suite.connector.ConnectionActive() == nil)
return PostgresSchemaDeltaTestSuite{
G: g,
t: t,
connector: connector,
schema: schema,
}
}

func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
tableName := fmt.Sprintf("%s.simple_add_column", schemaDeltaTestSchemaName)
_, err := suite.connector.pool.Exec(context.Background(),
func (s PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
tableName := fmt.Sprintf("%s.simple_add_column", s.schema)
_, err := s.connector.pool.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
suite.failTestError(err)
require.NoError(s.t, err)

err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
err = s.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
SrcTableName: tableName,
DstTableName: tableName,
AddedColumns: []*protos.DeltaAddedColumn{{
ColumnName: "hi",
ColumnType: string(qvalue.QValueKindInt64),
}},
}})
suite.failTestError(err)
require.NoError(s.t, err)

output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
TableIdentifiers: []string{tableName},
})
suite.failTestError(err)
suite.Equal(&protos.TableSchema{
require.NoError(s.t, err)
s.Equal(&protos.TableSchema{
TableIdentifier: tableName,
Columns: map[string]string{
"id": string(qvalue.QValueKindInt32),
Expand All @@ -103,11 +89,11 @@ func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
}, output.TableNameSchemaMapping[tableName])
}

func (suite *PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() {
tableName := fmt.Sprintf("%s.add_drop_all_column_types", schemaDeltaTestSchemaName)
_, err := suite.connector.pool.Exec(context.Background(),
func (s PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() {
tableName := fmt.Sprintf("%s.add_drop_all_column_types", s.schema)
_, err := s.connector.pool.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
suite.failTestError(err)
require.NoError(s.t, err)

expectedTableSchema := &protos.TableSchema{
TableIdentifier: tableName,
Expand Down Expand Up @@ -143,25 +129,25 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() {
}
}

err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
err = s.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
SrcTableName: tableName,
DstTableName: tableName,
AddedColumns: addedColumns,
}})
suite.failTestError(err)
require.NoError(s.t, err)

output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
TableIdentifiers: []string{tableName},
})
suite.failTestError(err)
suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
require.NoError(s.t, err)
s.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
}

func (suite *PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
tableName := fmt.Sprintf("%s.add_drop_tricky_column_names", schemaDeltaTestSchemaName)
_, err := suite.connector.pool.Exec(context.Background(),
func (s PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
tableName := fmt.Sprintf("%s.add_drop_tricky_column_names", s.schema)
_, err := s.connector.pool.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
suite.failTestError(err)
require.NoError(s.t, err)

expectedTableSchema := &protos.TableSchema{
TableIdentifier: tableName,
Expand Down Expand Up @@ -189,25 +175,25 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
}
}

err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
err = s.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
SrcTableName: tableName,
DstTableName: tableName,
AddedColumns: addedColumns,
}})
suite.failTestError(err)
require.NoError(s.t, err)

output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
TableIdentifiers: []string{tableName},
})
suite.failTestError(err)
suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
require.NoError(s.t, err)
s.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
}

func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
tableName := fmt.Sprintf("%s.add_drop_whitespace_column_names", schemaDeltaTestSchemaName)
_, err := suite.connector.pool.Exec(context.Background(),
func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
tableName := fmt.Sprintf("%s.add_drop_whitespace_column_names", s.schema)
_, err := s.connector.pool.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s(\" \" INT PRIMARY KEY)", tableName))
suite.failTestError(err)
require.NoError(s.t, err)

expectedTableSchema := &protos.TableSchema{
TableIdentifier: tableName,
Expand All @@ -229,20 +215,39 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
}
}

err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
err = s.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
SrcTableName: tableName,
DstTableName: tableName,
AddedColumns: addedColumns,
}})
suite.failTestError(err)
require.NoError(s.t, err)

output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
TableIdentifiers: []string{tableName},
})
suite.failTestError(err)
suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
require.NoError(s.t, err)
s.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
}

func TestPostgresSchemaDeltaTestSuite(t *testing.T) {
suite.Run(t, new(PostgresSchemaDeltaTestSuite))
e2eshared.GotSuite(t, SetupSuite, func(s PostgresSchemaDeltaTestSuite) {
teardownTx, err := s.connector.pool.Begin(context.Background())
require.NoError(s.t, err)
defer func() {
err := teardownTx.Rollback(context.Background())
if err != pgx.ErrTxClosed {
require.NoError(s.t, err)
}
}()
_, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
s.schema))
require.NoError(s.t, err)
err = teardownTx.Commit(context.Background())
require.NoError(s.t, err)

require.True(s.t, s.connector.ConnectionActive() == nil)
err = s.connector.Close()
require.NoError(s.t, err)
require.False(s.t, s.connector.ConnectionActive() == nil)
})
}

0 comments on commit d463b3d

Please sign in to comment.