Skip to content

Commit

Permalink
POC: generic Test_Simple_Flow (#1440)
Browse files Browse the repository at this point in the history
Lay out basic design for generic connector testing,
replacing `Test_Simple_Flow` with a single definition used for PG/SF/BQ

More tests can be ported in follow-up PRs,
so that when new connectors are added they only need to implement `GenericSuite` to get e2e tests

Also fix hstore comparison so it works with Snowflake
  • Loading branch information
serprex authored Mar 7, 2024
1 parent 6b47812 commit b213528
Show file tree
Hide file tree
Showing 20 changed files with 495 additions and 490 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
matrix:
runner: [ubicloud-standard-16-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
timeout-minutes: 40
timeout-minutes: 30
services:
catalog:
image: imresamu/postgis:15-3.4-alpine
Expand Down Expand Up @@ -96,7 +96,7 @@ jobs:
temporal operator search-attribute create --name MirrorName --type Text --namespace default
./peer-flow worker &
./peer-flow snapshot-worker &
go test -p 32 ./... -timeout 1200s
go test -p 32 ./... -timeout 900s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand Down
40 changes: 21 additions & 19 deletions flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,23 +196,25 @@ func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
}

// TestPostgresSchemaDeltaTestSuite is the `go test` entry point for the suite;
// it delegates to e2eshared.RunSuite.
// NOTE(review): this span appears to interleave the old and new sides of the
// diff: the call taking an inline teardown closure looks like the removed
// version and the final RunSuite(t, SetupSuite) like the added one. Confirm.
func TestPostgresSchemaDeltaTestSuite(t *testing.T) {
e2eshared.RunSuite(t, SetupSuite, func(s PostgresSchemaDeltaTestSuite) {
teardownTx, err := s.connector.conn.Begin(context.Background())
require.NoError(s.t, err)
defer func() {
err := teardownTx.Rollback(context.Background())
if err != pgx.ErrTxClosed {
require.NoError(s.t, err)
}
}()
_, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
s.schema))
require.NoError(s.t, err)
err = teardownTx.Commit(context.Background())
require.NoError(s.t, err)

require.NoError(s.t, s.connector.ConnectionActive(context.Background()))
require.NoError(s.t, s.connector.Close())
require.Error(s.t, s.connector.ConnectionActive(context.Background()))
})
e2eshared.RunSuite(t, SetupSuite)
}

// Teardown drops the suite's schema inside a transaction, then closes the
// connector and asserts that it stops reporting an active connection.
func (s PostgresSchemaDeltaTestSuite) Teardown() {
	ctx := context.Background()

	tx, err := s.connector.conn.Begin(ctx)
	require.NoError(s.t, err)
	defer func() {
		// pgx.ErrTxClosed is the one rollback error tolerated here; pgx returns
		// it when the transaction is already closed, e.g. after the Commit below.
		if rollbackErr := tx.Rollback(ctx); rollbackErr != pgx.ErrTxClosed {
			require.NoError(s.t, rollbackErr)
		}
	}()

	dropSchema := fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", s.schema)
	_, err = tx.Exec(ctx, dropSchema)
	require.NoError(s.t, err)
	require.NoError(s.t, tx.Commit(ctx))

	// The connection must be usable before Close and unusable after it.
	require.NoError(s.t, s.connector.ConnectionActive(ctx))
	require.NoError(s.t, s.connector.Close())
	require.Error(s.t, s.connector.ConnectionActive(ctx))
}
102 changes: 102 additions & 0 deletions flow/e2e/bigquery/bigquery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package e2e_bigquery

import (
"fmt"
"strings"
"testing"
"time"

"github.com/jackc/pgx/v5"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
)

// PeerFlowE2ETestSuiteBQ holds the state shared by the BigQuery e2e tests:
// the test handle, the source Postgres connector and the BigQuery helper.
type PeerFlowE2ETestSuiteBQ struct {
// t is the test handle used for logging and failures.
t *testing.T

// bqSuffix is the suffix string handed to e2e.SetupPostgres when the suite is built.
bqSuffix string
// conn is the Postgres connector returned by e2e.SetupPostgres.
conn *connpostgres.PostgresConnector
// bqHelper provides the BigQuery peer, dataset config and query execution.
bqHelper *BigQueryTestHelper
}

// T returns the suite's *testing.T.
func (s PeerFlowE2ETestSuiteBQ) T() *testing.T {
return s.t
}

// Conn returns the *pgx.Conn exposed by the suite's Postgres connector.
func (s PeerFlowE2ETestSuiteBQ) Conn() *pgx.Conn {
return s.conn.Conn()
}

// Connector returns the suite's Postgres connector.
func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector {
return s.conn
}

// Suffix returns the suite's bqSuffix string.
func (s PeerFlowE2ETestSuiteBQ) Suffix() string {
return s.bqSuffix
}

// Peer returns the *protos.Peer held by the BigQuery test helper.
func (s PeerFlowE2ETestSuiteBQ) Peer() *protos.Peer {
return s.bqHelper.Peer
}

// DestinationTable returns the given table name unchanged.
func (s PeerFlowE2ETestSuiteBQ) DestinationTable(table string) string {
return table
}

