
POC: generic Test_Simple_Flow
serprex committed Mar 6, 2024
1 parent 0a32419 commit 2002605
Showing 13 changed files with 442 additions and 306 deletions.
40 changes: 21 additions & 19 deletions flow/connectors/postgres/postgres_schema_delta_test.go
@@ -196,23 +196,25 @@ func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
 }
 
 func TestPostgresSchemaDeltaTestSuite(t *testing.T) {
-    e2eshared.RunSuite(t, SetupSuite, func(s PostgresSchemaDeltaTestSuite) {
-        teardownTx, err := s.connector.conn.Begin(context.Background())
-        require.NoError(s.t, err)
-        defer func() {
-            err := teardownTx.Rollback(context.Background())
-            if err != pgx.ErrTxClosed {
-                require.NoError(s.t, err)
-            }
-        }()
-        _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
-            s.schema))
-        require.NoError(s.t, err)
-        err = teardownTx.Commit(context.Background())
-        require.NoError(s.t, err)
-
-        require.NoError(s.t, s.connector.ConnectionActive(context.Background()))
-        require.NoError(s.t, s.connector.Close())
-        require.Error(s.t, s.connector.ConnectionActive(context.Background()))
-    })
+    e2eshared.RunSuite(t, SetupSuite)
 }
+
+func (s PostgresSchemaDeltaTestSuite) Teardown() {
+    teardownTx, err := s.connector.conn.Begin(context.Background())
+    require.NoError(s.t, err)
+    defer func() {
+        err := teardownTx.Rollback(context.Background())
+        if err != pgx.ErrTxClosed {
+            require.NoError(s.t, err)
+        }
+    }()
+    _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
+        s.schema))
+    require.NoError(s.t, err)
+    err = teardownTx.Commit(context.Background())
+    require.NoError(s.t, err)
+
+    require.NoError(s.t, s.connector.ConnectionActive(context.Background()))
+    require.NoError(s.t, s.connector.Close())
+    require.Error(s.t, s.connector.ConnectionActive(context.Background()))
+}
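
The hunk above replaces the teardown closure that used to be passed to e2eshared.RunSuite with a Teardown() method on the suite itself. RunSuite is not part of this diff, so the sketch below only illustrates the contract the change implies; the Suite interface name, the t.Cleanup wiring, and the omitted method-discovery step are assumptions for illustration, not the actual implementation.

package e2eshared_sketch

import "testing"

// Suite is the assumed minimal contract: access to the *testing.T plus a Teardown hook.
type Suite interface {
    T() *testing.T
    Teardown()
}

// RunSuite constructs the suite once and guarantees Teardown runs when the test finishes.
func RunSuite[S Suite](t *testing.T, setup func(t *testing.T) S) {
    t.Helper()
    s := setup(t)
    t.Cleanup(func() { s.Teardown() })
    // The real helper presumably also discovers and runs the suite's Test_* methods
    // (likely via reflection); that part is omitted from this sketch.
}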
98 changes: 98 additions & 0 deletions flow/e2e/bigquery/bigquery.go
@@ -0,0 +1,98 @@
+package e2e_bigquery
+
+import (
+    "fmt"
+    "strings"
+    "testing"
+    "time"
+
+    "github.com/jackc/pgx/v5"
+
+    connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
+    "github.com/PeerDB-io/peer-flow/e2e"
+    "github.com/PeerDB-io/peer-flow/generated/protos"
+    "github.com/PeerDB-io/peer-flow/model"
+    "github.com/PeerDB-io/peer-flow/shared"
+)
+
+type PeerFlowE2ETestSuiteBQ struct {
+    t *testing.T
+
+    bqSuffix string
+    conn     *connpostgres.PostgresConnector
+    bqHelper *BigQueryTestHelper
+}
+
+func (s PeerFlowE2ETestSuiteBQ) T() *testing.T {
+    return s.t
+}
+
+func (s PeerFlowE2ETestSuiteBQ) Conn() *pgx.Conn {
+    return s.conn.Conn()
+}
+
+func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector {
+    return s.conn
+}
+
+func (s PeerFlowE2ETestSuiteBQ) Suffix() string {
+    return s.bqSuffix
+}
+
+func (s PeerFlowE2ETestSuiteBQ) Peer() *protos.Peer {
+    return s.bqHelper.Peer
+}
+
+func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error) {
+    s.t.Helper()
+    qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName)
+    bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName)
+    s.t.Logf("running query on bigquery: %s", bqSelQuery)
+    return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery)
+}
+
+func (s PeerFlowE2ETestSuiteBQ) GetRowsWhere(tableName string, colsString string, where string) (*model.QRecordBatch, error) {
+    s.t.Helper()
+    qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName)
+    bqSelQuery := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY id", colsString, qualifiedTableName, where)
+    s.t.Logf("running query on bigquery: %s", bqSelQuery)
+    return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery)
+}
+
+func (s PeerFlowE2ETestSuiteBQ) Teardown() {
+    e2e.TearDownPostgres(s)
+
+    err := s.bqHelper.DropDataset(s.bqHelper.Config.DatasetId)
+    if err != nil {
+        s.t.Fatalf("failed to tear down bigquery: %v", err)
+    }
+}
+
+func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ {
+    t.Helper()
+
+    suffix := shared.RandomString(8)
+    tsSuffix := time.Now().Format("20060102150405")
+    bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix)
+    conn, err := e2e.SetupPostgres(t, bqSuffix)
+    if err != nil || conn == nil {
+        t.Fatalf("failed to setup postgres: %v", err)
+    }
+
+    bqHelper, err := NewBigQueryTestHelper()
+    if err != nil {
+        t.Fatalf("Failed to create helper: %v", err)
+    }
+
+    err = bqHelper.RecreateDataset()
+    if err != nil {
+        t.Fatalf("Failed to recreate dataset: %v", err)
+    }
+
+    return PeerFlowE2ETestSuiteBQ{
+        t:        t,
+        bqSuffix: bqSuffix,
+        conn:     conn,
+        bqHelper: bqHelper,
+    }
+}
101 changes: 1 addition & 100 deletions flow/e2e/bigquery/peer_flow_bq_test.go
@@ -4,76 +4,23 @@ import (
     "context"
     "errors"
     "fmt"
-    "strings"
     "testing"
-    "time"
 
     "github.com/jackc/pgerrcode"
-    "github.com/jackc/pgx/v5"
     "github.com/jackc/pgx/v5/pgconn"
-    "github.com/joho/godotenv"
     "github.com/stretchr/testify/require"
 
-    connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
     "github.com/PeerDB-io/peer-flow/connectors/utils"
     "github.com/PeerDB-io/peer-flow/e2e"
     "github.com/PeerDB-io/peer-flow/e2eshared"
     "github.com/PeerDB-io/peer-flow/generated/protos"
-    "github.com/PeerDB-io/peer-flow/model"
     "github.com/PeerDB-io/peer-flow/model/qvalue"
-    "github.com/PeerDB-io/peer-flow/shared"
     peerflow "github.com/PeerDB-io/peer-flow/workflows"
 )
 
-type PeerFlowE2ETestSuiteBQ struct {
-    t *testing.T
-
-    bqSuffix string
-    conn     *connpostgres.PostgresConnector
-    bqHelper *BigQueryTestHelper
-}
-
-func (s PeerFlowE2ETestSuiteBQ) T() *testing.T {
-    return s.t
-}
-
-func (s PeerFlowE2ETestSuiteBQ) Conn() *pgx.Conn {
-    return s.conn.Conn()
-}
-
-func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector {
-    return s.conn
-}
-
-func (s PeerFlowE2ETestSuiteBQ) Suffix() string {
-    return s.bqSuffix
-}
-
-func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error) {
-    s.t.Helper()
-    qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName)
-    bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName)
-    s.t.Logf("running query on bigquery: %s", bqSelQuery)
-    return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery)
-}
-
-func (s PeerFlowE2ETestSuiteBQ) GetRowsWhere(tableName string, colsString string, where string) (*model.QRecordBatch, error) {
-    s.t.Helper()
-    qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName)
-    bqSelQuery := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY id", colsString, qualifiedTableName, where)
-    s.t.Logf("running query on bigquery: %s", bqSelQuery)
-    return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery)
-}
-
 func TestPeerFlowE2ETestSuiteBQ(t *testing.T) {
-    e2eshared.RunSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteBQ) {
-        e2e.TearDownPostgres(s)
-
-        err := s.bqHelper.DropDataset(s.bqHelper.Config.DatasetId)
-        if err != nil {
-            s.t.Fatalf("failed to tear down bigquery: %v", err)
-        }
-    })
+    e2eshared.RunSuite(t, SetupSuite)
 }
 
 func (s PeerFlowE2ETestSuiteBQ) checkJSONValue(tableName, colName, fieldName, value string) error {
@@ -147,52 +94,6 @@ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, softDel
     return nil
 }
 
-// setupBigQuery sets up the bigquery connection.
-func setupBigQuery(t *testing.T) *BigQueryTestHelper {
-    t.Helper()
-
-    bqHelper, err := NewBigQueryTestHelper()
-    if err != nil {
-        t.Fatalf("Failed to create helper: %v", err)
-    }
-
-    err = bqHelper.RecreateDataset()
-    if err != nil {
-        t.Fatalf("Failed to recreate dataset: %v", err)
-    }
-
-    return bqHelper
-}
-
-// Implement SetupAllSuite interface to setup the test suite
-func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ {
-    t.Helper()
-
-    err := godotenv.Load()
-    if err != nil {
-        // it's okay if the .env file is not present
-        // we will use the default values
-        t.Log("Unable to load .env file, using default values from env")
-    }
-
-    suffix := shared.RandomString(8)
-    tsSuffix := time.Now().Format("20060102150405")
-    bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix)
-    conn, err := e2e.SetupPostgres(t, bqSuffix)
-    if err != nil || conn == nil {
-        t.Fatalf("failed to setup postgres: %v", err)
-    }
-
-    bq := setupBigQuery(t)
-
-    return PeerFlowE2ETestSuiteBQ{
-        t:        t,
-        bqSuffix: bqSuffix,
-        conn:     conn,
-        bqHelper: bq,
-    }
-}
-
 func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() {
     tc := e2e.NewTemporalClient(s.t)
 
89 changes: 89 additions & 0 deletions flow/e2e/generic/peer_flow_test.go
@@ -0,0 +1,89 @@
+package e2e_generic
+
+import (
+    "context"
+    "fmt"
+    "testing"
+
+    "github.com/stretchr/testify/require"
+
+    "github.com/PeerDB-io/peer-flow/e2e"
+    "github.com/PeerDB-io/peer-flow/e2e/bigquery"
+    "github.com/PeerDB-io/peer-flow/e2e/postgres"
+    "github.com/PeerDB-io/peer-flow/e2e/snowflake"
+    "github.com/PeerDB-io/peer-flow/e2eshared"
+    "github.com/PeerDB-io/peer-flow/generated/protos"
+    peerflow "github.com/PeerDB-io/peer-flow/workflows"
+)
+
+type GenericSuite interface {
+    e2e.RowSource
+    Peer() *protos.Peer
+}
+
+func TestGenericPG(t *testing.T) {
+    e2eshared.RunSuite(t, SetupGenericSuite(e2e_postgres.SetupSuite))
+}
+
+func TestGenericSF(t *testing.T) {
+    e2eshared.RunSuite(t, SetupGenericSuite(e2e_snowflake.SetupSuite))
+}
+
+func TestGenericBQ(t *testing.T) {
+    e2eshared.RunSuite(t, SetupGenericSuite(e2e_bigquery.SetupSuite))
+}
+
+type GenericWrapper struct {
+    GenericSuite
+}
+
+func SetupGenericSuite[T GenericSuite](f func(t *testing.T) T) func(t *testing.T) GenericWrapper {
+    return func(t *testing.T) GenericWrapper {
+        t.Helper()
+        return GenericWrapper{f(t)}
+    }
+}
+
+func (s GenericWrapper) Test_Simple_Flow() {
+    t := s.T()
+    srcTableName := e2e.AttachSchema(s, "test_simple_flow")
+    dstTableName := e2e.AttachSchema(s, "test_simple_flow_dst")
+
+    _, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
+        CREATE TABLE IF NOT EXISTS %s (
+            id SERIAL PRIMARY KEY,
+            key TEXT NOT NULL,
+            value TEXT NOT NULL,
+            myh HSTORE NOT NULL
+        );
+    `, srcTableName))
+    require.NoError(t, err)
+
+    connectionGen := e2e.FlowConnectionGenerationConfig{
+        FlowJobName:      e2e.AddSuffix(s, "test_simple_flow"),
+        TableNameMapping: map[string]string{srcTableName: dstTableName},
+        Destination:      s.Peer(),
+    }
+
+    flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
+    flowConnConfig.MaxBatchSize = 100
+
+    tc := e2e.NewTemporalClient(t)
+    env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
+
+    e2e.SetupCDCFlowStatusQuery(t, env, connectionGen)
+    // insert 10 rows into the source table
+    for i := range 10 {
+        testKey := fmt.Sprintf("test_key_%d", i)
+        testValue := fmt.Sprintf("test_value_%d", i)
+        _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
+            INSERT INTO %s(key, value, myh) VALUES ($1, $2, '"a"=>"b"')
+        `, srcTableName), testKey, testValue)
+        e2e.EnvNoError(t, env, err)
+    }
+    t.Log("Inserted 10 rows into the source table")
+
+    e2e.EnvWaitForEqualTables(env, s, "normalizing 10 rows", "id,key,value,myh", `id,key,value,myh`)
+    env.Cancel()
+    e2e.RequireEnvCanceled(t, env)
+}
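
The point of the GenericSuite/GenericWrapper pair above is that one test body can run against any destination whose suite implements e2e.RowSource and Peer(). Purely as a hypothetical illustration (no such package exists in this commit), wiring in one more destination would just be another one-line test in this file, with e2e_mysql standing in for a future suite package:

// Hypothetical addition to flow/e2e/generic/peer_flow_test.go; e2e_mysql and its
// SetupSuite are placeholders for any future suite that satisfies GenericSuite.
func TestGenericMY(t *testing.T) {
    e2eshared.RunSuite(t, SetupGenericSuite(e2e_mysql.SetupSuite))
}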
(Diffs for the remaining 9 of the 13 changed files did not load and are not shown here.)
