Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

soft delete logic fixing tests for BigQuery #857

Merged
merged 9 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,13 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync
rawTableName := c.getRawTableName(flowJobName)

// Prepare the query to retrieve distinct tables in that batch
// we want to only select the unchanged cols from UpdateRecords, as we have a workaround
// where a placeholder value for unchanged cols can be set in DeleteRecord if there is no backfill
// we don't want these particular DeleteRecords to be used in the update statement
query := fmt.Sprintf(`SELECT _peerdb_destination_table_name,
array_agg(DISTINCT _peerdb_unchanged_toast_columns) as unchanged_toast_columns FROM %s.%s
WHERE _peerdb_batch_id > %d and _peerdb_batch_id <= %d GROUP BY _peerdb_destination_table_name`,
WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d AND _peerdb_record_type != 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Include in PR description what _peerdb_record_type != 2 achieves

GROUP BY _peerdb_destination_table_name`,
c.datasetID, rawTableName, normalizeBatchID, syncBatchID)
// Run the query
q := c.client.Query(query)
Expand Down
12 changes: 12 additions & 0 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,3 +455,15 @@ func (b *BigQueryTestHelper) CreateTable(tableName string, schema *model.QRecord

return nil
}

func (b *BigQueryTestHelper) RunInt64Query(query string) (int64, error) {
recordBatch, err := b.ExecuteAndProcessQuery(query)
if err != nil {
return 0, fmt.Errorf("could not execute query: %w", err)
}
if recordBatch.NumRecords != 1 {
return 0, fmt.Errorf("expected only 1 record, got %d", recordBatch.NumRecords)
}

return recordBatch.Records[0].Entries[0].Value.(int64), nil
}
338 changes: 338 additions & 0 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"fmt"
"log/slog"
"strings"
"sync"
"testing"
"time"

"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
Expand Down Expand Up @@ -1271,3 +1273,339 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {

env.AssertExpectations(s.t)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env, s.t)

cmpTableName := s.attachSchemaSuffix("test_softdel")
srcTableName := fmt.Sprintf("%s_src", cmpTableName)
dstTableName := "test_softdel"

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
c1 INT,
c2 INT,
t TEXT
);
`, srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_softdel"),
}

config := &protos.FlowConnectionConfigs{
FlowJobName: connectionGen.FlowJobName,
Destination: s.bqHelper.Peer,
TableMappings: []*protos.TableMapping{
{
SourceTableIdentifier: srcTableName,
DestinationTableIdentifier: dstTableName,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
SyncedAtColName: "_PEERDB_SYNCED_AT",
}

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 3,
MaxBatchSize: 100,
}

wg := sync.WaitGroup{}
wg.Add(1)

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
require.NoError(s.t, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
// since we delete stuff, create another table to compare with
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
require.NoError(s.t, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
require.NoError(s.t, err)

wg.Done()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")

wg.Wait()

// verify our updates and delete happened
s.compareTableContentsBQ("test_softdel", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`,
s.bqHelper.datasetName, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
s.Eq(1, numNewRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env, s.t)

cmpTableName := s.attachSchemaSuffix("test_softdel_iud")
srcTableName := fmt.Sprintf("%s_src", cmpTableName)
dstTableName := "test_softdel_iud"

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
c1 INT,
c2 INT,
t TEXT
);
`, srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_softdel_iud"),
}

config := &protos.FlowConnectionConfigs{
FlowJobName: connectionGen.FlowJobName,
Destination: s.bqHelper.Peer,
TableMappings: []*protos.TableMapping{
{
SourceTableIdentifier: srcTableName,
DestinationTableIdentifier: dstTableName,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
SyncedAtColName: "_PEERDB_SYNCED_AT",
}

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 3,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

insertTx, err := s.pool.Begin(context.Background())
require.NoError(s.t, err)

_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
require.NoError(s.t, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
require.NoError(s.t, err)
// since we delete stuff, create another table to compare with
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
require.NoError(s.t, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
require.NoError(s.t, err)

require.NoError(s.t, insertTx.Commit(context.Background()))
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsBQ("test_softdel_iud", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`,
s.bqHelper.datasetName, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
s.Eq(1, numNewRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env, s.t)

cmpTableName := s.attachSchemaSuffix("test_softdel_ud")
srcTableName := fmt.Sprintf("%s_src", cmpTableName)
dstTableName := "test_softdel_ud"

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
c1 INT,
c2 INT,
t TEXT
);
`, srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_softdel_ud"),
}

config := &protos.FlowConnectionConfigs{
FlowJobName: connectionGen.FlowJobName,
Destination: s.bqHelper.Peer,
TableMappings: []*protos.TableMapping{
{
SourceTableIdentifier: srcTableName,
DestinationTableIdentifier: dstTableName,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
SyncedAtColName: "_PEERDB_SYNCED_AT",
}

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 4,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
require.NoError(s.t, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 1)

insertTx, err := s.pool.Begin(context.Background())
require.NoError(s.t, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName))
require.NoError(s.t, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
require.NoError(s.t, err)
// since we delete stuff, create another table to compare with
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
require.NoError(s.t, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
require.NoError(s.t, err)

require.NoError(s.t, insertTx.Commit(context.Background()))
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsBQ("test_softdel_ud", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`,
s.bqHelper.datasetName, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
s.Eq(1, numNewRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_softdel_iad")
dstTableName := "test_softdel_iad"

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
c1 INT,
c2 INT,
t TEXT
);
`, srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_softdel_iad"),
}

config := &protos.FlowConnectionConfigs{
FlowJobName: connectionGen.FlowJobName,
Destination: s.bqHelper.Peer,
TableMappings: []*protos.TableMapping{
{
SourceTableIdentifier: srcTableName,
DestinationTableIdentifier: dstTableName,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
SyncedAtColName: "_PEERDB_SYNCED_AT",
}

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 3,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
require.NoError(s.t, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName))
require.NoError(s.t, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsBQ("test_softdel_iad", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`,
s.bqHelper.datasetName, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
s.Eq(0, numNewRows)
}
Loading