Skip to content

Commit

Permalink
more logging around TearDownPostgres
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 11, 2024
1 parent 9458da2 commit 9f00855
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 38 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/utils/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func HeartbeatRoutine(
}
}
}()
return func() { shutdown <- struct{}{} }
return func() { close(shutdown) }
}

// if the functions are being called outside the context of a Temporal workflow,
Expand Down
8 changes: 2 additions & 6 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,9 @@ func (s PeerFlowE2ETestSuiteBQ) GetRowsWhere(tableName string, colsString string

func TestPeerFlowE2ETestSuiteBQ(t *testing.T) {
e2eshared.RunSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteBQ) {
err := e2e.TearDownPostgres(s.pool, s.bqSuffix)
if err != nil {
slog.Error("failed to tear down postgres", slog.Any("error", err))
s.t.FailNow()
}
e2e.TearDownPostgres(s)

err = s.bqHelper.DropDataset(s.bqHelper.Config.DatasetId)
err := s.bqHelper.DropDataset(s.bqHelper.Config.DatasetId)
if err != nil {
slog.Error("failed to tear down bigquery", slog.Any("error", err))
s.t.FailNow()
Expand Down
18 changes: 12 additions & 6 deletions flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/require"
)

const (
Expand Down Expand Up @@ -135,22 +137,26 @@ func SetupPostgres(suffix string) (*pgxpool.Pool, error) {
return pool, nil
}

func TearDownPostgres(pool *pgxpool.Pool, suffix string) error {
// drop the e2e_test schema
func TearDownPostgres[T e2eshared.Suite](s T) {
t := s.T()
t.Helper()
pool := s.Pool()
suffix := s.Suffix()

if pool != nil {
deadline := time.Now().Add(time.Minute)
t.Log("begin tearing down postgres schema", suffix)
deadline := time.Now().Add(2 * time.Minute)
for {
err := cleanPostgres(pool, suffix)
if err == nil {
pool.Close()
return nil
return
} else if time.Now().After(deadline) {
return err
require.Fail(t, "failed to teardown postgres schema", suffix)
}
time.Sleep(time.Second)
}
}
return nil
}

// GeneratePostgresPeer generates a postgres peer config for testing.
Expand Down
17 changes: 13 additions & 4 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,21 @@ type PeerFlowE2ETestSuitePG struct {
suffix string
}

func (s PeerFlowE2ETestSuitePG) T() *testing.T {
return s.t
}

func (s PeerFlowE2ETestSuitePG) Pool() *pgxpool.Pool {
return s.pool
}

func (s PeerFlowE2ETestSuitePG) Suffix() string {
return s.suffix
}

func TestPeerFlowE2ETestSuitePG(t *testing.T) {
e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuitePG) {
err := e2e.TearDownPostgres(s.pool, s.suffix)
if err != nil {
require.Fail(s.t, "failed to drop Postgres schema", err)
}
e2e.TearDownPostgres(s)
})
}

Expand Down
19 changes: 14 additions & 5 deletions flow/e2e/s3/qrep_flow_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,22 @@ type PeerFlowE2ETestSuiteS3 struct {
suffix string
}

func (s PeerFlowE2ETestSuiteS3) T() *testing.T {
return s.t
}

func (s PeerFlowE2ETestSuiteS3) Pool() *pgxpool.Pool {
return s.pool
}

func (s PeerFlowE2ETestSuiteS3) Suffix() string {
return s.suffix
}

func tearDownSuite(s PeerFlowE2ETestSuiteS3) {
err := e2e.TearDownPostgres(s.pool, s.suffix)
if err != nil {
require.Fail(s.t, "failed to drop Postgres schema", err)
}
e2e.TearDownPostgres(s)

err = s.s3Helper.CleanUp()
err := s.s3Helper.CleanUp()
if err != nil {
require.Fail(s.t, "failed to clean up s3", err)
}
Expand Down
10 changes: 3 additions & 7 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,17 @@ func (s PeerFlowE2ETestSuiteSF) GetRows(tableName string, sfSelector string) (*m

func TestPeerFlowE2ETestSuiteSF(t *testing.T) {
e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSF) {
err := e2e.TearDownPostgres(s.pool, s.pgSuffix)
if err != nil {
slog.Error("failed to tear down Postgres", slog.Any("error", err))
s.t.FailNow()
}
e2e.TearDownPostgres(s)

if s.sfHelper != nil {
err = s.sfHelper.Cleanup()
err := s.sfHelper.Cleanup()
if err != nil {
slog.Error("failed to tear down Snowflake", slog.Any("error", err))
s.t.FailNow()
}
}

err = s.connector.Close()
err := s.connector.Close()
if err != nil {
slog.Error("failed to close Snowflake connector", slog.Any("error", err))
s.t.FailNow()
Expand Down
17 changes: 14 additions & 3 deletions flow/e2e/sqlserver/qrep_flow_sqlserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,24 @@ type PeerFlowE2ETestSuiteSQLServer struct {
suffix string
}

func (s PeerFlowE2ETestSuiteSQLServer) T() *testing.T {
return s.t
}

func (s PeerFlowE2ETestSuiteSQLServer) Pool() *pgxpool.Pool {
return s.pool
}

func (s PeerFlowE2ETestSuiteSQLServer) Suffix() string {
return s.suffix
}

func TestCDCFlowE2ETestSuiteSQLServer(t *testing.T) {
e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSQLServer) {
err := e2e.TearDownPostgres(s.pool, s.suffix)
require.NoError(s.t, err)
e2e.TearDownPostgres(s)

if s.sqlsHelper != nil {
err = s.sqlsHelper.CleanUp()
err := s.sqlsHelper.CleanUp()
require.NoError(s.t, err)
}
})
Expand Down
10 changes: 5 additions & 5 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,11 @@ func SetupCDCFlowStatusQuery(t *testing.T, env *testsuite.TestWorkflowEnvironmen
if err == nil {
var state peerflow.CDCFlowWorkflowState
err = response.Get(&state)
if err == nil {
if state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING {
return
}
} else {
if err == nil {
if state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING {
return
}
} else {
slog.Error(err.Error())
}
} else if counter > 15 {
Expand Down
6 changes: 5 additions & 1 deletion flow/e2eshared/e2eshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
)

type RowSource interface {
type Suite interface {
T() *testing.T
Pool() *pgxpool.Pool
Suffix() string
}

type RowSource interface {
Suite
GetRows(table, cols string) (*model.QRecordBatch, error)
}

Expand Down

0 comments on commit 9f00855

Please sign in to comment.