Skip to content

Commit

Permalink
print debug
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 17, 2024
1 parent 608b810 commit 4db4c5d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 20 deletions.
37 changes: 18 additions & 19 deletions flow/connectors/postgres/normalize_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (n *normalizeStmtGenerator) generateFallbackStatements() []string {
columnCount := utils.TableSchemaColumns(n.normalizedTableSchema)
columnNames := make([]string, 0, columnCount)
flattenedCastsSQLArray := make([]string, 0, columnCount)
primaryKeyColumnCasts := make(map[string]string)
primaryKeyColumnCasts := make(map[string]string, len(n.normalizedTableSchema.PrimaryKeyColumns))
utils.IterColumns(n.normalizedTableSchema, func(columnName, genericColumnType string) {
quotedCol := QuoteIdentifier(columnName)
stringCol := QuoteLiteral(columnName)
Expand All @@ -64,22 +64,22 @@ func (n *normalizeStmtGenerator) generateFallbackStatements() []string {
primaryKeyColumnCasts[columnName] = fmt.Sprintf("(_peerdb_data->>%s)::%s", stringCol, pgType)
}
})
flattenedCastsSQL := strings.TrimSuffix(strings.Join(flattenedCastsSQLArray, ","), ",")
flattenedCastsSQL := strings.Join(flattenedCastsSQLArray, ",")
parsedDstTable, _ := utils.ParseSchemaTable(n.dstTableName)

insertColumnsSQL := strings.TrimSuffix(strings.Join(columnNames, ","), ",")
insertColumnsSQL := strings.Join(columnNames, ",")
updateColumnsSQLArray := make([]string, 0, utils.TableSchemaColumns(n.normalizedTableSchema))
utils.IterColumns(n.normalizedTableSchema, func(columnName, _ string) {
quotedCol := QuoteIdentifier(columnName)
updateColumnsSQLArray = append(updateColumnsSQLArray, fmt.Sprintf(`%s=EXCLUDED.%s`, quotedCol, quotedCol))
})
updateColumnsSQL := strings.TrimSuffix(strings.Join(updateColumnsSQLArray, ","), ",")
updateColumnsSQL := strings.Join(updateColumnsSQLArray, ",")
deleteWhereClauseArray := make([]string, 0, len(n.normalizedTableSchema.PrimaryKeyColumns))
for columnName, columnCast := range primaryKeyColumnCasts {
deleteWhereClauseArray = append(deleteWhereClauseArray, fmt.Sprintf(`%s.%s=%s AND `,
deleteWhereClauseArray = append(deleteWhereClauseArray, fmt.Sprintf(`%s.%s=%s`,
parsedDstTable.String(), QuoteIdentifier(columnName), columnCast))
}
deleteWhereClauseSQL := strings.TrimSuffix(strings.Join(deleteWhereClauseArray, ""), "AND ")
deleteWhereClauseSQL := strings.Join(deleteWhereClauseArray, " AND ")

deleteUpdate := ""
if n.peerdbCols.SoftDelete {
Expand All @@ -90,7 +90,7 @@ func (n *normalizeStmtGenerator) generateFallbackStatements() []string {
}
}
fallbackUpsertStatement := fmt.Sprintf(fallbackUpsertStatementSQL,
strings.TrimSuffix(strings.Join(maps.Values(primaryKeyColumnCasts), ","), ","), n.metadataSchema,
strings.Join(maps.Values(primaryKeyColumnCasts), ","), n.metadataSchema,
n.rawTableName, parsedDstTable.String(), insertColumnsSQL, flattenedCastsSQL,
strings.Join(n.normalizedTableSchema.PrimaryKeyColumns, ","), updateColumnsSQL)
fallbackDeleteStatement := fmt.Sprintf(fallbackDeleteStatementSQL,
Expand Down Expand Up @@ -128,10 +128,10 @@ func (n *normalizeStmtGenerator) generateMergeStatement() string {
quotedCol, quotedCol))
}
})
flattenedCastsSQL := strings.TrimSuffix(strings.Join(flattenedCastsSQLArray, ","), ",")
flattenedCastsSQL := strings.Join(flattenedCastsSQLArray, ",")
insertValuesSQLArray := make([]string, 0, len(quotedColumnNames)+2)
for _, columnName := range quotedColumnNames {
insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("src.%s", columnName))
for _, quotedCol := range quotedColumnNames {
insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("src.%s", quotedCol))
}

