Skip to content

Commit

Permalink
qrep_flow_pg_test: remove testify/suite
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 30, 2023
1 parent 321818b commit bb136a7
Showing 1 changed file with 59 additions and 57 deletions.
116 changes: 59 additions & 57 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,72 +9,78 @@ import (

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/joho/godotenv"
"github.com/stretchr/testify/suite"
"go.temporal.io/sdk/testsuite"
"github.com/stretchr/testify/require"
"github.com/ysmood/got"
)

const postgresSuffix = "postgres"

type PeerFlowE2ETestSuitePG struct {
suite.Suite
testsuite.WorkflowTestSuite
got.G
t *testing.T

pool *pgxpool.Pool
peer *protos.Peer
connector *connpostgres.PostgresConnector
suffix string
}

func TestPeerFlowE2ETestSuitePG(t *testing.T) {
suite.Run(t, new(PeerFlowE2ETestSuitePG))
e2eshared.GotSuite(t, SetupSuite, func(s PeerFlowE2ETestSuitePG) {
err := e2e.TearDownPostgres(s.pool, s.suffix)
if err != nil {
require.Fail(s.t, "failed to drop Postgres schema", err)
}
})
}

// Implement SetupAllSuite interface to setup the test suite
func (s *PeerFlowE2ETestSuitePG) SetupSuite() {
func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuitePG {
err := godotenv.Load()
if err != nil {
// it's okay if the .env file is not present
// we will use the default values
slog.Info("Unable to load .env file, using default values from env")
}

pool, err := e2e.SetupPostgres(postgresSuffix)
if err != nil || pool == nil {
s.Fail("failed to setup postgres", err)
suffix := "pg_" + strings.ToLower(shared.RandomString(8))
pool, err := e2e.SetupPostgres(suffix)
if err != nil {
require.Fail(t, "failed to setup postgres", err)
}
s.pool = pool
s.peer = generatePGPeer(e2e.GetTestPostgresConf())

s.connector, err = connpostgres.NewPostgresConnector(context.Background(),
var connector *connpostgres.PostgresConnector
connector, err = connpostgres.NewPostgresConnector(context.Background(),
&protos.PostgresConfig{
Host: "localhost",
Port: 7132,
User: "postgres",
Password: "postgres",
Database: "postgres",
}, false)
s.NoError(err)
}

// Implement TearDownAllSuite interface to tear down the test suite
func (s *PeerFlowE2ETestSuitePG) TearDownSuite() {
err := e2e.TearDownPostgres(s.pool, postgresSuffix)
if err != nil {
s.Fail("failed to drop Postgres schema", err)
require.NoError(t, err)

return PeerFlowE2ETestSuitePG{
G: g,
t: t,
pool: pool,
peer: generatePGPeer(e2e.GetTestPostgresConf()),
connector: connector,
suffix: suffix,
}
}

func (s *PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) {
err := e2e.CreateTableForQRep(s.pool, postgresSuffix, tableName)
s.NoError(err)
err = e2e.PopulateSourceTable(s.pool, postgresSuffix, tableName, rowCount)
s.NoError(err)
func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) {
err := e2e.CreateTableForQRep(s.pool, s.suffix, tableName)
require.NoError(s.t, err)
err = e2e.PopulateSourceTable(s.pool, s.suffix, tableName, rowCount)
require.NoError(s.t, err)
}

func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error {
func (s PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error {
// Execute the two EXCEPT queries
for {
err := s.compareQuery(srcSchemaQualified, dstSchemaQualified, selector)
Expand Down Expand Up @@ -104,7 +110,7 @@ func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQu
return nil
}

func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error {
func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error {
query := fmt.Sprintf("SELECT %s FROM %s EXCEPT SELECT %s FROM %s", selector, srcSchemaQualified,
selector, dstSchemaQualified)
rows, err := s.pool.Query(context.Background(), query)
Expand Down Expand Up @@ -138,7 +144,7 @@ func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQuali
return nil
}

func (s *PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error {
func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error {
query := fmt.Sprintf(`SELECT "_PEERDB_SYNCED_AT" FROM %s`, dstSchemaQualified)

rows, _ := s.pool.Query(context.Background(), query)
Expand All @@ -159,9 +165,9 @@ func (s *PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error
return rows.Err()
}

func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.T(), env)
func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)

numRows := 10

Expand All @@ -170,14 +176,14 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {

dstTable := "test_qrep_flow_avro_pg_2"

err := e2e.CreateTableForQRep(s.pool, postgresSuffix, dstTable)
s.NoError(err)
err := e2e.CreateTableForQRep(s.pool, s.suffix, dstTable)
require.NoError(s.t, err)

srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, srcTable)
dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, dstTable)
srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable)
dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
postgresSuffix, srcTable)
s.suffix, srcTable)

postgresPeer := e2e.GeneratePostgresPeer(e2e.PostgresPort)

Expand All @@ -191,27 +197,25 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
true,
"",
)
s.NoError(err)
require.NoError(s.t, err)

e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())

err = env.GetWorkflowError()
s.NoError(err)
require.NoError(s.t, err)

err = s.comparePGTables(srcSchemaQualified, dstSchemaQualified, "*")
if err != nil {
s.FailNow(err.Error())
}
require.NoError(s.t, err)

env.AssertExpectations(s.T())
env.AssertExpectations(s.t)
}

func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.T(), env)
func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)

numRows := 10

Expand All @@ -220,11 +224,11 @@ func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_

dstTable := "test_qrep_columns_pg_2"

srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, srcTable)
dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, dstTable)
srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable)
dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
postgresSuffix, srcTable)
s.suffix, srcTable)

postgresPeer := e2e.GeneratePostgresPeer(e2e.PostgresPort)

Expand All @@ -238,20 +242,18 @@ func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_
true,
"_PEERDB_SYNCED_AT",
)
s.NoError(err)
require.NoError(s.t, err)

e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())

err = env.GetWorkflowError()
s.NoError(err)
require.NoError(s.t, err)

err = s.checkSyncedAt(dstSchemaQualified)
if err != nil {
s.FailNow(err.Error())
}
require.NoError(s.t, err)

env.AssertExpectations(s.T())
env.AssertExpectations(s.t)
}

0 comments on commit bb136a7

Please sign in to comment.