Skip to content

Commit

Permalink
adds test for gcs/s3
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Nov 9, 2023
1 parent a6a4c17 commit bae9265
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 1 deletion.
99 changes: 98 additions & 1 deletion flow/e2e/s3/cdc_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package e2e_s3
import (
"context"
"fmt"
"strings"
"time"

"github.com/PeerDB-io/peer-flow/e2e"
Expand Down Expand Up @@ -146,7 +147,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
fmt.Println("JobName: ", flowJobName)

files, err := s.s3Helper.ListAllFiles(ctx, flowJobName)
fmt.Println("Files in Test_Complete_Simple_Flow_GCS: ", len(files))
require.NoError(s.T(), err)
Expand All @@ -155,3 +156,99 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {

env.AssertExpectations(s.T())
}

func (s *PeerFlowE2ETestSuiteS3) Test_Types_GCS_Interop() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)
setupErr := s.setupS3("gcs")
if setupErr != nil {
s.Fail("failed to setup S3", setupErr)
}

srcTableName := s.attachSchemaSuffix("test_types_interop")
dstTableName := fmt.Sprintf("%s.%s_%d", "peerdb_test_gcs_interop", "test_types_interop", time.Now().Unix())
flowJobName := s.attachSuffix("test_simple_flow")

createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');"
_, enumErr := s.pool.Exec(context.Background(), createMoodEnum)
if !strings.Contains(enumErr.Error(), "already exists") {
s.NoError(enumErr)
}

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN,
c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION,
c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY,
c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT,
c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR,
c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT),
c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON),
c48 mood, c49 hstore, c51 cidr, c52 citext, c53 ltree);
CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer)
RETURNS bytea AS $body$
SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex')
FROM generate_series(1, $1);
$body$
LANGUAGE 'sql'
VOLATILE
SET search_path = 'pg_catalog';
`, srcTableName))
s.NoError(err)
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: flowJobName,
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.s3Helper.GetPeer(),
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 5,
}

go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
s.NoError(err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s SELECT 2,2,b'1',b'101',
true,random_bytea(32),'s','test','1.1.10.2'::cidr,
CURRENT_DATE,1.23,1.234,'192.168.1.5'::inet,1,
'5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval,
'{"sai":1}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr,
1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz,
'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector,
txid_current_snapshot(),
'66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'),
'POINT(1 2)','POINT(40.7128 -74.0060)','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))',
'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)',
'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))',
'sad', 'a=>1,b=>2'::hstore,'192.168.0.0/16','abc','Top.Top1.Top2'::ltree;
`, srcTableName))
s.NoError(err)
fmt.Println("Executed an insert with all types")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
s.Error(err)
s.Contains(err.Error(), "continue as new")

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

files, err := s.s3Helper.ListAllFiles(ctx, flowJobName)
fmt.Println("Files in Test_Complete_Simple_Flow_GCS: ", len(files))
require.NoError(s.T(), err)

require.Equal(s.T(), 1, len(files))

env.AssertExpectations(s.T())
}
2 changes: 2 additions & 0 deletions ui/components/PeerComponent.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ export const DBTypeToImageMapping = (peerType: DBType | string) => {
case DBType.EVENTHUB_GROUP:
case DBType.EVENTHUB:
return '/svgs/ms.svg';
case DBType.S3:
return '/svgs/aws.svg';
default:
return '';
}
Expand Down

0 comments on commit bae9265

Please sign in to comment.