From 4d42a0e2699a52452d4ad9641dd4e323dec5476a Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 12 Oct 2023 15:08:16 -0400 Subject: [PATCH] Return only at commit message in Postgres CDC (#503) Co-authored-by: Kevin Biju --- flow/activities/flowable.go | 8 +- flow/connectors/bigquery/bigquery.go | 63 +++--- flow/connectors/core.go | 2 +- flow/connectors/postgres/cdc.go | 68 ++++--- flow/connectors/postgres/postgres.go | 58 +++--- flow/connectors/postgres/postgres_cdc_test.go | 8 +- .../postgres/postgres_schema_delta_test.go | 190 +----------------- flow/connectors/snowflake/snowflake.go | 68 +++---- flow/e2e/bigquery/peer_flow_bq_test.go | 5 +- flow/e2e/postgres/peer_flow_pg_test.go | 63 +++++- flow/e2e/postgres/qrep_flow_pg_test.go | 16 +- flow/e2e/snowflake/peer_flow_sf_test.go | 76 ++++++- .../snowflake/snowflake_schema_delta_test.go | 182 ++--------------- flow/generated/protos/flow.pb.go | 93 ++++----- flow/model/model.go | 6 +- flow/workflows/cdc_flow.go | 54 +++-- flow/workflows/sync_flow.go | 29 +-- nexus/pt/src/peerdb_flow.rs | 6 +- nexus/pt/src/peerdb_flow.serde.rs | 44 ++-- protos/flow.proto | 3 +- ui/grpc_generated/flow.ts | 40 ++-- 21 files changed, 415 insertions(+), 667 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 5876aab726..0eb93920cd 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -249,7 +249,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, metrics.LogCDCRawThroughputMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0) return &model.SyncResponse{ RelationMessageMapping: recordsWithTableSchemaDelta.RelationMessageMapping, - TableSchemaDelta: recordsWithTableSchemaDelta.TableSchemaDelta, + TableSchemaDeltas: recordsWithTableSchemaDelta.TableSchemaDeltas, }, nil } @@ -297,7 +297,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, if err != nil { return nil, err } - res.TableSchemaDelta = recordsWithTableSchemaDelta.TableSchemaDelta + res.TableSchemaDeltas = recordsWithTableSchemaDelta.TableSchemaDeltas res.RelationMessageMapping = recordsWithTableSchemaDelta.RelationMessageMapping pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords) @@ -372,7 +372,7 @@ func (a *FlowableActivity) StartNormalize( return res, nil } -func (a *FlowableActivity) ReplayTableSchemaDelta( +func (a *FlowableActivity) ReplayTableSchemaDeltas( ctx context.Context, input *protos.ReplayTableSchemaDeltaInput, ) error { @@ -384,7 +384,7 @@ func (a *FlowableActivity) ReplayTableSchemaDelta( } defer connectors.CloseConnector(dest) - return dest.ReplayTableSchemaDelta(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDelta) + return dest.ReplayTableSchemaDeltas(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDeltas) } // SetupQRepMetadataTables sets up the metadata tables for QReplication. diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 006052777d..19c8bc596c 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -205,39 +205,29 @@ func (c *BigQueryConnector) InitializeTableSchema(req map[string]*protos.TableSc return nil } -// ReplayTableSchemaDelta changes a destination table to match the schema at source +// ReplayTableSchemaDeltas changes a destination table to match the schema at source // This could involve adding or dropping multiple columns. 
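// Note: despite the wording above, only added columns are actually
// replayed from this change onward; dropped and type-changed source
// columns are logged and skipped in processRelationMessage (cdc.go,
// below), so destination tables keep their existing columns.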
-func (c *BigQueryConnector) ReplayTableSchemaDelta(flowJobName string, - schemaDelta *protos.TableSchemaDelta) error { - if (schemaDelta == nil) || (len(schemaDelta.AddedColumns) == 0 && len(schemaDelta.DroppedColumns) == 0) { - return nil - } - - for _, droppedColumn := range schemaDelta.DroppedColumns { - _, err := c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s DROP COLUMN `%s`", c.datasetID, - schemaDelta.DstTableName, droppedColumn)).Read(c.ctx) - if err != nil { - return fmt.Errorf("failed to drop column %s for table %s: %w", droppedColumn, - schemaDelta.SrcTableName, err) +func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string, + schemaDeltas []*protos.TableSchemaDelta) error { + for _, schemaDelta := range schemaDeltas { + if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 { + return nil } - log.WithFields(log.Fields{ - "flowName": flowJobName, - "tableName": schemaDelta.SrcTableName, - }).Infof("[schema delta replay] dropped column %s", droppedColumn) - } - for _, addedColumn := range schemaDelta.AddedColumns { - _, err := c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN `%s` %s", c.datasetID, - schemaDelta.DstTableName, addedColumn.ColumnName, - qValueKindToBigQueryType(addedColumn.ColumnType))).Read(c.ctx) - if err != nil { - return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName, - schemaDelta.SrcTableName, err) + + for _, addedColumn := range schemaDelta.AddedColumns { + _, err := c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN `%s` %s", c.datasetID, + schemaDelta.DstTableName, addedColumn.ColumnName, + qValueKindToBigQueryType(addedColumn.ColumnType))).Read(c.ctx) + if err != nil { + return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName, + schemaDelta.SrcTableName, err) + } + log.WithFields(log.Fields{ + "flowName": flowJobName, + "tableName": schemaDelta.SrcTableName, + }).Infof("[schema delta replay] added column %s with data type %s", addedColumn.ColumnName, + addedColumn.ColumnType) } - log.WithFields(log.Fields{ - "flowName": flowJobName, - "tableName": schemaDelta.SrcTableName, - }).Infof("[schema delta replay] added column %s with data type %s", addedColumn.ColumnName, - addedColumn.ColumnType) } return nil @@ -299,7 +289,8 @@ func (c *BigQueryConnector) GetLastOffset(jobName string) (*protos.LastSyncState } func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) { - query := fmt.Sprintf("SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName) + query := fmt.Sprintf("SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name = '%s'", + c.datasetID, MirrorJobsTable, jobName) q := c.client.Query(query) it, err := q.Read(c.ctx) if err != nil { @@ -323,7 +314,8 @@ func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) { } func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, error) { - query := fmt.Sprintf("SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName) + query := fmt.Sprintf("SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'", + c.datasetID, MirrorJobsTable, jobName) q := c.client.Query(query) it, err := q.Read(c.ctx) if err != nil { @@ -981,7 +973,7 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr {Name: "_peerdb_unchanged_toast_columns", Type: bigquery.StringFieldType}, } - staging_schema := bigquery.Schema{ + stagingSchema := bigquery.Schema{ {Name: 
"_peerdb_uid", Type: bigquery.StringFieldType}, {Name: "_peerdb_timestamp", Type: bigquery.TimestampFieldType}, {Name: "_peerdb_timestamp_nanos", Type: bigquery.IntegerFieldType}, @@ -1022,7 +1014,7 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr stagingTableName := c.getStagingTableName(req.FlowJobName) stagingTable := c.client.Dataset(c.datasetID).Table(stagingTableName) err = stagingTable.Create(c.ctx, &bigquery.TableMetadata{ - Schema: staging_schema, + Schema: stagingSchema, }) if err != nil { return nil, fmt.Errorf("failed to create table %s.%s: %w", c.datasetID, stagingTableName, err) @@ -1034,7 +1026,8 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr } // getUpdateMetadataStmt updates the metadata tables for a given job. -func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedCheckpointID int64, batchID int64) (string, error) { +func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedCheckpointID int64, + batchID int64) (string, error) { hasJob, err := c.metadataHasJob(jobName) if err != nil { return "", fmt.Errorf("failed to check if job exists: %w", err) diff --git a/flow/connectors/core.go b/flow/connectors/core.go index c31a0f5304..a67f9e3461 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -90,7 +90,7 @@ type CDCNormalizeConnector interface { // ReplayTableSchemaDelta changes a destination table to match the schema at source // This could involve adding or dropping multiple columns. - ReplayTableSchemaDelta(flowJobName string, schemaDelta *protos.TableSchemaDelta) error + ReplayTableSchemaDeltas(flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error } type QRepPullConnector interface { diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 1d19b21508..81e9ef790d 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -29,6 +29,7 @@ type PostgresCDCSource struct { relationMessageMapping model.RelationMessageMapping typeMap *pgtype.Map startLSN pglogrepl.LSN + commitLock bool } type PostgresCDCConfig struct { @@ -52,6 +53,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, err publication: cdcConfig.Publication, relationMessageMapping: cdcConfig.RelationMessageMapping, typeMap: pgtype.NewMap(), + commitLock: false, }, nil } @@ -125,13 +127,12 @@ func (p *PostgresCDCSource) consumeStream( } result := &model.RecordsWithTableSchemaDelta{ RecordBatch: records, - TableSchemaDelta: nil, + TableSchemaDeltas: nil, RelationMessageMapping: p.relationMessageMapping, } standbyMessageTimeout := req.IdleTimeout nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout) - earlyReturn := false defer func() { err := conn.Close(p.ctx) @@ -142,20 +143,21 @@ func (p *PostgresCDCSource) consumeStream( } }() + // clientXLogPos is the last checkpoint id + 1, we need to ack that we have processed + // until clientXLogPos - 1 each time we send a standby status update. + // consumedXLogPos is the lsn that has been committed on the destination. 
+ consumedXLogPos := pglogrepl.LSN(0) + if clientXLogPos > 0 { + consumedXLogPos = clientXLogPos - 1 + } + for { if time.Now().After(nextStandbyMessageDeadline) || - earlyReturn || (len(records.Records) == int(req.MaxBatchSize)) { - // update the WALWritePosition to be clientXLogPos - 1 - // as the clientXLogPos is the last checkpoint id + 1 - // and we want to send the last checkpoint id as the last - // checkpoint id that we have processed. - lastProcessedXLogPos := clientXLogPos - if clientXLogPos > 0 { - lastProcessedXLogPos = clientXLogPos - 1 - } + // Update XLogPos to the last processed position, we can only confirm + // that this is the last row committed on the destination. err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, - pglogrepl.StandbyStatusUpdate{WALWritePosition: lastProcessedXLogPos}) + pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos}) if err != nil { return nil, fmt.Errorf("SendStandbyStatusUpdate failed: %w", err) } @@ -165,7 +167,7 @@ func (p *PostgresCDCSource) consumeStream( log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) - if earlyReturn || (len(records.Records) == int(req.MaxBatchSize)) { + if !p.commitLock && (len(records.Records) == int(req.MaxBatchSize)) { return result, nil } } @@ -173,12 +175,13 @@ func (p *PostgresCDCSource) consumeStream( ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline) rawMsg, err := conn.ReceiveMessage(ctx) cancel() - if err != nil { + if err != nil && !p.commitLock { if pgconn.Timeout(err) { log.Infof("Idle timeout reached, returning currently accumulated records") return result, nil + } else { + return nil, fmt.Errorf("ReceiveMessage failed: %w", err) } - return nil, fmt.Errorf("ReceiveMessage failed: %w", err) } if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok { @@ -285,12 +288,11 @@ func (p *PostgresCDCSource) consumeStream( case *model.DeleteRecord: records.Records = append(records.Records, rec) case *model.RelationRecord: - tableSchemaDelta := rec.(*model.RelationRecord).TableSchemaDelta - if len(tableSchemaDelta.AddedColumns) > 0 || len(tableSchemaDelta.DroppedColumns) > 0 { - result.TableSchemaDelta = tableSchemaDelta - log.Infof("Detected schema change for table %s, returning currently accumulated records", - result.TableSchemaDelta.SrcTableName) - earlyReturn = true + tableSchemaDelta := r.TableSchemaDelta + if len(tableSchemaDelta.AddedColumns) > 0 { + log.Infof("Detected schema change for table %s, addedColumns: %v", + tableSchemaDelta.SrcTableName, tableSchemaDelta.AddedColumns) + result.TableSchemaDeltas = append(result.TableSchemaDeltas, tableSchemaDelta) } } } @@ -311,7 +313,9 @@ func (p *PostgresCDCSource) processMessage(batch *model.RecordBatch, xld pglogre switch msg := logicalMsg.(type) { case *pglogrepl.BeginMessage: - log.Debugf("Ignoring BeginMessage") + log.Debugf("BeginMessage => FinalLSN: %v, XID: %v", msg.FinalLSN, msg.Xid) + log.Debugf("Locking PullRecords at BeginMessage, awaiting CommitMessage") + p.commitLock = true case *pglogrepl.InsertMessage: return p.processInsertMessage(xld.WALStart, msg) case *pglogrepl.UpdateMessage: @@ -320,7 +324,10 @@ func (p *PostgresCDCSource) processMessage(batch *model.RecordBatch, xld pglogre return p.processDeleteMessage(xld.WALStart, msg) case *pglogrepl.CommitMessage: // for a commit message, update the last checkpoint id for the record batch. 
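// This is also where the commit lock taken at BeginMessage is released:
// consumeStream only returns a batch while commitLock is false, so
// batches are cut exclusively on transaction boundaries, never midway
// through an open transaction.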
+ log.Debugf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v", + msg.CommitLSN, msg.TransactionEndLSN) batch.LastCheckPointID = int64(xld.WALStart) + p.commitLock = false case *pglogrepl.RelationMessage: // TODO (kaushik): consider persistent state for a mirror job // to be stored somewhere in temporal state. We might need to persist @@ -563,10 +570,9 @@ func (p *PostgresCDCSource) processRelationMessage( schemaDelta := &protos.TableSchemaDelta{ // set it to the source table for now, so we can update the schema on the source side // then at the Workflow level we set it t - SrcTableName: p.SrcTableIDNameMapping[currRel.RelationId], - DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]], - AddedColumns: make([]*protos.DeltaAddedColumn, 0), - DroppedColumns: make([]string, 0), + SrcTableName: p.SrcTableIDNameMapping[currRel.RelationId], + DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]], + AddedColumns: make([]*protos.DeltaAddedColumn, 0), } for _, column := range currRel.Columns { // not present in previous relation message, but in current one, so added. @@ -578,17 +584,15 @@ func (p *PostgresCDCSource) processRelationMessage( // present in previous and current relation messages, but data types have changed. // so we add it to AddedColumns and DroppedColumns, knowing that we process DroppedColumns first. } else if prevRelMap[column.Name].RelId != currRelMap[column.Name].RelId { - schemaDelta.DroppedColumns = append(schemaDelta.DroppedColumns, column.Name) - schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{ - ColumnName: column.Name, - ColumnType: string(postgresOIDToQValueKind(column.DataType)), - }) + log.Warnf("Detected dropped column %s in table %s, but not propagating", column, + schemaDelta.SrcTableName) } } for _, column := range prevRel.Columns { // present in previous relation message, but not in current one, so dropped. if currRelMap[column.Name] == nil { - schemaDelta.DroppedColumns = append(schemaDelta.DroppedColumns, column.Name) + log.Warnf("Detected dropped column %s in table %s, but not propagating", column, + schemaDelta.SrcTableName) } } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index fdca7f07e4..9725a6a2af 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -667,16 +667,13 @@ func (c *PostgresConnector) InitializeTableSchema(req map[string]*protos.TableSc // ReplayTableSchemaDelta changes a destination table to match the schema at source // This could involve adding or dropping multiple columns. -func (c *PostgresConnector) ReplayTableSchemaDelta(flowJobName string, schemaDelta *protos.TableSchemaDelta) error { - if (schemaDelta == nil) || (len(schemaDelta.AddedColumns) == 0 && len(schemaDelta.DroppedColumns) == 0) { - return nil - } - +func (c *PostgresConnector) ReplayTableSchemaDeltas(flowJobName string, + schemaDeltas []*protos.TableSchemaDelta) error { // Postgres is cool and supports transactional DDL. So we use a transaction. 
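// All deltas in the slice are applied inside this single transaction:
// either every ADD COLUMN commits together, or the deferred Rollback
// below undoes them all (once Commit succeeds, that Rollback is
// effectively a no-op).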
tableSchemaModifyTx, err := c.pool.Begin(c.ctx) if err != nil { - return fmt.Errorf("error starting transaction for schema modification for table %s: %w", - schemaDelta.DstTableName, err) + return fmt.Errorf("error starting transaction for schema modification: %w", + err) } defer func() { deferErr := tableSchemaModifyTx.Rollback(c.ctx) @@ -687,39 +684,32 @@ func (c *PostgresConnector) ReplayTableSchemaDelta(flowJobName string, schemaDel } }() - for _, droppedColumn := range schemaDelta.DroppedColumns { - _, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf("ALTER TABLE %s DROP COLUMN \"%s\"", - schemaDelta.DstTableName, droppedColumn)) - if err != nil { - return fmt.Errorf("failed to drop column %s for table %s: %w", droppedColumn, - schemaDelta.DstTableName, err) + for _, schemaDelta := range schemaDeltas { + if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 { + return nil } - log.WithFields(log.Fields{ - "flowName": flowJobName, - "srcTableName": schemaDelta.SrcTableName, - "dstTableName": schemaDelta.DstTableName, - }).Infof("[schema delta replay] dropped column %s", droppedColumn) - } - for _, addedColumn := range schemaDelta.AddedColumns { - _, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf("ALTER TABLE %s ADD COLUMN \"%s\" %s", - schemaDelta.DstTableName, addedColumn.ColumnName, - qValueKindToPostgresType(addedColumn.ColumnType))) - if err != nil { - return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName, - schemaDelta.DstTableName, err) + + for _, addedColumn := range schemaDelta.AddedColumns { + _, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf("ALTER TABLE %s ADD COLUMN \"%s\" %s", + schemaDelta.DstTableName, addedColumn.ColumnName, + qValueKindToPostgresType(addedColumn.ColumnType))) + if err != nil { + return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName, + schemaDelta.DstTableName, err) + } + log.WithFields(log.Fields{ + "flowName": flowJobName, + "srcTableName": schemaDelta.SrcTableName, + "dstTableName": schemaDelta.DstTableName, + }).Infof("[schema delta replay] added column %s with data type %s", + addedColumn.ColumnName, addedColumn.ColumnType) } - log.WithFields(log.Fields{ - "flowName": flowJobName, - "srcTableName": schemaDelta.SrcTableName, - "dstTableName": schemaDelta.DstTableName, - }).Infof("[schema delta replay] added column %s with data type %s", - addedColumn.ColumnName, addedColumn.ColumnType) } err = tableSchemaModifyTx.Commit(c.ctx) if err != nil { - return fmt.Errorf("failed to commit transaction for table schema modification for table %s: %w", - schemaDelta.DstTableName, err) + return fmt.Errorf("failed to commit transaction for table schema modification: %w", + err) } return nil diff --git a/flow/connectors/postgres/postgres_cdc_test.go b/flow/connectors/postgres/postgres_cdc_test.go index fd1d46e80b..c18b188801 100644 --- a/flow/connectors/postgres/postgres_cdc_test.go +++ b/flow/connectors/postgres/postgres_cdc_test.go @@ -524,7 +524,7 @@ func (suite *PostgresCDCTestSuite) TestSimpleHappyFlow() { }) suite.failTestError(err) suite.Equal(0, len(recordsWithSchemaDelta.RecordBatch.Records)) - suite.Nil(recordsWithSchemaDelta.TableSchemaDelta) + suite.Nil(recordsWithSchemaDelta.TableSchemaDeltas) suite.Equal(int64(0), recordsWithSchemaDelta.RecordBatch.FirstCheckPointID) suite.Equal(int64(0), recordsWithSchemaDelta.RecordBatch.LastCheckPointID) relationMessageMapping = recordsWithSchemaDelta.RelationMessageMapping @@ -542,7 +542,7 @@ func (suite *PostgresCDCTestSuite) 
TestSimpleHappyFlow() { RelationMessageMapping: relationMessageMapping, }) suite.failTestError(err) - suite.Nil(recordsWithSchemaDelta.TableSchemaDelta) + suite.Nil(recordsWithSchemaDelta.TableSchemaDeltas) suite.validateInsertedSimpleRecords(recordsWithSchemaDelta.RecordBatch.Records, simpleHappyFlowSrcTableName, simpleHappyFlowDstTableName) suite.Greater(recordsWithSchemaDelta.RecordBatch.FirstCheckPointID, int64(0)) @@ -567,7 +567,7 @@ func (suite *PostgresCDCTestSuite) TestSimpleHappyFlow() { RelationMessageMapping: relationMessageMapping, }) suite.failTestError(err) - suite.Nil(recordsWithSchemaDelta.TableSchemaDelta) + suite.Nil(recordsWithSchemaDelta.TableSchemaDeltas) suite.validateSimpleMutatedRecords(recordsWithSchemaDelta.RecordBatch.Records, simpleHappyFlowSrcTableName, simpleHappyFlowDstTableName) suite.GreaterOrEqual(recordsWithSchemaDelta.RecordBatch.FirstCheckPointID, currentCheckPointID) @@ -794,7 +794,7 @@ func (suite *PostgresCDCTestSuite) TestToastHappyFlow() { RelationMessageMapping: relationMessageMapping, }) suite.failTestError(err) - suite.Nil(recordsWithSchemaDelta.TableSchemaDelta) + suite.Nil(recordsWithSchemaDelta.TableSchemaDeltas) suite.validateInsertedToastRecords(recordsWithSchemaDelta.RecordBatch.Records, toastHappyFlowSrcTableName, toastHappyFlowDstTableName) suite.Greater(recordsWithSchemaDelta.RecordBatch.FirstCheckPointID, int64(0)) diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index 243f597864..38400ac741 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -79,14 +79,14 @@ func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) suite.failTestError(err) - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ + err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, DstTableName: tableName, AddedColumns: []*protos.DeltaAddedColumn{{ ColumnName: "hi", ColumnType: string(qvalue.QValueKindInt64), }}, - }) + }}) suite.failTestError(err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ @@ -103,95 +103,7 @@ func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { }, output.TableNameSchemaMapping[tableName]) } -func (suite *PostgresSchemaDeltaTestSuite) TestSimpleDropColumn() { - tableName := fmt.Sprintf("%s.simple_drop_column", schemaDeltaTestSchemaName) - _, err := suite.connector.pool.Exec(context.Background(), - fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY, bye TEXT)", tableName)) - suite.failTestError(err) - - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ - SrcTableName: tableName, - DstTableName: tableName, - DroppedColumns: []string{"bye"}, - }) - suite.failTestError(err) - - output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{tableName}, - }) - suite.failTestError(err) - suite.Equal(&protos.TableSchema{ - TableIdentifier: tableName, - Columns: map[string]string{ - "id": string(qvalue.QValueKindInt32), - }, - PrimaryKeyColumn: "id", - }, output.TableNameSchemaMapping[tableName]) -} - -func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddDropColumn() { - tableName := fmt.Sprintf("%s.simple_add_drop_column", schemaDeltaTestSchemaName) - _, err := 
suite.connector.pool.Exec(context.Background(), - fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY, bye TEXT)", tableName)) - suite.failTestError(err) - - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ - SrcTableName: tableName, - DstTableName: tableName, - AddedColumns: []*protos.DeltaAddedColumn{{ - ColumnName: "hi", - ColumnType: string(qvalue.QValueKindInt64), - }}, - DroppedColumns: []string{"bye"}, - }) - suite.failTestError(err) - - output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{tableName}, - }) - suite.failTestError(err) - suite.Equal(&protos.TableSchema{ - TableIdentifier: tableName, - Columns: map[string]string{ - "id": string(qvalue.QValueKindInt32), - "hi": string(qvalue.QValueKindInt64), - }, - PrimaryKeyColumn: "id", - }, output.TableNameSchemaMapping[tableName]) -} - -func (suite *PostgresSchemaDeltaTestSuite) TestAddDropSameColumn() { - tableName := fmt.Sprintf("%s.add_drop_same_column", schemaDeltaTestSchemaName) - _, err := suite.connector.pool.Exec(context.Background(), - fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY, bye UUID)", tableName)) - suite.failTestError(err) - - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ - SrcTableName: tableName, - DstTableName: tableName, - AddedColumns: []*protos.DeltaAddedColumn{{ - ColumnName: "bye", - ColumnType: string(qvalue.QValueKindJSON), - }}, - DroppedColumns: []string{"bye"}, - }) - suite.failTestError(err) - - output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{tableName}, - }) - suite.failTestError(err) - suite.Equal(&protos.TableSchema{ - TableIdentifier: tableName, - Columns: map[string]string{ - "id": string(qvalue.QValueKindInt32), - "bye": string(qvalue.QValueKindJSON), - }, - PrimaryKeyColumn: "id", - }, output.TableNameSchemaMapping[tableName]) -} - -func (suite *PostgresSchemaDeltaTestSuite) TestAddDropAllColumnTypes() { +func (suite *PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { tableName := fmt.Sprintf("%s.add_drop_all_column_types", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) @@ -231,11 +143,11 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddDropAllColumnTypes() { } } - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ + err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }) + }}) suite.failTestError(err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ @@ -243,38 +155,9 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddDropAllColumnTypes() { }) suite.failTestError(err) suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) - - droppedColumns := make([]string, 0) - for columnName := range expectedTableSchema.Columns { - if columnName != "id" { - droppedColumns = append(droppedColumns, columnName) - } - } - - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ - SrcTableName: tableName, - DstTableName: tableName, - DroppedColumns: droppedColumns, - }) - suite.failTestError(err) - - _, err = suite.connector.pool.Exec(context.Background(), - fmt.Sprintf("INSERT INTO %s VALUES($1)", tableName), 1) - suite.failTestError(err) - output, err = 
suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{tableName}, - }) - suite.failTestError(err) - suite.Equal(&protos.TableSchema{ - TableIdentifier: tableName, - Columns: map[string]string{ - "id": string(qvalue.QValueKindInt32), - }, - PrimaryKeyColumn: "id", - }, output.TableNameSchemaMapping[tableName]) } -func (suite *PostgresSchemaDeltaTestSuite) TestAddDropTrickyColumnNames() { +func (suite *PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { tableName := fmt.Sprintf("%s.add_drop_tricky_column_names", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) @@ -306,11 +189,11 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddDropTrickyColumnNames() { } } - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ + err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }) + }}) suite.failTestError(err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ @@ -318,32 +201,6 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddDropTrickyColumnNames() { }) suite.failTestError(err) suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) - - droppedColumns := make([]string, 0) - for columnName := range expectedTableSchema.Columns { - if columnName != "id" { - droppedColumns = append(droppedColumns, columnName) - } - } - - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ - SrcTableName: tableName, - DstTableName: tableName, - DroppedColumns: droppedColumns, - }) - suite.failTestError(err) - - output, err = suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{tableName}, - }) - suite.failTestError(err) - suite.Equal(&protos.TableSchema{ - TableIdentifier: tableName, - Columns: map[string]string{ - "id": string(qvalue.QValueKindInt32), - }, - PrimaryKeyColumn: "id", - }, output.TableNameSchemaMapping[tableName]) } func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { @@ -372,12 +229,11 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { } } - fmt.Println(addedColumns) - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ + err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }) + }}) suite.failTestError(err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ @@ -385,32 +241,6 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { }) suite.failTestError(err) suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) - - droppedColumns := make([]string, 0) - for columnName := range expectedTableSchema.Columns { - if columnName != " " { - droppedColumns = append(droppedColumns, columnName) - } - } - - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ - SrcTableName: tableName, - DstTableName: tableName, - DroppedColumns: droppedColumns, - }) - suite.failTestError(err) - - output, err = suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{tableName}, - }) - suite.failTestError(err) - 
suite.Equal(&protos.TableSchema{ - TableIdentifier: tableName, - Columns: map[string]string{ - " ": string(qvalue.QValueKindInt32), - }, - PrimaryKeyColumn: " ", - }, output.TableNameSchemaMapping[tableName]) } func TestPostgresSchemaDeltaTestSuite(t *testing.T) { diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 7ffbca7aa2..4335ee85e8 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -219,7 +219,7 @@ func (c *SnowflakeConnector) GetTableSchema( req *protos.GetTableSchemaBatchInput) (*protos.GetTableSchemaBatchOutput, error) { res := make(map[string]*protos.TableSchema) for _, tableName := range req.TableIdentifiers { - tableSchema, err := c.getTableSchemaForTable(tableName) + tableSchema, err := c.getTableSchemaForTable(strings.ToUpper(tableName)) if err != nil { return nil, err } @@ -427,17 +427,14 @@ func (c *SnowflakeConnector) InitializeTableSchema(req map[string]*protos.TableS return nil } -// ReplayTableSchemaDelta changes a destination table to match the schema at source +// ReplayTableSchemaDeltas changes a destination table to match the schema at source // This could involve adding or dropping multiple columns. -func (c *SnowflakeConnector) ReplayTableSchemaDelta(flowJobName string, schemaDelta *protos.TableSchemaDelta) error { - if (schemaDelta == nil) || (len(schemaDelta.AddedColumns) == 0 && len(schemaDelta.DroppedColumns) == 0) { - return nil - } - +func (c *SnowflakeConnector) ReplayTableSchemaDeltas(flowJobName string, + schemaDeltas []*protos.TableSchemaDelta) error { tableSchemaModifyTx, err := c.database.Begin() if err != nil { - return fmt.Errorf("error starting transaction for schema modification for table %s: %w", - schemaDelta.DstTableName, err) + return fmt.Errorf("error starting transaction for schema modification: %w", + err) } defer func() { deferErr := tableSchemaModifyTx.Rollback() @@ -448,39 +445,32 @@ func (c *SnowflakeConnector) ReplayTableSchemaDelta(flowJobName string, schemaDe } }() - for _, droppedColumn := range schemaDelta.DroppedColumns { - _, err = tableSchemaModifyTx.Exec(fmt.Sprintf("ALTER TABLE %s DROP COLUMN \"%s\"", schemaDelta.DstTableName, - strings.ToUpper(droppedColumn))) - if err != nil { - return fmt.Errorf("failed to drop column %s for table %s: %w", droppedColumn, - schemaDelta.DstTableName, err) + for _, schemaDelta := range schemaDeltas { + if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 { + return nil } - log.WithFields(log.Fields{ - "flowName": flowJobName, - "srcTableName": schemaDelta.SrcTableName, - "dstTableName": schemaDelta.DstTableName, - }).Infof("[schema delta replay] dropped column %s", droppedColumn) - } - for _, addedColumn := range schemaDelta.AddedColumns { - _, err = tableSchemaModifyTx.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN \"%s\" %s", - schemaDelta.DstTableName, strings.ToUpper(addedColumn.ColumnName), - qValueKindToSnowflakeType(qvalue.QValueKind(addedColumn.ColumnType)))) - if err != nil { - return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName, - schemaDelta.DstTableName, err) + + for _, addedColumn := range schemaDelta.AddedColumns { + _, err = tableSchemaModifyTx.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN \"%s\" %s", + schemaDelta.DstTableName, strings.ToUpper(addedColumn.ColumnName), + qValueKindToSnowflakeType(qvalue.QValueKind(addedColumn.ColumnType)))) + if err != nil { + return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName, + 
schemaDelta.DstTableName, err) + } + log.WithFields(log.Fields{ + "flowName": flowJobName, + "srcTableName": schemaDelta.SrcTableName, + "dstTableName": schemaDelta.DstTableName, + }).Infof("[schema delta replay] added column %s with data type %s", addedColumn.ColumnName, + addedColumn.ColumnType) } - log.WithFields(log.Fields{ - "flowName": flowJobName, - "srcTableName": schemaDelta.SrcTableName, - "dstTableName": schemaDelta.DstTableName, - }).Infof("[schema delta replay] added column %s with data type %s", addedColumn.ColumnName, - addedColumn.ColumnType) } err = tableSchemaModifyTx.Commit() if err != nil { - return fmt.Errorf("failed to commit transaction for table schema modification for table %s: %w", - schemaDelta.DstTableName, err) + return fmt.Errorf("failed to commit transaction for table schema modification: %w", + err) } return nil @@ -968,7 +958,8 @@ func (c *SnowflakeConnector) CreateRawTable(req *protos.CreateRawTableInput) (*p if err != nil { return nil, err } - // there is no easy way to check if a table has the same schema in Snowflake, so just executing the CREATE TABLE IF NOT EXISTS blindly. + // there is no easy way to check if a table has the same schema in Snowflake, + // so just executing the CREATE TABLE IF NOT EXISTS blindly. _, err = createRawTableTx.ExecContext(c.ctx, fmt.Sprintf(createRawTableSQL, peerDBInternalSchema, rawTableIdentifier)) if err != nil { @@ -1237,7 +1228,8 @@ func (c *SnowflakeConnector) updateSyncMetadata(flowJobName string, lastCP int64 return nil } -func (c *SnowflakeConnector) updateNormalizeMetadata(flowJobName string, normalizeBatchID int64, normalizeRecordsTx *sql.Tx) error { +func (c *SnowflakeConnector) updateNormalizeMetadata(flowJobName string, + normalizeBatchID int64, normalizeRecordsTx *sql.Tx) error { jobMetadataExists, err := c.jobMetadataExists(flowJobName) if err != nil { return fmt.Errorf("failed to get sync status for flow job: %w", err) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 649f59ede9..75e1d35153 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -924,6 +924,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { env.AssertExpectations(s.T()) } +// TODO: not checking schema exactly, add later func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -966,7 +967,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // verify we got our first row. e2e.NormalizeFlowCountQuery(env, connectionGen, 2) - s.compareTableSchemasBQ("test_simple_schema_changes") s.compareTableContentsBQ("test_simple_schema_changes", "id,c1") // alter source table, add column c2 and insert another row. @@ -981,7 +981,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 4) - s.compareTableSchemasBQ("test_simple_schema_changes") s.compareTableContentsBQ("test_simple_schema_changes", "id,c1,c2") // alter source table, add column c3, drop column c2 and insert another row. @@ -996,7 +995,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // verify we got our two rows, if schema did not match up it will error. 
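// c2 was dropped at the source in the step above, but drops are no
// longer propagated, so the BigQuery table should keep c2 and merely
// gain c3; only id, c1 and c3 are compared below.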
e2e.NormalizeFlowCountQuery(env, connectionGen, 6) - s.compareTableSchemasBQ("test_simple_schema_changes") s.compareTableContentsBQ("test_simple_schema_changes", "id,c1,c3") // alter source table, drop column c3 and insert another row. @@ -1011,7 +1009,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 8) - s.compareTableSchemasBQ("test_simple_schema_changes") s.compareTableContentsBQ("test_simple_schema_changes", "id,c1") }() diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 6198f605e5..f8f0c243f2 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -5,6 +5,8 @@ import ( "fmt" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model/qvalue" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) @@ -121,6 +123,19 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { // verify we got our first row. e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + expectedTableSchema := &protos.TableSchema{ + TableIdentifier: dstTableName, + Columns: map[string]string{ + "id": string(qvalue.QValueKindInt64), + "c1": string(qvalue.QValueKindInt64), + }, + PrimaryKeyColumn: "id", + } + output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, + }) + s.NoError(err) + s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1") s.NoError(err) @@ -136,7 +151,21 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 4) - err = s.comparePGTables(srcTableName, dstTableName, "id,c1") + expectedTableSchema = &protos.TableSchema{ + TableIdentifier: dstTableName, + Columns: map[string]string{ + "id": string(qvalue.QValueKindInt64), + "c1": string(qvalue.QValueKindInt64), + "c2": string(qvalue.QValueKindInt64), + }, + PrimaryKeyColumn: "id", + } + output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, + }) + s.NoError(err) + s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2") s.NoError(err) // alter source table, add column c3, drop column c2 and insert another row. @@ -151,7 +180,22 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { // verify we got our two rows, if schema did not match up it will error. 
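// The expected destination schema below deliberately still contains c2:
// dropping a column at the source only produces a warning and is never
// replayed on the destination.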
e2e.NormalizeFlowCountQuery(env, connectionGen, 6) - err = s.comparePGTables(srcTableName, dstTableName, "id,c1") + expectedTableSchema = &protos.TableSchema{ + TableIdentifier: dstTableName, + Columns: map[string]string{ + "id": string(qvalue.QValueKindInt64), + "c1": string(qvalue.QValueKindInt64), + "c2": string(qvalue.QValueKindInt64), + "c3": string(qvalue.QValueKindInt64), + }, + PrimaryKeyColumn: "id", + } + output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, + }) + s.NoError(err) + s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c3") s.NoError(err) // alter source table, drop column c3 and insert another row. @@ -166,6 +210,21 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 8) + expectedTableSchema = &protos.TableSchema{ + TableIdentifier: dstTableName, + Columns: map[string]string{ + "id": string(qvalue.QValueKindInt64), + "c1": string(qvalue.QValueKindInt64), + "c2": string(qvalue.QValueKindInt64), + "c3": string(qvalue.QValueKindInt64), + }, + PrimaryKeyColumn: "id", + } + output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, + }) + s.NoError(err) + s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1") s.NoError(err) }() diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 0aa56ea924..b4cad3dca7 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -6,6 +6,7 @@ import ( "strings" "testing" + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/jackc/pgx/v5/pgxpool" @@ -21,8 +22,9 @@ type PeerFlowE2ETestSuitePG struct { suite.Suite testsuite.WorkflowTestSuite - pool *pgxpool.Pool - peer *protos.Peer + pool *pgxpool.Pool + peer *protos.Peer + connector *connpostgres.PostgresConnector } func TestPeerFlowE2ETestSuitePG(t *testing.T) { @@ -46,6 +48,16 @@ func (s *PeerFlowE2ETestSuitePG) SetupSuite() { } s.pool = pool s.peer = generatePGPeer(e2e.GetTestPostgresConf()) + + s.connector, err = connpostgres.NewPostgresConnector(context.Background(), + &protos.PostgresConfig{ + Host: "localhost", + Port: 7132, + User: "postgres", + Password: "postgres", + Database: "postgres", + }) + s.NoError(err) } // Implement TearDownAllSuite interface to tear down the test suite diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 796402d033..de8e8ba9c8 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -3,10 +3,13 @@ package e2e_snowflake import ( "context" "fmt" + "strings" "testing" + connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model/qvalue" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" @@ -21,8 +24,9 @@ type PeerFlowE2ETestSuiteSF struct { suite.Suite testsuite.WorkflowTestSuite - pool *pgxpool.Pool - sfHelper *SnowflakeTestHelper + pool *pgxpool.Pool + 
sfHelper *SnowflakeTestHelper + connector *connsnowflake.SnowflakeConnector } func TestPeerFlowE2ETestSuiteSF(t *testing.T) { @@ -69,6 +73,10 @@ func (s *PeerFlowE2ETestSuiteSF) SetupSuite() { if err != nil { s.Fail("failed to setup snowflake", err) } + + s.connector, err = connsnowflake.NewSnowflakeConnector(context.Background(), + s.sfHelper.Config) + s.NoError(err) } // Implement TearDownAllSuite interface to tear down the test suite @@ -84,6 +92,9 @@ func (s *PeerFlowE2ETestSuiteSF) TearDownSuite() { s.Fail("failed to clean up Snowflake", err) } } + + err = s.connector.Close() + s.NoError(err) } func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { @@ -841,7 +852,19 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { // verify we got our first row. e2e.NormalizeFlowCountQuery(env, connectionGen, 2) - s.compareTableSchemasSF("test_simple_schema_changes") + expectedTableSchema := &protos.TableSchema{ + TableIdentifier: strings.ToUpper(dstTableName), + Columns: map[string]string{ + "ID": string(qvalue.QValueKindNumeric), + "C1": string(qvalue.QValueKindNumeric), + "_PEERDB_IS_DELETED": string(qvalue.QValueKindBoolean), + }, + } + output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, + }) + s.NoError(err) + s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) // alter source table, add column c2 and insert another row. @@ -856,7 +879,20 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 4) - s.compareTableSchemasSF("test_simple_schema_changes") + expectedTableSchema = &protos.TableSchema{ + TableIdentifier: strings.ToUpper(dstTableName), + Columns: map[string]string{ + "ID": string(qvalue.QValueKindNumeric), + "C1": string(qvalue.QValueKindNumeric), + "C2": string(qvalue.QValueKindNumeric), + "_PEERDB_IS_DELETED": string(qvalue.QValueKindBoolean), + }, + } + output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, + }) + s.NoError(err) + s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2", false) // alter source table, add column c3, drop column c2 and insert another row. @@ -871,7 +907,21 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 6) - s.compareTableSchemasSF("test_simple_schema_changes") + expectedTableSchema = &protos.TableSchema{ + TableIdentifier: strings.ToUpper(dstTableName), + Columns: map[string]string{ + "ID": string(qvalue.QValueKindNumeric), + "C1": string(qvalue.QValueKindNumeric), + "C2": string(qvalue.QValueKindNumeric), + "C3": string(qvalue.QValueKindNumeric), + "_PEERDB_IS_DELETED": string(qvalue.QValueKindBoolean), + }, + } + output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, + }) + s.NoError(err) + s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3", false) // alter source table, drop column c3 and insert another row. 
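Every verification step in this Snowflake test repeats the same fetch-and-compare pattern against the destination. For reference, here is that pattern condensed into a hypothetical helper (requireDestSchema is not part of this patch; the types and calls are exactly the ones the steps above use, and the suite fields come from the SetupSuite changes earlier in this file):

func (s *PeerFlowE2ETestSuiteSF) requireDestSchema(dstTableName string,
	columns map[string]string) {
	// Snowflake stores unquoted identifiers upper-cased, hence ToUpper
	// on the table identifier (column keys are written upper-case too).
	expected := &protos.TableSchema{
		TableIdentifier: strings.ToUpper(dstTableName),
		Columns:         columns,
	}
	output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
		TableIdentifiers: []string{dstTableName},
	})
	s.NoError(err)
	s.Equal(expected, output.TableNameSchemaMapping[dstTableName])
}

The step in the hunk that follows is the instructive instantiation: c3 has just been dropped at the source, yet C1, C2 and C3 are all still expected on the Snowflake side, since drops are not replayed.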
@@ -886,7 +936,21 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 8) - s.compareTableSchemasSF("test_simple_schema_changes") + expectedTableSchema = &protos.TableSchema{ + TableIdentifier: strings.ToUpper(dstTableName), + Columns: map[string]string{ + "ID": string(qvalue.QValueKindNumeric), + "C1": string(qvalue.QValueKindNumeric), + "C2": string(qvalue.QValueKindNumeric), + "C3": string(qvalue.QValueKindNumeric), + "_PEERDB_IS_DELETED": string(qvalue.QValueKindBoolean), + }, + } + output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, + }) + s.NoError(err) + s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) }() diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index 4f4ea86b9b..d17b60f798 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -39,6 +39,8 @@ func (suite *SnowflakeSchemaDeltaTestSuite) SetupSuite() { func (suite *SnowflakeSchemaDeltaTestSuite) TearDownSuite() { err := suite.sfTestHelper.Cleanup() suite.failTestError(err) + err = suite.connector.Close() + suite.failTestError(err) } func (suite *SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() { @@ -46,14 +48,14 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() { err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName)) suite.failTestError(err) - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ + err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, DstTableName: tableName, AddedColumns: []*protos.DeltaAddedColumn{{ ColumnName: "HI", ColumnType: string(qvalue.QValueKindJSON), }}, - }) + }}) suite.failTestError(err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ @@ -69,89 +71,7 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() { }, output.TableNameSchemaMapping[tableName]) } -func (suite *SnowflakeSchemaDeltaTestSuite) TestSimpleDropColumn() { - tableName := fmt.Sprintf("%s.SIMPLE_DROP_COLUMN", schemaDeltaTestSchemaName) - err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY, BYE TEXT)", tableName)) - suite.failTestError(err) - - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ - SrcTableName: tableName, - DstTableName: tableName, - DroppedColumns: []string{"BYE"}, - }) - suite.failTestError(err) - - output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{tableName}, - }) - suite.failTestError(err) - suite.Equal(&protos.TableSchema{ - TableIdentifier: tableName, - Columns: map[string]string{ - "ID": string(qvalue.QValueKindString), - }, - }, output.TableNameSchemaMapping[tableName]) -} - -func (suite *SnowflakeSchemaDeltaTestSuite) TestSimpleAddDropColumn() { - tableName := fmt.Sprintf("%s.SIMPLE_ADD_DROP_COLUMN", schemaDeltaTestSchemaName) - err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY, BYE TEXT)", tableName)) - suite.failTestError(err) - - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", 
&protos.TableSchemaDelta{ - SrcTableName: tableName, - DstTableName: tableName, - AddedColumns: []*protos.DeltaAddedColumn{{ - ColumnName: "HI", - ColumnType: string(qvalue.QValueKindFloat64), - }}, - DroppedColumns: []string{"BYE"}, - }) - suite.failTestError(err) - - output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{tableName}, - }) - suite.failTestError(err) - suite.Equal(&protos.TableSchema{ - TableIdentifier: tableName, - Columns: map[string]string{ - "ID": string(qvalue.QValueKindString), - "HI": string(qvalue.QValueKindFloat64), - }, - }, output.TableNameSchemaMapping[tableName]) -} - -func (suite *SnowflakeSchemaDeltaTestSuite) TestAddDropSameColumn() { - tableName := fmt.Sprintf("%s.ADD_DROP_SAME_COLUMN", schemaDeltaTestSchemaName) - err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY, BYE INTEGER)", tableName)) - suite.failTestError(err) - - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ - SrcTableName: tableName, - DstTableName: tableName, - AddedColumns: []*protos.DeltaAddedColumn{{ - ColumnName: "BYE", - ColumnType: string(qvalue.QValueKindJSON), - }}, - DroppedColumns: []string{"BYE"}, - }) - suite.failTestError(err) - - output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{tableName}, - }) - suite.failTestError(err) - suite.Equal(&protos.TableSchema{ - TableIdentifier: tableName, - Columns: map[string]string{ - "ID": string(qvalue.QValueKindString), - "BYE": string(qvalue.QValueKindJSON), - }, - }, output.TableNameSchemaMapping[tableName]) -} - -func (suite *SnowflakeSchemaDeltaTestSuite) TestAddDropAllColumnTypes() { +func (suite *SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() { tableName := fmt.Sprintf("%s.ADD_DROP_ALL_COLUMN_TYPES", schemaDeltaTestSchemaName) err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName)) suite.failTestError(err) @@ -183,11 +103,11 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestAddDropAllColumnTypes() { } } - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ + err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }) + }}) suite.failTestError(err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ @@ -195,34 +115,9 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestAddDropAllColumnTypes() { }) suite.failTestError(err) suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) - - droppedColumns := make([]string, 0) - for columnName := range expectedTableSchema.Columns { - if columnName != "ID" { - droppedColumns = append(droppedColumns, columnName) - } - } - - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ - SrcTableName: tableName, - DstTableName: tableName, - DroppedColumns: droppedColumns, - }) - suite.failTestError(err) - - output, err = suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{tableName}, - }) - suite.failTestError(err) - suite.Equal(&protos.TableSchema{ - TableIdentifier: tableName, - Columns: map[string]string{ - "ID": string(qvalue.QValueKindString), - }, - }, output.TableNameSchemaMapping[tableName]) } -func (suite *SnowflakeSchemaDeltaTestSuite) TestAddDropTrickyColumnNames() { +func (suite 
*SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() { tableName := fmt.Sprintf("%s.ADD_DROP_TRICKY_COLUMN_NAMES", schemaDeltaTestSchemaName) err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(id TEXT PRIMARY KEY)", tableName)) suite.failTestError(err) @@ -252,11 +147,11 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestAddDropTrickyColumnNames() { } } - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ + err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }) + }}) suite.failTestError(err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ @@ -264,34 +159,9 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestAddDropTrickyColumnNames() { }) suite.failTestError(err) suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) - - droppedColumns := make([]string, 0) - for columnName := range expectedTableSchema.Columns { - if columnName != "ID" { - droppedColumns = append(droppedColumns, columnName) - } - } - - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ - SrcTableName: tableName, - DstTableName: tableName, - DroppedColumns: droppedColumns, - }) - suite.failTestError(err) - - output, err = suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{tableName}, - }) - suite.failTestError(err) - suite.Equal(&protos.TableSchema{ - TableIdentifier: tableName, - Columns: map[string]string{ - "ID": string(qvalue.QValueKindString), - }, - }, output.TableNameSchemaMapping[tableName]) } -func (suite *SnowflakeSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { +func (suite *SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { tableName := fmt.Sprintf("%s.ADD_DROP_WHITESPACE_COLUMN_NAMES", schemaDeltaTestSchemaName) err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(\" \" TEXT PRIMARY KEY)", tableName)) suite.failTestError(err) @@ -315,12 +185,11 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { } } - fmt.Println(addedColumns) - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ + err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }) + }}) suite.failTestError(err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ @@ -328,31 +197,6 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { }) suite.failTestError(err) suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) - - droppedColumns := make([]string, 0) - for columnName := range expectedTableSchema.Columns { - if columnName != " " { - droppedColumns = append(droppedColumns, columnName) - } - } - - err = suite.connector.ReplayTableSchemaDelta("schema_delta_flow", &protos.TableSchemaDelta{ - SrcTableName: tableName, - DstTableName: tableName, - DroppedColumns: droppedColumns, - }) - suite.failTestError(err) - - output, err = suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{tableName}, - }) - suite.failTestError(err) - suite.Equal(&protos.TableSchema{ - TableIdentifier: tableName, - Columns: map[string]string{ - " ": string(qvalue.QValueKindString), - }, - }, 
output.TableNameSchemaMapping[tableName]) } func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) { diff --git a/flow/generated/protos/flow.pb.go b/flow/generated/protos/flow.pb.go index 88804b1251..1a1203014b 100644 --- a/flow/generated/protos/flow.pb.go +++ b/flow/generated/protos/flow.pb.go @@ -2801,10 +2801,9 @@ type TableSchemaDelta struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - SrcTableName string `protobuf:"bytes,1,opt,name=src_table_name,json=srcTableName,proto3" json:"src_table_name,omitempty"` - DstTableName string `protobuf:"bytes,2,opt,name=dst_table_name,json=dstTableName,proto3" json:"dst_table_name,omitempty"` - AddedColumns []*DeltaAddedColumn `protobuf:"bytes,3,rep,name=added_columns,json=addedColumns,proto3" json:"added_columns,omitempty"` - DroppedColumns []string `protobuf:"bytes,4,rep,name=dropped_columns,json=droppedColumns,proto3" json:"dropped_columns,omitempty"` + SrcTableName string `protobuf:"bytes,1,opt,name=src_table_name,json=srcTableName,proto3" json:"src_table_name,omitempty"` + DstTableName string `protobuf:"bytes,2,opt,name=dst_table_name,json=dstTableName,proto3" json:"dst_table_name,omitempty"` + AddedColumns []*DeltaAddedColumn `protobuf:"bytes,3,rep,name=added_columns,json=addedColumns,proto3" json:"added_columns,omitempty"` } func (x *TableSchemaDelta) Reset() { @@ -2860,20 +2859,13 @@ func (x *TableSchemaDelta) GetAddedColumns() []*DeltaAddedColumn { return nil } -func (x *TableSchemaDelta) GetDroppedColumns() []string { - if x != nil { - return x.DroppedColumns - } - return nil -} - type ReplayTableSchemaDeltaInput struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields FlowConnectionConfigs *FlowConnectionConfigs `protobuf:"bytes,1,opt,name=flow_connection_configs,json=flowConnectionConfigs,proto3" json:"flow_connection_configs,omitempty"` - TableSchemaDelta *TableSchemaDelta `protobuf:"bytes,2,opt,name=table_schema_delta,json=tableSchemaDelta,proto3" json:"table_schema_delta,omitempty"` + TableSchemaDeltas []*TableSchemaDelta `protobuf:"bytes,2,rep,name=table_schema_deltas,json=tableSchemaDeltas,proto3" json:"table_schema_deltas,omitempty"` } func (x *ReplayTableSchemaDeltaInput) Reset() { @@ -2915,9 +2907,9 @@ func (x *ReplayTableSchemaDeltaInput) GetFlowConnectionConfigs() *FlowConnection return nil } -func (x *ReplayTableSchemaDeltaInput) GetTableSchemaDelta() *TableSchemaDelta { +func (x *ReplayTableSchemaDeltaInput) GetTableSchemaDeltas() []*TableSchemaDelta { if x != nil { - return x.TableSchemaDelta + return x.TableSchemaDeltas } return nil } @@ -3487,7 +3479,7 @@ var file_flow_proto_rawDesc = []byte{ 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, - 0x54, 0x79, 0x70, 0x65, 0x22, 0xcb, 0x01, 0x0a, 0x10, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, + 0x54, 0x79, 0x70, 0x65, 0x22, 0xa2, 0x01, 0x0a, 0x10, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x12, 0x24, 0x0a, 0x0e, 0x73, 0x72, 0x63, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x72, 0x63, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, @@ -3497,42 +3489,39 @@ var file_flow_proto_rawDesc = []byte{ 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x18, 
0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x41, 0x64, 0x64, 0x65, 0x64, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x52, 0x0c, 0x61, 0x64, 0x64, - 0x65, 0x64, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x64, 0x72, 0x6f, - 0x70, 0x70, 0x65, 0x64, 0x5f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x03, - 0x28, 0x09, 0x52, 0x0e, 0x64, 0x72, 0x6f, 0x70, 0x70, 0x65, 0x64, 0x43, 0x6f, 0x6c, 0x75, 0x6d, - 0x6e, 0x73, 0x22, 0xc6, 0x01, 0x0a, 0x1b, 0x52, 0x65, 0x70, 0x6c, 0x61, 0x79, 0x54, 0x61, 0x62, - 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x49, 0x6e, 0x70, - 0x75, 0x74, 0x12, 0x5a, 0x0a, 0x17, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, - 0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, - 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x52, 0x15, 0x66, 0x6c, 0x6f, 0x77, 0x43, 0x6f, 0x6e, - 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x12, 0x4b, - 0x0a, 0x12, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x64, - 0x65, 0x6c, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, - 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x52, 0x10, 0x74, 0x61, 0x62, 0x6c, 0x65, - 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x2a, 0x50, 0x0a, 0x0c, 0x51, - 0x52, 0x65, 0x70, 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x51, - 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4d, 0x55, - 0x4c, 0x54, 0x49, 0x5f, 0x49, 0x4e, 0x53, 0x45, 0x52, 0x54, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, - 0x51, 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x53, - 0x54, 0x4f, 0x52, 0x41, 0x47, 0x45, 0x5f, 0x41, 0x56, 0x52, 0x4f, 0x10, 0x01, 0x2a, 0x66, 0x0a, - 0x0d, 0x51, 0x52, 0x65, 0x70, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, - 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, - 0x45, 0x5f, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, - 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x50, - 0x53, 0x45, 0x52, 0x54, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, - 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4f, 0x56, 0x45, 0x52, 0x57, 0x52, - 0x49, 0x54, 0x45, 0x10, 0x02, 0x42, 0x76, 0x0a, 0x0f, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x42, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x50, 0x72, - 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, - 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0a, - 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xca, 0x02, 0x0a, 0x50, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xe2, 0x02, 0x16, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, - 0x46, 0x6c, 0x6f, 0x77, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 
0x64, 0x61, 0x74, 0x61, - 0xea, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x64, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x22, 0xc8, 0x01, 0x0a, 0x1b, 0x52, 0x65, + 0x70, 0x6c, 0x61, 0x79, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, + 0x65, 0x6c, 0x74, 0x61, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x12, 0x5a, 0x0a, 0x17, 0x66, 0x6c, 0x6f, + 0x77, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x70, 0x65, 0x65, + 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x43, 0x6f, 0x6e, + 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x52, 0x15, + 0x66, 0x6c, 0x6f, 0x77, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x73, 0x12, 0x4d, 0x0a, 0x13, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, + 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x64, 0x65, 0x6c, 0x74, 0x61, 0x73, 0x18, 0x02, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, + 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, + 0x61, 0x52, 0x11, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, + 0x6c, 0x74, 0x61, 0x73, 0x2a, 0x50, 0x0a, 0x0c, 0x51, 0x52, 0x65, 0x70, 0x53, 0x79, 0x6e, 0x63, + 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, + 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x5f, 0x49, 0x4e, 0x53, + 0x45, 0x52, 0x54, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, + 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x53, 0x54, 0x4f, 0x52, 0x41, 0x47, 0x45, 0x5f, + 0x41, 0x56, 0x52, 0x4f, 0x10, 0x01, 0x2a, 0x66, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x57, 0x72, + 0x69, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, + 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x50, 0x50, 0x45, 0x4e, + 0x44, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, + 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x50, 0x53, 0x45, 0x52, 0x54, 0x10, 0x01, 0x12, + 0x1d, 0x0a, 0x19, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, + 0x44, 0x45, 0x5f, 0x4f, 0x56, 0x45, 0x52, 0x57, 0x52, 0x49, 0x54, 0x45, 0x10, 0x02, 0x42, 0x76, + 0x0a, 0x0f, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, + 0x77, 0x42, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, + 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, + 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, + 0x6c, 0x6f, 0x77, 0xca, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, + 0xe2, 0x02, 0x16, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x5c, 0x47, 0x50, + 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -3665,7 +3654,7 @@ var file_flow_proto_depIdxs = []int32{ 38, // 52: peerdb_flow.QRepParitionResult.partitions:type_name -> peerdb_flow.QRepPartition 42, // 53: 
peerdb_flow.TableSchemaDelta.added_columns:type_name -> peerdb_flow.DeltaAddedColumn 6, // 54: peerdb_flow.ReplayTableSchemaDeltaInput.flow_connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs - 43, // 55: peerdb_flow.ReplayTableSchemaDeltaInput.table_schema_delta:type_name -> peerdb_flow.TableSchemaDelta + 43, // 55: peerdb_flow.ReplayTableSchemaDeltaInput.table_schema_deltas:type_name -> peerdb_flow.TableSchemaDelta 23, // 56: peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry.value:type_name -> peerdb_flow.TableSchema 4, // 57: peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry.value:type_name -> peerdb_flow.RelationMessage 4, // 58: peerdb_flow.StartFlowInput.RelationMessageMappingEntry.value:type_name -> peerdb_flow.RelationMessage diff --git a/flow/model/model.go b/flow/model/model.go index cf5784b113..610305d295 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -326,8 +326,8 @@ type SyncResponse struct { CurrentSyncBatchID int64 // TableNameRowsMapping tells how many records need to be synced to each destination table. TableNameRowsMapping map[string]uint32 - // to be carried to NormalizeFlow - TableSchemaDelta *protos.TableSchemaDelta + // to be carried to parent WorkFlow + TableSchemaDeltas []*protos.TableSchemaDelta // to be stored in state for future PullFlows RelationMessageMapping RelationMessageMapping } @@ -342,7 +342,7 @@ type NormalizeResponse struct { // sync all the records normally, then apply the schema delta after NormalizeFlow. type RecordsWithTableSchemaDelta struct { RecordBatch *RecordBatch - TableSchemaDelta *protos.TableSchemaDelta + TableSchemaDeltas []*protos.TableSchemaDelta RelationMessageMapping RelationMessageMapping } diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index f9209820d4..fb47ff8ffd 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -303,38 +303,28 @@ func CDCFlowWorkflowWithConfig( } ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts) - var tableSchemaDelta *protos.TableSchemaDelta = nil + var tableSchemaDeltas []*protos.TableSchemaDelta = nil if childSyncFlowRes != nil { - tableSchemaDelta = childSyncFlowRes.TableSchemaDelta + tableSchemaDeltas = childSyncFlowRes.TableSchemaDeltas } - childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( - ctx, - NormalizeFlowWorkflow, - cfg, - tableSchemaDelta, - ) + // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes. + if tableSchemaDeltas != nil { + modifiedSrcTables := make([]string, 0) + modifiedDstTables := make([]string, 0) - selector.AddFuture(childNormalizeFlowFuture, func(f workflow.Future) { - var childNormalizeFlowRes *model.NormalizeResponse - if err := f.Get(ctx, &childNormalizeFlowRes); err != nil { - w.logger.Error("failed to execute normalize flow: ", err) - state.NormalizeFlowErrors = multierror.Append(state.NormalizeFlowErrors, err) - } else { - state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes) + for _, tableSchemaDelta := range tableSchemaDeltas { + modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName) + modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName) } - }) - selector.Select(ctx) - // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes. 
- if tableSchemaDelta != nil { getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, }) getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema, &protos.GetTableSchemaBatchInput{ PeerConnectionConfig: cfg.Source, - TableIdentifiers: []string{tableSchemaDelta.SrcTableName}, + TableIdentifiers: modifiedSrcTables, }) var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput @@ -342,10 +332,30 @@ func CDCFlowWorkflowWithConfig( w.logger.Error("failed to execute schema update at source: ", err) state.SyncFlowErrors = multierror.Append(state.SyncFlowErrors, err) } else { - cfg.TableNameSchemaMapping[tableSchemaDelta.DstTableName] = - getModifiedSchemaRes.TableNameSchemaMapping[tableSchemaDelta.SrcTableName] + for i := range modifiedSrcTables { + cfg.TableNameSchemaMapping[modifiedDstTables[i]] = + getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]] + } + } } + + childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( + ctx, + NormalizeFlowWorkflow, + cfg, + ) + + selector.AddFuture(childNormalizeFlowFuture, func(f workflow.Future) { + var childNormalizeFlowRes *model.NormalizeResponse + if err := f.Get(ctx, &childNormalizeFlowRes); err != nil { + w.logger.Error("failed to execute normalize flow: ", err) + state.NormalizeFlowErrors = multierror.Append(state.NormalizeFlowErrors, err) + } else { + state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes) + } + }) + selector.Select(ctx) } // send WAL heartbeat diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index fe7a285838..d63c57d1ed 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -102,6 +102,20 @@ func (s *SyncFlowExecution) executeSyncFlow( return nil, fmt.Errorf("failed to flow: %w", err) } + replayTableSchemaDeltaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 30 * time.Minute, + }) + replayTableSchemaInput := &protos.ReplayTableSchemaDeltaInput{ + FlowConnectionConfigs: config, + TableSchemaDeltas: syncRes.TableSchemaDeltas, + } + + fReplayTableSchemaDelta := workflow.ExecuteActivity(replayTableSchemaDeltaCtx, + flowable.ReplayTableSchemaDeltas, replayTableSchemaInput) + if err := fReplayTableSchemaDelta.Get(replayTableSchemaDeltaCtx, nil); err != nil { + return nil, fmt.Errorf("failed to replay schema delta: %w", err) + } + return syncRes, nil } @@ -122,20 +136,18 @@ func SyncFlowWorkflow(ctx workflow.Context, func NormalizeFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs, - tableSchemaDelta *protos.TableSchemaDelta, ) (*model.NormalizeResponse, error) { s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{ CDCFlowName: config.FlowJobName, Progress: []string{}, }) - return s.executeNormalizeFlow(ctx, config, tableSchemaDelta) + return s.executeNormalizeFlow(ctx, config) } func (s *NormalizeFlowExecution) executeNormalizeFlow( ctx workflow.Context, config *protos.FlowConnectionConfigs, - tableSchemaDelta *protos.TableSchemaDelta, ) (*model.NormalizeResponse, error) { s.logger.Info("executing normalize flow - ", s.CDCFlowName) @@ -155,16 +167,5 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow( return nil, fmt.Errorf("failed to flow: %w", err) } - replayTableSchemaInput := &protos.ReplayTableSchemaDeltaInput{ - FlowConnectionConfigs: config, - TableSchemaDelta: tableSchemaDelta, - } - - fReplayTableSchemaDelta := workflow.ExecuteActivity(normalizeFlowCtx, 
flowable.ReplayTableSchemaDelta, - replayTableSchemaInput) - if err := fReplayTableSchemaDelta.Get(normalizeFlowCtx, nil); err != nil { - return nil, fmt.Errorf("failed to replay schema delta: %w", err) - } - return normalizeResponse, nil } diff --git a/nexus/pt/src/peerdb_flow.rs b/nexus/pt/src/peerdb_flow.rs index 975007c8b9..3a6303c287 100644 --- a/nexus/pt/src/peerdb_flow.rs +++ b/nexus/pt/src/peerdb_flow.rs @@ -461,16 +461,14 @@ pub struct TableSchemaDelta { pub dst_table_name: ::prost::alloc::string::String, #[prost(message, repeated, tag="3")] pub added_columns: ::prost::alloc::vec::Vec, - #[prost(string, repeated, tag="4")] - pub dropped_columns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ReplayTableSchemaDeltaInput { #[prost(message, optional, tag="1")] pub flow_connection_configs: ::core::option::Option, - #[prost(message, optional, tag="2")] - pub table_schema_delta: ::core::option::Option, + #[prost(message, repeated, tag="2")] + pub table_schema_deltas: ::prost::alloc::vec::Vec, } /// protos for qrep #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] diff --git a/nexus/pt/src/peerdb_flow.serde.rs b/nexus/pt/src/peerdb_flow.serde.rs index 399e43c66f..ca2445fc4f 100644 --- a/nexus/pt/src/peerdb_flow.serde.rs +++ b/nexus/pt/src/peerdb_flow.serde.rs @@ -3567,15 +3567,15 @@ impl serde::Serialize for ReplayTableSchemaDeltaInput { if self.flow_connection_configs.is_some() { len += 1; } - if self.table_schema_delta.is_some() { + if !self.table_schema_deltas.is_empty() { len += 1; } let mut struct_ser = serializer.serialize_struct("peerdb_flow.ReplayTableSchemaDeltaInput", len)?; if let Some(v) = self.flow_connection_configs.as_ref() { struct_ser.serialize_field("flowConnectionConfigs", v)?; } - if let Some(v) = self.table_schema_delta.as_ref() { - struct_ser.serialize_field("tableSchemaDelta", v)?; + if !self.table_schema_deltas.is_empty() { + struct_ser.serialize_field("tableSchemaDeltas", &self.table_schema_deltas)?; } struct_ser.end() } @@ -3589,14 +3589,14 @@ impl<'de> serde::Deserialize<'de> for ReplayTableSchemaDeltaInput { const FIELDS: &[&str] = &[ "flow_connection_configs", "flowConnectionConfigs", - "table_schema_delta", - "tableSchemaDelta", + "table_schema_deltas", + "tableSchemaDeltas", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { FlowConnectionConfigs, - TableSchemaDelta, + TableSchemaDeltas, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -3620,7 +3620,7 @@ impl<'de> serde::Deserialize<'de> for ReplayTableSchemaDeltaInput { { match value { "flowConnectionConfigs" | "flow_connection_configs" => Ok(GeneratedField::FlowConnectionConfigs), - "tableSchemaDelta" | "table_schema_delta" => Ok(GeneratedField::TableSchemaDelta), + "tableSchemaDeltas" | "table_schema_deltas" => Ok(GeneratedField::TableSchemaDeltas), _ => Ok(GeneratedField::__SkipField__), } } @@ -3641,7 +3641,7 @@ impl<'de> serde::Deserialize<'de> for ReplayTableSchemaDeltaInput { V: serde::de::MapAccess<'de>, { let mut flow_connection_configs__ = None; - let mut table_schema_delta__ = None; + let mut table_schema_deltas__ = None; while let Some(k) = map.next_key()? 
{ match k { GeneratedField::FlowConnectionConfigs => { @@ -3650,11 +3650,11 @@ impl<'de> serde::Deserialize<'de> for ReplayTableSchemaDeltaInput { } flow_connection_configs__ = map.next_value()?; } - GeneratedField::TableSchemaDelta => { - if table_schema_delta__.is_some() { - return Err(serde::de::Error::duplicate_field("tableSchemaDelta")); + GeneratedField::TableSchemaDeltas => { + if table_schema_deltas__.is_some() { + return Err(serde::de::Error::duplicate_field("tableSchemaDeltas")); } - table_schema_delta__ = map.next_value()?; + table_schema_deltas__ = Some(map.next_value()?); } GeneratedField::__SkipField__ => { let _ = map.next_value::()?; @@ -3663,7 +3663,7 @@ impl<'de> serde::Deserialize<'de> for ReplayTableSchemaDeltaInput { } Ok(ReplayTableSchemaDeltaInput { flow_connection_configs: flow_connection_configs__, - table_schema_delta: table_schema_delta__, + table_schema_deltas: table_schema_deltas__.unwrap_or_default(), }) } } @@ -5563,9 +5563,6 @@ impl serde::Serialize for TableSchemaDelta { if !self.added_columns.is_empty() { len += 1; } - if !self.dropped_columns.is_empty() { - len += 1; - } let mut struct_ser = serializer.serialize_struct("peerdb_flow.TableSchemaDelta", len)?; if !self.src_table_name.is_empty() { struct_ser.serialize_field("srcTableName", &self.src_table_name)?; @@ -5576,9 +5573,6 @@ impl serde::Serialize for TableSchemaDelta { if !self.added_columns.is_empty() { struct_ser.serialize_field("addedColumns", &self.added_columns)?; } - if !self.dropped_columns.is_empty() { - struct_ser.serialize_field("droppedColumns", &self.dropped_columns)?; - } struct_ser.end() } } @@ -5595,8 +5589,6 @@ impl<'de> serde::Deserialize<'de> for TableSchemaDelta { "dstTableName", "added_columns", "addedColumns", - "dropped_columns", - "droppedColumns", ]; #[allow(clippy::enum_variant_names)] @@ -5604,7 +5596,6 @@ impl<'de> serde::Deserialize<'de> for TableSchemaDelta { SrcTableName, DstTableName, AddedColumns, - DroppedColumns, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -5630,7 +5621,6 @@ impl<'de> serde::Deserialize<'de> for TableSchemaDelta { "srcTableName" | "src_table_name" => Ok(GeneratedField::SrcTableName), "dstTableName" | "dst_table_name" => Ok(GeneratedField::DstTableName), "addedColumns" | "added_columns" => Ok(GeneratedField::AddedColumns), - "droppedColumns" | "dropped_columns" => Ok(GeneratedField::DroppedColumns), _ => Ok(GeneratedField::__SkipField__), } } @@ -5653,7 +5643,6 @@ impl<'de> serde::Deserialize<'de> for TableSchemaDelta { let mut src_table_name__ = None; let mut dst_table_name__ = None; let mut added_columns__ = None; - let mut dropped_columns__ = None; while let Some(k) = map.next_key()? 
{ match k { GeneratedField::SrcTableName => { @@ -5674,12 +5663,6 @@ impl<'de> serde::Deserialize<'de> for TableSchemaDelta { } added_columns__ = Some(map.next_value()?); } - GeneratedField::DroppedColumns => { - if dropped_columns__.is_some() { - return Err(serde::de::Error::duplicate_field("droppedColumns")); - } - dropped_columns__ = Some(map.next_value()?); - } GeneratedField::__SkipField__ => { let _ = map.next_value::()?; } @@ -5689,7 +5672,6 @@ impl<'de> serde::Deserialize<'de> for TableSchemaDelta { src_table_name: src_table_name__.unwrap_or_default(), dst_table_name: dst_table_name__.unwrap_or_default(), added_columns: added_columns__.unwrap_or_default(), - dropped_columns: dropped_columns__.unwrap_or_default(), }) } } diff --git a/protos/flow.proto b/protos/flow.proto index 81178fa5e2..0e03b86fe6 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -309,10 +309,9 @@ message TableSchemaDelta { string src_table_name = 1; string dst_table_name = 2; repeated DeltaAddedColumn added_columns = 3; - repeated string dropped_columns = 4; } message ReplayTableSchemaDeltaInput { FlowConnectionConfigs flow_connection_configs = 1; - TableSchemaDelta table_schema_delta = 2; + repeated TableSchemaDelta table_schema_deltas = 2; } diff --git a/ui/grpc_generated/flow.ts b/ui/grpc_generated/flow.ts index b71ddf8bef..aae30e5b51 100644 --- a/ui/grpc_generated/flow.ts +++ b/ui/grpc_generated/flow.ts @@ -416,12 +416,11 @@ export interface TableSchemaDelta { srcTableName: string; dstTableName: string; addedColumns: DeltaAddedColumn[]; - droppedColumns: string[]; } export interface ReplayTableSchemaDeltaInput { flowConnectionConfigs: FlowConnectionConfigs | undefined; - tableSchemaDelta: TableSchemaDelta | undefined; + tableSchemaDeltas: TableSchemaDelta[]; } function createBaseTableNameMapping(): TableNameMapping { @@ -5359,7 +5358,7 @@ export const DeltaAddedColumn = { }; function createBaseTableSchemaDelta(): TableSchemaDelta { - return { srcTableName: "", dstTableName: "", addedColumns: [], droppedColumns: [] }; + return { srcTableName: "", dstTableName: "", addedColumns: [] }; } export const TableSchemaDelta = { @@ -5373,9 +5372,6 @@ export const TableSchemaDelta = { for (const v of message.addedColumns) { DeltaAddedColumn.encode(v!, writer.uint32(26).fork()).ldelim(); } - for (const v of message.droppedColumns) { - writer.uint32(34).string(v!); - } return writer; }, @@ -5407,13 +5403,6 @@ export const TableSchemaDelta = { message.addedColumns.push(DeltaAddedColumn.decode(reader, reader.uint32())); continue; - case 4: - if (tag !== 34) { - break; - } - - message.droppedColumns.push(reader.string()); - continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -5430,7 +5419,6 @@ export const TableSchemaDelta = { addedColumns: Array.isArray(object?.addedColumns) ? object.addedColumns.map((e: any) => DeltaAddedColumn.fromJSON(e)) : [], - droppedColumns: Array.isArray(object?.droppedColumns) ? object.droppedColumns.map((e: any) => String(e)) : [], }; }, @@ -5445,9 +5433,6 @@ export const TableSchemaDelta = { if (message.addedColumns?.length) { obj.addedColumns = message.addedColumns.map((e) => DeltaAddedColumn.toJSON(e)); } - if (message.droppedColumns?.length) { - obj.droppedColumns = message.droppedColumns; - } return obj; }, @@ -5459,13 +5444,12 @@ export const TableSchemaDelta = { message.srcTableName = object.srcTableName ?? ""; message.dstTableName = object.dstTableName ?? 
""; message.addedColumns = object.addedColumns?.map((e) => DeltaAddedColumn.fromPartial(e)) || []; - message.droppedColumns = object.droppedColumns?.map((e) => e) || []; return message; }, }; function createBaseReplayTableSchemaDeltaInput(): ReplayTableSchemaDeltaInput { - return { flowConnectionConfigs: undefined, tableSchemaDelta: undefined }; + return { flowConnectionConfigs: undefined, tableSchemaDeltas: [] }; } export const ReplayTableSchemaDeltaInput = { @@ -5473,8 +5457,8 @@ export const ReplayTableSchemaDeltaInput = { if (message.flowConnectionConfigs !== undefined) { FlowConnectionConfigs.encode(message.flowConnectionConfigs, writer.uint32(10).fork()).ldelim(); } - if (message.tableSchemaDelta !== undefined) { - TableSchemaDelta.encode(message.tableSchemaDelta, writer.uint32(18).fork()).ldelim(); + for (const v of message.tableSchemaDeltas) { + TableSchemaDelta.encode(v!, writer.uint32(18).fork()).ldelim(); } return writer; }, @@ -5498,7 +5482,7 @@ export const ReplayTableSchemaDeltaInput = { break; } - message.tableSchemaDelta = TableSchemaDelta.decode(reader, reader.uint32()); + message.tableSchemaDeltas.push(TableSchemaDelta.decode(reader, reader.uint32())); continue; } if ((tag & 7) === 4 || tag === 0) { @@ -5514,7 +5498,9 @@ export const ReplayTableSchemaDeltaInput = { flowConnectionConfigs: isSet(object.flowConnectionConfigs) ? FlowConnectionConfigs.fromJSON(object.flowConnectionConfigs) : undefined, - tableSchemaDelta: isSet(object.tableSchemaDelta) ? TableSchemaDelta.fromJSON(object.tableSchemaDelta) : undefined, + tableSchemaDeltas: Array.isArray(object?.tableSchemaDeltas) + ? object.tableSchemaDeltas.map((e: any) => TableSchemaDelta.fromJSON(e)) + : [], }; }, @@ -5523,8 +5509,8 @@ export const ReplayTableSchemaDeltaInput = { if (message.flowConnectionConfigs !== undefined) { obj.flowConnectionConfigs = FlowConnectionConfigs.toJSON(message.flowConnectionConfigs); } - if (message.tableSchemaDelta !== undefined) { - obj.tableSchemaDelta = TableSchemaDelta.toJSON(message.tableSchemaDelta); + if (message.tableSchemaDeltas?.length) { + obj.tableSchemaDeltas = message.tableSchemaDeltas.map((e) => TableSchemaDelta.toJSON(e)); } return obj; }, @@ -5538,9 +5524,7 @@ export const ReplayTableSchemaDeltaInput = { (object.flowConnectionConfigs !== undefined && object.flowConnectionConfigs !== null) ? FlowConnectionConfigs.fromPartial(object.flowConnectionConfigs) : undefined; - message.tableSchemaDelta = (object.tableSchemaDelta !== undefined && object.tableSchemaDelta !== null) - ? TableSchemaDelta.fromPartial(object.tableSchemaDelta) - : undefined; + message.tableSchemaDeltas = object.tableSchemaDeltas?.map((e) => TableSchemaDelta.fromPartial(e)) || []; return message; }, };