From c65e79af0c4d7d2c7849a469f77d48e12760b516 Mon Sep 17 00:00:00 2001 From: Kevin K Biju <52661649+heavycrystal@users.noreply.github.com> Date: Thu, 19 Oct 2023 16:30:09 +0100 Subject: [PATCH] idle at end of QRepFlowWorkflow unless we detect new rows (#539) Closes #537 --- flow/activities/flowable.go | 36 ++ flow/connectors/postgres/qrep.go | 59 +++- flow/connectors/utils/partition/partition.go | 18 - flow/generated/protos/flow.pb.go | 348 +++++++------------ nexus/pt/src/peerdb_flow.rs | 12 +- nexus/pt/src/peerdb_flow.serde.rs | 130 ------- protos/flow.proto | 6 - ui/grpc_generated/flow.ts | 96 +---- 8 files changed, 217 insertions(+), 488 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index da5d739d9d..832274fd7f 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -655,3 +655,39 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context, config *protos. return nil } + +func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context, + config *protos.QRepConfig, last *protos.QRepPartition) error { + if config.SourcePeer.Type != protos.DBType_POSTGRES { + return nil + } + waitBetweenBatches := 5 * time.Second + if config.WaitBetweenBatchesSeconds > 0 { + waitBetweenBatches = time.Duration(config.WaitBetweenBatchesSeconds) * time.Second + } + + srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer) + if err != nil { + return fmt.Errorf("failed to get qrep source connector: %w", err) + } + defer connectors.CloseConnector(srcConn) + pgSrcConn := srcConn.(*connpostgres.PostgresConnector) + + attemptCount := 1 + for { + activity.RecordHeartbeat(ctx, fmt.Sprintf("no new rows yet, attempt #%d", attemptCount)) + time.Sleep(waitBetweenBatches) + + result, err := pgSrcConn.CheckForUpdatedMaxValue(config, last) + if err != nil { + return fmt.Errorf("failed to check for new rows: %w", err) + } + if result { + break + } + + attemptCount += 1 + } + + return nil +} diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 8853d11da4..4ee4cf9a29 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -133,8 +133,6 @@ func (c *PostgresConnector) getNumRowsPartitions( minVal = lastRange.IntRange.End case *protos.PartitionRange_TimestampRange: minVal = lastRange.TimestampRange.End.AsTime() - case *protos.PartitionRange_XminRange: - minVal = lastRange.XminRange.End } row = tx.QueryRow(c.ctx, countQuery, minVal) @@ -253,6 +251,12 @@ func (c *PostgresConnector) getMinMaxValues( } case *protos.PartitionRange_TimestampRange: minValue = lastRange.TimestampRange.End.AsTime() + case *protos.PartitionRange_TidRange: + minValue = lastRange.TidRange.End + maxValue = &protos.TID{ + BlockNumber: maxValue.(pgtype.TID).BlockNumber, + OffsetNumber: uint32(maxValue.(pgtype.TID).OffsetNumber), + } } } else { // Otherwise get the minimum value from the database @@ -272,6 +276,15 @@ func (c *PostgresConnector) getMinMaxValues( case int32: minValue = int64(v) maxValue = int64(maxValue.(int32)) + case pgtype.TID: + minValue = &protos.TID{ + BlockNumber: v.BlockNumber, + OffsetNumber: uint32(v.OffsetNumber), + } + maxValue = &protos.TID{ + BlockNumber: maxValue.(pgtype.TID).BlockNumber, + OffsetNumber: uint32(maxValue.(pgtype.TID).OffsetNumber), + } } } @@ -283,6 +296,42 @@ func (c *PostgresConnector) getMinMaxValues( return minValue, maxValue, nil } +func (c *PostgresConnector) CheckForUpdatedMaxValue(config *protos.QRepConfig, + last *protos.QRepPartition) (bool, error) { + tx, 
err := c.pool.Begin(c.ctx) + if err != nil { + return false, fmt.Errorf("unable to begin transaction for getting max value: %w", err) + } + defer func() { + deferErr := tx.Rollback(c.ctx) + if deferErr != pgx.ErrTxClosed && deferErr != nil { + log.WithFields(log.Fields{ + "flowName": config.FlowJobName, + }).Errorf("unexpected error rolling back transaction for getting max value: %v", err) + } + }() + + _, maxValue, err := c.getMinMaxValues(tx, config, last) + if err != nil { + return false, fmt.Errorf("error while getting min and max values: %w", err) + } + + switch x := last.Range.Range.(type) { + case *protos.PartitionRange_IntRange: + if maxValue.(int64) > x.IntRange.End { + return true, nil + } + case *protos.PartitionRange_TimestampRange: + if maxValue.(time.Time).After(x.TimestampRange.End.AsTime()) { + return true, nil + } + default: + return false, fmt.Errorf("unknown range type: %v", x) + } + + return false, nil +} + func (c *PostgresConnector) PullQRepRecords( config *protos.QRepConfig, partition *protos.QRepPartition) (*model.QRecordBatch, error) { @@ -321,9 +370,6 @@ func (c *PostgresConnector) PullQRepRecords( OffsetNumber: uint16(x.TidRange.End.OffsetNumber), Valid: true, } - case *protos.PartitionRange_XminRange: - rangeStart = x.XminRange.Start - rangeEnd = x.XminRange.End default: return nil, fmt.Errorf("unknown range type: %v", x) } @@ -406,9 +452,6 @@ func (c *PostgresConnector) PullQRepRecordStream( OffsetNumber: uint16(x.TidRange.End.OffsetNumber), Valid: true, } - case *protos.PartitionRange_XminRange: - rangeStart = x.XminRange.Start - rangeEnd = x.XminRange.End default: return 0, fmt.Errorf("unknown range type: %v", x) } diff --git a/flow/connectors/utils/partition/partition.go b/flow/connectors/utils/partition/partition.go index b25c39f6b1..ad8bb6067a 100644 --- a/flow/connectors/utils/partition/partition.go +++ b/flow/connectors/utils/partition/partition.go @@ -133,20 +133,6 @@ func createTIDPartition(start pgtype.TID, end pgtype.TID) *protos.QRepPartition } } -func createXMINPartition(start uint32, end uint32) *protos.QRepPartition { - return &protos.QRepPartition{ - PartitionId: uuid.New().String(), - Range: &protos.PartitionRange{ - Range: &protos.PartitionRange_XminRange{ - XminRange: &protos.XMINPartitionRange{ - Start: start, - End: end, - }, - }, - }, - } -} - type PartitionHelper struct { prevStart interface{} prevEnd interface{} @@ -196,10 +182,6 @@ func (p *PartitionHelper) AddPartition(start interface{}, end interface{}) error p.partitions = append(p.partitions, createTIDPartition(v, end.(pgtype.TID))) p.prevStart = v p.prevEnd = end - case pgtype.Uint32: - p.partitions = append(p.partitions, createXMINPartition(v.Uint32, end.(uint32))) - p.prevStart = v - p.prevEnd = end default: return fmt.Errorf("unsupported type: %T", v) } diff --git a/flow/generated/protos/flow.pb.go b/flow/generated/protos/flow.pb.go index 0b0582120d..69c3c73288 100644 --- a/flow/generated/protos/flow.pb.go +++ b/flow/generated/protos/flow.pb.go @@ -2131,61 +2131,6 @@ func (x *TIDPartitionRange) GetEnd() *TID { return nil } -type XMINPartitionRange struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Start uint32 `protobuf:"varint,1,opt,name=start,proto3" json:"start,omitempty"` - End uint32 `protobuf:"varint,2,opt,name=end,proto3" json:"end,omitempty"` -} - -func (x *XMINPartitionRange) Reset() { - *x = XMINPartitionRange{} - if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[32] - ms := 
protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *XMINPartitionRange) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*XMINPartitionRange) ProtoMessage() {} - -func (x *XMINPartitionRange) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[32] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use XMINPartitionRange.ProtoReflect.Descriptor instead. -func (*XMINPartitionRange) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{32} -} - -func (x *XMINPartitionRange) GetStart() uint32 { - if x != nil { - return x.Start - } - return 0 -} - -func (x *XMINPartitionRange) GetEnd() uint32 { - if x != nil { - return x.End - } - return 0 -} - type PartitionRange struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -2198,14 +2143,13 @@ type PartitionRange struct { // *PartitionRange_IntRange // *PartitionRange_TimestampRange // *PartitionRange_TidRange - // *PartitionRange_XminRange Range isPartitionRange_Range `protobuf_oneof:"range"` } func (x *PartitionRange) Reset() { *x = PartitionRange{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[33] + mi := &file_flow_proto_msgTypes[32] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2218,7 +2162,7 @@ func (x *PartitionRange) String() string { func (*PartitionRange) ProtoMessage() {} func (x *PartitionRange) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[33] + mi := &file_flow_proto_msgTypes[32] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2231,7 +2175,7 @@ func (x *PartitionRange) ProtoReflect() protoreflect.Message { // Deprecated: Use PartitionRange.ProtoReflect.Descriptor instead. 
func (*PartitionRange) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{33} + return file_flow_proto_rawDescGZIP(), []int{32} } func (m *PartitionRange) GetRange() isPartitionRange_Range { @@ -2262,13 +2206,6 @@ func (x *PartitionRange) GetTidRange() *TIDPartitionRange { return nil } -func (x *PartitionRange) GetXminRange() *XMINPartitionRange { - if x, ok := x.GetRange().(*PartitionRange_XminRange); ok { - return x.XminRange - } - return nil -} - type isPartitionRange_Range interface { isPartitionRange_Range() } @@ -2285,18 +2222,12 @@ type PartitionRange_TidRange struct { TidRange *TIDPartitionRange `protobuf:"bytes,3,opt,name=tid_range,json=tidRange,proto3,oneof"` } -type PartitionRange_XminRange struct { - XminRange *XMINPartitionRange `protobuf:"bytes,4,opt,name=xmin_range,json=xminRange,proto3,oneof"` -} - func (*PartitionRange_IntRange) isPartitionRange_Range() {} func (*PartitionRange_TimestampRange) isPartitionRange_Range() {} func (*PartitionRange_TidRange) isPartitionRange_Range() {} -func (*PartitionRange_XminRange) isPartitionRange_Range() {} - type QRepWriteMode struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -2309,7 +2240,7 @@ type QRepWriteMode struct { func (x *QRepWriteMode) Reset() { *x = QRepWriteMode{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[34] + mi := &file_flow_proto_msgTypes[33] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2322,7 +2253,7 @@ func (x *QRepWriteMode) String() string { func (*QRepWriteMode) ProtoMessage() {} func (x *QRepWriteMode) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[34] + mi := &file_flow_proto_msgTypes[33] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2335,7 +2266,7 @@ func (x *QRepWriteMode) ProtoReflect() protoreflect.Message { // Deprecated: Use QRepWriteMode.ProtoReflect.Descriptor instead. func (*QRepWriteMode) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{34} + return file_flow_proto_rawDescGZIP(), []int{33} } func (x *QRepWriteMode) GetWriteType() QRepWriteType { @@ -2390,7 +2321,7 @@ type QRepConfig struct { func (x *QRepConfig) Reset() { *x = QRepConfig{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[35] + mi := &file_flow_proto_msgTypes[34] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2403,7 +2334,7 @@ func (x *QRepConfig) String() string { func (*QRepConfig) ProtoMessage() {} func (x *QRepConfig) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[35] + mi := &file_flow_proto_msgTypes[34] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2416,7 +2347,7 @@ func (x *QRepConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use QRepConfig.ProtoReflect.Descriptor instead. 
func (*QRepConfig) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{35} + return file_flow_proto_rawDescGZIP(), []int{34} } func (x *QRepConfig) GetFlowJobName() string { @@ -2551,7 +2482,7 @@ type QRepPartition struct { func (x *QRepPartition) Reset() { *x = QRepPartition{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[36] + mi := &file_flow_proto_msgTypes[35] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2564,7 +2495,7 @@ func (x *QRepPartition) String() string { func (*QRepPartition) ProtoMessage() {} func (x *QRepPartition) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[36] + mi := &file_flow_proto_msgTypes[35] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2577,7 +2508,7 @@ func (x *QRepPartition) ProtoReflect() protoreflect.Message { // Deprecated: Use QRepPartition.ProtoReflect.Descriptor instead. func (*QRepPartition) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{36} + return file_flow_proto_rawDescGZIP(), []int{35} } func (x *QRepPartition) GetPartitionId() string { @@ -2613,7 +2544,7 @@ type QRepPartitionBatch struct { func (x *QRepPartitionBatch) Reset() { *x = QRepPartitionBatch{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[37] + mi := &file_flow_proto_msgTypes[36] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2626,7 +2557,7 @@ func (x *QRepPartitionBatch) String() string { func (*QRepPartitionBatch) ProtoMessage() {} func (x *QRepPartitionBatch) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[37] + mi := &file_flow_proto_msgTypes[36] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2639,7 +2570,7 @@ func (x *QRepPartitionBatch) ProtoReflect() protoreflect.Message { // Deprecated: Use QRepPartitionBatch.ProtoReflect.Descriptor instead. func (*QRepPartitionBatch) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{37} + return file_flow_proto_rawDescGZIP(), []int{36} } func (x *QRepPartitionBatch) GetBatchId() int32 { @@ -2667,7 +2598,7 @@ type QRepParitionResult struct { func (x *QRepParitionResult) Reset() { *x = QRepParitionResult{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[38] + mi := &file_flow_proto_msgTypes[37] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2680,7 +2611,7 @@ func (x *QRepParitionResult) String() string { func (*QRepParitionResult) ProtoMessage() {} func (x *QRepParitionResult) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[38] + mi := &file_flow_proto_msgTypes[37] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2693,7 +2624,7 @@ func (x *QRepParitionResult) ProtoReflect() protoreflect.Message { // Deprecated: Use QRepParitionResult.ProtoReflect.Descriptor instead. 
func (*QRepParitionResult) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{38} + return file_flow_proto_rawDescGZIP(), []int{37} } func (x *QRepParitionResult) GetPartitions() []*QRepPartition { @@ -2714,7 +2645,7 @@ type DropFlowInput struct { func (x *DropFlowInput) Reset() { *x = DropFlowInput{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[39] + mi := &file_flow_proto_msgTypes[38] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2727,7 +2658,7 @@ func (x *DropFlowInput) String() string { func (*DropFlowInput) ProtoMessage() {} func (x *DropFlowInput) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[39] + mi := &file_flow_proto_msgTypes[38] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2740,7 +2671,7 @@ func (x *DropFlowInput) ProtoReflect() protoreflect.Message { // Deprecated: Use DropFlowInput.ProtoReflect.Descriptor instead. func (*DropFlowInput) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{39} + return file_flow_proto_rawDescGZIP(), []int{38} } func (x *DropFlowInput) GetFlowName() string { @@ -2762,7 +2693,7 @@ type DeltaAddedColumn struct { func (x *DeltaAddedColumn) Reset() { *x = DeltaAddedColumn{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[40] + mi := &file_flow_proto_msgTypes[39] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2775,7 +2706,7 @@ func (x *DeltaAddedColumn) String() string { func (*DeltaAddedColumn) ProtoMessage() {} func (x *DeltaAddedColumn) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[40] + mi := &file_flow_proto_msgTypes[39] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2788,7 +2719,7 @@ func (x *DeltaAddedColumn) ProtoReflect() protoreflect.Message { // Deprecated: Use DeltaAddedColumn.ProtoReflect.Descriptor instead. func (*DeltaAddedColumn) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{40} + return file_flow_proto_rawDescGZIP(), []int{39} } func (x *DeltaAddedColumn) GetColumnName() string { @@ -2818,7 +2749,7 @@ type TableSchemaDelta struct { func (x *TableSchemaDelta) Reset() { *x = TableSchemaDelta{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[41] + mi := &file_flow_proto_msgTypes[40] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2831,7 +2762,7 @@ func (x *TableSchemaDelta) String() string { func (*TableSchemaDelta) ProtoMessage() {} func (x *TableSchemaDelta) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[41] + mi := &file_flow_proto_msgTypes[40] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2844,7 +2775,7 @@ func (x *TableSchemaDelta) ProtoReflect() protoreflect.Message { // Deprecated: Use TableSchemaDelta.ProtoReflect.Descriptor instead. 
func (*TableSchemaDelta) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{41} + return file_flow_proto_rawDescGZIP(), []int{40} } func (x *TableSchemaDelta) GetSrcTableName() string { @@ -2880,7 +2811,7 @@ type ReplayTableSchemaDeltaInput struct { func (x *ReplayTableSchemaDeltaInput) Reset() { *x = ReplayTableSchemaDeltaInput{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[42] + mi := &file_flow_proto_msgTypes[41] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2893,7 +2824,7 @@ func (x *ReplayTableSchemaDeltaInput) String() string { func (*ReplayTableSchemaDeltaInput) ProtoMessage() {} func (x *ReplayTableSchemaDeltaInput) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[42] + mi := &file_flow_proto_msgTypes[41] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2906,7 +2837,7 @@ func (x *ReplayTableSchemaDeltaInput) ProtoReflect() protoreflect.Message { // Deprecated: Use ReplayTableSchemaDeltaInput.ProtoReflect.Descriptor instead. func (*ReplayTableSchemaDeltaInput) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{42} + return file_flow_proto_rawDescGZIP(), []int{41} } func (x *ReplayTableSchemaDeltaInput) GetFlowConnectionConfigs() *FlowConnectionConfigs { @@ -3378,29 +3309,21 @@ var file_flow_proto_rawDesc = []byte{ 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x49, 0x44, 0x52, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x22, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, - 0x6f, 0x77, 0x2e, 0x54, 0x49, 0x44, 0x52, 0x03, 0x65, 0x6e, 0x64, 0x22, 0x3c, 0x0a, 0x12, 0x58, - 0x4d, 0x49, 0x4e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, - 0x65, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, - 0x52, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0d, 0x52, 0x03, 0x65, 0x6e, 0x64, 0x22, 0xaa, 0x02, 0x0a, 0x0e, 0x50, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x3d, 0x0a, 0x09, - 0x69, 0x6e, 0x74, 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x6e, - 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x48, - 0x00, 0x52, 0x08, 0x69, 0x6e, 0x74, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4f, 0x0a, 0x0f, 0x74, - 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, - 0x6f, 0x77, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x50, 0x61, 0x72, 0x74, - 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x48, 0x00, 0x52, 0x0e, 0x74, 0x69, - 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x3d, 0x0a, 0x09, - 0x74, 0x69, 0x64, 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x49, - 0x44, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x48, - 0x00, 0x52, 0x08, 0x74, 0x69, 0x64, 0x52, 
0x61, 0x6e, 0x67, 0x65, 0x12, 0x40, 0x0a, 0x0a, 0x78, - 0x6d, 0x69, 0x6e, 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x1f, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x58, 0x4d, - 0x49, 0x4e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, - 0x48, 0x00, 0x52, 0x09, 0x78, 0x6d, 0x69, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x42, 0x07, 0x0a, + 0x6f, 0x77, 0x2e, 0x54, 0x49, 0x44, 0x52, 0x03, 0x65, 0x6e, 0x64, 0x22, 0xe8, 0x01, 0x0a, 0x0e, + 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x3d, + 0x0a, 0x09, 0x69, 0x6e, 0x74, 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, + 0x49, 0x6e, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, + 0x65, 0x48, 0x00, 0x52, 0x08, 0x69, 0x6e, 0x74, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4f, 0x0a, + 0x0f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, + 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x50, 0x61, + 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x48, 0x00, 0x52, 0x0e, + 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x3d, + 0x0a, 0x09, 0x74, 0x69, 0x64, 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, + 0x54, 0x49, 0x44, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, + 0x65, 0x48, 0x00, 0x52, 0x08, 0x74, 0x69, 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x42, 0x07, 0x0a, 0x05, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x22, 0x78, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x57, 0x72, 0x69, 0x74, 0x65, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x77, 0x72, 0x69, 0x74, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1a, 0x2e, 0x70, 0x65, @@ -3551,7 +3474,7 @@ func file_flow_proto_rawDescGZIP() []byte { } var file_flow_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_flow_proto_msgTypes = make([]protoimpl.MessageInfo, 54) +var file_flow_proto_msgTypes = make([]protoimpl.MessageInfo, 53) var file_flow_proto_goTypes = []interface{}{ (QRepSyncMode)(0), // 0: peerdb_flow.QRepSyncMode (QRepWriteType)(0), // 1: peerdb_flow.QRepWriteType @@ -3587,99 +3510,97 @@ var file_flow_proto_goTypes = []interface{}{ (*TimestampPartitionRange)(nil), // 31: peerdb_flow.TimestampPartitionRange (*TID)(nil), // 32: peerdb_flow.TID (*TIDPartitionRange)(nil), // 33: peerdb_flow.TIDPartitionRange - (*XMINPartitionRange)(nil), // 34: peerdb_flow.XMINPartitionRange - (*PartitionRange)(nil), // 35: peerdb_flow.PartitionRange - (*QRepWriteMode)(nil), // 36: peerdb_flow.QRepWriteMode - (*QRepConfig)(nil), // 37: peerdb_flow.QRepConfig - (*QRepPartition)(nil), // 38: peerdb_flow.QRepPartition - (*QRepPartitionBatch)(nil), // 39: peerdb_flow.QRepPartitionBatch - (*QRepParitionResult)(nil), // 40: peerdb_flow.QRepParitionResult - (*DropFlowInput)(nil), // 41: peerdb_flow.DropFlowInput - (*DeltaAddedColumn)(nil), // 42: peerdb_flow.DeltaAddedColumn - (*TableSchemaDelta)(nil), // 43: peerdb_flow.TableSchemaDelta - (*ReplayTableSchemaDeltaInput)(nil), // 44: 
peerdb_flow.ReplayTableSchemaDeltaInput - nil, // 45: peerdb_flow.FlowConnectionConfigs.SrcTableIdNameMappingEntry - nil, // 46: peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry - nil, // 47: peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry - nil, // 48: peerdb_flow.StartFlowInput.RelationMessageMappingEntry - nil, // 49: peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry - nil, // 50: peerdb_flow.SetupReplicationInput.TableNameMappingEntry - nil, // 51: peerdb_flow.CreateRawTableInput.TableNameMappingEntry - nil, // 52: peerdb_flow.TableSchema.ColumnsEntry - nil, // 53: peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry - nil, // 54: peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry - nil, // 55: peerdb_flow.SetupNormalizedTableBatchOutput.TableExistsMappingEntry - (*Peer)(nil), // 56: peerdb_peers.Peer - (*timestamppb.Timestamp)(nil), // 57: google.protobuf.Timestamp + (*PartitionRange)(nil), // 34: peerdb_flow.PartitionRange + (*QRepWriteMode)(nil), // 35: peerdb_flow.QRepWriteMode + (*QRepConfig)(nil), // 36: peerdb_flow.QRepConfig + (*QRepPartition)(nil), // 37: peerdb_flow.QRepPartition + (*QRepPartitionBatch)(nil), // 38: peerdb_flow.QRepPartitionBatch + (*QRepParitionResult)(nil), // 39: peerdb_flow.QRepParitionResult + (*DropFlowInput)(nil), // 40: peerdb_flow.DropFlowInput + (*DeltaAddedColumn)(nil), // 41: peerdb_flow.DeltaAddedColumn + (*TableSchemaDelta)(nil), // 42: peerdb_flow.TableSchemaDelta + (*ReplayTableSchemaDeltaInput)(nil), // 43: peerdb_flow.ReplayTableSchemaDeltaInput + nil, // 44: peerdb_flow.FlowConnectionConfigs.SrcTableIdNameMappingEntry + nil, // 45: peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry + nil, // 46: peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry + nil, // 47: peerdb_flow.StartFlowInput.RelationMessageMappingEntry + nil, // 48: peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry + nil, // 49: peerdb_flow.SetupReplicationInput.TableNameMappingEntry + nil, // 50: peerdb_flow.CreateRawTableInput.TableNameMappingEntry + nil, // 51: peerdb_flow.TableSchema.ColumnsEntry + nil, // 52: peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry + nil, // 53: peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry + nil, // 54: peerdb_flow.SetupNormalizedTableBatchOutput.TableExistsMappingEntry + (*Peer)(nil), // 55: peerdb_peers.Peer + (*timestamppb.Timestamp)(nil), // 56: google.protobuf.Timestamp } var file_flow_proto_depIdxs = []int32{ 3, // 0: peerdb_flow.RelationMessage.columns:type_name -> peerdb_flow.RelationMessageColumn - 56, // 1: peerdb_flow.FlowConnectionConfigs.source:type_name -> peerdb_peers.Peer - 56, // 2: peerdb_flow.FlowConnectionConfigs.destination:type_name -> peerdb_peers.Peer + 55, // 1: peerdb_flow.FlowConnectionConfigs.source:type_name -> peerdb_peers.Peer + 55, // 2: peerdb_flow.FlowConnectionConfigs.destination:type_name -> peerdb_peers.Peer 23, // 3: peerdb_flow.FlowConnectionConfigs.table_schema:type_name -> peerdb_flow.TableSchema 5, // 4: peerdb_flow.FlowConnectionConfigs.table_mappings:type_name -> peerdb_flow.TableMapping - 45, // 5: peerdb_flow.FlowConnectionConfigs.src_table_id_name_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.SrcTableIdNameMappingEntry - 46, // 6: peerdb_flow.FlowConnectionConfigs.table_name_schema_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry - 56, // 7: peerdb_flow.FlowConnectionConfigs.metadata_peer:type_name -> 
peerdb_peers.Peer + 44, // 5: peerdb_flow.FlowConnectionConfigs.src_table_id_name_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.SrcTableIdNameMappingEntry + 45, // 6: peerdb_flow.FlowConnectionConfigs.table_name_schema_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry + 55, // 7: peerdb_flow.FlowConnectionConfigs.metadata_peer:type_name -> peerdb_peers.Peer 0, // 8: peerdb_flow.FlowConnectionConfigs.snapshot_sync_mode:type_name -> peerdb_flow.QRepSyncMode 0, // 9: peerdb_flow.FlowConnectionConfigs.cdc_sync_mode:type_name -> peerdb_flow.QRepSyncMode - 47, // 10: peerdb_flow.SyncFlowOptions.relation_message_mapping:type_name -> peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry - 57, // 11: peerdb_flow.LastSyncState.last_synced_at:type_name -> google.protobuf.Timestamp + 46, // 10: peerdb_flow.SyncFlowOptions.relation_message_mapping:type_name -> peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry + 56, // 11: peerdb_flow.LastSyncState.last_synced_at:type_name -> google.protobuf.Timestamp 9, // 12: peerdb_flow.StartFlowInput.last_sync_state:type_name -> peerdb_flow.LastSyncState 6, // 13: peerdb_flow.StartFlowInput.flow_connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs 7, // 14: peerdb_flow.StartFlowInput.sync_flow_options:type_name -> peerdb_flow.SyncFlowOptions - 48, // 15: peerdb_flow.StartFlowInput.relation_message_mapping:type_name -> peerdb_flow.StartFlowInput.RelationMessageMappingEntry + 47, // 15: peerdb_flow.StartFlowInput.relation_message_mapping:type_name -> peerdb_flow.StartFlowInput.RelationMessageMappingEntry 6, // 16: peerdb_flow.StartNormalizeInput.flow_connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs - 56, // 17: peerdb_flow.GetLastSyncedIDInput.peer_connection_config:type_name -> peerdb_peers.Peer - 56, // 18: peerdb_flow.EnsurePullabilityInput.peer_connection_config:type_name -> peerdb_peers.Peer - 56, // 19: peerdb_flow.EnsurePullabilityBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer + 55, // 17: peerdb_flow.GetLastSyncedIDInput.peer_connection_config:type_name -> peerdb_peers.Peer + 55, // 18: peerdb_flow.EnsurePullabilityInput.peer_connection_config:type_name -> peerdb_peers.Peer + 55, // 19: peerdb_flow.EnsurePullabilityBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer 15, // 20: peerdb_flow.TableIdentifier.postgres_table_identifier:type_name -> peerdb_flow.PostgresTableIdentifier 16, // 21: peerdb_flow.EnsurePullabilityOutput.table_identifier:type_name -> peerdb_flow.TableIdentifier - 49, // 22: peerdb_flow.EnsurePullabilityBatchOutput.table_identifier_mapping:type_name -> peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry - 56, // 23: peerdb_flow.SetupReplicationInput.peer_connection_config:type_name -> peerdb_peers.Peer - 50, // 24: peerdb_flow.SetupReplicationInput.table_name_mapping:type_name -> peerdb_flow.SetupReplicationInput.TableNameMappingEntry - 56, // 25: peerdb_flow.SetupReplicationInput.destination_peer:type_name -> peerdb_peers.Peer - 56, // 26: peerdb_flow.CreateRawTableInput.peer_connection_config:type_name -> peerdb_peers.Peer - 51, // 27: peerdb_flow.CreateRawTableInput.table_name_mapping:type_name -> peerdb_flow.CreateRawTableInput.TableNameMappingEntry + 48, // 22: peerdb_flow.EnsurePullabilityBatchOutput.table_identifier_mapping:type_name -> peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry + 55, // 23: peerdb_flow.SetupReplicationInput.peer_connection_config:type_name -> peerdb_peers.Peer + 49, 
// 24: peerdb_flow.SetupReplicationInput.table_name_mapping:type_name -> peerdb_flow.SetupReplicationInput.TableNameMappingEntry + 55, // 25: peerdb_flow.SetupReplicationInput.destination_peer:type_name -> peerdb_peers.Peer + 55, // 26: peerdb_flow.CreateRawTableInput.peer_connection_config:type_name -> peerdb_peers.Peer + 50, // 27: peerdb_flow.CreateRawTableInput.table_name_mapping:type_name -> peerdb_flow.CreateRawTableInput.TableNameMappingEntry 0, // 28: peerdb_flow.CreateRawTableInput.cdc_sync_mode:type_name -> peerdb_flow.QRepSyncMode - 52, // 29: peerdb_flow.TableSchema.columns:type_name -> peerdb_flow.TableSchema.ColumnsEntry - 56, // 30: peerdb_flow.GetTableSchemaBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer - 53, // 31: peerdb_flow.GetTableSchemaBatchOutput.table_name_schema_mapping:type_name -> peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry - 56, // 32: peerdb_flow.SetupNormalizedTableInput.peer_connection_config:type_name -> peerdb_peers.Peer + 51, // 29: peerdb_flow.TableSchema.columns:type_name -> peerdb_flow.TableSchema.ColumnsEntry + 55, // 30: peerdb_flow.GetTableSchemaBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer + 52, // 31: peerdb_flow.GetTableSchemaBatchOutput.table_name_schema_mapping:type_name -> peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry + 55, // 32: peerdb_flow.SetupNormalizedTableInput.peer_connection_config:type_name -> peerdb_peers.Peer 23, // 33: peerdb_flow.SetupNormalizedTableInput.source_table_schema:type_name -> peerdb_flow.TableSchema - 56, // 34: peerdb_flow.SetupNormalizedTableBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer - 54, // 35: peerdb_flow.SetupNormalizedTableBatchInput.table_name_schema_mapping:type_name -> peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry - 55, // 36: peerdb_flow.SetupNormalizedTableBatchOutput.table_exists_mapping:type_name -> peerdb_flow.SetupNormalizedTableBatchOutput.TableExistsMappingEntry - 57, // 37: peerdb_flow.TimestampPartitionRange.start:type_name -> google.protobuf.Timestamp - 57, // 38: peerdb_flow.TimestampPartitionRange.end:type_name -> google.protobuf.Timestamp + 55, // 34: peerdb_flow.SetupNormalizedTableBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer + 53, // 35: peerdb_flow.SetupNormalizedTableBatchInput.table_name_schema_mapping:type_name -> peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry + 54, // 36: peerdb_flow.SetupNormalizedTableBatchOutput.table_exists_mapping:type_name -> peerdb_flow.SetupNormalizedTableBatchOutput.TableExistsMappingEntry + 56, // 37: peerdb_flow.TimestampPartitionRange.start:type_name -> google.protobuf.Timestamp + 56, // 38: peerdb_flow.TimestampPartitionRange.end:type_name -> google.protobuf.Timestamp 32, // 39: peerdb_flow.TIDPartitionRange.start:type_name -> peerdb_flow.TID 32, // 40: peerdb_flow.TIDPartitionRange.end:type_name -> peerdb_flow.TID 30, // 41: peerdb_flow.PartitionRange.int_range:type_name -> peerdb_flow.IntPartitionRange 31, // 42: peerdb_flow.PartitionRange.timestamp_range:type_name -> peerdb_flow.TimestampPartitionRange 33, // 43: peerdb_flow.PartitionRange.tid_range:type_name -> peerdb_flow.TIDPartitionRange - 34, // 44: peerdb_flow.PartitionRange.xmin_range:type_name -> peerdb_flow.XMINPartitionRange - 1, // 45: peerdb_flow.QRepWriteMode.write_type:type_name -> peerdb_flow.QRepWriteType - 56, // 46: peerdb_flow.QRepConfig.source_peer:type_name -> peerdb_peers.Peer - 56, // 47: 
peerdb_flow.QRepConfig.destination_peer:type_name -> peerdb_peers.Peer - 0, // 48: peerdb_flow.QRepConfig.sync_mode:type_name -> peerdb_flow.QRepSyncMode - 36, // 49: peerdb_flow.QRepConfig.write_mode:type_name -> peerdb_flow.QRepWriteMode - 35, // 50: peerdb_flow.QRepPartition.range:type_name -> peerdb_flow.PartitionRange - 38, // 51: peerdb_flow.QRepPartitionBatch.partitions:type_name -> peerdb_flow.QRepPartition - 38, // 52: peerdb_flow.QRepParitionResult.partitions:type_name -> peerdb_flow.QRepPartition - 42, // 53: peerdb_flow.TableSchemaDelta.added_columns:type_name -> peerdb_flow.DeltaAddedColumn - 6, // 54: peerdb_flow.ReplayTableSchemaDeltaInput.flow_connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs - 43, // 55: peerdb_flow.ReplayTableSchemaDeltaInput.table_schema_deltas:type_name -> peerdb_flow.TableSchemaDelta - 23, // 56: peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry.value:type_name -> peerdb_flow.TableSchema - 4, // 57: peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry.value:type_name -> peerdb_flow.RelationMessage - 4, // 58: peerdb_flow.StartFlowInput.RelationMessageMappingEntry.value:type_name -> peerdb_flow.RelationMessage - 16, // 59: peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry.value:type_name -> peerdb_flow.TableIdentifier - 23, // 60: peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry.value:type_name -> peerdb_flow.TableSchema - 23, // 61: peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry.value:type_name -> peerdb_flow.TableSchema - 62, // [62:62] is the sub-list for method output_type - 62, // [62:62] is the sub-list for method input_type - 62, // [62:62] is the sub-list for extension type_name - 62, // [62:62] is the sub-list for extension extendee - 0, // [0:62] is the sub-list for field type_name + 1, // 44: peerdb_flow.QRepWriteMode.write_type:type_name -> peerdb_flow.QRepWriteType + 55, // 45: peerdb_flow.QRepConfig.source_peer:type_name -> peerdb_peers.Peer + 55, // 46: peerdb_flow.QRepConfig.destination_peer:type_name -> peerdb_peers.Peer + 0, // 47: peerdb_flow.QRepConfig.sync_mode:type_name -> peerdb_flow.QRepSyncMode + 35, // 48: peerdb_flow.QRepConfig.write_mode:type_name -> peerdb_flow.QRepWriteMode + 34, // 49: peerdb_flow.QRepPartition.range:type_name -> peerdb_flow.PartitionRange + 37, // 50: peerdb_flow.QRepPartitionBatch.partitions:type_name -> peerdb_flow.QRepPartition + 37, // 51: peerdb_flow.QRepParitionResult.partitions:type_name -> peerdb_flow.QRepPartition + 41, // 52: peerdb_flow.TableSchemaDelta.added_columns:type_name -> peerdb_flow.DeltaAddedColumn + 6, // 53: peerdb_flow.ReplayTableSchemaDeltaInput.flow_connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs + 42, // 54: peerdb_flow.ReplayTableSchemaDeltaInput.table_schema_deltas:type_name -> peerdb_flow.TableSchemaDelta + 23, // 55: peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry.value:type_name -> peerdb_flow.TableSchema + 4, // 56: peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry.value:type_name -> peerdb_flow.RelationMessage + 4, // 57: peerdb_flow.StartFlowInput.RelationMessageMappingEntry.value:type_name -> peerdb_flow.RelationMessage + 16, // 58: peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry.value:type_name -> peerdb_flow.TableIdentifier + 23, // 59: peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry.value:type_name -> peerdb_flow.TableSchema + 23, // 60: 
peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry.value:type_name -> peerdb_flow.TableSchema + 61, // [61:61] is the sub-list for method output_type + 61, // [61:61] is the sub-list for method input_type + 61, // [61:61] is the sub-list for extension type_name + 61, // [61:61] is the sub-list for extension extendee + 0, // [0:61] is the sub-list for field type_name } func init() { file_flow_proto_init() } @@ -4074,18 +3995,6 @@ func file_flow_proto_init() { } } file_flow_proto_msgTypes[32].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*XMINPartitionRange); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_flow_proto_msgTypes[33].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PartitionRange); i { case 0: return &v.state @@ -4097,7 +4006,7 @@ func file_flow_proto_init() { return nil } } - file_flow_proto_msgTypes[34].Exporter = func(v interface{}, i int) interface{} { + file_flow_proto_msgTypes[33].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*QRepWriteMode); i { case 0: return &v.state @@ -4109,7 +4018,7 @@ func file_flow_proto_init() { return nil } } - file_flow_proto_msgTypes[35].Exporter = func(v interface{}, i int) interface{} { + file_flow_proto_msgTypes[34].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*QRepConfig); i { case 0: return &v.state @@ -4121,7 +4030,7 @@ func file_flow_proto_init() { return nil } } - file_flow_proto_msgTypes[36].Exporter = func(v interface{}, i int) interface{} { + file_flow_proto_msgTypes[35].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*QRepPartition); i { case 0: return &v.state @@ -4133,7 +4042,7 @@ func file_flow_proto_init() { return nil } } - file_flow_proto_msgTypes[37].Exporter = func(v interface{}, i int) interface{} { + file_flow_proto_msgTypes[36].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*QRepPartitionBatch); i { case 0: return &v.state @@ -4145,7 +4054,7 @@ func file_flow_proto_init() { return nil } } - file_flow_proto_msgTypes[38].Exporter = func(v interface{}, i int) interface{} { + file_flow_proto_msgTypes[37].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*QRepParitionResult); i { case 0: return &v.state @@ -4157,7 +4066,7 @@ func file_flow_proto_init() { return nil } } - file_flow_proto_msgTypes[39].Exporter = func(v interface{}, i int) interface{} { + file_flow_proto_msgTypes[38].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*DropFlowInput); i { case 0: return &v.state @@ -4169,7 +4078,7 @@ func file_flow_proto_init() { return nil } } - file_flow_proto_msgTypes[40].Exporter = func(v interface{}, i int) interface{} { + file_flow_proto_msgTypes[39].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*DeltaAddedColumn); i { case 0: return &v.state @@ -4181,7 +4090,7 @@ func file_flow_proto_init() { return nil } } - file_flow_proto_msgTypes[41].Exporter = func(v interface{}, i int) interface{} { + file_flow_proto_msgTypes[40].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*TableSchemaDelta); i { case 0: return &v.state @@ -4193,7 +4102,7 @@ func file_flow_proto_init() { return nil } } - file_flow_proto_msgTypes[42].Exporter = func(v interface{}, i int) interface{} { + file_flow_proto_msgTypes[41].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ReplayTableSchemaDeltaInput); i { case 0: return &v.state @@ 
-4209,11 +4118,10 @@ func file_flow_proto_init() { file_flow_proto_msgTypes[14].OneofWrappers = []interface{}{ (*TableIdentifier_PostgresTableIdentifier)(nil), } - file_flow_proto_msgTypes[33].OneofWrappers = []interface{}{ + file_flow_proto_msgTypes[32].OneofWrappers = []interface{}{ (*PartitionRange_IntRange)(nil), (*PartitionRange_TimestampRange)(nil), (*PartitionRange_TidRange)(nil), - (*PartitionRange_XminRange)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -4221,7 +4129,7 @@ func file_flow_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_flow_proto_rawDesc, NumEnums: 2, - NumMessages: 54, + NumMessages: 53, NumExtensions: 0, NumServices: 0, }, diff --git a/nexus/pt/src/peerdb_flow.rs b/nexus/pt/src/peerdb_flow.rs index ab35dc9efb..72fed1c4c0 100644 --- a/nexus/pt/src/peerdb_flow.rs +++ b/nexus/pt/src/peerdb_flow.rs @@ -331,17 +331,9 @@ pub struct TidPartitionRange { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct XminPartitionRange { - #[prost(uint32, tag="1")] - pub start: u32, - #[prost(uint32, tag="2")] - pub end: u32, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] pub struct PartitionRange { /// can be a timestamp range or an integer range - #[prost(oneof="partition_range::Range", tags="1, 2, 3, 4")] + #[prost(oneof="partition_range::Range", tags="1, 2, 3")] pub range: ::core::option::Option, } /// Nested message and enum types in `PartitionRange`. @@ -356,8 +348,6 @@ pub mod partition_range { TimestampRange(super::TimestampPartitionRange), #[prost(message, tag="3")] TidRange(super::TidPartitionRange), - #[prost(message, tag="4")] - XminRange(super::XminPartitionRange), } } #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/nexus/pt/src/peerdb_flow.serde.rs b/nexus/pt/src/peerdb_flow.serde.rs index d0f98e6ff3..53ade808cd 100644 --- a/nexus/pt/src/peerdb_flow.serde.rs +++ b/nexus/pt/src/peerdb_flow.serde.rs @@ -2088,9 +2088,6 @@ impl serde::Serialize for PartitionRange { partition_range::Range::TidRange(v) => { struct_ser.serialize_field("tidRange", v)?; } - partition_range::Range::XminRange(v) => { - struct_ser.serialize_field("xminRange", v)?; - } } } struct_ser.end() @@ -2109,8 +2106,6 @@ impl<'de> serde::Deserialize<'de> for PartitionRange { "timestampRange", "tid_range", "tidRange", - "xmin_range", - "xminRange", ]; #[allow(clippy::enum_variant_names)] @@ -2118,7 +2113,6 @@ impl<'de> serde::Deserialize<'de> for PartitionRange { IntRange, TimestampRange, TidRange, - XminRange, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -2144,7 +2138,6 @@ impl<'de> serde::Deserialize<'de> for PartitionRange { "intRange" | "int_range" => Ok(GeneratedField::IntRange), "timestampRange" | "timestamp_range" => Ok(GeneratedField::TimestampRange), "tidRange" | "tid_range" => Ok(GeneratedField::TidRange), - "xminRange" | "xmin_range" => Ok(GeneratedField::XminRange), _ => Ok(GeneratedField::__SkipField__), } } @@ -2186,13 +2179,6 @@ impl<'de> serde::Deserialize<'de> for PartitionRange { return Err(serde::de::Error::duplicate_field("tidRange")); } range__ = map.next_value::<::std::option::Option<_>>()?.map(partition_range::Range::TidRange) -; - } - GeneratedField::XminRange => { - if range__.is_some() { - return Err(serde::de::Error::duplicate_field("xminRange")); - } - range__ = map.next_value::<::std::option::Option<_>>()?.map(partition_range::Range::XminRange) ; } GeneratedField::__SkipField__ => { @@ 
-5808,119 +5794,3 @@ impl<'de> serde::Deserialize<'de> for TimestampPartitionRange { deserializer.deserialize_struct("peerdb_flow.TimestampPartitionRange", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for XminPartitionRange { - #[allow(deprecated)] - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - use serde::ser::SerializeStruct; - let mut len = 0; - if self.start != 0 { - len += 1; - } - if self.end != 0 { - len += 1; - } - let mut struct_ser = serializer.serialize_struct("peerdb_flow.XMINPartitionRange", len)?; - if self.start != 0 { - struct_ser.serialize_field("start", &self.start)?; - } - if self.end != 0 { - struct_ser.serialize_field("end", &self.end)?; - } - struct_ser.end() - } -} -impl<'de> serde::Deserialize<'de> for XminPartitionRange { - #[allow(deprecated)] - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - const FIELDS: &[&str] = &[ - "start", - "end", - ]; - - #[allow(clippy::enum_variant_names)] - enum GeneratedField { - Start, - End, - __SkipField__, - } - impl<'de> serde::Deserialize<'de> for GeneratedField { - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - struct GeneratedVisitor; - - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = GeneratedField; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(formatter, "expected one of: {:?}", &FIELDS) - } - - #[allow(unused_variables)] - fn visit_str(self, value: &str) -> std::result::Result - where - E: serde::de::Error, - { - match value { - "start" => Ok(GeneratedField::Start), - "end" => Ok(GeneratedField::End), - _ => Ok(GeneratedField::__SkipField__), - } - } - } - deserializer.deserialize_identifier(GeneratedVisitor) - } - } - struct GeneratedVisitor; - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = XminPartitionRange; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct peerdb_flow.XMINPartitionRange") - } - - fn visit_map(self, mut map: V) -> std::result::Result - where - V: serde::de::MapAccess<'de>, - { - let mut start__ = None; - let mut end__ = None; - while let Some(k) = map.next_key()? 
{ - match k { - GeneratedField::Start => { - if start__.is_some() { - return Err(serde::de::Error::duplicate_field("start")); - } - start__ = - Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) - ; - } - GeneratedField::End => { - if end__.is_some() { - return Err(serde::de::Error::duplicate_field("end")); - } - end__ = - Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) - ; - } - GeneratedField::__SkipField__ => { - let _ = map.next_value::()?; - } - } - } - Ok(XminPartitionRange { - start: start__.unwrap_or_default(), - end: end__.unwrap_or_default(), - }) - } - } - deserializer.deserialize_struct("peerdb_flow.XMINPartitionRange", FIELDS, GeneratedVisitor) - } -} diff --git a/protos/flow.proto b/protos/flow.proto index 1cda641cac..0ce5bc44b4 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -208,18 +208,12 @@ message TIDPartitionRange { TID end = 2; } -message XMINPartitionRange { - uint32 start = 1; - uint32 end = 2; -} - message PartitionRange { // can be a timestamp range or an integer range oneof range { IntPartitionRange int_range = 1; TimestampPartitionRange timestamp_range = 2; TIDPartitionRange tid_range = 3; - XMINPartitionRange xmin_range = 4; } } diff --git a/ui/grpc_generated/flow.ts b/ui/grpc_generated/flow.ts index 159842ccb2..83bc41a439 100644 --- a/ui/grpc_generated/flow.ts +++ b/ui/grpc_generated/flow.ts @@ -336,16 +336,10 @@ export interface TIDPartitionRange { end: TID | undefined; } -export interface XMINPartitionRange { - start: number; - end: number; -} - export interface PartitionRange { intRange?: IntPartitionRange | undefined; timestampRange?: TimestampPartitionRange | undefined; tidRange?: TIDPartitionRange | undefined; - xminRange?: XMINPartitionRange | undefined; } export interface QRepWriteMode { @@ -4436,79 +4430,8 @@ export const TIDPartitionRange = { }, }; -function createBaseXMINPartitionRange(): XMINPartitionRange { - return { start: 0, end: 0 }; -} - -export const XMINPartitionRange = { - encode(message: XMINPartitionRange, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { - if (message.start !== 0) { - writer.uint32(8).uint32(message.start); - } - if (message.end !== 0) { - writer.uint32(16).uint32(message.end); - } - return writer; - }, - - decode(input: _m0.Reader | Uint8Array, length?: number): XMINPartitionRange { - const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); - let end = length === undefined ? reader.len : reader.pos + length; - const message = createBaseXMINPartitionRange(); - while (reader.pos < end) { - const tag = reader.uint32(); - switch (tag >>> 3) { - case 1: - if (tag !== 8) { - break; - } - - message.start = reader.uint32(); - continue; - case 2: - if (tag !== 16) { - break; - } - - message.end = reader.uint32(); - continue; - } - if ((tag & 7) === 4 || tag === 0) { - break; - } - reader.skipType(tag & 7); - } - return message; - }, - - fromJSON(object: any): XMINPartitionRange { - return { start: isSet(object.start) ? Number(object.start) : 0, end: isSet(object.end) ? Number(object.end) : 0 }; - }, - - toJSON(message: XMINPartitionRange): unknown { - const obj: any = {}; - if (message.start !== 0) { - obj.start = Math.round(message.start); - } - if (message.end !== 0) { - obj.end = Math.round(message.end); - } - return obj; - }, - - create, I>>(base?: I): XMINPartitionRange { - return XMINPartitionRange.fromPartial(base ?? 
({} as any)); - }, - fromPartial, I>>(object: I): XMINPartitionRange { - const message = createBaseXMINPartitionRange(); - message.start = object.start ?? 0; - message.end = object.end ?? 0; - return message; - }, -}; - function createBasePartitionRange(): PartitionRange { - return { intRange: undefined, timestampRange: undefined, tidRange: undefined, xminRange: undefined }; + return { intRange: undefined, timestampRange: undefined, tidRange: undefined }; } export const PartitionRange = { @@ -4522,9 +4445,6 @@ export const PartitionRange = { if (message.tidRange !== undefined) { TIDPartitionRange.encode(message.tidRange, writer.uint32(26).fork()).ldelim(); } - if (message.xminRange !== undefined) { - XMINPartitionRange.encode(message.xminRange, writer.uint32(34).fork()).ldelim(); - } return writer; }, @@ -4556,13 +4476,6 @@ export const PartitionRange = { message.tidRange = TIDPartitionRange.decode(reader, reader.uint32()); continue; - case 4: - if (tag !== 34) { - break; - } - - message.xminRange = XMINPartitionRange.decode(reader, reader.uint32()); - continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -4579,7 +4492,6 @@ export const PartitionRange = { ? TimestampPartitionRange.fromJSON(object.timestampRange) : undefined, tidRange: isSet(object.tidRange) ? TIDPartitionRange.fromJSON(object.tidRange) : undefined, - xminRange: isSet(object.xminRange) ? XMINPartitionRange.fromJSON(object.xminRange) : undefined, }; }, @@ -4594,9 +4506,6 @@ export const PartitionRange = { if (message.tidRange !== undefined) { obj.tidRange = TIDPartitionRange.toJSON(message.tidRange); } - if (message.xminRange !== undefined) { - obj.xminRange = XMINPartitionRange.toJSON(message.xminRange); - } return obj; }, @@ -4614,9 +4523,6 @@ export const PartitionRange = { message.tidRange = (object.tidRange !== undefined && object.tidRange !== null) ? TIDPartitionRange.fromPartial(object.tidRange) : undefined; - message.xminRange = (object.xminRange !== undefined && object.xminRange !== null) - ? XMINPartitionRange.fromPartial(object.xminRange) - : undefined; return message; }, };
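
The patch above adds the `QRepWaitUntilNewRows` activity and `CheckForUpdatedMaxValue`, but the workflow-side wiring that makes `QRepFlowWorkflow` idle on them is not part of the hunks shown here. As a rough illustration only, the sketch below shows how a Temporal workflow could invoke such a long-polling activity before scheduling another replication pass. The function name `waitForNewRows`, the activity options, and the import path are assumptions, not code from this commit.

```go
// Sketch (not part of this patch): idling a QRep workflow until new rows appear.
// All names below are illustrative; only "QRepWaitUntilNewRows" comes from the diff.
package main

import (
	"time"

	"go.temporal.io/sdk/workflow"

	// Hypothetical import path for the generated protobuf types.
	"github.com/PeerDB-io/peer-flow/generated/protos"
)

// waitForNewRows blocks until the source reports a max value beyond the last
// partition's end, relying on activity heartbeats to keep the long poll alive.
func waitForNewRows(ctx workflow.Context, config *protos.QRepConfig, last *protos.QRepPartition) error {
	ao := workflow.ActivityOptions{
		StartToCloseTimeout: 24 * time.Hour, // the poll may run for a long time between batches
		HeartbeatTimeout:    time.Minute,    // the activity heartbeats on every attempt
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	// The activity returns only once CheckForUpdatedMaxValue sees a value past
	// the end of last.Range (int or timestamp ranges, per the patch).
	return workflow.ExecuteActivity(ctx, "QRepWaitUntilNewRows", config, last).Get(ctx, nil)
}
```

Note that, per the new activity code, the wait interval falls back to 5 seconds unless `WaitBetweenBatchesSeconds` is set on the `QRepConfig`, and the check is only performed when the source peer is Postgres.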