From 67ab274820d3d422ad58e02def24b69e0bd7f255 Mon Sep 17 00:00:00 2001
From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com>
Date: Wed, 27 Dec 2023 00:09:35 +0530
Subject: [PATCH] soft delete logic fixes and tests for BigQuery (#857)

---
 flow/connectors/bigquery/bigquery.go   |   6 +-
 flow/e2e/bigquery/bigquery_helper.go   |  12 +
 flow/e2e/bigquery/peer_flow_bq_test.go | 338 +++++++++++++++++++++++++
 3 files changed, 355 insertions(+), 1 deletion(-)

diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 11a88e8382..22977ed4d9 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -456,9 +456,13 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, syncBatchID int64,
 	rawTableName := c.getRawTableName(flowJobName)
 
 	// Prepare the query to retrieve distinct tables in that batch
+	// we only want the unchanged TOAST columns of UpdateRecords here: as a workaround,
+	// a placeholder value for the unchanged columns can be set in a DeleteRecord when there
+	// is no backfill, and those DeleteRecords must not feed the update statement.
 	query := fmt.Sprintf(`SELECT _peerdb_destination_table_name,
 	array_agg(DISTINCT _peerdb_unchanged_toast_columns) as unchanged_toast_columns FROM %s.%s
-	WHERE _peerdb_batch_id > %d and _peerdb_batch_id <= %d GROUP BY _peerdb_destination_table_name`,
+	WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d AND _peerdb_record_type != 2
+	GROUP BY _peerdb_destination_table_name`,
 		c.datasetID, rawTableName, normalizeBatchID, syncBatchID)
 	// Run the query
 	q := c.client.Query(query)
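Annotation on the hunk above: the new `_peerdb_record_type != 2` predicate is what keeps DeleteRecords out of the per-table unchanged-TOAST-column sets. Below is a minimal, self-contained Go sketch of the rule the SQL encodes; the record-type encoding for deletes (2) is taken from the predicate itself, the remaining values (0 = insert, 1 = update) are an assumption, as is every identifier in the sketch.

package main

import "fmt"

// rawRecord mirrors the raw-table columns the query above touches.
type rawRecord struct {
	destinationTable   string
	recordType         int // assumed encoding: 0=insert, 1=update, 2=delete
	unchangedToastCols string
}

// unchangedColsByTable groups the distinct unchanged-TOAST column sets per
// destination table, skipping delete records so their placeholder values
// never reach the generated UPDATE statement.
func unchangedColsByTable(records []rawRecord) map[string]map[string]struct{} {
	out := make(map[string]map[string]struct{})
	for _, r := range records {
		if r.recordType == 2 { // mirrors `_peerdb_record_type != 2`
			continue
		}
		if out[r.destinationTable] == nil {
			out[r.destinationTable] = make(map[string]struct{})
		}
		out[r.destinationTable][r.unchangedToastCols] = struct{}{}
	}
	return out
}

func main() {
	records := []rawRecord{
		{"public.test_softdel", 1, "t"}, // update that left TOAST column t unchanged
		{"public.test_softdel", 2, "t"}, // delete carrying a placeholder: skipped
	}
	// prints map[public.test_softdel:map[t:{}]] - only the update contributes
	fmt.Println(unchangedColsByTable(records))
}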
diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go
index 21bd3b5c75..40a42ea64f 100644
--- a/flow/e2e/bigquery/bigquery_helper.go
+++ b/flow/e2e/bigquery/bigquery_helper.go
@@ -455,3 +455,15 @@ func (b *BigQueryTestHelper) CreateTable(tableName string, schema *model.QRecordSchema) error {
 
 	return nil
 }
+
+func (b *BigQueryTestHelper) RunInt64Query(query string) (int64, error) {
+	recordBatch, err := b.ExecuteAndProcessQuery(query)
+	if err != nil {
+		return 0, fmt.Errorf("could not execute query: %w", err)
+	}
+	if recordBatch.NumRecords != 1 {
+		return 0, fmt.Errorf("expected only 1 record, got %d", recordBatch.NumRecords)
+	}
+
+	return recordBatch.Records[0].Entries[0].Value.(int64), nil
+}
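RunInt64Query above assumes the single result cell is an INT64; if a future test feeds it a non-integer column, the bare type assertion panics instead of returning an error. A hedged drop-in variant with a checked assertion (same signature, same behavior otherwise):

func (b *BigQueryTestHelper) RunInt64Query(query string) (int64, error) {
	recordBatch, err := b.ExecuteAndProcessQuery(query)
	if err != nil {
		return 0, fmt.Errorf("could not execute query: %w", err)
	}
	if recordBatch.NumRecords != 1 {
		return 0, fmt.Errorf("expected only 1 record, got %d", recordBatch.NumRecords)
	}
	// checked assertion: return an error rather than panicking on a
	// non-INT64 result column
	value, ok := recordBatch.Records[0].Entries[0].Value.(int64)
	if !ok {
		return 0, fmt.Errorf("expected an int64 value, got %T",
			recordBatch.Records[0].Entries[0].Value)
	}
	return value, nil
}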
diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go
index c76688f79b..6cbac4c915 100644
--- a/flow/e2e/bigquery/peer_flow_bq_test.go
+++ b/flow/e2e/bigquery/peer_flow_bq_test.go
@@ -5,10 +5,12 @@ import (
 	"fmt"
 	"log/slog"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
 	"github.com/PeerDB-io/peer-flow/e2e"
+	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
 	"github.com/PeerDB-io/peer-flow/shared"
 	peerflow "github.com/PeerDB-io/peer-flow/workflows"
@@ -1271,3 +1273,339 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
 
 	env.AssertExpectations(s.t)
 }
+
+func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
+	env := e2e.NewTemporalTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(env, s.t)
+
+	cmpTableName := s.attachSchemaSuffix("test_softdel")
+	srcTableName := fmt.Sprintf("%s_src", cmpTableName)
+	dstTableName := "test_softdel"
+
+	_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s (
+			id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
+			c1 INT,
+			c2 INT,
+			t TEXT
+		);
+	`, srcTableName))
+	require.NoError(s.t, err)
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName: s.attachSuffix("test_softdel"),
+	}
+
+	config := &protos.FlowConnectionConfigs{
+		FlowJobName: connectionGen.FlowJobName,
+		Destination: s.bqHelper.Peer,
+		TableMappings: []*protos.TableMapping{
+			{
+				SourceTableIdentifier:      srcTableName,
+				DestinationTableIdentifier: dstTableName,
+			},
+		},
+		Source:            e2e.GeneratePostgresPeer(e2e.PostgresPort),
+		CdcStagingPath:    connectionGen.CdcStagingPath,
+		SoftDelete:        true,
+		SoftDeleteColName: "_PEERDB_IS_DELETED",
+		SyncedAtColName:   "_PEERDB_SYNCED_AT",
+	}
+
+	limits := peerflow.CDCFlowLimits{
+		ExitAfterRecords: 3,
+		MaxBatchSize:     100,
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+
+	// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
+	// and then insert, update and delete rows in the table.
+	go func() {
+		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
+
+		_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+			INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
+		require.NoError(s.t, err)
+		e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
+		_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+			UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
+		require.NoError(s.t, err)
+		e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
+		// since we delete stuff, create another table to compare with
+		_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+			CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
+		require.NoError(s.t, err)
+		_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+			DELETE FROM %s WHERE id=1`, srcTableName))
+		require.NoError(s.t, err)
+
+		wg.Done()
+	}()
+
+	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
+	s.True(env.IsWorkflowCompleted())
+	err = env.GetWorkflowError()
+	require.Contains(s.t, err.Error(), "continue as new")
+
+	wg.Wait()
+
+	// verify our updates and delete happened
+	s.compareTableContentsBQ("test_softdel", "id,c1,c2,t")
+
+	numDeletedQuery := fmt.Sprintf(`
+		SELECT COUNT(*) FROM `+"`%s.%s`"+` WHERE _PEERDB_IS_DELETED = TRUE`,
+		s.bqHelper.datasetName, dstTableName)
+	numDeletedRows, err := s.bqHelper.RunInt64Query(numDeletedQuery)
+	require.NoError(s.t, err)
+	require.Equal(s.t, int64(1), numDeletedRows)
+}
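The COUNT(*) over _PEERDB_IS_DELETED check above recurs verbatim in all four soft-delete tests. A hypothetical helper (not part of this patch) that would fold the pattern into one place, using only names already present in this file:

// countSoftDeletedRows is a hypothetical convenience wrapper around
// RunInt64Query for the soft-delete assertions in these tests.
func (s PeerFlowE2ETestSuiteBQ) countSoftDeletedRows(dstTableName string) (int64, error) {
	query := fmt.Sprintf("SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED = TRUE",
		s.bqHelper.datasetName, dstTableName)
	return s.bqHelper.RunInt64Query(query)
}

With it, each test's closing check would shrink to a require.NoError plus a require.Equal against the returned count.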
+
+func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
+	env := e2e.NewTemporalTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(env, s.t)
+
+	cmpTableName := s.attachSchemaSuffix("test_softdel_iud")
+	srcTableName := fmt.Sprintf("%s_src", cmpTableName)
+	dstTableName := "test_softdel_iud"
+
+	_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s (
+			id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
+			c1 INT,
+			c2 INT,
+			t TEXT
+		);
+	`, srcTableName))
+	require.NoError(s.t, err)
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName: s.attachSuffix("test_softdel_iud"),
+	}
+
+	config := &protos.FlowConnectionConfigs{
+		FlowJobName: connectionGen.FlowJobName,
+		Destination: s.bqHelper.Peer,
+		TableMappings: []*protos.TableMapping{
+			{
+				SourceTableIdentifier:      srcTableName,
+				DestinationTableIdentifier: dstTableName,
+			},
+		},
+		Source:            e2e.GeneratePostgresPeer(e2e.PostgresPort),
+		CdcStagingPath:    connectionGen.CdcStagingPath,
+		SoftDelete:        true,
+		SoftDeleteColName: "_PEERDB_IS_DELETED",
+		SyncedAtColName:   "_PEERDB_SYNCED_AT",
+	}
+
+	limits := peerflow.CDCFlowLimits{
+		ExitAfterRecords: 3,
+		MaxBatchSize:     100,
+	}
+
+	// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
+	// and then insert, update and delete rows in the table.
+	go func() {
+		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
+
+		insertTx, err := s.pool.Begin(context.Background())
+		require.NoError(s.t, err)
+
+		_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
+			INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
+		require.NoError(s.t, err)
+		_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
+			UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
+		require.NoError(s.t, err)
+		// since we delete stuff, create another table to compare with
+		_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
+			CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
+		require.NoError(s.t, err)
+		_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
+			DELETE FROM %s WHERE id=1`, srcTableName))
+		require.NoError(s.t, err)
+
+		require.NoError(s.t, insertTx.Commit(context.Background()))
+	}()
+
+	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
+	s.True(env.IsWorkflowCompleted())
+	err = env.GetWorkflowError()
+	require.Contains(s.t, err.Error(), "continue as new")
+
+	// verify our updates and delete happened
+	s.compareTableContentsBQ("test_softdel_iud", "id,c1,c2,t")
+
+	numDeletedQuery := fmt.Sprintf(`
+		SELECT COUNT(*) FROM `+"`%s.%s`"+` WHERE _PEERDB_IS_DELETED = TRUE`,
+		s.bqHelper.datasetName, dstTableName)
+	numDeletedRows, err := s.bqHelper.RunInt64Query(numDeletedQuery)
+	require.NoError(s.t, err)
+	require.Equal(s.t, int64(1), numDeletedRows)
+}
+
+func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
+	env := e2e.NewTemporalTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(env, s.t)
+
+	cmpTableName := s.attachSchemaSuffix("test_softdel_ud")
+	srcTableName := fmt.Sprintf("%s_src", cmpTableName)
+	dstTableName := "test_softdel_ud"
+
+	_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s (
+			id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
+			c1 INT,
+			c2 INT,
+			t TEXT
+		);
+	`, srcTableName))
+	require.NoError(s.t, err)
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName: s.attachSuffix("test_softdel_ud"),
+	}
+
+	config := &protos.FlowConnectionConfigs{
+		FlowJobName: connectionGen.FlowJobName,
+		Destination: s.bqHelper.Peer,
+		TableMappings: []*protos.TableMapping{
+			{
+				SourceTableIdentifier:      srcTableName,
+				DestinationTableIdentifier: dstTableName,
+			},
+		},
+		Source:            e2e.GeneratePostgresPeer(e2e.PostgresPort),
+		CdcStagingPath:    connectionGen.CdcStagingPath,
+		SoftDelete:        true,
+		SoftDeleteColName: "_PEERDB_IS_DELETED",
+		SyncedAtColName:   "_PEERDB_SYNCED_AT",
+	}
+
+	limits := peerflow.CDCFlowLimits{
+		ExitAfterRecords: 4,
+		MaxBatchSize:     100,
+	}
+
+	// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
+	// and then insert, update and delete rows in the table.
+	go func() {
+		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
+
+		_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+			INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
+		require.NoError(s.t, err)
+		e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
+
+		insertTx, err := s.pool.Begin(context.Background())
+		require.NoError(s.t, err)
+		_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
+			UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName))
+		require.NoError(s.t, err)
+		_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
+			UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
+		require.NoError(s.t, err)
+		// since we delete stuff, create another table to compare with
+		_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
+			CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
+		require.NoError(s.t, err)
+		_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
+			DELETE FROM %s WHERE id=1`, srcTableName))
+		require.NoError(s.t, err)
+
+		require.NoError(s.t, insertTx.Commit(context.Background()))
+	}()
+
+	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
+	s.True(env.IsWorkflowCompleted())
+	err = env.GetWorkflowError()
+	require.Contains(s.t, err.Error(), "continue as new")
+
+	// verify our updates and delete happened
+	s.compareTableContentsBQ("test_softdel_ud", "id,c1,c2,t")
+
+	numDeletedQuery := fmt.Sprintf(`
+		SELECT COUNT(*) FROM `+"`%s.%s`"+` WHERE _PEERDB_IS_DELETED = TRUE`,
+		s.bqHelper.datasetName, dstTableName)
+	numDeletedRows, err := s.bqHelper.RunInt64Query(numDeletedQuery)
+	require.NoError(s.t, err)
+	require.Equal(s.t, int64(1), numDeletedRows)
+}
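The tests above populate `t` through a `random_string` function on the source database; it is created elsewhere in the e2e setup and its definition is not part of this patch. If you need to replay the DML by hand, a hedged reproduction follows (the function body and the connection string are assumptions, not the harness's actual definition):

package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5"
)

// Assumed stand-in for the harness-provided random_string(length) helper.
const createRandomString = `
CREATE OR REPLACE FUNCTION random_string(length INT) RETURNS TEXT AS $$
  SELECT string_agg(substr('abcdefghijklmnopqrstuvwxyz', (random()*25)::int + 1, 1), '')
  FROM generate_series(1, length)
$$ LANGUAGE sql`

func main() {
	ctx := context.Background()
	// illustrative connection string; point it at your test Postgres
	conn, err := pgx.Connect(ctx, "postgres://postgres:postgres@localhost:5432/postgres")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)
	if _, err := conn.Exec(ctx, createRandomString); err != nil {
		log.Fatal(err)
	}
}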
+
+func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
+	env := e2e.NewTemporalTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(env, s.t)
+
+	srcTableName := s.attachSchemaSuffix("test_softdel_iad")
+	dstTableName := "test_softdel_iad"
+
+	_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s (
+			id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
+			c1 INT,
+			c2 INT,
+			t TEXT
+		);
+	`, srcTableName))
+	require.NoError(s.t, err)
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName: s.attachSuffix("test_softdel_iad"),
+	}
+
+	config := &protos.FlowConnectionConfigs{
+		FlowJobName: connectionGen.FlowJobName,
+		Destination: s.bqHelper.Peer,
+		TableMappings: []*protos.TableMapping{
+			{
+				SourceTableIdentifier:      srcTableName,
+				DestinationTableIdentifier: dstTableName,
+			},
+		},
+		Source:            e2e.GeneratePostgresPeer(e2e.PostgresPort),
+		CdcStagingPath:    connectionGen.CdcStagingPath,
+		SoftDelete:        true,
+		SoftDeleteColName: "_PEERDB_IS_DELETED",
+		SyncedAtColName:   "_PEERDB_SYNCED_AT",
+	}
+
+	limits := peerflow.CDCFlowLimits{
+		ExitAfterRecords: 3,
+		MaxBatchSize:     100,
+	}
+
+	// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
+	// and then insert and delete rows in the table.
+	go func() {
+		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
+
+		_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+			INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
+		require.NoError(s.t, err)
+		e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
+		_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+			DELETE FROM %s WHERE id=1`, srcTableName))
+		require.NoError(s.t, err)
+		e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
+		_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+			INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName))
+		require.NoError(s.t, err)
+	}()
+
+	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
+	s.True(env.IsWorkflowCompleted())
+	err = env.GetWorkflowError()
+	require.Contains(s.t, err.Error(), "continue as new")
+
+	// verify our insert after delete happened
+	s.compareTableContentsBQ("test_softdel_iad", "id,c1,c2,t")
+
+	numDeletedQuery := fmt.Sprintf(`
+		SELECT COUNT(*) FROM `+"`%s.%s`"+` WHERE _PEERDB_IS_DELETED = TRUE`,
+		s.bqHelper.datasetName, dstTableName)
+	numDeletedRows, err := s.bqHelper.RunInt64Query(numDeletedQuery)
+	require.NoError(s.t, err)
+	require.Equal(s.t, int64(0), numDeletedRows)
+}
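One possible extension to the last test (hypothetical, not in the patch): after the delete-then-reinsert, the surviving row should carry the second INSERT's values, which RunInt64Query can verify directly since Postgres INT columns land as INT64 in BigQuery.

// hypothetical extra assertion for Test_Soft_Delete_Insert_After_Delete:
// the re-inserted row (id=1) should carry c1=3 from the second INSERT.
c1Query := fmt.Sprintf("SELECT c1 FROM `%s.%s` WHERE id = 1",
	s.bqHelper.datasetName, dstTableName)
c1Val, err := s.bqHelper.RunInt64Query(c1Query)
require.NoError(s.t, err)
require.Equal(s.t, int64(3), c1Val)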