Skip to content

Commit

Permalink
adds more types for sf,bq tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Nov 9, 2023
1 parent 319cde6 commit a6a4c17
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 11 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ jobs:
- name: create hstore extension and increase logical replication limits
run: |
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION hstore;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION citext;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION ltree;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET wal_level=logical;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_replication_slots=100;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_wal_senders=100;"
Expand Down
26 changes: 20 additions & 6 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package e2e_bigquery
import (
"context"
"fmt"
"strings"
"testing"

"github.com/PeerDB-io/peer-flow/e2e"
Expand Down Expand Up @@ -630,13 +631,20 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
srcTableName := s.attachSchemaSuffix("test_types_bq")
dstTableName := "test_types_bq"

createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');"
_, enumErr := s.pool.Exec(context.Background(), createMoodEnum)
if !strings.Contains(enumErr.Error(), "already exists") {
s.NoError(enumErr)
}

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN,
c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION,
c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY,
c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT,
c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR,
c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[]);
c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[],
c45 mood, c46 hstore, c47 cidr, c48 citext, c49 ltree);
CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer)
RETURNS bytea AS $body$
SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex')
Expand Down Expand Up @@ -681,7 +689,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
'66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'),
ARRAY[10299301,2579827],
ARRAY[0.0003, 8902.0092],
ARRAY['hello','bye'];
ARRAY['hello','bye'],'sad', 'a=>1,b=>2'::hstore,'192.168.0.0/16','abc','Top.Top1.Top2'::ltree;
`, srcTableName))
s.NoError(err)
fmt.Println("Executed an insert with all types")
Expand All @@ -700,7 +708,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
noNulls, err := s.bqHelper.CheckNull(dstTableName, []string{"c41", "c1", "c2", "c3", "c4",
"c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18",
"c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36",
"c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44"})
"c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46", "c47", "c48", "c49"})
if err != nil {
fmt.Println("error %w", err)
}
Expand All @@ -716,14 +724,20 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() {

srcTableName := s.attachSchemaSuffix("test_types_avro_bq")
dstTableName := "test_types_avro_bq"
createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');"
_, enumErr := s.pool.Exec(context.Background(), createMoodEnum)
if !strings.Contains(enumErr.Error(), "already exists") {
s.NoError(enumErr)
}

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN,
c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION,
c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY,
c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT,
c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR,
c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[]);
c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[],
c45 mood, c46 hstore, c47 cidr, c48 citext, c49 ltree);
CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer)
RETURNS bytea AS $body$
SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex')
Expand Down Expand Up @@ -770,7 +784,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() {
'66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'),
ARRAY[9301,239827],
ARRAY[0.0003, 1039.0034],
ARRAY['hello','bye'];
ARRAY['hello','bye'],'sad', 'a=>1,b=>2'::hstore,'192.168.0.0/16','abc','Top.Top1.Top2'::ltree;
`, srcTableName))
s.NoError(err)
fmt.Println("Executed an insert with all types")
Expand All @@ -789,7 +803,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() {
noNulls, err := s.bqHelper.CheckNull(dstTableName, []string{"c41", "c1", "c2", "c3", "c4",
"c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18",
"c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36",
"c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44"})
"c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46", "c47", "c48", "c49"})
if err != nil {
fmt.Println("error %w", err)
}
Expand Down
13 changes: 10 additions & 3 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,14 +677,20 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() {
srcTableName := s.attachSchemaSuffix("test_types_sf")
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_types_sf")

createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');"
_, enumErr := s.pool.Exec(context.Background(), createMoodEnum)
if !strings.Contains(enumErr.Error(), "already exists") {
s.NoError(enumErr)
}
_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN,
c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION,
c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY,
c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT,
c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR,
c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT),
c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON));
c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON),
c48 mood, c49 hstore, c51 cidr, c52 citext, c53 ltree);
CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer)
RETURNS bytea AS $body$
SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex')
Expand Down Expand Up @@ -728,7 +734,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() {
'66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'),
'POINT(1 2)','POINT(40.7128 -74.0060)','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))',
'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)',
'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))';
'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))',
'sad', 'a=>1,b=>2'::hstore,'192.168.0.0/16','abc','Top.Top1.Top2'::ltree;
`, srcTableName))
s.NoError(err)
fmt.Println("Executed an insert with all types")
Expand All @@ -747,7 +754,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() {
noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{"c41", "c1", "c2", "c3", "c4",
"c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18",
"c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36",
"c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46"})
"c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46", "c47", "c48", "c49", "c51", "c52", "c53"})
if err != nil {
fmt.Println("error %w", err)
}
Expand Down
14 changes: 12 additions & 2 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment,
}

func CreateSourceTableQRep(pool *pgxpool.Pool, suffix string, tableName string) error {
createEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');"
tblFields := []string{
"id UUID NOT NULL PRIMARY KEY",
"card_id UUID",
Expand Down Expand Up @@ -151,6 +152,8 @@ func CreateSourceTableQRep(pool *pgxpool.Pool, suffix string, tableName string)
"f6 jsonb",
"f7 jsonb",
"f8 smallint",
"f9 mood",
"f10 hstore",
}
if strings.Contains(tableName, "sf") {
tblFields = append(tblFields, "geometry_point geometry(point)",
Expand All @@ -160,6 +163,11 @@ func CreateSourceTableQRep(pool *pgxpool.Pool, suffix string, tableName string)
"geometry_polygon geometry(polygon)",
"geography_polygon geography(polygon)")
}
_, enumErr := pool.Exec(context.Background(), createEnum)
if enumErr != nil && !strings.Contains(enumErr.Error(), "already exists") {
return enumErr
}

tblFieldStr := strings.Join(tblFields, ",")

_, err := pool.Exec(context.Background(), fmt.Sprintf(`
Expand Down Expand Up @@ -212,7 +220,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro
CURRENT_TIMESTAMP, 1, ARRAY['text1', 'text2'], ARRAY[123, 456], ARRAY[789, 012],
ARRAY['varchar1', 'varchar2'], '{"key": 8.5}',
'[{"key1": "value1", "key2": "value2", "key3": "value3"}]',
'{"key": "value"}', 15 %s
'{"key": "value"}', 15, 'happy', 'key1=>value1,key2=>value2' %s
)`,
id, uuid.New().String(), uuid.New().String(),
uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String(), geoValues)
Expand All @@ -233,7 +241,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro
deal_id, ethereum_transaction_id, ignore_price, card_eth_value,
paid_eth_price, card_bought_notified, address, account_id,
asset_id, status, transaction_id, settled_at, reference_id,
settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8
settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10
%s
) VALUES %s;
`, suffix, tableName, geoColumns, strings.Join(rows, ",")))
Expand Down Expand Up @@ -344,6 +352,8 @@ func GetOwnersSchema() *model.QRecordSchema {
{Name: "f6", Type: qvalue.QValueKindJSON, Nullable: true},
{Name: "f7", Type: qvalue.QValueKindJSON, Nullable: true},
{Name: "f8", Type: qvalue.QValueKindInt16, Nullable: true},
{Name: "f9", Type: qvalue.QValueKindString, Nullable: true},
{Name: "f10", Type: qvalue.QValueKindString, Nullable: true},
},
}
}
Expand Down

0 comments on commit a6a4c17

Please sign in to comment.