diff --git a/flow/connectors/utils/partition/partition.go b/flow/connectors/utils/partition/partition.go index d58d122b52..68a0c30e3f 100644 --- a/flow/connectors/utils/partition/partition.go +++ b/flow/connectors/utils/partition/partition.go @@ -125,6 +125,20 @@ func createTIDPartition(start pgtype.TID, end pgtype.TID) *protos.QRepPartition } } +func createXMINPartition(start uint32, end uint32) *protos.QRepPartition { + return &protos.QRepPartition{ + PartitionId: uuid.New().String(), + Range: &protos.PartitionRange{ + Range: &protos.PartitionRange_XminRange{ + XminRange: &protos.XMINPartitionRange{ + Start: start, + End: end, + }, + }, + }, + } +} + type PartitionHelper struct { prevStart interface{} prevEnd interface{} @@ -174,6 +188,10 @@ func (p *PartitionHelper) AddPartition(start interface{}, end interface{}) error p.partitions = append(p.partitions, createTIDPartition(v, end.(pgtype.TID))) p.prevStart = v p.prevEnd = end + case pgtype.Uint32: + p.partitions = append(p.partitions, createXMINPartition(v.Uint32, end.(uint32))) + p.prevStart = v + p.prevEnd = end default: return fmt.Errorf("unsupported type: %T", v) } diff --git a/flow/generated/protos/flow.pb.go b/flow/generated/protos/flow.pb.go index e00c43999e..dd6de4ccaf 100644 --- a/flow/generated/protos/flow.pb.go +++ b/flow/generated/protos/flow.pb.go @@ -2068,6 +2068,61 @@ func (x *TIDPartitionRange) GetEnd() *TID { return nil } +type XMINPartitionRange struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Start uint32 `protobuf:"varint,1,opt,name=start,proto3" json:"start,omitempty"` + End uint32 `protobuf:"varint,2,opt,name=end,proto3" json:"end,omitempty"` +} + +func (x *XMINPartitionRange) Reset() { + *x = XMINPartitionRange{} + if protoimpl.UnsafeEnabled { + mi := &file_flow_proto_msgTypes[31] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *XMINPartitionRange) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*XMINPartitionRange) ProtoMessage() {} + +func (x *XMINPartitionRange) ProtoReflect() protoreflect.Message { + mi := &file_flow_proto_msgTypes[31] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use XMINPartitionRange.ProtoReflect.Descriptor instead. +func (*XMINPartitionRange) Descriptor() ([]byte, []int) { + return file_flow_proto_rawDescGZIP(), []int{31} +} + +func (x *XMINPartitionRange) GetStart() uint32 { + if x != nil { + return x.Start + } + return 0 +} + +func (x *XMINPartitionRange) GetEnd() uint32 { + if x != nil { + return x.End + } + return 0 +} + type PartitionRange struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -2080,13 +2135,14 @@ type PartitionRange struct { // *PartitionRange_IntRange // *PartitionRange_TimestampRange // *PartitionRange_TidRange + // *PartitionRange_XminRange Range isPartitionRange_Range `protobuf_oneof:"range"` } func (x *PartitionRange) Reset() { *x = PartitionRange{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[31] + mi := &file_flow_proto_msgTypes[32] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2099,7 +2155,7 @@ func (x *PartitionRange) String() string { func (*PartitionRange) ProtoMessage() {} func (x *PartitionRange) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[31] + mi := &file_flow_proto_msgTypes[32] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2112,7 +2168,7 @@ func (x *PartitionRange) ProtoReflect() protoreflect.Message { // Deprecated: Use PartitionRange.ProtoReflect.Descriptor instead. func (*PartitionRange) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{31} + return file_flow_proto_rawDescGZIP(), []int{32} } func (m *PartitionRange) GetRange() isPartitionRange_Range { @@ -2143,6 +2199,13 @@ func (x *PartitionRange) GetTidRange() *TIDPartitionRange { return nil } +func (x *PartitionRange) GetXminRange() *XMINPartitionRange { + if x, ok := x.GetRange().(*PartitionRange_XminRange); ok { + return x.XminRange + } + return nil +} + type isPartitionRange_Range interface { isPartitionRange_Range() } @@ -2159,12 +2222,18 @@ type PartitionRange_TidRange struct { TidRange *TIDPartitionRange `protobuf:"bytes,3,opt,name=tid_range,json=tidRange,proto3,oneof"` } +type PartitionRange_XminRange struct { + XminRange *XMINPartitionRange `protobuf:"bytes,4,opt,name=xmin_range,json=xminRange,proto3,oneof"` +} + func (*PartitionRange_IntRange) isPartitionRange_Range() {} func (*PartitionRange_TimestampRange) isPartitionRange_Range() {} func (*PartitionRange_TidRange) isPartitionRange_Range() {} +func (*PartitionRange_XminRange) isPartitionRange_Range() {} + type QRepWriteMode struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -2177,7 +2246,7 @@ type QRepWriteMode struct { func (x *QRepWriteMode) Reset() { *x = QRepWriteMode{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[32] + mi := &file_flow_proto_msgTypes[33] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2190,7 +2259,7 @@ func (x *QRepWriteMode) String() string { func (*QRepWriteMode) ProtoMessage() {} func (x *QRepWriteMode) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[32] + mi := &file_flow_proto_msgTypes[33] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2203,7 +2272,7 @@ func (x *QRepWriteMode) ProtoReflect() protoreflect.Message { // Deprecated: Use QRepWriteMode.ProtoReflect.Descriptor instead. func (*QRepWriteMode) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{32} + return file_flow_proto_rawDescGZIP(), []int{33} } func (x *QRepWriteMode) GetWriteType() QRepWriteType { @@ -2256,7 +2325,7 @@ type QRepConfig struct { func (x *QRepConfig) Reset() { *x = QRepConfig{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[33] + mi := &file_flow_proto_msgTypes[34] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2269,7 +2338,7 @@ func (x *QRepConfig) String() string { func (*QRepConfig) ProtoMessage() {} func (x *QRepConfig) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[33] + mi := &file_flow_proto_msgTypes[34] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2282,7 +2351,7 @@ func (x *QRepConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use QRepConfig.ProtoReflect.Descriptor instead. func (*QRepConfig) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{33} + return file_flow_proto_rawDescGZIP(), []int{34} } func (x *QRepConfig) GetFlowJobName() string { @@ -2410,7 +2479,7 @@ type QRepPartition struct { func (x *QRepPartition) Reset() { *x = QRepPartition{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[34] + mi := &file_flow_proto_msgTypes[35] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2423,7 +2492,7 @@ func (x *QRepPartition) String() string { func (*QRepPartition) ProtoMessage() {} func (x *QRepPartition) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[34] + mi := &file_flow_proto_msgTypes[35] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2436,7 +2505,7 @@ func (x *QRepPartition) ProtoReflect() protoreflect.Message { // Deprecated: Use QRepPartition.ProtoReflect.Descriptor instead. func (*QRepPartition) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{34} + return file_flow_proto_rawDescGZIP(), []int{35} } func (x *QRepPartition) GetPartitionId() string { @@ -2472,7 +2541,7 @@ type QRepPartitionBatch struct { func (x *QRepPartitionBatch) Reset() { *x = QRepPartitionBatch{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[35] + mi := &file_flow_proto_msgTypes[36] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2485,7 +2554,7 @@ func (x *QRepPartitionBatch) String() string { func (*QRepPartitionBatch) ProtoMessage() {} func (x *QRepPartitionBatch) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[35] + mi := &file_flow_proto_msgTypes[36] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2498,7 +2567,7 @@ func (x *QRepPartitionBatch) ProtoReflect() protoreflect.Message { // Deprecated: Use QRepPartitionBatch.ProtoReflect.Descriptor instead. func (*QRepPartitionBatch) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{35} + return file_flow_proto_rawDescGZIP(), []int{36} } func (x *QRepPartitionBatch) GetBatchId() int32 { @@ -2526,7 +2595,7 @@ type QRepParitionResult struct { func (x *QRepParitionResult) Reset() { *x = QRepParitionResult{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[36] + mi := &file_flow_proto_msgTypes[37] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2539,7 +2608,7 @@ func (x *QRepParitionResult) String() string { func (*QRepParitionResult) ProtoMessage() {} func (x *QRepParitionResult) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[36] + mi := &file_flow_proto_msgTypes[37] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2552,7 +2621,7 @@ func (x *QRepParitionResult) ProtoReflect() protoreflect.Message { // Deprecated: Use QRepParitionResult.ProtoReflect.Descriptor instead. func (*QRepParitionResult) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{36} + return file_flow_proto_rawDescGZIP(), []int{37} } func (x *QRepParitionResult) GetPartitions() []*QRepPartition { @@ -2573,7 +2642,7 @@ type DropFlowInput struct { func (x *DropFlowInput) Reset() { *x = DropFlowInput{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[37] + mi := &file_flow_proto_msgTypes[38] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2586,7 +2655,7 @@ func (x *DropFlowInput) String() string { func (*DropFlowInput) ProtoMessage() {} func (x *DropFlowInput) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[37] + mi := &file_flow_proto_msgTypes[38] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2599,7 +2668,7 @@ func (x *DropFlowInput) ProtoReflect() protoreflect.Message { // Deprecated: Use DropFlowInput.ProtoReflect.Descriptor instead. func (*DropFlowInput) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{37} + return file_flow_proto_rawDescGZIP(), []int{38} } func (x *DropFlowInput) GetFlowName() string { @@ -2621,7 +2690,7 @@ type DeltaAddedColumn struct { func (x *DeltaAddedColumn) Reset() { *x = DeltaAddedColumn{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[38] + mi := &file_flow_proto_msgTypes[39] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2634,7 +2703,7 @@ func (x *DeltaAddedColumn) String() string { func (*DeltaAddedColumn) ProtoMessage() {} func (x *DeltaAddedColumn) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[38] + mi := &file_flow_proto_msgTypes[39] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2647,7 +2716,7 @@ func (x *DeltaAddedColumn) ProtoReflect() protoreflect.Message { // Deprecated: Use DeltaAddedColumn.ProtoReflect.Descriptor instead. func (*DeltaAddedColumn) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{38} + return file_flow_proto_rawDescGZIP(), []int{39} } func (x *DeltaAddedColumn) GetColumnName() string { @@ -2678,7 +2747,7 @@ type TableSchemaDelta struct { func (x *TableSchemaDelta) Reset() { *x = TableSchemaDelta{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[39] + mi := &file_flow_proto_msgTypes[40] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2691,7 +2760,7 @@ func (x *TableSchemaDelta) String() string { func (*TableSchemaDelta) ProtoMessage() {} func (x *TableSchemaDelta) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[39] + mi := &file_flow_proto_msgTypes[40] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2704,7 +2773,7 @@ func (x *TableSchemaDelta) ProtoReflect() protoreflect.Message { // Deprecated: Use TableSchemaDelta.ProtoReflect.Descriptor instead. func (*TableSchemaDelta) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{39} + return file_flow_proto_rawDescGZIP(), []int{40} } func (x *TableSchemaDelta) GetSrcTableName() string { @@ -2747,7 +2816,7 @@ type ReplayTableSchemaDeltaInput struct { func (x *ReplayTableSchemaDeltaInput) Reset() { *x = ReplayTableSchemaDeltaInput{} if protoimpl.UnsafeEnabled { - mi := &file_flow_proto_msgTypes[40] + mi := &file_flow_proto_msgTypes[41] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2760,7 +2829,7 @@ func (x *ReplayTableSchemaDeltaInput) String() string { func (*ReplayTableSchemaDeltaInput) ProtoMessage() {} func (x *ReplayTableSchemaDeltaInput) ProtoReflect() protoreflect.Message { - mi := &file_flow_proto_msgTypes[40] + mi := &file_flow_proto_msgTypes[41] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2773,7 +2842,7 @@ func (x *ReplayTableSchemaDeltaInput) ProtoReflect() protoreflect.Message { // Deprecated: Use ReplayTableSchemaDeltaInput.ProtoReflect.Descriptor instead. func (*ReplayTableSchemaDeltaInput) Descriptor() ([]byte, []int) { - return file_flow_proto_rawDescGZIP(), []int{40} + return file_flow_proto_rawDescGZIP(), []int{41} } func (x *ReplayTableSchemaDeltaInput) GetFlowConnectionConfigs() *FlowConnectionConfigs { @@ -3241,21 +3310,29 @@ var file_flow_proto_rawDesc = []byte{ 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x49, 0x44, 0x52, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x22, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x49, 0x44, 0x52, - 0x03, 0x65, 0x6e, 0x64, 0x22, 0xe8, 0x01, 0x0a, 0x0e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x3d, 0x0a, 0x09, 0x69, 0x6e, 0x74, 0x5f, 0x72, - 0x61, 0x6e, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x6e, 0x74, 0x50, 0x61, 0x72, 0x74, - 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x48, 0x00, 0x52, 0x08, 0x69, 0x6e, - 0x74, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4f, 0x0a, 0x0f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, - 0x61, 0x6d, 0x70, 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x24, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x69, - 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x52, 0x61, 0x6e, 0x67, 0x65, 0x48, 0x00, 0x52, 0x0e, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, - 0x6d, 0x70, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x3d, 0x0a, 0x09, 0x74, 0x69, 0x64, 0x5f, 0x72, - 0x61, 0x6e, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x49, 0x44, 0x50, 0x61, 0x72, 0x74, - 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x48, 0x00, 0x52, 0x08, 0x74, 0x69, - 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x42, 0x07, 0x0a, 0x05, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x22, + 0x03, 0x65, 0x6e, 0x64, 0x22, 0x3c, 0x0a, 0x12, 0x58, 0x4d, 0x49, 0x4e, 0x50, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, + 0x61, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, + 0x12, 0x10, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x03, 0x65, + 0x6e, 0x64, 0x22, 0xaa, 0x02, 0x0a, 0x0e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x3d, 0x0a, 0x09, 0x69, 0x6e, 0x74, 0x5f, 0x72, 0x61, 0x6e, + 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x6e, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x48, 0x00, 0x52, 0x08, 0x69, 0x6e, 0x74, 0x52, + 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4f, 0x0a, 0x0f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, + 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, + 0x6e, 0x67, 0x65, 0x48, 0x00, 0x52, 0x0e, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x3d, 0x0a, 0x09, 0x74, 0x69, 0x64, 0x5f, 0x72, 0x61, 0x6e, + 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x49, 0x44, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x48, 0x00, 0x52, 0x08, 0x74, 0x69, 0x64, 0x52, + 0x61, 0x6e, 0x67, 0x65, 0x12, 0x40, 0x0a, 0x0a, 0x78, 0x6d, 0x69, 0x6e, 0x5f, 0x72, 0x61, 0x6e, + 0x67, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x58, 0x4d, 0x49, 0x4e, 0x50, 0x61, 0x72, 0x74, 0x69, + 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x48, 0x00, 0x52, 0x09, 0x78, 0x6d, 0x69, + 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x42, 0x07, 0x0a, 0x05, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x22, 0x78, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x57, 0x72, 0x69, 0x74, 0x65, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x77, 0x72, 0x69, 0x74, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1a, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, @@ -3403,7 +3480,7 @@ func file_flow_proto_rawDescGZIP() []byte { } var file_flow_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_flow_proto_msgTypes = make([]protoimpl.MessageInfo, 53) +var file_flow_proto_msgTypes = make([]protoimpl.MessageInfo, 54) var file_flow_proto_goTypes = []interface{}{ (QRepSyncMode)(0), // 0: peerdb_flow.QRepSyncMode (QRepWriteType)(0), // 1: peerdb_flow.QRepWriteType @@ -3438,98 +3515,100 @@ var file_flow_proto_goTypes = []interface{}{ (*TimestampPartitionRange)(nil), // 30: peerdb_flow.TimestampPartitionRange (*TID)(nil), // 31: peerdb_flow.TID (*TIDPartitionRange)(nil), // 32: peerdb_flow.TIDPartitionRange - (*PartitionRange)(nil), // 33: peerdb_flow.PartitionRange - (*QRepWriteMode)(nil), // 34: peerdb_flow.QRepWriteMode - (*QRepConfig)(nil), // 35: peerdb_flow.QRepConfig - (*QRepPartition)(nil), // 36: peerdb_flow.QRepPartition - (*QRepPartitionBatch)(nil), // 37: peerdb_flow.QRepPartitionBatch - (*QRepParitionResult)(nil), // 38: peerdb_flow.QRepParitionResult - (*DropFlowInput)(nil), // 39: peerdb_flow.DropFlowInput - (*DeltaAddedColumn)(nil), // 40: peerdb_flow.DeltaAddedColumn - (*TableSchemaDelta)(nil), // 41: peerdb_flow.TableSchemaDelta - (*ReplayTableSchemaDeltaInput)(nil), // 42: peerdb_flow.ReplayTableSchemaDeltaInput - nil, // 43: peerdb_flow.FlowConnectionConfigs.TableNameMappingEntry - nil, // 44: peerdb_flow.FlowConnectionConfigs.SrcTableIdNameMappingEntry - nil, // 45: peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry - nil, // 46: peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry - nil, // 47: peerdb_flow.StartFlowInput.RelationMessageMappingEntry - nil, // 48: peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry - nil, // 49: peerdb_flow.SetupReplicationInput.TableNameMappingEntry - nil, // 50: peerdb_flow.CreateRawTableInput.TableNameMappingEntry - nil, // 51: peerdb_flow.TableSchema.ColumnsEntry - nil, // 52: peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry - nil, // 53: peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry - nil, // 54: peerdb_flow.SetupNormalizedTableBatchOutput.TableExistsMappingEntry - (*Peer)(nil), // 55: peerdb_peers.Peer - (*timestamppb.Timestamp)(nil), // 56: google.protobuf.Timestamp + (*XMINPartitionRange)(nil), // 33: peerdb_flow.XMINPartitionRange + (*PartitionRange)(nil), // 34: peerdb_flow.PartitionRange + (*QRepWriteMode)(nil), // 35: peerdb_flow.QRepWriteMode + (*QRepConfig)(nil), // 36: peerdb_flow.QRepConfig + (*QRepPartition)(nil), // 37: peerdb_flow.QRepPartition + (*QRepPartitionBatch)(nil), // 38: peerdb_flow.QRepPartitionBatch + (*QRepParitionResult)(nil), // 39: peerdb_flow.QRepParitionResult + (*DropFlowInput)(nil), // 40: peerdb_flow.DropFlowInput + (*DeltaAddedColumn)(nil), // 41: peerdb_flow.DeltaAddedColumn + (*TableSchemaDelta)(nil), // 42: peerdb_flow.TableSchemaDelta + (*ReplayTableSchemaDeltaInput)(nil), // 43: peerdb_flow.ReplayTableSchemaDeltaInput + nil, // 44: peerdb_flow.FlowConnectionConfigs.TableNameMappingEntry + nil, // 45: peerdb_flow.FlowConnectionConfigs.SrcTableIdNameMappingEntry + nil, // 46: peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry + nil, // 47: peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry + nil, // 48: peerdb_flow.StartFlowInput.RelationMessageMappingEntry + nil, // 49: peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry + nil, // 50: peerdb_flow.SetupReplicationInput.TableNameMappingEntry + nil, // 51: peerdb_flow.CreateRawTableInput.TableNameMappingEntry + nil, // 52: peerdb_flow.TableSchema.ColumnsEntry + nil, // 53: peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry + nil, // 54: peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry + nil, // 55: peerdb_flow.SetupNormalizedTableBatchOutput.TableExistsMappingEntry + (*Peer)(nil), // 56: peerdb_peers.Peer + (*timestamppb.Timestamp)(nil), // 57: google.protobuf.Timestamp } var file_flow_proto_depIdxs = []int32{ 3, // 0: peerdb_flow.RelationMessage.columns:type_name -> peerdb_flow.RelationMessageColumn - 55, // 1: peerdb_flow.FlowConnectionConfigs.source:type_name -> peerdb_peers.Peer - 55, // 2: peerdb_flow.FlowConnectionConfigs.destination:type_name -> peerdb_peers.Peer + 56, // 1: peerdb_flow.FlowConnectionConfigs.source:type_name -> peerdb_peers.Peer + 56, // 2: peerdb_flow.FlowConnectionConfigs.destination:type_name -> peerdb_peers.Peer 22, // 3: peerdb_flow.FlowConnectionConfigs.table_schema:type_name -> peerdb_flow.TableSchema - 43, // 4: peerdb_flow.FlowConnectionConfigs.table_name_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.TableNameMappingEntry - 44, // 5: peerdb_flow.FlowConnectionConfigs.src_table_id_name_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.SrcTableIdNameMappingEntry - 45, // 6: peerdb_flow.FlowConnectionConfigs.table_name_schema_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry - 55, // 7: peerdb_flow.FlowConnectionConfigs.metadata_peer:type_name -> peerdb_peers.Peer + 44, // 4: peerdb_flow.FlowConnectionConfigs.table_name_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.TableNameMappingEntry + 45, // 5: peerdb_flow.FlowConnectionConfigs.src_table_id_name_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.SrcTableIdNameMappingEntry + 46, // 6: peerdb_flow.FlowConnectionConfigs.table_name_schema_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry + 56, // 7: peerdb_flow.FlowConnectionConfigs.metadata_peer:type_name -> peerdb_peers.Peer 0, // 8: peerdb_flow.FlowConnectionConfigs.snapshot_sync_mode:type_name -> peerdb_flow.QRepSyncMode 0, // 9: peerdb_flow.FlowConnectionConfigs.cdc_sync_mode:type_name -> peerdb_flow.QRepSyncMode - 46, // 10: peerdb_flow.SyncFlowOptions.relation_message_mapping:type_name -> peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry - 56, // 11: peerdb_flow.LastSyncState.last_synced_at:type_name -> google.protobuf.Timestamp + 47, // 10: peerdb_flow.SyncFlowOptions.relation_message_mapping:type_name -> peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry + 57, // 11: peerdb_flow.LastSyncState.last_synced_at:type_name -> google.protobuf.Timestamp 8, // 12: peerdb_flow.StartFlowInput.last_sync_state:type_name -> peerdb_flow.LastSyncState 5, // 13: peerdb_flow.StartFlowInput.flow_connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs 6, // 14: peerdb_flow.StartFlowInput.sync_flow_options:type_name -> peerdb_flow.SyncFlowOptions - 47, // 15: peerdb_flow.StartFlowInput.relation_message_mapping:type_name -> peerdb_flow.StartFlowInput.RelationMessageMappingEntry + 48, // 15: peerdb_flow.StartFlowInput.relation_message_mapping:type_name -> peerdb_flow.StartFlowInput.RelationMessageMappingEntry 5, // 16: peerdb_flow.StartNormalizeInput.flow_connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs - 55, // 17: peerdb_flow.GetLastSyncedIDInput.peer_connection_config:type_name -> peerdb_peers.Peer - 55, // 18: peerdb_flow.EnsurePullabilityInput.peer_connection_config:type_name -> peerdb_peers.Peer - 55, // 19: peerdb_flow.EnsurePullabilityBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer + 56, // 17: peerdb_flow.GetLastSyncedIDInput.peer_connection_config:type_name -> peerdb_peers.Peer + 56, // 18: peerdb_flow.EnsurePullabilityInput.peer_connection_config:type_name -> peerdb_peers.Peer + 56, // 19: peerdb_flow.EnsurePullabilityBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer 14, // 20: peerdb_flow.TableIdentifier.postgres_table_identifier:type_name -> peerdb_flow.PostgresTableIdentifier 15, // 21: peerdb_flow.EnsurePullabilityOutput.table_identifier:type_name -> peerdb_flow.TableIdentifier - 48, // 22: peerdb_flow.EnsurePullabilityBatchOutput.table_identifier_mapping:type_name -> peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry - 55, // 23: peerdb_flow.SetupReplicationInput.peer_connection_config:type_name -> peerdb_peers.Peer - 49, // 24: peerdb_flow.SetupReplicationInput.table_name_mapping:type_name -> peerdb_flow.SetupReplicationInput.TableNameMappingEntry - 55, // 25: peerdb_flow.SetupReplicationInput.destination_peer:type_name -> peerdb_peers.Peer - 55, // 26: peerdb_flow.CreateRawTableInput.peer_connection_config:type_name -> peerdb_peers.Peer - 50, // 27: peerdb_flow.CreateRawTableInput.table_name_mapping:type_name -> peerdb_flow.CreateRawTableInput.TableNameMappingEntry + 49, // 22: peerdb_flow.EnsurePullabilityBatchOutput.table_identifier_mapping:type_name -> peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry + 56, // 23: peerdb_flow.SetupReplicationInput.peer_connection_config:type_name -> peerdb_peers.Peer + 50, // 24: peerdb_flow.SetupReplicationInput.table_name_mapping:type_name -> peerdb_flow.SetupReplicationInput.TableNameMappingEntry + 56, // 25: peerdb_flow.SetupReplicationInput.destination_peer:type_name -> peerdb_peers.Peer + 56, // 26: peerdb_flow.CreateRawTableInput.peer_connection_config:type_name -> peerdb_peers.Peer + 51, // 27: peerdb_flow.CreateRawTableInput.table_name_mapping:type_name -> peerdb_flow.CreateRawTableInput.TableNameMappingEntry 0, // 28: peerdb_flow.CreateRawTableInput.cdc_sync_mode:type_name -> peerdb_flow.QRepSyncMode - 51, // 29: peerdb_flow.TableSchema.columns:type_name -> peerdb_flow.TableSchema.ColumnsEntry - 55, // 30: peerdb_flow.GetTableSchemaBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer - 52, // 31: peerdb_flow.GetTableSchemaBatchOutput.table_name_schema_mapping:type_name -> peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry - 55, // 32: peerdb_flow.SetupNormalizedTableInput.peer_connection_config:type_name -> peerdb_peers.Peer + 52, // 29: peerdb_flow.TableSchema.columns:type_name -> peerdb_flow.TableSchema.ColumnsEntry + 56, // 30: peerdb_flow.GetTableSchemaBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer + 53, // 31: peerdb_flow.GetTableSchemaBatchOutput.table_name_schema_mapping:type_name -> peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry + 56, // 32: peerdb_flow.SetupNormalizedTableInput.peer_connection_config:type_name -> peerdb_peers.Peer 22, // 33: peerdb_flow.SetupNormalizedTableInput.source_table_schema:type_name -> peerdb_flow.TableSchema - 55, // 34: peerdb_flow.SetupNormalizedTableBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer - 53, // 35: peerdb_flow.SetupNormalizedTableBatchInput.table_name_schema_mapping:type_name -> peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry - 54, // 36: peerdb_flow.SetupNormalizedTableBatchOutput.table_exists_mapping:type_name -> peerdb_flow.SetupNormalizedTableBatchOutput.TableExistsMappingEntry - 56, // 37: peerdb_flow.TimestampPartitionRange.start:type_name -> google.protobuf.Timestamp - 56, // 38: peerdb_flow.TimestampPartitionRange.end:type_name -> google.protobuf.Timestamp + 56, // 34: peerdb_flow.SetupNormalizedTableBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer + 54, // 35: peerdb_flow.SetupNormalizedTableBatchInput.table_name_schema_mapping:type_name -> peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry + 55, // 36: peerdb_flow.SetupNormalizedTableBatchOutput.table_exists_mapping:type_name -> peerdb_flow.SetupNormalizedTableBatchOutput.TableExistsMappingEntry + 57, // 37: peerdb_flow.TimestampPartitionRange.start:type_name -> google.protobuf.Timestamp + 57, // 38: peerdb_flow.TimestampPartitionRange.end:type_name -> google.protobuf.Timestamp 31, // 39: peerdb_flow.TIDPartitionRange.start:type_name -> peerdb_flow.TID 31, // 40: peerdb_flow.TIDPartitionRange.end:type_name -> peerdb_flow.TID 29, // 41: peerdb_flow.PartitionRange.int_range:type_name -> peerdb_flow.IntPartitionRange 30, // 42: peerdb_flow.PartitionRange.timestamp_range:type_name -> peerdb_flow.TimestampPartitionRange 32, // 43: peerdb_flow.PartitionRange.tid_range:type_name -> peerdb_flow.TIDPartitionRange - 1, // 44: peerdb_flow.QRepWriteMode.write_type:type_name -> peerdb_flow.QRepWriteType - 55, // 45: peerdb_flow.QRepConfig.source_peer:type_name -> peerdb_peers.Peer - 55, // 46: peerdb_flow.QRepConfig.destination_peer:type_name -> peerdb_peers.Peer - 0, // 47: peerdb_flow.QRepConfig.sync_mode:type_name -> peerdb_flow.QRepSyncMode - 34, // 48: peerdb_flow.QRepConfig.write_mode:type_name -> peerdb_flow.QRepWriteMode - 33, // 49: peerdb_flow.QRepPartition.range:type_name -> peerdb_flow.PartitionRange - 36, // 50: peerdb_flow.QRepPartitionBatch.partitions:type_name -> peerdb_flow.QRepPartition - 36, // 51: peerdb_flow.QRepParitionResult.partitions:type_name -> peerdb_flow.QRepPartition - 40, // 52: peerdb_flow.TableSchemaDelta.added_columns:type_name -> peerdb_flow.DeltaAddedColumn - 5, // 53: peerdb_flow.ReplayTableSchemaDeltaInput.flow_connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs - 41, // 54: peerdb_flow.ReplayTableSchemaDeltaInput.table_schema_delta:type_name -> peerdb_flow.TableSchemaDelta - 22, // 55: peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry.value:type_name -> peerdb_flow.TableSchema - 4, // 56: peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry.value:type_name -> peerdb_flow.RelationMessage - 4, // 57: peerdb_flow.StartFlowInput.RelationMessageMappingEntry.value:type_name -> peerdb_flow.RelationMessage - 15, // 58: peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry.value:type_name -> peerdb_flow.TableIdentifier - 22, // 59: peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry.value:type_name -> peerdb_flow.TableSchema - 22, // 60: peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry.value:type_name -> peerdb_flow.TableSchema - 61, // [61:61] is the sub-list for method output_type - 61, // [61:61] is the sub-list for method input_type - 61, // [61:61] is the sub-list for extension type_name - 61, // [61:61] is the sub-list for extension extendee - 0, // [0:61] is the sub-list for field type_name + 33, // 44: peerdb_flow.PartitionRange.xmin_range:type_name -> peerdb_flow.XMINPartitionRange + 1, // 45: peerdb_flow.QRepWriteMode.write_type:type_name -> peerdb_flow.QRepWriteType + 56, // 46: peerdb_flow.QRepConfig.source_peer:type_name -> peerdb_peers.Peer + 56, // 47: peerdb_flow.QRepConfig.destination_peer:type_name -> peerdb_peers.Peer + 0, // 48: peerdb_flow.QRepConfig.sync_mode:type_name -> peerdb_flow.QRepSyncMode + 35, // 49: peerdb_flow.QRepConfig.write_mode:type_name -> peerdb_flow.QRepWriteMode + 34, // 50: peerdb_flow.QRepPartition.range:type_name -> peerdb_flow.PartitionRange + 37, // 51: peerdb_flow.QRepPartitionBatch.partitions:type_name -> peerdb_flow.QRepPartition + 37, // 52: peerdb_flow.QRepParitionResult.partitions:type_name -> peerdb_flow.QRepPartition + 41, // 53: peerdb_flow.TableSchemaDelta.added_columns:type_name -> peerdb_flow.DeltaAddedColumn + 5, // 54: peerdb_flow.ReplayTableSchemaDeltaInput.flow_connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs + 42, // 55: peerdb_flow.ReplayTableSchemaDeltaInput.table_schema_delta:type_name -> peerdb_flow.TableSchemaDelta + 22, // 56: peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry.value:type_name -> peerdb_flow.TableSchema + 4, // 57: peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry.value:type_name -> peerdb_flow.RelationMessage + 4, // 58: peerdb_flow.StartFlowInput.RelationMessageMappingEntry.value:type_name -> peerdb_flow.RelationMessage + 15, // 59: peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry.value:type_name -> peerdb_flow.TableIdentifier + 22, // 60: peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry.value:type_name -> peerdb_flow.TableSchema + 22, // 61: peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry.value:type_name -> peerdb_flow.TableSchema + 62, // [62:62] is the sub-list for method output_type + 62, // [62:62] is the sub-list for method input_type + 62, // [62:62] is the sub-list for extension type_name + 62, // [62:62] is the sub-list for extension extendee + 0, // [0:62] is the sub-list for field type_name } func init() { file_flow_proto_init() } @@ -3912,7 +3991,7 @@ func file_flow_proto_init() { } } file_flow_proto_msgTypes[31].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PartitionRange); i { + switch v := v.(*XMINPartitionRange); i { case 0: return &v.state case 1: @@ -3924,7 +4003,7 @@ func file_flow_proto_init() { } } file_flow_proto_msgTypes[32].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*QRepWriteMode); i { + switch v := v.(*PartitionRange); i { case 0: return &v.state case 1: @@ -3936,7 +4015,7 @@ func file_flow_proto_init() { } } file_flow_proto_msgTypes[33].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*QRepConfig); i { + switch v := v.(*QRepWriteMode); i { case 0: return &v.state case 1: @@ -3948,7 +4027,7 @@ func file_flow_proto_init() { } } file_flow_proto_msgTypes[34].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*QRepPartition); i { + switch v := v.(*QRepConfig); i { case 0: return &v.state case 1: @@ -3960,7 +4039,7 @@ func file_flow_proto_init() { } } file_flow_proto_msgTypes[35].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*QRepPartitionBatch); i { + switch v := v.(*QRepPartition); i { case 0: return &v.state case 1: @@ -3972,7 +4051,7 @@ func file_flow_proto_init() { } } file_flow_proto_msgTypes[36].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*QRepParitionResult); i { + switch v := v.(*QRepPartitionBatch); i { case 0: return &v.state case 1: @@ -3984,7 +4063,7 @@ func file_flow_proto_init() { } } file_flow_proto_msgTypes[37].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*DropFlowInput); i { + switch v := v.(*QRepParitionResult); i { case 0: return &v.state case 1: @@ -3996,7 +4075,7 @@ func file_flow_proto_init() { } } file_flow_proto_msgTypes[38].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*DeltaAddedColumn); i { + switch v := v.(*DropFlowInput); i { case 0: return &v.state case 1: @@ -4008,7 +4087,7 @@ func file_flow_proto_init() { } } file_flow_proto_msgTypes[39].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*TableSchemaDelta); i { + switch v := v.(*DeltaAddedColumn); i { case 0: return &v.state case 1: @@ -4020,6 +4099,18 @@ func file_flow_proto_init() { } } file_flow_proto_msgTypes[40].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TableSchemaDelta); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_flow_proto_msgTypes[41].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ReplayTableSchemaDeltaInput); i { case 0: return &v.state @@ -4035,10 +4126,11 @@ func file_flow_proto_init() { file_flow_proto_msgTypes[13].OneofWrappers = []interface{}{ (*TableIdentifier_PostgresTableIdentifier)(nil), } - file_flow_proto_msgTypes[31].OneofWrappers = []interface{}{ + file_flow_proto_msgTypes[32].OneofWrappers = []interface{}{ (*PartitionRange_IntRange)(nil), (*PartitionRange_TimestampRange)(nil), (*PartitionRange_TidRange)(nil), + (*PartitionRange_XminRange)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -4046,7 +4138,7 @@ func file_flow_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_flow_proto_rawDesc, NumEnums: 2, - NumMessages: 53, + NumMessages: 54, NumExtensions: 0, NumServices: 0, }, diff --git a/nexus/pt/src/peerdb_flow.rs b/nexus/pt/src/peerdb_flow.rs index ad5fef2012..8e996e6446 100644 --- a/nexus/pt/src/peerdb_flow.rs +++ b/nexus/pt/src/peerdb_flow.rs @@ -321,9 +321,17 @@ pub struct TidPartitionRange { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct XminPartitionRange { + #[prost(uint32, tag="1")] + pub start: u32, + #[prost(uint32, tag="2")] + pub end: u32, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct PartitionRange { /// can be a timestamp range or an integer range - #[prost(oneof="partition_range::Range", tags="1, 2, 3")] + #[prost(oneof="partition_range::Range", tags="1, 2, 3, 4")] pub range: ::core::option::Option, } /// Nested message and enum types in `PartitionRange`. @@ -338,6 +346,8 @@ pub mod partition_range { TimestampRange(super::TimestampPartitionRange), #[prost(message, tag="3")] TidRange(super::TidPartitionRange), + #[prost(message, tag="4")] + XminRange(super::XminPartitionRange), } } #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/nexus/pt/src/peerdb_flow.serde.rs b/nexus/pt/src/peerdb_flow.serde.rs index d706bac0de..c8ad147bdc 100644 --- a/nexus/pt/src/peerdb_flow.serde.rs +++ b/nexus/pt/src/peerdb_flow.serde.rs @@ -2090,6 +2090,9 @@ impl serde::Serialize for PartitionRange { partition_range::Range::TidRange(v) => { struct_ser.serialize_field("tidRange", v)?; } + partition_range::Range::XminRange(v) => { + struct_ser.serialize_field("xminRange", v)?; + } } } struct_ser.end() @@ -2108,6 +2111,8 @@ impl<'de> serde::Deserialize<'de> for PartitionRange { "timestampRange", "tid_range", "tidRange", + "xmin_range", + "xminRange", ]; #[allow(clippy::enum_variant_names)] @@ -2115,6 +2120,7 @@ impl<'de> serde::Deserialize<'de> for PartitionRange { IntRange, TimestampRange, TidRange, + XminRange, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -2140,6 +2146,7 @@ impl<'de> serde::Deserialize<'de> for PartitionRange { "intRange" | "int_range" => Ok(GeneratedField::IntRange), "timestampRange" | "timestamp_range" => Ok(GeneratedField::TimestampRange), "tidRange" | "tid_range" => Ok(GeneratedField::TidRange), + "xminRange" | "xmin_range" => Ok(GeneratedField::XminRange), _ => Ok(GeneratedField::__SkipField__), } } @@ -2181,6 +2188,13 @@ impl<'de> serde::Deserialize<'de> for PartitionRange { return Err(serde::de::Error::duplicate_field("tidRange")); } range__ = map.next_value::<::std::option::Option<_>>()?.map(partition_range::Range::TidRange) +; + } + GeneratedField::XminRange => { + if range__.is_some() { + return Err(serde::de::Error::duplicate_field("xminRange")); + } + range__ = map.next_value::<::std::option::Option<_>>()?.map(partition_range::Range::XminRange) ; } GeneratedField::__SkipField__ => { @@ -5664,3 +5678,119 @@ impl<'de> serde::Deserialize<'de> for TimestampPartitionRange { deserializer.deserialize_struct("peerdb_flow.TimestampPartitionRange", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for XminPartitionRange { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.start != 0 { + len += 1; + } + if self.end != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("peerdb_flow.XMINPartitionRange", len)?; + if self.start != 0 { + struct_ser.serialize_field("start", &self.start)?; + } + if self.end != 0 { + struct_ser.serialize_field("end", &self.end)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for XminPartitionRange { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "start", + "end", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Start, + End, + __SkipField__, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "start" => Ok(GeneratedField::Start), + "end" => Ok(GeneratedField::End), + _ => Ok(GeneratedField::__SkipField__), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = XminPartitionRange; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct peerdb_flow.XMINPartitionRange") + } + + fn visit_map(self, mut map: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut start__ = None; + let mut end__ = None; + while let Some(k) = map.next_key()? { + match k { + GeneratedField::Start => { + if start__.is_some() { + return Err(serde::de::Error::duplicate_field("start")); + } + start__ = + Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::End => { + if end__.is_some() { + return Err(serde::de::Error::duplicate_field("end")); + } + end__ = + Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::__SkipField__ => { + let _ = map.next_value::()?; + } + } + } + Ok(XminPartitionRange { + start: start__.unwrap_or_default(), + end: end__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("peerdb_flow.XMINPartitionRange", FIELDS, GeneratedVisitor) + } +} diff --git a/protos/flow.proto b/protos/flow.proto index 19526e0f1f..428b8ae253 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -202,12 +202,18 @@ message TIDPartitionRange { TID end = 2; } +message XMINPartitionRange { + uint32 start = 1; + uint32 end = 2; +} + message PartitionRange { // can be a timestamp range or an integer range oneof range { IntPartitionRange int_range = 1; TimestampPartitionRange timestamp_range = 2; TIDPartitionRange tid_range = 3; + XMINPartitionRange xmin_range = 4; } } diff --git a/ui/grpc_generated/flow.ts b/ui/grpc_generated/flow.ts index f0207a20e6..417a473da0 100644 --- a/ui/grpc_generated/flow.ts +++ b/ui/grpc_generated/flow.ts @@ -335,10 +335,16 @@ export interface TIDPartitionRange { end: TID | undefined; } +export interface XMINPartitionRange { + start: number; + end: number; +} + export interface PartitionRange { intRange?: IntPartitionRange | undefined; timestampRange?: TimestampPartitionRange | undefined; tidRange?: TIDPartitionRange | undefined; + xminRange?: XMINPartitionRange | undefined; } export interface QRepWriteMode { @@ -4430,8 +4436,79 @@ export const TIDPartitionRange = { }, }; +function createBaseXMINPartitionRange(): XMINPartitionRange { + return { start: 0, end: 0 }; +} + +export const XMINPartitionRange = { + encode(message: XMINPartitionRange, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.start !== 0) { + writer.uint32(8).uint32(message.start); + } + if (message.end !== 0) { + writer.uint32(16).uint32(message.end); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): XMINPartitionRange { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseXMINPartitionRange(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 8) { + break; + } + + message.start = reader.uint32(); + continue; + case 2: + if (tag !== 16) { + break; + } + + message.end = reader.uint32(); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): XMINPartitionRange { + return { start: isSet(object.start) ? Number(object.start) : 0, end: isSet(object.end) ? Number(object.end) : 0 }; + }, + + toJSON(message: XMINPartitionRange): unknown { + const obj: any = {}; + if (message.start !== 0) { + obj.start = Math.round(message.start); + } + if (message.end !== 0) { + obj.end = Math.round(message.end); + } + return obj; + }, + + create, I>>(base?: I): XMINPartitionRange { + return XMINPartitionRange.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>(object: I): XMINPartitionRange { + const message = createBaseXMINPartitionRange(); + message.start = object.start ?? 0; + message.end = object.end ?? 0; + return message; + }, +}; + function createBasePartitionRange(): PartitionRange { - return { intRange: undefined, timestampRange: undefined, tidRange: undefined }; + return { intRange: undefined, timestampRange: undefined, tidRange: undefined, xminRange: undefined }; } export const PartitionRange = { @@ -4445,6 +4522,9 @@ export const PartitionRange = { if (message.tidRange !== undefined) { TIDPartitionRange.encode(message.tidRange, writer.uint32(26).fork()).ldelim(); } + if (message.xminRange !== undefined) { + XMINPartitionRange.encode(message.xminRange, writer.uint32(34).fork()).ldelim(); + } return writer; }, @@ -4476,6 +4556,13 @@ export const PartitionRange = { message.tidRange = TIDPartitionRange.decode(reader, reader.uint32()); continue; + case 4: + if (tag !== 34) { + break; + } + + message.xminRange = XMINPartitionRange.decode(reader, reader.uint32()); + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -4492,6 +4579,7 @@ export const PartitionRange = { ? TimestampPartitionRange.fromJSON(object.timestampRange) : undefined, tidRange: isSet(object.tidRange) ? TIDPartitionRange.fromJSON(object.tidRange) : undefined, + xminRange: isSet(object.xminRange) ? XMINPartitionRange.fromJSON(object.xminRange) : undefined, }; }, @@ -4506,6 +4594,9 @@ export const PartitionRange = { if (message.tidRange !== undefined) { obj.tidRange = TIDPartitionRange.toJSON(message.tidRange); } + if (message.xminRange !== undefined) { + obj.xminRange = XMINPartitionRange.toJSON(message.xminRange); + } return obj; }, @@ -4523,6 +4614,9 @@ export const PartitionRange = { message.tidRange = (object.tidRange !== undefined && object.tidRange !== null) ? TIDPartitionRange.fromPartial(object.tidRange) : undefined; + message.xminRange = (object.xminRange !== undefined && object.xminRange !== null) + ? XMINPartitionRange.fromPartial(object.xminRange) + : undefined; return message; }, };