Skip to content

Commit

Permalink
Merge branch 'main' into alerting-v1.5-alerter
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Dec 21, 2023
2 parents 10c6060 + 65c585a commit 42ca906
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 21 deletions.
13 changes: 13 additions & 0 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,19 @@ func (m *mergeStmtGenerator) generateUpdateStatements(
(_peerdb_deduped._peerdb_record_type != 2) AND _peerdb_unchanged_toast_columns='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)

// generates update statements for the case where updates and deletes happen in the same branch
// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
// and then set soft-delete to true.
if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") {
tmpArray = append(tmpArray[:len(tmpArray)-1],
fmt.Sprintf("`%s` = TRUE", peerdbCols.SoftDeleteColName))
ssep := strings.Join(tmpArray, ", ")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
(_peerdb_deduped._peerdb_record_type = 2) AND _peerdb_unchanged_toast_columns='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)
}
}
return updateStmts
}
50 changes: 30 additions & 20 deletions flow/connectors/bigquery/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,31 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
unchangedToastCols := []string{"", "col2, col3", "col2", "col3"}

expected := []string{
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='' " +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
" `col2` = _peerdb_deduped.col2," +
" `col3` = _peerdb_deduped.col3," +
"`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='col2, col3' " +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
"`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='col2'" +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
" `col3` = _peerdb_deduped.col3," +
"`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='col3'" +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
" `col2` = _peerdb_deduped.col2," + "`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) AND _peerdb_unchanged_toast_columns=''" +
" THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col2`=_peerdb_deduped.col2,`col3`=_peerdb_deduped.col3," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type=2) " +
"AND _peerdb_unchanged_toast_columns='' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col2`=_peerdb_deduped.col2," +
"`col3`=_peerdb_deduped.col3,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) AND _peerdb_unchanged_toast_columns='col2,col3' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type=2) AND _peerdb_unchanged_toast_columns='col2,col3' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) " +
"AND _peerdb_unchanged_toast_columns='col2' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col3`=_peerdb_deduped.col3," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
"WHEN MATCHED AND(_peerdb_deduped._peerdb_record_type=2) " +
"AND _peerdb_unchanged_toast_columns='col2' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col3`=_peerdb_deduped.col3," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE ",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) AND _peerdb_unchanged_toast_columns='col3' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1," +
"`col2`=_peerdb_deduped.col2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type=2) AND _peerdb_unchanged_toast_columns='col3' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1," +
"`col2`=_peerdb_deduped.col2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols, &protos.PeerDBColumns{
Expand All @@ -47,7 +53,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
}

if !reflect.DeepEqual(result, expected) {
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
t.Errorf("Unexpected result. Expected: %v,\nbut got: %v", expected, result)
}
}

Expand All @@ -65,6 +71,10 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
" `col3` = _peerdb_deduped.col3," +
" `synced_at`=CURRENT_TIMESTAMP," +
"`deleted`=FALSE",
"WHEN MATCHED AND" +
"(_peerdb_deduped._peerdb_record_type = 2) AND _peerdb_unchanged_toast_columns=''" +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1, `col2` = _peerdb_deduped.col2, " +
"`col3` = _peerdb_deduped.col3, `synced_at` = CURRENT_TIMESTAMP, `deleted` = TRUE",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols,
Expand Down
15 changes: 14 additions & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string,
peerdbCols.SyncedAtColName))
}
// set soft-deleted to false, tackles insert after soft-delete
if peerdbCols.SoftDeleteColName != "" {
if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") {
tmpArray = append(tmpArray, fmt.Sprintf(`"%s" = FALSE`,
peerdbCols.SoftDeleteColName))
}
Expand All @@ -772,6 +772,19 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string,
src._peerdb_record_type=1 AND _peerdb_unchanged_toast_columns='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)

// generates update statements for the case where updates and deletes happen in the same branch
// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
// and then set soft-delete to true.
if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") {
tmpArray = append(tmpArray[:len(tmpArray)-1],
fmt.Sprintf(`"%s" = TRUE`, peerdbCols.SoftDeleteColName))
ssep := strings.Join(tmpArray, ", ")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
src._peerdb_record_type = 2 AND _peerdb_unchanged_toast_columns='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)
}
}
return updateStmts
}
Expand Down
4 changes: 4 additions & 0 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,10 @@ func (c *SnowflakeConnector) generateUpdateStatements(
(SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)

// generates update statements for the case where updates and deletes happen in the same branch
// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
// and then set soft-delete to true.
if softDelete && (softDeleteCol != "") {
tmpArray = append(tmpArray[:len(tmpArray)-1], fmt.Sprintf(`"%s" = TRUE`, softDeleteCol))
ssep := strings.Join(tmpArray, ", ")
Expand Down

0 comments on commit 42ca906

Please sign in to comment.