// GetRows selects colsString from tableName in the helper's configured
// dataset, ordered by id, and returns the processed record batch.
func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error) {
	s.t.Helper()

	// The table is referenced as a backtick-quoted `dataset.table` name.
	query := fmt.Sprintf("SELECT %s FROM `%s.%s` ORDER BY id",
		colsString, s.bqHelper.Config.DatasetId, tableName)
	s.t.Logf("running query on bigquery: %s", query)

	return s.bqHelper.ExecuteAndProcessQuery(query)
}

// GetRowsWhere selects colsString from tableName in the helper's configured
// dataset, restricted by the raw SQL condition in where and ordered by id.
func (s PeerFlowE2ETestSuiteBQ) GetRowsWhere(tableName string, colsString string, where string) (*model.QRecordBatch, error) {
	s.t.Helper()

	// The table is referenced as a backtick-quoted `dataset.table` name.
	query := fmt.Sprintf("SELECT %s FROM `%s.%s` WHERE %s ORDER BY id",
		colsString, s.bqHelper.Config.DatasetId, tableName, where)
	s.t.Logf("running query on bigquery: %s", query)

	return s.bqHelper.ExecuteAndProcessQuery(query)
}

// Teardown calls e2e.TearDownPostgres for the suite and then drops the
// BigQuery dataset named in the helper's config, failing the test if the
// drop returns an error.
func (s PeerFlowE2ETestSuiteBQ) Teardown() {
	e2e.TearDownPostgres(s)

	datasetID := s.bqHelper.Config.DatasetId
	if dropErr := s.bqHelper.DropDataset(datasetID); dropErr != nil {
		s.t.Fatalf("failed to tear down bigquery: %v", dropErr)
	}
}

// SetupSuite builds a PeerFlowE2ETestSuiteBQ for t. It generates a
// "bq_<random>_<timestamp>" suffix, sets up Postgres with it, creates the
// BigQuery helper and recreates its dataset. Any failure aborts the test.
func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ {
	t.Helper()

	randomPart := shared.RandomString(8)
	stamp := time.Now().Format("20060102150405")
	bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(randomPart), stamp)

	pgConn, err := e2e.SetupPostgres(t, bqSuffix)
	if conn := pgConn; conn == nil || err != nil {
		t.Fatalf("failed to setup postgres: %v", err)
	}

	helper, err := NewBigQueryTestHelper()
	if err != nil {
		t.Fatalf("Failed to create helper: %v", err)
	}

	if recreateErr := helper.RecreateDataset(); recreateErr != nil {
		t.Fatalf("Failed to recreate dataset: %v", recreateErr)
	}

	return PeerFlowE2ETestSuiteBQ{
		t:        t,
		bqSuffix: bqSuffix,
		conn:     pgConn,
		bqHelper: helper,
	}
}
149 changes: 1 addition & 148 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,76 +4,23 @@ import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"

"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/joho/godotenv"
"github.com/stretchr/testify/require"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

// PeerFlowE2ETestSuiteBQ holds the state shared by the BigQuery e2e tests:
// the test handle, the source Postgres connector and the BigQuery helper.
type PeerFlowE2ETestSuiteBQ struct {
// t is the test handle used for logging and failures.
t *testing.T

// bqSuffix is the suffix string handed to e2e.SetupPostgres when the suite is built.
bqSuffix string
// conn is the Postgres connector returned by e2e.SetupPostgres.
conn *connpostgres.PostgresConnector
// bqHelper provides the BigQuery peer, dataset config and query execution.
bqHelper *BigQueryTestHelper
}

// T returns the suite's *testing.T.
func (s PeerFlowE2ETestSuiteBQ) T() *testing.T {
return s.t
}

// Conn returns the *pgx.Conn exposed by the suite's Postgres connector.
func (s PeerFlowE2ETestSuiteBQ) Conn() *pgx.Conn {
return s.conn.Conn()
}

// Connector returns the suite's Postgres connector.
func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector {
return s.conn
}

// Suffix returns the suite's bqSuffix string.
func (s PeerFlowE2ETestSuiteBQ) Suffix() string {
return s.bqSuffix
}

// GetRows selects colsString from tableName in the helper's configured
// dataset, ordered by id, and returns the processed record batch.
func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error) {
	s.t.Helper()

	// The table is referenced as a backtick-quoted `dataset.table` name.
	query := fmt.Sprintf("SELECT %s FROM `%s.%s` ORDER BY id",
		colsString, s.bqHelper.Config.DatasetId, tableName)
	s.t.Logf("running query on bigquery: %s", query)

	return s.bqHelper.ExecuteAndProcessQuery(query)
}

// GetRowsWhere selects colsString from tableName in the helper's configured
// dataset, restricted by the raw SQL condition in where and ordered by id.
func (s PeerFlowE2ETestSuiteBQ) GetRowsWhere(tableName string, colsString string, where string) (*model.QRecordBatch, error) {
	s.t.Helper()

	// The table is referenced as a backtick-quoted `dataset.table` name.
	query := fmt.Sprintf("SELECT %s FROM `%s.%s` WHERE %s ORDER BY id",
		colsString, s.bqHelper.Config.DatasetId, tableName, where)
	s.t.Logf("running query on bigquery: %s", query)

	return s.bqHelper.ExecuteAndProcessQuery(query)
}

