diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index ffceae7c53..b3f1616e59 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -16,7 +16,7 @@ jobs: timeout-minutes: 30 services: pg_cdc: - image: postgres:15.4-alpine + image: postgis:15-3.4-alpine ports: - 7132:5432 env: diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index de8e8ba9c8..418bcfd63e 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -596,7 +596,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, - c39 TXID_SNAPSHOT,c40 UUID,c41 XML); + c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT), + c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON)); CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) RETURNS bytea AS $body$ SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') @@ -637,7 +638,10 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { 1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, txid_current_snapshot(), - '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'); + '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'), + 'POINT(1 2)','POINT(40.7128 -74.0060)','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', + 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', + 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) s.NoError(err) fmt.Println("Executed an insert with all types") @@ -656,7 +660,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36", - "c37", "c38", "c7", "c8", "c32"}) + "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46"}) if err != nil { fmt.Println("error %w", err) } @@ -679,7 +683,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, - c39 TXID_SNAPSHOT,c40 UUID,c41 XML); + c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT), + c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON)); CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) RETURNS bytea AS $body$ SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') @@ -721,7 +726,10 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { 1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, txid_current_snapshot(), - '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'); + '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'), + 'POINT(1 2)','POINT(40.7128 -74.0060)','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', + 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', + 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) s.NoError(err) fmt.Println("Executed an insert with all types") @@ -740,7 +748,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { noNulls, err := s.sfHelper.CheckNull("test_types_sf_avro_cdc", []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36", - "c37", "c38", "c7", "c8", "c32"}) + "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46"}) if err != nil { fmt.Println("error %w", err) } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index f3b3b00fa3..76f98ed1ab 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -151,6 +151,12 @@ func CreateSourceTableQRep(pool *pgxpool.Pool, suffix string, tableName string) "f6 jsonb", "f7 jsonb", "f8 smallint", + "geometry_point geometry(point)", + "geography_point geography(point)", + "geometry_linestring geometry(linestring)", + "geography_linestring geography(linestring)", + "geometry_polygon geometry(polygon)", + "geography_polygon geography(polygon)", } tblFieldStr := strings.Join(tblFields, ",") @@ -198,7 +204,10 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro CURRENT_TIMESTAMP, 1, ARRAY['text1', 'text2'], ARRAY[123, 456], ARRAY[789, 012], ARRAY['varchar1', 'varchar2'], '{"key": 8.5}', '[{"key1": "value1", "key2": "value2", "key3": "value3"}]', - '{"key": "value"}', 15 + '{"key": "value"}', 15, + 'POINT(1 2)','POINT(40.7128 -74.0060)', + 'LINESTRING(0 0, 1 1, 2 2)','LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)', + 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))','POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))' )`, id, uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String()) @@ -213,7 +222,10 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro deal_id, ethereum_transaction_id, ignore_price, card_eth_value, paid_eth_price, card_bought_notified, address, account_id, asset_id, status, transaction_id, settled_at, reference_id, - settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8 + settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8, + geometry_point, geography_point, + geometry_linestring, geography_linestring, + geometry_polygon, geography_polygon ) VALUES %s; `, suffix, tableName, strings.Join(rows, ","))) if err != nil {