From ed4438889b50c89b915a4f66dc47bbd0c18cbfcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 15:04:15 +0000 Subject: [PATCH 01/12] Don't spam logs about unknown types being sent as text Logs of this nature should only be generated once per connector PostgresCDCSource & QRepQueryExecutor now extend PostgresConnector, stops QRepQueryExecutor querying for customTypesMapping for snapshots --- flow/connectors/bigquery/bigquery.go | 5 +- flow/connectors/postgres/cdc.go | 20 ++- flow/connectors/postgres/postgres.go | 16 ++- flow/connectors/postgres/qrep.go | 33 +---- flow/connectors/postgres/qrep_bench_test.go | 18 ++- .../postgres/qrep_query_executor.go | 75 ++++------- .../postgres/qrep_query_executor_test.go | 47 +++---- flow/connectors/postgres/qvalue_convert.go | 13 +- flow/e2e/bigquery/peer_flow_bq_test.go | 123 +++++++++--------- flow/e2e/bigquery/qrep_flow_bq_test.go | 4 +- flow/e2e/congen.go | 18 +-- flow/e2e/postgres/peer_flow_pg_test.go | 102 +++++++-------- flow/e2e/postgres/qrep_flow_pg_test.go | 52 +++----- flow/e2e/s3/cdc_s3_test.go | 4 +- flow/e2e/s3/qrep_flow_s3_test.go | 10 +- flow/e2e/snowflake/peer_flow_sf_test.go | 123 +++++++++--------- flow/e2e/snowflake/qrep_flow_sf_test.go | 4 +- .../e2e/sqlserver/qrep_flow_sqlserver_test.go | 13 +- flow/e2e/test_utils.go | 31 +++-- flow/e2eshared/e2eshared.go | 13 -- 20 files changed, 347 insertions(+), 377 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 4d63a97657..5aed634e60 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -47,7 +47,6 @@ type BigQueryServiceAccount struct { ClientX509CertURL string `json:"client_x509_cert_url"` } -// BigQueryConnector is a Connector implementation for BigQuery. type BigQueryConnector struct { bqConfig *protos.BigqueryConfig client *bigquery.Client @@ -59,7 +58,6 @@ type BigQueryConnector struct { logger log.Logger } -// Create BigQueryServiceAccount from BigqueryConfig func NewBigQueryServiceAccount(bqConfig *protos.BigqueryConfig) (*BigQueryServiceAccount, error) { var serviceAccount BigQueryServiceAccount serviceAccount.Type = bqConfig.AuthType @@ -178,7 +176,6 @@ func TableCheck(ctx context.Context, client *bigquery.Client, dataset string, pr return nil } -// NewBigQueryConnector creates a new BigQueryConnector from a PeerConnectionConfig. func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*BigQueryConnector, error) { logger := logger.LoggerFromCtx(ctx) @@ -246,7 +243,7 @@ func (c *BigQueryConnector) Close(_ context.Context) error { return c.client.Close() } -// ConnectionActive returns true if the connection is active. +// ConnectionActive returns nil if the connection is active. func (c *BigQueryConnector) ConnectionActive(ctx context.Context) error { _, err := c.client.DatasetInProject(c.projectID, c.datasetID).Metadata(ctx) if err != nil { diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 44e81c8227..b2cce785e5 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -23,10 +23,10 @@ import ( "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" ) type PostgresCDCSource struct { + *PostgresConnector replConn *pgx.Conn SrcTableIDNameMapping map[uint32]string TableNameMapping map[string]model.NameAndExclude @@ -35,11 +35,9 @@ type PostgresCDCSource struct { relationMessageMapping model.RelationMessageMapping typeMap *pgtype.Map commitLock bool - customTypeMapping map[uint32]string // for partitioned tables, maps child relid to parent relid childToParentRelIDMapping map[uint32]uint32 - logger slog.Logger // for storing chema delta audit logs to catalog catalogPool *pgxpool.Pool @@ -64,14 +62,14 @@ type startReplicationOpts struct { } // Create a new PostgresCDCSource -func NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig, customTypeMap map[uint32]string) (*PostgresCDCSource, error) { +func (c *PostgresConnector) NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, error) { childToParentRelIDMap, err := getChildToParentRelIDMap(ctx, cdcConfig.Connection) if err != nil { return nil, fmt.Errorf("error getting child to parent relid map: %w", err) } - flowName, _ := ctx.Value(shared.FlowNameKey).(string) return &PostgresCDCSource{ + PostgresConnector: c, replConn: cdcConfig.Connection, SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping, TableNameMapping: cdcConfig.TableNameMapping, @@ -81,8 +79,6 @@ func NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig, cus typeMap: pgtype.NewMap(), childToParentRelIDMapping: childToParentRelIDMap, commitLock: false, - customTypeMapping: customTypeMap, - logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), catalogPool: cdcConfig.CatalogPool, flowJobName: cdcConfig.FlowJobName, }, nil @@ -722,20 +718,20 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma if err != nil { return qvalue.QValue{}, err } - retVal, err := parseFieldFromPostgresOID(dataType, parsedData) + retVal, err := p.parseFieldFromPostgresOID(dataType, parsedData) if err != nil { return qvalue.QValue{}, err } return retVal, nil } else if dataType == uint32(oid.T_timetz) { // ugly TIMETZ workaround for CDC decoding. - retVal, err := parseFieldFromPostgresOID(dataType, string(data)) + retVal, err := p.parseFieldFromPostgresOID(dataType, string(data)) if err != nil { return qvalue.QValue{}, err } return retVal, nil } - typeName, ok := p.customTypeMapping[dataType] + typeName, ok := p.customTypesMapping[dataType] if ok { customQKind := customTypeToQKind(typeName) if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry { @@ -826,9 +822,9 @@ func (p *PostgresCDCSource) processRelationMessage( for _, column := range currRel.Columns { // not present in previous relation message, but in current one, so added. if prevRelMap[column.Name] == nil { - qKind := postgresOIDToQValueKind(column.DataType) + qKind := p.postgresOIDToQValueKind(column.DataType) if qKind == qvalue.QValueKindInvalid { - typeName, ok := p.customTypeMapping[column.DataType] + typeName, ok := p.customTypesMapping[column.DataType] if ok { qKind = customTypeToQKind(typeName) } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index f41e015672..930d058b23 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -24,7 +24,6 @@ import ( "github.com/PeerDB-io/peer-flow/shared/alerting" ) -// PostgresConnector is a Connector implementation for Postgres. type PostgresConnector struct { connStr string config *protos.PostgresConfig @@ -33,10 +32,10 @@ type PostgresConnector struct { replConfig *pgx.ConnConfig customTypesMapping map[uint32]string metadataSchema string + hushWarnOID map[uint32]struct{} logger log.Logger } -// NewPostgresConnector creates a new instance of PostgresConnector. func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) { connectionString := utils.GetPGConnectionString(pgConfig) @@ -84,6 +83,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) replConfig: replConfig, customTypesMapping: customTypeMap, metadataSchema: metadataSchema, + hushWarnOID: make(map[uint32]struct{}), logger: logger.LoggerFromCtx(ctx), }, nil } @@ -107,7 +107,11 @@ func (c *PostgresConnector) Close(ctx context.Context) error { return nil } -// ConnectionActive returns true if the connection is active. +func (c *PostgresConnector) Conn() *pgx.Conn { + return c.conn +} + +// ConnectionActive returns nil if the connection is active. func (c *PostgresConnector) ConnectionActive(ctx context.Context) error { if c.conn == nil { return fmt.Errorf("connection is nil") @@ -214,7 +218,7 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo } defer replConn.Close(ctx) - cdc, err := NewPostgresCDCSource(ctx, &PostgresCDCConfig{ + cdc, err := c.NewPostgresCDCSource(ctx, &PostgresCDCConfig{ Connection: replConn, SrcTableIDNameMapping: req.SrcTableIDNameMapping, Slot: slotName, @@ -223,7 +227,7 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo RelationMessageMapping: req.RelationMessageMapping, CatalogPool: catalogPool, FlowJobName: req.FlowJobName, - }, c.customTypesMapping) + }) if err != nil { return fmt.Errorf("failed to create cdc source: %w", err) } @@ -603,7 +607,7 @@ func (c *PostgresConnector) getTableSchemaForTable( columnNames := make([]string, 0, len(fields)) columns := make([]*protos.FieldDescription, 0, len(fields)) for _, fieldDescription := range fields { - genericColType := postgresOIDToQValueKind(fieldDescription.DataTypeOID) + genericColType := c.postgresOIDToQValueKind(fieldDescription.DataTypeOID) if genericColType == qvalue.QValueKindInvalid { typeName, ok := c.customTypesMapping[fieldDescription.DataTypeOID] if ok { diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 8a01d90cbe..13b92dfcd4 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -324,12 +324,8 @@ func (c *PostgresConnector) PullQRepRecords( partitionIdLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId) if partition.FullTablePartition { c.logger.Info("pulling full table partition", partitionIdLog) - executor, err := NewQRepQueryExecutorSnapshot(ctx, - c.conn, c.config.TransactionSnapshot, + executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot, config.FlowJobName, partition.PartitionId) - if err != nil { - return nil, err - } query := config.Query return executor.ExecuteAndProcessQuery(ctx, query) } @@ -368,12 +364,8 @@ func (c *PostgresConnector) PullQRepRecords( return nil, err } - executor, err := NewQRepQueryExecutorSnapshot( - ctx, c.conn, c.config.TransactionSnapshot, + executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot, config.FlowJobName, partition.PartitionId) - if err != nil { - return nil, err - } records, err := executor.ExecuteAndProcessQuery(ctx, query, rangeStart, rangeEnd) if err != nil { @@ -392,15 +384,11 @@ func (c *PostgresConnector) PullQRepRecordStream( partitionIdLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId) if partition.FullTablePartition { c.logger.Info("pulling full table partition", partitionIdLog) - executor, err := NewQRepQueryExecutorSnapshot( - ctx, c.conn, c.config.TransactionSnapshot, + executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot, config.FlowJobName, partition.PartitionId) - if err != nil { - return 0, err - } query := config.Query - _, err = executor.ExecuteAndProcessQueryStream(ctx, stream, query) + _, err := executor.ExecuteAndProcessQueryStream(ctx, stream, query) return 0, err } c.logger.Info("Obtained ranges for partition for PullQRepStream", partitionIdLog) @@ -438,12 +426,8 @@ func (c *PostgresConnector) PullQRepRecordStream( return 0, err } - executor, err := NewQRepQueryExecutorSnapshot( - ctx, c.conn, c.config.TransactionSnapshot, + executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot, config.FlowJobName, partition.PartitionId) - if err != nil { - return 0, err - } numRecords, err := executor.ExecuteAndProcessQueryStream(ctx, stream, query, rangeStart, rangeEnd) if err != nil { @@ -539,13 +523,10 @@ func (c *PostgresConnector) PullXminRecordStream( query += " WHERE age(xmin) > 0 AND age(xmin) <= age($1::xid)" } - executor, err := NewQRepQueryExecutorSnapshot( - ctx, c.conn, c.config.TransactionSnapshot, + executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot, config.FlowJobName, partition.PartitionId) - if err != nil { - return 0, currentSnapshotXmin, err - } + var err error var numRecords int if partition.Range != nil { numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin( diff --git a/flow/connectors/postgres/qrep_bench_test.go b/flow/connectors/postgres/qrep_bench_test.go index e8f514bc38..1dddc617fc 100644 --- a/flow/connectors/postgres/qrep_bench_test.go +++ b/flow/connectors/postgres/qrep_bench_test.go @@ -4,24 +4,28 @@ import ( "context" "testing" - "github.com/jackc/pgx/v5" + "github.com/PeerDB-io/peer-flow/generated/protos" ) func BenchmarkQRepQueryExecutor(b *testing.B) { - connectionString := "postgres://postgres:postgres@localhost:7132/postgres" query := "SELECT * FROM bench.large_table" ctx := context.Background() - - // Create a separate connection for non-replication queries - conn, err := pgx.Connect(ctx, connectionString) + connector, err := NewPostgresConnector(ctx, + &protos.PostgresConfig{ + Host: "localhost", + Port: 7132, + User: "postgres", + Password: "postgres", + Database: "postgres", + }) if err != nil { b.Fatalf("failed to create connection: %v", err) } - defer conn.Close(context.Background()) + defer connector.Close(ctx) // Create a new QRepQueryExecutor instance - qe := NewQRepQueryExecutor(conn, context.Background(), "test flow", "test part") + qe := connector.NewQRepQueryExecutor("test flow", "test part") // Run the benchmark b.ResetTimer() diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 48900555f0..2b97759d0e 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -13,7 +13,6 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/geo" - "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/numeric" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -21,48 +20,32 @@ import ( ) type QRepQueryExecutor struct { - conn *pgx.Conn - snapshot string - testEnv bool - flowJobName string - partitionID string - customTypeMap map[uint32]string - logger log.Logger + *PostgresConnector + snapshot string + testEnv bool + flowJobName string + partitionID string + logger log.Logger } -func NewQRepQueryExecutor(conn *pgx.Conn, ctx context.Context, - flowJobName string, partitionID string, -) *QRepQueryExecutor { +func (c *PostgresConnector) NewQRepQueryExecutor(flowJobName string, partitionID string) *QRepQueryExecutor { return &QRepQueryExecutor{ - conn: conn, - snapshot: "", - flowJobName: flowJobName, - partitionID: partitionID, - logger: log.With( - logger.LoggerFromCtx(ctx), - slog.String(string(shared.PartitionIDKey), partitionID), - ), + PostgresConnector: c, + snapshot: "", + flowJobName: flowJobName, + partitionID: partitionID, + logger: log.With(c.logger, slog.String(string(shared.PartitionIDKey), partitionID)), } } -func NewQRepQueryExecutorSnapshot(ctx context.Context, conn *pgx.Conn, snapshot string, - flowJobName string, partitionID string, -) (*QRepQueryExecutor, error) { - CustomTypeMap, err := utils.GetCustomDataTypes(ctx, conn) - if err != nil { - return nil, fmt.Errorf("failed to get custom data types: %w", err) - } +func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(snapshot string, flowJobName string, partitionID string) *QRepQueryExecutor { return &QRepQueryExecutor{ - conn: conn, - snapshot: snapshot, - flowJobName: flowJobName, - partitionID: partitionID, - customTypeMap: CustomTypeMap, - logger: log.With( - logger.LoggerFromCtx(ctx), - slog.String(string(shared.PartitionIDKey), partitionID), - ), - }, nil + PostgresConnector: c, + snapshot: snapshot, + flowJobName: flowJobName, + partitionID: partitionID, + logger: log.With(c.logger, slog.String(string(shared.PartitionIDKey), partitionID)), + } } func (qe *QRepQueryExecutor) SetTestEnv(testEnv bool) { @@ -104,9 +87,9 @@ func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescrip qfields := make([]model.QField, len(fds)) for i, fd := range fds { cname := fd.Name - ctype := postgresOIDToQValueKind(fd.DataTypeOID) + ctype := qe.postgresOIDToQValueKind(fd.DataTypeOID) if ctype == qvalue.QValueKindInvalid { - typeName, ok := qe.customTypeMap[fd.DataTypeOID] + typeName, ok := qe.customTypesMapping[fd.DataTypeOID] if ok { ctype = customTypeToQKind(typeName) } else { @@ -145,7 +128,7 @@ func (qe *QRepQueryExecutor) ProcessRows( qe.logger.Info("Processing rows") // Iterate over the rows for rows.Next() { - record, err := mapRowToQRecord(qe.logger, rows, fieldDescriptions, qe.customTypeMap) + record, err := qe.mapRowToQRecord(rows, fieldDescriptions) if err != nil { qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err)) return nil, fmt.Errorf("failed to map row to QRecord: %w", err) @@ -186,7 +169,7 @@ func (qe *QRepQueryExecutor) processRowsStream( return numRows, ctx.Err() default: // Process the row as before - record, err := mapRowToQRecord(qe.logger, rows, fieldDescriptions, qe.customTypeMap) + record, err := qe.mapRowToQRecord(rows, fieldDescriptions) if err != nil { qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err)) stream.Records <- model.QRecordOrError{ @@ -450,28 +433,26 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx( return totalRecordsFetched, nil } -func mapRowToQRecord( - logger log.Logger, +func (qe *QRepQueryExecutor) mapRowToQRecord( row pgx.Rows, fds []pgconn.FieldDescription, - customTypeMap map[uint32]string, ) ([]qvalue.QValue, error) { // make vals an empty array of QValue of size len(fds) record := make([]qvalue.QValue, len(fds)) values, err := row.Values() if err != nil { - logger.Error("[pg_query_executor] failed to get values from row", slog.Any("error", err)) + qe.logger.Error("[pg_query_executor] failed to get values from row", slog.Any("error", err)) return nil, fmt.Errorf("failed to scan row: %w", err) } for i, fd := range fds { // Check if it's a custom type first - typeName, ok := customTypeMap[fd.DataTypeOID] + typeName, ok := qe.customTypesMapping[fd.DataTypeOID] if !ok { - tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i]) + tmp, err := qe.parseFieldFromPostgresOID(fd.DataTypeOID, values[i]) if err != nil { - logger.Error("[pg_query_executor] failed to parse field", slog.Any("error", err)) + qe.logger.Error("[pg_query_executor] failed to parse field", slog.Any("error", err)) return nil, fmt.Errorf("failed to parse field: %w", err) } record[i] = tmp diff --git a/flow/connectors/postgres/qrep_query_executor_test.go b/flow/connectors/postgres/qrep_query_executor_test.go index bffb28214c..32d0a1a154 100644 --- a/flow/connectors/postgres/qrep_query_executor_test.go +++ b/flow/connectors/postgres/qrep_query_executor_test.go @@ -10,31 +10,35 @@ import ( "github.com/google/uuid" "github.com/jackc/pgx/v5" + + "github.com/PeerDB-io/peer-flow/generated/protos" ) -func setupDB(t *testing.T) (*pgx.Conn, string) { +func setupDB(t *testing.T) (*PostgresConnector, string) { t.Helper() - config, err := pgx.ParseConfig("postgres://postgres:postgres@localhost:7132/postgres") - if err != nil { - t.Fatalf("unable to parse config: %v", err) - } - - conn, err := pgx.ConnectConfig(context.Background(), config) + connector, err := NewPostgresConnector(context.Background(), + &protos.PostgresConfig{ + Host: "localhost", + Port: 7132, + User: "postgres", + Password: "postgres", + Database: "postgres", + }) if err != nil { - t.Fatalf("unable to connect to database: %v", err) + t.Fatalf("unable to create connector: %v", err) } // Create unique schema name using current time schemaName := fmt.Sprintf("schema_%d", time.Now().Unix()) // Create the schema - _, err = conn.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA %s;", schemaName)) + _, err = connector.conn.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA %s;", schemaName)) if err != nil { t.Fatalf("unable to create schema: %v", err) } - return conn, schemaName + return connector, schemaName } func teardownDB(t *testing.T, conn *pgx.Conn, schemaName string) { @@ -47,12 +51,11 @@ func teardownDB(t *testing.T, conn *pgx.Conn, schemaName string) { } func TestExecuteAndProcessQuery(t *testing.T) { - conn, schemaName := setupDB(t) - defer conn.Close(context.Background()) - - defer teardownDB(t, conn, schemaName) - ctx := context.Background() + connector, schemaName := setupDB(t) + conn := connector.conn + defer connector.Close(ctx) + defer teardownDB(t, conn, schemaName) query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.test(id SERIAL PRIMARY KEY, data TEXT);", schemaName) _, err := conn.Exec(ctx, query) @@ -66,7 +69,7 @@ func TestExecuteAndProcessQuery(t *testing.T) { t.Fatalf("error while inserting into test table: %v", err) } - qe := NewQRepQueryExecutor(conn, ctx, "test flow", "test part") + qe := connector.NewQRepQueryExecutor("test flow", "test part") qe.SetTestEnv(true) query = fmt.Sprintf("SELECT * FROM %s.test;", schemaName) @@ -85,13 +88,11 @@ func TestExecuteAndProcessQuery(t *testing.T) { } func TestAllDataTypes(t *testing.T) { - conn, schemaName := setupDB(t) - defer conn.Close(context.Background()) - - // Call teardownDB function after test - defer teardownDB(t, conn, schemaName) - ctx := context.Background() + connector, schemaName := setupDB(t) + conn := connector.conn + defer conn.Close(ctx) + defer teardownDB(t, conn, schemaName) // Create a table that contains every data type we want to test query := fmt.Sprintf(` @@ -170,7 +171,7 @@ func TestAllDataTypes(t *testing.T) { t.Fatalf("error while inserting into test table: %v", err) } - qe := NewQRepQueryExecutor(conn, ctx, "test flow", "test part") + qe := connector.NewQRepQueryExecutor("test flow", "test part") // Select the row back out of the table query = fmt.Sprintf("SELECT * FROM %s.test;", schemaName) rows, err := qe.ExecuteQuery(context.Background(), query) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index bd39fd1048..64303fb35c 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -4,7 +4,6 @@ import ( "encoding/json" "errors" "fmt" - "log/slog" "math/big" "strings" "time" @@ -18,7 +17,7 @@ import ( var big10 = big.NewInt(10) -func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind { +func (c *PostgresConnector) postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind { switch recvOID { case pgtype.BoolOID: return qvalue.QValueKindBoolean @@ -102,7 +101,11 @@ func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind { return qvalue.QValueKindInvalid } else { - slog.Warn(fmt.Sprintf("unsupported field type: %v - type name - %s; returning as string", recvOID, typeName.Name)) + _, warned := c.hushWarnOID[recvOID] + if !warned { + c.logger.Warn(fmt.Sprintf("unsupported field type: %d - type name - %s; returning as string", recvOID, typeName.Name)) + c.hushWarnOID[recvOID] = struct{}{} + } return qvalue.QValueKindString } } @@ -370,8 +373,8 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( return val, nil } -func parseFieldFromPostgresOID(oid uint32, value interface{}) (qvalue.QValue, error) { - return parseFieldFromQValueKind(postgresOIDToQValueKind(oid), value) +func (c *PostgresConnector) parseFieldFromPostgresOID(oid uint32, value interface{}) (qvalue.QValue, error) { + return parseFieldFromQValueKind(c.postgresOIDToQValueKind(oid), value) } func numericToRat(numVal *pgtype.Numeric) (*big.Rat, error) { diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 301359b3cc..0c3306b668 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -14,6 +14,7 @@ import ( "github.com/joho/godotenv" "github.com/stretchr/testify/require" + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" @@ -28,7 +29,7 @@ type PeerFlowE2ETestSuiteBQ struct { t *testing.T bqSuffix string - conn *pgx.Conn + conn *connpostgres.PostgresConnector bqHelper *BigQueryTestHelper } @@ -37,6 +38,10 @@ func (s PeerFlowE2ETestSuiteBQ) T() *testing.T { } func (s PeerFlowE2ETestSuiteBQ) Conn() *pgx.Conn { + return s.conn.Conn() +} + +func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector { return s.conn } @@ -207,7 +212,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { srcTableName := s.attachSchemaSuffix("test_no_data") dstTableName := "test_no_data" - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, key TEXT NOT NULL, @@ -243,7 +248,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { srcTableName := s.attachSchemaSuffix("test_char_coltype") dstTableName := "test_char_coltype" - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, key TEXT NOT NULL, @@ -282,7 +287,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { srcTableName := s.attachSchemaSuffix("test_simple_flow_bq") dstTableName := "test_simple_flow_bq" - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, key TEXT NOT NULL, @@ -308,7 +313,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { for i := 0; i < 10; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) e2e.EnvNoError(s.t, env, err) @@ -329,7 +334,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { srcTableName := s.attachSchemaSuffix("test_toast_bq_1") dstTableName := "test_toast_bq_1" - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, t1 text, @@ -360,7 +365,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { 2. changes no toast column 2. changes 1 toast column */ - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` BEGIN; INSERT INTO %s(t1,t2,k) SELECT random_string(9000),random_string(9000), 1 FROM generate_series(1,2); @@ -385,7 +390,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { srcTableName := s.attachSchemaSuffix("test_toast_bq_3") dstTableName := "test_toast_bq_3" - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, t1 text, @@ -411,7 +416,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // complex transaction with random DMLs on a table with toast columns - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` BEGIN; INSERT INTO %s(t1,t2,k) SELECT random_string(9000),random_string(9000), 1 FROM generate_series(1,2); @@ -447,7 +452,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { srcTableName := s.attachSchemaSuffix("test_toast_bq_4") dstTableName := "test_toast_bq_4" - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s ( id SERIAL PRIMARY KEY, t1 text, @@ -472,7 +477,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // complex transaction with random DMLs on a table with toast columns - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` BEGIN; INSERT INTO %s(t1,k) SELECT random_string(9000), 1 FROM generate_series(1,1); @@ -501,7 +506,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { srcTableName := s.attachSchemaSuffix("test_toast_bq_5") dstTableName := "test_toast_bq_5" - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, t1 text, @@ -530,7 +535,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { transaction updating a single row multiple times with changed/unchanged toast columns */ - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` BEGIN; INSERT INTO %s(t1,t2,k) SELECT random_string(9000),random_string(9000), 1 FROM generate_series(1,1); @@ -556,12 +561,12 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { dstTableName := "test_types_bq" createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" var pgErr *pgconn.PgError - _, enumErr := s.conn.Exec(context.Background(), createMoodEnum) + _, enumErr := s.Conn().Exec(context.Background(), createMoodEnum) if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject && !utils.IsUniqueError(pgErr) { require.NoError(s.t, enumErr) } - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, @@ -588,7 +593,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* test inserting various types*/ - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s SELECT 2,2,b'1',b'101', true,random_bytea(32),'s','test','1.1.10.2'::cidr, CURRENT_DATE,1.23,1.234,'192.168.1.5'::inet,1, @@ -651,7 +656,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() { srcTableName := s.attachSchemaSuffix("test_nans_bq") dstTableName := "test_nans_bq" - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 double precision,c2 double precision[]); `, srcTableName)) require.NoError(s.t, err) @@ -672,7 +677,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* test inserting various types*/ - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s SELECT 2, 'NaN'::double precision, '{NaN, Infinity, -Infinity}'; `, srcTableName)) e2e.EnvNoError(s.t, env, err) @@ -694,7 +699,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { srcTableName := s.attachSchemaSuffix("test_invalid_geo_bq_avro_cdc") dstTableName := "test_invalid_geo_bq_avro_cdc" - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, line GEOMETRY(LINESTRING) NOT NULL, @@ -720,7 +725,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 4 invalid shapes and 6 valid shapes into the source table for i := 0; i < 4; i++ { - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (line,"polyPoly") VALUES ($1,$2) `, srcTableName), "010200000001000000000000000000F03F0000000000000040", "0103000020e6100000010000000c0000001a8361d35dc64140afdb8d2b1bc3c9bf1b8ed4685fc641405ba64c"+ @@ -732,7 +737,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { } s.t.Log("Inserted 4 invalid geography rows into the source table") for i := 4; i < 10; i++ { - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (line,"polyPoly") VALUES ($1,$2) `, srcTableName), "010200000002000000000000000000F03F000000000000004000000000000008400000000000001040", "010300000001000000050000000000000000000000000000000000000000000000"+ @@ -778,7 +783,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { srcTable2Name := s.attachSchemaSuffix("test2_bq") dstTable2Name := "test2_bq" - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s (id serial primary key, c1 int, c2 text); CREATE TABLE %s(id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) @@ -800,7 +805,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* inserting across multiple tables*/ - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) @@ -834,7 +839,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { tableName := "test_simple_schema_changes" srcTableName := s.attachSchemaSuffix(tableName) - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, c1 BIGINT @@ -858,7 +863,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { go func() { // insert first row. e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES (1)`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted initial row in the source table") @@ -866,11 +871,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { e2e.EnvWaitForEqualTables(env, s, "normalize insert", tableName, "id,c1") // alter source table, add column c2 and insert another row. - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, added column c2") - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES (2,2)`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c2 in the source table") @@ -879,11 +884,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { e2e.EnvWaitForEqualTables(env, s, "normalize altered row", tableName, "id,c1,c2") // alter source table, add column c3, drop column c2 and insert another row. - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c2 and added column c3") - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES (3,3)`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c3 in the source table") @@ -892,11 +897,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { e2e.EnvWaitForEqualTables(env, s, "normalize altered row", tableName, "id,c1,c3") // alter source table, drop column c3 and insert another row. - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c3") - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES (4)`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row after dropping all columns in the source table") @@ -917,7 +922,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { tableName := "test_simple_cpkey" srcTableName := s.attachSchemaSuffix("test_simple_cpkey") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT GENERATED ALWAYS AS IDENTITY, c1 INT GENERATED BY DEFAULT AS IDENTITY, @@ -946,7 +951,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { // insert 10 rows into the source table for i := 0; i < 10; i++ { testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) e2e.EnvNoError(s.t, env, err) @@ -956,10 +961,10 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { // verify we got our 10 rows e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,c1,c2,t") - _, err := s.conn.Exec(context.Background(), + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTables(env, s, "normalize update", tableName, "id,c1,c2,t") @@ -977,7 +982,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") dstTableName := "test_cpkey_toast1" - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT GENERATED ALWAYS AS IDENTITY, c1 INT GENERATED BY DEFAULT AS IDENTITY, @@ -1004,7 +1009,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { // and then insert, update and delete rows in the table. go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - rowsTx, err := s.conn.Begin(context.Background()) + rowsTx, err := s.Conn().Begin(context.Background()) e2e.EnvNoError(s.t, env, err) // insert 10 rows into the source table @@ -1039,7 +1044,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { tableName := "test_cpkey_toast2" srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT GENERATED ALWAYS AS IDENTITY, c1 INT GENERATED BY DEFAULT AS IDENTITY, @@ -1070,7 +1075,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { // insert 10 rows into the source table for i := 0; i < 10; i++ { testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) e2e.EnvNoError(s.t, env, err) @@ -1078,10 +1083,10 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { s.t.Log("Inserted 10 rows into the source table") e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,c2,t,t2") - _, err = s.conn.Exec(context.Background(), + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTables(env, s, "normalize update", tableName, "id,c2,t,t2") @@ -1097,7 +1102,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { srcTableName := s.attachSchemaSuffix("test_peerdb_cols") dstTableName := "test_peerdb_cols_dst" - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, key TEXT NOT NULL, @@ -1122,13 +1127,13 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { // insert 1 row into the source table testKey := fmt.Sprintf("test_key_%d", 1) testValue := fmt.Sprintf("test_value_%d", 1) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) e2e.EnvNoError(s.t, env, err) // delete that row - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1 `, srcTableName)) e2e.EnvNoError(s.t, env, err) @@ -1152,7 +1157,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { srcTable2Name := s.attachSchemaSuffix("test2_bq") dstTable2Name := "test2_bq" - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s(id serial primary key, c1 int, c2 text); CREATE TABLE %s(id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) @@ -1177,7 +1182,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* inserting across multiple tables*/ - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) @@ -1212,7 +1217,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { srcName := "test_softdel_src" srcTableName := s.attachSchemaSuffix(srcName) - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, c1 INT, @@ -1248,15 +1253,15 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize insert", srcName, tableName, "id,c1,c2,t") - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize update", srcName, tableName, "id,c1,c2,t") - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize delete", func() bool { @@ -1292,7 +1297,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { srcTableName := fmt.Sprintf("%s_src", cmpTableName) dstTableName := "test_softdel_iud" - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, c1 INT, @@ -1328,7 +1333,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - insertTx, err := s.conn.Begin(context.Background()) + insertTx, err := s.Conn().Begin(context.Background()) e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` @@ -1370,7 +1375,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { srcTableName := s.attachSchemaSuffix(srcName) dstName := "test_softdel_ud" - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, c1 INT, @@ -1406,12 +1411,12 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize insert", srcName, dstName, "id,c1,c2,t") - insertTx, err := s.conn.Begin(context.Background()) + insertTx, err := s.Conn().Begin(context.Background()) e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName)) @@ -1454,7 +1459,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { tableName := "test_softdel_iad" srcTableName := s.attachSchemaSuffix(tableName) - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, c1 INT, @@ -1490,11 +1495,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTables(env, s, "normalize insert", tableName, "id,c1,c2,t") - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize delete", func() bool { @@ -1508,7 +1513,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { } return e2eshared.CheckEqualRecordBatches(s.t, pgRows, rows) }) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTables(env, s, "normalize reinsert", tableName, "id,c1,c2,t") diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index d0048b948d..924700efc0 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -9,9 +9,9 @@ import ( ) func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateTableForQRep(s.conn, s.bqSuffix, tableName) + err := e2e.CreateTableForQRep(s.Conn(), s.bqSuffix, tableName) require.NoError(s.t, err) - err = e2e.PopulateSourceTable(s.conn, s.bqSuffix, tableName, rowCount) + err = e2e.PopulateSourceTable(s.Conn(), s.bqSuffix, tableName, rowCount) require.NoError(s.t, err) } diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 73450d130d..8a7ff6235e 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -9,8 +9,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/stretchr/testify/require" - "github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/e2eshared" + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/generated/protos" ) @@ -120,34 +119,35 @@ func setupPostgresSchema(t *testing.T, conn *pgx.Conn, suffix string) error { } // SetupPostgres sets up the postgres connection. -func SetupPostgres(t *testing.T, suffix string) (*pgx.Conn, error) { +func SetupPostgres(t *testing.T, suffix string) (*connpostgres.PostgresConnector, error) { t.Helper() - conn, err := pgx.Connect(context.Background(), utils.GetPGConnectionString(GetTestPostgresConf())) + connector, err := connpostgres.NewPostgresConnector(context.Background(), GetTestPostgresConf()) if err != nil { return nil, fmt.Errorf("failed to create postgres connection: %w", err) } + conn := connector.Conn() err = cleanPostgres(conn, suffix) if err != nil { - conn.Close(context.Background()) + connector.Close(context.Background()) return nil, err } err = setupPostgresSchema(t, conn, suffix) if err != nil { - conn.Close(context.Background()) + connector.Close(context.Background()) return nil, err } - return conn, nil + return connector, nil } -func TearDownPostgres[T e2eshared.Suite](s T) { +func TearDownPostgres[T Suite](s T) { t := s.T() t.Helper() - conn := s.Conn() + conn := s.Connector().Conn() if conn != nil { suffix := s.Suffix() t.Log("begin tearing down postgres schema", suffix) diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 642bc13a94..6d8f185437 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -36,7 +36,7 @@ func (s PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, ro dstSchemaQualified, rowID) var isDeleted pgtype.Bool var syncedAt pgtype.Timestamp - err := s.conn.QueryRow(context.Background(), query).Scan(&isDeleted, &syncedAt) + err := s.Conn().QueryRow(context.Background(), query).Scan(&isDeleted, &syncedAt) if err != nil { return fmt.Errorf("failed to query row: %w", err) } @@ -63,7 +63,7 @@ func (s PeerFlowE2ETestSuitePG) WaitForSchema( s.t.Helper() e2e.EnvWaitFor(s.t, env, 3*time.Minute, reason, func() bool { s.t.Helper() - output, err := s.connector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{ + output, err := s.conn.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) if err != nil { @@ -84,7 +84,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { srcTableName := s.attachSchemaSuffix("test_simple_flow") dstTableName := s.attachSchemaSuffix("test_simple_flow_dst") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, key TEXT NOT NULL, @@ -112,7 +112,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { for i := 0; i < 10; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value, myh) VALUES ($1, $2, '"a"=>"b"') `, srcTableName), testKey, testValue) e2e.EnvNoError(s.t, env, err) @@ -135,7 +135,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() { srcTableName := s.attachSchemaSuffix("test_geospatial_pg") dstTableName := s.attachSchemaSuffix("test_geospatial_pg_dst") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, gg geography NOT NULL, @@ -157,7 +157,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 1 row into the source table - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(gg, gm) VALUES ('POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))','LINESTRING(0 0, 1 1, 2 2)') `, srcTableName)) e2e.EnvNoError(s.t, env, err) @@ -179,7 +179,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() { srcTableName := s.attachSchemaSuffix("test_types_pg") dstTableName := s.attachSchemaSuffix("test_types_pg_dst") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c4 BOOLEAN, c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, c14 INET,c15 INTEGER,c21 MACADDR, @@ -202,7 +202,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s SELECT 2,2,b'1', true,'s','test','1.1.10.2'::cidr, CURRENT_DATE,1.23,1.234,'192.168.1.5'::inet,1, @@ -244,11 +244,11 @@ func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() { dstTableName := s.attachSchemaSuffix("test_enum_flow_dst") createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" var pgErr *pgconn.PgError - _, enumErr := s.conn.Exec(context.Background(), createMoodEnum) + _, enumErr := s.Conn().Exec(context.Background(), createMoodEnum) if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject && !utils.IsUniqueError(enumErr) { require.NoError(s.t, enumErr) } - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, my_mood mood, @@ -269,7 +269,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(my_mood, my_null_mood) VALUES ('happy',null) `, srcTableName)) e2e.EnvNoError(s.t, env, err) @@ -291,7 +291,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") dstTableName := s.attachSchemaSuffix("test_simple_schema_changes_dst") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, c1 BIGINT @@ -314,7 +314,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { go func() { // insert first row. e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted initial row in the source table") @@ -342,11 +342,11 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { }) // alter source table, add column c2 and insert another row. - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, added column c2") - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c2 in the source table") @@ -379,11 +379,11 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { }) // alter source table, add column c3, drop column c2 and insert another row. - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c2 and added column c3") - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c3 in the source table") @@ -421,11 +421,11 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { }) // alter source table, drop column c3 and insert another row. - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c3") - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row after dropping all columns in the source table") @@ -475,7 +475,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { srcTableName := s.attachSchemaSuffix("test_simple_cpkey") dstTableName := s.attachSchemaSuffix("test_simple_cpkey_dst") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT GENERATED ALWAYS AS IDENTITY, c1 INT GENERATED BY DEFAULT AS IDENTITY, @@ -503,7 +503,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { // insert 10 rows into the source table for i := 0; i < 10; i++ { testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) e2e.EnvNoError(s.t, env, err) @@ -514,10 +514,10 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil }) - _, err := s.conn.Exec(context.Background(), + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize modifications", func() bool { return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil @@ -536,7 +536,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { randomString := s.attachSchemaSuffix("random_string") dstTableName := s.attachSchemaSuffix("test_cpkey_toast1_dst") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT GENERATED ALWAYS AS IDENTITY, c1 INT GENERATED BY DEFAULT AS IDENTITY, @@ -565,7 +565,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { // and then insert, update and delete rows in the table. go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - rowsTx, err := s.conn.Begin(context.Background()) + rowsTx, err := s.Conn().Begin(context.Background()) e2e.EnvNoError(s.t, env, err) // insert 10 rows into the source table @@ -604,7 +604,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { randomString := s.attachSchemaSuffix("random_string") dstTableName := s.attachSchemaSuffix("test_cpkey_toast2_dst") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT GENERATED ALWAYS AS IDENTITY, c1 INT GENERATED BY DEFAULT AS IDENTITY, @@ -637,7 +637,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { // insert 10 rows into the source table for i := 0; i < 10; i++ { testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,%s(9000)) `, srcTableName, randomString), i, testValue) e2e.EnvNoError(s.t, env, err) @@ -647,10 +647,10 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize 10 rows", func() bool { return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") == nil }) - _, err = s.conn.Exec(context.Background(), + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize update", func() bool { @@ -670,7 +670,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { srcTableName := s.attachSchemaSuffix("test_peerdb_cols") dstTableName := s.attachSchemaSuffix("test_peerdb_cols_dst") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, key TEXT NOT NULL, @@ -695,13 +695,13 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { // insert 1 row into the source table testKey := fmt.Sprintf("test_key_%d", 1) testValue := fmt.Sprintf("test_value_%d", 1) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) e2e.EnvNoError(s.t, env, err) // delete that row - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1 `, srcTableName)) e2e.EnvNoError(s.t, env, err) @@ -724,7 +724,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { srcTableName := fmt.Sprintf("%s_src", cmpTableName) dstTableName := s.attachSchemaSuffix("test_softdel_dst") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, c1 INT, @@ -760,23 +760,23 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize row", func() bool { return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil }) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize update", func() bool { return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil }) // since we delete stuff, create another table to compare with - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) e2e.EnvNoError(s.t, env, err) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) @@ -810,7 +810,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() { srcTableName := fmt.Sprintf("%s_src", cmpTableName) dstTableName := s.attachSchemaSuffix("test_softdel_iud_dst") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, c1 INT, @@ -846,7 +846,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - insertTx, err := s.conn.Begin(context.Background()) + insertTx, err := s.Conn().Begin(context.Background()) e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` @@ -889,7 +889,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { srcTableName := fmt.Sprintf("%s_src", cmpTableName) dstTableName := s.attachSchemaSuffix("test_softdel_ud_dst") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, c1 INT, @@ -925,14 +925,14 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize row", func() bool { return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil }) - insertTx, err := s.conn.Begin(context.Background()) + insertTx, err := s.Conn().Begin(context.Background()) e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName)) @@ -973,7 +973,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() { srcTableName := s.attachSchemaSuffix("test_softdel_iad") dstTableName := s.attachSchemaSuffix("test_softdel_iad_dst") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, c1 INT, @@ -1009,19 +1009,19 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize row", func() bool { return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil }) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize delete", func() bool { return s.comparePGTables(srcTableName, dstTableName+` WHERE NOT "_PEERDB_IS_DELETED"`, "id,c1,c2,t") == nil }) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize reinsert", func() bool { @@ -1050,7 +1050,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Supported_Mixed_Case_Table() { stmtDstTableName := fmt.Sprintf(`e2e_test_%s."%s"`, s.suffix, "testMixedCaseDst") dstTableName := s.attachSchemaSuffix("testMixedCaseDst") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( "pulseArmor" SERIAL PRIMARY KEY, "highGold" TEXT NOT NULL, @@ -1086,7 +1086,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Supported_Mixed_Case_Table() { for i := 0; i < 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s ("highGold","eVe") VALUES ($1, $2) `, stmtSrcTableName), testKey, testValue) e2e.EnvNoError(s.t, env, err) @@ -1122,7 +1122,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { isPaused := false sentUpdate := false - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, t TEXT DEFAULT md5(random()::text)); @@ -1157,10 +1157,10 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { addRows := func(numRows int) { for i := 0; i < numRows; i++ { - _, err = s.conn.Exec(context.Background(), + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable1Name)) e2e.EnvNoError(s.t, env, err) - _, err = s.conn.Exec(context.Background(), + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable2Name)) e2e.EnvNoError(s.t, env, err) } diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 1723afca5d..0f5cf9c33a 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -22,10 +22,9 @@ import ( type PeerFlowE2ETestSuitePG struct { t *testing.T - conn *pgx.Conn - peer *protos.Peer - connector *connpostgres.PostgresConnector - suffix string + conn *connpostgres.PostgresConnector + peer *protos.Peer + suffix string } func (s PeerFlowE2ETestSuitePG) T() *testing.T { @@ -33,6 +32,10 @@ func (s PeerFlowE2ETestSuitePG) T() *testing.T { } func (s PeerFlowE2ETestSuitePG) Conn() *pgx.Conn { + return s.conn.Conn() +} + +func (s PeerFlowE2ETestSuitePG) Connector() *connpostgres.PostgresConnector { return s.conn } @@ -58,33 +61,20 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG { suffix := "pg_" + strings.ToLower(shared.RandomString(8)) conn, err := e2e.SetupPostgres(t, suffix) - if err != nil { - require.Fail(t, "failed to setup postgres", err) - } - - connector, err := connpostgres.NewPostgresConnector(context.Background(), - &protos.PostgresConfig{ - Host: "localhost", - Port: 7132, - User: "postgres", - Password: "postgres", - Database: "postgres", - }) - require.NoError(t, err) + require.NoError(t, err, "failed to setup postgres") return PeerFlowE2ETestSuitePG{ - t: t, - conn: conn, - peer: generatePGPeer(e2e.GetTestPostgresConf()), - connector: connector, - suffix: suffix, + t: t, + conn: conn, + peer: generatePGPeer(e2e.GetTestPostgresConf()), + suffix: suffix, } } func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateTableForQRep(s.conn, s.suffix, tableName) + err := e2e.CreateTableForQRep(s.conn.Conn(), s.suffix, tableName) require.NoError(s.t, err) - err = e2e.PopulateSourceTable(s.conn, s.suffix, tableName, rowCount) + err = e2e.PopulateSourceTable(s.conn.Conn(), s.suffix, tableName, rowCount) require.NoError(s.t, err) } @@ -108,7 +98,7 @@ func (s PeerFlowE2ETestSuitePG) checkEnums(srcSchemaQualified, dstSchemaQualifie "SELECT 1 FROM %s dst "+ "WHERE src.my_mood::text = dst.my_mood::text)) LIMIT 1;", srcSchemaQualified, dstSchemaQualified) - err := s.conn.QueryRow(context.Background(), query).Scan(&exists) + err := s.conn.Conn().QueryRow(context.Background(), query).Scan(&exists) if err != nil { return err } @@ -122,7 +112,7 @@ func (s PeerFlowE2ETestSuitePG) checkEnums(srcSchemaQualified, dstSchemaQualifie func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error { query := fmt.Sprintf("SELECT %s FROM %s EXCEPT SELECT %s FROM %s", selector, srcSchemaQualified, selector, dstSchemaQualified) - rows, err := s.conn.Query(context.Background(), query, pgx.QueryExecModeExec) + rows, err := s.conn.Conn().Query(context.Background(), query, pgx.QueryExecModeExec) if err != nil { return err } @@ -156,7 +146,7 @@ func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualif func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { query := fmt.Sprintf(`SELECT "_PEERDB_SYNCED_AT" FROM %s`, dstSchemaQualified) - rows, _ := s.conn.Query(context.Background(), query) + rows, _ := s.conn.Conn().Query(context.Background(), query) defer rows.Close() for rows.Next() { @@ -176,12 +166,12 @@ func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { func (s PeerFlowE2ETestSuitePG) RunInt64Query(query string) (int64, error) { var count pgtype.Int8 - err := s.conn.QueryRow(context.Background(), query).Scan(&count) + err := s.conn.Conn().QueryRow(context.Background(), query).Scan(&count) return count.Int64, err } func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() { - setupTx, err := s.conn.Begin(context.Background()) + setupTx, err := s.conn.Conn().Begin(context.Background()) require.NoError(s.t, err) // setup 3 tables in pgpeer_repl_test schema // test_1, test_2, test_3, all have 5 columns all text, c1, c2, c3, c4, c5 @@ -207,7 +197,7 @@ func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() { setupError := make(chan error) go func() { - setupError <- s.connector.SetupReplication(context.Background(), signal, setupReplicationInput) + setupError <- s.conn.SetupReplication(context.Background(), signal, setupReplicationInput) }() s.t.Log("waiting for slot creation to complete: ", flowJobName) @@ -230,7 +220,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { dstTable := "test_qrep_flow_avro_pg_2" - err := e2e.CreateTableForQRep(s.conn, s.suffix, dstTable) + err := e2e.CreateTableForQRep(s.conn.Conn(), s.suffix, dstTable) require.NoError(s.t, err) srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index 7295273597..3fd89ac74f 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -25,7 +25,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { srcTableName := s.attachSchemaSuffix("test_simple_flow_s3") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3") flowJobName := s.attachSuffix("test_simple_flow_s3") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.conn.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s ( id SERIAL PRIMARY KEY, key TEXT NOT NULL, @@ -49,7 +49,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.conn.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) e2e.EnvNoError(s.t, env, err) diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index a27d52171f..54a66f0ed0 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -7,10 +7,10 @@ import ( "testing" "time" - "github.com/jackc/pgx/v5" "github.com/joho/godotenv" "github.com/stretchr/testify/require" + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/shared" @@ -19,7 +19,7 @@ import ( type PeerFlowE2ETestSuiteS3 struct { t *testing.T - conn *pgx.Conn + conn *connpostgres.PostgresConnector s3Helper *S3TestHelper suffix string } @@ -28,7 +28,7 @@ func (s PeerFlowE2ETestSuiteS3) T() *testing.T { return s.t } -func (s PeerFlowE2ETestSuiteS3) Conn() *pgx.Conn { +func (s PeerFlowE2ETestSuiteS3) Connector() *connpostgres.PostgresConnector { return s.conn } @@ -54,9 +54,9 @@ func TestPeerFlowE2ETestSuiteGCS(t *testing.T) { } func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateTableForQRep(s.conn, s.suffix, tableName) + err := e2e.CreateTableForQRep(s.conn.Conn(), s.suffix, tableName) require.NoError(s.t, err) - err = e2e.PopulateSourceTable(s.conn, s.suffix, tableName, rowCount) + err = e2e.PopulateSourceTable(s.conn.Conn(), s.suffix, tableName, rowCount) require.NoError(s.t, err) } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index f669492d45..716eee667c 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -14,6 +14,7 @@ import ( "github.com/joho/godotenv" "github.com/stretchr/testify/require" + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" @@ -29,7 +30,7 @@ type PeerFlowE2ETestSuiteSF struct { t *testing.T pgSuffix string - conn *pgx.Conn + conn *connpostgres.PostgresConnector sfHelper *SnowflakeTestHelper connector *connsnowflake.SnowflakeConnector } @@ -38,10 +39,14 @@ func (s PeerFlowE2ETestSuiteSF) T() *testing.T { return s.t } -func (s PeerFlowE2ETestSuiteSF) Conn() *pgx.Conn { +func (s PeerFlowE2ETestSuiteSF) Connector() *connpostgres.PostgresConnector { return s.conn } +func (s PeerFlowE2ETestSuiteSF) Conn() *pgx.Conn { + return s.Connector().Conn() +} + func (s PeerFlowE2ETestSuiteSF) Suffix() string { return s.pgSuffix } @@ -128,7 +133,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { srcTableName := s.attachSchemaSuffix(tableName) dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, key TEXT NOT NULL, @@ -155,7 +160,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { for i := 0; i < 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) e2e.EnvNoError(s.t, env, err) @@ -187,7 +192,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_replica_identity_no_pkey") // Create a table without a primary key and create a named unique index - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL, key TEXT NOT NULL, @@ -216,7 +221,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { for i := 0; i < 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (id, key, value) VALUES ($1, $2, $3) `, srcTableName), i, testKey, testValue) e2e.EnvNoError(s.t, env, err) @@ -241,7 +246,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { srcTableName := s.attachSchemaSuffix(tableName) dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_invalid_geo_sf_avro_cdc") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, line GEOMETRY(LINESTRING) NOT NULL, @@ -266,7 +271,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 4 invalid shapes and 6 valid shapes into the source table for i := 0; i < 4; i++ { - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (line,poly) VALUES ($1,$2) `, srcTableName), "010200000001000000000000000000F03F0000000000000040", "0103000020e6100000010000000c0000001a8361d35dc64140afdb8d2b1bc3c9bf1b8ed4685fc641405ba64c"+ @@ -278,7 +283,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { } s.t.Log("Inserted 4 invalid geography rows into the source table") for i := 4; i < 10; i++ { - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (line,poly) VALUES ($1,$2) `, srcTableName), "010200000002000000000000000000F03F000000000000004000000000000008400000000000001040", "010300000001000000050000000000000000000000000000000000000000000000"+ @@ -321,7 +326,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { srcTableName := s.attachSchemaSuffix("test_toast_sf_1") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_1") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, t1 text, @@ -351,7 +356,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { 2. changes no toast column 2. changes 1 toast column */ - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` BEGIN; INSERT INTO %s (t1,t2,k) SELECT random_string(9000),random_string(9000), 1 FROM generate_series(1,2); @@ -375,7 +380,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { srcTableName := s.attachSchemaSuffix("test_toast_sf_3") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_3") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, t1 text, @@ -400,7 +405,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // complex transaction with random DMLs on a table with toast columns - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` BEGIN; INSERT INTO %s (t1,t2,k) SELECT random_string(9000),random_string(9000), 1 FROM generate_series(1,2); @@ -435,7 +440,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { srcTableName := s.attachSchemaSuffix("test_toast_sf_4") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_4") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, t1 text, @@ -459,7 +464,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // complex transaction with random DMLs on a table with toast columns - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` BEGIN; INSERT INTO %s (t1,k) SELECT random_string(9000), 1 FROM generate_series(1,1); @@ -488,7 +493,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { srcTableName := s.attachSchemaSuffix("test_toast_sf_5") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_5") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, t1 text, @@ -516,7 +521,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { transaction updating a single row multiple times with changed/unchanged toast columns */ - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` BEGIN; INSERT INTO %s (t1,t2,k) SELECT random_string(9000),random_string(9000), 1 FROM generate_series(1,1); @@ -543,11 +548,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_types_sf") createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" var pgErr *pgconn.PgError - _, enumErr := s.conn.Exec(context.Background(), createMoodEnum) + _, enumErr := s.Conn().Exec(context.Background(), createMoodEnum) if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject && !utils.IsUniqueError(pgErr) { require.NoError(s.t, enumErr) } - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, @@ -574,7 +579,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* test inserting various types*/ - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s SELECT 2,2,b'1',b'101', true,random_bytea(32),'s','test','1.1.10.2'::cidr, CURRENT_DATE,1.23,1.234,'192.168.1.5'::inet,1, @@ -640,7 +645,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { dstTable1Name := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test1_sf") dstTable2Name := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test2_sf") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text); CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) @@ -661,7 +666,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* inserting across multiple tables*/ - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) @@ -693,7 +698,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_schema_changes") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, c1 BIGINT @@ -715,7 +720,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { // and then insert and mutate schema repeatedly. go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted initial row in the source table") @@ -754,11 +759,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) // alter source table, add column c2 and insert another row. - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, added column c2") - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c2 in the source table") @@ -798,11 +803,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c2") // alter source table, add column c3, drop column c2 and insert another row. - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c2 and added column c3") - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c3 in the source table") @@ -847,11 +852,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c3") // alter source table, drop column c3 and insert another row. - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c3") - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row after dropping all columns in the source table") @@ -908,7 +913,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { srcTableName := s.attachSchemaSuffix("test_simple_cpkey") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_cpkey") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT GENERATED ALWAYS AS IDENTITY, c1 INT GENERATED BY DEFAULT AS IDENTITY, @@ -936,7 +941,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { // insert 10 rows into the source table for i := 0; i < 10; i++ { testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) e2e.EnvNoError(s.t, env, err) @@ -945,10 +950,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { e2e.EnvWaitForEqualTables(env, s, "normalize table", "test_simple_cpkey", "id,c1,c2,t") - _, err := s.conn.Exec(context.Background(), + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTables(env, s, "normalize update/delete", "test_simple_cpkey", "id,c1,c2,t") @@ -965,7 +970,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_cpkey_toast1") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT GENERATED ALWAYS AS IDENTITY, c1 INT GENERATED BY DEFAULT AS IDENTITY, @@ -991,7 +996,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { // and then insert, update and delete rows in the table. go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - rowsTx, err := s.conn.Begin(context.Background()) + rowsTx, err := s.Conn().Begin(context.Background()) e2e.EnvNoError(s.t, env, err) // insert 10 rows into the source table @@ -1028,7 +1033,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { srcTableName := s.attachSchemaSuffix(tableName) dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT GENERATED ALWAYS AS IDENTITY, c1 INT GENERATED BY DEFAULT AS IDENTITY, @@ -1058,7 +1063,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { // insert 10 rows into the source table for i := 0; i < 10; i++ { testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) e2e.EnvNoError(s.t, env, err) @@ -1066,10 +1071,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { s.t.Log("Inserted 10 rows into the source table") e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,c2,t,t2") - _, err = s.conn.Exec(context.Background(), + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTables(env, s, "normalize update/delete", tableName, "id,c2,t,t2") @@ -1087,7 +1092,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { srcTableName := s.attachSchemaSuffix(tableName) dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT GENERATED ALWAYS AS IDENTITY, c1 INT GENERATED BY DEFAULT AS IDENTITY, @@ -1127,7 +1132,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { // insert 10 rows into the source table for i := 0; i < 10; i++ { testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(100)) `, srcTableName), i, testValue) e2e.EnvNoError(s.t, env, err) @@ -1135,10 +1140,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { s.t.Log("Inserted 10 rows into the source table") e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,c1,t,t2") - _, err = s.conn.Exec(context.Background(), + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=0`, srcTableName)) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=0`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTables(env, s, "normalize update/delete", tableName, "id,c1,t,t2") @@ -1165,7 +1170,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { srcTableName := s.attachSchemaSuffix(tableName) dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, dstName) - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, c1 INT, @@ -1201,15 +1206,15 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize row", tableName, dstName, "id,c1,c2,t") - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize update", tableName, dstName, "id,c1,c2,t") - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTablesWithNames( @@ -1241,7 +1246,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { srcTableName := fmt.Sprintf("%s_src", cmpTableName) dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel_iud") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, c1 INT, @@ -1277,7 +1282,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - insertTx, err := s.conn.Begin(context.Background()) + insertTx, err := s.Conn().Begin(context.Background()) e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` @@ -1320,7 +1325,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { srcTableName := s.attachSchemaSuffix(tableName) dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, dstName) - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, c1 INT, @@ -1356,12 +1361,12 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize insert", tableName, dstName, "id,c1,c2,t") - insertTx, err := s.conn.Begin(context.Background()) + insertTx, err := s.Conn().Begin(context.Background()) e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName)) @@ -1404,7 +1409,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { srcTableName := s.attachSchemaSuffix(tableName) dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, c1 INT, @@ -1440,11 +1445,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTables(env, s, "normalize row", tableName, "id,c1,c2,t") - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTablesWithNames( @@ -1456,7 +1461,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { "id,c1,c2,t", ) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTables(env, s, "normalize reinsert", tableName, "id,c1,c2,t") @@ -1480,7 +1485,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { srcTableName := s.attachSchemaSuffix("testMixedCase") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "testMixedCase") - _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS e2e_test_%s."%s" ( "pulseArmor" SERIAL PRIMARY KEY, "highGold" TEXT NOT NULL, @@ -1508,7 +1513,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { for i := 0; i < 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO e2e_test_%s."%s"("highGold","eVe") VALUES ($1, $2) `, s.pgSuffix, "testMixedCase"), testKey, testValue) e2e.EnvNoError(s.t, env, err) diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 9541d61c86..a662513e6a 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -12,9 +12,9 @@ import ( //nolint:unparam func (s PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, numRows int) { - err := e2e.CreateTableForQRep(s.conn, s.pgSuffix, tableName) + err := e2e.CreateTableForQRep(s.Conn(), s.pgSuffix, tableName) require.NoError(s.t, err) - err = e2e.PopulateSourceTable(s.conn, s.pgSuffix, tableName, numRows) + err = e2e.PopulateSourceTable(s.Conn(), s.pgSuffix, tableName, numRows) require.NoError(s.t, err) } diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index e94641ab92..87d4285085 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -14,6 +14,7 @@ import ( "github.com/joho/godotenv" "github.com/stretchr/testify/require" + "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -25,7 +26,7 @@ import ( type PeerFlowE2ETestSuiteSQLServer struct { t *testing.T - conn *pgx.Conn + conn *connpostgres.PostgresConnector sqlsHelper *SQLServerHelper suffix string } @@ -35,6 +36,10 @@ func (s PeerFlowE2ETestSuiteSQLServer) T() *testing.T { } func (s PeerFlowE2ETestSuiteSQLServer) Conn() *pgx.Conn { + return s.conn.Conn() +} + +func (s PeerFlowE2ETestSuiteSQLServer) Connector() *connpostgres.PostgresConnector { return s.conn } @@ -115,10 +120,10 @@ func (s PeerFlowE2ETestSuiteSQLServer) insertRowsIntoSQLServerTable(tableName st func (s PeerFlowE2ETestSuiteSQLServer) setupPGDestinationTable(tableName string) { ctx := context.Background() - _, err := s.conn.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS e2e_test_%s.%s", s.suffix, tableName)) + _, err := s.Conn().Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS e2e_test_%s.%s", s.suffix, tableName)) require.NoError(s.t, err) - _, err = s.conn.Exec(ctx, + _, err = s.Conn().Exec(ctx, fmt.Sprintf("CREATE TABLE e2e_test_%s.%s (id TEXT, card_id TEXT, v_from TIMESTAMP, price NUMERIC, status INT)", s.suffix, tableName)) require.NoError(s.t, err) @@ -183,7 +188,7 @@ func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append( // Verify that the destination table has the same number of rows as the source table var numRowsInDest pgtype.Int8 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s", dstTableName) - err = s.conn.QueryRow(context.Background(), countQuery).Scan(&numRowsInDest) + err = s.Conn().QueryRow(context.Background(), countQuery).Scan(&numRowsInDest) require.NoError(s.t, err) require.Equal(s.t, numRows, int(numRowsInDest.Int64)) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 74a7ddd0f8..ccd639cd94 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -35,6 +35,17 @@ import ( peerflow "github.com/PeerDB-io/peer-flow/workflows" ) +type Suite interface { + T() *testing.T + Connector() *connpostgres.PostgresConnector + Suffix() string +} + +type RowSource interface { + Suite + GetRows(table, cols string) (*model.QRecordBatch, error) +} + func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnvironment) { t.Helper() @@ -93,21 +104,21 @@ func EnvTrue(t *testing.T, env *testsuite.TestWorkflowEnvironment, val bool) { } } -func GetPgRows(conn *pgx.Conn, suffix string, table string, cols string) (*model.QRecordBatch, error) { - pgQueryExecutor := connpostgres.NewQRepQueryExecutor(conn, context.Background(), "testflow", "testpart") +func GetPgRows(conn *connpostgres.PostgresConnector, suffix string, table string, cols string) (*model.QRecordBatch, error) { + pgQueryExecutor := conn.NewQRepQueryExecutor("testflow", "testpart") pgQueryExecutor.SetTestEnv(true) return pgQueryExecutor.ExecuteAndProcessQuery( context.Background(), - fmt.Sprintf(`SELECT %s FROM e2e_test_%s."%s" ORDER BY id`, cols, suffix, table), + fmt.Sprintf(`SELECT %s FROM e2e_test_%s.%s ORDER BY id`, cols, suffix, connpostgres.QuoteIdentifier(table)), ) } -func RequireEqualTables(suite e2eshared.RowSource, table string, cols string) { +func RequireEqualTables(suite RowSource, table string, cols string) { t := suite.T() t.Helper() - pgRows, err := GetPgRows(suite.Conn(), suite.Suffix(), table, cols) + pgRows, err := GetPgRows(suite.Connector(), suite.Suffix(), table, cols) require.NoError(t, err) rows, err := suite.GetRows(table, cols) @@ -116,11 +127,11 @@ func RequireEqualTables(suite e2eshared.RowSource, table string, cols string) { require.True(t, e2eshared.CheckEqualRecordBatches(t, pgRows, rows)) } -func EnvEqualTables(env *testsuite.TestWorkflowEnvironment, suite e2eshared.RowSource, table string, cols string) { +func EnvEqualTables(env *testsuite.TestWorkflowEnvironment, suite RowSource, table string, cols string) { t := suite.T() t.Helper() - pgRows, err := GetPgRows(suite.Conn(), suite.Suffix(), table, cols) + pgRows, err := GetPgRows(suite.Connector(), suite.Suffix(), table, cols) EnvNoError(t, env, err) rows, err := suite.GetRows(table, cols) @@ -131,7 +142,7 @@ func EnvEqualTables(env *testsuite.TestWorkflowEnvironment, suite e2eshared.RowS func EnvWaitForEqualTables( env *testsuite.TestWorkflowEnvironment, - suite e2eshared.RowSource, + suite RowSource, reason string, table string, cols string, @@ -142,7 +153,7 @@ func EnvWaitForEqualTables( func EnvWaitForEqualTablesWithNames( env *testsuite.TestWorkflowEnvironment, - suite e2eshared.RowSource, + suite RowSource, reason string, srcTable string, dstTable string, @@ -154,7 +165,7 @@ func EnvWaitForEqualTablesWithNames( EnvWaitFor(t, env, 3*time.Minute, reason, func() bool { t.Helper() - pgRows, err := GetPgRows(suite.Conn(), suite.Suffix(), srcTable, cols) + pgRows, err := GetPgRows(suite.Connector(), suite.Suffix(), srcTable, cols) if err != nil { return false } diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go index ce5c25d4fe..6d5646d42c 100644 --- a/flow/e2eshared/e2eshared.go +++ b/flow/e2eshared/e2eshared.go @@ -8,23 +8,10 @@ import ( "strings" "testing" - "github.com/jackc/pgx/v5" - "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" ) -type Suite interface { - T() *testing.T - Conn() *pgx.Conn - Suffix() string -} - -type RowSource interface { - Suite - GetRows(table, cols string) (*model.QRecordBatch, error) -} - func RunSuite[T any](t *testing.T, setup func(t *testing.T) T, teardown func(T)) { t.Helper() t.Parallel() From 12c15bee5aa3158453c8bea040dabc7f3d1bd1e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 16:52:37 +0000 Subject: [PATCH 02/12] implement specialized geometry/hstore comparison since using connector we have proper parsing of pg types now in tests --- flow/e2e/postgres/qrep_flow_pg_test.go | 16 ++++----- flow/geo/geo.go | 15 --------- flow/model/qvalue/qvalue.go | 46 +++++++++++++++++--------- 3 files changed, 39 insertions(+), 38 deletions(-) diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 0f5cf9c33a..65a043df78 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -72,9 +72,9 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG { } func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateTableForQRep(s.conn.Conn(), s.suffix, tableName) + err := e2e.CreateTableForQRep(s.Conn(), s.suffix, tableName) require.NoError(s.t, err) - err = e2e.PopulateSourceTable(s.conn.Conn(), s.suffix, tableName, rowCount) + err = e2e.PopulateSourceTable(s.Conn(), s.suffix, tableName, rowCount) require.NoError(s.t, err) } @@ -98,7 +98,7 @@ func (s PeerFlowE2ETestSuitePG) checkEnums(srcSchemaQualified, dstSchemaQualifie "SELECT 1 FROM %s dst "+ "WHERE src.my_mood::text = dst.my_mood::text)) LIMIT 1;", srcSchemaQualified, dstSchemaQualified) - err := s.conn.Conn().QueryRow(context.Background(), query).Scan(&exists) + err := s.Conn().QueryRow(context.Background(), query).Scan(&exists) if err != nil { return err } @@ -112,7 +112,7 @@ func (s PeerFlowE2ETestSuitePG) checkEnums(srcSchemaQualified, dstSchemaQualifie func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error { query := fmt.Sprintf("SELECT %s FROM %s EXCEPT SELECT %s FROM %s", selector, srcSchemaQualified, selector, dstSchemaQualified) - rows, err := s.conn.Conn().Query(context.Background(), query, pgx.QueryExecModeExec) + rows, err := s.Conn().Query(context.Background(), query, pgx.QueryExecModeExec) if err != nil { return err } @@ -146,7 +146,7 @@ func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualif func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { query := fmt.Sprintf(`SELECT "_PEERDB_SYNCED_AT" FROM %s`, dstSchemaQualified) - rows, _ := s.conn.Conn().Query(context.Background(), query) + rows, _ := s.Conn().Query(context.Background(), query) defer rows.Close() for rows.Next() { @@ -166,12 +166,12 @@ func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { func (s PeerFlowE2ETestSuitePG) RunInt64Query(query string) (int64, error) { var count pgtype.Int8 - err := s.conn.Conn().QueryRow(context.Background(), query).Scan(&count) + err := s.Conn().QueryRow(context.Background(), query).Scan(&count) return count.Int64, err } func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() { - setupTx, err := s.conn.Conn().Begin(context.Background()) + setupTx, err := s.Conn().Begin(context.Background()) require.NoError(s.t, err) // setup 3 tables in pgpeer_repl_test schema // test_1, test_2, test_3, all have 5 columns all text, c1, c2, c3, c4, c5 @@ -220,7 +220,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { dstTable := "test_qrep_flow_avro_pg_2" - err := e2e.CreateTableForQRep(s.conn.Conn(), s.suffix, dstTable) + err := e2e.CreateTableForQRep(s.Conn(), s.suffix, dstTable) require.NoError(s.t, err) srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) diff --git a/flow/geo/geo.go b/flow/geo/geo.go index 882ed97dba..9640173973 100644 --- a/flow/geo/geo.go +++ b/flow/geo/geo.go @@ -43,18 +43,3 @@ func GeoToWKB(wkt string) ([]byte, error) { return geometryObject.ToWKB(), nil } - -// compares WKTs -func GeoCompare(wkt1, wkt2 string) bool { - geom1, geoErr := geom.NewGeomFromWKT(wkt1) - if geoErr != nil { - return false - } - - geom2, geoErr := geom.NewGeomFromWKT(wkt2) - if geoErr != nil { - return false - } - - return geom1.Equals(geom2) -} diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index b0b556b3d7..761b9e33b0 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -2,6 +2,7 @@ package qvalue import ( "bytes" + "encoding/json" "fmt" "math" "math/big" @@ -11,9 +12,8 @@ import ( "cloud.google.com/go/civil" "github.com/google/uuid" - - "github.com/PeerDB-io/peer-flow/geo" - hstore_util "github.com/PeerDB-io/peer-flow/hstore" + "github.com/jackc/pgx/v5/pgtype" + geom "github.com/twpayne/go-geos" ) // if new types are added, register them in gob - cdc_records_storage.go @@ -23,6 +23,12 @@ type QValue struct { } func (q QValue) Equals(other QValue) bool { + if q.Value == nil && other.Value == nil { + return true + } else if (q.Value == nil) != (other.Value == nil) { + return false + } + switch q.Kind { case QValueKindEmpty: return other.Kind == QValueKindEmpty @@ -66,6 +72,10 @@ func (q QValue) Equals(other QValue) bool { return compareJSON(q.Value, other.Value) case QValueKindBit: return compareBit(q.Value, other.Value) + case QValueKindGeometry: + return compareGeometry(q.Value, other.Value) + case QValueKindHStore: + return compareHstore(q.Value, other.Value) case QValueKindArrayFloat32: return compareNumericArrays(q.Value, other.Value) case QValueKindArrayFloat64: @@ -82,9 +92,9 @@ func (q QValue) Equals(other QValue) bool { return compareBoolArrays(q.Value, other.Value) case QValueKindArrayString: return compareArrayString(q.Value, other.Value) + default: + return false } - - return false } func (q QValue) GoTimeConvert() (string, error) { @@ -250,20 +260,26 @@ func compareString(value1, value2 interface{}) bool { if !ok1 || !ok2 { return false } - if str1 == str2 { - return true - } + return str1 == str2 +} - // Catch matching HStore - parsedHstore1, err := hstore_util.ParseHstore(str1) - if err == nil && parsedHstore1 == str2 { - return true +func compareHstore(value1, value2 interface{}) bool { + bytes, err := json.Marshal(value1.(pgtype.Hstore)) + if err != nil { + panic(err) } + return string(bytes) == value2.(string) +} + +func compareGeometry(value1, value2 interface{}) bool { + geo1 := value1.(*geom.Geom) - // Catch matching WKB(in Postgres)-WKT(in destination) geo values - geoConvertedWKT, err := geo.GeoValidate(str1) + geo2, err := geom.NewGeomFromWKT(value2.(string)) + if err != nil { + panic(err) + } - return err == nil && geo.GeoCompare(geoConvertedWKT, str2) + return geo1.Equals(geo2) } func compareStruct(value1, value2 interface{}) bool { From 2f86e4cf80e3c2f4674dad96741a312bc9ee5843 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 17:18:09 +0000 Subject: [PATCH 03/12] support hstore polymorphism? --- flow/model/qvalue/qvalue.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 761b9e33b0..c36ac3a2f0 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -11,6 +11,7 @@ import ( "time" "cloud.google.com/go/civil" + hstore_util "github.com/PeerDB-io/peer-flow/hstore" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgtype" geom "github.com/twpayne/go-geos" @@ -264,11 +265,17 @@ func compareString(value1, value2 interface{}) bool { } func compareHstore(value1, value2 interface{}) bool { + var parsedHStore1 string bytes, err := json.Marshal(value1.(pgtype.Hstore)) if err != nil { - panic(err) + parsedHStore1 = string(bytes) + } else { + parsedHStore1, err = hstore_util.ParseHstore(value1.(string)) + if err != nil { + panic(err) + } } - return string(bytes) == value2.(string) + return parsedHStore1 == value2.(string) } func compareGeometry(value1, value2 interface{}) bool { From a4aa925151e6bd60ecb6ea22888223b9adf134f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 17:30:39 +0000 Subject: [PATCH 04/12] bring back JSON always being equal --- flow/e2e/test_utils.go | 2 +- flow/model/qvalue/qvalue.go | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index ccd639cd94..efcee75636 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -517,7 +517,7 @@ func GetOwnersSelectorStringsSF() [2]string { sfFields := make([]string, 0, len(schema.Fields)) for _, field := range schema.Fields { pgFields = append(pgFields, fmt.Sprintf(`"%s"`, field.Name)) - if strings.Contains(field.Name, "geo") { + if strings.HasPrefix(field.Name, "geo") { colName := connsnowflake.SnowflakeIdentifierNormalize(field.Name) // Have to apply a WKT transformation here, diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index c36ac3a2f0..a1b06fd154 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -11,10 +11,11 @@ import ( "time" "cloud.google.com/go/civil" - hstore_util "github.com/PeerDB-io/peer-flow/hstore" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgtype" geom "github.com/twpayne/go-geos" + + hstore_util "github.com/PeerDB-io/peer-flow/hstore" ) // if new types are added, register them in gob - cdc_records_storage.go @@ -24,7 +25,9 @@ type QValue struct { } func (q QValue) Equals(other QValue) bool { - if q.Value == nil && other.Value == nil { + if q.Kind == QValueKindJSON { + return true // TODO fix + } else if q.Value == nil && other.Value == nil { return true } else if (q.Value == nil) != (other.Value == nil) { return false From 00c3d4756f113e9785cad3f15f1d1f45fedc492d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 17:40:53 +0000 Subject: [PATCH 05/12] fix hstore code --- flow/model/qvalue/qvalue.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index a1b06fd154..17819361ec 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -268,17 +268,23 @@ func compareString(value1, value2 interface{}) bool { } func compareHstore(value1, value2 interface{}) bool { - var parsedHStore1 string - bytes, err := json.Marshal(value1.(pgtype.Hstore)) - if err != nil { - parsedHStore1 = string(bytes) - } else { - parsedHStore1, err = hstore_util.ParseHstore(value1.(string)) + str2 := value2.(string) + switch v1 := value1.(type) { + case pgtype.Hstore: + bytes, err := json.Marshal(v1) if err != nil { panic(err) } + return string(bytes) == str2 + case string: + parsedHStore1, err := hstore_util.ParseHstore(v1) + if err != nil { + panic(err) + } + return parsedHStore1 == str2 + default: + panic(fmt.Sprintf("invalid hstore value type %T: %v", value1, value1)) } - return parsedHStore1 == value2.(string) } func compareGeometry(value1, value2 interface{}) bool { From b85fa9c4d610d2760daf7decd6c01b63aaeeedb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 17:47:57 +0000 Subject: [PATCH 06/12] Also backpedal a bit on geometry change --- flow/model/qvalue/qvalue.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 17819361ec..11150a4daa 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -288,14 +288,23 @@ func compareHstore(value1, value2 interface{}) bool { } func compareGeometry(value1, value2 interface{}) bool { - geo1 := value1.(*geom.Geom) - geo2, err := geom.NewGeomFromWKT(value2.(string)) if err != nil { panic(err) } - return geo1.Equals(geo2) + switch v1 := value1.(type) { + case *geom.Geom: + return v1.Equals(geo2) + case string: + geo1, err := geom.NewGeomFromWKT(v1) + if err != nil { + panic(err) + } + return geo1.Equals(geo2) + default: + panic(fmt.Sprintf("invalid geometry value type %T: %v", value1, value1)) + } } func compareStruct(value1, value2 interface{}) bool { From 651ab44f62cccfc64943477d03045a5a554618b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 17:57:44 +0000 Subject: [PATCH 07/12] accept marginal error between raw geom & WKT geom --- flow/model/qvalue/qvalue.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 11150a4daa..c39e819ec9 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -295,13 +295,13 @@ func compareGeometry(value1, value2 interface{}) bool { switch v1 := value1.(type) { case *geom.Geom: - return v1.Equals(geo2) + return v1.EqualsExact(geo2, 0.0001) case string: geo1, err := geom.NewGeomFromWKT(v1) if err != nil { panic(err) } - return geo1.Equals(geo2) + return geo1.EqualsExact(geo2, 0.0001) default: panic(fmt.Sprintf("invalid geometry value type %T: %v", value1, value1)) } From bcb36360f6bcd255e2e0fcc95918637616faee62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 18:23:48 +0000 Subject: [PATCH 08/12] geography is geometry --- flow/model/qvalue/qvalue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index c39e819ec9..fba56a9f07 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -76,7 +76,7 @@ func (q QValue) Equals(other QValue) bool { return compareJSON(q.Value, other.Value) case QValueKindBit: return compareBit(q.Value, other.Value) - case QValueKindGeometry: + case QValueKindGeometry, QValueKindGeography: return compareGeometry(q.Value, other.Value) case QValueKindHStore: return compareHstore(q.Value, other.Value) From 0b1c49ff66f99bd406bed1b1f21c0ae1d0b2ae7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 18:42:30 +0000 Subject: [PATCH 09/12] getBytes: return nil instead of []byte{} when nil, consider []byte{} & nil equal --- flow/model/qvalue/qvalue.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index fba56a9f07..1b3596c3a8 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -232,7 +232,7 @@ func compareBytes(value1, value2 interface{}) bool { bytes1, ok1 := getBytes(value1) bytes2, ok2 := getBytes(value2) - return ok1 && ok2 && bytes.Equal(bytes1, bytes2) + return ok1 && ok2 && (len(bytes1) == len(bytes2) || bytes.Equal(bytes1, bytes2)) } func compareNumeric(value1, value2 interface{}) bool { @@ -587,8 +587,7 @@ func getBytes(v interface{}) ([]byte, bool) { case string: return []byte(value), true case nil: - // return empty byte array - return []byte{}, true + return nil, true default: return nil, false } From 1ca92d0458e39b953c2301bdb801462731d090c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 18:57:12 +0000 Subject: [PATCH 10/12] go back to geo equals now that geography mixup figured out --- flow/model/qvalue/qvalue.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 1b3596c3a8..199b53251d 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -295,13 +295,13 @@ func compareGeometry(value1, value2 interface{}) bool { switch v1 := value1.(type) { case *geom.Geom: - return v1.EqualsExact(geo2, 0.0001) + return v1.Equals(geo2) case string: geo1, err := geom.NewGeomFromWKT(v1) if err != nil { panic(err) } - return geo1.EqualsExact(geo2, 0.0001) + return geo1.Equals(geo2) default: panic(fmt.Sprintf("invalid geometry value type %T: %v", value1, value1)) } From d1b9b634b5f5e68841e109a07123e20cc8b93306 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 19:12:49 +0000 Subject: [PATCH 11/12] fix bytes comparison nil should equal [] --- flow/e2eshared/e2eshared.go | 2 +- flow/model/qvalue/qvalue.go | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go index 6d5646d42c..721302551e 100644 --- a/flow/e2eshared/e2eshared.go +++ b/flow/e2eshared/e2eshared.go @@ -67,7 +67,7 @@ func CheckQRecordEquality(t *testing.T, q []qvalue.QValue, other []qvalue.QValue for i, entry := range q { otherEntry := other[i] if !entry.Equals(otherEntry) { - t.Logf("entry %d: %v != %v", i, entry, otherEntry) + t.Logf("entry %d: %T %v != %T %v", i, entry.Value, entry, otherEntry.Value, otherEntry) return false } } diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 199b53251d..93b9ac8b7f 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -29,7 +29,8 @@ func (q QValue) Equals(other QValue) bool { return true // TODO fix } else if q.Value == nil && other.Value == nil { return true - } else if (q.Value == nil) != (other.Value == nil) { + } else if !q.Kind.IsArray() && q.Kind != QValueKindBytes && + (q.Value == nil) != (other.Value == nil) { return false } @@ -232,7 +233,7 @@ func compareBytes(value1, value2 interface{}) bool { bytes1, ok1 := getBytes(value1) bytes2, ok2 := getBytes(value2) - return ok1 && ok2 && (len(bytes1) == len(bytes2) || bytes.Equal(bytes1, bytes2)) + return ok1 && ok2 && bytes.Equal(bytes1, bytes2) } func compareNumeric(value1, value2 interface{}) bool { @@ -340,7 +341,7 @@ func compareBit(value1, value2 interface{}) bool { return false } - return bit1^bit2 == 0 + return bit1 == bit2 } func compareNumericArrays(value1, value2 interface{}) bool { From addacb26eed72eba2f0dab646295c25a78ceee41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 19:40:36 +0000 Subject: [PATCH 12/12] once again I'm dealing with go's nil != nil semantics --- flow/model/qvalue/qvalue.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 93b9ac8b7f..1ad07150b6 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -29,9 +29,6 @@ func (q QValue) Equals(other QValue) bool { return true // TODO fix } else if q.Value == nil && other.Value == nil { return true - } else if !q.Kind.IsArray() && q.Kind != QValueKindBytes && - (q.Value == nil) != (other.Value == nil) { - return false } switch q.Kind { @@ -237,10 +234,6 @@ func compareBytes(value1, value2 interface{}) bool { } func compareNumeric(value1, value2 interface{}) bool { - if value1 == nil && value2 == nil { - return true - } - rat1, ok1 := getRat(value1) rat2, ok2 := getRat(value2)