// TestPeerFlowE2ETestSuiteBQ is the `go test` entry point for the BigQuery
// suite; it delegates to e2eshared.RunSuite.
// NOTE(review): this span appears to interleave the old and new sides of the
// diff: the call taking setupSuite plus an inline teardown closure looks like
// the removed version and the final RunSuite(t, SetupSuite) like the added one. Confirm.
func TestPeerFlowE2ETestSuiteBQ(t *testing.T) {
e2eshared.RunSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteBQ) {
e2e.TearDownPostgres(s)

err := s.bqHelper.DropDataset(s.bqHelper.Config.DatasetId)
if err != nil {
s.t.Fatalf("failed to tear down bigquery: %v", err)
}
})
e2eshared.RunSuite(t, SetupSuite)
}

func (s PeerFlowE2ETestSuiteBQ) checkJSONValue(tableName, colName, fieldName, value string) error {
Expand Down Expand Up @@ -147,52 +94,6 @@ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, softDel
return nil
}

// setupBigQuery creates a BigQueryTestHelper and recreates its dataset,
// aborting the test if either step returns an error.
func setupBigQuery(t *testing.T) *BigQueryTestHelper {
	t.Helper()

	helper, err := NewBigQueryTestHelper()
	if err != nil {
		t.Fatalf("Failed to create helper: %v", err)
	}

	if recreateErr := helper.RecreateDataset(); recreateErr != nil {
		t.Fatalf("Failed to recreate dataset: %v", recreateErr)
	}

	return helper
}

// setupSuite builds a PeerFlowE2ETestSuiteBQ for t. It attempts to load a
// .env file, generates a "bq_<random>_<timestamp>" suffix, sets up Postgres
// with it and prepares BigQuery through setupBigQuery.
func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ {
	t.Helper()

	if loadErr := godotenv.Load(); loadErr != nil {
		// A missing .env file is not fatal; values already present in the
		// environment are used instead.
		t.Log("Unable to load .env file, using default values from env")
	}

	randomPart := shared.RandomString(8)
	stamp := time.Now().Format("20060102150405")
	bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(randomPart), stamp)

	pgConn, err := e2e.SetupPostgres(t, bqSuffix)
	if pgConn == nil || err != nil {
		t.Fatalf("failed to setup postgres: %v", err)
	}

	return PeerFlowE2ETestSuiteBQ{
		t:        t,
		bqSuffix: bqSuffix,
		conn:     pgConn,
		bqHelper: setupBigQuery(t),
	}
}

func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() {
tc := e2e.NewTemporalClient(s.t)

Expand Down Expand Up @@ -268,54 +169,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
e2e.RequireEnvCanceled(s.t, env)
}

// Test_Complete_Simple_Flow_BQ runs a CDC flow from a Postgres source table
// into BigQuery. It creates the source table, starts CDCFlowWorkflow with
// MaxBatchSize set to 100, inserts 10 rows into the source table, waits on
// e2e.EnvWaitForEqualTables for the id, key and value columns, and finally
// cancels the workflow environment and requires that it was canceled.
func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
tc := e2e.NewTemporalClient(s.t)

// The source name goes through attachSchemaSuffix; the destination name is used bare.
srcTableName := s.attachSchemaSuffix("test_simple_flow_bq")
dstTableName := "test_simple_flow_bq"

_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
key TEXT NOT NULL,
value TEXT NOT NULL
);
`, srcTableName))
require.NoError(s.t, err)

// Map the source table to the destination table, with the BigQuery peer as destination.
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_complete_simple_flow"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
flowConnConfig.MaxBatchSize = 100

// Start the workflow before inserting rows.
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)

// insert 10 rows into the source table
for i := range 10 {
testKey := fmt.Sprintf("test_key_%d", i)
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(key, value) VALUES ($1, $2)
`, srcTableName), testKey, testValue)
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 10 rows into the source table")

// NOTE(review): presumably blocks until source and destination agree on
// the listed columns; confirm against the e2e helper.
e2e.EnvWaitForEqualTables(env, s, "normalize inserts", dstTableName, "id,key,value")

env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
tc := e2e.NewTemporalClient(s.t)

Expand Down
11 changes: 3 additions & 8 deletions flow/e2e/bigquery/qrep_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,9 @@ func (s PeerFlowE2ETestSuiteBQ) setupTimeTable(tableName string) {
rows = append(rows, row)

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO e2e_test_%s.%s (
watermark_ts,
mytimestamp,
mytztimestamp,
medieval,
mybaddate,
mydate
) VALUES %s;
INSERT INTO e2e_test_%s.%s (
watermark_ts, mytimestamp, mytztimestamp, medieval, mybaddate, mydate
) VALUES %s;
`, s.bqSuffix, tableName, strings.Join(rows, ",")))
require.NoError(s.t, err)
}
Expand Down
Loading

0 comments on commit b213528

Please sign in to comment.