diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 69628c28d5..11a6d37895 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -264,6 +264,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName, RelationMessageMapping: input.RelationMessageMapping, RecordStream: recordBatch, + SetLastOffset: func(lastOffset int64) error { + return dstConn.SetLastOffset(input.FlowConnectionConfigs.FlowJobName, lastOffset) + }, }) }) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 5f966ecf7f..2749566ced 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -340,6 +340,23 @@ func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) { } } +func (c *BigQueryConnector) SetLastOffset(jobName string, lastOffset int64) error { + query := fmt.Sprintf( + "UPDATE %s.%s SET offset = GREATEST(offset, %d) WHERE mirror_job_name = '%s'", + c.datasetID, + MirrorJobsTable, + lastOffset, + jobName, + ) + q := c.client.Query(query) + _, err := q.Read(c.ctx) + if err != nil { + return fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err) + } + + return nil +} + func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) { query := fmt.Sprintf("SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName) diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index 149825c2cf..22161c434b 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -229,6 +229,19 @@ func (m *mergeStmtGenerator) generateUpdateStatements( (_peerdb_deduped._peerdb_record_type != 2) AND _peerdb_unchanged_toast_columns='%s' THEN UPDATE SET %s `, cols, ssep) updateStmts = append(updateStmts, updateStmt) + + // generates update statements for the case where updates and deletes happen in the same branch + // the backfill has happened from the pull side already, so treat the DeleteRecord as an update + // and then set soft-delete to true. 
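+	// e.g., assuming a soft-delete column named `deleted` (as in the tests below), this emits a clause like:
+	//   WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) AND _peerdb_unchanged_toast_columns=''
+	//   THEN UPDATE SET `col1`=_peerdb_deduped.col1,...,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE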
+ if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") { + tmpArray = append(tmpArray[:len(tmpArray)-1], + fmt.Sprintf("`%s` = TRUE", peerdbCols.SoftDeleteColName)) + ssep := strings.Join(tmpArray, ", ") + updateStmt := fmt.Sprintf(`WHEN MATCHED AND + (_peerdb_deduped._peerdb_record_type = 2) AND _peerdb_unchanged_toast_columns='%s' + THEN UPDATE SET %s `, cols, ssep) + updateStmts = append(updateStmts, updateStmt) + } } return updateStmts } diff --git a/flow/connectors/bigquery/merge_stmt_generator_test.go b/flow/connectors/bigquery/merge_stmt_generator_test.go index 47705167d6..37dd3e07ed 100644 --- a/flow/connectors/bigquery/merge_stmt_generator_test.go +++ b/flow/connectors/bigquery/merge_stmt_generator_test.go @@ -14,25 +14,31 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) { unchangedToastCols := []string{"", "col2, col3", "col2", "col3"} expected := []string{ - "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" + - " AND _peerdb_unchanged_toast_columns='' " + - "THEN UPDATE SET `col1` = _peerdb_deduped.col1," + - " `col2` = _peerdb_deduped.col2," + - " `col3` = _peerdb_deduped.col3," + - "`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE", - "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" + - " AND _peerdb_unchanged_toast_columns='col2, col3' " + - "THEN UPDATE SET `col1` = _peerdb_deduped.col1," + - "`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE", - "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" + - " AND _peerdb_unchanged_toast_columns='col2'" + - "THEN UPDATE SET `col1` = _peerdb_deduped.col1," + - " `col3` = _peerdb_deduped.col3," + - "`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE", - "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" + - " AND _peerdb_unchanged_toast_columns='col3'" + - "THEN UPDATE SET `col1` = _peerdb_deduped.col1," + - " `col2` = _peerdb_deduped.col2," + "`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE", + "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) AND _peerdb_unchanged_toast_columns=''" + + " THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col2`=_peerdb_deduped.col2,`col3`=_peerdb_deduped.col3," + + "`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE", + "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type=2) " + + "AND _peerdb_unchanged_toast_columns='' " + + "THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col2`=_peerdb_deduped.col2," + + "`col3`=_peerdb_deduped.col3,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", + "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) AND _peerdb_unchanged_toast_columns='col2,col3' " + + "THEN UPDATE SET `col1`=_peerdb_deduped.col1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ", + "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type=2) AND _peerdb_unchanged_toast_columns='col2,col3' " + + "THEN UPDATE SET `col1`=_peerdb_deduped.col1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", + "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) " + + "AND _peerdb_unchanged_toast_columns='col2' " + + "THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col3`=_peerdb_deduped.col3," + + "`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE", + "WHEN MATCHED AND(_peerdb_deduped._peerdb_record_type=2) " + + "AND _peerdb_unchanged_toast_columns='col2' " + + "THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col3`=_peerdb_deduped.col3," + + "`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE ", + "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) AND _peerdb_unchanged_toast_columns='col3' " + + "THEN UPDATE SET 
`col1`=_peerdb_deduped.col1," + + "`col2`=_peerdb_deduped.col2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ", + "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type=2) AND _peerdb_unchanged_toast_columns='col3' " + + "THEN UPDATE SET `col1`=_peerdb_deduped.col1," + + "`col2`=_peerdb_deduped.col2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", } result := m.generateUpdateStatements(allCols, unchangedToastCols, &protos.PeerDBColumns{ @@ -47,7 +53,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) { } if !reflect.DeepEqual(result, expected) { - t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result) + t.Errorf("Unexpected result. Expected: %v,\nbut got: %v", expected, result) } } @@ -65,6 +71,10 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) { " `col3` = _peerdb_deduped.col3," + " `synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE", + "WHEN MATCHED AND" + + "(_peerdb_deduped._peerdb_record_type = 2) AND _peerdb_unchanged_toast_columns=''" + + "THEN UPDATE SET `col1` = _peerdb_deduped.col1, `col2` = _peerdb_deduped.col2, " + + "`col3` = _peerdb_deduped.col3, `synced_at` = CURRENT_TIMESTAMP, `deleted` = TRUE", } result := m.generateUpdateStatements(allCols, unchangedToastCols, diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index bf1c603d43..df771e50a2 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -47,7 +47,7 @@ func (c *BigQueryConnector) SyncQRepRecords( avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath} return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition, - tblMetadata, stream, config.SyncedAtColName) + tblMetadata, stream, config.SyncedAtColName, config.SoftDeleteColName) } func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition, diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 882f11a8c2..6a83d23ae8 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -48,7 +48,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( flowJobName, dstTableName, syncBatchID), ) // You will need to define your Avro schema as a string - avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, "") + avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, "", "") if err != nil { return 0, fmt.Errorf("failed to define Avro schema: %w", err) } @@ -108,6 +108,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( dstTableMetadata *bigquery.TableMetadata, stream *model.QRecordStream, syncedAtCol string, + softDeleteCol string, ) (int, error) { startTime := time.Now() flowLog := slog.Group("sync_metadata", @@ -116,7 +117,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( slog.String("destinationTable", dstTableName), ) // You will need to define your Avro schema as a string - avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, syncedAtCol) + avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, syncedAtCol, softDeleteCol) if err != nil { return 0, fmt.Errorf("failed to define Avro schema: %w", err) } @@ -139,8 +140,11 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( stmts := []string{"BEGIN TRANSACTION;"} selector := "*" + if softDeleteCol != "" { // PeerDB column + selector += ", FALSE" + } if syncedAtCol != "" { // PeerDB column - selector = "*, CURRENT_TIMESTAMP" + selector += ", CURRENT_TIMESTAMP" } // Insert the 
records from the staging table into the destination table insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;", @@ -187,12 +191,13 @@ type AvroSchema struct { func DefineAvroSchema(dstTableName string, dstTableMetadata *bigquery.TableMetadata, syncedAtCol string, + softDeleteCol string, ) (*model.QRecordAvroSchemaDefinition, error) { avroFields := []AvroField{} nullableFields := make(map[string]struct{}) for _, bqField := range dstTableMetadata.Schema { - if bqField.Name == syncedAtCol { + if bqField.Name == syncedAtCol || bqField.Name == softDeleteCol { continue } avroType, err := GetAvroType(bqField) diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 707a7f0b11..c3fb138398 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -62,6 +62,9 @@ type CDCSyncConnector interface { // GetLastOffset gets the last offset from the metadata table on the destination GetLastOffset(jobName string) (int64, error) + // SetLastOffset updates the last offset on the metadata table on the destination + SetLastOffset(jobName string, lastOffset int64) error + // GetLastSyncBatchID gets the last batch synced to the destination from the metadata table GetLastSyncBatchID(jobName string) (int64, error) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 4be57309f2..027d3027fa 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -109,7 +109,7 @@ func (c *EventHubConnector) GetLastOffset(jobName string) (int64, error) { return c.pgMetadata.FetchLastOffset(jobName) } -func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error { +func (c *EventHubConnector) SetLastOffset(jobName string, offset int64) error { err := c.pgMetadata.UpdateLastOffset(jobName, offset) if err != nil { c.logger.Error(fmt.Sprintf("failed to update last offset: %v", err)) @@ -187,7 +187,7 @@ func (c *EventHubConnector) processBatch( } if lastSeenLSN > lastUpdatedOffset { - err = c.updateLastOffset(flowJobName, lastSeenLSN) + err = c.SetLastOffset(flowJobName, lastSeenLSN) lastUpdatedOffset = lastSeenLSN c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeenLSN)) if err != nil { @@ -233,7 +233,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S return nil, err } - err = c.updateLastOffset(req.FlowJobName, lastCheckpoint) + err = c.SetLastOffset(req.FlowJobName, lastCheckpoint) if err != nil { c.logger.Error("failed to update last offset", slog.Any("error", err)) return nil, err diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index ef2cf5e45b..eee1d4ef66 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -146,7 +146,6 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) { var offset pgtype.Int8 err := rows.Scan(&offset) if err != nil { - // if the job doesn't exist, return 0 if err.Error() == "no rows in result set" { return 0, nil } @@ -198,7 +197,8 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e INSERT INTO `+p.schemaName+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id) VALUES ($1, $2, $3) ON CONFLICT (job_name) - DO UPDATE SET last_offset = $2, updated_at = NOW() + DO UPDATE SET last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset), + updated_at = NOW() `, jobName, offset, 0) if err != nil { diff --git 
a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 979723f930..f2eda2e5f4 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -30,10 +30,10 @@ type PostgresCDCSource struct { SrcTableIDNameMapping map[uint32]string TableNameMapping map[string]model.NameAndExclude slot string + SetLastOffset func(int64) error publication string relationMessageMapping model.RelationMessageMapping typeMap *pgtype.Map - startLSN pglogrepl.LSN commitLock bool customTypeMapping map[uint32]string @@ -56,6 +56,7 @@ type PostgresCDCConfig struct { RelationMessageMapping model.RelationMessageMapping CatalogPool *pgxpool.Pool FlowJobName string + SetLastOffset func(int64) error } // Create a new PostgresCDCSource @@ -72,6 +73,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32 SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping, TableNameMapping: cdcConfig.TableNameMapping, slot: cdcConfig.Slot, + SetLastOffset: cdcConfig.SetLastOffset, publication: cdcConfig.Publication, relationMessageMapping: cdcConfig.RelationMessageMapping, typeMap: pgtype.NewMap(), @@ -152,19 +154,20 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error { sysident.SystemID, sysident.Timeline, sysident.XLogPos, sysident.DBName)) // start replication - p.startLSN = 0 + var clientXLogPos, startLSN pglogrepl.LSN if req.LastOffset > 0 { p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastOffset)) - p.startLSN = pglogrepl.LSN(req.LastOffset + 1) + clientXLogPos = pglogrepl.LSN(req.LastOffset) + startLSN = clientXLogPos + 1 } - err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, p.startLSN, replicationOpts) + err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, startLSN, replicationOpts) if err != nil { - return fmt.Errorf("error starting replication at startLsn - %d: %w", p.startLSN, err) + return fmt.Errorf("error starting replication at startLsn - %d: %w", startLSN, err) } - p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, p.startLSN)) + p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, startLSN)) - return p.consumeStream(pgConn, req, p.startLSN, req.RecordStream) + return p.consumeStream(pgConn, req, clientXLogPos, req.RecordStream) } // start consuming the cdc stream @@ -181,12 +184,12 @@ func (p *PostgresCDCSource) consumeStream( } }() - // clientXLogPos is the last checkpoint id + 1, we need to ack that we have processed - // until clientXLogPos - 1 each time we send a standby status update. + // clientXLogPos is the last checkpoint id, we need to ack that we have processed + // until clientXLogPos each time we send a standby status update. // consumedXLogPos is the lsn that has been committed on the destination. 
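+	// proposedConsumedXLogPos stages the next safe flush position: it is promoted to
+	// consumedXLogPos (and persisted on the destination via SetLastOffset) only when a
+	// primary keepalive message requests a reply.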
consumedXLogPos := pglogrepl.LSN(0) if clientXLogPos > 0 { - consumedXLogPos = clientXLogPos - 1 + consumedXLogPos = clientXLogPos err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos}) @@ -194,6 +197,7 @@ func (p *PostgresCDCSource) consumeStream( return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err) } } + proposedConsumedXLogPos := consumedXLogPos var standByLastLogged time.Time cdcRecordsStorage := cdc_records.NewCDCRecordsStore(p.flowJobName) @@ -252,19 +256,27 @@ func (p *PostgresCDCSource) consumeStream( if pkmRequiresResponse { // Update XLogPos to the last processed position, we can only confirm // that this is the last row committed on the destination. + if proposedConsumedXLogPos > consumedXLogPos { + p.logger.Info(fmt.Sprintf("Heartbeat adjusting lsn from %d to %d", consumedXLogPos, proposedConsumedXLogPos)) + consumedXLogPos = proposedConsumedXLogPos + err := p.SetLastOffset(int64(consumedXLogPos)) + if err != nil { + return fmt.Errorf("storing updated LSN failed: %w", err) + } + } + err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos}) if err != nil { return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err) } + pkmRequiresResponse = false if time.Since(standByLastLogged) > 10*time.Second { numRowsProcessedMessage := fmt.Sprintf("processed %d rows", cdcRecordsStorage.Len()) p.logger.Info(fmt.Sprintf("Sent Standby status message. %s", numRowsProcessedMessage)) standByLastLogged = time.Now() } - - pkmRequiresResponse = false } if (cdcRecordsStorage.Len() >= int(req.MaxBatchSize)) && !p.commitLock { @@ -469,7 +481,7 @@ func (p *PostgresCDCSource) consumeStream( if cdcRecordsStorage.IsEmpty() { // given that we have no records it is safe to update the flush wal position // to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages. 
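+	// Stage the position instead of flushing immediately; the keepalive branch above
+	// promotes it and calls SetLastOffset.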
- consumedXLogPos = clientXLogPos + proposedConsumedXLogPos = clientXLogPos records.UpdateLatestCheckpoint(int64(clientXLogPos)) } } diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 9aa05131c7..9f516a49c6 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -15,6 +15,7 @@ import ( "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" + "github.com/lib/pq/oid" "golang.org/x/exp/maps" ) @@ -33,13 +34,14 @@ const ( createRawTableDstTableIndexSQL = "CREATE INDEX IF NOT EXISTS %s_dst_table_idx ON %s.%s(_peerdb_destination_table_name)" getLastOffsetSQL = "SELECT lsn_offset FROM %s.%s WHERE mirror_job_name=$1" + setLastOffsetSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1) WHERE mirror_job_name=$2" getLastSyncBatchID_SQL = "SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name=$1" getLastNormalizeBatchID_SQL = "SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name=$1" createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)" insertJobMetadataSQL = "INSERT INTO %s.%s VALUES ($1,$2,$3,$4)" checkIfJobMetadataExistsSQL = "SELECT COUNT(1)::TEXT::BOOL FROM %s.%s WHERE mirror_job_name=$1" - updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET lsn_offset=$1, sync_batch_id=$2 WHERE mirror_job_name=$3" + updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1), sync_batch_id=$2 WHERE mirror_job_name=$3" updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET normalize_batch_id=$1 WHERE mirror_job_name=$2" getTableNameToUnchangedToastColsSQL = `SELECT _peerdb_destination_table_name, @@ -77,6 +79,15 @@ const ( deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=$1" ) +type ReplicaIdentityType rune + +const ( + ReplicaIdentityDefault ReplicaIdentityType = 'd' + ReplicaIdentityFull = 'f' + ReplicaIdentityIndex = 'i' + ReplicaIdentityNothing = 'n' +) + // getRelIDForTable returns the relation ID for a table. func (c *PostgresConnector) getRelIDForTable(schemaTable *utils.SchemaTable) (uint32, error) { var relID pgtype.Uint32 @@ -92,10 +103,10 @@ func (c *PostgresConnector) getRelIDForTable(schemaTable *utils.SchemaTable) (ui } // getReplicaIdentity returns the replica identity for a table. -func (c *PostgresConnector) isTableFullReplica(schemaTable *utils.SchemaTable) (bool, error) { +func (c *PostgresConnector) getReplicaIdentityType(schemaTable *utils.SchemaTable) (ReplicaIdentityType, error) { relID, relIDErr := c.getRelIDForTable(schemaTable) if relIDErr != nil { - return false, fmt.Errorf("failed to get relation id for table %s: %w", schemaTable, relIDErr) + return ReplicaIdentityDefault, fmt.Errorf("failed to get relation id for table %s: %w", schemaTable, relIDErr) } var replicaIdentity rune @@ -103,43 +114,76 @@ func (c *PostgresConnector) isTableFullReplica(schemaTable *utils.SchemaTable) ( `SELECT relreplident FROM pg_class WHERE oid = $1;`, relID).Scan(&replicaIdentity) if err != nil { - return false, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, err) + return ReplicaIdentityDefault, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, err) } - return string(replicaIdentity) == "f", nil + + return ReplicaIdentityType(replicaIdentity), nil } -// getPrimaryKeyColumns for table returns the primary key column for a given table -// errors if there is no primary key column or if there is more than one primary key column. 
-func (c *PostgresConnector) getPrimaryKeyColumns(schemaTable *utils.SchemaTable) ([]string, error) {
+// getPrimaryKeyColumns returns the primary key columns for a given table, preferring
+// the replica identity index when one is set. Errors if the table has neither a primary key nor a replica identity index.
+func (c *PostgresConnector) getPrimaryKeyColumns(
+	replicaIdentity ReplicaIdentityType,
+	schemaTable *utils.SchemaTable,
+) ([]string, error) {
 	relID, err := c.getRelIDForTable(schemaTable)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get relation id for table %s: %w", schemaTable, err)
 	}
-	// Get the primary key column name
-	var pkCol pgtype.Text
-	pkCols := make([]string, 0)
+	if replicaIdentity == ReplicaIdentityIndex {
+		return c.getReplicaIdentityIndexColumns(relID, schemaTable)
+	}
+
+	// Find the primary key index OID
+	var pkIndexOID oid.Oid
+	err = c.pool.QueryRow(c.ctx,
+		`SELECT indexrelid FROM pg_index WHERE indrelid = $1 AND indisprimary`,
+		relID).Scan(&pkIndexOID)
+	if err != nil {
+		return nil, fmt.Errorf("error finding primary key index for table %s: %w", schemaTable, err)
+	}
+
+	return c.getColumnNamesForIndex(pkIndexOID)
+}
+
+// getReplicaIdentityIndexColumns returns the columns used in the replica identity index.
+func (c *PostgresConnector) getReplicaIdentityIndexColumns(relID uint32, schemaTable *utils.SchemaTable) ([]string, error) {
+	var indexRelID oid.Oid
+	// Fetch the OID of the index used as the replica identity
+	err := c.pool.QueryRow(c.ctx,
+		`SELECT indexrelid FROM pg_index
+		 WHERE indrelid = $1 AND indisreplident = true`,
+		relID).Scan(&indexRelID)
+	if err != nil {
+		return nil, fmt.Errorf("error finding replica identity index for table %s: %w", schemaTable, err)
+	}
+
+	return c.getColumnNamesForIndex(indexRelID)
+}
+
+// getColumnNamesForIndex returns the column names for a given index.
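+// Columns are returned in name order (ORDER BY attname), matching the ordering of the
+// previous primary key query.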
+func (c *PostgresConnector) getColumnNamesForIndex(indexOID oid.Oid) ([]string, error) { + var col pgtype.Text + cols := make([]string, 0) rows, err := c.pool.Query(c.ctx, `SELECT a.attname FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) - WHERE i.indrelid = $1 AND i.indisprimary ORDER BY a.attname ASC`, - relID) + WHERE i.indexrelid = $1 ORDER BY a.attname ASC`, + indexOID) if err != nil { - return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err) + return nil, fmt.Errorf("error getting columns for index %v: %w", indexOID, err) } defer rows.Close() - for { - if !rows.Next() { - break - } - err = rows.Scan(&pkCol) + + for rows.Next() { + err = rows.Scan(&col) if err != nil { - return nil, fmt.Errorf("error scanning primary key column for table %s: %w", schemaTable, err) + return nil, fmt.Errorf("error scanning column for index %v: %w", indexOID, err) } - pkCols = append(pkCols, pkCol.String) + cols = append(cols, col.String) } - - return pkCols, nil + return cols, nil } func (c *PostgresConnector) tableExists(schemaTable *utils.SchemaTable) (bool, error) { @@ -209,7 +253,8 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er } rows, err := c.pool.Query(c.ctx, "SELECT slot_name, redo_lsn::Text,restart_lsn::text,wal_status,"+ "confirmed_flush_lsn::text,active,"+ - "round((pg_current_wal_lsn() - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind"+ + "round((CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END"+ + " - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind"+ " FROM pg_control_checkpoint(), pg_replication_slots"+specificSlotClause+";") if err != nil { return nil, err @@ -443,6 +488,7 @@ func (c *PostgresConnector) jobMetadataExistsTx(tx pgx.Tx, jobName string) (bool if err != nil { return false, fmt.Errorf("error reading result row: %w", err) } + return result.Bool, nil } @@ -715,7 +761,7 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string, peerdbCols.SyncedAtColName)) } // set soft-deleted to false, tackles insert after soft-delete - if peerdbCols.SoftDeleteColName != "" { + if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") { tmpArray = append(tmpArray, fmt.Sprintf(`"%s" = FALSE`, peerdbCols.SoftDeleteColName)) } @@ -725,12 +771,26 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string, src._peerdb_record_type=1 AND _peerdb_unchanged_toast_columns='%s' THEN UPDATE SET %s `, cols, ssep) updateStmts = append(updateStmts, updateStmt) + + // generates update statements for the case where updates and deletes happen in the same branch + // the backfill has happened from the pull side already, so treat the DeleteRecord as an update + // and then set soft-delete to true. 
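+	// e.g., assuming a soft-delete column named "deleted", this emits a clause like:
+	//   WHEN MATCHED AND src._peerdb_record_type = 2 AND _peerdb_unchanged_toast_columns=''
+	//   THEN UPDATE SET <column assignments>, "deleted" = TRUE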
+ if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") { + tmpArray = append(tmpArray[:len(tmpArray)-1], + fmt.Sprintf(`"%s" = TRUE`, peerdbCols.SoftDeleteColName)) + ssep := strings.Join(tmpArray, ", ") + updateStmt := fmt.Sprintf(`WHEN MATCHED AND + src._peerdb_record_type = 2 AND _peerdb_unchanged_toast_columns='%s' + THEN UPDATE SET %s `, cols, ssep) + updateStmts = append(updateStmts, updateStmt) + } } return updateStmts } func (c *PostgresConnector) getCurrentLSN() (pglogrepl.LSN, error) { - row := c.pool.QueryRow(c.ctx, "SELECT pg_current_wal_lsn();") + row := c.pool.QueryRow(c.ctx, + "SELECT CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END") var result pgtype.Text err := row.Scan(&result) if err != nil { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 82426b3e3f..b848c5a5b5 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -185,13 +185,24 @@ func (c *PostgresConnector) GetLastOffset(jobName string) (int64, error) { if err != nil { return 0, fmt.Errorf("error while reading result row: %w", err) } + if result.Int64 == 0 { c.logger.Warn("Assuming zero offset means no sync has happened") } - return result.Int64, nil } +// SetLastOffset updates the last synced offset for a job. +func (c *PostgresConnector) SetLastOffset(jobName string, lastOffset int64) error { + _, err := c.pool. + Exec(c.ctx, fmt.Sprintf(setLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), lastOffset, jobName) + if err != nil { + return fmt.Errorf("error setting last offset for job %s: %w", jobName, err) + } + + return nil +} + // PullRecords pulls records from the source. func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error { defer func() { @@ -238,6 +249,7 @@ func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.Pu RelationMessageMapping: req.RelationMessageMapping, CatalogPool: catalogPool, FlowJobName: req.FlowJobName, + SetLastOffset: req.SetLastOffset, }, c.customTypesMapping) if err != nil { return fmt.Errorf("failed to create cdc source: %w", err) @@ -558,12 +570,12 @@ func (c *PostgresConnector) getTableSchemaForTable( return nil, err } - isFullReplica, replErr := c.isTableFullReplica(schemaTable) + replicaIdentityType, replErr := c.getReplicaIdentityType(schemaTable) if replErr != nil { return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr) } - pKeyCols, err := c.getPrimaryKeyColumns(schemaTable) + pKeyCols, err := c.getPrimaryKeyColumns(replicaIdentityType, schemaTable) if err != nil { return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err) } @@ -581,7 +593,7 @@ func (c *PostgresConnector) getTableSchemaForTable( TableIdentifier: tableName, Columns: make(map[string]string), PrimaryKeyColumns: pKeyCols, - IsReplicaIdentityFull: isFullReplica, + IsReplicaIdentityFull: replicaIdentityType == ReplicaIdentityFull, } for _, fieldDescription := range rows.FieldDescriptions() { @@ -731,18 +743,18 @@ func (c *PostgresConnector) EnsurePullability(req *protos.EnsurePullabilityBatch return nil, err } - isFullReplica, replErr := c.isTableFullReplica(schemaTable) + replicaIdentity, replErr := c.getReplicaIdentityType(schemaTable) if replErr != nil { return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr) } - pKeyCols, err := c.getPrimaryKeyColumns(schemaTable) + pKeyCols, err 
:= c.getPrimaryKeyColumns(replicaIdentity, schemaTable) if err != nil { return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err) } // we only allow no primary key if the table has REPLICA IDENTITY FULL - if len(pKeyCols) == 0 && !isFullReplica { + if len(pKeyCols) == 0 && !(replicaIdentity == ReplicaIdentityFull) { return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable) } diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 96d16930cc..c40ef05bd4 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -176,7 +176,7 @@ func (c *S3Connector) GetLastOffset(jobName string) (int64, error) { } // update offset for a job -func (c *S3Connector) updateLastOffset(jobName string, offset int64) error { +func (c *S3Connector) SetLastOffset(jobName string, offset int64) error { err := c.pgMetadata.UpdateLastOffset(jobName, offset) if err != nil { c.logger.Error("failed to update last offset: ", slog.Any("error", err)) @@ -218,7 +218,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes return nil, fmt.Errorf("failed to get last checkpoint: %w", err) } - err = c.updateLastOffset(req.FlowJobName, lastCheckpoint) + err = c.SetLastOffset(req.FlowJobName, lastCheckpoint) if err != nil { c.logger.Error("failed to update last offset for s3 cdc", slog.Any("error", err)) return nil, err diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index f92ed3e33e..8cd8240f11 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -73,6 +73,7 @@ const ( WHERE TABLE_SCHEMA=? and TABLE_NAME=?` checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?" getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?" + setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?" getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" getLastNormalizeBatchID_SQL = "SELECT NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" 
 dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
@@ -301,7 +302,7 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
 	}()
 	if !rows.Next() {
-		c.logger.Warn("No row found ,returning nil")
+		c.logger.Warn("No row found, returning 0")
 		return 0, nil
 	}
 	var result pgtype.Int8
@@ -311,10 +312,20 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
 	}
 	if result.Int64 == 0 {
 		c.logger.Warn("Assuming zero offset means no sync has happened")
+		return 0, nil
 	}
 	return result.Int64, nil
 }

+func (c *SnowflakeConnector) SetLastOffset(jobName string, lastOffset int64) error {
+	_, err := c.database.ExecContext(c.ctx, fmt.Sprintf(setLastOffsetSQL,
+		c.metadataSchema, mirrorJobsTableIdentifier), lastOffset, jobName)
+	if err != nil {
+		return fmt.Errorf("error setting last offset for job %s: %w", jobName, err)
+	}
+	return nil
+}
+
 func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) {
 	rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastSyncBatchID_SQL, c.metadataSchema,
 		mirrorJobsTableIdentifier), jobName)
@@ -1053,6 +1064,10 @@ func (c *SnowflakeConnector) generateUpdateStatements(
 		(SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='%s'
 		THEN UPDATE SET %s `, cols, ssep)
 		updateStmts = append(updateStmts, updateStmt)
+
+		// generates update statements for the case where updates and deletes happen in the same branch
+		// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
+		// and then set soft-delete to true.
 		if softDelete && (softDeleteCol != "") {
 			tmpArray = append(tmpArray[:len(tmpArray)-1], fmt.Sprintf(`"%s" = TRUE`, softDeleteCol))
 			ssep := strings.Join(tmpArray, ", ")
diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go
index 7e194f2d34..90c016b404 100644
--- a/flow/connectors/utils/avro/avro_writer.go
+++ b/flow/connectors/utils/avro/avro_writer.go
@@ -216,8 +216,9 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils
 		Body: r,
 	})
 	if err != nil {
-		slog.Error("failed to upload file: ", slog.Any("error", err))
-		return nil, fmt.Errorf("failed to upload file: %w", err)
+		s3Path := "s3://" + bucketName + "/" + key
+		slog.Error("failed to upload file: ", slog.Any("error", err), slog.Any("s3_path", s3Path))
+		return nil, fmt.Errorf("failed to upload file to path %s: %w", s3Path, err)
 	}
 	slog.Info("file uploaded to" + result.Location)
diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go
index bfa19f866b..a938f673b3 100644
--- a/flow/e2e/s3/cdc_s3_test.go
+++ b/flow/e2e/s3/cdc_s3_test.go
@@ -22,6 +22,11 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
 	env := s.NewTestWorkflowEnvironment()
 	e2e.RegisterWorkflowsAndActivities(env, s.T())
+	setupErr := s.setupS3("s3")
+	if setupErr != nil {
+		s.Fail("failed to setup S3", setupErr)
+	}
+
 	srcTableName := s.attachSchemaSuffix("test_simple_flow_s3")
 	dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3")
 	flowJobName := s.attachSuffix("test_simple_flow_s3")
diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go
index d4ff50751f..8d521dbb72 100644
--- a/flow/e2e/snowflake/peer_flow_sf_test.go
+++ b/flow/e2e/snowflake/peer_flow_sf_test.go
@@ -198,6 +198,72 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
 	env.AssertExpectations(s.t)
 }
+func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() {
+	env :=
e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) + + srcTableName := s.attachSchemaSuffix("test_replica_identity_no_pkey") + dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_replica_identity_no_pkey") + + // Create a table without a primary key and create a named unique index + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL, + key TEXT NOT NULL, + value TEXT NOT NULL + ); + CREATE UNIQUE INDEX unique_idx_on_id_key ON %s (id, key); + ALTER TABLE %s REPLICA IDENTITY USING INDEX unique_idx_on_id_key; + `, srcTableName, srcTableName, srcTableName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_simple_flow"), + TableNameMapping: map[string]string{srcTableName: dstTableName}, + PostgresPort: e2e.PostgresPort, + Destination: s.sfHelper.Peer, + } + + flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() + require.NoError(s.t, err) + + limits := peerflow.CDCFlowLimits{ + ExitAfterRecords: 20, + MaxBatchSize: 100, + } + + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup + // and then insert 20 rows into the source table + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + // insert 20 rows into the source table + for i := 0; i < 20; i++ { + testKey := fmt.Sprintf("test_key_%d", i) + testValue := fmt.Sprintf("test_value_%d", i) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s (id, key, value) VALUES ($1, $2, $3) + `, srcTableName), i, testKey, testValue) + require.NoError(s.t, err) + } + fmt.Println("Inserted 20 rows into the source table") + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + + // Verify workflow completes without error + s.True(env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + + // allow only continue as new error + require.Contains(s.t, err.Error(), "continue as new") + + count, err := s.sfHelper.CountRows("test_replica_identity_no_pkey") + require.NoError(s.t, err) + s.Equal(20, count) + + env.AssertExpectations(s.t) +} + func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env, s.t) diff --git a/flow/generated/protos/flow.pb.go b/flow/generated/protos/flow.pb.go index 4be91a690f..3cb146df01 100644 --- a/flow/generated/protos/flow.pb.go +++ b/flow/generated/protos/flow.pb.go @@ -2671,6 +2671,7 @@ type QRepConfig struct { // to be used after the old mirror is dropped DstTableFullResync bool `protobuf:"varint,18,opt,name=dst_table_full_resync,json=dstTableFullResync,proto3" json:"dst_table_full_resync,omitempty"` SyncedAtColName string `protobuf:"bytes,19,opt,name=synced_at_col_name,json=syncedAtColName,proto3" json:"synced_at_col_name,omitempty"` + SoftDeleteColName string `protobuf:"bytes,20,opt,name=soft_delete_col_name,json=softDeleteColName,proto3" json:"soft_delete_col_name,omitempty"` } func (x *QRepConfig) Reset() { @@ -2838,6 +2839,13 @@ func (x *QRepConfig) GetSyncedAtColName() string { return "" } +func (x *QRepConfig) GetSoftDeleteColName() string { + if x != nil { + return x.SoftDeleteColName + } + return "" +} + type QRepPartition struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -3908,7 +3916,7 @@ var file_flow_proto_rawDesc = []byte{ 0x69, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x2c, 0x0a, 0x12, 0x75, 0x70, 0x73, 0x65, 0x72, 0x74, 
0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x10, 0x75, 0x70, 0x73, 0x65, 0x72, 0x74, 0x4b, 0x65, 0x79, 0x43, 0x6f, - 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x22, 0xc6, 0x07, 0x0a, 0x0a, 0x51, 0x52, 0x65, 0x70, 0x43, 0x6f, + 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x22, 0xf7, 0x07, 0x0a, 0x0a, 0x51, 0x52, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x22, 0x0a, 0x0d, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x66, 0x6c, 0x6f, 0x77, 0x4a, 0x6f, 0x62, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x33, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, @@ -3968,104 +3976,107 @@ var file_flow_proto_rawDesc = []byte{ 0x73, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x46, 0x75, 0x6c, 0x6c, 0x52, 0x65, 0x73, 0x79, 0x6e, 0x63, 0x12, 0x2b, 0x0a, 0x12, 0x73, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x5f, 0x63, 0x6f, 0x6c, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x13, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x73, - 0x79, 0x6e, 0x63, 0x65, 0x64, 0x41, 0x74, 0x43, 0x6f, 0x6c, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x97, - 0x01, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, - 0x6e, 0x49, 0x64, 0x12, 0x31, 0x0a, 0x05, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, - 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x52, - 0x05, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x30, 0x0a, 0x14, 0x66, 0x75, 0x6c, 0x6c, 0x5f, 0x74, - 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x66, 0x75, 0x6c, 0x6c, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x50, - 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x6b, 0x0a, 0x12, 0x51, 0x52, 0x65, 0x70, - 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x12, 0x19, - 0x0a, 0x08, 0x62, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, - 0x52, 0x07, 0x62, 0x61, 0x74, 0x63, 0x68, 0x49, 0x64, 0x12, 0x3a, 0x0a, 0x0a, 0x70, 0x61, 0x72, - 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, - 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x51, 0x52, 0x65, 0x70, - 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x50, 0x0a, 0x12, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, - 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x3a, 0x0a, 0x0a, 0x70, - 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x1a, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x51, 0x52, - 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x70, 0x61, 0x72, - 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x2c, 0x0a, 0x0d, 0x44, 0x72, 0x6f, 0x70, 0x46, - 0x6c, 0x6f, 0x77, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x66, 0x6c, 0x6f, 0x77, - 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x66, 0x6c, 0x6f, - 0x77, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x54, 0x0a, 0x10, 0x44, 0x65, 0x6c, 0x74, 
0x61, 0x41, 0x64, - 0x64, 0x65, 0x64, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, 0x63, 0x6f, 0x6c, - 0x75, 0x6d, 0x6e, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, - 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x63, 0x6f, - 0x6c, 0x75, 0x6d, 0x6e, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0a, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x22, 0xa2, 0x01, 0x0a, 0x10, - 0x54, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, - 0x12, 0x24, 0x0a, 0x0e, 0x73, 0x72, 0x63, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, - 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x72, 0x63, 0x54, 0x61, 0x62, - 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x64, 0x73, 0x74, 0x5f, 0x74, 0x61, - 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, - 0x64, 0x73, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x42, 0x0a, 0x0d, - 0x61, 0x64, 0x64, 0x65, 0x64, 0x5f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x18, 0x03, 0x20, - 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, - 0x77, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x41, 0x64, 0x64, 0x65, 0x64, 0x43, 0x6f, 0x6c, 0x75, - 0x6d, 0x6e, 0x52, 0x0c, 0x61, 0x64, 0x64, 0x65, 0x64, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, - 0x22, 0xc8, 0x01, 0x0a, 0x1b, 0x52, 0x65, 0x70, 0x6c, 0x61, 0x79, 0x54, 0x61, 0x62, 0x6c, 0x65, - 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x49, 0x6e, 0x70, 0x75, 0x74, - 0x12, 0x5a, 0x0a, 0x17, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, - 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, - 0x46, 0x6c, 0x6f, 0x77, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x73, 0x52, 0x15, 0x66, 0x6c, 0x6f, 0x77, 0x43, 0x6f, 0x6e, 0x6e, 0x65, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x12, 0x4d, 0x0a, 0x13, - 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x64, 0x65, 0x6c, - 0x74, 0x61, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, - 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, - 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x52, 0x11, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x53, - 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x73, 0x22, 0xe9, 0x01, 0x0a, 0x0d, - 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x41, 0x0a, - 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, - 0x6c, 0x6f, 0x77, 0x2e, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, - 0x6e, 0x52, 0x0d, 0x6c, 0x61, 0x73, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x12, 0x38, 0x0a, 0x18, 0x6e, 0x75, 0x6d, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x5f, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x04, 0x52, 0x16, 0x6e, 0x75, 0x6d, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 
0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x6e, 0x65, - 0x65, 0x64, 0x73, 0x5f, 0x72, 0x65, 0x73, 0x79, 0x6e, 0x63, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, - 0x52, 0x0b, 0x6e, 0x65, 0x65, 0x64, 0x73, 0x52, 0x65, 0x73, 0x79, 0x6e, 0x63, 0x12, 0x38, 0x0a, - 0x19, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x77, 0x61, 0x69, 0x74, 0x5f, 0x66, 0x6f, - 0x72, 0x5f, 0x6e, 0x65, 0x77, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, - 0x52, 0x15, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x57, 0x61, 0x69, 0x74, 0x46, 0x6f, 0x72, - 0x4e, 0x65, 0x77, 0x52, 0x6f, 0x77, 0x73, 0x22, 0x8e, 0x01, 0x0a, 0x0d, 0x50, 0x65, 0x65, 0x72, - 0x44, 0x42, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x12, 0x2f, 0x0a, 0x14, 0x73, 0x6f, 0x66, - 0x74, 0x5f, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x5f, 0x63, 0x6f, 0x6c, 0x5f, 0x6e, 0x61, 0x6d, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x73, 0x6f, 0x66, 0x74, 0x44, 0x65, 0x6c, - 0x65, 0x74, 0x65, 0x43, 0x6f, 0x6c, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x2b, 0x0a, 0x12, 0x73, 0x79, - 0x6e, 0x63, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x5f, 0x63, 0x6f, 0x6c, 0x5f, 0x6e, 0x61, 0x6d, 0x65, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x73, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x41, 0x74, - 0x43, 0x6f, 0x6c, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x66, 0x74, 0x5f, - 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x73, 0x6f, - 0x66, 0x74, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x2a, 0x50, 0x0a, 0x0c, 0x51, 0x52, 0x65, 0x70, - 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, 0x50, - 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, - 0x5f, 0x49, 0x4e, 0x53, 0x45, 0x52, 0x54, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, - 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x53, 0x54, 0x4f, 0x52, - 0x41, 0x47, 0x45, 0x5f, 0x41, 0x56, 0x52, 0x4f, 0x10, 0x01, 0x2a, 0x66, 0x0a, 0x0d, 0x51, 0x52, - 0x65, 0x70, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x16, 0x51, - 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x41, - 0x50, 0x50, 0x45, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, - 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x50, 0x53, 0x45, 0x52, - 0x54, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, - 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4f, 0x56, 0x45, 0x52, 0x57, 0x52, 0x49, 0x54, 0x45, - 0x10, 0x02, 0x42, 0x76, 0x0a, 0x0f, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, - 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x42, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x50, 0x72, 0x6f, 0x74, 0x6f, - 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x50, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xca, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, - 0x46, 0x6c, 0x6f, 0x77, 0xe2, 0x02, 0x16, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, - 0x77, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0a, - 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x79, 0x6e, 0x63, 0x65, 0x64, 0x41, 0x74, 0x43, 0x6f, 0x6c, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x2f, + 0x0a, 0x14, 0x73, 
0x6f, 0x66, 0x74, 0x5f, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x5f, 0x63, 0x6f, + 0x6c, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x14, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x73, 0x6f, + 0x66, 0x74, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x43, 0x6f, 0x6c, 0x4e, 0x61, 0x6d, 0x65, 0x22, + 0x97, 0x01, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, + 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x31, 0x0a, 0x05, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, + 0x77, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, + 0x52, 0x05, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x30, 0x0a, 0x14, 0x66, 0x75, 0x6c, 0x6c, 0x5f, + 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x66, 0x75, 0x6c, 0x6c, 0x54, 0x61, 0x62, 0x6c, 0x65, + 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x6b, 0x0a, 0x12, 0x51, 0x52, 0x65, + 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x12, + 0x19, 0x0a, 0x08, 0x62, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x05, 0x52, 0x07, 0x62, 0x61, 0x74, 0x63, 0x68, 0x49, 0x64, 0x12, 0x3a, 0x0a, 0x0a, 0x70, 0x61, + 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, + 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x51, 0x52, 0x65, + 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x70, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x50, 0x0a, 0x12, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, + 0x72, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x3a, 0x0a, 0x0a, + 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x1a, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x51, + 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x70, 0x61, + 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x2c, 0x0a, 0x0d, 0x44, 0x72, 0x6f, 0x70, + 0x46, 0x6c, 0x6f, 0x77, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x66, 0x6c, 0x6f, + 0x77, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x66, 0x6c, + 0x6f, 0x77, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x54, 0x0a, 0x10, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x41, + 0x64, 0x64, 0x65, 0x64, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, 0x63, 0x6f, + 0x6c, 0x75, 0x6d, 0x6e, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0a, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x63, + 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0a, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x22, 0xa2, 0x01, 0x0a, + 0x10, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, + 0x61, 0x12, 0x24, 0x0a, 0x0e, 0x73, 0x72, 0x63, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, + 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x72, 0x63, 0x54, 0x61, + 0x62, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 
0x12, 0x24, 0x0a, 0x0e, 0x64, 0x73, 0x74, 0x5f, 0x74, + 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0c, 0x64, 0x73, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x42, 0x0a, + 0x0d, 0x61, 0x64, 0x64, 0x65, 0x64, 0x5f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x18, 0x03, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, + 0x6f, 0x77, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x41, 0x64, 0x64, 0x65, 0x64, 0x43, 0x6f, 0x6c, + 0x75, 0x6d, 0x6e, 0x52, 0x0c, 0x61, 0x64, 0x64, 0x65, 0x64, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, + 0x73, 0x22, 0xc8, 0x01, 0x0a, 0x1b, 0x52, 0x65, 0x70, 0x6c, 0x61, 0x79, 0x54, 0x61, 0x62, 0x6c, + 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x49, 0x6e, 0x70, 0x75, + 0x74, 0x12, 0x5a, 0x0a, 0x17, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, + 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x52, 0x15, 0x66, 0x6c, 0x6f, 0x77, 0x43, 0x6f, 0x6e, 0x6e, + 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x12, 0x4d, 0x0a, + 0x13, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x64, 0x65, + 0x6c, 0x74, 0x61, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, + 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, + 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x52, 0x11, 0x74, 0x61, 0x62, 0x6c, 0x65, + 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x73, 0x22, 0xe9, 0x01, 0x0a, + 0x0d, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x41, + 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, + 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, + 0x6f, 0x6e, 0x52, 0x0d, 0x6c, 0x61, 0x73, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x12, 0x38, 0x0a, 0x18, 0x6e, 0x75, 0x6d, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x5f, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x04, 0x52, 0x16, 0x6e, 0x75, 0x6d, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x6e, + 0x65, 0x65, 0x64, 0x73, 0x5f, 0x72, 0x65, 0x73, 0x79, 0x6e, 0x63, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x0b, 0x6e, 0x65, 0x65, 0x64, 0x73, 0x52, 0x65, 0x73, 0x79, 0x6e, 0x63, 0x12, 0x38, + 0x0a, 0x19, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x77, 0x61, 0x69, 0x74, 0x5f, 0x66, + 0x6f, 0x72, 0x5f, 0x6e, 0x65, 0x77, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x15, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x57, 0x61, 0x69, 0x74, 0x46, 0x6f, + 0x72, 0x4e, 0x65, 0x77, 0x52, 0x6f, 0x77, 0x73, 0x22, 0x8e, 0x01, 0x0a, 0x0d, 0x50, 0x65, 0x65, + 0x72, 0x44, 0x42, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x12, 0x2f, 0x0a, 0x14, 0x73, 0x6f, + 0x66, 0x74, 0x5f, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x5f, 0x63, 
0x6f, 0x6c, 0x5f, 0x6e, 0x61, + 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x73, 0x6f, 0x66, 0x74, 0x44, 0x65, + 0x6c, 0x65, 0x74, 0x65, 0x43, 0x6f, 0x6c, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x2b, 0x0a, 0x12, 0x73, + 0x79, 0x6e, 0x63, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x5f, 0x63, 0x6f, 0x6c, 0x5f, 0x6e, 0x61, 0x6d, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x73, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x41, + 0x74, 0x43, 0x6f, 0x6c, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x66, 0x74, + 0x5f, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x73, + 0x6f, 0x66, 0x74, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x2a, 0x50, 0x0a, 0x0c, 0x51, 0x52, 0x65, + 0x70, 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, + 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4d, 0x55, 0x4c, 0x54, + 0x49, 0x5f, 0x49, 0x4e, 0x53, 0x45, 0x52, 0x54, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, + 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x53, 0x54, 0x4f, + 0x52, 0x41, 0x47, 0x45, 0x5f, 0x41, 0x56, 0x52, 0x4f, 0x10, 0x01, 0x2a, 0x66, 0x0a, 0x0d, 0x51, + 0x52, 0x65, 0x70, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x16, + 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, + 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, + 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x50, 0x53, 0x45, + 0x52, 0x54, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, + 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4f, 0x56, 0x45, 0x52, 0x57, 0x52, 0x49, 0x54, + 0x45, 0x10, 0x02, 0x42, 0x76, 0x0a, 0x0f, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x42, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x50, 0x72, 0x6f, 0x74, + 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x50, 0x65, + 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xca, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xe2, 0x02, 0x16, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, + 0x6f, 0x77, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, + 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( diff --git a/flow/model/conversion_avro.go b/flow/model/conversion_avro.go index 6818299073..02a52b26d6 100644 --- a/flow/model/conversion_avro.go +++ b/flow/model/conversion_avro.go @@ -71,25 +71,23 @@ func GetAvroSchemaDefinition( dstTableName string, qRecordSchema *QRecordSchema, ) (*QRecordAvroSchemaDefinition, error) { - avroFields := []QRecordAvroField{} + avroFields := make([]QRecordAvroField, 0, len(qRecordSchema.Fields)) nullableFields := make(map[string]struct{}) for _, qField := range qRecordSchema.Fields { - avroType, err := qvalue.GetAvroSchemaFromQValueKind(qField.Type, qField.Nullable) + avroType, err := qvalue.GetAvroSchemaFromQValueKind(qField.Type) if err != nil { return nil, err } - consolidatedType := avroType.AvroLogicalSchema - if qField.Nullable { - consolidatedType = []interface{}{"null", consolidatedType} + avroType = []interface{}{"null", avroType} nullableFields[qField.Name] = struct{}{} } avroFields = 
diff --git a/flow/model/model.go b/flow/model/model.go
index 02949f3c2e..487616c531 100644
--- a/flow/model/model.go
+++ b/flow/model/model.go
@@ -50,6 +50,8 @@ type PullRecordsRequest struct {
 	RelationMessageMapping RelationMessageMapping
 	// record batch for pushing changes into
 	RecordStream *CDCRecordStream
+	// last offset may be forwarded while processing records
+	SetLastOffset func(int64) error
 }

 type Record interface {
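Note: SetLastOffset lets the destination checkpoint the replication offset while records are still being pulled, instead of only when a sync batch finishes. A hedged sketch of how a connector's pull loop might drive the callback; the ticker interval, channel plumbing, the pullLoop function, and the import path are illustrative assumptions, not code from this patch:

package example

import (
	"fmt"
	"time"

	"github.com/PeerDB-io/peer-flow/model" // module path assumed from flow/go.mod
)

// pullLoop periodically forwards the latest consumed offset through
// req.SetLastOffset so the destination's checkpoint can advance even
// while records are still streaming. Illustrative only.
func pullLoop(req *model.PullRecordsRequest, offsets <-chan int64) error {
	tick := time.NewTicker(30 * time.Second) // interval is an assumption
	defer tick.Stop()

	var lastOffset int64
	for {
		select {
		case off, ok := <-offsets:
			if !ok {
				return req.SetLastOffset(lastOffset) // final checkpoint
			}
			lastOffset = off
		case <-tick.C:
			if err := req.SetLastOffset(lastOffset); err != nil {
				return fmt.Errorf("failed to forward offset: %w", err)
			}
		}
	}
}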
diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go
index 2cd6fe2e52..4f9cbe2e47 100644
--- a/flow/model/qvalue/avro_converter.go
+++ b/flow/model/qvalue/avro_converter.go
@@ -2,18 +2,25 @@ package qvalue

 import (
 	"fmt"
+	"log/slog"
 	"math/big"
 	"time"

 	"github.com/google/uuid"
 	"github.com/linkedin/goavro/v2"
-	"golang.org/x/exp/slog"
 )

-// QValueKindAvroSchema defines a structure for representing Avro schemas.
-// AvroLogicalSchema holds the Avro logical schema for a corresponding QValueKind.
-type QValueKindAvroSchema struct {
-	AvroLogicalSchema interface{}
+// https://avro.apache.org/docs/1.11.0/spec.html
+type AvroSchemaArray struct {
+	Type  string `json:"type"`
+	Items string `json:"items"`
+}
+
+type AvroSchemaNumeric struct {
+	Type        string `json:"type"`
+	LogicalType string `json:"logicalType"`
+	Precision   int    `json:"precision"`
+	Scale       int    `json:"scale"`
 }

 // GetAvroSchemaFromQValueKind returns the Avro schema for a given QValueKind.
@@ -23,104 +30,61 @@ type QValueKindAvroSchema struct {
 //
 // For example, QValueKindInt64 would return an AvroLogicalSchema of "long". Unsupported QValueKinds
 // will return an error.
-//
-// The function currently does not support the following QValueKinds:
-// - QValueKindBit
-//
-// Please note that for QValueKindNumeric and QValueKindETime, RespectNull is always
-// set to false, regardless of the nullable value passed in.
-func GetAvroSchemaFromQValueKind(kind QValueKind, nullable bool) (*QValueKindAvroSchema, error) {
+func GetAvroSchemaFromQValueKind(kind QValueKind) (interface{}, error) {
 	switch kind {
 	case QValueKindString, QValueKindUUID:
-		return &QValueKindAvroSchema{
-			AvroLogicalSchema: "string",
-		}, nil
+		return "string", nil
 	case QValueKindGeometry, QValueKindGeography, QValueKindPoint:
-		return &QValueKindAvroSchema{
-			AvroLogicalSchema: "string",
-		}, nil
+		return "string", nil
 	case QValueKindInt16, QValueKindInt32, QValueKindInt64:
-		return &QValueKindAvroSchema{
-			AvroLogicalSchema: "long",
-		}, nil
+		return "long", nil
 	case QValueKindFloat32:
-		return &QValueKindAvroSchema{
-			AvroLogicalSchema: "float",
-		}, nil
+		return "float", nil
 	case QValueKindFloat64:
-		return &QValueKindAvroSchema{
-			AvroLogicalSchema: "double",
-		}, nil
+		return "double", nil
 	case QValueKindBoolean:
-		return &QValueKindAvroSchema{
-			AvroLogicalSchema: "boolean",
-		}, nil
+		return "boolean", nil
 	case QValueKindBytes, QValueKindBit:
-		return &QValueKindAvroSchema{
-			AvroLogicalSchema: "bytes",
-		}, nil
+		return "bytes", nil
 	case QValueKindNumeric:
-		return &QValueKindAvroSchema{
-			AvroLogicalSchema: map[string]interface{}{
-				"type":        "bytes",
-				"logicalType": "decimal",
-				"precision":   38,
-				"scale":       9,
-			},
+		return AvroSchemaNumeric{
+			Type:        "bytes",
+			LogicalType: "decimal",
+			Precision:   38,
+			Scale:       9,
 		}, nil
 	case QValueKindTime, QValueKindTimeTZ, QValueKindDate,
 		QValueKindTimestamp, QValueKindTimestampTZ:
-		return &QValueKindAvroSchema{
-			AvroLogicalSchema: map[string]string{
-				"type": "string",
-			},
-		}, nil
+		return "string", nil
 	case QValueKindHStore, QValueKindJSON, QValueKindStruct:
-		return &QValueKindAvroSchema{
-			AvroLogicalSchema: map[string]interface{}{
-				"type":   "string",
-				"values": "string",
-			},
-		}, nil
+		return "string", nil
 	case QValueKindArrayFloat32:
-		return &QValueKindAvroSchema{
-			AvroLogicalSchema: map[string]interface{}{
-				"type":  "array",
-				"items": "float",
-			},
+		return AvroSchemaArray{
+			Type:  "array",
+			Items: "float",
 		}, nil
 	case QValueKindArrayFloat64:
-		return &QValueKindAvroSchema{
-			AvroLogicalSchema: map[string]interface{}{
-				"type":  "array",
-				"items": "double",
-			},
+		return AvroSchemaArray{
+			Type:  "array",
+			Items: "double",
 		}, nil
 	case QValueKindArrayInt32:
-		return &QValueKindAvroSchema{
-			AvroLogicalSchema: map[string]interface{}{
-				"type":  "array",
-				"items": "int",
-			},
+		return AvroSchemaArray{
+			Type:  "array",
+			Items: "int",
 		}, nil
 	case QValueKindArrayInt64:
-		return &QValueKindAvroSchema{
-			AvroLogicalSchema: map[string]interface{}{
-				"type":  "array",
-				"items": "long",
-			},
+		return AvroSchemaArray{
+			Type:  "array",
+			Items: "long",
 		}, nil
 	case QValueKindArrayString:
-		return &QValueKindAvroSchema{
-			AvroLogicalSchema: map[string]interface{}{
-				"type":  "array",
-				"items": "string",
-			},
+		return AvroSchemaArray{
+			Type:  "array",
+			Items: "string",
 		}, nil
 	case QValueKindInvalid:
 		// lets attempt to do invalid as a string
-		return &QValueKindAvroSchema{
-			AvroLogicalSchema: "string",
-		}, nil
+		return "string", nil
 	default:
 		return nil, fmt.Errorf("unsupported QValueKind type: %s", kind)
 	}
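Note: the typed AvroSchemaArray and AvroSchemaNumeric structs marshal to the same JSON the removed map literals produced, so the schema text handed to goavro should be unchanged. A small standalone check of that assumption (goavro v2 is already an import of the file above; NewCodec is its public constructor):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/linkedin/goavro/v2"
)

type AvroSchemaNumeric struct {
	Type        string `json:"type"`
	LogicalType string `json:"logicalType"`
	Precision   int    `json:"precision"`
	Scale       int    `json:"scale"`
}

func main() {
	numeric := AvroSchemaNumeric{Type: "bytes", LogicalType: "decimal", Precision: 38, Scale: 9}
	schemaJSON, _ := json.Marshal(numeric)
	fmt.Println(string(schemaJSON))
	// {"type":"bytes","logicalType":"decimal","precision":38,"scale":9}

	// goavro should accept the marshaled struct as a valid schema.
	if _, err := goavro.NewCodec(string(schemaJSON)); err != nil {
		fmt.Println("codec error:", err)
	}
}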
diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go
index bc8448b4b7..998317c520 100644
--- a/flow/workflows/snapshot_flow.go
+++ b/flow/workflows/snapshot_flow.go
@@ -177,6 +177,7 @@ func (s *SnapshotFlowExecution) cloneTable(
 		MaxParallelWorkers: numWorkers,
 		StagingPath:        s.config.SnapshotStagingPath,
 		SyncedAtColName:    s.config.SyncedAtColName,
+		SoftDeleteColName:  s.config.SoftDeleteColName,
 		WriteMode: &protos.QRepWriteMode{
 			WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND,
 		},
diff --git a/nexus/pt/src/peerdb_flow.rs b/nexus/pt/src/peerdb_flow.rs
index 50b1541e0d..4163aa0363 100644
--- a/nexus/pt/src/peerdb_flow.rs
+++ b/nexus/pt/src/peerdb_flow.rs
@@ -474,6 +474,8 @@ pub struct QRepConfig {
     pub dst_table_full_resync: bool,
     #[prost(string, tag="19")]
     pub synced_at_col_name: ::prost::alloc::string::String,
+    #[prost(string, tag="20")]
+    pub soft_delete_col_name: ::prost::alloc::string::String,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/nexus/pt/src/peerdb_flow.serde.rs b/nexus/pt/src/peerdb_flow.serde.rs
index 1ebf981cd4..3374e09b99 100644
--- a/nexus/pt/src/peerdb_flow.serde.rs
+++ b/nexus/pt/src/peerdb_flow.serde.rs
@@ -2771,6 +2771,9 @@ impl serde::Serialize for QRepConfig {
         if !self.synced_at_col_name.is_empty() {
             len += 1;
         }
+        if !self.soft_delete_col_name.is_empty() {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("peerdb_flow.QRepConfig", len)?;
         if !self.flow_job_name.is_empty() {
             struct_ser.serialize_field("flowJobName", &self.flow_job_name)?;
@@ -2831,6 +2834,9 @@ impl serde::Serialize for QRepConfig {
         if !self.synced_at_col_name.is_empty() {
             struct_ser.serialize_field("syncedAtColName", &self.synced_at_col_name)?;
         }
+        if !self.soft_delete_col_name.is_empty() {
+            struct_ser.serialize_field("softDeleteColName", &self.soft_delete_col_name)?;
+        }
         struct_ser.end()
     }
 }
@@ -2878,6 +2884,8 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
             "dstTableFullResync",
             "synced_at_col_name",
             "syncedAtColName",
+            "soft_delete_col_name",
+            "softDeleteColName",
         ];

         #[allow(clippy::enum_variant_names)]
@@ -2901,6 +2909,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
             SetupWatermarkTableOnDestination,
             DstTableFullResync,
             SyncedAtColName,
+            SoftDeleteColName,
             __SkipField__,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -2942,6 +2951,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
                             "setupWatermarkTableOnDestination" | "setup_watermark_table_on_destination" => Ok(GeneratedField::SetupWatermarkTableOnDestination),
                             "dstTableFullResync" | "dst_table_full_resync" => Ok(GeneratedField::DstTableFullResync),
                             "syncedAtColName" | "synced_at_col_name" => Ok(GeneratedField::SyncedAtColName),
+                            "softDeleteColName" | "soft_delete_col_name" => Ok(GeneratedField::SoftDeleteColName),
                             _ => Ok(GeneratedField::__SkipField__),
                         }
                     }
@@ -2980,6 +2990,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
                 let mut setup_watermark_table_on_destination__ = None;
                 let mut dst_table_full_resync__ = None;
                 let mut synced_at_col_name__ = None;
+                let mut soft_delete_col_name__ = None;
                 while let Some(k) = map.next_key()? {
                     match k {
                         GeneratedField::FlowJobName => {
@@ -3106,6 +3117,12 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
                             }
                             synced_at_col_name__ = Some(map.next_value()?);
                         }
+                        GeneratedField::SoftDeleteColName => {
+                            if soft_delete_col_name__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("softDeleteColName"));
+                            }
+                            soft_delete_col_name__ = Some(map.next_value()?);
+                        }
                         GeneratedField::__SkipField__ => {
                             let _ = map.next_value::<serde::de::IgnoredAny>()?;
                         }
@@ -3131,6 +3148,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
                     setup_watermark_table_on_destination: setup_watermark_table_on_destination__.unwrap_or_default(),
                     dst_table_full_resync: dst_table_full_resync__.unwrap_or_default(),
                     synced_at_col_name: synced_at_col_name__.unwrap_or_default(),
+                    soft_delete_col_name: soft_delete_col_name__.unwrap_or_default(),
                 })
             }
         }
diff --git a/protos/flow.proto b/protos/flow.proto
index 57ceef506f..965d8d0fce 100644
--- a/protos/flow.proto
+++ b/protos/flow.proto
@@ -323,6 +323,7 @@ message QRepConfig {
   bool dst_table_full_resync = 18;

   string synced_at_col_name = 19;
+  string soft_delete_col_name = 20;
 }

 message QRepPartition {
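Note: with field 20 declared in flow.proto and mirrored in the Rust and serde bindings, a QRep config can carry the soft-delete column name end to end. An illustrative construction using the generated Go types; the column values and the generated/protos import path are assumptions based on this repo's layout, not code from this patch:

package main

import (
	"fmt"

	"github.com/PeerDB-io/peer-flow/generated/protos" // go_package path assumed
)

func main() {
	cfg := &protos.QRepConfig{
		FlowJobName:       "example_snapshot",   // illustrative
		SyncedAtColName:   "_peerdb_synced_at",  // illustrative
		SoftDeleteColName: "_peerdb_is_deleted", // the new field 20 from this diff
		WriteMode: &protos.QRepWriteMode{
			WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND,
		},
	}
	fmt.Println(cfg.SoftDeleteColName)
}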
diff --git a/ui/app/mirrors/create/cdc/cdc.tsx b/ui/app/mirrors/create/cdc/cdc.tsx
index 65f2158ee6..63155acdf2 100644
--- a/ui/app/mirrors/create/cdc/cdc.tsx
+++ b/ui/app/mirrors/create/cdc/cdc.tsx
@@ -66,12 +66,6 @@ export default function CDCConfigForm({
   if (mirrorConfig.source != undefined && mirrorConfig.destination != undefined)
     return (
       <>
-      [six removed JSX lines; element markup lost in extraction]
       {normalSettings.map((setting, id) => {
         return (
           paramDisplayCondition(setting) && (
@@ -112,6 +106,13 @@ export default function CDCConfigForm({
           />
         );
       })}
+      [seven added JSX lines; element markup lost in extraction]
+
     );
 }
diff --git a/ui/app/mirrors/create/cdc/columnbox.tsx b/ui/app/mirrors/create/cdc/columnbox.tsx
new file mode 100644
index 0000000000..b68560419f
--- /dev/null
+++ b/ui/app/mirrors/create/cdc/columnbox.tsx
@@ -0,0 +1,80 @@
+'use client';
+import { TableMapRow } from '@/app/dto/MirrorsDTO';
+import { Checkbox } from '@/lib/Checkbox';
+import { Label } from '@/lib/Label';
+import { RowWithCheckbox } from '@/lib/Layout';
+import { Dispatch, SetStateAction } from 'react';
+
+interface ColumnProps {
+  columns: string[];
+  tableRow: TableMapRow;
+  rows: TableMapRow[];
+  setRows: Dispatch<SetStateAction<TableMapRow[]>>;
+}
+export default function ColumnBox({
+  columns,
+  tableRow,
+  rows,
+  setRows,
+}: ColumnProps) {
+  const handleColumnExclusion = (
+    source: string,
+    column: string,
+    include: boolean
+  ) => {
+    const currRows = [...rows];
+    const rowOfSource = currRows.find((row) => row.source === source);
+    if (rowOfSource) {
+      if (include) {
+        const updatedExclude = rowOfSource.exclude.filter(
+          (col) => col !== column
+        );
+        rowOfSource.exclude = updatedExclude;
+      } else {
+        rowOfSource.exclude.push(column);
+      }
+    }
+    setRows(currRows);
+  };
+
+  const columnExclusion = new Set(tableRow.exclude);
+  return columns.map((column) => {
+    const [columnName, columnType, isPkeyStr] = column.split(':');
+    const isPkey = isPkeyStr === 'true';
+    return (
+      <RowWithCheckbox
+        key={column}
+        label={
+          <Label>
+            {columnName}
+            <p>{columnType}</p>
+          </Label>
+        }
+        action={
+          <Checkbox
+            disabled={isPkey}
+            checked={!columnExclusion.has(columnName)}
+            onCheckedChange={(state: boolean) =>
+              handleColumnExclusion(tableRow.source, columnName, state)
+            }
+          />
+        }
+      />
+    );
+  });
+}
diff --git a/ui/app/mirrors/create/cdc/schemabox.tsx b/ui/app/mirrors/create/cdc/schemabox.tsx
index 4195fae83d..d9b15a703c 100644
--- a/ui/app/mirrors/create/cdc/schemabox.tsx
+++ b/ui/app/mirrors/create/cdc/schemabox.tsx
@@ -7,9 +7,16 @@ import { Label } from '@/lib/Label';
 import { RowWithCheckbox } from '@/lib/Layout';
 import { SearchField } from '@/lib/SearchField';
 import { TextField } from '@/lib/TextField';
-import { Dispatch, SetStateAction, useCallback, useState } from 'react';
+import {
+  Dispatch,
+  SetStateAction,
+  useCallback,
+  useMemo,
+  useState,
+} from 'react';
 import { BarLoader } from 'react-spinners/';
 import { fetchColumns, fetchTables } from '../handlers';
+import ColumnBox from './columnbox';
 import { expandableStyle, schemaBoxStyle, tableBoxStyle } from './styles';

 interface SchemaBoxProps {
@@ -36,6 +43,20 @@ const SchemaBox = ({
   const [columnsLoading, setColumnsLoading] = useState(false);
   const [expandedSchemas, setExpandedSchemas] = useState<string[]>([]);
   const [tableQuery, setTableQuery] = useState<string>('');
+  const [schemaLoadedSet, setSchemaLoadedSet] = useState<Set<string>>(
+    new Set()
+  );
+
+  const [handlingAll, setHandlingAll] = useState(false);
+
+  const searchedTables = useMemo(() => {
+    const tableQueryLower = tableQuery.toLowerCase();
+    return rows.filter(
+      (row) =>
+        row.schema === schema &&
+        row.source.toLowerCase().includes(tableQueryLower)
+    );
+  }, [schema, rows, tableQuery]);

   const schemaIsExpanded = useCallback(
     (schema: string) => {
@@ -74,11 +95,13 @@ const SchemaBox = ({
   const addTableColumns = (table: string) => {
     const schemaName = table.split('.')[0];
     const tableName = table.split('.')[1];
+
     fetchColumns(sourcePeer, schemaName, tableName, setColumnsLoading).then(
-      (res) =>
+      (res) => {
         setTableColumns((prev) => {
           return [...prev, { tableName: table, columns: res }];
-        })
+        });
+      }
     );
   };

@@ -93,47 +116,34 @@ const SchemaBox = ({
       ?.columns;
   };

-  const handleColumnExclusion = (
-    source: string,
-    column: string,
-    include: boolean
-  ) => {
-    const currRows = [...rows];
-    const rowOfSource = currRows.find((row) => row.source === source);
-    if (rowOfSource) {
-      if (include) {
-        const updatedExclude = rowOfSource.exclude.filter(
-          (col) => col !== column
-        );
-        rowOfSource.exclude = updatedExclude;
-      } else {
-        rowOfSource.exclude.push(column);
-      }
-    }
-    setRows(currRows);
-  };
-
   const handleSelectAll = (
-    e: React.MouseEvent<HTMLInputElement, MouseEvent>
+    e: React.MouseEvent<HTMLInputElement, MouseEvent>,
+    schemaName: string
   ) => {
+    setHandlingAll(true);
     const newRows = [...rows];
     for (const row of newRows) {
-      row.selected = e.currentTarget.checked;
-      if (e.currentTarget.checked) addTableColumns(row.source);
-      else removeTableColumns(row.source);
+      if (row.schema === schemaName) {
+        row.selected = e.currentTarget.checked;
+        if (e.currentTarget.checked) addTableColumns(row.source);
+        else removeTableColumns(row.source);
+      }
     }
     setRows(newRows);
+    setHandlingAll(false);
   };

   const handleSchemaClick = (schemaName: string) => {
     if (!schemaIsExpanded(schemaName)) {
-      setTablesLoading(true);
       setExpandedSchemas((curr) => [...curr, schemaName]);
-      fetchTables(sourcePeer, schemaName, peerType).then((tableRows) => {
-        const newRows = [...rows, ...tableRows];
-        setRows(newRows);
-        setTablesLoading(false);
-      });
+      if (!schemaLoadedSet.has(schemaName)) {
+        setTablesLoading(true);
+        setSchemaLoadedSet((loaded) => new Set(loaded).add(schemaName));
+        fetchTables(sourcePeer, schemaName, peerType).then((tableRows) => {
+          setRows((value) => [...value, ...tableRows]);
+          setTablesLoading(false);
+        });
+      }
     } else {
       setExpandedSchemas((curr) =>
         curr.filter((expandedSchema) => expandedSchema != schemaName)
       );
     }
   };
@@ -158,7 +168,10 @@ const SchemaBox = ({
           [select-all checkbox markup lost in extraction]
-            onClick={(e) => handleSelectAll(e)} />
+            onClick={(e) => handleSelectAll(e, schema)}
+          />
@@ -173,139 +186,96 @@ const SchemaBox = ({
             [search-field markup lost in extraction]
           />
-      {schemaIsExpanded(schema) && (
+      {/* TABLE BOX */}
+      {handlingAll && <BarLoader />}
+      {!handlingAll && schemaIsExpanded(schema) && (
           [table-list container markup lost in extraction]
-          {rows.filter((row) => row.schema === schema).length ? (
-            rows
-              .filter(
-                (row) =>
-                  row.schema === schema &&
-                  row.source.toLowerCase().includes(tableQuery.toLowerCase())
-              )
-              .map((row) => {
-                const columns = getTableColumns(row.source);
-                return (
-                  [per-table row markup lost in extraction]
+          {searchedTables.length ? (
+            searchedTables.map((row) => {
+              const columns = getTableColumns(row.source);
+              return (
+                [per-table row markup lost in extraction]
+                    {row.source}
+                  }
+                  action={
+                    <Checkbox
+                      checked={row.selected}
+                      onCheckedChange={(state: boolean) =>
+                        handleTableSelect(state, row.source)
+                      }
+                    />
+                  }
+                />
                   [target-table field markup lost in extraction]
                     Target Table:
+                    <TextField
+                      onChange={(
+                        e: React.ChangeEvent<HTMLInputElement>
+                      ) =>
+                        updateDestination(row.source, e.target.value)
+                      }
+                    />
+                {row.selected && (
+                  [column-section markup lost in extraction]
+                  {columns ? (
+                    <ColumnBox
+                      columns={columns}
+                      tableRow={row}
+                      rows={rows}
+                      setRows={setRows}
+                    />
+                  ) : columnsLoading ? (
+                    <BarLoader />
+                  ) : (
+                    [empty-state label lost in extraction]
+                  )}
-                  {row.selected && (
-                    [column-list markup lost in extraction]
-                      {columns ? (
-                        columns.map((column) => {
-                          const columnName = column.split(':')[0];
-                          const columnType = column.split(':')[1];
-                          const isPkey = column.split(':')[2] === 'true';
-                          return (
-                            [per-column row markup lost in extraction]
-                                {columnName}
-                                {columnType}
-                              }
-                              action={
-                                [checked expression markup partially lost]
-                                    (col) => col == columnName
-                                  )
-                                }
-                                onCheckedChange={(state: boolean) =>
-                                  handleColumnExclusion(
-                                    row.source,
-                                    columnName,
-                                    state
-                                  )
-                                }
-                                />
-                              }
-                            />
-                          );
-                        })
-                      ) : columnsLoading ? (
-                        [loader markup lost in extraction]
-                      ) : (
-                        [empty-state label lost in extraction]
-                      )}
-                  )}
-                );
-              })
+                )}
+              );
+            })
           ) : tablesLoading ? (
             [loader markup lost in extraction]
           ) : (
             [empty-state label lost in extraction]
           )}
diff --git a/ui/app/mirrors/create/cdc/tablemapping.tsx b/ui/app/mirrors/create/cdc/tablemapping.tsx
index 85c889cb4b..2cccea321f 100644
--- a/ui/app/mirrors/create/cdc/tablemapping.tsx
+++ b/ui/app/mirrors/create/cdc/tablemapping.tsx
@@ -55,7 +55,7 @@ const TableMapping = ({
         />
       [context markup lost in extraction]
-      [one wrapper line changed; original markup lost in extraction]
+      [one wrapper line changed; replacement markup lost in extraction]
       {allSchemas ? (
         allSchemas
           ?.filter((schema) => {
diff --git a/ui/grpc_generated/flow.ts b/ui/grpc_generated/flow.ts
index 094d97765a..1ba651b45f 100644
--- a/ui/grpc_generated/flow.ts
+++ b/ui/grpc_generated/flow.ts
@@ -431,6 +431,7 @@ export interface QRepConfig {
    */
   dstTableFullResync: boolean;
   syncedAtColName: string;
+  softDeleteColName: string;
 }

 export interface QRepPartition {
@@ -5309,6 +5310,7 @@ function createBaseQRepConfig(): QRepConfig {
     setupWatermarkTableOnDestination: false,
     dstTableFullResync: false,
     syncedAtColName: "",
+    softDeleteColName: "",
   };
 }

@@ -5371,6 +5373,9 @@ export const QRepConfig = {
     if (message.syncedAtColName !== "") {
       writer.uint32(154).string(message.syncedAtColName);
     }
+    if (message.softDeleteColName !== "") {
+      writer.uint32(162).string(message.softDeleteColName);
+    }
     return writer;
   },

@@ -5514,6 +5519,13 @@ export const QRepConfig = {
           message.syncedAtColName = reader.string();
           continue;
+        case 20:
+          if (tag !== 162) {
+            break;
+          }
+
+          message.softDeleteColName = reader.string();
+          continue;
       }
       if ((tag & 7) === 4 || tag === 0) {
         break;
@@ -5548,6 +5560,7 @@ export const QRepConfig = {
         : false,
       dstTableFullResync: isSet(object.dstTableFullResync) ? Boolean(object.dstTableFullResync) : false,
       syncedAtColName: isSet(object.syncedAtColName) ? String(object.syncedAtColName) : "",
+      softDeleteColName: isSet(object.softDeleteColName) ? String(object.softDeleteColName) : "",
     };
   },

@@ -5610,6 +5623,9 @@ export const QRepConfig = {
     if (message.syncedAtColName !== "") {
       obj.syncedAtColName = message.syncedAtColName;
     }
+    if (message.softDeleteColName !== "") {
+      obj.softDeleteColName = message.softDeleteColName;
+    }
     return obj;
   },

@@ -5643,6 +5659,7 @@ export const QRepConfig = {
     message.setupWatermarkTableOnDestination = object.setupWatermarkTableOnDestination ?? false;
     message.dstTableFullResync = object.dstTableFullResync ?? false;
     message.syncedAtColName = object.syncedAtColName ?? "";
+    message.softDeleteColName = object.softDeleteColName ?? "";
    return message;
  },
};
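Note: the generated writer and reader above hard-code 162 for the new field because protobuf encodes a field's key as (field_number << 3) | wire_type. A one-liner confirming the arithmetic for field 20 (strings are length-delimited, wire type 2):

package main

import "fmt"

func main() {
	const fieldNumber = 20 // soft_delete_col_name
	const wireType = 2     // length-delimited (string)

	// Key byte used by the generated TypeScript encode/decode paths.
	tag := fieldNumber<<3 | wireType
	fmt.Println(tag) // 162
	// For comparison, synced_at_col_name (field 19) uses 19<<3|2 = 154.
}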