Skip to content

Commit

Permalink
Soft delete false for Snowflake initial load (#1647)
Browse files Browse the repository at this point in the history
This PR makes the soft-delete column default to false instead of null for the Snowflake initial
load.
Functionally tested; test added.
  • Loading branch information
Amogh-Bharadwaj authored Apr 26, 2024
1 parent f988899 commit 3656b0a
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 14 deletions.
11 changes: 6 additions & 5 deletions flow/connectors/snowflake/avro_transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@ package connsnowflake
import "testing"

func TestAvroTransform(t *testing.T) {
colNames := []string{"col1", "col2", "col3", "camelCol4", "sync_col"}
colTypes := []string{"GEOGRAPHY", "VARIANT", "NUMBER", "STRING", "TIMESTAMP_LTZ"}
colNames := []string{"col1", "col2", "col3", "camelCol4", "sync_col", "del_col"}
colTypes := []string{"GEOGRAPHY", "VARIANT", "NUMBER", "STRING", "TIMESTAMP_LTZ", "BOOLEAN"}

expectedTransform := `TO_GEOGRAPHY($1:"col1"::string, true) AS "COL1",` +
`PARSE_JSON($1:"col2") AS "COL2",` +
`$1:"col3" AS "COL3",` +
`($1:"camelCol4")::STRING AS "camelCol4",` +
`CURRENT_TIMESTAMP AS "SYNC_COL"`
transform, cols := getTransformSQL(colNames, colTypes, "sync_col")
`CURRENT_TIMESTAMP AS "SYNC_COL",` +
`FALSE AS "DEL_COL"`
transform, cols := getTransformSQL(colNames, colTypes, "sync_col", "del_col")
if transform != expectedTransform {
t.Errorf("Transform SQL is not correct. Got: %v", transform)
}

expectedCols := `"COL1","COL2","COL3","camelCol4","SYNC_COL"`
expectedCols := `"COL1","COL2","COL3","camelCol4","SYNC_COL","DEL_COL"`
if cols != expectedCols {
t.Errorf("Columns are not correct. Got:%v", cols)
}
Expand Down
14 changes: 12 additions & 2 deletions flow/connectors/snowflake/qrep_avro_consolidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *SnowflakeAvroConsolidateHandler) CopyStageToDestination(ctx context.Con
return nil
}

func getTransformSQL(colNames []string, colTypes []string, syncedAtCol string) (string, string) {
func getTransformSQL(colNames []string, colTypes []string, syncedAtCol string, isDeletedCol string) (string, string) {
transformations := make([]string, 0, len(colNames))
columnOrder := make([]string, 0, len(colNames))
for idx, avroColName := range colNames {
Expand All @@ -88,6 +88,11 @@ func getTransformSQL(colNames []string, colTypes []string, syncedAtCol string) (
continue
}

if avroColName == isDeletedCol {
transformations = append(transformations, "FALSE AS "+normalizedColName)
continue
}

if utils.IsUpper(avroColName) {
avroColName = strings.ToLower(avroColName)
}
Expand Down Expand Up @@ -125,7 +130,12 @@ func getTransformSQL(colNames []string, colTypes []string, syncedAtCol string) (

// getCopyTransformation builds the COPY INTO statement that moves staged
// Avro data into copyDstTable (either the actual destination table or a
// temp table), applying the per-column transform SQL — which includes the
// synced-at and soft-delete column overrides — and purging the stage files
// on success.
func (s *SnowflakeAvroConsolidateHandler) getCopyTransformation(copyDstTable string) string {
	transformationSQL, columnsSQL := getTransformSQL(
		s.allColNames,
		s.allColTypes,
		s.config.SyncedAtColName,
		s.config.SoftDeleteColName,
	)
	return fmt.Sprintf("COPY INTO %s(%s) FROM (SELECT %s FROM @%s) FILE_FORMAT=(TYPE=AVRO), PURGE=TRUE",
		copyDstTable, columnsSQL, transformationSQL, s.stage)
}
Expand Down
5 changes: 4 additions & 1 deletion flow/e2e/bigquery/qrep_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
s.bqHelper.Peer,
"",
true,
"",
"")
require.NoError(s.t, err)
env := e2e.RunQRepFlowWorkflow(tc, qrepConfig)
Expand All @@ -94,6 +95,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_And_Date_QRep() {
s.bqHelper.Peer,
"",
true,
"",
"")
qrepConfig.WatermarkColumn = "watermark_ts"
require.NoError(s.t, err)
Expand Down Expand Up @@ -135,7 +137,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() {
s.bqHelper.Peer,
"",
true,
"_PEERDB_SYNCED_AT")
"_PEERDB_SYNCED_AT",
"")
require.NoError(s.t, err)
env := e2e.RunQRepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
Expand Down
4 changes: 3 additions & 1 deletion flow/e2e/elasticsearch/qrep_flow_es_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ func (s elasticsearchSuite) Test_Simple_Qrep() {
s.peer,
"",
false,
"")
"",
"",
)
require.NoError(s.t, err)
env := e2e.RunQRepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
Expand Down
5 changes: 5 additions & 0 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
"",
true,
"",
"",
)
require.NoError(s.t, err)

Expand Down Expand Up @@ -244,6 +245,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns_QRep_PG() {
"",
true,
"_PEERDB_SYNCED_AT",
"",
)
require.NoError(s.t, err)

Expand Down Expand Up @@ -281,6 +283,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Overwrite_PG() {
"",
true,
"_PEERDB_SYNCED_AT",
"",
)
require.NoError(s.t, err)
qrepConfig.WriteMode = &protos.QRepWriteMode{
Expand Down Expand Up @@ -331,6 +334,7 @@ func (s PeerFlowE2ETestSuitePG) Test_No_Rows_QRep_PG() {
"",
true,
"_PEERDB_SYNCED_AT",
"",
)
require.NoError(s.t, err)

Expand Down Expand Up @@ -363,6 +367,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() {
"",
true,
"_PEERDB_SYNCED_AT",
"",
)
require.NoError(s.t, err)
config.InitialCopyOnly = false
Expand Down
2 changes: 2 additions & 0 deletions flow/e2e/s3/qrep_flow_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() {
"stage",
false,
"",
"",
)
require.NoError(s.t, err)
qrepConfig.StagingPath = s.s3Helper.s3Config.Url
Expand Down Expand Up @@ -154,6 +155,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() {
"stage",
false,
"",
"",
)
require.NoError(s.t, err)
qrepConfig.StagingPath = s.s3Helper.s3Config.Url
Expand Down
48 changes: 46 additions & 2 deletions flow/e2e/snowflake/qrep_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() {
"",
false,
"",
"",
)
qrepConfig.SetupWatermarkTableOnDestination = true
require.NoError(s.t, err)
Expand Down Expand Up @@ -105,6 +106,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple()
"",
false,
"",
"",
)
qrepConfig.WriteMode = &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT,
Expand Down Expand Up @@ -143,6 +145,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {
"",
false,
"",
"",
)
require.NoError(s.t, err)
qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New())
Expand Down Expand Up @@ -178,6 +181,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
"",
false,
"",
"",
)
qrepConfig.WriteMode = &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT,
Expand Down Expand Up @@ -220,6 +224,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration()
"",
false,
"",
"",
)
require.NoError(s.t, err)
qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New())
Expand Down Expand Up @@ -255,6 +260,46 @@ func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() {
"",
true,
"_PEERDB_SYNCED_AT",
"",
)
qrepConfig.WriteMode = &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT,
UpsertKeyColumns: []string{"id"},
}
qrepConfig.SetupWatermarkTableOnDestination = true
require.NoError(s.t, err)

