From 06736cbae2c13838fd1dc92bc90f7547371eab52 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Wed, 20 Dec 2023 02:28:50 +0530 Subject: [PATCH 1/8] can't run em locally so checking CI --- flow/e2e/bigquery/bigquery_helper.go | 12 + flow/e2e/bigquery/peer_flow_bq_test.go | 334 +++++++++++++++++++++++++ 2 files changed, 346 insertions(+) diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 21bd3b5c75..810cbf26ee 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -455,3 +455,15 @@ func (b *BigQueryTestHelper) CreateTable(tableName string, schema *model.QRecord return nil } + +func (b *BigQueryTestHelper) RunIntQuery(query string) (int64, error) { + recordBatch, err := b.ExecuteAndProcessQuery(query) + if err != nil { + return 0, fmt.Errorf("could not execute query: %w", err) + } + if recordBatch.NumRecords != 1 { + return 0, fmt.Errorf("expected only 1 record, got %d", recordBatch.NumRecords) + } + + return recordBatch.Records[0].Entries[0].Value.(int64), nil +} diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index c76688f79b..e3e33028dd 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -5,10 +5,12 @@ import ( "fmt" "log/slog" "strings" + "sync" "testing" "time" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" @@ -1271,3 +1273,335 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { env.AssertExpectations(s.t) } + +func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) + + cmpTableName := s.attachSchemaSuffix("test_softdel") + srcTableName := fmt.Sprintf("%s_src", cmpTableName) + dstTableName := "test_softdel" + + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + c1 INT, + c2 INT, + t TEXT + ); + `, srcTableName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_softdel"), + } + + config := &protos.FlowConnectionConfigs{ + FlowJobName: connectionGen.FlowJobName, + Destination: s.bqHelper.Peer, + TableMappings: []*protos.TableMapping{ + { + SourceTableIdentifier: srcTableName, + DestinationTableIdentifier: dstTableName, + }, + }, + Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + CdcStagingPath: connectionGen.CdcStagingPath, + SoftDelete: true, + SoftDeleteColName: "_PEERDB_IS_DELETED", + SyncedAtColName: "_PEERDB_SYNCED_AT", + } + + limits := peerflow.CDCFlowLimits{ + ExitAfterRecords: 3, + MaxBatchSize: 100, + } + + wg := sync.WaitGroup{} + wg.Add(1) + + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup + // and then insert, update and delete rows in the table. + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) + require.NoError(s.t, err) + e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) + require.NoError(s.t, err) + e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + // since we delete stuff, create another table to compare with + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) + require.NoError(s.t, err) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + DELETE FROM %s WHERE id=1`, srcTableName)) + require.NoError(s.t, err) + + wg.Done() + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + s.True(env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + require.Contains(s.t, err.Error(), "continue as new") + + wg.Wait() + + // verify our updates and delete happened + s.compareTableContentsBQ("test_softdel", "id,c1,c2,t") + + newerSyncedAtQuery := fmt.Sprintf(` + SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) + numNewRows, err := s.bqHelper.RunIntQuery(newerSyncedAtQuery) + require.NoError(s.t, err) + require.Equal(s.t, 1, numNewRows) +} + +func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) + + cmpTableName := s.attachSchemaSuffix("test_softdel_iud") + srcTableName := fmt.Sprintf("%s_src", cmpTableName) + dstTableName := "test_softdel_iud" + + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + c1 INT, + c2 INT, + t TEXT + ); + `, srcTableName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_softdel_iud"), + } + + config := &protos.FlowConnectionConfigs{ + FlowJobName: connectionGen.FlowJobName, + Destination: s.bqHelper.Peer, + TableMappings: []*protos.TableMapping{ + { + SourceTableIdentifier: srcTableName, + DestinationTableIdentifier: dstTableName, + }, + }, + Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + CdcStagingPath: connectionGen.CdcStagingPath, + SoftDelete: true, + SoftDeleteColName: "_PEERDB_IS_DELETED", + SyncedAtColName: "_PEERDB_SYNCED_AT", + } + + limits := peerflow.CDCFlowLimits{ + ExitAfterRecords: 3, + MaxBatchSize: 100, + } + + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup + // and then insert, update and delete rows in the table. + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + + insertTx, err := s.pool.Begin(context.Background()) + require.NoError(s.t, err) + + _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) + require.NoError(s.t, err) + _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` + UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) + require.NoError(s.t, err) + // since we delete stuff, create another table to compare with + _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) + require.NoError(s.t, err) + _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` + DELETE FROM %s WHERE id=1`, srcTableName)) + require.NoError(s.t, err) + + require.NoError(s.t, insertTx.Commit(context.Background())) + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + s.True(env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + require.Contains(s.t, err.Error(), "continue as new") + + // verify our updates and delete happened + s.compareTableContentsBQ("test_softdel_iud", "id,c1,c2,t") + + newerSyncedAtQuery := fmt.Sprintf(` + SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) + numNewRows, err := s.bqHelper.RunIntQuery(newerSyncedAtQuery) + require.NoError(s.t, err) + s.Equal(1, numNewRows) +} + +func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) + + cmpTableName := s.attachSchemaSuffix("test_softdel_ud") + srcTableName := fmt.Sprintf("%s_src", cmpTableName) + dstTableName := "test_softdel_ud" + + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + c1 INT, + c2 INT, + t TEXT + ); + `, srcTableName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_softdel_ud"), + } + + config := &protos.FlowConnectionConfigs{ + FlowJobName: connectionGen.FlowJobName, + Destination: s.bqHelper.Peer, + TableMappings: []*protos.TableMapping{ + { + SourceTableIdentifier: srcTableName, + DestinationTableIdentifier: dstTableName, + }, + }, + Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + CdcStagingPath: connectionGen.CdcStagingPath, + SoftDelete: true, + SoftDeleteColName: "_PEERDB_IS_DELETED", + SyncedAtColName: "_PEERDB_SYNCED_AT", + } + + limits := peerflow.CDCFlowLimits{ + ExitAfterRecords: 4, + MaxBatchSize: 100, + } + + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup + // and then insert, update and delete rows in the table. + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) + require.NoError(s.t, err) + e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + + insertTx, err := s.pool.Begin(context.Background()) + require.NoError(s.t, err) + _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` + UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName)) + require.NoError(s.t, err) + _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` + UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) + require.NoError(s.t, err) + // since we delete stuff, create another table to compare with + _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) + require.NoError(s.t, err) + _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` + DELETE FROM %s WHERE id=1`, srcTableName)) + require.NoError(s.t, err) + + require.NoError(s.t, insertTx.Commit(context.Background())) + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + s.True(env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + require.Contains(s.t, err.Error(), "continue as new") + + // verify our updates and delete happened + s.compareTableContentsBQ("test_softdel_ud", "id,c1,c2,t") + + newerSyncedAtQuery := fmt.Sprintf(` + SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) + numNewRows, err := s.bqHelper.RunIntQuery(newerSyncedAtQuery) + require.NoError(s.t, err) + s.Equal(1, numNewRows) +} + +func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) + + srcTableName := s.attachSchemaSuffix("test_softdel_iad") + dstTableName := "test_softdel_iad" + + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + c1 INT, + c2 INT, + t TEXT + ); + `, srcTableName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_softdel_iad"), + } + + config := &protos.FlowConnectionConfigs{ + FlowJobName: connectionGen.FlowJobName, + Destination: s.bqHelper.Peer, + TableMappings: []*protos.TableMapping{ + { + SourceTableIdentifier: srcTableName, + DestinationTableIdentifier: dstTableName, + }, + }, + Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + CdcStagingPath: connectionGen.CdcStagingPath, + SoftDelete: true, + SoftDeleteColName: "_PEERDB_IS_DELETED", + SyncedAtColName: "_PEERDB_SYNCED_AT", + } + + limits := peerflow.CDCFlowLimits{ + ExitAfterRecords: 3, + MaxBatchSize: 100, + } + + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup + // and then insert and delete rows in the table. + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) + require.NoError(s.t, err) + e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + DELETE FROM %s WHERE id=1`, srcTableName)) + require.NoError(s.t, err) + e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) + require.NoError(s.t, err) + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + s.True(env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + require.Contains(s.t, err.Error(), "continue as new") + + // verify our updates and delete happened + s.compareTableContentsBQ("test_softdel_iad", "id,c1,c2,t") + + newerSyncedAtQuery := fmt.Sprintf(` + SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) + numNewRows, err := s.bqHelper.RunIntQuery(newerSyncedAtQuery) + require.NoError(s.t, err) + s.Equal(0, numNewRows) +} From 9469e5695df4bb5552844ef6aa4435d9c71283ce Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Wed, 20 Dec 2023 02:45:45 +0530 Subject: [PATCH 2/8] trying to fix pt.1 --- flow/e2e/bigquery/peer_flow_bq_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index e3e33028dd..d9e8ca408c 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -1355,7 +1355,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { s.compareTableContentsBQ("test_softdel", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) + SELECT COUNT(*) FROM %s.%s WHERE _PEERDB_IS_DELETED = TRUE`, + s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) require.Equal(s.t, 1, numNewRows) @@ -1438,7 +1439,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { s.compareTableContentsBQ("test_softdel_iud", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) + SELECT COUNT(*) FROM %s.%s WHERE _PEERDB_IS_DELETED = TRUE`, + s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) s.Equal(1, numNewRows) @@ -1525,7 +1527,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { s.compareTableContentsBQ("test_softdel_ud", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) + SELECT COUNT(*) FROM %s.%s WHERE _PEERDB_IS_DELETED = TRUE`, + s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) s.Equal(1, numNewRows) @@ -1600,7 +1603,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { s.compareTableContentsBQ("test_softdel_iad", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) + SELECT COUNT(*) FROM %s.%s WHERE _PEERDB_IS_DELETED = TRUE`, + s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) s.Equal(0, numNewRows) From 8249dab184117c34a1f99adaa4cacd902f692f44 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Wed, 20 Dec 2023 12:47:17 +0530 Subject: [PATCH 3/8] trying to fix pt.2 --- flow/e2e/bigquery/peer_flow_bq_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index d9e8ca408c..193b57cf15 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -1355,7 +1355,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { s.compareTableContentsBQ("test_softdel", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s.%s WHERE _PEERDB_IS_DELETED = TRUE`, + SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`, s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) @@ -1439,7 +1439,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { s.compareTableContentsBQ("test_softdel_iud", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s.%s WHERE _PEERDB_IS_DELETED = TRUE`, + SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`, s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) @@ -1527,7 +1527,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { s.compareTableContentsBQ("test_softdel_ud", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s.%s WHERE _PEERDB_IS_DELETED = TRUE`, + SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`, s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) @@ -1603,7 +1603,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { s.compareTableContentsBQ("test_softdel_iad", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s.%s WHERE _PEERDB_IS_DELETED = TRUE`, + SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`, s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) From 5363acc4c06f08f6ef62ceb5fc8767d5b498a82d Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Wed, 20 Dec 2023 13:03:03 +0530 Subject: [PATCH 4/8] trying to fix pt.3 --- flow/e2e/bigquery/bigquery_helper.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 810cbf26ee..a53bf41101 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -456,7 +456,7 @@ func (b *BigQueryTestHelper) CreateTable(tableName string, schema *model.QRecord return nil } -func (b *BigQueryTestHelper) RunIntQuery(query string) (int64, error) { +func (b *BigQueryTestHelper) RunIntQuery(query string) (int, error) { recordBatch, err := b.ExecuteAndProcessQuery(query) if err != nil { return 0, fmt.Errorf("could not execute query: %w", err) @@ -465,5 +465,5 @@ func (b *BigQueryTestHelper) RunIntQuery(query string) (int64, error) { return 0, fmt.Errorf("expected only 1 record, got %d", recordBatch.NumRecords) } - return recordBatch.Records[0].Entries[0].Value.(int64), nil + return int(recordBatch.Records[0].Entries[0].Value.(int64)), nil } From 621872dce30926a0e8c8a6fe13dcf9d9f43a24ba Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 22 Dec 2023 02:25:37 +0530 Subject: [PATCH 5/8] fixing BQ softdel logic pt.1 --- flow/connectors/bigquery/bigquery.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 3da34f99d7..5723546467 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -452,7 +452,8 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync // Prepare the query to retrieve distinct tables in that batch query := fmt.Sprintf(`SELECT _peerdb_destination_table_name, array_agg(DISTINCT _peerdb_unchanged_toast_columns) as unchanged_toast_columns FROM %s.%s - WHERE _peerdb_batch_id > %d and _peerdb_batch_id <= %d GROUP BY _peerdb_destination_table_name`, + WHERE _peerdb_batch_id > %d and _peerdb_batch_id <= %d + AND _peerdb_record_type != 2 GROUP BY _peerdb_destination_table_name`, c.datasetID, rawTableName, normalizeBatchID, syncBatchID) // Run the query q := c.client.Query(query) From 026aa12d8253d735600ad3f3a272df5bd3a37c34 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Fri, 22 Dec 2023 02:39:38 +0530 Subject: [PATCH 6/8] Update flow/connectors/bigquery/bigquery.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Philip Dubé --- flow/connectors/bigquery/bigquery.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 5723546467..cf006298a7 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -452,8 +452,8 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync // Prepare the query to retrieve distinct tables in that batch query := fmt.Sprintf(`SELECT _peerdb_destination_table_name, array_agg(DISTINCT _peerdb_unchanged_toast_columns) as unchanged_toast_columns FROM %s.%s - WHERE _peerdb_batch_id > %d and _peerdb_batch_id <= %d - AND _peerdb_record_type != 2 GROUP BY _peerdb_destination_table_name`, + WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d AND _peerdb_record_type != 2 + GROUP BY _peerdb_destination_table_name`, c.datasetID, rawTableName, normalizeBatchID, syncBatchID) // Run the query q := c.client.Query(query) From bc71b895181481f5d5fd46de27cf0ab03c277b33 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 22 Dec 2023 11:54:04 +0530 Subject: [PATCH 7/8] changing RunIntQuery to avoid casts --- flow/e2e/bigquery/bigquery_helper.go | 4 ++-- flow/e2e/bigquery/peer_flow_bq_test.go | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index a53bf41101..40a42ea64f 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -456,7 +456,7 @@ func (b *BigQueryTestHelper) CreateTable(tableName string, schema *model.QRecord return nil } -func (b *BigQueryTestHelper) RunIntQuery(query string) (int, error) { +func (b *BigQueryTestHelper) RunInt64Query(query string) (int64, error) { recordBatch, err := b.ExecuteAndProcessQuery(query) if err != nil { return 0, fmt.Errorf("could not execute query: %w", err) @@ -465,5 +465,5 @@ func (b *BigQueryTestHelper) RunIntQuery(query string) (int, error) { return 0, fmt.Errorf("expected only 1 record, got %d", recordBatch.NumRecords) } - return int(recordBatch.Records[0].Entries[0].Value.(int64)), nil + return recordBatch.Records[0].Entries[0].Value.(int64), nil } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 193b57cf15..6cbac4c915 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -1357,9 +1357,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`, s.bqHelper.datasetName, dstTableName) - numNewRows, err := s.bqHelper.RunIntQuery(newerSyncedAtQuery) + numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - require.Equal(s.t, 1, numNewRows) + s.Eq(1, numNewRows) } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { @@ -1441,9 +1441,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`, s.bqHelper.datasetName, dstTableName) - numNewRows, err := s.bqHelper.RunIntQuery(newerSyncedAtQuery) + numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - s.Equal(1, numNewRows) + s.Eq(1, numNewRows) } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { @@ -1529,9 +1529,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`, s.bqHelper.datasetName, dstTableName) - numNewRows, err := s.bqHelper.RunIntQuery(newerSyncedAtQuery) + numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - s.Equal(1, numNewRows) + s.Eq(1, numNewRows) } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { @@ -1605,7 +1605,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`, s.bqHelper.datasetName, dstTableName) - numNewRows, err := s.bqHelper.RunIntQuery(newerSyncedAtQuery) + numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - s.Equal(0, numNewRows) + s.Eq(0, numNewRows) } From c5ca1980ab13d29d862615681017ea98b18ec8ca Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Tue, 26 Dec 2023 20:06:02 +0530 Subject: [PATCH 8/8] added comment --- flow/connectors/bigquery/bigquery.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index cf006298a7..ff210c9ce0 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -450,6 +450,9 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync rawTableName := c.getRawTableName(flowJobName) // Prepare the query to retrieve distinct tables in that batch + // we want to only select the unchanged cols from UpdateRecords, as we have a workaround + // where a placeholder value for unchanged cols can be set in DeleteRecord if there is no backfill + // we don't want these particular DeleteRecords to be used in the update statement query := fmt.Sprintf(`SELECT _peerdb_destination_table_name, array_agg(DISTINCT _peerdb_unchanged_toast_columns) as unchanged_toast_columns FROM %s.%s WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d AND _peerdb_record_type != 2