From 373bfb2621765dd95c7003b943bc0fd734eac36e Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Thu, 7 Mar 2024 01:58:41 +0530 Subject: [PATCH 1/5] Clickhouse: specify columns more in insert select (#1442) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR is an attempt to fix #1439 , by explicitly specifying the columns in `select * from s3(...)` in Qrep for Clickhouse. Functionally tested Co-authored-by: Philip Dubé --- flow/connectors/clickhouse/qrep_avro_sync.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index f4631ca4f4..2a6d1f217e 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -130,12 +130,12 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( continue } - selector = append(selector, colName) + selector = append(selector, "`"+colName+"`") } selectorStr := strings.Join(selector, ",") //nolint:gosec - query := fmt.Sprintf("INSERT INTO %s(%s) SELECT * FROM s3('%s','%s','%s', 'Avro')", - config.DestinationTableIdentifier, selectorStr, avroFileUrl, + query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s', 'Avro')", + config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl, s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey) _, err = s.connector.database.ExecContext(ctx, query) From 6b47812b9ff398a6018719b503c7478867a83ce9 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Thu, 7 Mar 2024 05:01:07 +0530 Subject: [PATCH 2/5] fix(telemetry): issue with aws sns (#1444) `subject[:100]` is byte based, not character based, causing AWS to reject control characters Properly limit to 100 characters, rather than 100 bytes --- flow/shared/telemetry/sns_message_sender.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff 
--git a/flow/shared/telemetry/sns_message_sender.go b/flow/shared/telemetry/sns_message_sender.go index 42bdd026a7..67cf3eebae 100644 --- a/flow/shared/telemetry/sns_message_sender.go +++ b/flow/shared/telemetry/sns_message_sender.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "encoding/hex" "strings" + "unicode" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" @@ -37,7 +38,19 @@ func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, h := sha256.New() h.Write([]byte(deduplicationString)) deduplicationHash := hex.EncodeToString(h.Sum(nil)) - + // AWS SNS Subject constraints + var messageSubjectBuilder strings.Builder + maxSubjectSize := 99 + for currentLength, char := range subject { + if unicode.IsPrint(char) { + messageSubjectBuilder.WriteRune(char) + } else { + messageSubjectBuilder.WriteRune(' ') + } + if currentLength > maxSubjectSize { + break + } + } publish, err := s.client.Publish(ctx, &sns.PublishInput{ Message: aws.String(body), MessageAttributes: map[string]types.MessageAttributeValue{ @@ -66,7 +79,7 @@ func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, StringValue: aws.String(deduplicationHash), }, }, - Subject: aws.String(subject[:100]), + Subject: aws.String(messageSubjectBuilder.String()), TopicArn: aws.String(s.topic), }) if err != nil { From b2135280ae797fdafa80ea88d556a72004f37931 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 7 Mar 2024 14:02:07 +0000 Subject: [PATCH 3/5] POC: generic Test_Simple_Flow (#1440) Lay out basic design for generic connector testing, replacing `Test_Simple_Flow` with a single definition used for PG/SF/BQ More can be ported in followup PRs, so when new connectors are added they only implement `GenericSuite` to have e2e tests Also fix hstore comparison so it works with Snowflake --- .github/workflows/flow.yml | 4 +- .../postgres/postgres_schema_delta_test.go | 40 ++--- flow/e2e/bigquery/bigquery.go | 102 ++++++++++++ 
flow/e2e/bigquery/peer_flow_bq_test.go | 149 +---------------- flow/e2e/bigquery/qrep_flow_bq_test.go | 11 +- flow/e2e/congen.go | 36 ++-- flow/e2e/generic/peer_flow_test.go | 82 +++++++++ flow/e2e/postgres/peer_flow_pg_test.go | 48 +----- flow/e2e/postgres/postgres.go | 79 +++++++++ flow/e2e/postgres/qrep_flow_pg_test.go | 51 +----- flow/e2e/s3/qrep_flow_s3_test.go | 30 ++-- flow/e2e/snowflake/peer_flow_sf_test.go | 155 +----------------- flow/e2e/snowflake/snowflake.go | 110 +++++++++++++ .../snowflake/snowflake_schema_delta_test.go | 10 +- .../e2e/sqlserver/qrep_flow_sqlserver_test.go | 24 +-- flow/e2e/test_utils.go | 37 ++++- flow/e2eshared/e2eshared.go | 8 +- flow/model/qvalue/qvalue.go | 6 +- flow/workflows/cdc_flow.go | 1 - flow/workflows/setup_flow.go | 2 +- 20 files changed, 495 insertions(+), 490 deletions(-) create mode 100644 flow/e2e/bigquery/bigquery.go create mode 100644 flow/e2e/generic/peer_flow_test.go create mode 100644 flow/e2e/postgres/postgres.go create mode 100644 flow/e2e/snowflake/snowflake.go diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 5f6899b15b..38bc636f51 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -12,7 +12,7 @@ jobs: matrix: runner: [ubicloud-standard-16-ubuntu-2204-arm] runs-on: ${{ matrix.runner }} - timeout-minutes: 40 + timeout-minutes: 30 services: catalog: image: imresamu/postgis:15-3.4-alpine @@ -96,7 +96,7 @@ jobs: temporal operator search-attribute create --name MirrorName --type Text --namespace default ./peer-flow worker & ./peer-flow snapshot-worker & - go test -p 32 ./... -timeout 1200s + go test -p 32 ./... 
-timeout 900s working-directory: ./flow env: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index c059c36836..e5603b02d9 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -196,23 +196,25 @@ func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { } func TestPostgresSchemaDeltaTestSuite(t *testing.T) { - e2eshared.RunSuite(t, SetupSuite, func(s PostgresSchemaDeltaTestSuite) { - teardownTx, err := s.connector.conn.Begin(context.Background()) - require.NoError(s.t, err) - defer func() { - err := teardownTx.Rollback(context.Background()) - if err != pgx.ErrTxClosed { - require.NoError(s.t, err) - } - }() - _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", - s.schema)) - require.NoError(s.t, err) - err = teardownTx.Commit(context.Background()) - require.NoError(s.t, err) - - require.NoError(s.t, s.connector.ConnectionActive(context.Background())) - require.NoError(s.t, s.connector.Close()) - require.Error(s.t, s.connector.ConnectionActive(context.Background())) - }) + e2eshared.RunSuite(t, SetupSuite) +} + +func (s PostgresSchemaDeltaTestSuite) Teardown() { + teardownTx, err := s.connector.conn.Begin(context.Background()) + require.NoError(s.t, err) + defer func() { + err := teardownTx.Rollback(context.Background()) + if err != pgx.ErrTxClosed { + require.NoError(s.t, err) + } + }() + _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", + s.schema)) + require.NoError(s.t, err) + err = teardownTx.Commit(context.Background()) + require.NoError(s.t, err) + + require.NoError(s.t, s.connector.ConnectionActive(context.Background())) + require.NoError(s.t, s.connector.Close()) + require.Error(s.t, s.connector.ConnectionActive(context.Background())) } diff --git 
a/flow/e2e/bigquery/bigquery.go b/flow/e2e/bigquery/bigquery.go new file mode 100644 index 0000000000..73f5c38d6e --- /dev/null +++ b/flow/e2e/bigquery/bigquery.go @@ -0,0 +1,102 @@ +package e2e_bigquery + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/jackc/pgx/v5" + + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/shared" +) + +type PeerFlowE2ETestSuiteBQ struct { + t *testing.T + + bqSuffix string + conn *connpostgres.PostgresConnector + bqHelper *BigQueryTestHelper +} + +func (s PeerFlowE2ETestSuiteBQ) T() *testing.T { + return s.t +} + +func (s PeerFlowE2ETestSuiteBQ) Conn() *pgx.Conn { + return s.conn.Conn() +} + +func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector { + return s.conn +} + +func (s PeerFlowE2ETestSuiteBQ) Suffix() string { + return s.bqSuffix +} + +func (s PeerFlowE2ETestSuiteBQ) Peer() *protos.Peer { + return s.bqHelper.Peer +} + +func (s PeerFlowE2ETestSuiteBQ) DestinationTable(table string) string { + return table +} + +func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error) { + s.t.Helper() + qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) + bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName) + s.t.Logf("running query on bigquery: %s", bqSelQuery) + return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) +} + +func (s PeerFlowE2ETestSuiteBQ) GetRowsWhere(tableName string, colsString string, where string) (*model.QRecordBatch, error) { + s.t.Helper() + qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) + bqSelQuery := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY id", colsString, qualifiedTableName, where) + s.t.Logf("running query on bigquery: %s", 
bqSelQuery) + return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) +} + +func (s PeerFlowE2ETestSuiteBQ) Teardown() { + e2e.TearDownPostgres(s) + + err := s.bqHelper.DropDataset(s.bqHelper.Config.DatasetId) + if err != nil { + s.t.Fatalf("failed to tear down bigquery: %v", err) + } +} + +func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ { + t.Helper() + + suffix := shared.RandomString(8) + tsSuffix := time.Now().Format("20060102150405") + bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix) + conn, err := e2e.SetupPostgres(t, bqSuffix) + if err != nil || conn == nil { + t.Fatalf("failed to setup postgres: %v", err) + } + + bqHelper, err := NewBigQueryTestHelper() + if err != nil { + t.Fatalf("Failed to create helper: %v", err) + } + + err = bqHelper.RecreateDataset() + if err != nil { + t.Fatalf("Failed to recreate dataset: %v", err) + } + + return PeerFlowE2ETestSuiteBQ{ + t: t, + bqSuffix: bqSuffix, + conn: conn, + bqHelper: bqHelper, + } +} diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 14e95deda2..ec28b5f97b 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -4,76 +4,23 @@ import ( "context" "errors" "fmt" - "strings" "testing" "time" "github.com/jackc/pgerrcode" - "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" - "github.com/joho/godotenv" "github.com/stretchr/testify/require" - connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) -type PeerFlowE2ETestSuiteBQ struct { - t *testing.T - - bqSuffix string - conn *connpostgres.PostgresConnector - bqHelper 
*BigQueryTestHelper -} - -func (s PeerFlowE2ETestSuiteBQ) T() *testing.T { - return s.t -} - -func (s PeerFlowE2ETestSuiteBQ) Conn() *pgx.Conn { - return s.conn.Conn() -} - -func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector { - return s.conn -} - -func (s PeerFlowE2ETestSuiteBQ) Suffix() string { - return s.bqSuffix -} - -func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error) { - s.t.Helper() - qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) - bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName) - s.t.Logf("running query on bigquery: %s", bqSelQuery) - return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) -} - -func (s PeerFlowE2ETestSuiteBQ) GetRowsWhere(tableName string, colsString string, where string) (*model.QRecordBatch, error) { - s.t.Helper() - qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) - bqSelQuery := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY id", colsString, qualifiedTableName, where) - s.t.Logf("running query on bigquery: %s", bqSelQuery) - return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) -} - func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { - e2eshared.RunSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteBQ) { - e2e.TearDownPostgres(s) - - err := s.bqHelper.DropDataset(s.bqHelper.Config.DatasetId) - if err != nil { - s.t.Fatalf("failed to tear down bigquery: %v", err) - } - }) + e2eshared.RunSuite(t, SetupSuite) } func (s PeerFlowE2ETestSuiteBQ) checkJSONValue(tableName, colName, fieldName, value string) error { @@ -147,52 +94,6 @@ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, softDel return nil } -// setupBigQuery sets up the bigquery connection. 
-func setupBigQuery(t *testing.T) *BigQueryTestHelper { - t.Helper() - - bqHelper, err := NewBigQueryTestHelper() - if err != nil { - t.Fatalf("Failed to create helper: %v", err) - } - - err = bqHelper.RecreateDataset() - if err != nil { - t.Fatalf("Failed to recreate dataset: %v", err) - } - - return bqHelper -} - -// Implement SetupAllSuite interface to setup the test suite -func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ { - t.Helper() - - err := godotenv.Load() - if err != nil { - // it's okay if the .env file is not present - // we will use the default values - t.Log("Unable to load .env file, using default values from env") - } - - suffix := shared.RandomString(8) - tsSuffix := time.Now().Format("20060102150405") - bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix) - conn, err := e2e.SetupPostgres(t, bqSuffix) - if err != nil || conn == nil { - t.Fatalf("failed to setup postgres: %v", err) - } - - bq := setupBigQuery(t) - - return PeerFlowE2ETestSuiteBQ{ - t: t, - bqSuffix: bqSuffix, - conn: conn, - bqHelper: bq, - } -} - func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { tc := e2e.NewTemporalClient(s.t) @@ -268,54 +169,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { e2e.RequireEnvCanceled(s.t, env) } -// Test_Complete_Simple_Flow_BQ tests a complete flow with data in the source table. -// The test inserts 10 rows into the source table and verifies that the data is -// correctly synced to the destination table after sync flow completes. 
-func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { - tc := e2e.NewTemporalClient(s.t) - - srcTableName := s.attachSchemaSuffix("test_simple_flow_bq") - dstTableName := "test_simple_flow_bq" - - _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id SERIAL PRIMARY KEY, - key TEXT NOT NULL, - value TEXT NOT NULL - ); - `, srcTableName)) - require.NoError(s.t, err) - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_complete_simple_flow"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - Destination: s.bqHelper.Peer, - CdcStagingPath: "", - } - - flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - flowConnConfig.MaxBatchSize = 100 - - env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) - e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - - // insert 10 rows into the source table - for i := range 10 { - testKey := fmt.Sprintf("test_key_%d", i) - testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(key, value) VALUES ($1, $2) - `, srcTableName), testKey, testValue) - e2e.EnvNoError(s.t, env, err) - } - s.t.Log("Inserted 10 rows into the source table") - - e2e.EnvWaitForEqualTables(env, s, "normalize inserts", dstTableName, "id,key,value") - - env.Cancel() - e2e.RequireEnvCanceled(s.t, env) -} - func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { tc := e2e.NewTemporalClient(s.t) diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index c7f6a5c7f8..ae0841f153 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -45,14 +45,9 @@ func (s PeerFlowE2ETestSuiteBQ) setupTimeTable(tableName string) { rows = append(rows, row) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO e2e_test_%s.%s ( - watermark_ts, - mytimestamp, - mytztimestamp, - 
medieval, - mybaddate, - mydate - ) VALUES %s; + INSERT INTO e2e_test_%s.%s ( + watermark_ts, mytimestamp, mytztimestamp, medieval, mybaddate, mydate + ) VALUES %s; `, s.bqSuffix, tableName, strings.Join(rows, ","))) require.NoError(s.t, err) } diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 82387c9ff3..2fd3c180e5 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -162,32 +162,36 @@ func GeneratePostgresPeer() *protos.Peer { type FlowConnectionGenerationConfig struct { FlowJobName string + TableMappings []*protos.TableMapping TableNameMapping map[string]string Destination *protos.Peer CdcStagingPath string SoftDelete bool } -// GenerateSnowflakePeer generates a snowflake peer config for testing. -func GenerateSnowflakePeer(snowflakeConfig *protos.SnowflakeConfig) (*protos.Peer, error) { - ret := &protos.Peer{} - ret.Name = "test_snowflake_peer" - ret.Type = protos.DBType_SNOWFLAKE - - ret.Config = &protos.Peer_SnowflakeConfig{ - SnowflakeConfig: snowflakeConfig, +func TableMappings(s GenericSuite, tables ...string) []*protos.TableMapping { + if len(tables)&1 != 0 { + panic("must receive even number of table names") } - - return ret, nil + tm := make([]*protos.TableMapping, 0, len(tables)/2) + for i := 0; i < len(tables); i += 2 { + tm = append(tm, &protos.TableMapping{ + SourceTableIdentifier: AttachSchema(s, tables[i]), + DestinationTableIdentifier: s.DestinationTable(tables[i+1]), + }) + } + return tm } func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() *protos.FlowConnectionConfigs { - tblMappings := []*protos.TableMapping{} - for k, v := range c.TableNameMapping { - tblMappings = append(tblMappings, &protos.TableMapping{ - SourceTableIdentifier: k, - DestinationTableIdentifier: v, - }) + tblMappings := c.TableMappings + if tblMappings == nil { + for k, v := range c.TableNameMapping { + tblMappings = append(tblMappings, &protos.TableMapping{ + SourceTableIdentifier: k, + DestinationTableIdentifier: v, + }) + } } ret := 
&protos.FlowConnectionConfigs{ diff --git a/flow/e2e/generic/peer_flow_test.go b/flow/e2e/generic/peer_flow_test.go new file mode 100644 index 0000000000..20c5847df4 --- /dev/null +++ b/flow/e2e/generic/peer_flow_test.go @@ -0,0 +1,82 @@ +package e2e_generic + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2e/bigquery" + "github.com/PeerDB-io/peer-flow/e2e/postgres" + "github.com/PeerDB-io/peer-flow/e2e/snowflake" + "github.com/PeerDB-io/peer-flow/e2eshared" + peerflow "github.com/PeerDB-io/peer-flow/workflows" +) + +func TestGenericPG(t *testing.T) { + e2eshared.RunSuite(t, SetupGenericSuite(e2e_postgres.SetupSuite)) +} + +func TestGenericSF(t *testing.T) { + e2eshared.RunSuite(t, SetupGenericSuite(e2e_snowflake.SetupSuite)) +} + +func TestGenericBQ(t *testing.T) { + e2eshared.RunSuite(t, SetupGenericSuite(e2e_bigquery.SetupSuite)) +} + +type Generic struct { + e2e.GenericSuite +} + +func SetupGenericSuite[T e2e.GenericSuite](f func(t *testing.T) T) func(t *testing.T) Generic { + return func(t *testing.T) Generic { + t.Helper() + return Generic{f(t)} + } +} + +func (s Generic) Test_Simple_Flow() { + t := s.T() + srcTable := "test_simple" + dstTable := "test_simple_dst" + srcSchemaTable := e2e.AttachSchema(s, srcTable) + + _, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + key TEXT NOT NULL, + value TEXT NOT NULL, + myh HSTORE NOT NULL + ); + `, srcSchemaTable)) + require.NoError(t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: e2e.AddSuffix(s, "test_simple"), + TableMappings: e2e.TableMappings(s, srcTable, dstTable), + Destination: s.Peer(), + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + + tc := e2e.NewTemporalClient(t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + + 
e2e.SetupCDCFlowStatusQuery(t, env, connectionGen) + // insert 10 rows into the source table + for i := range 10 { + testKey := fmt.Sprintf("test_key_%d", i) + testValue := fmt.Sprintf("test_value_%d", i) + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(key, value, myh) VALUES ($1, $2, '"a"=>"b"') + `, srcSchemaTable), testKey, testValue) + e2e.EnvNoError(t, env, err) + } + t.Log("Inserted 10 rows into the source table") + + e2e.EnvWaitForEqualTablesWithNames(env, s, "normalizing 10 rows", srcTable, dstTable, `id,key,value,myh`) + env.Cancel() + e2e.RequireEnvCanceled(t, env) +} diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index fa55bbb3fc..2e69376b01 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -77,52 +77,6 @@ func (s PeerFlowE2ETestSuitePG) WaitForSchema( }) } -func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { - srcTableName := s.attachSchemaSuffix("test_simple_flow") - dstTableName := s.attachSchemaSuffix("test_simple_flow_dst") - - _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id SERIAL PRIMARY KEY, - key TEXT NOT NULL, - value TEXT NOT NULL, - myh HSTORE NOT NULL - ); - `, srcTableName)) - require.NoError(s.t, err) - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_simple_flow"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - Destination: s.peer, - } - - flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - flowConnConfig.MaxBatchSize = 100 - - tc := e2e.NewTemporalClient(s.t) - env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) - - e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - // insert 10 rows into the source table - for i := range 10 { - testKey := fmt.Sprintf("test_key_%d", i) - testValue := fmt.Sprintf("test_value_%d", i) - _, err = 
s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(key, value, myh) VALUES ($1, $2, '"a"=>"b"') - `, srcTableName), testKey, testValue) - e2e.EnvNoError(s.t, env, err) - } - s.t.Log("Inserted 10 rows into the source table") - - e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize 10 rows", func() bool { - return s.comparePGTables(srcTableName, dstTableName, "id,key,value") == nil - }) - env.Cancel() - - e2e.RequireEnvCanceled(s.t, env) -} - func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() { srcTableName := s.attachSchemaSuffix("test_geospatial_pg") dstTableName := s.attachSchemaSuffix("test_geospatial_pg_dst") @@ -1097,7 +1051,7 @@ func (s PeerFlowE2ETestSuitePG) Test_ContinueAsNew() { } s.t.Log("Inserted 144 rows into the source table") - e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize 72 syncs", func() bool { + e2e.EnvWaitFor(s.t, env, 4*time.Minute, "normalize 72 syncs", func() bool { return s.comparePGTables(srcTableName, dstTableName, "id,key,value") == nil }) env.Cancel() diff --git a/flow/e2e/postgres/postgres.go b/flow/e2e/postgres/postgres.go new file mode 100644 index 0000000000..23ca778c8d --- /dev/null +++ b/flow/e2e/postgres/postgres.go @@ -0,0 +1,79 @@ +package e2e_postgres + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/jackc/pgx/v5" + "github.com/stretchr/testify/require" + + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/shared" +) + +type PeerFlowE2ETestSuitePG struct { + t *testing.T + + conn *connpostgres.PostgresConnector + peer *protos.Peer + suffix string +} + +func (s PeerFlowE2ETestSuitePG) T() *testing.T { + return s.t +} + +func (s PeerFlowE2ETestSuitePG) Connector() *connpostgres.PostgresConnector { + return s.conn +} + +func (s PeerFlowE2ETestSuitePG) Conn() *pgx.Conn { + return s.conn.Conn() +} + +func (s 
PeerFlowE2ETestSuitePG) Suffix() string { + return s.suffix +} + +func (s PeerFlowE2ETestSuitePG) Peer() *protos.Peer { + return s.peer +} + +func (s PeerFlowE2ETestSuitePG) DestinationTable(table string) string { + return e2e.AttachSchema(s, table) +} + +func (s PeerFlowE2ETestSuitePG) GetRows(table string, cols string) (*model.QRecordBatch, error) { + s.t.Helper() + pgQueryExecutor := s.conn.NewQRepQueryExecutor("testflow", "testpart") + pgQueryExecutor.SetTestEnv(true) + + return pgQueryExecutor.ExecuteAndProcessQuery( + context.Background(), + fmt.Sprintf(`SELECT %s FROM e2e_test_%s.%s ORDER BY id`, cols, s.suffix, connpostgres.QuoteIdentifier(table)), + ) +} + +func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG { + t.Helper() + + suffix := "pg_" + strings.ToLower(shared.RandomString(8)) + conn, err := e2e.SetupPostgres(t, suffix) + require.NoError(t, err, "failed to setup postgres") + + return PeerFlowE2ETestSuitePG{ + t: t, + conn: conn, + peer: e2e.GeneratePostgresPeer(), + suffix: suffix, + } +} + +func (s PeerFlowE2ETestSuitePG) Teardown() { + e2e.TearDownPostgres(s) +} diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index fb49d3d242..abb7867d24 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -10,7 +10,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" - "github.com/joho/godotenv" "github.com/stretchr/testify/require" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" @@ -21,56 +20,8 @@ import ( "github.com/PeerDB-io/peer-flow/shared" ) -type PeerFlowE2ETestSuitePG struct { - t *testing.T - - conn *connpostgres.PostgresConnector - peer *protos.Peer - suffix string -} - -func (s PeerFlowE2ETestSuitePG) T() *testing.T { - return s.t -} - -func (s PeerFlowE2ETestSuitePG) Conn() *pgx.Conn { - return s.conn.Conn() -} - -func (s PeerFlowE2ETestSuitePG) Connector() *connpostgres.PostgresConnector { - return s.conn -} - -func (s 
PeerFlowE2ETestSuitePG) Suffix() string { - return s.suffix -} - func TestPeerFlowE2ETestSuitePG(t *testing.T) { - e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuitePG) { - e2e.TearDownPostgres(s) - }) -} - -func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG { - t.Helper() - - err := godotenv.Load() - if err != nil { - // it's okay if the .env file is not present - // we will use the default values - t.Log("Unable to load .env file, using default values from env") - } - - suffix := "pg_" + strings.ToLower(shared.RandomString(8)) - conn, err := e2e.SetupPostgres(t, suffix) - require.NoError(t, err, "failed to setup postgres") - - return PeerFlowE2ETestSuitePG{ - t: t, - conn: conn, - peer: e2e.GeneratePostgresPeer(), - suffix: suffix, - } + e2eshared.RunSuite(t, SetupSuite) } func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) { diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index 240f3a78b9..d59378d52c 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/joho/godotenv" "github.com/stretchr/testify/require" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" @@ -36,21 +35,12 @@ func (s PeerFlowE2ETestSuiteS3) Suffix() string { return s.suffix } -func tearDownSuite(s PeerFlowE2ETestSuiteS3) { - e2e.TearDownPostgres(s) - - err := s.s3Helper.CleanUp(context.Background()) - if err != nil { - require.Fail(s.t, "failed to clean up s3", err) - } -} - func TestPeerFlowE2ETestSuiteS3(t *testing.T) { - e2eshared.RunSuite(t, SetupSuiteS3, tearDownSuite) + e2eshared.RunSuite(t, SetupSuiteS3) } func TestPeerFlowE2ETestSuiteGCS(t *testing.T) { - e2eshared.RunSuite(t, SetupSuiteGCS, tearDownSuite) + e2eshared.RunSuite(t, SetupSuiteGCS) } func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { @@ -63,13 +53,6 @@ func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, 
rowCount int) func setupSuite(t *testing.T, gcs bool) PeerFlowE2ETestSuiteS3 { t.Helper() - err := godotenv.Load() - if err != nil { - // it's okay if the .env file is not present - // we will use the default values - t.Log("Unable to load .env file, using default values from env") - } - suffix := "s3_" + strings.ToLower(shared.RandomString(8)) conn, err := e2e.SetupPostgres(t, suffix) if err != nil || conn == nil { @@ -89,6 +72,15 @@ func setupSuite(t *testing.T, gcs bool) PeerFlowE2ETestSuiteS3 { } } +func (s PeerFlowE2ETestSuiteS3) Teardown() { + e2e.TearDownPostgres(s) + + err := s.s3Helper.CleanUp(context.Background()) + if err != nil { + require.Fail(s.t, "failed to clean up s3", err) + } +} + func SetupSuiteS3(t *testing.T) PeerFlowE2ETestSuiteS3 { t.Helper() return setupSuite(t, false) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 9eaf491e47..525d2c7256 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -9,176 +9,27 @@ import ( "time" "github.com/jackc/pgerrcode" - "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" - "github.com/joho/godotenv" "github.com/stretchr/testify/require" - connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" - connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) -type PeerFlowE2ETestSuiteSF struct { - t *testing.T - - pgSuffix string - conn *connpostgres.PostgresConnector - sfHelper *SnowflakeTestHelper - connector *connsnowflake.SnowflakeConnector -} - -func (s PeerFlowE2ETestSuiteSF) T() *testing.T { - return s.t -} - -func (s 
PeerFlowE2ETestSuiteSF) Connector() *connpostgres.PostgresConnector { - return s.conn -} - -func (s PeerFlowE2ETestSuiteSF) Conn() *pgx.Conn { - return s.Connector().Conn() -} - -func (s PeerFlowE2ETestSuiteSF) Suffix() string { - return s.pgSuffix -} - -func (s PeerFlowE2ETestSuiteSF) GetRows(tableName string, sfSelector string) (*model.QRecordBatch, error) { - s.t.Helper() - qualifiedTableName := fmt.Sprintf(`%s.%s.%s`, s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) - sfSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, sfSelector, qualifiedTableName) - s.t.Logf("running query on snowflake: %s", sfSelQuery) - return s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) -} - func TestPeerFlowE2ETestSuiteSF(t *testing.T) { - e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSF) { - e2e.TearDownPostgres(s) - - if s.sfHelper != nil { - err := s.sfHelper.Cleanup() - if err != nil { - s.t.Fatalf("failed to tear down Snowflake: %v", err) - } - } - - err := s.connector.Close() - if err != nil { - s.t.Fatalf("failed to close Snowflake connector: %v", err) - } - }) + e2eshared.RunSuite(t, SetupSuite) } func (s PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string { - return fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tableName) + return e2e.AttachSchema(s, tableName) } func (s PeerFlowE2ETestSuiteSF) attachSuffix(input string) string { - return fmt.Sprintf("%s_%s", input, s.pgSuffix) -} - -func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteSF { - t.Helper() - - err := godotenv.Load() - if err != nil { - // it's okay if the .env file is not present - // we will use the default values - t.Log("Unable to load .env file, using default values from env") - } - - suffix := shared.RandomString(8) - tsSuffix := time.Now().Format("20060102150405") - pgSuffix := fmt.Sprintf("sf_%s_%s", strings.ToLower(suffix), tsSuffix) - - conn, err := e2e.SetupPostgres(t, pgSuffix) - if err != nil || conn == nil { - t.Fatalf("failed to setup Postgres: %v", 
err) - } - - sfHelper, err := NewSnowflakeTestHelper() - if err != nil { - t.Fatalf("failed to setup Snowflake: %v", err) - } - - connector, err := connsnowflake.NewSnowflakeConnector( - context.Background(), - sfHelper.Config, - ) - require.NoError(t, err) - - suite := PeerFlowE2ETestSuiteSF{ - t: t, - pgSuffix: pgSuffix, - conn: conn, - sfHelper: sfHelper, - connector: connector, - } - - return suite -} - -func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { - tc := e2e.NewTemporalClient(s.t) - - tableName := "test_simple_flow_sf" - srcTableName := s.attachSchemaSuffix(tableName) - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) - - _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id SERIAL PRIMARY KEY, - key TEXT NOT NULL, - value TEXT NOT NULL - ); - `, srcTableName)) - require.NoError(s.t, err) - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix(tableName), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - Destination: s.sfHelper.Peer, - } - - flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - flowConnConfig.MaxBatchSize = 100 - - env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) - // wait for PeerFlowStatusQuery to finish setup - // and then insert 20 rows into the source table - e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - // insert 20 rows into the source table - for i := range 20 { - testKey := fmt.Sprintf("test_key_%d", i) - testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (key, value) VALUES ($1, $2) - `, srcTableName), testKey, testValue) - e2e.EnvNoError(s.t, env, err) - } - s.t.Log("Inserted 20 rows into the source table") - e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,key,value") - - env.Cancel() - - e2e.RequireEnvCanceled(s.t, env) - - // check the number of rows 
where _PEERDB_SYNCED_AT is newer than 5 mins ago - // it should match the count. - newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE _PEERDB_SYNCED_AT > CURRENT_TIMESTAMP() - INTERVAL '30 MINUTE' - `, dstTableName) - numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) - require.NoError(s.t, err) - require.Equal(s.t, 20, numNewRows) + return e2e.AddSuffix(s, input) } func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { diff --git a/flow/e2e/snowflake/snowflake.go b/flow/e2e/snowflake/snowflake.go new file mode 100644 index 0000000000..45132ef601 --- /dev/null +++ b/flow/e2e/snowflake/snowflake.go @@ -0,0 +1,110 @@ +package e2e_snowflake + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/jackc/pgx/v5" + "github.com/stretchr/testify/require" + + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" + connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/shared" +) + +type PeerFlowE2ETestSuiteSF struct { + t *testing.T + + pgSuffix string + conn *connpostgres.PostgresConnector + sfHelper *SnowflakeTestHelper + connector *connsnowflake.SnowflakeConnector +} + +func (s PeerFlowE2ETestSuiteSF) T() *testing.T { + return s.t +} + +func (s PeerFlowE2ETestSuiteSF) Connector() *connpostgres.PostgresConnector { + return s.conn +} + +func (s PeerFlowE2ETestSuiteSF) Conn() *pgx.Conn { + return s.Connector().Conn() +} + +func (s PeerFlowE2ETestSuiteSF) Suffix() string { + return s.pgSuffix +} + +func (s PeerFlowE2ETestSuiteSF) Peer() *protos.Peer { + return s.sfHelper.Peer +} + +func (s PeerFlowE2ETestSuiteSF) DestinationTable(table string) string { + return fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, table) +} + +func (s PeerFlowE2ETestSuiteSF) GetRows(tableName string, sfSelector string) 
(*model.QRecordBatch, error) { + s.t.Helper() + qualifiedTableName := fmt.Sprintf(`%s.%s.%s`, s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) + sfSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, sfSelector, qualifiedTableName) + s.t.Logf("running query on snowflake: %s", sfSelQuery) + return s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) +} + +func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteSF { + t.Helper() + + suffix := shared.RandomString(8) + tsSuffix := time.Now().Format("20060102150405") + pgSuffix := fmt.Sprintf("sf_%s_%s", strings.ToLower(suffix), tsSuffix) + + conn, err := e2e.SetupPostgres(t, pgSuffix) + if err != nil || conn == nil { + t.Fatalf("failed to setup Postgres: %v", err) + } + + sfHelper, err := NewSnowflakeTestHelper() + if err != nil { + t.Fatalf("failed to setup Snowflake: %v", err) + } + + connector, err := connsnowflake.NewSnowflakeConnector( + context.Background(), + sfHelper.Config, + ) + require.NoError(t, err) + + suite := PeerFlowE2ETestSuiteSF{ + t: t, + pgSuffix: pgSuffix, + conn: conn, + sfHelper: sfHelper, + connector: connector, + } + + return suite +} + +func (s PeerFlowE2ETestSuiteSF) Teardown() { + e2e.TearDownPostgres(s) + + if s.sfHelper != nil { + err := s.sfHelper.Cleanup() + if err != nil { + s.t.Fatalf("failed to tear down Snowflake: %v", err) + } + } + + err := s.connector.Close() + if err != nil { + s.t.Fatalf("failed to close Snowflake connector: %v", err) + } +} diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index 68c70e56aa..d607e90451 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -304,9 +304,11 @@ func (s SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) } +func (s SnowflakeSchemaDeltaTestSuite) Teardown() { + require.NoError(s.t, 
s.sfTestHelper.Cleanup()) + require.NoError(s.t, s.connector.Close()) +} + func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) { - e2eshared.RunSuite(t, setupSchemaDeltaSuite, func(s SnowflakeSchemaDeltaTestSuite) { - require.NoError(s.t, s.sfTestHelper.Cleanup()) - require.NoError(s.t, s.connector.Close()) - }) + e2eshared.RunSuite(t, setupSchemaDeltaSuite) } diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 448d0f32b7..153fa07cc6 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -11,7 +11,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" - "github.com/joho/godotenv" "github.com/stretchr/testify/require" "github.com/PeerDB-io/peer-flow/connectors/postgres" @@ -48,26 +47,21 @@ func (s PeerFlowE2ETestSuiteSQLServer) Suffix() string { } func TestCDCFlowE2ETestSuiteSQLServer(t *testing.T) { - e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSQLServer) { - e2e.TearDownPostgres(s) + e2eshared.RunSuite(t, SetupSuite) +} - if s.sqlsHelper != nil { - err := s.sqlsHelper.CleanUp() - require.NoError(s.t, err) - } - }) +func (s PeerFlowE2ETestSuiteSQLServer) Teardown() { + e2e.TearDownPostgres(s) + + if s.sqlsHelper != nil { + err := s.sqlsHelper.CleanUp() + require.NoError(s.t, err) + } } func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteSQLServer { t.Helper() - err := godotenv.Load() - if err != nil { - // it's okay if the .env file is not present - // we will use the default values - t.Log("Unable to load .env file, using default values from env") - } - suffix := "sqls_" + strings.ToLower(shared.RandomString(8)) conn, err := e2e.SetupPostgres(t, suffix) if err != nil { diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 8592eb406e..c018f32df2 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -15,6 +15,7 @@ import ( "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5" 
"github.com/jackc/pgx/v5/pgconn" + "github.com/joho/godotenv" "github.com/stretchr/testify/require" "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/client" @@ -33,7 +34,14 @@ import ( peerflow "github.com/PeerDB-io/peer-flow/workflows" ) +func init() { + // it's okay if the .env file is not present + // we will use the default values + _ = godotenv.Load() +} + type Suite interface { + e2eshared.Suite T() *testing.T Connector() *connpostgres.PostgresConnector Suffix() string @@ -44,6 +52,20 @@ type RowSource interface { GetRows(table, cols string) (*model.QRecordBatch, error) } +type GenericSuite interface { + RowSource + Peer() *protos.Peer + DestinationTable(table string) string +} + +func AttachSchema(s Suite, table string) string { + return fmt.Sprintf("e2e_test_%s.%s", s.Suffix(), table) +} + +func AddSuffix(s Suite, str string) string { + return fmt.Sprintf("%s_%s", str, s.Suffix()) +} + // Helper function to assert errors in go routines running concurrent to workflows // This achieves two goals: // 1. 
cancel workflow to avoid waiting on goroutine which has failed @@ -129,11 +151,13 @@ func EnvWaitForEqualTablesWithNames( pgRows, err := GetPgRows(suite.Connector(), suite.Suffix(), srcTable, cols) if err != nil { + t.Log(err) return false } rows, err := suite.GetRows(dstTable, cols) if err != nil { + t.Log(err) return false } @@ -163,18 +187,21 @@ func SetupCDCFlowStatusQuery(t *testing.T, env WorkflowRun, connectionGen FlowCo for { time.Sleep(time.Second) counter++ - response, err := env.Query(shared.CDCFlowStateQuery, connectionGen.FlowJobName) + response, err := env.Query(shared.FlowStatusQuery, connectionGen.FlowJobName) if err == nil { - var state peerflow.CDCFlowWorkflowState - err = response.Get(&state) + var status protos.FlowStatus + err = response.Get(&status) if err != nil { t.Fatal(err) - } else if state.CurrentFlowStatus == protos.FlowStatus_STATUS_RUNNING { + } else if status == protos.FlowStatus_STATUS_RUNNING { return + } else if counter > 30 { + env.Cancel() + t.Fatal("UNEXPECTED STATUS TIMEOUT", status) } } else if counter > 15 { env.Cancel() - t.Fatal("UNEXPECTED SETUP CDC TIMEOUT", err.Error()) + t.Fatal("UNEXPECTED STATUS QUERY TIMEOUT", err.Error()) } else if counter > 5 { // log the error for informational purposes t.Log(err.Error()) diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go index b242ebb1ea..087ff58014 100644 --- a/flow/e2eshared/e2eshared.go +++ b/flow/e2eshared/e2eshared.go @@ -12,7 +12,11 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" ) -func RunSuite[T any](t *testing.T, setup func(t *testing.T) T, teardown func(T)) { +type Suite interface { + Teardown() +} + +func RunSuite[T Suite](t *testing.T, setup func(t *testing.T) T) { t.Helper() t.Parallel() @@ -26,7 +30,7 @@ func RunSuite[T any](t *testing.T, setup func(t *testing.T) T, teardown func(T)) subtest.Parallel() suite := setup(subtest) subtest.Cleanup(func() { - teardown(suite) + suite.Teardown() }) 
m.Func.Call([]reflect.Value{reflect.ValueOf(suite)}) }) diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 972063d494..013e0ca9e1 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -8,6 +8,7 @@ import ( "math/big" "reflect" "strconv" + "strings" "time" "cloud.google.com/go/civil" @@ -271,11 +272,14 @@ func compareHstore(value1, value2 interface{}) bool { } return string(bytes) == str2 case string: + if v1 == str2 { + return true + } parsedHStore1, err := hstore_util.ParseHstore(v1) if err != nil { panic(err) } - return parsedHStore1 == str2 + return parsedHStore1 == strings.ReplaceAll(strings.ReplaceAll(str2, " ", ""), "\n", "") default: panic(fmt.Sprintf("invalid hstore value type %T: %v", value1, value1)) } diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 14ca09092d..bab81b3b23 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -41,7 +41,6 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow tableMappings = append(tableMappings, proto.Clone(tableMapping).(*protos.TableMapping)) } return &CDCFlowWorkflowState{ - // 1 more than the limit of 10 ActiveSignal: model.NoopSignal, CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP, FlowConfigUpdate: nil, diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index ccb699d8a4..ac3e66f93c 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -227,7 +227,7 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( } normalizedTableMapping[normalizedTableName] = tableSchema - s.logger.Info("normalized table schema: ", normalizedTableName, " -> ", tableSchema) + s.logger.Info("normalized table schema", slog.String("table", normalizedTableName), slog.Any("schema", tableSchema)) } // now setup the normalized tables on the destination peer From d8a7bd61046b9463846e793dc686c55ec4f2612e Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 7 Mar 2024 14:56:43 +0000 Subject: [PATCH 4/5] Fix cases of !BADKEY in datadog (#1443) Explicitly passing flow name is no longer necessary after #1357 --- flow/activities/flowable.go | 10 +++++----- flow/activities/snapshot_activity.go | 6 +++--- flow/cmd/mirror_status.go | 3 ++- flow/workflows/cdc_flow.go | 4 ++-- flow/workflows/qrep_flow.go | 6 +++--- flow/workflows/setup_flow.go | 2 +- flow/workflows/snapshot_flow.go | 10 +++++----- flow/workflows/xmin_flow.go | 2 +- 8 files changed, 22 insertions(+), 21 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 3900d07385..5d9c4e690f 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -301,7 +301,7 @@ func (a *FlowableActivity) SyncFlow( } shutdown := utils.HeartbeatRoutine(ctx, func() string { - return "transferring records for job - " + flowName + return "transferring records for job" }) defer shutdown() @@ -474,7 +474,7 @@ func (a *FlowableActivity) StartNormalize( defer connectors.CloseConnector(ctx, dstConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return "normalizing records from batch for job - " + input.FlowConnectionConfigs.FlowJobName + return "normalizing records from batch for job" }) defer shutdown() @@ -542,7 +542,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, defer connectors.CloseConnector(ctx, srcConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return "getting partitions for job - " + config.FlowJobName + return "getting partitions for job" }) defer shutdown() @@ -725,7 +725,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config defer connectors.CloseConnector(ctx, dstConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return "consolidating partitions for job - " + config.FlowJobName + return "consolidating partitions for job" }) defer shutdown() @@ -980,7 +980,7 @@ func (a *FlowableActivity)
RenameTables(ctx context.Context, config *protos.Rena defer connectors.CloseConnector(ctx, dstConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return "renaming tables for job - " + config.FlowJobName + return "renaming tables for job" }) defer shutdown() diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 262d3d0dbd..84df6ecfab 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -32,7 +32,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s connectors.CloseConnector(ctx, s.connector) delete(a.SnapshotConnections, flowJobName) } - a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job - "+flowJobName) + a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job") return nil } @@ -50,7 +50,7 @@ func (a *SnapshotActivity) SetupReplication( return nil, nil } - a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job - "+config.FlowJobName) + a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job") conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig) if err != nil { @@ -84,7 +84,7 @@ func (a *SnapshotActivity) SetupReplication( var slotInfo connpostgres.SlotCreationResult select { case slotInfo = <-slotSignal.SlotCreated: - logger.Info("slot created", slotInfo.SlotName) + logger.Info("slot created", slog.String("SlotName", slotInfo.SlotName)) case err := <-replicationErr: closeConnectionForError(err) return nil, fmt.Errorf("failed to setup replication: %w", err) diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index e8160277f5..f72cd5311e 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -144,7 +144,8 @@ func (h *FlowRequestHandler) cloneTableSummary( rows, err := h.pool.Query(ctx, q, "clone_"+flowJobName+"_%") if err != nil { - slog.Error("unable to query initial load partition - "+flowJobName, slog.Any("error", 
err)) + slog.Error("unable to query initial load partition", + slog.String(string(shared.FlowNameKey), flowJobName), slog.Any("error", err)) return nil, fmt.Errorf("unable to query initial load partition - %s: %w", flowJobName, err) } diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index bab81b3b23..93e1d4659b 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -236,7 +236,7 @@ func CDCFlowWorkflow( for state.ActiveSignal == model.PauseSignal { // only place we block on receive, so signal processing is immediate for state.ActiveSignal == model.PauseSignal && state.FlowConfigUpdate == nil && ctx.Err() == nil { - logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) + logger.Info(fmt.Sprintf("mirror has been paused for %s", time.Since(startTime).Round(time.Second))) selector.Select(ctx) } if err := ctx.Err(); err != nil { @@ -255,7 +255,7 @@ func CDCFlowWorkflow( } } - logger.Info("mirror has been resumed after ", time.Since(startTime)) + logger.Info(fmt.Sprintf("mirror has been resumed after %s", time.Since(startTime).Round(time.Second))) state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING } diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 839eab9ddc..2f0124e717 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -119,7 +119,7 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex // fetch the schema for the watermark table watermarkTableSchema, err := q.getTableSchema(ctx, q.config.WatermarkTable) if err != nil { - q.logger.Error("failed to fetch schema for watermark table: ", err) + q.logger.Error("failed to fetch schema for watermark table", slog.Any("error", err)) return fmt.Errorf("failed to fetch schema for watermark table: %w", err) } @@ -161,7 +161,7 @@ func (q *QRepFlowExecution) GetPartitions( return nil, fmt.Errorf("failed to fetch partitions to replicate: %w", err) } - 
q.logger.Info("partitions to replicate - ", slog.Int("num_partitions", len(partitions.Partitions))) + q.logger.Info("partitions to replicate", slog.Int("num_partitions", len(partitions.Partitions))) return partitions, nil } @@ -439,7 +439,7 @@ func QRepFlowWorkflow( state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED for q.activeSignal == model.PauseSignal { - logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) + logger.Info(fmt.Sprintf("mirror has been paused for %s", time.Since(startTime).Round(time.Second))) // only place we block on receive, so signal processing is immediate val, ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute) if ok { diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index ac3e66f93c..0574f0d24e 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -192,7 +192,7 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( var tblSchemaOutput *protos.GetTableSchemaBatchOutput if err := future.Get(ctx, &tblSchemaOutput); err != nil { - s.logger.Error("failed to fetch schema for source tables: ", err) + s.logger.Error("failed to fetch schema for source tables", slog.Any("error", err)) return nil, fmt.Errorf("failed to fetch schema for source table %s: %w", sourceTables, err) } diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 050bc604e5..ce9ab27d78 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -30,7 +30,7 @@ func (s *SnapshotFlowExecution) setupReplication( ctx workflow.Context, ) (*protos.SetupReplicationOutput, error) { flowName := s.config.FlowJobName - s.logger.Info("setting up replication on source for peer flow - ", flowName) + s.logger.Info("setting up replication on source for peer flow") ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 4 * time.Hour, @@ -59,7 +59,7 @@ func (s *SnapshotFlowExecution) setupReplication( 
return nil, fmt.Errorf("failed to setup replication on source peer: %w", err) } - s.logger.Info("replication slot live for on source for peer flow - ", flowName) + s.logger.Info("replication slot live for on source for peer flow") return res, nil } @@ -68,7 +68,7 @@ func (s *SnapshotFlowExecution) closeSlotKeepAlive( ctx workflow.Context, ) error { flowName := s.config.FlowJobName - s.logger.Info("closing slot keep alive for peer flow - ", flowName) + s.logger.Info("closing slot keep alive for peer flow") ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 15 * time.Minute, @@ -78,7 +78,7 @@ func (s *SnapshotFlowExecution) closeSlotKeepAlive( return fmt.Errorf("failed to close slot keep alive for peer flow: %w", err) } - s.logger.Info("closed slot keep alive for peer flow - ", flowName) + s.logger.Info("closed slot keep alive for peer flow") return nil } @@ -226,7 +226,7 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( return fmt.Errorf("failed to setup replication: %w", err) } - logger.Info("cloning tables in parallel: ", numTablesInParallel) + logger.Info(fmt.Sprintf("cloning %d tables in parallel", numTablesInParallel)) if err := s.cloneTables(ctx, slotInfo, numTablesInParallel); err != nil { return fmt.Errorf("failed to clone tables: %w", err) } diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index 777daba38b..4cd6deece7 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -38,7 +38,7 @@ func XminFlowWorkflow( state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED for q.activeSignal == model.PauseSignal { - logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) + logger.Info(fmt.Sprintf("mirror has been paused for %s", time.Since(startTime).Round(time.Second))) // only place we block on receive, so signal processing is immediate val, ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute) if ok { From 
0496eb5ee31eec49b6d99643e07831c2ba24b4ae Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Thu, 7 Mar 2024 23:35:06 +0530 Subject: [PATCH 5/5] check for partition being synced before starting errgroup (#1401) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Philip Dubé --- flow/activities/flowable.go | 27 +++++++++++++++++++-------- flow/connectors/bigquery/qrep.go | 15 ++++++--------- flow/connectors/clickhouse/qrep.go | 16 ++++------------ flow/connectors/core.go | 4 ++++ flow/connectors/postgres/qrep.go | 19 ++++++------------- flow/connectors/s3/qrep.go | 7 +++++++ flow/connectors/snowflake/qrep.go | 16 ++++++---------- protos/flow.proto | 7 ++++++- 8 files changed, 58 insertions(+), 53 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 5d9c4e690f..e2c1731174 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -611,12 +611,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) logger := activity.GetLogger(ctx) - err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now()) - if err != nil { - a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - return fmt.Errorf("failed to update start time for partition: %w", err) - } - srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) @@ -631,6 +625,25 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } defer connectors.CloseConnector(ctx, dstConn) + done, err := dstConn.IsQRepPartitionSynced(ctx, &protos.IsQRepPartitionSyncedInput{ + FlowJobName: config.FlowJobName, + PartitionId: partition.PartitionId, + }) + if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return fmt.Errorf("failed to get fetch 
status of partition: %w", err) + } + if done { + logger.Info("no records to push for partition " + partition.PartitionId) + return nil + } + + err = monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now()) + if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return fmt.Errorf("failed to update start time for partition: %w", err) + } + logger.Info("replicating partition " + partition.PartitionId) shutdown := utils.HeartbeatRoutine(ctx, func() string { return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total) @@ -705,8 +718,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, if err != nil { return err } - } else { - logger.Info("no records to push for partition " + partition.PartitionId) } err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index 744a20cdec..e2c2ef58e5 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -30,15 +30,6 @@ func (c *BigQueryConnector) SyncQRepRecords( return 0, err } - done, err := c.pgMetadata.IsQrepPartitionSynced(ctx, config.FlowJobName, partition.PartitionId) - if err != nil { - return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err) - } - - if done { - c.logger.Info(fmt.Sprintf("Partition %s has already been synced", partition.PartitionId)) - return 0, nil - } c.logger.Info(fmt.Sprintf("QRep sync function called and partition existence checked for"+ " partition %s of destination table %s", partition.PartitionId, destTable)) @@ -111,3 +102,9 @@ func (c *BigQueryConnector) SetupQRepMetadataTables(ctx context.Context, config return nil } + +func (c *BigQueryConnector) IsQRepPartitionSynced(ctx context.Context, + req *protos.IsQRepPartitionSyncedInput, +) (bool, error) { + return c.pgMetadata.IsQrepPartitionSynced(ctx, 
req.FlowJobName, req.PartitionId) +} diff --git a/flow/connectors/clickhouse/qrep.go b/flow/connectors/clickhouse/qrep.go index 5efcfaefb8..642c987962 100644 --- a/flow/connectors/clickhouse/qrep.go +++ b/flow/connectors/clickhouse/qrep.go @@ -33,16 +33,6 @@ func (c *ClickhouseConnector) SyncQRepRecords( slog.String("destinationTable", destTable), ) - done, err := c.isPartitionSynced(ctx, partition.PartitionId) - if err != nil { - return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err) - } - - if done { - c.logger.Info("Partition has already been synced", flowLog) - return 0, nil - } - tblSchema, err := c.getTableSchema(destTable) if err != nil { return 0, fmt.Errorf("failed to get schema of table %s: %w", destTable, err) @@ -96,9 +86,11 @@ func (c *ClickhouseConnector) getTableSchema(tableName string) ([]*sql.ColumnTyp return columnTypes, nil } -func (c *ClickhouseConnector) isPartitionSynced(ctx context.Context, partitionID string) (bool, error) { +func (c *ClickhouseConnector) IsQRepPartitionSynced(ctx context.Context, + req *protos.IsQRepPartitionSyncedInput, +) (bool, error) { //nolint:gosec - queryString := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE partitionID = '%s'`, qRepMetadataTableName, partitionID) + queryString := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE partitionID = '%s'`, qRepMetadataTableName, req.PartitionId) row := c.database.QueryRowContext(ctx, queryString) diff --git a/flow/connectors/core.go b/flow/connectors/core.go index ed306f73bb..b374ff70e4 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -146,6 +146,9 @@ type QRepPullConnector interface { type QRepSyncConnector interface { Connector + // IsQRepPartitionSynced returns true if a partition has already been synced + IsQRepPartitionSynced(ctx context.Context, req *protos.IsQRepPartitionSyncedInput) (bool, error) + // SetupQRepMetadataTables sets up the metadata tables for QRep. 
SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error @@ -261,6 +264,7 @@ var ( _ QRepSyncConnector = &connbigquery.BigQueryConnector{} _ QRepSyncConnector = &connsnowflake.SnowflakeConnector{} _ QRepSyncConnector = &connclickhouse.ClickhouseConnector{} + _ QRepSyncConnector = &conns3.S3Connector{} _ QRepConsolidateConnector = &connsnowflake.SnowflakeConnector{} _ QRepConsolidateConnector = &connclickhouse.ClickhouseConnector{} diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index df82fb5bf4..ce9521ff4c 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -458,15 +458,6 @@ func (c *PostgresConnector) SyncQRepRecords( return 0, fmt.Errorf("table %s does not exist, used schema: %s", dstTable.Table, dstTable.Schema) } - done, err := c.isPartitionSynced(ctx, partition.PartitionId) - if err != nil { - return 0, fmt.Errorf("failed to check if partition is synced: %w", err) - } - - if done { - c.logger.Info(fmt.Sprintf("partition %s already synced", partition.PartitionId)) - return 0, nil - } c.logger.Info("SyncRecords called and initial checks complete.") stagingTableSync := &QRepStagingTableSync{connector: c} @@ -573,18 +564,20 @@ func BuildQuery(logger log.Logger, query string, flowJobName string) (string, er return res, nil } -// isPartitionSynced checks whether a specific partition is synced -func (c *PostgresConnector) isPartitionSynced(ctx context.Context, partitionID string) (bool, error) { +// IsQRepPartitionSynced checks whether a specific partition is synced +func (c *PostgresConnector) IsQRepPartitionSynced(ctx context.Context, + req *protos.IsQRepPartitionSyncedInput, +) (bool, error) { // setup the query string metadataTableIdentifier := pgx.Identifier{c.metadataSchema, qRepMetadataTableName} queryString := fmt.Sprintf( - "SELECT COUNT(*)>0 FROM %s WHERE partitionID = $1;", + "SELECT COUNT(*)>0 FROM %s WHERE partitionID=$1;", metadataTableIdentifier.Sanitize(), ) // 
prepare and execute the query var result bool - err := c.conn.QueryRow(ctx, queryString, partitionID).Scan(&result) + err := c.conn.QueryRow(ctx, queryString, req.PartitionId).Scan(&result) if err != nil { return false, fmt.Errorf("failed to execute query: %w", err) } diff --git a/flow/connectors/s3/qrep.go b/flow/connectors/s3/qrep.go index 7a9432c1d3..1d5684d060 100644 --- a/flow/connectors/s3/qrep.go +++ b/flow/connectors/s3/qrep.go @@ -81,3 +81,10 @@ func (c *S3Connector) SetupQRepMetadataTables(_ context.Context, config *protos. c.logger.Info("QRep metadata setup not needed for S3.") return nil } + +// S3 doesn't check if partition is already synced, but file with same name is overwritten +func (c *S3Connector) IsQRepPartitionSynced(_ context.Context, + config *protos.IsQRepPartitionSyncedInput, +) (bool, error) { + return false, nil +} diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index d5ef2e32db..04bfd590fe 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -35,16 +35,6 @@ func (c *SnowflakeConnector) SyncQRepRecords( } c.logger.Info("Called QRep sync function and obtained table schema", flowLog) - done, err := c.pgMetadata.IsQrepPartitionSynced(ctx, config.FlowJobName, partition.PartitionId) - if err != nil { - return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err) - } - - if done { - c.logger.Info("Partition has already been synced", flowLog) - return 0, nil - } - avroSync := NewSnowflakeAvroSyncHandler(config, c) return avroSync.SyncQRepRecords(ctx, config, partition, tblSchema, stream) } @@ -282,3 +272,9 @@ func (c *SnowflakeConnector) dropStage(ctx context.Context, stagingPath string, func (c *SnowflakeConnector) getStageNameForJob(job string) string { return fmt.Sprintf("%s.peerdb_stage_%s", c.rawSchema, job) } + +func (c *SnowflakeConnector) IsQRepPartitionSynced(ctx context.Context, + req *protos.IsQRepPartitionSyncedInput, +) 
(bool, error) { + return c.pgMetadata.IsQrepPartitionSynced(ctx, req.FlowJobName, req.PartitionId) +} diff --git a/protos/flow.proto b/protos/flow.proto index 0ab9b94a35..08b87fb9d9 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -380,9 +380,14 @@ message SetupFlowOutput { map table_name_schema_mapping = 2; } -message AddTablesToPublicationInput{ +message AddTablesToPublicationInput { string flow_job_name = 1; string publication_name = 2; repeated TableMapping additional_tables = 3; } +message IsQRepPartitionSyncedInput { + string flow_job_name = 1; + string partition_id = 2; +} +