Skip to content

Commit

Permalink
adds tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Oct 16, 2023
1 parent e105e18 commit 8d00d57
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
timeout-minutes: 30
services:
pg_cdc:
image: postgres:15.4-alpine
image: postgis:15-3.4-alpine
ports:
- 7132:5432
env:
Expand Down
20 changes: 14 additions & 6 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() {
c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY,
c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT,
c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR,
c39 TXID_SNAPSHOT,c40 UUID,c41 XML);
c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT),
c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON));
CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer)
RETURNS bytea AS $body$
SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex')
Expand Down Expand Up @@ -637,7 +638,10 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() {
1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz,
'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector,
txid_current_snapshot(),
'66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello');
'66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'),
'POINT(1 2)','POINT(40.7128 -74.0060)','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))',
'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)',
'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))';
`, srcTableName))
s.NoError(err)
fmt.Println("Executed an insert with all types")
Expand All @@ -656,7 +660,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() {
noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{"c41", "c1", "c2", "c3", "c4",
"c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18",
"c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36",
"c37", "c38", "c7", "c8", "c32"})
"c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46"})
if err != nil {
fmt.Println("error %w", err)
}
Expand All @@ -679,7 +683,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() {
c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY,
c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT,
c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR,
c39 TXID_SNAPSHOT,c40 UUID,c41 XML);
c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT),
c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON));
CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer)
RETURNS bytea AS $body$
SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex')
Expand Down Expand Up @@ -721,7 +726,10 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() {
1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz,
'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector,
txid_current_snapshot(),
'66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello');
'66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'),
'POINT(1 2)','POINT(40.7128 -74.0060)','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))',
'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)',
'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))';
`, srcTableName))
s.NoError(err)
fmt.Println("Executed an insert with all types")
Expand All @@ -740,7 +748,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() {
noNulls, err := s.sfHelper.CheckNull("test_types_sf_avro_cdc", []string{"c41", "c1", "c2", "c3", "c4",
"c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18",
"c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36",
"c37", "c38", "c7", "c8", "c32"})
"c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46"})
if err != nil {
fmt.Println("error %w", err)
}
Expand Down
16 changes: 14 additions & 2 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ func CreateSourceTableQRep(pool *pgxpool.Pool, suffix string, tableName string)
"f6 jsonb",
"f7 jsonb",
"f8 smallint",
"geometry_point geometry(point)",
"geography_point geography(point)",
"geometry_linestring geometry(linestring)",
"geography_linestring geography(linestring)",
"geometry_polygon geometry(polygon)",
"geography_polygon geography(polygon)",
}

tblFieldStr := strings.Join(tblFields, ",")
Expand Down Expand Up @@ -198,7 +204,10 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro
CURRENT_TIMESTAMP, 1, ARRAY['text1', 'text2'], ARRAY[123, 456], ARRAY[789, 012],
ARRAY['varchar1', 'varchar2'], '{"key": 8.5}',
'[{"key1": "value1", "key2": "value2", "key3": "value3"}]',
'{"key": "value"}', 15
'{"key": "value"}', 15,
'POINT(1 2)','POINT(40.7128 -74.0060)',
'LINESTRING(0 0, 1 1, 2 2)','LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)',
'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))','POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'
)`,
id, uuid.New().String(), uuid.New().String(),
uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String())
Expand All @@ -213,7 +222,10 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro
deal_id, ethereum_transaction_id, ignore_price, card_eth_value,
paid_eth_price, card_bought_notified, address, account_id,
asset_id, status, transaction_id, settled_at, reference_id,
settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8
settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8,
geometry_point, geography_point,
geometry_linestring, geography_linestring,
geometry_polygon, geography_polygon
) VALUES %s;
`, suffix, tableName, strings.Join(rows, ",")))
if err != nil {
Expand Down

0 comments on commit 8d00d57

Please sign in to comment.