diff --git a/flow/cmd/qrep_api.go b/flow/cmd/qrep_api.go deleted file mode 100644 index e179d7b2b8..0000000000 --- a/flow/cmd/qrep_api.go +++ /dev/null @@ -1,89 +0,0 @@ -package main - -import ( - "errors" - "fmt" - - "github.com/PeerDB-io/peer-flow/generated/protos" -) - -const ( - SyncDataFormatAvro = "avro" - SyncDataFormatDefault = "default" - WriteModeAppend = "append" - WriteModeUpsert = "upsert" -) - -type UnsupportedOptionError struct { - OptionName string - OptionValue string -} - -func (e *UnsupportedOptionError) Error() string { - return fmt.Sprintf("unsupported %s: %s", e.OptionName, e.OptionValue) -} - -func createWriteMode(writeType protos.QRepWriteType, upsertKeyColumns []string) *protos.QRepWriteMode { - return &protos.QRepWriteMode{ - WriteType: writeType, - UpsertKeyColumns: upsertKeyColumns, - } -} - -func genConfigForQRepFlow( - config *protos.QRepConfig, - flowOptions map[string]interface{}, - queryString string, - destinationTableIdentifier string, -) error { - config.InitialCopyOnly = false - config.MaxParallelWorkers = uint32(flowOptions["parallelism"].(float64)) - config.DestinationTableIdentifier = destinationTableIdentifier - config.Query = queryString - config.WatermarkColumn = flowOptions["watermark_column"].(string) - config.WatermarkTable = flowOptions["watermark_table_name"].(string) - - // TODO (kaushik): investigate why this is a float64 in the first place - config.BatchSizeInt = uint32(flowOptions["batch_size_int"].(float64)) - config.BatchDurationSeconds = uint32(flowOptions["batch_duration_timestamp"].(float64)) - config.WaitBetweenBatchesSeconds = uint32(flowOptions["refresh_interval"].(float64)) - config.NumRowsPerPartition = uint32(flowOptions["num_rows_per_partition"].(float64)) - - syncDataFormat, ok := flowOptions["sync_data_format"].(string) - if !ok { - return errors.New("sync_data_format must be a string") - } - - switch syncDataFormat { - case SyncDataFormatAvro: - config.SyncMode = protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO - if _, ok := flowOptions["staging_path"]; ok { - config.StagingPath = flowOptions["staging_path"].(string) - } else { - config.StagingPath = "" - } - case SyncDataFormatDefault: - config.SyncMode = protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT - default: - return &UnsupportedOptionError{"sync_data_format", syncDataFormat} - } - - mode, ok := flowOptions["mode"].(string) - if !ok { - return errors.New("mode must be a string") - } - - switch mode { - case WriteModeAppend: - config.WriteMode = createWriteMode(protos.QRepWriteType_QREP_WRITE_MODE_APPEND, nil) - case WriteModeUpsert: - upsertKeyColumns := make([]string, 0) - for _, column := range flowOptions["unique_key_columns"].([]interface{}) { - upsertKeyColumns = append(upsertKeyColumns, column.(string)) - } - config.WriteMode = createWriteMode(protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, upsertKeyColumns) - default: - return &UnsupportedOptionError{"mode", mode} - } - return nil -} diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index e1ee8cf3c5..78b2dce932 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -127,6 +127,13 @@ func (c *BigQueryConnector) SetupQRepMetadataTables(config *protos.QRepConfig) e return fmt.Errorf("failed to create table %s.%s: %w", c.datasetID, qRepMetadataTableName, err) } + if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE { + _, err = c.client.Query(fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier)).Read(c.ctx) + if err != nil { + return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err) + } + } + return nil } diff --git a/flow/connectors/postgres/postgres_cdc_test.go b/flow/connectors/postgres/postgres_cdc_test.go index 11659b43d5..cacc46ec17 100644 --- a/flow/connectors/postgres/postgres_cdc_test.go +++ b/flow/connectors/postgres/postgres_cdc_test.go @@ -40,19 +40,23 @@ func (suite *PostgresCDCTestSuite) insertSimpleRecords(srcTableName string) { func (suite *PostgresCDCTestSuite) validateInsertedSimpleRecords(records []model.Record, srcTableName string, dstTableName string) { suite.Equal(3, len(records)) + model.NewRecordItemWithData([]string{"id", "name"}, + []*qvalue.QValue{ + {Kind: qvalue.QValueKindInt32, Value: int32(2)}, + {Kind: qvalue.QValueKindString, Value: "quick"}}) matchData := []*model.RecordItems{ - model.NewRecordItemWithData(map[string]*qvalue.QValue{ - "id": {Kind: qvalue.QValueKindInt32, Value: int32(2)}, - "name": {Kind: qvalue.QValueKindString, Value: "quick"}, - }), - model.NewRecordItemWithData(map[string]*qvalue.QValue{ - "id": {Kind: qvalue.QValueKindInt32, Value: int32(4)}, - "name": {Kind: qvalue.QValueKindString, Value: "brown"}, - }), - model.NewRecordItemWithData(map[string]*qvalue.QValue{ - "id": {Kind: qvalue.QValueKindInt32, Value: int32(8)}, - "name": {Kind: qvalue.QValueKindString, Value: "fox"}, - }), + model.NewRecordItemWithData([]string{"id", "name"}, + []*qvalue.QValue{ + {Kind: qvalue.QValueKindInt32, Value: int32(2)}, + {Kind: qvalue.QValueKindString, Value: "quick"}}), + model.NewRecordItemWithData([]string{"id", "name"}, + []*qvalue.QValue{ + {Kind: qvalue.QValueKindInt32, Value: int32(4)}, + {Kind: qvalue.QValueKindString, Value: "brown"}}), + model.NewRecordItemWithData([]string{"id", "name"}, + []*qvalue.QValue{ + {Kind: qvalue.QValueKindInt32, Value: int32(8)}, + {Kind: qvalue.QValueKindString, Value: "fox"}}), } for idx, record := range records { suite.IsType(&model.InsertRecord{}, record) @@ -90,22 +94,22 @@ func (suite *PostgresCDCTestSuite) validateSimpleMutatedRecords(records []model. updateRecord := records[0].(*model.UpdateRecord) suite.Equal(srcTableName, updateRecord.SourceTableName) suite.Equal(dstTableName, updateRecord.DestinationTableName) - suite.Equal(model.RecordItems{}, updateRecord.OldItems) + suite.Equal(model.NewRecordItemWithData([]string{}, []*qvalue.QValue{}), updateRecord.OldItems) - items := model.NewRecordItemWithData(map[string]*qvalue.QValue{ - "id": {Kind: qvalue.QValueKindInt32, Value: int32(2)}, - "name": {Kind: qvalue.QValueKindString, Value: "slow"}, - }) + items := model.NewRecordItemWithData([]string{"id", "name"}, + []*qvalue.QValue{ + {Kind: qvalue.QValueKindInt32, Value: int32(2)}, + {Kind: qvalue.QValueKindString, Value: "slow"}}) suite.Equal(items, updateRecord.NewItems) suite.IsType(&model.DeleteRecord{}, records[1]) deleteRecord := records[1].(*model.DeleteRecord) suite.Equal(srcTableName, deleteRecord.SourceTableName) suite.Equal(dstTableName, deleteRecord.DestinationTableName) - items = model.NewRecordItemWithData(map[string]*qvalue.QValue{ - "id": {Kind: qvalue.QValueKindInt32, Value: int32(8)}, - "name": {Kind: qvalue.QValueKindInvalid, Value: nil}, - }) + items = model.NewRecordItemWithData([]string{"id", "name"}, + []*qvalue.QValue{ + {Kind: qvalue.QValueKindInt32, Value: int32(8)}, + {Kind: qvalue.QValueKindInvalid, Value: nil}}) suite.Equal(items, deleteRecord.Items) } @@ -780,6 +784,17 @@ func (suite *PostgresCDCTestSuite) TestToastHappyFlow() { RelationMessageMapping: relationMessageMapping, }) suite.failTestError(err) + recordsWithSchemaDelta, err = suite.connector.PullRecords(&model.PullRecordsRequest{ + FlowJobName: toastHappyFlowName, + LastSyncState: nil, + IdleTimeout: 10 * time.Second, + MaxBatchSize: 100, + SrcTableIDNameMapping: relIDTableNameMapping, + TableNameMapping: tableNameMapping, + TableNameSchemaMapping: tableNameSchemaMapping, + RelationMessageMapping: relationMessageMapping, + }) + suite.failTestError(err) suite.Nil(recordsWithSchemaDelta.TableSchemaDelta) suite.validateInsertedToastRecords(recordsWithSchemaDelta.RecordBatch.Records, toastHappyFlowSrcTableName, toastHappyFlowDstTableName) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 6d44e25bc9..7683111708 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -481,6 +481,14 @@ func (c *PostgresConnector) SetupQRepMetadataTables(config *protos.QRepConfig) e "flowName": config.FlowJobName, }).Infof("Setup metadata table.") + if config.WriteMode != nil && + config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE { + _, err = c.pool.Exec(c.ctx, fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier)) + if err != nil { + return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err) + } + } + return nil } diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index 3f795600ca..5102f0105a 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -150,6 +150,13 @@ func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig) return err } + if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE { + _, err = c.database.Exec(fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier)) + if err != nil { + return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err) + } + } + return nil } diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 14725d7899..5f5816ee51 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -182,6 +182,9 @@ func (c *QRepFlowConnectionGenerationConfig) GenerateQRepConfig( ret.SyncMode = syncMode ret.StagingPath = c.StagingPath + ret.WriteMode = &protos.QRepWriteMode{ + WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND, + } return ret, nil } diff --git a/flow/generated/protos/flow.pb.go b/flow/generated/protos/flow.pb.go index acdc22f25f..b76dc27f5b 100644 --- a/flow/generated/protos/flow.pb.go +++ b/flow/generated/protos/flow.pb.go @@ -73,6 +73,8 @@ type QRepWriteType int32 const ( QRepWriteType_QREP_WRITE_MODE_APPEND QRepWriteType = 0 QRepWriteType_QREP_WRITE_MODE_UPSERT QRepWriteType = 1 + // only valid when initial_copy_true is set to true. TRUNCATES tables before reverting to APPEND. + QRepWriteType_QREP_WRITE_MODE_OVERWRITE QRepWriteType = 2 ) // Enum value maps for QRepWriteType. @@ -80,10 +82,12 @@ var ( QRepWriteType_name = map[int32]string{ 0: "QREP_WRITE_MODE_APPEND", 1: "QREP_WRITE_MODE_UPSERT", + 2: "QREP_WRITE_MODE_OVERWRITE", } QRepWriteType_value = map[string]int32{ - "QREP_WRITE_MODE_APPEND": 0, - "QREP_WRITE_MODE_UPSERT": 1, + "QREP_WRITE_MODE_APPEND": 0, + "QREP_WRITE_MODE_UPSERT": 1, + "QREP_WRITE_MODE_OVERWRITE": 2, } ) @@ -3358,19 +3362,21 @@ var file_flow_proto_rawDesc = []byte{ 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x5f, 0x49, 0x4e, 0x53, 0x45, 0x52, 0x54, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x53, 0x54, 0x4f, 0x52, 0x41, 0x47, 0x45, 0x5f, 0x41, 0x56, 0x52, 0x4f, 0x10, 0x01, 0x2a, - 0x47, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, + 0x66, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, - 0x55, 0x50, 0x53, 0x45, 0x52, 0x54, 0x10, 0x01, 0x42, 0x76, 0x0a, 0x0f, 0x63, 0x6f, 0x6d, 0x2e, - 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x42, 0x09, 0x46, 0x6c, 0x6f, - 0x77, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, - 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, - 0xaa, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xca, 0x02, 0x0a, - 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xe2, 0x02, 0x16, 0x50, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, - 0x61, 0x74, 0x61, 0xea, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x55, 0x50, 0x53, 0x45, 0x52, 0x54, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x51, 0x52, 0x45, 0x50, + 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4f, 0x56, 0x45, 0x52, + 0x57, 0x52, 0x49, 0x54, 0x45, 0x10, 0x02, 0x42, 0x76, 0x0a, 0x0f, 0x63, 0x6f, 0x6d, 0x2e, 0x70, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x42, 0x09, 0x46, 0x6c, 0x6f, 0x77, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, + 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, + 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xca, 0x02, 0x0a, 0x50, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xe2, 0x02, 0x16, 0x50, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, + 0x74, 0x61, 0xea, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/flow/model/model.go b/flow/model/model.go index 3984ad2848..64582127a9 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -57,11 +57,11 @@ func NewRecordItems() *RecordItems { } } -func NewRecordItemWithData(data map[string]*qvalue.QValue) *RecordItems { +func NewRecordItemWithData(cols []string, val []*qvalue.QValue) *RecordItems { recordItem := NewRecordItems() - for col, val := range data { + for i, col := range cols { recordItem.colToValIdx[col] = len(recordItem.values) - recordItem.values = append(recordItem.values, val) + recordItem.values = append(recordItem.values, val[i]) } return recordItem } diff --git a/generate_go_protos.sh b/generate_go_protos.sh deleted file mode 100755 index 4124962c24..0000000000 --- a/generate_go_protos.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/bash -set -xeuo pipefail - -# check if buf is installed -if ! command -v buf &> /dev/null -then - echo "buf could not be found" - echo "Please install buf: https://buf.build/docs/installation" - exit -fi - -buf generate protos diff --git a/nexus/analyzer/src/qrep.rs b/nexus/analyzer/src/qrep.rs index b143f058c1..e112422032 100644 --- a/nexus/analyzer/src/qrep.rs +++ b/nexus/analyzer/src/qrep.rs @@ -51,7 +51,7 @@ lazy_static::lazy_static! { name: "mode", default_val: Some("append"), required: false, - accepted_values: Some(vec!["upsert", "append"]), + accepted_values: Some(vec!["upsert", "append", "overwrite"]), }, QRepOptionType::StringArray { name: "unique_key_columns", diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index c06d7aa71e..8396ba5b62 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -216,6 +216,17 @@ impl FlowGrpcClient { cfg.write_mode = Some(wm); } "append" => cfg.write_mode = Some(wm), + "overwrite" => { + if !cfg.initial_copy_only { + return anyhow::Result::Err( + anyhow::anyhow!( + "write mode overwrite can only be set with initial_copy_only = true" + ) + ); + } + wm.write_type = QRepWriteType::QrepWriteModeOverwrite as i32; + cfg.write_mode = Some(wm); + } _ => return anyhow::Result::Err(anyhow::anyhow!("invalid mode {}", s)), } } diff --git a/nexus/pt/src/peerdb_flow.rs b/nexus/pt/src/peerdb_flow.rs index 5a55396853..d378e899c8 100644 --- a/nexus/pt/src/peerdb_flow.rs +++ b/nexus/pt/src/peerdb_flow.rs @@ -482,6 +482,8 @@ impl QRepSyncMode { pub enum QRepWriteType { QrepWriteModeAppend = 0, QrepWriteModeUpsert = 1, + /// only valid when initial_copy_true is set to true. TRUNCATES tables before reverting to APPEND. + QrepWriteModeOverwrite = 2, } impl QRepWriteType { /// String value of the enum field names used in the ProtoBuf definition. @@ -492,6 +494,7 @@ impl QRepWriteType { match self { QRepWriteType::QrepWriteModeAppend => "QREP_WRITE_MODE_APPEND", QRepWriteType::QrepWriteModeUpsert => "QREP_WRITE_MODE_UPSERT", + QRepWriteType::QrepWriteModeOverwrite => "QREP_WRITE_MODE_OVERWRITE", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -499,6 +502,7 @@ impl QRepWriteType { match value { "QREP_WRITE_MODE_APPEND" => Some(Self::QrepWriteModeAppend), "QREP_WRITE_MODE_UPSERT" => Some(Self::QrepWriteModeUpsert), + "QREP_WRITE_MODE_OVERWRITE" => Some(Self::QrepWriteModeOverwrite), _ => None, } } diff --git a/nexus/pt/src/peerdb_flow.serde.rs b/nexus/pt/src/peerdb_flow.serde.rs index 8d6aabe5b0..5a2d0e968f 100644 --- a/nexus/pt/src/peerdb_flow.serde.rs +++ b/nexus/pt/src/peerdb_flow.serde.rs @@ -3210,6 +3210,7 @@ impl serde::Serialize for QRepWriteType { let variant = match self { Self::QrepWriteModeAppend => "QREP_WRITE_MODE_APPEND", Self::QrepWriteModeUpsert => "QREP_WRITE_MODE_UPSERT", + Self::QrepWriteModeOverwrite => "QREP_WRITE_MODE_OVERWRITE", }; serializer.serialize_str(variant) } @@ -3223,6 +3224,7 @@ impl<'de> serde::Deserialize<'de> for QRepWriteType { const FIELDS: &[&str] = &[ "QREP_WRITE_MODE_APPEND", "QREP_WRITE_MODE_UPSERT", + "QREP_WRITE_MODE_OVERWRITE", ]; struct GeneratedVisitor; @@ -3267,6 +3269,7 @@ impl<'de> serde::Deserialize<'de> for QRepWriteType { match value { "QREP_WRITE_MODE_APPEND" => Ok(QRepWriteType::QrepWriteModeAppend), "QREP_WRITE_MODE_UPSERT" => Ok(QRepWriteType::QrepWriteModeUpsert), + "QREP_WRITE_MODE_OVERWRITE" => Ok(QRepWriteType::QrepWriteModeOverwrite), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/protos/flow.proto b/protos/flow.proto index 07d5af491a..4e19bfe413 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -219,6 +219,8 @@ enum QRepSyncMode { enum QRepWriteType { QREP_WRITE_MODE_APPEND = 0; QREP_WRITE_MODE_UPSERT = 1; + // only valid when initial_copy_true is set to true. TRUNCATES tables before reverting to APPEND. + QREP_WRITE_MODE_OVERWRITE = 2; } message QRepWriteMode { diff --git a/ui/grpc_generated/flow.ts b/ui/grpc_generated/flow.ts index c129730d5b..19f679ba0c 100644 --- a/ui/grpc_generated/flow.ts +++ b/ui/grpc_generated/flow.ts @@ -43,6 +43,8 @@ export function qRepSyncModeToJSON(object: QRepSyncMode): string { export enum QRepWriteType { QREP_WRITE_MODE_APPEND = 0, QREP_WRITE_MODE_UPSERT = 1, + /** QREP_WRITE_MODE_OVERWRITE - only valid when initial_copy_true is set to true. TRUNCATES tables before reverting to APPEND. */ + QREP_WRITE_MODE_OVERWRITE = 2, UNRECOGNIZED = -1, } @@ -54,6 +56,9 @@ export function qRepWriteTypeFromJSON(object: any): QRepWriteType { case 1: case "QREP_WRITE_MODE_UPSERT": return QRepWriteType.QREP_WRITE_MODE_UPSERT; + case 2: + case "QREP_WRITE_MODE_OVERWRITE": + return QRepWriteType.QREP_WRITE_MODE_OVERWRITE; case -1: case "UNRECOGNIZED": default: @@ -67,6 +72,8 @@ export function qRepWriteTypeToJSON(object: QRepWriteType): string { return "QREP_WRITE_MODE_APPEND"; case QRepWriteType.QREP_WRITE_MODE_UPSERT: return "QREP_WRITE_MODE_UPSERT"; + case QRepWriteType.QREP_WRITE_MODE_OVERWRITE: + return "QREP_WRITE_MODE_OVERWRITE"; case QRepWriteType.UNRECOGNIZED: default: return "UNRECOGNIZED"; @@ -78,6 +85,18 @@ export interface TableNameMapping { destinationTableName: string; } +export interface RelationMessageColumn { + flags: number; + name: string; + dataType: number; +} + +export interface RelationMessage { + relationId: number; + relationName: string; + columns: RelationMessageColumn[]; +} + export interface FlowConnectionConfigs { source: Peer | undefined; destination: Peer | undefined; @@ -105,6 +124,9 @@ export interface FlowConnectionConfigs { /** currently only works for snowflake */ softDelete: boolean; replicationSlotName: string; + /** the below two are for eventhub only */ + pushBatchSize: number; + pushParallelism: number; } export interface FlowConnectionConfigs_TableNameMappingEntry { @@ -124,6 +146,12 @@ export interface FlowConnectionConfigs_TableNameSchemaMappingEntry { export interface SyncFlowOptions { batchSize: number; + relationMessageMapping: { [key: number]: RelationMessage }; +} + +export interface SyncFlowOptions_RelationMessageMappingEntry { + key: number; + value: RelationMessage | undefined; } export interface NormalizeFlowOptions { @@ -139,6 +167,12 @@ export interface StartFlowInput { lastSyncState: LastSyncState | undefined; flowConnectionConfigs: FlowConnectionConfigs | undefined; syncFlowOptions: SyncFlowOptions | undefined; + relationMessageMapping: { [key: number]: RelationMessage }; +} + +export interface StartFlowInput_RelationMessageMappingEntry { + key: number; + value: RelationMessage | undefined; } export interface StartNormalizeInput { @@ -365,6 +399,23 @@ export interface DropFlowInput { flowName: string; } +export interface DeltaAddedColumn { + columnName: string; + columnType: string; +} + +export interface TableSchemaDelta { + srcTableName: string; + dstTableName: string; + addedColumns: DeltaAddedColumn[]; + droppedColumns: string[]; +} + +export interface ReplayTableSchemaDeltaInput { + flowConnectionConfigs: FlowConnectionConfigs | undefined; + tableSchemaDelta: TableSchemaDelta | undefined; +} + function createBaseTableNameMapping(): TableNameMapping { return { sourceTableName: "", destinationTableName: "" }; } @@ -439,6 +490,184 @@ export const TableNameMapping = { }, }; +function createBaseRelationMessageColumn(): RelationMessageColumn { + return { flags: 0, name: "", dataType: 0 }; +} + +export const RelationMessageColumn = { + encode(message: RelationMessageColumn, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.flags !== 0) { + writer.uint32(8).uint32(message.flags); + } + if (message.name !== "") { + writer.uint32(18).string(message.name); + } + if (message.dataType !== 0) { + writer.uint32(24).uint32(message.dataType); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): RelationMessageColumn { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseRelationMessageColumn(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 8) { + break; + } + + message.flags = reader.uint32(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.name = reader.string(); + continue; + case 3: + if (tag !== 24) { + break; + } + + message.dataType = reader.uint32(); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): RelationMessageColumn { + return { + flags: isSet(object.flags) ? Number(object.flags) : 0, + name: isSet(object.name) ? String(object.name) : "", + dataType: isSet(object.dataType) ? Number(object.dataType) : 0, + }; + }, + + toJSON(message: RelationMessageColumn): unknown { + const obj: any = {}; + if (message.flags !== 0) { + obj.flags = Math.round(message.flags); + } + if (message.name !== "") { + obj.name = message.name; + } + if (message.dataType !== 0) { + obj.dataType = Math.round(message.dataType); + } + return obj; + }, + + create, I>>(base?: I): RelationMessageColumn { + return RelationMessageColumn.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>(object: I): RelationMessageColumn { + const message = createBaseRelationMessageColumn(); + message.flags = object.flags ?? 0; + message.name = object.name ?? ""; + message.dataType = object.dataType ?? 0; + return message; + }, +}; + +function createBaseRelationMessage(): RelationMessage { + return { relationId: 0, relationName: "", columns: [] }; +} + +export const RelationMessage = { + encode(message: RelationMessage, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.relationId !== 0) { + writer.uint32(8).uint32(message.relationId); + } + if (message.relationName !== "") { + writer.uint32(18).string(message.relationName); + } + for (const v of message.columns) { + RelationMessageColumn.encode(v!, writer.uint32(26).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): RelationMessage { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseRelationMessage(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 8) { + break; + } + + message.relationId = reader.uint32(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.relationName = reader.string(); + continue; + case 3: + if (tag !== 26) { + break; + } + + message.columns.push(RelationMessageColumn.decode(reader, reader.uint32())); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): RelationMessage { + return { + relationId: isSet(object.relationId) ? Number(object.relationId) : 0, + relationName: isSet(object.relationName) ? String(object.relationName) : "", + columns: Array.isArray(object?.columns) ? object.columns.map((e: any) => RelationMessageColumn.fromJSON(e)) : [], + }; + }, + + toJSON(message: RelationMessage): unknown { + const obj: any = {}; + if (message.relationId !== 0) { + obj.relationId = Math.round(message.relationId); + } + if (message.relationName !== "") { + obj.relationName = message.relationName; + } + if (message.columns?.length) { + obj.columns = message.columns.map((e) => RelationMessageColumn.toJSON(e)); + } + return obj; + }, + + create, I>>(base?: I): RelationMessage { + return RelationMessage.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>(object: I): RelationMessage { + const message = createBaseRelationMessage(); + message.relationId = object.relationId ?? 0; + message.relationName = object.relationName ?? ""; + message.columns = object.columns?.map((e) => RelationMessageColumn.fromPartial(e)) || []; + return message; + }, +}; + function createBaseFlowConnectionConfigs(): FlowConnectionConfigs { return { source: undefined, @@ -461,6 +690,8 @@ function createBaseFlowConnectionConfigs(): FlowConnectionConfigs { cdcStagingPath: "", softDelete: false, replicationSlotName: "", + pushBatchSize: 0, + pushParallelism: 0, }; } @@ -528,6 +759,12 @@ export const FlowConnectionConfigs = { if (message.replicationSlotName !== "") { writer.uint32(162).string(message.replicationSlotName); } + if (message.pushBatchSize !== 0) { + writer.uint32(168).int64(message.pushBatchSize); + } + if (message.pushParallelism !== 0) { + writer.uint32(176).int64(message.pushParallelism); + } return writer; }, @@ -687,6 +924,20 @@ export const FlowConnectionConfigs = { message.replicationSlotName = reader.string(); continue; + case 21: + if (tag !== 168) { + break; + } + + message.pushBatchSize = longToNumber(reader.int64() as Long); + continue; + case 22: + if (tag !== 176) { + break; + } + + message.pushParallelism = longToNumber(reader.int64() as Long); + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -739,6 +990,8 @@ export const FlowConnectionConfigs = { cdcStagingPath: isSet(object.cdcStagingPath) ? String(object.cdcStagingPath) : "", softDelete: isSet(object.softDelete) ? Boolean(object.softDelete) : false, replicationSlotName: isSet(object.replicationSlotName) ? String(object.replicationSlotName) : "", + pushBatchSize: isSet(object.pushBatchSize) ? Number(object.pushBatchSize) : 0, + pushParallelism: isSet(object.pushParallelism) ? Number(object.pushParallelism) : 0, }; }, @@ -822,6 +1075,12 @@ export const FlowConnectionConfigs = { if (message.replicationSlotName !== "") { obj.replicationSlotName = message.replicationSlotName; } + if (message.pushBatchSize !== 0) { + obj.pushBatchSize = Math.round(message.pushBatchSize); + } + if (message.pushParallelism !== 0) { + obj.pushParallelism = Math.round(message.pushParallelism); + } return obj; }, @@ -880,6 +1139,8 @@ export const FlowConnectionConfigs = { message.cdcStagingPath = object.cdcStagingPath ?? ""; message.softDelete = object.softDelete ?? false; message.replicationSlotName = object.replicationSlotName ?? ""; + message.pushBatchSize = object.pushBatchSize ?? 0; + message.pushParallelism = object.pushParallelism ?? 0; return message; }, }; @@ -1121,7 +1382,7 @@ export const FlowConnectionConfigs_TableNameSchemaMappingEntry = { }; function createBaseSyncFlowOptions(): SyncFlowOptions { - return { batchSize: 0 }; + return { batchSize: 0, relationMessageMapping: {} }; } export const SyncFlowOptions = { @@ -1129,6 +1390,9 @@ export const SyncFlowOptions = { if (message.batchSize !== 0) { writer.uint32(8).int32(message.batchSize); } + Object.entries(message.relationMessageMapping).forEach(([key, value]) => { + SyncFlowOptions_RelationMessageMappingEntry.encode({ key: key as any, value }, writer.uint32(18).fork()).ldelim(); + }); return writer; }, @@ -1146,6 +1410,16 @@ export const SyncFlowOptions = { message.batchSize = reader.int32(); continue; + case 2: + if (tag !== 18) { + break; + } + + const entry2 = SyncFlowOptions_RelationMessageMappingEntry.decode(reader, reader.uint32()); + if (entry2.value !== undefined) { + message.relationMessageMapping[entry2.key] = entry2.value; + } + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -1156,7 +1430,18 @@ export const SyncFlowOptions = { }, fromJSON(object: any): SyncFlowOptions { - return { batchSize: isSet(object.batchSize) ? Number(object.batchSize) : 0 }; + return { + batchSize: isSet(object.batchSize) ? Number(object.batchSize) : 0, + relationMessageMapping: isObject(object.relationMessageMapping) + ? Object.entries(object.relationMessageMapping).reduce<{ [key: number]: RelationMessage }>( + (acc, [key, value]) => { + acc[Number(key)] = RelationMessage.fromJSON(value); + return acc; + }, + {}, + ) + : {}, + }; }, toJSON(message: SyncFlowOptions): unknown { @@ -1164,6 +1449,15 @@ export const SyncFlowOptions = { if (message.batchSize !== 0) { obj.batchSize = Math.round(message.batchSize); } + if (message.relationMessageMapping) { + const entries = Object.entries(message.relationMessageMapping); + if (entries.length > 0) { + obj.relationMessageMapping = {}; + entries.forEach(([k, v]) => { + obj.relationMessageMapping[k] = RelationMessage.toJSON(v); + }); + } + } return obj; }, @@ -1173,6 +1467,94 @@ export const SyncFlowOptions = { fromPartial, I>>(object: I): SyncFlowOptions { const message = createBaseSyncFlowOptions(); message.batchSize = object.batchSize ?? 0; + message.relationMessageMapping = Object.entries(object.relationMessageMapping ?? {}).reduce< + { [key: number]: RelationMessage } + >((acc, [key, value]) => { + if (value !== undefined) { + acc[Number(key)] = RelationMessage.fromPartial(value); + } + return acc; + }, {}); + return message; + }, +}; + +function createBaseSyncFlowOptions_RelationMessageMappingEntry(): SyncFlowOptions_RelationMessageMappingEntry { + return { key: 0, value: undefined }; +} + +export const SyncFlowOptions_RelationMessageMappingEntry = { + encode(message: SyncFlowOptions_RelationMessageMappingEntry, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.key !== 0) { + writer.uint32(8).uint32(message.key); + } + if (message.value !== undefined) { + RelationMessage.encode(message.value, writer.uint32(18).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): SyncFlowOptions_RelationMessageMappingEntry { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseSyncFlowOptions_RelationMessageMappingEntry(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 8) { + break; + } + + message.key = reader.uint32(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.value = RelationMessage.decode(reader, reader.uint32()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): SyncFlowOptions_RelationMessageMappingEntry { + return { + key: isSet(object.key) ? Number(object.key) : 0, + value: isSet(object.value) ? RelationMessage.fromJSON(object.value) : undefined, + }; + }, + + toJSON(message: SyncFlowOptions_RelationMessageMappingEntry): unknown { + const obj: any = {}; + if (message.key !== 0) { + obj.key = Math.round(message.key); + } + if (message.value !== undefined) { + obj.value = RelationMessage.toJSON(message.value); + } + return obj; + }, + + create, I>>( + base?: I, + ): SyncFlowOptions_RelationMessageMappingEntry { + return SyncFlowOptions_RelationMessageMappingEntry.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): SyncFlowOptions_RelationMessageMappingEntry { + const message = createBaseSyncFlowOptions_RelationMessageMappingEntry(); + message.key = object.key ?? 0; + message.value = (object.value !== undefined && object.value !== null) + ? RelationMessage.fromPartial(object.value) + : undefined; return message; }, }; @@ -1309,7 +1691,12 @@ export const LastSyncState = { }; function createBaseStartFlowInput(): StartFlowInput { - return { lastSyncState: undefined, flowConnectionConfigs: undefined, syncFlowOptions: undefined }; + return { + lastSyncState: undefined, + flowConnectionConfigs: undefined, + syncFlowOptions: undefined, + relationMessageMapping: {}, + }; } export const StartFlowInput = { @@ -1323,6 +1710,9 @@ export const StartFlowInput = { if (message.syncFlowOptions !== undefined) { SyncFlowOptions.encode(message.syncFlowOptions, writer.uint32(26).fork()).ldelim(); } + Object.entries(message.relationMessageMapping).forEach(([key, value]) => { + StartFlowInput_RelationMessageMappingEntry.encode({ key: key as any, value }, writer.uint32(34).fork()).ldelim(); + }); return writer; }, @@ -1354,6 +1744,16 @@ export const StartFlowInput = { message.syncFlowOptions = SyncFlowOptions.decode(reader, reader.uint32()); continue; + case 4: + if (tag !== 34) { + break; + } + + const entry4 = StartFlowInput_RelationMessageMappingEntry.decode(reader, reader.uint32()); + if (entry4.value !== undefined) { + message.relationMessageMapping[entry4.key] = entry4.value; + } + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -1370,6 +1770,15 @@ export const StartFlowInput = { ? FlowConnectionConfigs.fromJSON(object.flowConnectionConfigs) : undefined, syncFlowOptions: isSet(object.syncFlowOptions) ? SyncFlowOptions.fromJSON(object.syncFlowOptions) : undefined, + relationMessageMapping: isObject(object.relationMessageMapping) + ? Object.entries(object.relationMessageMapping).reduce<{ [key: number]: RelationMessage }>( + (acc, [key, value]) => { + acc[Number(key)] = RelationMessage.fromJSON(value); + return acc; + }, + {}, + ) + : {}, }; }, @@ -1384,6 +1793,15 @@ export const StartFlowInput = { if (message.syncFlowOptions !== undefined) { obj.syncFlowOptions = SyncFlowOptions.toJSON(message.syncFlowOptions); } + if (message.relationMessageMapping) { + const entries = Object.entries(message.relationMessageMapping); + if (entries.length > 0) { + obj.relationMessageMapping = {}; + entries.forEach(([k, v]) => { + obj.relationMessageMapping[k] = RelationMessage.toJSON(v); + }); + } + } return obj; }, @@ -1402,6 +1820,94 @@ export const StartFlowInput = { message.syncFlowOptions = (object.syncFlowOptions !== undefined && object.syncFlowOptions !== null) ? SyncFlowOptions.fromPartial(object.syncFlowOptions) : undefined; + message.relationMessageMapping = Object.entries(object.relationMessageMapping ?? {}).reduce< + { [key: number]: RelationMessage } + >((acc, [key, value]) => { + if (value !== undefined) { + acc[Number(key)] = RelationMessage.fromPartial(value); + } + return acc; + }, {}); + return message; + }, +}; + +function createBaseStartFlowInput_RelationMessageMappingEntry(): StartFlowInput_RelationMessageMappingEntry { + return { key: 0, value: undefined }; +} + +export const StartFlowInput_RelationMessageMappingEntry = { + encode(message: StartFlowInput_RelationMessageMappingEntry, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.key !== 0) { + writer.uint32(8).uint32(message.key); + } + if (message.value !== undefined) { + RelationMessage.encode(message.value, writer.uint32(18).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): StartFlowInput_RelationMessageMappingEntry { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseStartFlowInput_RelationMessageMappingEntry(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 8) { + break; + } + + message.key = reader.uint32(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.value = RelationMessage.decode(reader, reader.uint32()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): StartFlowInput_RelationMessageMappingEntry { + return { + key: isSet(object.key) ? Number(object.key) : 0, + value: isSet(object.value) ? RelationMessage.fromJSON(object.value) : undefined, + }; + }, + + toJSON(message: StartFlowInput_RelationMessageMappingEntry): unknown { + const obj: any = {}; + if (message.key !== 0) { + obj.key = Math.round(message.key); + } + if (message.value !== undefined) { + obj.value = RelationMessage.toJSON(message.value); + } + return obj; + }, + + create, I>>( + base?: I, + ): StartFlowInput_RelationMessageMappingEntry { + return StartFlowInput_RelationMessageMappingEntry.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): StartFlowInput_RelationMessageMappingEntry { + const message = createBaseStartFlowInput_RelationMessageMappingEntry(); + message.key = object.key ?? 0; + message.value = (object.value !== undefined && object.value !== null) + ? RelationMessage.fromPartial(object.value) + : undefined; return message; }, }; @@ -4671,6 +5177,267 @@ export const DropFlowInput = { }, }; +function createBaseDeltaAddedColumn(): DeltaAddedColumn { + return { columnName: "", columnType: "" }; +} + +export const DeltaAddedColumn = { + encode(message: DeltaAddedColumn, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.columnName !== "") { + writer.uint32(10).string(message.columnName); + } + if (message.columnType !== "") { + writer.uint32(18).string(message.columnType); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): DeltaAddedColumn { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseDeltaAddedColumn(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.columnName = reader.string(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.columnType = reader.string(); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): DeltaAddedColumn { + return { + columnName: isSet(object.columnName) ? String(object.columnName) : "", + columnType: isSet(object.columnType) ? String(object.columnType) : "", + }; + }, + + toJSON(message: DeltaAddedColumn): unknown { + const obj: any = {}; + if (message.columnName !== "") { + obj.columnName = message.columnName; + } + if (message.columnType !== "") { + obj.columnType = message.columnType; + } + return obj; + }, + + create, I>>(base?: I): DeltaAddedColumn { + return DeltaAddedColumn.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>(object: I): DeltaAddedColumn { + const message = createBaseDeltaAddedColumn(); + message.columnName = object.columnName ?? ""; + message.columnType = object.columnType ?? ""; + return message; + }, +}; + +function createBaseTableSchemaDelta(): TableSchemaDelta { + return { srcTableName: "", dstTableName: "", addedColumns: [], droppedColumns: [] }; +} + +export const TableSchemaDelta = { + encode(message: TableSchemaDelta, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.srcTableName !== "") { + writer.uint32(10).string(message.srcTableName); + } + if (message.dstTableName !== "") { + writer.uint32(18).string(message.dstTableName); + } + for (const v of message.addedColumns) { + DeltaAddedColumn.encode(v!, writer.uint32(26).fork()).ldelim(); + } + for (const v of message.droppedColumns) { + writer.uint32(34).string(v!); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): TableSchemaDelta { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseTableSchemaDelta(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.srcTableName = reader.string(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.dstTableName = reader.string(); + continue; + case 3: + if (tag !== 26) { + break; + } + + message.addedColumns.push(DeltaAddedColumn.decode(reader, reader.uint32())); + continue; + case 4: + if (tag !== 34) { + break; + } + + message.droppedColumns.push(reader.string()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): TableSchemaDelta { + return { + srcTableName: isSet(object.srcTableName) ? String(object.srcTableName) : "", + dstTableName: isSet(object.dstTableName) ? String(object.dstTableName) : "", + addedColumns: Array.isArray(object?.addedColumns) + ? object.addedColumns.map((e: any) => DeltaAddedColumn.fromJSON(e)) + : [], + droppedColumns: Array.isArray(object?.droppedColumns) ? object.droppedColumns.map((e: any) => String(e)) : [], + }; + }, + + toJSON(message: TableSchemaDelta): unknown { + const obj: any = {}; + if (message.srcTableName !== "") { + obj.srcTableName = message.srcTableName; + } + if (message.dstTableName !== "") { + obj.dstTableName = message.dstTableName; + } + if (message.addedColumns?.length) { + obj.addedColumns = message.addedColumns.map((e) => DeltaAddedColumn.toJSON(e)); + } + if (message.droppedColumns?.length) { + obj.droppedColumns = message.droppedColumns; + } + return obj; + }, + + create, I>>(base?: I): TableSchemaDelta { + return TableSchemaDelta.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>(object: I): TableSchemaDelta { + const message = createBaseTableSchemaDelta(); + message.srcTableName = object.srcTableName ?? ""; + message.dstTableName = object.dstTableName ?? ""; + message.addedColumns = object.addedColumns?.map((e) => DeltaAddedColumn.fromPartial(e)) || []; + message.droppedColumns = object.droppedColumns?.map((e) => e) || []; + return message; + }, +}; + +function createBaseReplayTableSchemaDeltaInput(): ReplayTableSchemaDeltaInput { + return { flowConnectionConfigs: undefined, tableSchemaDelta: undefined }; +} + +export const ReplayTableSchemaDeltaInput = { + encode(message: ReplayTableSchemaDeltaInput, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.flowConnectionConfigs !== undefined) { + FlowConnectionConfigs.encode(message.flowConnectionConfigs, writer.uint32(10).fork()).ldelim(); + } + if (message.tableSchemaDelta !== undefined) { + TableSchemaDelta.encode(message.tableSchemaDelta, writer.uint32(18).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): ReplayTableSchemaDeltaInput { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseReplayTableSchemaDeltaInput(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.flowConnectionConfigs = FlowConnectionConfigs.decode(reader, reader.uint32()); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.tableSchemaDelta = TableSchemaDelta.decode(reader, reader.uint32()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): ReplayTableSchemaDeltaInput { + return { + flowConnectionConfigs: isSet(object.flowConnectionConfigs) + ? FlowConnectionConfigs.fromJSON(object.flowConnectionConfigs) + : undefined, + tableSchemaDelta: isSet(object.tableSchemaDelta) ? TableSchemaDelta.fromJSON(object.tableSchemaDelta) : undefined, + }; + }, + + toJSON(message: ReplayTableSchemaDeltaInput): unknown { + const obj: any = {}; + if (message.flowConnectionConfigs !== undefined) { + obj.flowConnectionConfigs = FlowConnectionConfigs.toJSON(message.flowConnectionConfigs); + } + if (message.tableSchemaDelta !== undefined) { + obj.tableSchemaDelta = TableSchemaDelta.toJSON(message.tableSchemaDelta); + } + return obj; + }, + + create, I>>(base?: I): ReplayTableSchemaDeltaInput { + return ReplayTableSchemaDeltaInput.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>(object: I): ReplayTableSchemaDeltaInput { + const message = createBaseReplayTableSchemaDeltaInput(); + message.flowConnectionConfigs = + (object.flowConnectionConfigs !== undefined && object.flowConnectionConfigs !== null) + ? FlowConnectionConfigs.fromPartial(object.flowConnectionConfigs) + : undefined; + message.tableSchemaDelta = (object.tableSchemaDelta !== undefined && object.tableSchemaDelta !== null) + ? TableSchemaDelta.fromPartial(object.tableSchemaDelta) + : undefined; + return message; + }, +}; + declare const self: any | undefined; declare const window: any | undefined; declare const global: any | undefined;