env := e2e.RunQRepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())

err = s.sfHelper.checkSyncedAt(`SELECT "_PEERDB_SYNCED_AT" FROM ` + dstSchemaQualified)
require.NoError(s.t, err)
}

func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Default_False_SF() {
tc := e2e.NewTemporalClient(s.t)

numRows := 10

tblName := "test_qrep_deleted_false_sf"
s.setupSourceTable(tblName, numRows)

dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
s.pgSuffix, tblName)

qrepConfig, err := e2e.CreateQRepWorkflowConfig(
"test_deleted_false_qrep_sf",
fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName),
dstSchemaQualified,
query,
s.sfHelper.Peer,
"",
true,
"_PEERDB_SYNCED_AT",
"_PEERDB_IS_DELETED",
)
qrepConfig.WriteMode = &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT,
Expand All @@ -267,7 +312,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() {
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())

err = s.sfHelper.checkSyncedAt(fmt.Sprintf(`SELECT "_PEERDB_SYNCED_AT" FROM %s.%s`,
s.sfHelper.testSchemaName, tblName))
err = s.sfHelper.checkIsDeleted(`SELECT "_PEERDB_IS_DELETED" FROM ` + dstSchemaQualified)
require.NoError(s.t, err)
}
19 changes: 18 additions & 1 deletion flow/e2e/snowflake/snowflake_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ func (s *SnowflakeTestHelper) RunIntQuery(query string) (int, error) {
}
}

// runs a query that returns an int result
func (s *SnowflakeTestHelper) checkSyncedAt(query string) error {
recordBatch, err := s.testClient.ExecuteAndProcessQuery(context.Background(), query)
if err != nil {
Expand All @@ -204,3 +203,21 @@ func (s *SnowflakeTestHelper) checkSyncedAt(query string) error {

return nil
}

// checkIsDeleted executes the given query and confirms that every value it
// returns is a boolean, validating that the _PEERDB_IS_DELETED column was
// populated with a boolean (not null) value.
func (s *SnowflakeTestHelper) checkIsDeleted(query string) error {
	recordBatch, err := s.testClient.ExecuteAndProcessQuery(context.Background(), query)
	if err != nil {
		return err
	}

	for _, rec := range recordBatch.Records {
		for _, val := range rec {
			if _, isBool := val.(qvalue.QValueBoolean); !isBool {
				return errors.New("is_deleted column failed: _PEERDB_IS_DELETED is not a boolean")
			}
		}
	}

	return nil
}
2 changes: 2 additions & 0 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ func CreateQRepWorkflowConfig(
stagingPath string,
setupDst bool,
syncedAtCol string,
isDeletedCol string,
) (*protos.QRepConfig, error) {
connectionGen := QRepFlowConnectionGenerationConfig{
FlowJobName: flowJobName,
Expand All @@ -421,6 +422,7 @@ func CreateQRepWorkflowConfig(
qrepConfig.InitialCopyOnly = true
qrepConfig.SyncedAtColName = syncedAtCol
qrepConfig.SetupWatermarkTableOnDestination = setupDst
qrepConfig.SoftDeleteColName = isDeletedCol

return qrepConfig, nil
}
Expand Down
5 changes: 3 additions & 2 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
TableNameSchemaMapping: map[string]*protos.TableSchema{
q.config.DestinationTableIdentifier: watermarkTableSchema,
},
SyncedAtColName: q.config.SyncedAtColName,
FlowName: q.config.FlowJobName,
SyncedAtColName: q.config.SyncedAtColName,
SoftDeleteColName: q.config.SoftDeleteColName,
FlowName: q.config.FlowJobName,
}

future := workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig)
Expand Down

0 comments on commit 3656b0a

Please sign in to comment.