Skip to content

Commit

Permalink
soft delete logic fixing tests for BigQuery (#857)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 26, 2023
1 parent 42d71e0 commit 67ab274
Show file tree
Hide file tree
Showing 3 changed files with 355 additions and 1 deletion.
6 changes: 5 additions & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,13 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync
rawTableName := c.getRawTableName(flowJobName)

// Prepare the query to retrieve distinct tables in that batch
// we want to only select the unchanged cols from UpdateRecords, as we have a workaround
// where a placeholder value for unchanged cols can be set in DeleteRecord if there is no backfill
// we don't want these particular DeleteRecords to be used in the update statement
query := fmt.Sprintf(`SELECT _peerdb_destination_table_name,
array_agg(DISTINCT _peerdb_unchanged_toast_columns) as unchanged_toast_columns FROM %s.%s
WHERE _peerdb_batch_id > %d and _peerdb_batch_id <= %d GROUP BY _peerdb_destination_table_name`,
WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d AND _peerdb_record_type != 2
GROUP BY _peerdb_destination_table_name`,
c.datasetID, rawTableName, normalizeBatchID, syncBatchID)
// Run the query
q := c.client.Query(query)
Expand Down
12 changes: 12 additions & 0 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,3 +455,15 @@ func (b *BigQueryTestHelper) CreateTable(tableName string, schema *model.QRecord

return nil
}

func (b *BigQueryTestHelper) RunInt64Query(query string) (int64, error) {
recordBatch, err := b.ExecuteAndProcessQuery(query)
if err != nil {
return 0, fmt.Errorf("could not execute query: %w", err)
}
if recordBatch.NumRecords != 1 {
return 0, fmt.Errorf("expected only 1 record, got %d", recordBatch.NumRecords)
}

return recordBatch.Records[0].Entries[0].Value.(int64), nil
}
338 changes: 338 additions & 0 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"fmt"
"log/slog"
"strings"
"sync"
"testing"
"time"

"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
Expand Down Expand Up @@ -1271,3 +1273,339 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {

env.AssertExpectations(s.t)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env, s.t)

cmpTableName := s.attachSchemaSuffix("test_softdel")
srcTableName := fmt.Sprintf("%s_src", cmpTableName)
dstTableName := "test_softdel"

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
c1 INT,
c2 INT,
t TEXT
);
`, srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_softdel"),
}

config := &protos.FlowConnectionConfigs{
FlowJobName: connectionGen.FlowJobName,
Destination: s.bqHelper.Peer,
TableMappings: []*protos.TableMapping{
{
SourceTableIdentifier: srcTableName,
DestinationTableIdentifier: dstTableName,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
SyncedAtColName: "_PEERDB_SYNCED_AT",
}

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 3,
MaxBatchSize: 100,
}

wg := sync.WaitGroup{}
wg.Add(1)

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
require.NoError(s.t, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
// since we delete stuff, create another table to compare with
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
require.NoError(s.t, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
require.NoError(s.t, err)

wg.Done()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")

wg.Wait()

// verify our updates and delete happened
s.compareTableContentsBQ("test_softdel", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`,
s.bqHelper.datasetName, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
s.Eq(1, numNewRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env, s.t)

cmpTableName := s.attachSchemaSuffix("test_softdel_iud")
srcTableName := fmt.Sprintf("%s_src", cmpTableName)
dstTableName := "test_softdel_iud"

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
c1 INT,
c2 INT,
t TEXT
);
`, srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_softdel_iud"),
}

config := &protos.FlowConnectionConfigs{
FlowJobName: connectionGen.FlowJobName,
Destination: s.bqHelper.Peer,
TableMappings: []*protos.TableMapping{
{
SourceTableIdentifier: srcTableName,
DestinationTableIdentifier: dstTableName,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
SyncedAtColName: "_PEERDB_SYNCED_AT",
}

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 3,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

insertTx, err := s.pool.Begin(context.Background())
require.NoError(s.t, err)

_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
require.NoError(s.t, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
require.NoError(s.t, err)
// since we delete stuff, create another table to compare with
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
require.NoError(s.t, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
require.NoError(s.t, err)

require.NoError(s.t, insertTx.Commit(context.Background()))
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsBQ("test_softdel_iud", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`,
s.bqHelper.datasetName, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
s.Eq(1, numNewRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env, s.t)

cmpTableName := s.attachSchemaSuffix("test_softdel_ud")
srcTableName := fmt.Sprintf("%s_src", cmpTableName)
dstTableName := "test_softdel_ud"

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
c1 INT,
c2 INT,
t TEXT
);
`, srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_softdel_ud"),
}

config := &protos.FlowConnectionConfigs{
FlowJobName: connectionGen.FlowJobName,
Destination: s.bqHelper.Peer,
TableMappings: []*protos.TableMapping{
{
SourceTableIdentifier: srcTableName,
DestinationTableIdentifier: dstTableName,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
SyncedAtColName: "_PEERDB_SYNCED_AT",
}

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 4,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
require.NoError(s.t, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 1)

insertTx, err := s.pool.Begin(context.Background())
require.NoError(s.t, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName))
require.NoError(s.t, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
require.NoError(s.t, err)
// since we delete stuff, create another table to compare with
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
require.NoError(s.t, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
require.NoError(s.t, err)

require.NoError(s.t, insertTx.Commit(context.Background()))
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsBQ("test_softdel_ud", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`,
s.bqHelper.datasetName, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
s.Eq(1, numNewRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_softdel_iad")
dstTableName := "test_softdel_iad"

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
c1 INT,
c2 INT,
t TEXT
);
`, srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_softdel_iad"),
}

config := &protos.FlowConnectionConfigs{
FlowJobName: connectionGen.FlowJobName,
Destination: s.bqHelper.Peer,
TableMappings: []*protos.TableMapping{
{
SourceTableIdentifier: srcTableName,
DestinationTableIdentifier: dstTableName,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
SyncedAtColName: "_PEERDB_SYNCED_AT",
}

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 3,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
require.NoError(s.t, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName))
require.NoError(s.t, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsBQ("test_softdel_iad", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`,
s.bqHelper.datasetName, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
s.Eq(0, numNewRows)
}

0 comments on commit 67ab274

Please sign in to comment.