From eac5b8548d079729dc9efbfcc74f2855d8583e07 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Dec 2023 20:22:50 +0530 Subject: [PATCH] adds fix for pg, adds comments --- .../bigquery/merge_statement_generator.go | 4 ++++ flow/connectors/postgres/client.go | 15 ++++++++++++++- flow/connectors/snowflake/snowflake.go | 4 ++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index cfe3108d5f..22161c434b 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -229,6 +229,10 @@ func (m *mergeStmtGenerator) generateUpdateStatements( (_peerdb_deduped._peerdb_record_type != 2) AND _peerdb_unchanged_toast_columns='%s' THEN UPDATE SET %s `, cols, ssep) updateStmts = append(updateStmts, updateStmt) + + // generates update statements for the case where updates and deletes happen in the same branch + // the backfill has happened from the pull side already, so treat the DeleteRecord as an update + // and then set soft-delete to true. if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") { tmpArray = append(tmpArray[:len(tmpArray)-1], fmt.Sprintf("`%s` = TRUE", peerdbCols.SoftDeleteColName)) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 1d7d4d5fb5..9f516a49c6 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -761,7 +761,7 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string, peerdbCols.SyncedAtColName)) } // set soft-deleted to false, tackles insert after soft-delete - if peerdbCols.SoftDeleteColName != "" { + if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") { tmpArray = append(tmpArray, fmt.Sprintf(`"%s" = FALSE`, peerdbCols.SoftDeleteColName)) } @@ -771,6 +771,19 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string, src._peerdb_record_type=1 AND _peerdb_unchanged_toast_columns='%s' THEN UPDATE SET %s `, cols, ssep) updateStmts = append(updateStmts, updateStmt) + + // generates update statements for the case where updates and deletes happen in the same branch + // the backfill has happened from the pull side already, so treat the DeleteRecord as an update + // and then set soft-delete to true. + if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") { + tmpArray = append(tmpArray[:len(tmpArray)-1], + fmt.Sprintf(`"%s" = TRUE`, peerdbCols.SoftDeleteColName)) + ssep := strings.Join(tmpArray, ", ") + updateStmt := fmt.Sprintf(`WHEN MATCHED AND + src._peerdb_record_type = 2 AND _peerdb_unchanged_toast_columns='%s' + THEN UPDATE SET %s `, cols, ssep) + updateStmts = append(updateStmts, updateStmt) + } } return updateStmts } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index ad4d952547..8cd8240f11 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -1064,6 +1064,10 @@ func (c *SnowflakeConnector) generateUpdateStatements( (SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='%s' THEN UPDATE SET %s `, cols, ssep) updateStmts = append(updateStmts, updateStmt) + + // generates update statements for the case where updates and deletes happen in the same branch + // the backfill has happened from the pull side already, so treat the DeleteRecord as an update + // and then set soft-delete to true. if softDelete && (softDeleteCol != "") { tmpArray = append(tmpArray[:len(tmpArray)-1], fmt.Sprintf(`"%s" = TRUE`, softDeleteCol)) ssep := strings.Join(tmpArray, ", ")