updateStatementsforToastCols := n.generateUpdateStatements(quotedColumnNames)
Expand All @@ -154,13 +154,12 @@ func (n *normalizeStmtGenerator) generateMergeStatement() string {
}
updateStringToastCols := strings.Join(updateStatementsforToastCols, "\n")

deletePart := "DELETE"
conflictPart := "DELETE"
if n.peerdbCols.SoftDelete {
colName := n.peerdbCols.SoftDeleteColName
deletePart = fmt.Sprintf(`UPDATE SET %s=TRUE`, QuoteIdentifier(colName))
conflictPart = fmt.Sprintf(`UPDATE SET %s=TRUE`, QuoteIdentifier(colName))
if n.peerdbCols.SyncedAtColName != "" {
deletePart = fmt.Sprintf(`%s,%s=CURRENT_TIMESTAMP`,
deletePart, QuoteIdentifier(n.peerdbCols.SyncedAtColName))
conflictPart += fmt.Sprintf(`,%s=CURRENT_TIMESTAMP`, QuoteIdentifier(n.peerdbCols.SyncedAtColName))
}
}

Expand All @@ -175,7 +174,7 @@ func (n *normalizeStmtGenerator) generateMergeStatement() string {
insertColumnsSQL,
insertValuesSQL,
updateStringToastCols,
deletePart,
conflictPart,
)

return mergeStmt
Expand Down Expand Up @@ -212,22 +211,22 @@ func (n *normalizeStmtGenerator) generateUpdateStatements(quotedCols []string) [
QuoteIdentifier(n.peerdbCols.SoftDeleteColName)))
}

quotedCols := QuoteLiteral(cols)
ssep := strings.Join(tmpArray, ",")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
src._peerdb_record_type!=2 AND _peerdb_unchanged_toast_columns=%s
THEN UPDATE SET %s`, QuoteLiteral(cols), ssep)
THEN UPDATE SET %s`, quotedCols, ssep)
updateStmts = append(updateStmts, updateStmt)

// generates update statements for the case where updates and deletes happen in the same branch
// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
// and then set soft-delete to true.
if handleSoftDelete {
tmpArray = append(tmpArray[:len(tmpArray)-1],
fmt.Sprintf(`%s=TRUE`, QuoteIdentifier(n.peerdbCols.SoftDeleteColName)))
tmpArray[len(tmpArray)-1] = fmt.Sprintf(`%s=TRUE`, QuoteIdentifier(n.peerdbCols.SoftDeleteColName))
ssep := strings.Join(tmpArray, ", ")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
src._peerdb_record_type=2 AND _peerdb_unchanged_toast_columns=%s
THEN UPDATE SET %s`, QuoteLiteral(cols), ssep)
THEN UPDATE SET %s`, quotedCols, ssep)
updateStmts = append(updateStmts, updateStmt)
}
}
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
}
mergeStatementsBatch := &pgx.Batch{}
totalRowsAffected := 0
errlog := ""
for _, destinationTableName := range destinationTableNames {
normalizeStmtGen := &normalizeStmtGenerator{
rawTableName: rawTableIdentifier,
Expand All @@ -485,6 +486,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
}
normalizeStatements := normalizeStmtGen.generateNormalizeStatements()
for _, normalizeStatement := range normalizeStatements {
errlog += normalizeStatement
mergeStatementsBatch.Queue(normalizeStatement, batchIDs.NormalizeBatchID, batchIDs.SyncBatchID, destinationTableName).Exec(
func(ct pgconn.CommandTag) error {
totalRowsAffected += int(ct.RowsAffected())
Expand All @@ -496,7 +498,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
mergeResults := normalizeRecordsTx.SendBatch(c.ctx, mergeStatementsBatch)
err = mergeResults.Close()
if err != nil {
return nil, fmt.Errorf("error executing merge statements: %w", err)
return nil, fmt.Errorf("error executing merge statements %s: %w", errlog, err)
}
}
c.logger.Info(fmt.Sprintf("normalized %d records", totalRowsAffected))
Expand Down

0 comments on commit 4db4c5d

Please sign in to comment.