From 476549b5b05b24337f3d9ef053fc60707e70f74a Mon Sep 17 00:00:00 2001
From: Amogh Bharadwaj
Date: Tue, 19 Dec 2023 13:40:43 +0530
Subject: [PATCH] PG,BQ,SF CDC: PeerDB Columns (#845)

Irons out the `_PEERDB_IS_DELETED` (soft delete) and `_PEERDB_SYNCED_AT`
columns for CDC in BigQuery, Snowflake, and Postgres: both columns are now
created on normalized tables at setup time, `_PEERDB_SYNCED_AT` is stamped
with CURRENT_TIMESTAMP on every insert and update, and with soft delete
enabled a delete is normalized as an update that sets `_PEERDB_IS_DELETED`
to TRUE instead of removing the row.
---
 flow/connectors/bigquery/bigquery.go          |  23 +-
 .../bigquery/merge_statement_generator.go     |  54 ++-
 .../bigquery/merge_stmt_generator_test.go     |  30 +-
 flow/connectors/postgres/client.go            | 116 +++++-
 flow/connectors/postgres/postgres.go          |   9 +-
 flow/connectors/snowflake/snowflake.go        |  21 +-
 flow/e2e/bigquery/peer_flow_bq_test.go        | 101 +++++
 flow/e2e/congen.go                            |   6 +-
 flow/e2e/postgres/peer_flow_pg_test.go        |  86 ++++
 flow/e2e/snowflake/peer_flow_sf_test.go       |  11 +-
 flow/generated/protos/flow.pb.go              | 370 +++++++++++-------
 nexus/pt/src/peerdb_flow.rs                   |  12 +
 nexus/pt/src/peerdb_flow.serde.rs             | 150 +++++++
 protos/flow.proto                             |   8 +
 ui/grpc_generated/flow.ts                     | 112 ++++++
 15 files changed, 924 insertions(+), 185 deletions(-)

diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 6806445f3d..5f966ecf7f 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -793,6 +793,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
 			SyncBatchID:           syncBatchID,
 			NormalizeBatchID:      normalizeBatchID,
 			UnchangedToastColumns: tableNametoUnchangedToastCols[tableName],
+			peerdbCols: &protos.PeerDBColumns{
+				SoftDeleteColName: req.SoftDeleteColName,
+				SyncedAtColName:   req.SyncedAtColName,
+				SoftDelete:        req.SoftDelete,
+			},
 		}
 		// normalize everything from the last normalized batch id to the last sync batch id
 		mergeStmts := mergeGen.generateMergeStmts()
@@ -961,7 +966,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
 	}
 
 	// convert the column names and types to bigquery types
-	columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns))
+	columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns), len(tableSchema.Columns)+2)
 	idx := 0
 	for colName, genericColType := range tableSchema.Columns {
 		columns[idx] = &bigquery.FieldSchema{
@@ -972,6 +977,22 @@ func (c *BigQueryConnector) SetupNormalizedTables(
 		idx++
 	}
 
+	if req.SoftDeleteColName != "" {
+		columns = append(columns, &bigquery.FieldSchema{
+			Name:     req.SoftDeleteColName,
+			Type:     bigquery.BooleanFieldType,
+			Repeated: false,
+		})
+	}
+
+	if req.SyncedAtColName != "" {
+		columns = append(columns, &bigquery.FieldSchema{
+			Name:     req.SyncedAtColName,
+			Type:     bigquery.TimestampFieldType,
+			Repeated: false,
+		})
+	}
+
 	// create the table using the columns
 	schema := bigquery.Schema(columns)
 	err = table.Create(c.ctx, &bigquery.TableMetadata{Schema: schema})
diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go
index 2a37ef5ecb..149825c2cf 100644
--- a/flow/connectors/bigquery/merge_statement_generator.go
+++ b/flow/connectors/bigquery/merge_statement_generator.go
@@ -26,6 +26,8 @@ type mergeStmtGenerator struct {
 	NormalizedTableSchema *protos.TableSchema
 	// array of toast column combinations that are unchanged
 	UnchangedToastColumns []string
+	// _PEERDB_IS_DELETED and _PEERDB_SYNCED_AT columns
+	peerdbCols *protos.PeerDBColumns
 }
 
 // generateMergeStmts generates the merge statements.
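
[Note: to make the effect of the generator changes in the hunks below concrete, this is
roughly the BigQuery MERGE that generateMergeStmts emits once soft delete is enabled. The
dataset, table, and `id`/`val` columns are illustrative; only the PeerDB column names and
clause shapes come from this patch, and the exact conditions and whitespace of the real
output may differ:

    MERGE dataset.tbl _peerdb_target USING _peerdb_tmp _peerdb_deduped
    ON _peerdb_target.id = _peerdb_deduped.id
    WHEN NOT MATCHED AND (_peerdb_deduped._peerdb_record_type != 2) THEN
      INSERT (`id`, `val`, `_PEERDB_SYNCED_AT`)
      VALUES (`id`, `val`, CURRENT_TIMESTAMP)
    WHEN NOT MATCHED AND (_peerdb_deduped._PEERDB_RECORD_TYPE = 2) THEN
      INSERT (`id`, `val`, `_PEERDB_SYNCED_AT`, `_PEERDB_IS_DELETED`)
      VALUES (`id`, `val`, CURRENT_TIMESTAMP, TRUE)
    WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)
      AND _peerdb_unchanged_toast_columns='' THEN
      UPDATE SET `id` = _peerdb_deduped.id, `val` = _peerdb_deduped.val,
                 `_PEERDB_SYNCED_AT` = CURRENT_TIMESTAMP, `_PEERDB_IS_DELETED` = FALSE
    WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN
      UPDATE SET `_PEERDB_IS_DELETED` = TRUE, `_PEERDB_SYNCED_AT` = CURRENT_TIMESTAMP;

With soft delete disabled, the final clause stays a plain DELETE and `_PEERDB_IS_DELETED`
never appears in the statement.]
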
@@ -39,7 +41,7 @@ func (m *mergeStmtGenerator) generateMergeStmts() []string { "CREATE TEMP TABLE %s AS (%s, %s);", tempTable, flattenedCTE, deDupedCTE) - mergeStmt := m.generateMergeStmt(tempTable) + mergeStmt := m.generateMergeStmt(tempTable, m.peerdbCols) dropTempTableStmt := fmt.Sprintf("DROP TABLE %s;", tempTable) @@ -127,7 +129,7 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string { } // generateMergeStmt generates a merge statement. -func (m *mergeStmtGenerator) generateMergeStmt(tempTable string) string { +func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *protos.PeerDBColumns) string { // comma separated list of column names backtickColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns)) pureColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns)) @@ -136,8 +138,19 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string) string { pureColNames = append(pureColNames, colName) } csep := strings.Join(backtickColNames, ", ") - - updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, m.UnchangedToastColumns) + insertColumnsSQL := csep + fmt.Sprintf(", `%s`", peerdbCols.SyncedAtColName) + insertValuesSQL := csep + ",CURRENT_TIMESTAMP" + + updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, + m.UnchangedToastColumns, peerdbCols) + if m.peerdbCols.SoftDelete { + softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", peerdbCols.SoftDeleteColName) + softDeleteInsertValuesSQL := insertValuesSQL + ", TRUE" + + updateStatementsforToastCols = append(updateStatementsforToastCols, + fmt.Sprintf("WHEN NOT MATCHED AND (_peerdb_deduped._PEERDB_RECORD_TYPE = 2) THEN INSERT (%s) VALUES(%s)", + softDeleteInsertColumnsSQL, softDeleteInsertValuesSQL)) + } updateStringToastCols := strings.Join(updateStatementsforToastCols, " ") pkeySelectSQLArray := make([]string, 0, len(m.NormalizedTableSchema.PrimaryKeyColumns)) @@ -148,6 +161,16 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string) string { // _peerdb_target. = _peerdb_deduped. AND _peerdb_target. = _peerdb_deduped. ... pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ") + deletePart := "DELETE" + if peerdbCols.SoftDelete { + colName := peerdbCols.SoftDeleteColName + deletePart = fmt.Sprintf("UPDATE SET %s = TRUE", colName) + if peerdbCols.SyncedAtColName != "" { + deletePart = fmt.Sprintf("%s, %s = CURRENT_TIMESTAMP", + deletePart, peerdbCols.SyncedAtColName) + } + } + return fmt.Sprintf(` MERGE %s.%s _peerdb_target USING %s _peerdb_deduped ON %s @@ -155,8 +178,9 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string) string { INSERT (%s) VALUES (%s) %s WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN - DELETE; - `, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, csep, csep, updateStringToastCols) + %s; + `, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, insertColumnsSQL, insertValuesSQL, + updateStringToastCols, deletePart) } /* @@ -174,7 +198,11 @@ and updating the other columns (not the unchanged toast columns) 6. Repeat steps 1-5 for each unique unchanged toast column group. 7. Return the list of generated update statements. 
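
As an example (using the illustrative names from merge_stmt_generator_test.go below: columns
col1..col3, peerdb columns synced_at and deleted), the clause generated for the
unchanged-toast group 'col2, col3' is roughly:

	WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)
	AND _peerdb_unchanged_toast_columns='col2, col3'
	THEN UPDATE SET `col1` = _peerdb_deduped.col1,
	`synced_at` = CURRENT_TIMESTAMP, `deleted` = FALSE

col2 and col3 are left untouched, the synced-at column is stamped with the current
timestamp, and the soft-delete flag is reset to FALSE so that an insert arriving after a
soft delete revives the row.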
*/ -func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchangedToastCols []string) []string { +func (m *mergeStmtGenerator) generateUpdateStatements( + allCols []string, + unchangedToastCols []string, + peerdbCols *protos.PeerDBColumns, +) []string { updateStmts := make([]string, 0, len(unchangedToastCols)) for _, cols := range unchangedToastCols { @@ -184,6 +212,18 @@ func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchange for _, colName := range otherCols { tmpArray = append(tmpArray, fmt.Sprintf("`%s` = _peerdb_deduped.%s", colName, colName)) } + + // set the synced at column to the current timestamp + if peerdbCols.SyncedAtColName != "" { + tmpArray = append(tmpArray, fmt.Sprintf("`%s` = CURRENT_TIMESTAMP", + peerdbCols.SyncedAtColName)) + } + // set soft-deleted to false, tackles insert after soft-delete + if peerdbCols.SoftDeleteColName != "" { + tmpArray = append(tmpArray, fmt.Sprintf("`%s` = FALSE", + peerdbCols.SoftDeleteColName)) + } + ssep := strings.Join(tmpArray, ", ") updateStmt := fmt.Sprintf(`WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2) AND _peerdb_unchanged_toast_columns='%s' diff --git a/flow/connectors/bigquery/merge_stmt_generator_test.go b/flow/connectors/bigquery/merge_stmt_generator_test.go index 41e54114e6..47705167d6 100644 --- a/flow/connectors/bigquery/merge_stmt_generator_test.go +++ b/flow/connectors/bigquery/merge_stmt_generator_test.go @@ -4,6 +4,8 @@ import ( "reflect" "strings" "testing" + + "github.com/PeerDB-io/peer-flow/generated/protos" ) func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) { @@ -16,21 +18,28 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) { " AND _peerdb_unchanged_toast_columns='' " + "THEN UPDATE SET `col1` = _peerdb_deduped.col1," + " `col2` = _peerdb_deduped.col2," + - " `col3` = _peerdb_deduped.col3", + " `col3` = _peerdb_deduped.col3," + + "`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE", "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" + " AND _peerdb_unchanged_toast_columns='col2, col3' " + - "THEN UPDATE SET `col1` = _peerdb_deduped.col1", + "THEN UPDATE SET `col1` = _peerdb_deduped.col1," + + "`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE", "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" + " AND _peerdb_unchanged_toast_columns='col2'" + "THEN UPDATE SET `col1` = _peerdb_deduped.col1," + - " `col3` = _peerdb_deduped.col3", + " `col3` = _peerdb_deduped.col3," + + "`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE", "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" + " AND _peerdb_unchanged_toast_columns='col3'" + "THEN UPDATE SET `col1` = _peerdb_deduped.col1," + - " `col2` = _peerdb_deduped.col2", + " `col2` = _peerdb_deduped.col2," + "`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE", } - result := m.generateUpdateStatements(allCols, unchangedToastCols) + result := m.generateUpdateStatements(allCols, unchangedToastCols, &protos.PeerDBColumns{ + SoftDelete: true, + SoftDeleteColName: "deleted", + SyncedAtColName: "synced_at", + }) for i := range expected { expected[i] = removeSpacesTabsNewlines(expected[i]) @@ -53,10 +62,17 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) { "THEN UPDATE SET " + "`col1` = _peerdb_deduped.col1," + " `col2` = _peerdb_deduped.col2," + - " `col3` = _peerdb_deduped.col3", + " `col3` = _peerdb_deduped.col3," + + " `synced_at`=CURRENT_TIMESTAMP," + + "`deleted`=FALSE", } - result := m.generateUpdateStatements(allCols, 
unchangedToastCols) + result := m.generateUpdateStatements(allCols, unchangedToastCols, + &protos.PeerDBColumns{ + SoftDelete: true, + SoftDeleteColName: "deleted", + SyncedAtColName: "synced_at", + }) for i := range expected { expected[i] = removeSpacesTabsNewlines(expected[i]) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 77a5413de7..9aa05131c7 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -58,7 +58,7 @@ const ( INSERT (%s) VALUES (%s) %s WHEN MATCHED AND src._peerdb_record_type=2 THEN - DELETE` + %s` fallbackUpsertStatementSQL = `WITH src_rank AS ( SELECT _peerdb_data,_peerdb_record_type,_peerdb_unchanged_toast_columns, RANK() OVER (PARTITION BY %s ORDER BY _peerdb_timestamp DESC) AS _peerdb_rank @@ -71,7 +71,7 @@ const ( RANK() OVER (PARTITION BY %s ORDER BY _peerdb_timestamp DESC) AS _peerdb_rank FROM %s.%s WHERE _peerdb_batch_id>$1 AND _peerdb_batch_id<=$2 AND _peerdb_destination_table_name=$3 ) - DELETE FROM %s USING src_rank WHERE %s AND src_rank._peerdb_rank=1 AND src_rank._peerdb_record_type=2` + %s src_rank WHERE %s AND src_rank._peerdb_rank=1 AND src_rank._peerdb_record_type=2` dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s" deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=$1" @@ -346,15 +346,28 @@ func getRawTableIdentifier(jobName string) string { return fmt.Sprintf("%s_%s", rawTablePrefix, strings.ToLower(jobName)) } -func generateCreateTableSQLForNormalizedTable(sourceTableIdentifier string, +func generateCreateTableSQLForNormalizedTable( + sourceTableIdentifier string, sourceTableSchema *protos.TableSchema, + softDeleteColName string, + syncedAtColName string, ) string { - createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns)) + createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns)+2) for columnName, genericColumnType := range sourceTableSchema.Columns { createTableSQLArray = append(createTableSQLArray, fmt.Sprintf("\"%s\" %s,", columnName, qValueKindToPostgresType(genericColumnType))) } + if softDeleteColName != "" { + createTableSQLArray = append(createTableSQLArray, + fmt.Sprintf(`"%s" BOOL DEFAULT FALSE,`, softDeleteColName)) + } + + if syncedAtColName != "" { + createTableSQLArray = append(createTableSQLArray, + fmt.Sprintf(`"%s" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,`, syncedAtColName)) + } + // add composite primary key to the table if len(sourceTableSchema.PrimaryKeyColumns) > 0 { primaryKeyColsQuoted := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns)) @@ -523,17 +536,19 @@ func (c *PostgresConnector) getTableNametoUnchangedCols(flowJobName string, sync func (c *PostgresConnector) generateNormalizeStatements(destinationTableIdentifier string, unchangedToastColumns []string, rawTableIdentifier string, supportsMerge bool, + peerdbCols *protos.PeerDBColumns, ) []string { if supportsMerge { - return []string{c.generateMergeStatement(destinationTableIdentifier, unchangedToastColumns, rawTableIdentifier)} + return []string{c.generateMergeStatement(destinationTableIdentifier, unchangedToastColumns, + rawTableIdentifier, peerdbCols)} } c.logger.Warn("Postgres version is not high enough to support MERGE, falling back to UPSERT + DELETE") c.logger.Warn("TOAST columns will not be updated properly, use REPLICA IDENTITY FULL or upgrade Postgres") - return c.generateFallbackStatements(destinationTableIdentifier, rawTableIdentifier) + return c.generateFallbackStatements(destinationTableIdentifier, rawTableIdentifier, 
peerdbCols) } func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifier string, - rawTableIdentifier string, + rawTableIdentifier string, peerdbCols *protos.PeerDBColumns, ) []string { normalizedTableSchema := c.tableSchemaMapping[destinationTableIdentifier] columnNames := make([]string, 0, len(normalizedTableSchema.Columns)) @@ -569,20 +584,35 @@ func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifie parsedDstTable.String(), columnName, columnCast)) } deleteWhereClauseSQL := strings.TrimSuffix(strings.Join(deleteWhereClauseArray, ""), "AND ") - + deletePart := fmt.Sprintf( + "DELETE FROM %s USING", + parsedDstTable.String()) + + if peerdbCols.SoftDelete { + deletePart = fmt.Sprintf(`UPDATE %s SET "%s" = TRUE`, + parsedDstTable.String(), peerdbCols.SoftDeleteColName) + if peerdbCols.SyncedAtColName != "" { + deletePart = fmt.Sprintf(`%s, "%s" = CURRENT_TIMESTAMP`, + deletePart, peerdbCols.SyncedAtColName) + } + deletePart += " FROM" + } fallbackUpsertStatement := fmt.Sprintf(fallbackUpsertStatementSQL, strings.TrimSuffix(strings.Join(maps.Values(primaryKeyColumnCasts), ","), ","), c.metadataSchema, rawTableIdentifier, parsedDstTable.String(), insertColumnsSQL, flattenedCastsSQL, strings.Join(normalizedTableSchema.PrimaryKeyColumns, ","), updateColumnsSQL) fallbackDeleteStatement := fmt.Sprintf(fallbackDeleteStatementSQL, strings.Join(maps.Values(primaryKeyColumnCasts), ","), c.metadataSchema, - rawTableIdentifier, parsedDstTable.String(), deleteWhereClauseSQL) + rawTableIdentifier, deletePart, deleteWhereClauseSQL) return []string{fallbackUpsertStatement, fallbackDeleteStatement} } -func (c *PostgresConnector) generateMergeStatement(destinationTableIdentifier string, unchangedToastColumns []string, +func (c *PostgresConnector) generateMergeStatement( + destinationTableIdentifier string, + unchangedToastColumns []string, rawTableIdentifier string, + peerdbCols *protos.PeerDBColumns, ) string { normalizedTableSchema := c.tableSchemaMapping[destinationTableIdentifier] columnNames := maps.Keys(normalizedTableSchema.Columns) @@ -612,21 +642,60 @@ func (c *PostgresConnector) generateMergeStatement(destinationTableIdentifier st } } flattenedCastsSQL := strings.TrimSuffix(strings.Join(flattenedCastsSQLArray, ","), ",") - - insertColumnsSQL := strings.TrimSuffix(strings.Join(columnNames, ","), ",") insertValuesSQLArray := make([]string, 0, len(columnNames)) for _, columnName := range columnNames { insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("src.%s", columnName)) } + + updateStatementsforToastCols := c.generateUpdateStatement(columnNames, unchangedToastColumns, peerdbCols) + // append synced_at column + columnNames = append(columnNames, fmt.Sprintf(`"%s"`, peerdbCols.SyncedAtColName)) + insertColumnsSQL := strings.Join(columnNames, ",") + // fill in synced_at column + insertValuesSQLArray = append(insertValuesSQLArray, "CURRENT_TIMESTAMP") insertValuesSQL := strings.TrimSuffix(strings.Join(insertValuesSQLArray, ","), ",") - updateStatements := c.generateUpdateStatement(columnNames, unchangedToastColumns) - return fmt.Sprintf(mergeStatementSQL, strings.Join(maps.Values(primaryKeyColumnCasts), ","), - c.metadataSchema, rawTableIdentifier, parsedDstTable.String(), flattenedCastsSQL, - strings.Join(primaryKeySelectSQLArray, " AND "), insertColumnsSQL, insertValuesSQL, updateStatements) + if peerdbCols.SoftDelete { + softDeleteInsertColumnsSQL := strings.TrimSuffix(strings.Join(append(columnNames, + fmt.Sprintf(`"%s"`, 
peerdbCols.SoftDeleteColName)), ","), ",") + softDeleteInsertValuesSQL := strings.Join(append(insertValuesSQLArray, "TRUE"), ",") + + updateStatementsforToastCols = append(updateStatementsforToastCols, + fmt.Sprintf("WHEN NOT MATCHED AND (src._peerdb_record_type = 2) THEN INSERT (%s) VALUES(%s)", + softDeleteInsertColumnsSQL, softDeleteInsertValuesSQL)) + } + updateStringToastCols := strings.Join(updateStatementsforToastCols, "\n") + + deletePart := "DELETE" + if peerdbCols.SoftDelete { + colName := peerdbCols.SoftDeleteColName + deletePart = fmt.Sprintf(`UPDATE SET "%s" = TRUE`, colName) + if peerdbCols.SyncedAtColName != "" { + deletePart = fmt.Sprintf(`%s, "%s" = CURRENT_TIMESTAMP`, + deletePart, peerdbCols.SyncedAtColName) + } + } + + mergeStmt := fmt.Sprintf( + mergeStatementSQL, + strings.Join(maps.Values(primaryKeyColumnCasts), ","), + c.metadataSchema, + rawTableIdentifier, + parsedDstTable.String(), + flattenedCastsSQL, + strings.Join(primaryKeySelectSQLArray, " AND "), + insertColumnsSQL, + insertValuesSQL, + updateStringToastCols, + deletePart, + ) + + return mergeStmt } -func (c *PostgresConnector) generateUpdateStatement(allCols []string, unchangedToastColsLists []string) string { +func (c *PostgresConnector) generateUpdateStatement(allCols []string, + unchangedToastColsLists []string, peerdbCols *protos.PeerDBColumns, +) []string { updateStmts := make([]string, 0, len(unchangedToastColsLists)) for _, cols := range unchangedToastColsLists { @@ -640,13 +709,24 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string, unchangedT for _, colName := range otherCols { tmpArray = append(tmpArray, fmt.Sprintf("%s=src.%s", colName, colName)) } + // set the synced at column to the current timestamp + if peerdbCols.SyncedAtColName != "" { + tmpArray = append(tmpArray, fmt.Sprintf(`"%s" = CURRENT_TIMESTAMP`, + peerdbCols.SyncedAtColName)) + } + // set soft-deleted to false, tackles insert after soft-delete + if peerdbCols.SoftDeleteColName != "" { + tmpArray = append(tmpArray, fmt.Sprintf(`"%s" = FALSE`, + peerdbCols.SoftDeleteColName)) + } + ssep := strings.Join(tmpArray, ",") updateStmt := fmt.Sprintf(`WHEN MATCHED AND src._peerdb_record_type=1 AND _peerdb_unchanged_toast_columns='%s' THEN UPDATE SET %s `, cols, ssep) updateStmts = append(updateStmts, updateStmt) } - return strings.Join(updateStmts, "\n") + return updateStmts } func (c *PostgresConnector) getCurrentLSN() (pglogrepl.LSN, error) { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 20dd2a5a71..82426b3e3f 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -442,8 +442,13 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) mergeStatementsBatch := &pgx.Batch{} totalRowsAffected := 0 for destinationTableName, unchangedToastCols := range unchangedToastColsMap { + peerdbCols := protos.PeerDBColumns{ + SoftDeleteColName: req.SoftDeleteColName, + SyncedAtColName: req.SyncedAtColName, + SoftDelete: req.SoftDelete, + } normalizeStatements := c.generateNormalizeStatements(destinationTableName, unchangedToastCols, - rawTableIdentifier, supportsMerge) + rawTableIdentifier, supportsMerge, &peerdbCols) for _, normalizeStatement := range normalizeStatements { mergeStatementsBatch.Queue(normalizeStatement, normalizeBatchID, syncBatchID, destinationTableName).Exec( func(ct pgconn.CommandTag) error { @@ -634,7 +639,7 @@ func (c *PostgresConnector) SetupNormalizedTables(req *protos.SetupNormalizedTab // 
convert the column names and types to Postgres types normalizedTableCreateSQL := generateCreateTableSQLForNormalizedTable( - parsedNormalizedTable.String(), tableSchema) + parsedNormalizedTable.String(), tableSchema, req.SoftDeleteColName, req.SyncedAtColName) _, err = createNormalizedTablesTx.Exec(c.ctx, normalizedTableCreateSQL) if err != nil { return nil, fmt.Errorf("error while creating normalized table: %w", err) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 90b29e4765..f92ed3e33e 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -751,7 +751,7 @@ func generateCreateTableSQLForNormalizedTable( softDeleteColName string, syncedAtColName string, ) string { - createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns)) + createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns)+2) for columnName, genericColumnType := range sourceTableSchema.Columns { columnNameUpper := strings.ToUpper(columnName) sfColType, err := qValueKindToSnowflakeType(qvalue.QValueKind(genericColumnType)) @@ -847,17 +847,21 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( for _, columnName := range columnNames { quotedUpperColNames = append(quotedUpperColNames, fmt.Sprintf(`"%s"`, strings.ToUpper(columnName))) } + // append synced_at column + quotedUpperColNames = append(quotedUpperColNames, + fmt.Sprintf(`"%s"`, strings.ToUpper(normalizeReq.SyncedAtColName)), + ) insertColumnsSQL := strings.TrimSuffix(strings.Join(quotedUpperColNames, ","), ",") insertValuesSQLArray := make([]string, 0, len(columnNames)) for _, columnName := range columnNames { quotedUpperColumnName := fmt.Sprintf(`"%s"`, strings.ToUpper(columnName)) - insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("SOURCE.%s,", quotedUpperColumnName)) + insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("SOURCE.%s", quotedUpperColumnName)) } - - insertValuesSQL := strings.TrimSuffix(strings.Join(insertValuesSQLArray, ""), ",") - + // fill in synced_at column + insertValuesSQLArray = append(insertValuesSQLArray, "CURRENT_TIMESTAMP") + insertValuesSQL := strings.Join(insertValuesSQLArray, ",") updateStatementsforToastCols := c.generateUpdateStatements(normalizeReq.SyncedAtColName, normalizeReq.SoftDeleteColName, normalizeReq.SoftDelete, columnNames, unchangedToastColumns) @@ -866,10 +870,9 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( // with soft-delete, we want the row to be in the destination with SOFT_DELETE true // the current merge statement doesn't do that, so we add another case to insert the DeleteRecord if normalizeReq.SoftDelete { - softDeleteInsertColumnsSQL := strings.TrimSuffix(strings.Join(append(quotedUpperColNames, - normalizeReq.SoftDeleteColName), ","), ",") - softDeleteInsertValuesSQL := strings.Join(append(insertValuesSQLArray, "TRUE"), "") - + softDeleteInsertColumnsSQL := strings.Join(append(quotedUpperColNames, + normalizeReq.SoftDeleteColName), ",") + softDeleteInsertValuesSQL := insertValuesSQL + ",TRUE" updateStatementsforToastCols = append(updateStatementsforToastCols, fmt.Sprintf("WHEN NOT MATCHED AND (SOURCE._PEERDB_RECORD_TYPE = 2) THEN INSERT (%s) VALUES(%s)", softDeleteInsertColumnsSQL, softDeleteInsertValuesSQL)) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index de3ddae7e5..30e203aeba 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -9,6 +9,7 
@@ import ( "time" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/jackc/pgx/v5/pgxpool" @@ -51,6 +52,43 @@ func (s PeerFlowE2ETestSuiteBQ) attachSuffix(input string) string { return fmt.Sprintf("%s_%s", input, s.bqSuffix) } +func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, rowID int8) error { + qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, dstQualified) + query := fmt.Sprintf("SELECT `_PEERDB_IS_DELETED`,`_PEERDB_SYNCED_AT` FROM %s WHERE id = %d", + qualifiedTableName, rowID) + + recordBatch, err := s.bqHelper.ExecuteAndProcessQuery(query) + if err != nil { + return err + } + + recordCount := 0 + for _, record := range recordBatch.Records { + for _, entry := range record.Entries { + if entry.Kind == qvalue.QValueKindBoolean { + isDeleteVal, ok := entry.Value.(bool) + if !(ok && isDeleteVal) { + return fmt.Errorf("peerdb column failed: _PEERDB_IS_DELETED is not true") + } + recordCount += 1 + } + + if entry.Kind == qvalue.QValueKindTimestamp { + _, ok := entry.Value.(time.Time) + if !ok { + return fmt.Errorf("peerdb column failed: _PEERDB_SYNCED_AT is not valid") + } + recordCount += 1 + } + } + } + if recordCount != 2 { + return fmt.Errorf("peerdb column failed: _PEERDB_IS_DELETED or _PEERDB_SYNCED_AT not present") + } + + return nil +} + // setupBigQuery sets up the bigquery connection. func setupBigQuery(t *testing.T) *BigQueryTestHelper { bqHelper, err := NewBigQueryTestHelper() @@ -1095,3 +1133,66 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { env.AssertExpectations(s.t) } + +func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) + + srcTableName := s.attachSchemaSuffix("test_peerdb_cols") + dstTableName := "test_peerdb_cols_dst" + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + key TEXT NOT NULL, + value TEXT NOT NULL + ); + `, srcTableName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_peerdb_cols_mirror"), + TableNameMapping: map[string]string{srcTableName: dstTableName}, + PostgresPort: e2e.PostgresPort, + Destination: s.bqHelper.Peer, + SoftDelete: true, + } + + flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() + require.NoError(s.t, err) + + limits := peerflow.CDCFlowLimits{ + ExitAfterRecords: 2, + MaxBatchSize: 100, + } + + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + // insert 1 row into the source table + testKey := fmt.Sprintf("test_key_%d", 1) + testValue := fmt.Sprintf("test_value_%d", 1) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(key, value) VALUES ($1, $2) + `, srcTableName), testKey, testValue) + require.NoError(s.t, err) + + // delete that row + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + DELETE FROM %s WHERE id=1 + `, srcTableName)) + require.NoError(s.t, err) + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + + // Verify workflow completes without error + s.True(env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + + // allow only continue as new error + require.Contains(s.t, err.Error(), "continue as new") + + err = s.checkPeerdbColumns(dstTableName, 1) + require.NoError(s.t, err) 
+ + env.AssertExpectations(s.t) +} diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index ac28879f45..e881dd5ead 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -171,6 +171,7 @@ type FlowConnectionGenerationConfig struct { PostgresPort int Destination *protos.Peer CdcStagingPath string + SoftDelete bool } // GenerateSnowflakePeer generates a snowflake peer config for testing. @@ -201,7 +202,10 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*proto ret.Source = GeneratePostgresPeer(c.PostgresPort) ret.Destination = c.Destination ret.CdcStagingPath = c.CdcStagingPath - ret.SoftDeleteColName = "_PEERDB_IS_DELETED" + ret.SoftDelete = c.SoftDelete + if ret.SoftDelete { + ret.SoftDeleteColName = "_PEERDB_IS_DELETED" + } ret.SyncedAtColName = "_PEERDB_SYNCED_AT" return ret, nil } diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 2720891fb6..da050ccf64 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -8,6 +8,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" peerflow "github.com/PeerDB-io/peer-flow/workflows" + "github.com/jackc/pgx/v5/pgtype" ) func (s *PeerFlowE2ETestSuitePG) attachSchemaSuffix(tableName string) string { @@ -18,6 +19,27 @@ func (s *PeerFlowE2ETestSuitePG) attachSuffix(input string) string { return fmt.Sprintf("%s_%s", input, postgresSuffix) } +func (s *PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error { + query := fmt.Sprintf(`SELECT "_PEERDB_IS_DELETED","_PEERDB_SYNCED_AT" FROM %s WHERE id = %d`, + dstSchemaQualified, rowID) + var isDeleted pgtype.Bool + var syncedAt pgtype.Timestamp + err := s.pool.QueryRow(context.Background(), query).Scan(&isDeleted, &syncedAt) + if err != nil { + return fmt.Errorf("failed to query row: %w", err) + } + + if !isDeleted.Bool { + return fmt.Errorf("isDeleted is not true") + } + + if !syncedAt.Valid { + return fmt.Errorf("syncedAt is not valid") + } + + return nil +} + func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env, s.T()) @@ -474,3 +496,67 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { env.AssertExpectations(s.T()) } + +func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.T()) + + srcTableName := s.attachSchemaSuffix("test_peerdb_cols") + dstTableName := s.attachSchemaSuffix("test_peerdb_cols_dst") + + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + key TEXT NOT NULL, + value TEXT NOT NULL + ); + `, srcTableName)) + s.NoError(err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_peerdb_cols_mirror"), + TableNameMapping: map[string]string{srcTableName: dstTableName}, + PostgresPort: e2e.PostgresPort, + Destination: s.peer, + SoftDelete: true, + } + + flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() + s.NoError(err) + + limits := peerflow.CDCFlowLimits{ + ExitAfterRecords: 2, + MaxBatchSize: 100, + } + + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + // insert 1 row into the source table + testKey := fmt.Sprintf("test_key_%d", 1) + testValue := fmt.Sprintf("test_value_%d", 1) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(key, value) 
VALUES ($1, $2) + `, srcTableName), testKey, testValue) + s.NoError(err) + + // delete that row + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + DELETE FROM %s WHERE id=1 + `, srcTableName)) + s.NoError(err) + fmt.Println("Inserted and deleted a row for peerdb column check") + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + + // Verify workflow completes without error + s.True(env.IsWorkflowCompleted()) + + err = env.GetWorkflowError() + // allow only continue as new error + s.Error(err) + s.Contains(err.Error(), "continue as new") + checkErr := s.checkPeerdbColumns(dstTableName, 1) + s.NoError(checkErr) + env.AssertExpectations(s.T()) +} diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 3e6f0c2bc0..d4ff50751f 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -1176,8 +1176,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { Exclude: []string{"c2"}, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), - CdcStagingPath: connectionGen.CdcStagingPath, + Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + CdcStagingPath: connectionGen.CdcStagingPath, + SyncedAtColName: "_PEERDB_SYNCED_AT", } limits := peerflow.CDCFlowLimits{ @@ -1221,7 +1222,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { for _, field := range sfRows.Schema.Fields { require.NotEqual(s.t, field.Name, "c2") } - s.Equal(4, len(sfRows.Schema.Fields)) + s.Equal(5, len(sfRows.Schema.Fields)) s.Equal(10, len(sfRows.Records)) } @@ -1260,6 +1261,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", + SyncedAtColName: "_PEERDB_SYNCED_AT", } limits := peerflow.CDCFlowLimits{ @@ -1346,6 +1348,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", + SyncedAtColName: "_PEERDB_SYNCED_AT", } limits := peerflow.CDCFlowLimits{ @@ -1428,6 +1431,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", + SyncedAtColName: "_PEERDB_SYNCED_AT", } limits := peerflow.CDCFlowLimits{ @@ -1513,6 +1517,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", + SyncedAtColName: "_PEERDB_SYNCED_AT", } limits := peerflow.CDCFlowLimits{ diff --git a/flow/generated/protos/flow.pb.go b/flow/generated/protos/flow.pb.go index a4a8ff581c..4be91a690f 100644 --- a/flow/generated/protos/flow.pb.go +++ b/flow/generated/protos/flow.pb.go @@ -2669,7 +2669,8 @@ type QRepConfig struct { SetupWatermarkTableOnDestination bool `protobuf:"varint,17,opt,name=setup_watermark_table_on_destination,json=setupWatermarkTableOnDestination,proto3" json:"setup_watermark_table_on_destination,omitempty"` // create new tables with "_peerdb_resync" suffix, perform initial load and then swap the new table with the old ones // to be used after the old mirror is dropped - DstTableFullResync bool `protobuf:"varint,18,opt,name=dst_table_full_resync,json=dstTableFullResync,proto3" json:"dst_table_full_resync,omitempty"` + DstTableFullResync bool `protobuf:"varint,18,opt,name=dst_table_full_resync,json=dstTableFullResync,proto3" 
json:"dst_table_full_resync,omitempty"` + SyncedAtColName string `protobuf:"bytes,19,opt,name=synced_at_col_name,json=syncedAtColName,proto3" json:"synced_at_col_name,omitempty"` } func (x *QRepConfig) Reset() { @@ -2830,6 +2831,13 @@ func (x *QRepConfig) GetDstTableFullResync() bool { return false } +func (x *QRepConfig) GetSyncedAtColName() string { + if x != nil { + return x.SyncedAtColName + } + return "" +} + type QRepPartition struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -3286,6 +3294,69 @@ func (x *QRepFlowState) GetDisableWaitForNewRows() bool { return false } +type PeerDBColumns struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + SoftDeleteColName string `protobuf:"bytes,1,opt,name=soft_delete_col_name,json=softDeleteColName,proto3" json:"soft_delete_col_name,omitempty"` + SyncedAtColName string `protobuf:"bytes,2,opt,name=synced_at_col_name,json=syncedAtColName,proto3" json:"synced_at_col_name,omitempty"` + SoftDelete bool `protobuf:"varint,3,opt,name=soft_delete,json=softDelete,proto3" json:"soft_delete,omitempty"` +} + +func (x *PeerDBColumns) Reset() { + *x = PeerDBColumns{} + if protoimpl.UnsafeEnabled { + mi := &file_flow_proto_msgTypes[48] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PeerDBColumns) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PeerDBColumns) ProtoMessage() {} + +func (x *PeerDBColumns) ProtoReflect() protoreflect.Message { + mi := &file_flow_proto_msgTypes[48] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PeerDBColumns.ProtoReflect.Descriptor instead. 
+func (*PeerDBColumns) Descriptor() ([]byte, []int) { + return file_flow_proto_rawDescGZIP(), []int{48} +} + +func (x *PeerDBColumns) GetSoftDeleteColName() string { + if x != nil { + return x.SoftDeleteColName + } + return "" +} + +func (x *PeerDBColumns) GetSyncedAtColName() string { + if x != nil { + return x.SyncedAtColName + } + return "" +} + +func (x *PeerDBColumns) GetSoftDelete() bool { + if x != nil { + return x.SoftDelete + } + return false +} + var File_flow_proto protoreflect.FileDescriptor var file_flow_proto_rawDesc = []byte{ @@ -3837,7 +3908,7 @@ var file_flow_proto_rawDesc = []byte{ 0x69, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x2c, 0x0a, 0x12, 0x75, 0x70, 0x73, 0x65, 0x72, 0x74, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x10, 0x75, 0x70, 0x73, 0x65, 0x72, 0x74, 0x4b, 0x65, 0x79, 0x43, 0x6f, - 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x22, 0x99, 0x07, 0x0a, 0x0a, 0x51, 0x52, 0x65, 0x70, 0x43, 0x6f, + 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x22, 0xc6, 0x07, 0x0a, 0x0a, 0x51, 0x52, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x22, 0x0a, 0x0d, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x66, 0x6c, 0x6f, 0x77, 0x4a, 0x6f, 0x62, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x33, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, @@ -3895,94 +3966,106 @@ var file_flow_proto_rawDesc = []byte{ 0x0a, 0x15, 0x64, 0x73, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x66, 0x75, 0x6c, 0x6c, 0x5f, 0x72, 0x65, 0x73, 0x79, 0x6e, 0x63, 0x18, 0x12, 0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x64, 0x73, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x46, 0x75, 0x6c, 0x6c, 0x52, 0x65, 0x73, 0x79, 0x6e, - 0x63, 0x22, 0x97, 0x01, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x31, 0x0a, 0x05, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, - 0x6c, 0x6f, 0x77, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, - 0x67, 0x65, 0x52, 0x05, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x30, 0x0a, 0x14, 0x66, 0x75, 0x6c, - 0x6c, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, - 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x66, 0x75, 0x6c, 0x6c, 0x54, 0x61, 0x62, - 0x6c, 0x65, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x6b, 0x0a, 0x12, 0x51, - 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x61, 0x74, 0x63, - 0x68, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x05, 0x52, 0x07, 0x62, 0x61, 0x74, 0x63, 0x68, 0x49, 0x64, 0x12, 0x3a, 0x0a, 0x0a, - 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x1a, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x51, - 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x70, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x50, 0x0a, 0x12, 0x51, 0x52, 0x65, 0x70, - 0x50, 0x61, 0x72, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x3a, - 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 
0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, - 0x2e, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, - 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x2c, 0x0a, 0x0d, 0x44, 0x72, - 0x6f, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x66, - 0x6c, 0x6f, 0x77, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, - 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x54, 0x0a, 0x10, 0x44, 0x65, 0x6c, 0x74, - 0x61, 0x41, 0x64, 0x64, 0x65, 0x64, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, - 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0a, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, - 0x0b, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x22, 0xa2, - 0x01, 0x0a, 0x10, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, - 0x6c, 0x74, 0x61, 0x12, 0x24, 0x0a, 0x0e, 0x73, 0x72, 0x63, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, - 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x72, 0x63, - 0x54, 0x61, 0x62, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x64, 0x73, 0x74, - 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0c, 0x64, 0x73, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, - 0x42, 0x0a, 0x0d, 0x61, 0x64, 0x64, 0x65, 0x64, 0x5f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, - 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, - 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x41, 0x64, 0x64, 0x65, 0x64, 0x43, - 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x52, 0x0c, 0x61, 0x64, 0x64, 0x65, 0x64, 0x43, 0x6f, 0x6c, 0x75, - 0x6d, 0x6e, 0x73, 0x22, 0xc8, 0x01, 0x0a, 0x1b, 0x52, 0x65, 0x70, 0x6c, 0x61, 0x79, 0x54, 0x61, - 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x49, 0x6e, - 0x70, 0x75, 0x74, 0x12, 0x5a, 0x0a, 0x17, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, - 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, - 0x6f, 0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x52, 0x15, 0x66, 0x6c, 0x6f, 0x77, 0x43, 0x6f, - 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x12, - 0x4d, 0x0a, 0x13, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, - 0x64, 0x65, 0x6c, 0x74, 0x61, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, - 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, - 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x52, 0x11, 0x74, 0x61, 0x62, - 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x73, 0x22, 0xe9, - 0x01, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, - 0x12, 0x41, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x61, 0x72, 0x74, 
0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, - 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0d, 0x6c, 0x61, 0x73, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x12, 0x38, 0x0a, 0x18, 0x6e, 0x75, 0x6d, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x5f, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x16, 0x6e, 0x75, 0x6d, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x12, 0x21, 0x0a, - 0x0c, 0x6e, 0x65, 0x65, 0x64, 0x73, 0x5f, 0x72, 0x65, 0x73, 0x79, 0x6e, 0x63, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x08, 0x52, 0x0b, 0x6e, 0x65, 0x65, 0x64, 0x73, 0x52, 0x65, 0x73, 0x79, 0x6e, 0x63, - 0x12, 0x38, 0x0a, 0x19, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x77, 0x61, 0x69, 0x74, - 0x5f, 0x66, 0x6f, 0x72, 0x5f, 0x6e, 0x65, 0x77, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x08, 0x52, 0x15, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x57, 0x61, 0x69, 0x74, - 0x46, 0x6f, 0x72, 0x4e, 0x65, 0x77, 0x52, 0x6f, 0x77, 0x73, 0x2a, 0x50, 0x0a, 0x0c, 0x51, 0x52, - 0x65, 0x70, 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, - 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4d, 0x55, 0x4c, - 0x54, 0x49, 0x5f, 0x49, 0x4e, 0x53, 0x45, 0x52, 0x54, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x51, - 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x53, 0x54, - 0x4f, 0x52, 0x41, 0x47, 0x45, 0x5f, 0x41, 0x56, 0x52, 0x4f, 0x10, 0x01, 0x2a, 0x66, 0x0a, 0x0d, - 0x51, 0x52, 0x65, 0x70, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, - 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, - 0x5f, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, - 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x50, 0x53, - 0x45, 0x52, 0x54, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, - 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4f, 0x56, 0x45, 0x52, 0x57, 0x52, 0x49, - 0x54, 0x45, 0x10, 0x02, 0x42, 0x76, 0x0a, 0x0f, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, - 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x42, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x50, 0x72, 0x6f, - 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x50, - 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xca, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, - 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xe2, 0x02, 0x16, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, - 0x6c, 0x6f, 0x77, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, - 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x63, 0x12, 0x2b, 0x0a, 0x12, 0x73, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x5f, 0x63, + 0x6f, 0x6c, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x13, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x73, + 0x79, 0x6e, 0x63, 0x65, 0x64, 0x41, 0x74, 0x43, 0x6f, 0x6c, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x97, + 0x01, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 
0x69, 0x6f, 0x6e, + 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x49, 0x64, 0x12, 0x31, 0x0a, 0x05, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, + 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x52, + 0x05, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x30, 0x0a, 0x14, 0x66, 0x75, 0x6c, 0x6c, 0x5f, 0x74, + 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x66, 0x75, 0x6c, 0x6c, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x50, + 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x6b, 0x0a, 0x12, 0x51, 0x52, 0x65, 0x70, + 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x12, 0x19, + 0x0a, 0x08, 0x62, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, + 0x52, 0x07, 0x62, 0x61, 0x74, 0x63, 0x68, 0x49, 0x64, 0x12, 0x3a, 0x0a, 0x0a, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, + 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x51, 0x52, 0x65, 0x70, + 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x50, 0x0a, 0x12, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x3a, 0x0a, 0x0a, 0x70, + 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x1a, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x51, 0x52, + 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x2c, 0x0a, 0x0d, 0x44, 0x72, 0x6f, 0x70, 0x46, + 0x6c, 0x6f, 0x77, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x66, 0x6c, 0x6f, 0x77, + 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x66, 0x6c, 0x6f, + 0x77, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x54, 0x0a, 0x10, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x41, 0x64, + 0x64, 0x65, 0x64, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, 0x63, 0x6f, 0x6c, + 0x75, 0x6d, 0x6e, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, + 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x63, 0x6f, + 0x6c, 0x75, 0x6d, 0x6e, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0a, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x22, 0xa2, 0x01, 0x0a, 0x10, + 0x54, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, + 0x12, 0x24, 0x0a, 0x0e, 0x73, 0x72, 0x63, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, + 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x72, 0x63, 0x54, 0x61, 0x62, + 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x64, 0x73, 0x74, 0x5f, 0x74, 0x61, + 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, + 0x64, 0x73, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x42, 0x0a, 0x0d, + 0x61, 0x64, 0x64, 0x65, 0x64, 0x5f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x18, 0x03, 0x20, + 0x03, 
0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, + 0x77, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x41, 0x64, 0x64, 0x65, 0x64, 0x43, 0x6f, 0x6c, 0x75, + 0x6d, 0x6e, 0x52, 0x0c, 0x61, 0x64, 0x64, 0x65, 0x64, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, + 0x22, 0xc8, 0x01, 0x0a, 0x1b, 0x52, 0x65, 0x70, 0x6c, 0x61, 0x79, 0x54, 0x61, 0x62, 0x6c, 0x65, + 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x49, 0x6e, 0x70, 0x75, 0x74, + 0x12, 0x5a, 0x0a, 0x17, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, + 0x46, 0x6c, 0x6f, 0x77, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x73, 0x52, 0x15, 0x66, 0x6c, 0x6f, 0x77, 0x43, 0x6f, 0x6e, 0x6e, 0x65, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x12, 0x4d, 0x0a, 0x13, + 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x64, 0x65, 0x6c, + 0x74, 0x61, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, + 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x52, 0x11, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x53, + 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x73, 0x22, 0xe9, 0x01, 0x0a, 0x0d, + 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x41, 0x0a, + 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, + 0x6c, 0x6f, 0x77, 0x2e, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x52, 0x0d, 0x6c, 0x61, 0x73, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x38, 0x0a, 0x18, 0x6e, 0x75, 0x6d, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x5f, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x16, 0x6e, 0x75, 0x6d, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x6e, 0x65, + 0x65, 0x64, 0x73, 0x5f, 0x72, 0x65, 0x73, 0x79, 0x6e, 0x63, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, + 0x52, 0x0b, 0x6e, 0x65, 0x65, 0x64, 0x73, 0x52, 0x65, 0x73, 0x79, 0x6e, 0x63, 0x12, 0x38, 0x0a, + 0x19, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x77, 0x61, 0x69, 0x74, 0x5f, 0x66, 0x6f, + 0x72, 0x5f, 0x6e, 0x65, 0x77, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, + 0x52, 0x15, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x57, 0x61, 0x69, 0x74, 0x46, 0x6f, 0x72, + 0x4e, 0x65, 0x77, 0x52, 0x6f, 0x77, 0x73, 0x22, 0x8e, 0x01, 0x0a, 0x0d, 0x50, 0x65, 0x65, 0x72, + 0x44, 0x42, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x12, 0x2f, 0x0a, 0x14, 0x73, 0x6f, 0x66, + 0x74, 0x5f, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x5f, 0x63, 0x6f, 0x6c, 0x5f, 0x6e, 0x61, 0x6d, + 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x73, 0x6f, 0x66, 0x74, 0x44, 0x65, 0x6c, + 0x65, 0x74, 0x65, 0x43, 0x6f, 0x6c, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x2b, 0x0a, 0x12, 0x73, 0x79, + 0x6e, 0x63, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x5f, 0x63, 0x6f, 0x6c, 0x5f, 0x6e, 0x61, 0x6d, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 
0x09, 0x52, 0x0f, 0x73, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x41, 0x74, + 0x43, 0x6f, 0x6c, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x66, 0x74, 0x5f, + 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x73, 0x6f, + 0x66, 0x74, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x2a, 0x50, 0x0a, 0x0c, 0x51, 0x52, 0x65, 0x70, + 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, 0x50, + 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, + 0x5f, 0x49, 0x4e, 0x53, 0x45, 0x52, 0x54, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, + 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x53, 0x54, 0x4f, 0x52, + 0x41, 0x47, 0x45, 0x5f, 0x41, 0x56, 0x52, 0x4f, 0x10, 0x01, 0x2a, 0x66, 0x0a, 0x0d, 0x51, 0x52, + 0x65, 0x70, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x16, 0x51, + 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x41, + 0x50, 0x50, 0x45, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, + 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x50, 0x53, 0x45, 0x52, + 0x54, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, + 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4f, 0x56, 0x45, 0x52, 0x57, 0x52, 0x49, 0x54, 0x45, + 0x10, 0x02, 0x42, 0x76, 0x0a, 0x0f, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, + 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x42, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x50, 0x72, 0x6f, 0x74, 0x6f, + 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x50, 0x65, 0x65, + 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xca, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, + 0x46, 0x6c, 0x6f, 0x77, 0xe2, 0x02, 0x16, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, + 0x77, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0a, + 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -3998,7 +4081,7 @@ func file_flow_proto_rawDescGZIP() []byte { } var file_flow_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_flow_proto_msgTypes = make([]protoimpl.MessageInfo, 60) +var file_flow_proto_msgTypes = make([]protoimpl.MessageInfo, 61) var file_flow_proto_goTypes = []interface{}{ (QRepSyncMode)(0), // 0: peerdb_flow.QRepSyncMode (QRepWriteType)(0), // 1: peerdb_flow.QRepWriteType @@ -4050,74 +4133,75 @@ var file_flow_proto_goTypes = []interface{}{ (*TableSchemaDelta)(nil), // 47: peerdb_flow.TableSchemaDelta (*ReplayTableSchemaDeltaInput)(nil), // 48: peerdb_flow.ReplayTableSchemaDeltaInput (*QRepFlowState)(nil), // 49: peerdb_flow.QRepFlowState - nil, // 50: peerdb_flow.FlowConnectionConfigs.SrcTableIdNameMappingEntry - nil, // 51: peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry - nil, // 52: peerdb_flow.CreateTablesFromExistingInput.NewToExistingTableMappingEntry - nil, // 53: peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry - nil, // 54: peerdb_flow.StartFlowInput.RelationMessageMappingEntry - nil, // 55: peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry - nil, // 56: peerdb_flow.SetupReplicationInput.TableNameMappingEntry - nil, // 57: peerdb_flow.CreateRawTableInput.TableNameMappingEntry - nil, // 58: 
peerdb_flow.TableSchema.ColumnsEntry - nil, // 59: peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry - nil, // 60: peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry - nil, // 61: peerdb_flow.SetupNormalizedTableBatchOutput.TableExistsMappingEntry - (*Peer)(nil), // 62: peerdb_peers.Peer - (*timestamppb.Timestamp)(nil), // 63: google.protobuf.Timestamp + (*PeerDBColumns)(nil), // 50: peerdb_flow.PeerDBColumns + nil, // 51: peerdb_flow.FlowConnectionConfigs.SrcTableIdNameMappingEntry + nil, // 52: peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry + nil, // 53: peerdb_flow.CreateTablesFromExistingInput.NewToExistingTableMappingEntry + nil, // 54: peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry + nil, // 55: peerdb_flow.StartFlowInput.RelationMessageMappingEntry + nil, // 56: peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry + nil, // 57: peerdb_flow.SetupReplicationInput.TableNameMappingEntry + nil, // 58: peerdb_flow.CreateRawTableInput.TableNameMappingEntry + nil, // 59: peerdb_flow.TableSchema.ColumnsEntry + nil, // 60: peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry + nil, // 61: peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry + nil, // 62: peerdb_flow.SetupNormalizedTableBatchOutput.TableExistsMappingEntry + (*Peer)(nil), // 63: peerdb_peers.Peer + (*timestamppb.Timestamp)(nil), // 64: google.protobuf.Timestamp } var file_flow_proto_depIdxs = []int32{ 3, // 0: peerdb_flow.RelationMessage.columns:type_name -> peerdb_flow.RelationMessageColumn - 62, // 1: peerdb_flow.FlowConnectionConfigs.source:type_name -> peerdb_peers.Peer - 62, // 2: peerdb_flow.FlowConnectionConfigs.destination:type_name -> peerdb_peers.Peer + 63, // 1: peerdb_flow.FlowConnectionConfigs.source:type_name -> peerdb_peers.Peer + 63, // 2: peerdb_flow.FlowConnectionConfigs.destination:type_name -> peerdb_peers.Peer 28, // 3: peerdb_flow.FlowConnectionConfigs.table_schema:type_name -> peerdb_flow.TableSchema 5, // 4: peerdb_flow.FlowConnectionConfigs.table_mappings:type_name -> peerdb_flow.TableMapping - 50, // 5: peerdb_flow.FlowConnectionConfigs.src_table_id_name_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.SrcTableIdNameMappingEntry - 51, // 6: peerdb_flow.FlowConnectionConfigs.table_name_schema_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry - 62, // 7: peerdb_flow.FlowConnectionConfigs.metadata_peer:type_name -> peerdb_peers.Peer + 51, // 5: peerdb_flow.FlowConnectionConfigs.src_table_id_name_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.SrcTableIdNameMappingEntry + 52, // 6: peerdb_flow.FlowConnectionConfigs.table_name_schema_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry + 63, // 7: peerdb_flow.FlowConnectionConfigs.metadata_peer:type_name -> peerdb_peers.Peer 0, // 8: peerdb_flow.FlowConnectionConfigs.snapshot_sync_mode:type_name -> peerdb_flow.QRepSyncMode 0, // 9: peerdb_flow.FlowConnectionConfigs.cdc_sync_mode:type_name -> peerdb_flow.QRepSyncMode 28, // 10: peerdb_flow.RenameTableOption.table_schema:type_name -> peerdb_flow.TableSchema - 62, // 11: peerdb_flow.RenameTablesInput.peer:type_name -> peerdb_peers.Peer + 63, // 11: peerdb_flow.RenameTablesInput.peer:type_name -> peerdb_peers.Peer 7, // 12: peerdb_flow.RenameTablesInput.rename_table_options:type_name -> peerdb_flow.RenameTableOption - 62, // 13: peerdb_flow.CreateTablesFromExistingInput.peer:type_name -> peerdb_peers.Peer - 52, // 14: 
peerdb_flow.CreateTablesFromExistingInput.new_to_existing_table_mapping:type_name -> peerdb_flow.CreateTablesFromExistingInput.NewToExistingTableMappingEntry - 53, // 15: peerdb_flow.SyncFlowOptions.relation_message_mapping:type_name -> peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry - 63, // 16: peerdb_flow.LastSyncState.last_synced_at:type_name -> google.protobuf.Timestamp + 63, // 13: peerdb_flow.CreateTablesFromExistingInput.peer:type_name -> peerdb_peers.Peer + 53, // 14: peerdb_flow.CreateTablesFromExistingInput.new_to_existing_table_mapping:type_name -> peerdb_flow.CreateTablesFromExistingInput.NewToExistingTableMappingEntry + 54, // 15: peerdb_flow.SyncFlowOptions.relation_message_mapping:type_name -> peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry + 64, // 16: peerdb_flow.LastSyncState.last_synced_at:type_name -> google.protobuf.Timestamp 14, // 17: peerdb_flow.StartFlowInput.last_sync_state:type_name -> peerdb_flow.LastSyncState 6, // 18: peerdb_flow.StartFlowInput.flow_connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs 12, // 19: peerdb_flow.StartFlowInput.sync_flow_options:type_name -> peerdb_flow.SyncFlowOptions - 54, // 20: peerdb_flow.StartFlowInput.relation_message_mapping:type_name -> peerdb_flow.StartFlowInput.RelationMessageMappingEntry + 55, // 20: peerdb_flow.StartFlowInput.relation_message_mapping:type_name -> peerdb_flow.StartFlowInput.RelationMessageMappingEntry 6, // 21: peerdb_flow.StartNormalizeInput.flow_connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs - 62, // 22: peerdb_flow.GetLastSyncedIDInput.peer_connection_config:type_name -> peerdb_peers.Peer - 62, // 23: peerdb_flow.EnsurePullabilityInput.peer_connection_config:type_name -> peerdb_peers.Peer - 62, // 24: peerdb_flow.EnsurePullabilityBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer + 63, // 22: peerdb_flow.GetLastSyncedIDInput.peer_connection_config:type_name -> peerdb_peers.Peer + 63, // 23: peerdb_flow.EnsurePullabilityInput.peer_connection_config:type_name -> peerdb_peers.Peer + 63, // 24: peerdb_flow.EnsurePullabilityBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer 20, // 25: peerdb_flow.TableIdentifier.postgres_table_identifier:type_name -> peerdb_flow.PostgresTableIdentifier 21, // 26: peerdb_flow.EnsurePullabilityOutput.table_identifier:type_name -> peerdb_flow.TableIdentifier - 55, // 27: peerdb_flow.EnsurePullabilityBatchOutput.table_identifier_mapping:type_name -> peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry - 62, // 28: peerdb_flow.SetupReplicationInput.peer_connection_config:type_name -> peerdb_peers.Peer - 56, // 29: peerdb_flow.SetupReplicationInput.table_name_mapping:type_name -> peerdb_flow.SetupReplicationInput.TableNameMappingEntry - 62, // 30: peerdb_flow.SetupReplicationInput.destination_peer:type_name -> peerdb_peers.Peer - 62, // 31: peerdb_flow.CreateRawTableInput.peer_connection_config:type_name -> peerdb_peers.Peer - 57, // 32: peerdb_flow.CreateRawTableInput.table_name_mapping:type_name -> peerdb_flow.CreateRawTableInput.TableNameMappingEntry + 56, // 27: peerdb_flow.EnsurePullabilityBatchOutput.table_identifier_mapping:type_name -> peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry + 63, // 28: peerdb_flow.SetupReplicationInput.peer_connection_config:type_name -> peerdb_peers.Peer + 57, // 29: peerdb_flow.SetupReplicationInput.table_name_mapping:type_name -> peerdb_flow.SetupReplicationInput.TableNameMappingEntry + 63, // 30: 
peerdb_flow.SetupReplicationInput.destination_peer:type_name -> peerdb_peers.Peer + 63, // 31: peerdb_flow.CreateRawTableInput.peer_connection_config:type_name -> peerdb_peers.Peer + 58, // 32: peerdb_flow.CreateRawTableInput.table_name_mapping:type_name -> peerdb_flow.CreateRawTableInput.TableNameMappingEntry 0, // 33: peerdb_flow.CreateRawTableInput.cdc_sync_mode:type_name -> peerdb_flow.QRepSyncMode - 58, // 34: peerdb_flow.TableSchema.columns:type_name -> peerdb_flow.TableSchema.ColumnsEntry - 62, // 35: peerdb_flow.GetTableSchemaBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer - 59, // 36: peerdb_flow.GetTableSchemaBatchOutput.table_name_schema_mapping:type_name -> peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry - 62, // 37: peerdb_flow.SetupNormalizedTableInput.peer_connection_config:type_name -> peerdb_peers.Peer + 59, // 34: peerdb_flow.TableSchema.columns:type_name -> peerdb_flow.TableSchema.ColumnsEntry + 63, // 35: peerdb_flow.GetTableSchemaBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer + 60, // 36: peerdb_flow.GetTableSchemaBatchOutput.table_name_schema_mapping:type_name -> peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry + 63, // 37: peerdb_flow.SetupNormalizedTableInput.peer_connection_config:type_name -> peerdb_peers.Peer 28, // 38: peerdb_flow.SetupNormalizedTableInput.source_table_schema:type_name -> peerdb_flow.TableSchema - 62, // 39: peerdb_flow.SetupNormalizedTableBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer - 60, // 40: peerdb_flow.SetupNormalizedTableBatchInput.table_name_schema_mapping:type_name -> peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry - 61, // 41: peerdb_flow.SetupNormalizedTableBatchOutput.table_exists_mapping:type_name -> peerdb_flow.SetupNormalizedTableBatchOutput.TableExistsMappingEntry - 63, // 42: peerdb_flow.TimestampPartitionRange.start:type_name -> google.protobuf.Timestamp - 63, // 43: peerdb_flow.TimestampPartitionRange.end:type_name -> google.protobuf.Timestamp + 63, // 39: peerdb_flow.SetupNormalizedTableBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer + 61, // 40: peerdb_flow.SetupNormalizedTableBatchInput.table_name_schema_mapping:type_name -> peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry + 62, // 41: peerdb_flow.SetupNormalizedTableBatchOutput.table_exists_mapping:type_name -> peerdb_flow.SetupNormalizedTableBatchOutput.TableExistsMappingEntry + 64, // 42: peerdb_flow.TimestampPartitionRange.start:type_name -> google.protobuf.Timestamp + 64, // 43: peerdb_flow.TimestampPartitionRange.end:type_name -> google.protobuf.Timestamp 37, // 44: peerdb_flow.TIDPartitionRange.start:type_name -> peerdb_flow.TID 37, // 45: peerdb_flow.TIDPartitionRange.end:type_name -> peerdb_flow.TID 35, // 46: peerdb_flow.PartitionRange.int_range:type_name -> peerdb_flow.IntPartitionRange 36, // 47: peerdb_flow.PartitionRange.timestamp_range:type_name -> peerdb_flow.TimestampPartitionRange 38, // 48: peerdb_flow.PartitionRange.tid_range:type_name -> peerdb_flow.TIDPartitionRange 1, // 49: peerdb_flow.QRepWriteMode.write_type:type_name -> peerdb_flow.QRepWriteType - 62, // 50: peerdb_flow.QRepConfig.source_peer:type_name -> peerdb_peers.Peer - 62, // 51: peerdb_flow.QRepConfig.destination_peer:type_name -> peerdb_peers.Peer + 63, // 50: peerdb_flow.QRepConfig.source_peer:type_name -> peerdb_peers.Peer + 63, // 51: peerdb_flow.QRepConfig.destination_peer:type_name -> peerdb_peers.Peer 0, // 52: 
peerdb_flow.QRepConfig.sync_mode:type_name -> peerdb_flow.QRepSyncMode
 	40, // 53: peerdb_flow.QRepConfig.write_mode:type_name -> peerdb_flow.QRepWriteMode
 	39, // 54: peerdb_flow.QRepPartition.range:type_name -> peerdb_flow.PartitionRange
@@ -4723,6 +4807,18 @@ func file_flow_proto_init() {
 				return nil
 			}
 		}
+		file_flow_proto_msgTypes[48].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*PeerDBColumns); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
 	}
 	file_flow_proto_msgTypes[6].OneofWrappers = []interface{}{}
 	file_flow_proto_msgTypes[19].OneofWrappers = []interface{}{
@@ -4739,7 +4835,7 @@ func file_flow_proto_init() {
 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
 			RawDescriptor: file_flow_proto_rawDesc,
 			NumEnums:      2,
-			NumMessages:   60,
+			NumMessages:   61,
 			NumExtensions: 0,
 			NumServices:   0,
 		},
diff --git a/nexus/pt/src/peerdb_flow.rs b/nexus/pt/src/peerdb_flow.rs
index 798e09c99b..50b1541e0d 100644
--- a/nexus/pt/src/peerdb_flow.rs
+++ b/nexus/pt/src/peerdb_flow.rs
@@ -472,6 +472,8 @@ pub struct QRepConfig {
     /// to be used after the old mirror is dropped
     #[prost(bool, tag="18")]
     pub dst_table_full_resync: bool,
+    #[prost(string, tag="19")]
+    pub synced_at_col_name: ::prost::alloc::string::String,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
@@ -541,6 +543,16 @@ pub struct QRepFlowState {
     #[prost(bool, tag="4")]
     pub disable_wait_for_new_rows: bool,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PeerDbColumns {
+    #[prost(string, tag="1")]
+    pub soft_delete_col_name: ::prost::alloc::string::String,
+    #[prost(string, tag="2")]
+    pub synced_at_col_name: ::prost::alloc::string::String,
+    #[prost(bool, tag="3")]
+    pub soft_delete: bool,
+}
 /// protos for qrep
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
 #[repr(i32)]
diff --git a/nexus/pt/src/peerdb_flow.serde.rs b/nexus/pt/src/peerdb_flow.serde.rs
index 0436bf3345..1ebf981cd4 100644
--- a/nexus/pt/src/peerdb_flow.serde.rs
+++ b/nexus/pt/src/peerdb_flow.serde.rs
@@ -2476,6 +2476,138 @@ impl<'de> serde::Deserialize<'de> for PartitionRange {
         deserializer.deserialize_struct("peerdb_flow.PartitionRange", FIELDS, GeneratedVisitor)
     }
 }
+impl serde::Serialize for PeerDbColumns {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if !self.soft_delete_col_name.is_empty() {
+            len += 1;
+        }
+        if !self.synced_at_col_name.is_empty() {
+            len += 1;
+        }
+        if self.soft_delete {
+            len += 1;
+        }
+        let mut struct_ser = serializer.serialize_struct("peerdb_flow.PeerDBColumns", len)?;
+        if !self.soft_delete_col_name.is_empty() {
+            struct_ser.serialize_field("softDeleteColName", &self.soft_delete_col_name)?;
+        }
+        if !self.synced_at_col_name.is_empty() {
+            struct_ser.serialize_field("syncedAtColName", &self.synced_at_col_name)?;
+        }
+        if self.soft_delete {
+            struct_ser.serialize_field("softDelete", &self.soft_delete)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for PeerDbColumns {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "soft_delete_col_name",
+            "softDeleteColName",
+            "synced_at_col_name",
+            "syncedAtColName",
+            "soft_delete",
+            "softDelete",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            SoftDeleteColName,
+            SyncedAtColName,
+            SoftDelete,
+            __SkipField__,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "softDeleteColName" | "soft_delete_col_name" => Ok(GeneratedField::SoftDeleteColName),
+                            "syncedAtColName" | "synced_at_col_name" => Ok(GeneratedField::SyncedAtColName),
+                            "softDelete" | "soft_delete" => Ok(GeneratedField::SoftDelete),
+                            _ => Ok(GeneratedField::__SkipField__),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = PeerDbColumns;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                formatter.write_str("struct peerdb_flow.PeerDBColumns")
+            }
+
+            fn visit_map<V>(self, mut map: V) -> std::result::Result<PeerDbColumns, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut soft_delete_col_name__ = None;
+                let mut synced_at_col_name__ = None;
+                let mut soft_delete__ = None;
+                while let Some(k) = map.next_key()? {
+                    match k {
+                        GeneratedField::SoftDeleteColName => {
+                            if soft_delete_col_name__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("softDeleteColName"));
+                            }
+                            soft_delete_col_name__ = Some(map.next_value()?);
+                        }
+                        GeneratedField::SyncedAtColName => {
+                            if synced_at_col_name__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("syncedAtColName"));
+                            }
+                            synced_at_col_name__ = Some(map.next_value()?);
+                        }
+                        GeneratedField::SoftDelete => {
+                            if soft_delete__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("softDelete"));
+                            }
+                            soft_delete__ = Some(map.next_value()?);
+                        }
+                        GeneratedField::__SkipField__ => {
+                            let _ = map.next_value::<serde::de::IgnoredAny>()?;
+                        }
+                    }
+                }
+                Ok(PeerDbColumns {
+                    soft_delete_col_name: soft_delete_col_name__.unwrap_or_default(),
+                    synced_at_col_name: synced_at_col_name__.unwrap_or_default(),
+                    soft_delete: soft_delete__.unwrap_or_default(),
+                })
+            }
+        }
+        deserializer.deserialize_struct("peerdb_flow.PeerDBColumns", FIELDS, GeneratedVisitor)
+    }
+}
 impl serde::Serialize for PostgresTableIdentifier {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
@@ -2636,6 +2768,9 @@ impl serde::Serialize for QRepConfig {
         if self.dst_table_full_resync {
             len += 1;
         }
+        if !self.synced_at_col_name.is_empty() {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("peerdb_flow.QRepConfig", len)?;
         if !self.flow_job_name.is_empty() {
             struct_ser.serialize_field("flowJobName", &self.flow_job_name)?;
@@ -2693,6 +2828,9 @@ impl serde::Serialize for QRepConfig {
         if self.dst_table_full_resync {
             struct_ser.serialize_field("dstTableFullResync", &self.dst_table_full_resync)?;
         }
+        if !self.synced_at_col_name.is_empty() {
+            struct_ser.serialize_field("syncedAtColName", &self.synced_at_col_name)?;
+        }
         struct_ser.end()
     }
 }
@@ -2738,6 +2876,8 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
             "setupWatermarkTableOnDestination",
             "dst_table_full_resync",
             "dstTableFullResync",
+            "synced_at_col_name",
"syncedAtColName", ]; #[allow(clippy::enum_variant_names)] @@ -2760,6 +2900,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig { NumRowsPerPartition, SetupWatermarkTableOnDestination, DstTableFullResync, + SyncedAtColName, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -2800,6 +2941,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig { "numRowsPerPartition" | "num_rows_per_partition" => Ok(GeneratedField::NumRowsPerPartition), "setupWatermarkTableOnDestination" | "setup_watermark_table_on_destination" => Ok(GeneratedField::SetupWatermarkTableOnDestination), "dstTableFullResync" | "dst_table_full_resync" => Ok(GeneratedField::DstTableFullResync), + "syncedAtColName" | "synced_at_col_name" => Ok(GeneratedField::SyncedAtColName), _ => Ok(GeneratedField::__SkipField__), } } @@ -2837,6 +2979,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig { let mut num_rows_per_partition__ = None; let mut setup_watermark_table_on_destination__ = None; let mut dst_table_full_resync__ = None; + let mut synced_at_col_name__ = None; while let Some(k) = map.next_key()? { match k { GeneratedField::FlowJobName => { @@ -2957,6 +3100,12 @@ impl<'de> serde::Deserialize<'de> for QRepConfig { } dst_table_full_resync__ = Some(map.next_value()?); } + GeneratedField::SyncedAtColName => { + if synced_at_col_name__.is_some() { + return Err(serde::de::Error::duplicate_field("syncedAtColName")); + } + synced_at_col_name__ = Some(map.next_value()?); + } GeneratedField::__SkipField__ => { let _ = map.next_value::()?; } @@ -2981,6 +3130,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig { num_rows_per_partition: num_rows_per_partition__.unwrap_or_default(), setup_watermark_table_on_destination: setup_watermark_table_on_destination__.unwrap_or_default(), dst_table_full_resync: dst_table_full_resync__.unwrap_or_default(), + synced_at_col_name: synced_at_col_name__.unwrap_or_default(), }) } } diff --git a/protos/flow.proto b/protos/flow.proto index 88d87b9835..57ceef506f 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -321,6 +321,8 @@ message QRepConfig { // create new tables with "_peerdb_resync" suffix, perform initial load and then swap the new table with the old ones // to be used after the old mirror is dropped bool dst_table_full_resync = 18; + + string synced_at_col_name = 19; } message QRepPartition { @@ -364,3 +366,9 @@ message QRepFlowState { bool needs_resync = 3; bool disable_wait_for_new_rows = 4; } + +message PeerDBColumns { + string soft_delete_col_name = 1; + string synced_at_col_name = 2; + bool soft_delete = 3; +} \ No newline at end of file diff --git a/ui/grpc_generated/flow.ts b/ui/grpc_generated/flow.ts index 845b4d627f..094d97765a 100644 --- a/ui/grpc_generated/flow.ts +++ b/ui/grpc_generated/flow.ts @@ -430,6 +430,7 @@ export interface QRepConfig { * to be used after the old mirror is dropped */ dstTableFullResync: boolean; + syncedAtColName: string; } export interface QRepPartition { @@ -474,6 +475,12 @@ export interface QRepFlowState { disableWaitForNewRows: boolean; } +export interface PeerDBColumns { + softDeleteColName: string; + syncedAtColName: string; + softDelete: boolean; +} + function createBaseTableNameMapping(): TableNameMapping { return { sourceTableName: "", destinationTableName: "" }; } @@ -5301,6 +5308,7 @@ function createBaseQRepConfig(): QRepConfig { numRowsPerPartition: 0, setupWatermarkTableOnDestination: false, dstTableFullResync: false, + syncedAtColName: "", }; } @@ -5360,6 +5368,9 @@ export const QRepConfig = { if 
     if (message.dstTableFullResync === true) {
       writer.uint32(144).bool(message.dstTableFullResync);
     }
+    if (message.syncedAtColName !== "") {
+      writer.uint32(154).string(message.syncedAtColName);
+    }
     return writer;
   },
 
@@ -5496,6 +5507,13 @@ export const QRepConfig = {
           message.dstTableFullResync = reader.bool();
           continue;
+        case 19:
+          if (tag !== 154) {
+            break;
+          }
+
+          message.syncedAtColName = reader.string();
+          continue;
       }
       if ((tag & 7) === 4 || tag === 0) {
         break;
@@ -5529,6 +5547,7 @@ export const QRepConfig = {
         ? Boolean(object.setupWatermarkTableOnDestination)
         : false,
       dstTableFullResync: isSet(object.dstTableFullResync) ? Boolean(object.dstTableFullResync) : false,
+      syncedAtColName: isSet(object.syncedAtColName) ? String(object.syncedAtColName) : "",
     };
   },
 
@@ -5588,6 +5607,9 @@ export const QRepConfig = {
     if (message.dstTableFullResync === true) {
       obj.dstTableFullResync = message.dstTableFullResync;
     }
+    if (message.syncedAtColName !== "") {
+      obj.syncedAtColName = message.syncedAtColName;
+    }
     return obj;
   },
 
@@ -5620,6 +5642,7 @@ export const QRepConfig = {
     message.numRowsPerPartition = object.numRowsPerPartition ?? 0;
     message.setupWatermarkTableOnDestination = object.setupWatermarkTableOnDestination ?? false;
     message.dstTableFullResync = object.dstTableFullResync ?? false;
+    message.syncedAtColName = object.syncedAtColName ?? "";
     return message;
   },
 };
@@ -6257,6 +6280,95 @@ export const QRepFlowState = {
   },
 };
 
+function createBasePeerDBColumns(): PeerDBColumns {
+  return { softDeleteColName: "", syncedAtColName: "", softDelete: false };
+}
+
+export const PeerDBColumns = {
+  encode(message: PeerDBColumns, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
+    if (message.softDeleteColName !== "") {
+      writer.uint32(10).string(message.softDeleteColName);
+    }
+    if (message.syncedAtColName !== "") {
+      writer.uint32(18).string(message.syncedAtColName);
+    }
+    if (message.softDelete === true) {
+      writer.uint32(24).bool(message.softDelete);
+    }
+    return writer;
+  },
+
+  decode(input: _m0.Reader | Uint8Array, length?: number): PeerDBColumns {
+    const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
+    let end = length === undefined ? reader.len : reader.pos + length;
+    const message = createBasePeerDBColumns();
+    while (reader.pos < end) {
+      const tag = reader.uint32();
+      switch (tag >>> 3) {
+        case 1:
+          if (tag !== 10) {
+            break;
+          }
+
+          message.softDeleteColName = reader.string();
+          continue;
+        case 2:
+          if (tag !== 18) {
+            break;
+          }
+
+          message.syncedAtColName = reader.string();
+          continue;
+        case 3:
+          if (tag !== 24) {
+            break;
+          }
+
+          message.softDelete = reader.bool();
+          continue;
+      }
+      if ((tag & 7) === 4 || tag === 0) {
+        break;
+      }
+      reader.skipType(tag & 7);
+    }
+    return message;
+  },
+
+  fromJSON(object: any): PeerDBColumns {
+    return {
+      softDeleteColName: isSet(object.softDeleteColName) ? String(object.softDeleteColName) : "",
+      syncedAtColName: isSet(object.syncedAtColName) ? String(object.syncedAtColName) : "",
+      softDelete: isSet(object.softDelete) ? Boolean(object.softDelete) : false,
+    };
+  },
+
+  toJSON(message: PeerDBColumns): unknown {
+    const obj: any = {};
+    if (message.softDeleteColName !== "") {
+      obj.softDeleteColName = message.softDeleteColName;
+    }
+    if (message.syncedAtColName !== "") {
+      obj.syncedAtColName = message.syncedAtColName;
+    }
+    if (message.softDelete === true) {
+      obj.softDelete = message.softDelete;
+    }
+    return obj;
+  },
+
+  create<I extends Exact<DeepPartial<PeerDBColumns>, I>>(base?: I): PeerDBColumns {
+    return PeerDBColumns.fromPartial(base ?? ({} as any));
+  },
+  fromPartial<I extends Exact<DeepPartial<PeerDBColumns>, I>>(object: I): PeerDBColumns {
+    const message = createBasePeerDBColumns();
+    message.softDeleteColName = object.softDeleteColName ?? "";
+    message.syncedAtColName = object.syncedAtColName ?? "";
+    message.softDelete = object.softDelete ?? false;
+    return message;
+  },
+};
+
 declare const self: any | undefined;
 declare const window: any | undefined;
 declare const global: any | undefined;
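
Reviewer note: as a quick sanity check of the generated TypeScript above, here is a minimal sketch that round-trips the new PeerDBColumns message through the ts-proto helpers added in this patch. The import path and the concrete column-name values are illustrative assumptions, not part of the diff:

// Hypothetical usage of the generated PeerDBColumns helpers; the import path
// is an assumption and depends on where the generated module is consumed from.
import { PeerDBColumns } from "./ui/grpc_generated/flow";

// The column names below follow the soft-delete / synced-at convention this PR
// wires up, but any names supplied by the flow configuration work the same way.
const cols: PeerDBColumns = PeerDBColumns.fromPartial({
  softDeleteColName: "_PEERDB_IS_DELETED",
  syncedAtColName: "_PEERDB_SYNCED_AT",
  softDelete: true,
});

// Encode to the protobuf wire format and decode back; the round trip is lossless.
const bytes = PeerDBColumns.encode(cols).finish();
const decoded = PeerDBColumns.decode(bytes);
console.log(PeerDBColumns.toJSON(decoded)); // softDeleteColName, syncedAtColName, softDelete: true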