diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 54887adeac..5624a81795 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -82,6 +82,8 @@ jobs: - name: create hstore extension and increase logical replication limits run: | docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION hstore;" + docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION citext;" + docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION ltree;" docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET wal_level=logical;" docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_replication_slots=100;" docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_wal_senders=100;" diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 400ecc3d01..966d3af190 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -3,6 +3,7 @@ package e2e_bigquery import ( "context" "fmt" + "strings" "testing" "github.com/PeerDB-io/peer-flow/e2e" @@ -630,13 +631,20 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { srcTableName := s.attachSchemaSuffix("test_types_bq") dstTableName := "test_types_bq" + createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" + _, enumErr := s.pool.Exec(context.Background(), createMoodEnum) + if !strings.Contains(enumErr.Error(), "already exists") { + s.NoError(enumErr) + } + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, - c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[]); + c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[], + c45 mood, c46 hstore, c47 cidr, c48 citext, c49 ltree); CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) RETURNS bytea AS $body$ SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') @@ -681,7 +689,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'), ARRAY[10299301,2579827], ARRAY[0.0003, 8902.0092], - ARRAY['hello','bye']; + ARRAY['hello','bye'],'sad', 'a=>1,b=>2'::hstore,'192.168.0.0/16','abc','Top.Top1.Top2'::ltree; `, srcTableName)) s.NoError(err) fmt.Println("Executed an insert with all types") @@ -700,7 +708,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { noNulls, err := s.bqHelper.CheckNull(dstTableName, []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36", - "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44"}) + "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46", "c47", "c48", "c49"}) if err != nil { fmt.Println("error %w", err) } @@ -716,6 +724,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { srcTableName := s.attachSchemaSuffix("test_types_avro_bq") dstTableName := "test_types_avro_bq" + createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" + _, enumErr := s.pool.Exec(context.Background(), createMoodEnum) + if !strings.Contains(enumErr.Error(), "already exists") { + s.NoError(enumErr) + } _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, @@ -723,7 +736,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, - c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[]); + c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[], + c45 mood, c46 hstore, c47 cidr, c48 citext, c49 ltree); CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) RETURNS bytea AS $body$ SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') @@ -770,7 +784,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'), ARRAY[9301,239827], ARRAY[0.0003, 1039.0034], - ARRAY['hello','bye']; + ARRAY['hello','bye'],'sad', 'a=>1,b=>2'::hstore,'192.168.0.0/16','abc','Top.Top1.Top2'::ltree; `, srcTableName)) s.NoError(err) fmt.Println("Executed an insert with all types") @@ -789,7 +803,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { noNulls, err := s.bqHelper.CheckNull(dstTableName, []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36", - "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44"}) + "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46", "c47", "c48", "c49"}) if err != nil { fmt.Println("error %w", err) } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 9f92496eb3..a5e90d2058 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -677,6 +677,11 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { srcTableName := s.attachSchemaSuffix("test_types_sf") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_types_sf") + createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" + _, enumErr := s.pool.Exec(context.Background(), createMoodEnum) + if !strings.Contains(enumErr.Error(), "already exists") { + s.NoError(enumErr) + } _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, @@ -684,7 +689,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT), - c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON)); + c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON), + c48 mood, c49 hstore, c51 cidr, c52 citext, c53 ltree); CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) RETURNS bytea AS $body$ SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') @@ -728,7 +734,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'), 'POINT(1 2)','POINT(40.7128 -74.0060)','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', - 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; + 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))', + 'sad', 'a=>1,b=>2'::hstore,'192.168.0.0/16','abc','Top.Top1.Top2'::ltree; `, srcTableName)) s.NoError(err) fmt.Println("Executed an insert with all types") @@ -747,7 +754,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36", - "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46"}) + "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46", "c47", "c48", "c49", "c51", "c52", "c53"}) if err != nil { fmt.Println("error %w", err) } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 4f53e9a6a4..2979ebfaa9 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -114,6 +114,7 @@ func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment, } func CreateSourceTableQRep(pool *pgxpool.Pool, suffix string, tableName string) error { + createEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" tblFields := []string{ "id UUID NOT NULL PRIMARY KEY", "card_id UUID", @@ -151,6 +152,8 @@ func CreateSourceTableQRep(pool *pgxpool.Pool, suffix string, tableName string) "f6 jsonb", "f7 jsonb", "f8 smallint", + "f9 mood", + "f10 hstore", } if strings.Contains(tableName, "sf") { tblFields = append(tblFields, "geometry_point geometry(point)", @@ -160,6 +163,11 @@ func CreateSourceTableQRep(pool *pgxpool.Pool, suffix string, tableName string) "geometry_polygon geometry(polygon)", "geography_polygon geography(polygon)") } + _, enumErr := pool.Exec(context.Background(), createEnum) + if enumErr != nil && !strings.Contains(enumErr.Error(), "already exists") { + return enumErr + } + tblFieldStr := strings.Join(tblFields, ",") _, err := pool.Exec(context.Background(), fmt.Sprintf(` @@ -212,7 +220,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro CURRENT_TIMESTAMP, 1, ARRAY['text1', 'text2'], ARRAY[123, 456], ARRAY[789, 012], ARRAY['varchar1', 'varchar2'], '{"key": 8.5}', '[{"key1": "value1", "key2": "value2", "key3": "value3"}]', - '{"key": "value"}', 15 %s + '{"key": "value"}', 15, 'happy', 'key1=>value1,key2=>value2' %s )`, id, uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String(), geoValues) @@ -233,7 +241,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro deal_id, ethereum_transaction_id, ignore_price, card_eth_value, paid_eth_price, card_bought_notified, address, account_id, asset_id, status, transaction_id, settled_at, reference_id, - settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8 + settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10 %s ) VALUES %s; `, suffix, tableName, geoColumns, strings.Join(rows, ","))) @@ -344,6 +352,8 @@ func GetOwnersSchema() *model.QRecordSchema { {Name: "f6", Type: qvalue.QValueKindJSON, Nullable: true}, {Name: "f7", Type: qvalue.QValueKindJSON, Nullable: true}, {Name: "f8", Type: qvalue.QValueKindInt16, Nullable: true}, + {Name: "f9", Type: qvalue.QValueKindString, Nullable: true}, + {Name: "f10", Type: qvalue.QValueKindString, Nullable: true}, }, } }