From 93fe68d418dd1ebc652fe545dab40b1839e2fbe9 Mon Sep 17 00:00:00 2001 From: alishakawaguchi Date: Wed, 26 Jun 2024 12:35:02 -0700 Subject: [PATCH] Worker integration tests (#2211) --- .github/workflows/go.yml | 26 - .../sqlmanager/postgres/integration_test.go | 34 +- worker/pkg/query-builder/integration_test.go | 34 +- .../datasync/workflow/integration_test.go | 85 +- .../testdata/double-reference/insert.sql | 29 + .../testdata/double-reference/job_mappings.go | 292 ++++++ .../double-reference/source-create.sql | 68 ++ .../testdata/double-reference/teardown.sql | 1 + .../testdata/double-reference/tests.go | 40 + .../testdata/gen-jobmappings-config.json | 11 + .../datasync/workflow/testdata/generators.go | 3 + .../workflow/testdata/jobmapping_generator.go | 210 ++++ .../datasync/workflow/testdata/types.go | 24 + .../virtual-foreign-keys/job_mappings.go | 276 +++++ .../testdata/virtual-foreign-keys/tests.go | 127 +++ .../workflow/workflow_integration_test.go | 958 ++++-------------- .../testdata/double-reference/job_mappings.go | 12 + .../virtual-foreign-keys/job_mappings.go | 12 + 18 files changed, 1362 insertions(+), 880 deletions(-) create mode 100644 worker/pkg/workflows/datasync/workflow/testdata/double-reference/insert.sql create mode 100644 worker/pkg/workflows/datasync/workflow/testdata/double-reference/job_mappings.go create mode 100644 worker/pkg/workflows/datasync/workflow/testdata/double-reference/source-create.sql create mode 100644 worker/pkg/workflows/datasync/workflow/testdata/double-reference/teardown.sql create mode 100644 worker/pkg/workflows/datasync/workflow/testdata/double-reference/tests.go create mode 100644 worker/pkg/workflows/datasync/workflow/testdata/gen-jobmappings-config.json create mode 100644 worker/pkg/workflows/datasync/workflow/testdata/generators.go create mode 100644 worker/pkg/workflows/datasync/workflow/testdata/jobmapping_generator.go create mode 100644 worker/pkg/workflows/datasync/workflow/testdata/types.go create mode 100644 worker/pkg/workflows/datasync/workflow/testdata/virtual-foreign-keys/job_mappings.go create mode 100644 worker/pkg/workflows/datasync/workflow/testdata/virtual-foreign-keys/tests.go create mode 100644 worker/pkg/workflows/pkg/workflows/datasync/workflow/testdata/double-reference/job_mappings.go create mode 100644 worker/pkg/workflows/pkg/workflows/datasync/workflow/testdata/virtual-foreign-keys/job_mappings.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index eba4833458..4b6875054d 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -56,30 +56,6 @@ jobs: test: name: test runs-on: ubuntu-latest - services: - postgres: - image: postgres:15 - ports: - - 5432:5432 - env: - POSTGRES_DB: postgres - POSTGRES_PASSWORD: postgres - PGUSER: postgres - options: >- - --health-cmd pg_isready - --health-interval 10s - --health-timeout 5s - --health-retries 5 - redis: - image: redis - ports: - - 6379:6379 - options: >- - --health-cmd "redis-cli ping" - --health-interval 10s - --health-timeout 5s - --health-retries 5 - steps: - name: Checkout uses: actions/checkout@v4 @@ -95,8 +71,6 @@ jobs: go test -race -coverprofile=coverage.out -covermode=atomic ./... env: INTEGRATION_TESTS_ENABLED: 1 - TEST_DB_URL: postgres://postgres:postgres@localhost:5432/postgres - TEST_REDIS_URL: redis://localhost:6379 - name: Upload coverage to Codecov uses: codecov/codecov-action@v4 diff --git a/backend/pkg/sqlmanager/postgres/integration_test.go b/backend/pkg/sqlmanager/postgres/integration_test.go index a90ead1377..1a58b49434 100644 --- a/backend/pkg/sqlmanager/postgres/integration_test.go +++ b/backend/pkg/sqlmanager/postgres/integration_test.go @@ -40,24 +40,20 @@ func (s *IntegrationTestSuite) SetupSuite() { s.ctx = context.Background() s.schema = "sqlmanagerpostgres" - dburl := os.Getenv("TEST_DB_URL") - if dburl == "" { - pgcontainer, err := testpg.RunContainer(s.ctx, - testcontainers.WithImage("postgres:15"), - testcontainers.WithWaitStrategy( - wait.ForLog("database system is ready to accept connections"). - WithOccurrence(2).WithStartupTimeout(5*time.Second), - ), - ) - if err != nil { - panic(err) - } - s.pgcontainer = pgcontainer - connstr, err := pgcontainer.ConnectionString(s.ctx) - if err != nil { - panic(err) - } - dburl = connstr + pgcontainer, err := testpg.RunContainer(s.ctx, + testcontainers.WithImage("postgres:15"), + testcontainers.WithWaitStrategy( + wait.ForLog("database system is ready to accept connections"). + WithOccurrence(2).WithStartupTimeout(5*time.Second), + ), + ) + if err != nil { + panic(err) + } + s.pgcontainer = pgcontainer + connstr, err := pgcontainer.ConnectionString(s.ctx) + if err != nil { + panic(err) } setupSql, err := os.ReadFile("./testdata/setup.sql") @@ -72,7 +68,7 @@ func (s *IntegrationTestSuite) SetupSuite() { } s.teardownSql = string(teardownSql) - pool, err := pgxpool.New(s.ctx, dburl) + pool, err := pgxpool.New(s.ctx, connstr) if err != nil { panic(err) } diff --git a/worker/pkg/query-builder/integration_test.go b/worker/pkg/query-builder/integration_test.go index e7c8e5e7c5..4cb82b2175 100644 --- a/worker/pkg/query-builder/integration_test.go +++ b/worker/pkg/query-builder/integration_test.go @@ -36,24 +36,20 @@ func (s *IntegrationTestSuite) SetupSuite() { s.ctx = context.Background() s.schema = "genbenthosconfigs_querybuilder" - dburl := os.Getenv("TEST_DB_URL") - if dburl == "" { - pgcontainer, err := testpg.RunContainer(s.ctx, - testcontainers.WithImage("postgres:15"), - testcontainers.WithWaitStrategy( - wait.ForLog("database system is ready to accept connections"). - WithOccurrence(2).WithStartupTimeout(5*time.Second), - ), - ) - if err != nil { - panic(err) - } - s.pgcontainer = pgcontainer - connstr, err := pgcontainer.ConnectionString(s.ctx) - if err != nil { - panic(err) - } - dburl = connstr + pgcontainer, err := testpg.RunContainer(s.ctx, + testcontainers.WithImage("postgres:15"), + testcontainers.WithWaitStrategy( + wait.ForLog("database system is ready to accept connections"). + WithOccurrence(2).WithStartupTimeout(5*time.Second), + ), + ) + if err != nil { + panic(err) + } + s.pgcontainer = pgcontainer + connstr, err := pgcontainer.ConnectionString(s.ctx) + if err != nil { + panic(err) } setupSql, err := os.ReadFile("./testdata/setup.sql") @@ -68,7 +64,7 @@ func (s *IntegrationTestSuite) SetupSuite() { } s.teardownSql = string(teardownSql) - pool, err := pgxpool.New(s.ctx, dburl) + pool, err := pgxpool.New(s.ctx, connstr) if err != nil { panic(err) } diff --git a/worker/pkg/workflows/datasync/workflow/integration_test.go b/worker/pkg/workflows/datasync/workflow/integration_test.go index 5c9b26fd96..c3edbdfd35 100644 --- a/worker/pkg/workflows/datasync/workflow/integration_test.go +++ b/worker/pkg/workflows/datasync/workflow/integration_test.go @@ -44,30 +44,25 @@ type IntegrationTestSuite struct { func (s *IntegrationTestSuite) SetupSuite() { s.ctx = context.Background() - testDbUrl := os.Getenv("TEST_DB_URL") - dburl := fmt.Sprintf("%s?sslmode=disable", testDbUrl) - if testDbUrl == "" { - pgcontainer, err := testpg.RunContainer(s.ctx, - testcontainers.WithImage("postgres:15"), - postgres.WithDatabase("postgres"), - testcontainers.WithWaitStrategy( - wait.ForLog("database system is ready to accept connections"). - WithOccurrence(2).WithStartupTimeout(5*time.Second), - ), - ) - if err != nil { - panic(err) - } - s.pgcontainer = pgcontainer - connstr, err := pgcontainer.ConnectionString(s.ctx, "sslmode=disable") - if err != nil { - panic(err) - } - dburl = connstr + pgcontainer, err := testpg.RunContainer(s.ctx, + testcontainers.WithImage("postgres:15"), + postgres.WithDatabase("postgres"), + testcontainers.WithWaitStrategy( + wait.ForLog("database system is ready to accept connections"). + WithOccurrence(2).WithStartupTimeout(5*time.Second), + ), + ) + if err != nil { + panic(err) + } + s.pgcontainer = pgcontainer + connstr, err := pgcontainer.ConnectionString(s.ctx, "sslmode=disable") + if err != nil { + panic(err) } s.databases = []string{"datasync_source", "datasync_target"} - pool, err := pgxpool.New(s.ctx, dburl) + pool, err := pgxpool.New(s.ctx, connstr) if err != nil { panic(err) } @@ -81,7 +76,7 @@ func (s *IntegrationTestSuite) SetupSuite() { } } - srcUrl, err := getDbPgUrl(dburl, "datasync_source", "disable") + srcUrl, err := getDbPgUrl(connstr, "datasync_source", "disable") if err != nil { panic(err) } @@ -92,7 +87,7 @@ func (s *IntegrationTestSuite) SetupSuite() { } s.sourcePgPool = sourceConn - targetUrl, err := getDbPgUrl(dburl, "datasync_target", "disable") + targetUrl, err := getDbPgUrl(connstr, "datasync_target", "disable") if err != nil { panic(err) } @@ -106,19 +101,43 @@ func (s *IntegrationTestSuite) SetupSuite() { s.querier = pg_queries.New() // redis - redisUrl := os.Getenv("TEST_REDIS_URL") - s.redisUrl = redisUrl - if redisUrl == "" { - redisContainer, err := redis.RunContainer(s.ctx, - testcontainers.WithImage("docker.io/redis:7"), - redis.WithSnapshotting(10, 1), - redis.WithLogLevel(redis.LogLevelVerbose), - ) + redisContainer, err := redis.RunContainer(s.ctx, + testcontainers.WithImage("docker.io/redis:7"), + redis.WithSnapshotting(10, 1), + redis.WithLogLevel(redis.LogLevelVerbose), + ) + if err != nil { + panic(err) + } + s.rediscontainer = redisContainer + s.redisUrl, err = redisContainer.ConnectionString(s.ctx) + if err != nil { + panic(err) + } +} + +func (s *IntegrationTestSuite) SetupSourceDb(testFolder string, files []string) { + s.T().Logf("setting up source db. folder: %s \n", testFolder) + for _, file := range files { + setupSourceSql, err := os.ReadFile(fmt.Sprintf("./testdata/%s/%s", testFolder, file)) + if err != nil { + panic(err) + } + _, err = s.sourcePgPool.Exec(s.ctx, string(setupSourceSql)) + if err != nil { + panic(err) + } + } +} + +func (s *IntegrationTestSuite) SetupTargetDb(testFolder string, files []string) { + s.T().Logf("setting up target db. folder: %s \n", testFolder) + for _, file := range files { + setupTargetSql, err := os.ReadFile(fmt.Sprintf("./testdata/%s/%s", testFolder, file)) if err != nil { panic(err) } - s.rediscontainer = redisContainer - s.redisUrl, err = redisContainer.ConnectionString(s.ctx) + _, err = s.targetPgPool.Exec(s.ctx, string(setupTargetSql)) if err != nil { panic(err) } diff --git a/worker/pkg/workflows/datasync/workflow/testdata/double-reference/insert.sql b/worker/pkg/workflows/datasync/workflow/testdata/double-reference/insert.sql new file mode 100644 index 0000000000..08dbddb8b0 --- /dev/null +++ b/worker/pkg/workflows/datasync/workflow/testdata/double-reference/insert.sql @@ -0,0 +1,29 @@ +SET search_path TO double_reference; +-- COMPANY DATA +INSERT INTO company (name, url, employee_count, uuid) +VALUES + ('Acme Corporation', 'www.acme.com', 500, uuid_generate_v4()), + ('Global Enterprises', 'globalenterprises.net', 1200, uuid_generate_v4()), + ('Tech Innovations', 'www.techinnovations.io', 250, uuid_generate_v4()); + +-- DEPARTMENT DATA +INSERT INTO department (name, url, company_id, uuid) +VALUES + ('Marketing', 'marketing.acme.com', 1, uuid_generate_v4()), -- Acme Corporation + ('Sales', 'sales.acme.com', 1, uuid_generate_v4()), + ('Finance', null, 2, uuid_generate_v4()), -- Global Enterprises + ('R&D', 'rnd.techinnovations.io', 3, uuid_generate_v4()); -- Tech Innovations + +-- TRANSACTION DATA +INSERT INTO transaction (id, amount, created, updated, department_id, date, currency, description, uuid) +VALUES + (1, 250.50, now() - interval '2 weeks', now(), 1, '2024-05-01', 'USD', 'Office Supplies', uuid_generate_v4()), + (2, 1250.00, now() - interval '5 days', now(), 2, '2024-05-06', 'GBP', 'Travel Expenses', uuid_generate_v4()), + (3, 87.25, now() - interval '1 month', now(), 3, '2024-04-20', 'EUR', 'Lunch Meeting', uuid_generate_v4()); + -- Repeat with varied data ... + +-- EXPENSE REPORT DATA +INSERT INTO expense_report (id, invoice_id, date, amount, department_source_id, department_destination_id, created, updated, currency, transaction_type, paid, adjustment_amount, transaction_id) +VALUES + (1, 'INV-1234', '2024-05-03', 500.00, 1, 2, now() - interval '15 days', now(), 'USD', 1, true, null, 1), + (2,'INV-5678', '2024-04-28', 128.75, 3, 1, now() - interval '20 days', now() - interval '1 day', 'CAD', 2, false, 12.50, 3); diff --git a/worker/pkg/workflows/datasync/workflow/testdata/double-reference/job_mappings.go b/worker/pkg/workflows/datasync/workflow/testdata/double-reference/job_mappings.go new file mode 100644 index 0000000000..922837b80e --- /dev/null +++ b/worker/pkg/workflows/datasync/workflow/testdata/double-reference/job_mappings.go @@ -0,0 +1,292 @@ + +package testdata_doublereference + +import ( + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" +) + +func GetDefaultSyncJobMappings()[]*mgmtv1alpha1.JobMapping { + return []*mgmtv1alpha1.JobMapping{ + { + Schema: "double_reference", + Table: "company", + Column: "id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "company", + Column: "name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "company", + Column: "url", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "company", + Column: "employee_count", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "company", + Column: "uuid", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "department", + Column: "id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "department", + Column: "name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "department", + Column: "url", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "department", + Column: "company_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "department", + Column: "user_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "department", + Column: "uuid", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "transaction", + Column: "id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "transaction", + Column: "amount", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "transaction", + Column: "created", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "transaction", + Column: "updated", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "transaction", + Column: "department_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "transaction", + Column: "date", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "transaction", + Column: "currency", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "transaction", + Column: "settings", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "transaction", + Column: "description", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "transaction", + Column: "timezone", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "transaction", + Column: "uuid", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "expense_report", + Column: "id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "expense_report", + Column: "invoice_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "expense_report", + Column: "date", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "expense_report", + Column: "amount", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "expense_report", + Column: "department_source_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "expense_report", + Column: "department_destination_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "expense_report", + Column: "created", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "expense_report", + Column: "updated", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "expense_report", + Column: "currency", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "expense_report", + Column: "transaction_type", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "expense_report", + Column: "paid", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "expense_report", + Column: "transaction_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "double_reference", + Table: "expense_report", + Column: "adjustment_amount", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + } +} + diff --git a/worker/pkg/workflows/datasync/workflow/testdata/double-reference/source-create.sql b/worker/pkg/workflows/datasync/workflow/testdata/double-reference/source-create.sql new file mode 100644 index 0000000000..5973b47c85 --- /dev/null +++ b/worker/pkg/workflows/datasync/workflow/testdata/double-reference/source-create.sql @@ -0,0 +1,68 @@ +CREATE SCHEMA IF NOT EXISTS double_reference; +SET search_path TO double_reference; + + + CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; +CREATE TABLE + IF NOT EXISTS "company" ( + "id" BIGSERIAL NOT NULL, + "name" text NOT NULL, + "url" text NULL, + "employee_count" integer NULL, + "uuid" uuid NOT NULL DEFAULT uuid_generate_v4 (), + CONSTRAINT company_pkey PRIMARY KEY (id), + CONSTRAINT company_uuid_key UNIQUE (uuid) + ); +CREATE TABLE + IF NOT EXISTS "department" ( + "id" BIGSERIAL NOT NULL, + "name" text NOT NULL, + "url" text NULL, + "company_id" bigint NOT NULL, -- to be fk + "user_id" bigint NULL, + "uuid" uuid NOT NULL DEFAULT uuid_generate_v4 (), + CONSTRAINT department_pkey PRIMARY KEY (id), + CONSTRAINT department_uuid_key UNIQUE (uuid), + CONSTRAINT department_company_id_fkey FOREIGN KEY (company_id) REFERENCES company (id) ON DELETE CASCADE + ); +-- market +CREATE TABLE IF NOT EXISTS transaction ( + id bigint NOT NULL, + amount double precision NOT NULL, + created timestamp without time zone, + updated timestamp without time zone, + department_id bigint, -- to be fk + date date, + currency text NOT NULL, + settings json DEFAULT '{ + "historicalCount": 0, + "vacation": false, + "conference": true, + "travel": true + }'::json NOT NULL, + description text, + timezone text DEFAULT 'America/New_York'::text NOT NULL, + uuid uuid DEFAULT uuid_generate_v4() NOT NULL, + CONSTRAINT transaction_pkey PRIMARY KEY (id), + CONSTRAINT transaction_department_id_fkey FOREIGN KEY (department_id) REFERENCES department (ID) ON DELETE CASCADE +); +CREATE TABLE IF NOT EXISTS expense_report ( + id bigint NOT NULL, + invoice_id text, + date date NOT NULL, + amount numeric(15,2), + department_source_id bigint, -- fk + department_destination_id bigint, --fk + created timestamp without time zone, + updated timestamp without time zone, + currency character varying(5), + transaction_type integer NOT NULL, + paid boolean DEFAULT false, + transaction_id bigint, -- fk + adjustment_amount numeric(15,2), + CONSTRAINT transaction_type_valid_values CHECK ((transaction_type = ANY (ARRAY[1, 2]))), + CONSTRAINT expense_report_pkey PRIMARY KEY (id), + CONSTRAINT expense_report_dept_source_fkey FOREIGN KEY (department_source_id) REFERENCES department (ID) ON DELETE CASCADE, + CONSTRAINT expense_report_dept_destination_fkey FOREIGN KEY (department_destination_id) REFERENCES department (ID) ON DELETE CASCADE, + CONSTRAINT expense_report_transaction_fkey FOREIGN KEY (transaction_id) REFERENCES transaction (ID) ON DELETE CASCADE +); diff --git a/worker/pkg/workflows/datasync/workflow/testdata/double-reference/teardown.sql b/worker/pkg/workflows/datasync/workflow/testdata/double-reference/teardown.sql new file mode 100644 index 0000000000..e6a2e9baeb --- /dev/null +++ b/worker/pkg/workflows/datasync/workflow/testdata/double-reference/teardown.sql @@ -0,0 +1 @@ +DROP SCHEMA IF EXISTS double_reference CASCADE; diff --git a/worker/pkg/workflows/datasync/workflow/testdata/double-reference/tests.go b/worker/pkg/workflows/datasync/workflow/testdata/double-reference/tests.go new file mode 100644 index 0000000000..44ee780ced --- /dev/null +++ b/worker/pkg/workflows/datasync/workflow/testdata/double-reference/tests.go @@ -0,0 +1,40 @@ +package testdata_doublereference + +import workflow_testdata "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/workflow/testdata" + +func GetSyncTests() []*workflow_testdata.IntegrationTest { + return []*workflow_testdata.IntegrationTest{ + { + Name: "Double reference sync", + Folder: "double-reference", + SourceFilePaths: []string{"source-create.sql", "insert.sql"}, + TargetFilePaths: []string{"source-create.sql"}, + JobMappings: GetDefaultSyncJobMappings(), + Expected: map[string]*workflow_testdata.ExpectedOutput{ + "double_reference.company": &workflow_testdata.ExpectedOutput{RowCount: 3}, + "double_reference.department": &workflow_testdata.ExpectedOutput{RowCount: 4}, + "double_reference.expense_report": &workflow_testdata.ExpectedOutput{RowCount: 2}, + "double_reference.transaction": &workflow_testdata.ExpectedOutput{RowCount: 3}, + }, + }, + { + Name: "Double reference subset", + Folder: "double-reference", + SourceFilePaths: []string{"source-create.sql", "insert.sql"}, + TargetFilePaths: []string{"source-create.sql"}, + SubsetMap: map[string]string{ + "double_reference.company": "id in (1)", + }, + JobOptions: &workflow_testdata.TestJobOptions{ + SubsetByForeignKeyConstraints: true, + }, + JobMappings: GetDefaultSyncJobMappings(), + Expected: map[string]*workflow_testdata.ExpectedOutput{ + "double_reference.company": &workflow_testdata.ExpectedOutput{RowCount: 1}, + "double_reference.department": &workflow_testdata.ExpectedOutput{RowCount: 2}, + "double_reference.expense_report": &workflow_testdata.ExpectedOutput{RowCount: 1}, + "double_reference.transaction": &workflow_testdata.ExpectedOutput{RowCount: 2}, + }, + }, + } +} diff --git a/worker/pkg/workflows/datasync/workflow/testdata/gen-jobmappings-config.json b/worker/pkg/workflows/datasync/workflow/testdata/gen-jobmappings-config.json new file mode 100644 index 0000000000..b7f8c9a512 --- /dev/null +++ b/worker/pkg/workflows/datasync/workflow/testdata/gen-jobmappings-config.json @@ -0,0 +1,11 @@ + +[ + { + "folder": "double-reference", + "sql_file": "source-create.sql" + }, + { + "folder": "virtual-foreign-keys", + "sql_file": "source-setup.sql" + } +] diff --git a/worker/pkg/workflows/datasync/workflow/testdata/generators.go b/worker/pkg/workflows/datasync/workflow/testdata/generators.go new file mode 100644 index 0000000000..8db8592897 --- /dev/null +++ b/worker/pkg/workflows/datasync/workflow/testdata/generators.go @@ -0,0 +1,3 @@ +package workflow_testdata + +//go:generate go run jobmapping_generator.go gen-jobmappings-config.json $GOPACKAGE diff --git a/worker/pkg/workflows/datasync/workflow/testdata/jobmapping_generator.go b/worker/pkg/workflows/datasync/workflow/testdata/jobmapping_generator.go new file mode 100644 index 0000000000..e09010a684 --- /dev/null +++ b/worker/pkg/workflows/datasync/workflow/testdata/jobmapping_generator.go @@ -0,0 +1,210 @@ +//go:build ignore + +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "html/template" + "io" + "os" + "strings" + + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" + pg_query "github.com/pganalyze/pg_query_go/v5" + pgquery "github.com/wasilibs/go-pgquery" +) + +type Input struct { + Folder string `json:"folder"` + SqlFile string `json:"sql_file"` +} + +type Column struct { + Name string +} + +type Table struct { + Schema string + Name string + Columns []*Column +} + +type JobMapping struct { + Schema string + Table string + Column string + Transformer string + Config string +} + +func parseSQLSchema(sql string) ([]*Table, error) { + tree, err := pgquery.Parse(sql) + if err != nil { + return nil, err + } + + tables := []*Table{} + var schema string + for _, stmt := range tree.GetStmts() { + s := stmt.GetStmt() + switch s.Node.(type) { + case *pg_query.Node_CreateSchemaStmt: + schema = s.GetCreateSchemaStmt().GetSchemaname() + case *pg_query.Node_CreateStmt: + table := s.GetCreateStmt().GetRelation().GetRelname() + columns := []*Column{} + for _, col := range s.GetCreateStmt().GetTableElts() { + if col.GetColumnDef() != nil { + columns = append(columns, &Column{ + Name: col.GetColumnDef().Colname, + }) + } + } + tables = append(tables, &Table{ + Schema: schema, + Name: table, + Columns: columns, + }) + } + } + return tables, nil +} + +func generateJobMapping(tables []*Table) []*mgmtv1alpha1.JobMapping { + mappings := []*mgmtv1alpha1.JobMapping{} + for _, t := range tables { + for _, c := range t.Columns { + mappings = append(mappings, &mgmtv1alpha1.JobMapping{ + Schema: t.Schema, + Table: t.Name, + Column: c.Name, + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }) + + } + } + return mappings +} + +type TemplateData struct { + PackageName string + Mappings []*mgmtv1alpha1.JobMapping +} + +func formatJobMappings(pkgName string, mappings []*mgmtv1alpha1.JobMapping) (string, error) { + const tmpl = ` +package {{ .PackageName }} + +import ( + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" +) + +func GetDefaultSyncJobMappings()[]*mgmtv1alpha1.JobMapping { + return []*mgmtv1alpha1.JobMapping{ + {{- range .Mappings }} + { + Schema: "{{ .Schema }}", + Table: "{{ .Table }}", + Column: "{{ .Column }}", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + {{- end }} + } +} + +` + data := TemplateData{ + PackageName: pkgName, + Mappings: mappings, + } + t := template.Must(template.New("jobmappings").Parse(tmpl)) + var out bytes.Buffer + err := t.Execute(&out, data) + if err != nil { + return "", err + } + return out.String(), nil +} + +func main() { + args := os.Args + if len(args) < 3 { + panic("must provide necessary args") + } + + configFile := args[1] + gopackage := args[2] + + packageSplit := strings.Split(gopackage, "_") + goPkg := packageSplit[len(packageSplit)-1] + + jsonFile, err := os.Open(configFile) + if err != nil { + fmt.Println("failed to open file: %s", err) + return + } + defer jsonFile.Close() + + byteValue, err := io.ReadAll(jsonFile) + if err != nil { + fmt.Println("failed to read file: %s", err) + return + } + + var inputs []Input + if err := json.Unmarshal(byteValue, &inputs); err != nil { + fmt.Println("failed to unmarshal JSON: %s", err) + return + } + for _, input := range inputs { + goPkgName := strings.ReplaceAll(fmt.Sprintf("%s_%s", goPkg, input.Folder), "-", "") + sqlFile, err := os.Open(fmt.Sprintf("%s/%s", input.Folder, input.SqlFile)) + if err != nil { + fmt.Println("failed to open file: %s", err) + } + + byteValue, err := io.ReadAll(sqlFile) + if err != nil { + fmt.Println("failed to read file: %s", err) + } + + sqlContent := string(byteValue) + sqlFile.Close() + + tables, err := parseSQLSchema(sqlContent) + if err != nil { + fmt.Println("Error parsing SQL schema:", err) + return + } + + jobMapping := generateJobMapping(tables) + + formattedJobMappings, err := formatJobMappings(goPkgName, jobMapping) + if err != nil { + fmt.Println("Error formatting job mappings:", err) + return + } + + output := fmt.Sprintf("%s/job_mappings.go", input.Folder) + outputFile, err := os.Create(output) + if err != nil { + fmt.Println("Error creating jobmapping.go file:", err) + return + } + + _, err = outputFile.WriteString(formattedJobMappings) + if err != nil { + fmt.Println("Error writing to jobmapping.go file:", err) + return + } + outputFile.Close() + } + + return +} diff --git a/worker/pkg/workflows/datasync/workflow/testdata/types.go b/worker/pkg/workflows/datasync/workflow/testdata/types.go new file mode 100644 index 0000000000..6e17ad1ad2 --- /dev/null +++ b/worker/pkg/workflows/datasync/workflow/testdata/types.go @@ -0,0 +1,24 @@ +package workflow_testdata + +import mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" + +type ExpectedOutput struct { + RowCount int +} + +type TestJobOptions struct { + SubsetByForeignKeyConstraints bool +} +type IntegrationTest struct { + Name string + SourceFilePaths []string + TargetFilePaths []string + Folder string + SubsetMap map[string]string // schema.table -> where clause + TransformerMap map[string]map[string]*mgmtv1alpha1.JobMappingTransformer // schema.table.column -> transformer config + JobMappings []*mgmtv1alpha1.JobMapping + JobOptions *TestJobOptions + VirtualForeignKeys []*mgmtv1alpha1.VirtualForeignConstraint + ExpectError *bool + Expected map[string]*ExpectedOutput // schema.table -> expected output +} diff --git a/worker/pkg/workflows/datasync/workflow/testdata/virtual-foreign-keys/job_mappings.go b/worker/pkg/workflows/datasync/workflow/testdata/virtual-foreign-keys/job_mappings.go new file mode 100644 index 0000000000..3a9c2393da --- /dev/null +++ b/worker/pkg/workflows/datasync/workflow/testdata/virtual-foreign-keys/job_mappings.go @@ -0,0 +1,276 @@ + +package testdata_virtualforeignkeys + +import ( + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" +) + +func GetDefaultSyncJobMappings()[]*mgmtv1alpha1.JobMapping { + return []*mgmtv1alpha1.JobMapping{ + { + Schema: "vfk_hr", + Table: "regions", + Column: "region_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "regions", + Column: "region_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "countries", + Column: "country_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "countries", + Column: "country_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "countries", + Column: "region_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "locations", + Column: "location_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "locations", + Column: "street_address", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "locations", + Column: "postal_code", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "locations", + Column: "city", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "locations", + Column: "state_province", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "locations", + Column: "country_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "departments", + Column: "department_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "departments", + Column: "department_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "departments", + Column: "location_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "jobs", + Column: "job_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "jobs", + Column: "job_title", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "jobs", + Column: "min_salary", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "jobs", + Column: "max_salary", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "employees", + Column: "employee_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "employees", + Column: "first_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "employees", + Column: "last_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "employees", + Column: "email", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "employees", + Column: "phone_number", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "employees", + Column: "hire_date", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "employees", + Column: "job_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "employees", + Column: "salary", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "employees", + Column: "manager_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "employees", + Column: "department_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "dependents", + Column: "dependent_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "dependents", + Column: "first_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "dependents", + Column: "last_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "dependents", + Column: "relationship", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: "vfk_hr", + Table: "dependents", + Column: "employee_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + } +} + diff --git a/worker/pkg/workflows/datasync/workflow/testdata/virtual-foreign-keys/tests.go b/worker/pkg/workflows/datasync/workflow/testdata/virtual-foreign-keys/tests.go new file mode 100644 index 0000000000..367c6ec4d5 --- /dev/null +++ b/worker/pkg/workflows/datasync/workflow/testdata/virtual-foreign-keys/tests.go @@ -0,0 +1,127 @@ +package testdata_virtualforeignkeys + +import ( + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" + workflow_testdata "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/workflow/testdata" +) + +func GetSyncTests() []*workflow_testdata.IntegrationTest { + return []*workflow_testdata.IntegrationTest{ + { + Name: "Virtual Foreign Keys sync", + Folder: "virtual-foreign-keys", + SourceFilePaths: []string{"source-setup.sql"}, + TargetFilePaths: []string{"target-setup.sql"}, + JobMappings: GetDefaultSyncJobMappings(), + VirtualForeignKeys: GetVirtualForeignKeys(), + Expected: map[string]*workflow_testdata.ExpectedOutput{ + "vfk_hr.regions": &workflow_testdata.ExpectedOutput{RowCount: 4}, + "vfk_hr.countries": &workflow_testdata.ExpectedOutput{RowCount: 25}, + "vfk_hr.locations": &workflow_testdata.ExpectedOutput{RowCount: 7}, + "vfk_hr.departments": &workflow_testdata.ExpectedOutput{RowCount: 11}, + "vfk_hr.dependents": &workflow_testdata.ExpectedOutput{RowCount: 30}, + "vfk_hr.employees": &workflow_testdata.ExpectedOutput{RowCount: 40}, + "vfk_hr.jobs": &workflow_testdata.ExpectedOutput{RowCount: 19}, + }, + }, + { + Name: "Virtual Foreign Keys subset", + Folder: "virtual-foreign-keys", + SourceFilePaths: []string{"source-setup.sql"}, + TargetFilePaths: []string{"target-setup.sql"}, + SubsetMap: map[string]string{ + "vfk_hr.employees": "first_name = 'Alexander'", + }, + JobOptions: &workflow_testdata.TestJobOptions{ + SubsetByForeignKeyConstraints: true, + }, + JobMappings: GetDefaultSyncJobMappings(), + VirtualForeignKeys: GetVirtualForeignKeys(), + Expected: map[string]*workflow_testdata.ExpectedOutput{ + "vfk_hr.regions": &workflow_testdata.ExpectedOutput{RowCount: 4}, + "vfk_hr.countries": &workflow_testdata.ExpectedOutput{RowCount: 25}, + "vfk_hr.locations": &workflow_testdata.ExpectedOutput{RowCount: 7}, + "vfk_hr.departments": &workflow_testdata.ExpectedOutput{RowCount: 11}, + "vfk_hr.dependents": &workflow_testdata.ExpectedOutput{RowCount: 2}, + "vfk_hr.employees": &workflow_testdata.ExpectedOutput{RowCount: 5}, + "vfk_hr.jobs": &workflow_testdata.ExpectedOutput{RowCount: 19}, + }, + }, + } +} + +func GetVirtualForeignKeys() []*mgmtv1alpha1.VirtualForeignConstraint { + return []*mgmtv1alpha1.VirtualForeignConstraint{ + { + Schema: "vfk_hr", + Table: "countries", + Columns: []string{"region_id"}, + ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ + Schema: "vfk_hr", + Table: "regions", + Columns: []string{"region_id"}, + }, + }, + { + Schema: "vfk_hr", + Table: "departments", + Columns: []string{"location_id"}, + ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ + Schema: "vfk_hr", + Table: "locations", + Columns: []string{"location_id"}, + }, + }, + { + Schema: "vfk_hr", + Table: "dependents", + Columns: []string{"employee_id"}, + ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ + Schema: "vfk_hr", + Table: "employees", + Columns: []string{"employee_id"}, + }, + }, + { + Schema: "vfk_hr", + Table: "employees", + Columns: []string{"manager_id"}, + ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ + Schema: "vfk_hr", + Table: "employees", + Columns: []string{"employee_id"}, + }, + }, + { + Schema: "vfk_hr", + Table: "employees", + Columns: []string{"department_id"}, + ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ + Schema: "vfk_hr", + Table: "departments", + Columns: []string{"department_id"}, + }, + }, + { + Schema: "vfk_hr", + Table: "employees", + Columns: []string{"job_id"}, + ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ + Schema: "vfk_hr", + Table: "jobs", + Columns: []string{"job_id"}, + }, + }, + { + Schema: "vfk_hr", + Table: "locations", + Columns: []string{"country_id"}, + ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ + Schema: "vfk_hr", + Table: "countries", + Columns: []string{"country_id"}, + }, + }, + } + +} diff --git a/worker/pkg/workflows/datasync/workflow/workflow_integration_test.go b/worker/pkg/workflows/datasync/workflow/workflow_integration_test.go index eb3f13c593..60f319c903 100644 --- a/worker/pkg/workflows/datasync/workflow/workflow_integration_test.go +++ b/worker/pkg/workflows/datasync/workflow/workflow_integration_test.go @@ -16,6 +16,7 @@ import ( "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect" "github.com/nucleuscloud/neosync/backend/pkg/sqlconnect" sql_manager "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager" + sqlmanager_shared "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/shared" mockTemporalClient "github.com/nucleuscloud/neosync/worker/internal/mocks/go.temporal.io/sdk/client" genbenthosconfigs_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/gen-benthos-configs" runsqlinittablestmts_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/run-sql-init-table-stmts" @@ -23,448 +24,164 @@ import ( sync_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/sync" syncactivityopts_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/sync-activity-opts" syncrediscleanup_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/sync-redis-clean-up" + workflow_testdata "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/workflow/testdata" + testdata_doublereference "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/workflow/testdata/double-reference" + testdata_virtualforeignkeys "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/workflow/testdata/virtual-foreign-keys" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/metric" "go.temporal.io/sdk/testsuite" ) -func (s *IntegrationTestSuite) Test_Workflow_VirtualForeignKeys_Passthrough() { - s.SetupTestByFolder("virtual-foreign-keys") - mux := http.NewServeMux() - mux.Handle(mgmtv1alpha1connect.JobServiceGetJobProcedure, connect.NewUnaryHandler( - mgmtv1alpha1connect.JobServiceGetJobProcedure, - func(ctx context.Context, r *connect.Request[mgmtv1alpha1.GetJobRequest]) (*connect.Response[mgmtv1alpha1.GetJobResponse], error) { - return connect.NewResponse(&mgmtv1alpha1.GetJobResponse{ - Job: &mgmtv1alpha1.Job{ - Id: "115aaf2c-776e-4847-8268-d914e3c15968", - Source: &mgmtv1alpha1.JobSource{ - Options: &mgmtv1alpha1.JobSourceOptions{ - Config: &mgmtv1alpha1.JobSourceOptions_Postgres{ - Postgres: &mgmtv1alpha1.PostgresSourceConnectionOptions{ - ConnectionId: "c9b6ce58-5c8e-4dce-870d-96841b19d988", +func getAllSyncTests() []*workflow_testdata.IntegrationTest { + allTests := []*workflow_testdata.IntegrationTest{} + drTests := testdata_doublereference.GetSyncTests() + vfkTests := testdata_virtualforeignkeys.GetSyncTests() + allTests = append(allTests, drTests...) + allTests = append(allTests, vfkTests...) + return allTests +} + +func (s *IntegrationTestSuite) Test_Workflow_Sync() { + tests := getAllSyncTests() + for _, tt := range tests { + s.T().Run(tt.Name, func(t *testing.T) { + s.T().Logf("running integration test: %s \n", tt.Name) + s.SetupSourceDb(tt.Folder, tt.SourceFilePaths) + s.SetupTargetDb(tt.Folder, tt.TargetFilePaths) + + schemas := []*mgmtv1alpha1.PostgresSourceSchemaOption{} + subsetMap := map[string]*mgmtv1alpha1.PostgresSourceSchemaOption{} + for table, where := range tt.SubsetMap { + schema, table := sqlmanager_shared.SplitTableKey(table) + if _, exists := subsetMap[schema]; !exists { + subsetMap[schema] = &mgmtv1alpha1.PostgresSourceSchemaOption{ + Schema: schema, + Tables: []*mgmtv1alpha1.PostgresSourceTableOption{}, + } + } + w := where + subsetMap[schema].Tables = append(subsetMap[schema].Tables, &mgmtv1alpha1.PostgresSourceTableOption{ + Table: table, + WhereClause: &w, + }) + } + + for _, s := range subsetMap { + schemas = append(schemas, s) + } + + var subsetByForeignKeyConstraints bool + if tt.JobOptions != nil && tt.JobOptions.SubsetByForeignKeyConstraints { + subsetByForeignKeyConstraints = true + } + + mux := http.NewServeMux() + mux.Handle(mgmtv1alpha1connect.JobServiceGetJobProcedure, connect.NewUnaryHandler( + mgmtv1alpha1connect.JobServiceGetJobProcedure, + func(ctx context.Context, r *connect.Request[mgmtv1alpha1.GetJobRequest]) (*connect.Response[mgmtv1alpha1.GetJobResponse], error) { + return connect.NewResponse(&mgmtv1alpha1.GetJobResponse{ + Job: &mgmtv1alpha1.Job{ + Id: "115aaf2c-776e-4847-8268-d914e3c15968", + Source: &mgmtv1alpha1.JobSource{ + Options: &mgmtv1alpha1.JobSourceOptions{ + Config: &mgmtv1alpha1.JobSourceOptions_Postgres{ + Postgres: &mgmtv1alpha1.PostgresSourceConnectionOptions{ + ConnectionId: "c9b6ce58-5c8e-4dce-870d-96841b19d988", + Schemas: schemas, + SubsetByForeignKeyConstraints: subsetByForeignKeyConstraints, + }, + }, }, }, - }, - }, - Mappings: []*mgmtv1alpha1.JobMapping{ - { - Schema: "vfk_hr", - Table: "regions", - Column: "region_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "regions", - Column: "region_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "regions", - Column: "region_name", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "countries", - Column: "country_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "countries", - Column: "country_name", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "countries", - Column: "region_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "locations", - Column: "location_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "locations", - Column: "street_address", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "locations", - Column: "postal_code", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "locations", - Column: "city", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "locations", - Column: "state_province", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "locations", - Column: "country_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "departments", - Column: "department_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "departments", - Column: "department_name", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "departments", - Column: "location_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "jobs", - Column: "job_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "jobs", - Column: "job_title", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "jobs", - Column: "min_salary", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "jobs", - Column: "max_salary", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "dependents", - Column: "dependent_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "dependents", - Column: "first_name", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "dependents", - Column: "last_name", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "dependents", - Column: "relationship", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "dependents", - Column: "employee_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "employee_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "first_name", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "last_name", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "email", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "phone_number", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "hire_date", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "job_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "salary", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "manager_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "department_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - }, - VirtualForeignKeys: []*mgmtv1alpha1.VirtualForeignConstraint{ - { - Schema: "vfk_hr", - Table: "countries", - Columns: []string{"region_id"}, - ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ - Schema: "vfk_hr", - Table: "regions", - Columns: []string{"region_id"}, - }, - }, - { - Schema: "vfk_hr", - Table: "departments", - Columns: []string{"location_id"}, - ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ - Schema: "vfk_hr", - Table: "locations", - Columns: []string{"location_id"}, - }, - }, - { - Schema: "vfk_hr", - Table: "dependents", - Columns: []string{"employee_id"}, - ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ - Schema: "vfk_hr", - Table: "employees", - Columns: []string{"employee_id"}, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Columns: []string{"manager_id"}, - ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ - Schema: "vfk_hr", - Table: "employees", - Columns: []string{"employee_id"}, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Columns: []string{"department_id"}, - ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ - Schema: "vfk_hr", - Table: "departments", - Columns: []string{"department_id"}, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Columns: []string{"job_id"}, - ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ - Schema: "vfk_hr", - Table: "jobs", - Columns: []string{"job_id"}, - }, - }, - { - Schema: "vfk_hr", - Table: "locations", - Columns: []string{"country_id"}, - ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ - Schema: "vfk_hr", - Table: "countries", - Columns: []string{"country_id"}, + + Destinations: []*mgmtv1alpha1.JobDestination{ + { + ConnectionId: "226add85-5751-4232-b085-a0ae93afc7ce", + }, }, - }, - }, - Destinations: []*mgmtv1alpha1.JobDestination{ - { - ConnectionId: "226add85-5751-4232-b085-a0ae93afc7ce", - }, - }, + Mappings: tt.JobMappings, + VirtualForeignKeys: tt.VirtualForeignKeys, + }}), nil }, - }), nil - }, - )) + )) - mux.Handle(mgmtv1alpha1connect.ConnectionServiceGetConnectionProcedure, connect.NewUnaryHandler( - mgmtv1alpha1connect.ConnectionServiceGetConnectionProcedure, - func(ctx context.Context, r *connect.Request[mgmtv1alpha1.GetConnectionRequest]) (*connect.Response[mgmtv1alpha1.GetConnectionResponse], error) { - if r.Msg.GetId() == "c9b6ce58-5c8e-4dce-870d-96841b19d988" { - return connect.NewResponse(&mgmtv1alpha1.GetConnectionResponse{ - Connection: &mgmtv1alpha1.Connection{ - Id: "c9b6ce58-5c8e-4dce-870d-96841b19d988", - Name: "source", - ConnectionConfig: &mgmtv1alpha1.ConnectionConfig{ - Config: &mgmtv1alpha1.ConnectionConfig_PgConfig{ - PgConfig: &mgmtv1alpha1.PostgresConnectionConfig{ - ConnectionConfig: &mgmtv1alpha1.PostgresConnectionConfig_Url{ - Url: s.sourceDsn, + mux.Handle(mgmtv1alpha1connect.ConnectionServiceGetConnectionProcedure, connect.NewUnaryHandler( + mgmtv1alpha1connect.ConnectionServiceGetConnectionProcedure, + func(ctx context.Context, r *connect.Request[mgmtv1alpha1.GetConnectionRequest]) (*connect.Response[mgmtv1alpha1.GetConnectionResponse], error) { + if r.Msg.GetId() == "c9b6ce58-5c8e-4dce-870d-96841b19d988" { + return connect.NewResponse(&mgmtv1alpha1.GetConnectionResponse{ + Connection: &mgmtv1alpha1.Connection{ + Id: "c9b6ce58-5c8e-4dce-870d-96841b19d988", + Name: "source", + ConnectionConfig: &mgmtv1alpha1.ConnectionConfig{ + Config: &mgmtv1alpha1.ConnectionConfig_PgConfig{ + PgConfig: &mgmtv1alpha1.PostgresConnectionConfig{ + ConnectionConfig: &mgmtv1alpha1.PostgresConnectionConfig_Url{ + Url: s.sourceDsn, + }, + }, }, }, }, - }, - }, - }), nil - } - if r.Msg.GetId() == "226add85-5751-4232-b085-a0ae93afc7ce" { - return connect.NewResponse(&mgmtv1alpha1.GetConnectionResponse{ - Connection: &mgmtv1alpha1.Connection{ - Id: "226add85-5751-4232-b085-a0ae93afc7ce", - Name: "target", - ConnectionConfig: &mgmtv1alpha1.ConnectionConfig{ - Config: &mgmtv1alpha1.ConnectionConfig_PgConfig{ - PgConfig: &mgmtv1alpha1.PostgresConnectionConfig{ - ConnectionConfig: &mgmtv1alpha1.PostgresConnectionConfig_Url{ - Url: s.targetDsn, + }), nil + } + if r.Msg.GetId() == "226add85-5751-4232-b085-a0ae93afc7ce" { + return connect.NewResponse(&mgmtv1alpha1.GetConnectionResponse{ + Connection: &mgmtv1alpha1.Connection{ + Id: "226add85-5751-4232-b085-a0ae93afc7ce", + Name: "target", + ConnectionConfig: &mgmtv1alpha1.ConnectionConfig{ + Config: &mgmtv1alpha1.ConnectionConfig_PgConfig{ + PgConfig: &mgmtv1alpha1.PostgresConnectionConfig{ + ConnectionConfig: &mgmtv1alpha1.PostgresConnectionConfig_Url{ + Url: s.targetDsn, + }, + }, }, }, }, - }, - }, - }), nil - } - return nil, nil - }, - )) - srv := startHTTPServer(s.T(), mux) - executeWorkflow(s.T(), srv, s.redisUrl, "115aaf2c-776e-4847-8268-d914e3c15968") + }), nil + } + return nil, nil + }, + )) + srv := startHTTPServer(s.T(), mux) + executeWorkflow(s.T(), srv, s.redisUrl, "115aaf2c-776e-4847-8268-d914e3c15968", tt.Name) - tables := []string{"regions", "countries", "locations", "departments", "dependents", "locations", "jobs", "employees"} - for _, t := range tables { - rows, err := s.targetPgPool.Query(s.ctx, fmt.Sprintf("select * from vfk_hr.%s;", t)) - require.NoError(s.T(), err) - for rows.Next() { - values, err := rows.Values() - count := 0 - for i := range values { - count = i + for table, expected := range tt.Expected { + rows, err := s.targetPgPool.Query(s.ctx, fmt.Sprintf("select * from %s;", table)) + require.NoError(s.T(), err) + count := 0 + for rows.Next() { + count++ + } + require.Equalf(s.T(), expected.RowCount, count, fmt.Sprintf("Test: %s Table: %s", tt.Name, table)) } - require.Greater(s.T(), count, 0) - require.NoError(s.T(), err) - } - } - s.TearDownTestByFolder("virtual-foreign-keys") + s.TearDownTestByFolder(tt.Folder) + }) + } } func (s *IntegrationTestSuite) Test_Workflow_VirtualForeignKeys_Transform() { - s.SetupTestByFolder("virtual-foreign-keys") + testFolder := "virtual-foreign-keys" + s.SetupSourceDb(testFolder, []string{"source-setup.sql"}) + s.SetupTargetDb(testFolder, []string{"target-setup.sql"}) + virtualForeignKeys := testdata_virtualforeignkeys.GetVirtualForeignKeys() + jobmappings := testdata_virtualforeignkeys.GetDefaultSyncJobMappings() + + for _, m := range jobmappings { + if m.Table == "countries" && m.Column == "country_id" { + m.Transformer = &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_JAVASCRIPT, + Config: &mgmtv1alpha1.TransformerConfig{ + Config: &mgmtv1alpha1.TransformerConfig_TransformJavascriptConfig{ + TransformJavascriptConfig: &mgmtv1alpha1.TransformJavascript{Code: `if (value == 'US') { return 'SU'; } return value;`}, + }, + }, + } + } + } // neosync api mocks mux := http.NewServeMux() mux.Handle(mgmtv1alpha1connect.JobServiceGetJobProcedure, connect.NewUnaryHandler( @@ -482,357 +199,8 @@ func (s *IntegrationTestSuite) Test_Workflow_VirtualForeignKeys_Transform() { }, }, }, - Mappings: []*mgmtv1alpha1.JobMapping{ - { - Schema: "vfk_hr", - Table: "regions", - Column: "region_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "regions", - Column: "region_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "regions", - Column: "region_name", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "countries", - Column: "country_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_JAVASCRIPT, - Config: &mgmtv1alpha1.TransformerConfig{ - Config: &mgmtv1alpha1.TransformerConfig_TransformJavascriptConfig{ - TransformJavascriptConfig: &mgmtv1alpha1.TransformJavascript{Code: `if (value == 'US') { return 'SU'; } return value;`}, - }, - }, - }, - }, - { - Schema: "vfk_hr", - Table: "countries", - Column: "country_name", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "countries", - Column: "region_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "locations", - Column: "location_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "locations", - Column: "street_address", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "locations", - Column: "postal_code", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "locations", - Column: "city", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "locations", - Column: "state_province", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "locations", - Column: "country_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "departments", - Column: "department_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "departments", - Column: "department_name", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "departments", - Column: "location_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "jobs", - Column: "job_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "jobs", - Column: "job_title", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "jobs", - Column: "min_salary", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "jobs", - Column: "max_salary", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "dependents", - Column: "dependent_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "dependents", - Column: "first_name", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "dependents", - Column: "last_name", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "dependents", - Column: "relationship", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "dependents", - Column: "employee_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "employee_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "first_name", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "last_name", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "email", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "phone_number", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "hire_date", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "job_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "salary", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "manager_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Column: "department_id", - Transformer: &mgmtv1alpha1.JobMappingTransformer{ - Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, - }, - }, - }, - VirtualForeignKeys: []*mgmtv1alpha1.VirtualForeignConstraint{ - { - Schema: "vfk_hr", - Table: "countries", - Columns: []string{"region_id"}, - ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ - Schema: "vfk_hr", - Table: "regions", - Columns: []string{"region_id"}, - }, - }, - { - Schema: "vfk_hr", - Table: "departments", - Columns: []string{"location_id"}, - ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ - Schema: "vfk_hr", - Table: "locations", - Columns: []string{"location_id"}, - }, - }, - { - Schema: "vfk_hr", - Table: "dependents", - Columns: []string{"employee_id"}, - ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ - Schema: "vfk_hr", - Table: "employees", - Columns: []string{"employee_id"}, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Columns: []string{"manager_id"}, - ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ - Schema: "vfk_hr", - Table: "employees", - Columns: []string{"employee_id"}, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Columns: []string{"department_id"}, - ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ - Schema: "vfk_hr", - Table: "departments", - Columns: []string{"department_id"}, - }, - }, - { - Schema: "vfk_hr", - Table: "employees", - Columns: []string{"job_id"}, - ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ - Schema: "vfk_hr", - Table: "jobs", - Columns: []string{"job_id"}, - }, - }, - { - Schema: "vfk_hr", - Table: "locations", - Columns: []string{"country_id"}, - ForeignKey: &mgmtv1alpha1.VirtualForeignKey{ - Schema: "vfk_hr", - Table: "countries", - Columns: []string{"country_id"}, - }, - }, - }, + Mappings: jobmappings, + VirtualForeignKeys: virtualForeignKeys, Destinations: []*mgmtv1alpha1.JobDestination{ { ConnectionId: "226add85-5751-4232-b085-a0ae93afc7ce", @@ -884,24 +252,47 @@ func (s *IntegrationTestSuite) Test_Workflow_VirtualForeignKeys_Transform() { }, )) srv := startHTTPServer(s.T(), mux) - executeWorkflow(s.T(), srv, s.redisUrl, "fd4d8660-31a0-48b2-9adf-10f11b94898f") + executeWorkflow(s.T(), srv, s.redisUrl, "fd4d8660-31a0-48b2-9adf-10f11b94898f", "Virtual Foreign Key primary key transform") - tables := []string{"regions", "countries", "locations", "departments", "dependents", "locations", "jobs", "employees"} + tables := []string{"regions", "countries", "locations", "departments", "dependents", "jobs", "employees"} for _, t := range tables { rows, err := s.targetPgPool.Query(s.ctx, fmt.Sprintf("select * from vfk_hr.%s;", t)) require.NoError(s.T(), err) + count := 0 for rows.Next() { - values, err := rows.Values() - count := 0 - for i := range values { - count = i - } - require.Greater(s.T(), count, 0) - require.NoError(s.T(), err) + count++ } + require.Greater(s.T(), count, 0) + require.NoError(s.T(), err) } - s.TearDownTestByFolder("virtual-foreign-keys") + rows := s.sourcePgPool.QueryRow(s.ctx, "select count(*) from vfk_hr.countries where country_id = 'US';") + var rowCount int + err := rows.Scan(&rowCount) + require.NoError(s.T(), err) + require.Equal(s.T(), 1, rowCount) + + rows = s.sourcePgPool.QueryRow(s.ctx, "select count(*) from vfk_hr.locations where country_id = 'US';") + err = rows.Scan(&rowCount) + require.NoError(s.T(), err) + require.Equal(s.T(), 3, rowCount) + + rows = s.targetPgPool.QueryRow(s.ctx, "select count(*) from vfk_hr.countries where country_id = 'US';") + err = rows.Scan(&rowCount) + require.NoError(s.T(), err) + require.Equal(s.T(), 0, rowCount) + + rows = s.targetPgPool.QueryRow(s.ctx, "select count(*) from vfk_hr.countries where country_id = 'SU';") + err = rows.Scan(&rowCount) + require.NoError(s.T(), err) + require.Equal(s.T(), 1, rowCount) + + rows = s.targetPgPool.QueryRow(s.ctx, "select count(*) from vfk_hr.locations where country_id = 'SU';") + err = rows.Scan(&rowCount) + require.NoError(s.T(), err) + require.Equal(s.T(), 3, rowCount) + + s.TearDownTestByFolder(testFolder) } func executeWorkflow( @@ -909,6 +300,7 @@ func executeWorkflow( srv *httptest.Server, redisUrl string, jobId string, + testName string, ) { connclient := mgmtv1alpha1connect.NewConnectionServiceClient(srv.Client(), srv.URL) jobclient := mgmtv1alpha1connect.NewJobServiceClient(srv.Client(), srv.URL) @@ -955,10 +347,10 @@ func executeWorkflow( env.SetTestTimeout(120 * time.Second) // increase the test timeout env.ExecuteWorkflow(Workflow, &WorkflowRequest{JobId: jobId}) - require.True(t, env.IsWorkflowCompleted()) + require.Truef(t, env.IsWorkflowCompleted(), fmt.Sprintf("Workflow did not complete. Test: %s", testName)) err := env.GetWorkflowError() - require.Nil(t, err) + require.Nilf(t, err, fmt.Sprintf("Workflow error. Test: %s", testName)) } func startHTTPServer(tb testing.TB, h http.Handler) *httptest.Server { diff --git a/worker/pkg/workflows/pkg/workflows/datasync/workflow/testdata/double-reference/job_mappings.go b/worker/pkg/workflows/pkg/workflows/datasync/workflow/testdata/double-reference/job_mappings.go new file mode 100644 index 0000000000..8d63b48959 --- /dev/null +++ b/worker/pkg/workflows/pkg/workflows/datasync/workflow/testdata/double-reference/job_mappings.go @@ -0,0 +1,12 @@ + +package main + +import ( + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" +) + +func getDefaultJobMappings()[]*mgmtv1alpha1.JobMapping { + return []*mgmtv1alpha1.JobMapping{ + } +} + diff --git a/worker/pkg/workflows/pkg/workflows/datasync/workflow/testdata/virtual-foreign-keys/job_mappings.go b/worker/pkg/workflows/pkg/workflows/datasync/workflow/testdata/virtual-foreign-keys/job_mappings.go new file mode 100644 index 0000000000..8d63b48959 --- /dev/null +++ b/worker/pkg/workflows/pkg/workflows/datasync/workflow/testdata/virtual-foreign-keys/job_mappings.go @@ -0,0 +1,12 @@ + +package main + +import ( + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" +) + +func getDefaultJobMappings()[]*mgmtv1alpha1.JobMapping { + return []*mgmtv1alpha1.JobMapping{ + } +} +