From 5f444544a94362a11c519dff0570deddf25fd0c3 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 5 Dec 2023 10:22:11 -0500 Subject: [PATCH 1/2] add an endpoint to expose the PeerDB version from Flow API (#749) --- dev-peerdb.sh | 1 + docker-bake.hcl | 3 + docker-compose-dev.yml | 4 +- flow/generated/protos/route.pb.go | 434 ++++++++++++++++--------- flow/generated/protos/route.pb.gw.go | 69 ++++ flow/generated/protos/route_grpc.pb.go | 37 +++ nexus/pt/src/peerdb_route.rs | 10 + nexus/pt/src/peerdb_route.serde.rs | 167 ++++++++++ nexus/pt/src/peerdb_route.tonic.rs | 78 +++++ protos/route.proto | 11 + stacks/flow.Dockerfile | 6 +- ui/grpc_generated/route.ts | 132 ++++++++ 12 files changed, 795 insertions(+), 157 deletions(-) diff --git a/dev-peerdb.sh b/dev-peerdb.sh index d8b0e15137..2305884213 100755 --- a/dev-peerdb.sh +++ b/dev-peerdb.sh @@ -7,5 +7,6 @@ then exit 1 fi +export PEERDB_VERSION_SHA_SHORT=local-$(git rev-parse --short HEAD) docker compose -f docker-compose-dev.yml up --build \ --no-attach temporal --no-attach pyroscope --no-attach temporal-ui diff --git a/docker-bake.hcl b/docker-bake.hcl index ec3b7cd469..6e6098ca14 100644 --- a/docker-bake.hcl +++ b/docker-bake.hcl @@ -28,6 +28,9 @@ target "flow-api" { "linux/amd64", "linux/arm64", ] + args = { + PEERDB_VERSION_SHA_SHORT = "${SHA_SHORT}" + } tags = [ "${REGISTRY}/flow-api:${TAG}", "${REGISTRY}/flow-api:${SHA_SHORT}", diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index 34d695f240..504c6d354e 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -8,7 +8,7 @@ x-catalog-config: &catalog-config PEERDB_CATALOG_DATABASE: postgres x-flow-worker-env: &flow-worker-env - # For Temporal Cloud, this will look like: + # For Temporal Cloud, this will look like: # ..tmprl.cloud:7233 TEMPORAL_HOST_PORT: temporal:7233 PEERDB_TEMPORAL_NAMESPACE: default @@ -115,6 +115,8 @@ services: context: . dockerfile: stacks/flow.Dockerfile target: flow-api + args: + PEERDB_VERSION_SHA_SHORT: ${PEERDB_VERSION_SHA_SHORT:-} ports: - 8112:8112 - 8113:8113 diff --git a/flow/generated/protos/route.pb.go b/flow/generated/protos/route.pb.go index d5c7b761e4..48bed94377 100644 --- a/flow/generated/protos/route.pb.go +++ b/flow/generated/protos/route.pb.go @@ -2004,6 +2004,91 @@ func (x *FlowStateChangeResponse) GetErrorMessage() string { return "" } +type PeerDBVersionRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *PeerDBVersionRequest) Reset() { + *x = PeerDBVersionRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_route_proto_msgTypes[32] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PeerDBVersionRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PeerDBVersionRequest) ProtoMessage() {} + +func (x *PeerDBVersionRequest) ProtoReflect() protoreflect.Message { + mi := &file_route_proto_msgTypes[32] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PeerDBVersionRequest.ProtoReflect.Descriptor instead. +func (*PeerDBVersionRequest) Descriptor() ([]byte, []int) { + return file_route_proto_rawDescGZIP(), []int{32} +} + +type PeerDBVersionResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` +} + +func (x *PeerDBVersionResponse) Reset() { + *x = PeerDBVersionResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_route_proto_msgTypes[33] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PeerDBVersionResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PeerDBVersionResponse) ProtoMessage() {} + +func (x *PeerDBVersionResponse) ProtoReflect() protoreflect.Message { + mi := &file_route_proto_msgTypes[33] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PeerDBVersionResponse.ProtoReflect.Descriptor instead. +func (*PeerDBVersionResponse) Descriptor() ([]byte, []int) { + return file_route_proto_rawDescGZIP(), []int{33} +} + +func (x *PeerDBVersionResponse) GetVersion() string { + if x != nil { + return x.Version + } + return "" +} + var File_route_proto protoreflect.FileDescriptor var file_route_proto_rawDesc = []byte{ @@ -2232,133 +2317,144 @@ var file_route_proto_rawDesc = []byte{ 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x02, 0x6f, 0x6b, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2a, - 0x42, 0x0a, 0x12, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x53, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x14, 0x0a, 0x10, 0x43, 0x52, 0x45, 0x41, 0x54, 0x49, 0x4f, - 0x4e, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x56, - 0x41, 0x4c, 0x49, 0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, - 0x44, 0x10, 0x02, 0x2a, 0x43, 0x0a, 0x10, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, - 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x16, 0x0a, 0x12, 0x56, 0x41, 0x4c, 0x49, 0x44, - 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, - 0x0b, 0x0a, 0x07, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, - 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x43, 0x0a, 0x09, 0x46, 0x6c, 0x6f, 0x77, - 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x55, - 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x54, 0x41, 0x54, - 0x45, 0x5f, 0x52, 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x53, - 0x54, 0x41, 0x54, 0x45, 0x5f, 0x50, 0x41, 0x55, 0x53, 0x45, 0x44, 0x10, 0x02, 0x32, 0x82, 0x0d, - 0x0a, 0x0b, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x74, 0x0a, - 0x0c, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x12, 0x21, 0x2e, - 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x56, 0x61, 0x6c, - 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, - 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1d, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x17, 0x3a, 0x01, 0x2a, 0x22, - 0x12, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x76, 0x61, 0x6c, 0x69, 0x64, - 0x61, 0x74, 0x65, 0x12, 0x6c, 0x0a, 0x0a, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, - 0x72, 0x12, 0x1f, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, - 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, - 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1b, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x15, 0x3a, 0x01, 0x2a, 0x22, - 0x10, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x63, 0x72, 0x65, 0x61, 0x74, - 0x65, 0x12, 0x64, 0x0a, 0x08, 0x44, 0x72, 0x6f, 0x70, 0x50, 0x65, 0x65, 0x72, 0x12, 0x1d, 0x2e, - 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x44, 0x72, 0x6f, - 0x70, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, - 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x44, 0x72, 0x6f, 0x70, - 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x19, 0x82, 0xd3, - 0xe4, 0x93, 0x02, 0x13, 0x3a, 0x01, 0x2a, 0x22, 0x0e, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, - 0x72, 0x73, 0x2f, 0x64, 0x72, 0x6f, 0x70, 0x12, 0x79, 0x0a, 0x0d, 0x43, 0x72, 0x65, 0x61, 0x74, - 0x65, 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, - 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, - 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x70, + 0x09, 0x52, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, + 0x16, 0x0a, 0x14, 0x50, 0x65, 0x65, 0x72, 0x44, 0x42, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x31, 0x0a, 0x15, 0x50, 0x65, 0x65, 0x72, 0x44, + 0x42, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2a, 0x42, 0x0a, 0x12, 0x56, 0x61, + 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, + 0x12, 0x14, 0x0a, 0x10, 0x43, 0x52, 0x45, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x4b, + 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, + 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0x02, 0x2a, 0x43, + 0x0a, 0x10, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x12, 0x16, 0x0a, 0x12, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x41, 0x54, 0x49, 0x4f, 0x4e, + 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x52, + 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, + 0x44, 0x10, 0x02, 0x2a, 0x43, 0x0a, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, + 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, + 0x4e, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x52, 0x55, 0x4e, + 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, + 0x50, 0x41, 0x55, 0x53, 0x45, 0x44, 0x10, 0x02, 0x32, 0xee, 0x0d, 0x0a, 0x0b, 0x46, 0x6c, 0x6f, + 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x74, 0x0a, 0x0c, 0x56, 0x61, 0x6c, 0x69, + 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x12, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, + 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, 0x65, + 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x69, 0x64, + 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x1d, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x17, 0x3a, 0x01, 0x2a, 0x22, 0x12, 0x2f, 0x76, 0x31, 0x2f, + 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x12, 0x6c, + 0x0a, 0x0a, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x12, 0x1f, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, - 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x1f, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x19, 0x3a, 0x01, 0x2a, 0x22, 0x14, 0x2f, 0x76, - 0x31, 0x2f, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x2f, 0x63, 0x64, 0x63, 0x2f, 0x63, 0x72, 0x65, 0x61, - 0x74, 0x65, 0x12, 0x7d, 0x0a, 0x0e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, - 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x23, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, + 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, + 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, + 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x1b, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x15, 0x3a, 0x01, 0x2a, 0x22, 0x10, 0x2f, 0x76, 0x31, 0x2f, + 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x12, 0x64, 0x0a, 0x08, + 0x44, 0x72, 0x6f, 0x70, 0x50, 0x65, 0x65, 0x72, 0x12, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x44, 0x72, 0x6f, 0x70, 0x50, 0x65, 0x65, 0x72, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, + 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x44, 0x72, 0x6f, 0x70, 0x50, 0x65, 0x65, 0x72, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x19, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x13, 0x3a, + 0x01, 0x2a, 0x22, 0x0e, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x64, 0x72, + 0x6f, 0x70, 0x12, 0x79, 0x0a, 0x0d, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, + 0x6c, 0x6f, 0x77, 0x12, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, + 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, + 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, + 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1f, 0x82, 0xd3, + 0xe4, 0x93, 0x02, 0x19, 0x3a, 0x01, 0x2a, 0x22, 0x14, 0x2f, 0x76, 0x31, 0x2f, 0x66, 0x6c, 0x6f, + 0x77, 0x73, 0x2f, 0x63, 0x64, 0x63, 0x2f, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x12, 0x7d, 0x0a, + 0x0e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x12, + 0x23, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, - 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x70, 0x65, 0x65, 0x72, - 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, - 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x20, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1a, 0x3a, 0x01, 0x2a, 0x22, 0x15, 0x2f, 0x76, 0x31, 0x2f, - 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x2f, 0x71, 0x72, 0x65, 0x70, 0x2f, 0x63, 0x72, 0x65, 0x61, 0x74, - 0x65, 0x12, 0x79, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x73, 0x12, + 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x20, 0x82, 0xd3, 0xe4, 0x93, + 0x02, 0x1a, 0x3a, 0x01, 0x2a, 0x22, 0x15, 0x2f, 0x76, 0x31, 0x2f, 0x66, 0x6c, 0x6f, 0x77, 0x73, + 0x2f, 0x71, 0x72, 0x65, 0x70, 0x2f, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x12, 0x79, 0x0a, 0x0a, + 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x73, 0x12, 0x2d, 0x2e, 0x70, 0x65, 0x65, + 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, + 0x65, 0x73, 0x50, 0x65, 0x65, 0x72, 0x41, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x49, 0x6e, + 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x53, 0x63, 0x68, + 0x65, 0x6d, 0x61, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x19, 0x82, 0xd3, + 0xe4, 0x93, 0x02, 0x13, 0x12, 0x11, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, + 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x73, 0x12, 0x74, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x54, 0x61, + 0x62, 0x6c, 0x65, 0x73, 0x49, 0x6e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x21, 0x2e, 0x70, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x63, 0x68, 0x65, + 0x6d, 0x61, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, + 0x63, 0x68, 0x65, 0x6d, 0x61, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x18, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x12, 0x12, 0x10, 0x2f, 0x76, 0x31, + 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x7c, 0x0a, + 0x0c, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x2d, 0x2e, + 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, 0x6f, 0x73, + 0x74, 0x67, 0x72, 0x65, 0x73, 0x50, 0x65, 0x65, 0x72, 0x41, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, + 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x70, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x41, 0x6c, 0x6c, 0x54, + 0x61, 0x62, 0x6c, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1c, 0x82, + 0xd3, 0xe4, 0x93, 0x02, 0x16, 0x12, 0x14, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, + 0x2f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x2f, 0x61, 0x6c, 0x6c, 0x12, 0x6e, 0x0a, 0x0a, 0x47, + 0x65, 0x74, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x12, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x43, 0x6f, + 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x54, 0x61, 0x62, 0x6c, + 0x65, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x19, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x13, 0x12, 0x11, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, + 0x65, 0x72, 0x73, 0x2f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x12, 0x81, 0x01, 0x0a, 0x0b, + 0x47, 0x65, 0x74, 0x53, 0x6c, 0x6f, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x2d, 0x2e, 0x70, 0x65, + 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, + 0x72, 0x65, 0x73, 0x50, 0x65, 0x65, 0x72, 0x41, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x49, + 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, + 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x53, 0x6c, + 0x6f, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, + 0x02, 0x1d, 0x12, 0x1b, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x73, 0x6c, + 0x6f, 0x74, 0x73, 0x2f, 0x7b, 0x70, 0x65, 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x7d, 0x12, + 0x81, 0x01, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x2d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x50, 0x65, 0x65, 0x72, 0x41, 0x63, 0x74, 0x69, 0x76, - 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, + 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, 0x65, - 0x65, 0x72, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x19, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x13, 0x12, 0x11, 0x2f, 0x76, 0x31, 0x2f, 0x70, - 0x65, 0x65, 0x72, 0x73, 0x2f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x73, 0x12, 0x74, 0x0a, 0x11, - 0x47, 0x65, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x49, 0x6e, 0x53, 0x63, 0x68, 0x65, 0x6d, - 0x61, 0x12, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, - 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x52, 0x65, 0x71, + 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x23, + 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x12, 0x1b, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, + 0x73, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x2f, 0x7b, 0x70, 0x65, 0x65, 0x72, 0x5f, 0x6e, 0x61, + 0x6d, 0x65, 0x7d, 0x12, 0x6a, 0x0a, 0x0c, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x46, + 0x6c, 0x6f, 0x77, 0x12, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, + 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, + 0x65, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x1b, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x15, 0x3a, 0x01, 0x2a, 0x22, 0x10, 0x2f, + 0x76, 0x31, 0x2f, 0x6d, 0x69, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x2f, 0x64, 0x72, 0x6f, 0x70, 0x12, + 0x60, 0x0a, 0x0f, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, + 0x67, 0x65, 0x12, 0x24, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, + 0x65, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, + 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x12, 0x7a, 0x0a, 0x0c, 0x4d, 0x69, 0x72, 0x72, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x12, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, + 0x2e, 0x4d, 0x69, 0x72, 0x72, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, - 0x75, 0x74, 0x65, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x18, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x12, - 0x12, 0x10, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x74, 0x61, 0x62, 0x6c, - 0x65, 0x73, 0x12, 0x7c, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x54, 0x61, 0x62, 0x6c, - 0x65, 0x73, 0x12, 0x2d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, - 0x65, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x50, 0x65, 0x65, 0x72, 0x41, 0x63, - 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1f, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, - 0x2e, 0x41, 0x6c, 0x6c, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x1c, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x16, 0x12, 0x14, 0x2f, 0x76, 0x31, 0x2f, - 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x2f, 0x61, 0x6c, 0x6c, - 0x12, 0x6e, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x12, 0x21, - 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x54, 0x61, - 0x62, 0x6c, 0x65, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, - 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x19, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x13, 0x12, 0x11, 0x2f, - 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, - 0x12, 0x81, 0x01, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x53, 0x6c, 0x6f, 0x74, 0x49, 0x6e, 0x66, 0x6f, - 0x12, 0x2d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, - 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x50, 0x65, 0x65, 0x72, 0x41, 0x63, 0x74, 0x69, - 0x76, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, - 0x65, 0x65, 0x72, 0x53, 0x6c, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x12, 0x1b, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, - 0x72, 0x73, 0x2f, 0x73, 0x6c, 0x6f, 0x74, 0x73, 0x2f, 0x7b, 0x70, 0x65, 0x65, 0x72, 0x5f, 0x6e, - 0x61, 0x6d, 0x65, 0x7d, 0x12, 0x81, 0x01, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, - 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x2d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, - 0x75, 0x74, 0x65, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x50, 0x65, 0x65, 0x72, - 0x41, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, - 0x74, 0x65, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x12, 0x1b, 0x2f, 0x76, 0x31, - 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x2f, 0x7b, 0x70, 0x65, - 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x7d, 0x12, 0x6a, 0x0a, 0x0c, 0x53, 0x68, 0x75, 0x74, - 0x64, 0x6f, 0x77, 0x6e, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, - 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, - 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1b, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x15, 0x3a, - 0x01, 0x2a, 0x22, 0x10, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x69, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x2f, - 0x64, 0x72, 0x6f, 0x70, 0x12, 0x60, 0x0a, 0x0f, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, - 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x24, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, - 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, - 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, - 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x46, 0x6c, 0x6f, - 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x7a, 0x0a, 0x0c, 0x4d, 0x69, 0x72, 0x72, 0x6f, 0x72, - 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, - 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x4d, 0x69, 0x72, 0x72, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, - 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x4d, 0x69, 0x72, 0x72, 0x6f, 0x72, 0x53, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, - 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x12, 0x1b, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x69, 0x72, 0x72, 0x6f, - 0x72, 0x73, 0x2f, 0x7b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x6e, 0x61, 0x6d, - 0x65, 0x7d, 0x42, 0x7c, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, - 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x42, 0x0a, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x50, 0x72, 0x6f, - 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0b, 0x50, - 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xca, 0x02, 0x0b, 0x50, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xe2, 0x02, 0x17, 0x50, 0x65, 0x65, 0x72, 0x64, - 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, - 0x74, 0x61, 0xea, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x75, 0x74, 0x65, 0x2e, 0x4d, 0x69, 0x72, 0x72, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, + 0x12, 0x1b, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x69, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x2f, 0x7b, 0x66, + 0x6c, 0x6f, 0x77, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x7d, 0x12, 0x6a, 0x0a, + 0x0a, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x22, 0x2e, 0x70, 0x65, + 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x44, + 0x42, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x23, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, + 0x65, 0x65, 0x72, 0x44, 0x42, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x13, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x0d, 0x12, 0x0b, 0x2f, 0x76, + 0x31, 0x2f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x7c, 0x0a, 0x10, 0x63, 0x6f, 0x6d, + 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x42, 0x0a, 0x52, + 0x6f, 0x75, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, + 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, + 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, + 0x65, 0xca, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xe2, + 0x02, 0x17, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x5c, 0x47, 0x50, + 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -2374,7 +2470,7 @@ func file_route_proto_rawDescGZIP() []byte { } var file_route_proto_enumTypes = make([]protoimpl.EnumInfo, 3) -var file_route_proto_msgTypes = make([]protoimpl.MessageInfo, 32) +var file_route_proto_msgTypes = make([]protoimpl.MessageInfo, 34) var file_route_proto_goTypes = []interface{}{ (ValidatePeerStatus)(0), // 0: peerdb_route.ValidatePeerStatus (CreatePeerStatus)(0), // 1: peerdb_route.CreatePeerStatus @@ -2411,30 +2507,32 @@ var file_route_proto_goTypes = []interface{}{ (*MirrorStatusResponse)(nil), // 32: peerdb_route.MirrorStatusResponse (*FlowStateChangeRequest)(nil), // 33: peerdb_route.FlowStateChangeRequest (*FlowStateChangeResponse)(nil), // 34: peerdb_route.FlowStateChangeResponse - (*FlowConnectionConfigs)(nil), // 35: peerdb_flow.FlowConnectionConfigs - (*QRepConfig)(nil), // 36: peerdb_flow.QRepConfig - (*Peer)(nil), // 37: peerdb_peers.Peer - (*timestamppb.Timestamp)(nil), // 38: google.protobuf.Timestamp + (*PeerDBVersionRequest)(nil), // 35: peerdb_route.PeerDBVersionRequest + (*PeerDBVersionResponse)(nil), // 36: peerdb_route.PeerDBVersionResponse + (*FlowConnectionConfigs)(nil), // 37: peerdb_flow.FlowConnectionConfigs + (*QRepConfig)(nil), // 38: peerdb_flow.QRepConfig + (*Peer)(nil), // 39: peerdb_peers.Peer + (*timestamppb.Timestamp)(nil), // 40: google.protobuf.Timestamp } var file_route_proto_depIdxs = []int32{ - 35, // 0: peerdb_route.CreateCDCFlowRequest.connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs - 36, // 1: peerdb_route.CreateQRepFlowRequest.qrep_config:type_name -> peerdb_flow.QRepConfig - 37, // 2: peerdb_route.ShutdownRequest.source_peer:type_name -> peerdb_peers.Peer - 37, // 3: peerdb_route.ShutdownRequest.destination_peer:type_name -> peerdb_peers.Peer - 37, // 4: peerdb_route.ValidatePeerRequest.peer:type_name -> peerdb_peers.Peer - 37, // 5: peerdb_route.CreatePeerRequest.peer:type_name -> peerdb_peers.Peer + 37, // 0: peerdb_route.CreateCDCFlowRequest.connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs + 38, // 1: peerdb_route.CreateQRepFlowRequest.qrep_config:type_name -> peerdb_flow.QRepConfig + 39, // 2: peerdb_route.ShutdownRequest.source_peer:type_name -> peerdb_peers.Peer + 39, // 3: peerdb_route.ShutdownRequest.destination_peer:type_name -> peerdb_peers.Peer + 39, // 4: peerdb_route.ValidatePeerRequest.peer:type_name -> peerdb_peers.Peer + 39, // 5: peerdb_route.CreatePeerRequest.peer:type_name -> peerdb_peers.Peer 0, // 6: peerdb_route.ValidatePeerResponse.status:type_name -> peerdb_route.ValidatePeerStatus 1, // 7: peerdb_route.CreatePeerResponse.status:type_name -> peerdb_route.CreatePeerStatus - 38, // 8: peerdb_route.PartitionStatus.start_time:type_name -> google.protobuf.Timestamp - 38, // 9: peerdb_route.PartitionStatus.end_time:type_name -> google.protobuf.Timestamp - 36, // 10: peerdb_route.QRepMirrorStatus.config:type_name -> peerdb_flow.QRepConfig + 40, // 8: peerdb_route.PartitionStatus.start_time:type_name -> google.protobuf.Timestamp + 40, // 9: peerdb_route.PartitionStatus.end_time:type_name -> google.protobuf.Timestamp + 38, // 10: peerdb_route.QRepMirrorStatus.config:type_name -> peerdb_flow.QRepConfig 16, // 11: peerdb_route.QRepMirrorStatus.partitions:type_name -> peerdb_route.PartitionStatus - 38, // 12: peerdb_route.CDCSyncStatus.start_time:type_name -> google.protobuf.Timestamp - 38, // 13: peerdb_route.CDCSyncStatus.end_time:type_name -> google.protobuf.Timestamp + 40, // 12: peerdb_route.CDCSyncStatus.start_time:type_name -> google.protobuf.Timestamp + 40, // 13: peerdb_route.CDCSyncStatus.end_time:type_name -> google.protobuf.Timestamp 26, // 14: peerdb_route.PeerSlotResponse.slot_data:type_name -> peerdb_route.SlotInfo 27, // 15: peerdb_route.PeerStatResponse.stat_data:type_name -> peerdb_route.StatInfo 17, // 16: peerdb_route.SnapshotStatus.clones:type_name -> peerdb_route.QRepMirrorStatus - 35, // 17: peerdb_route.CDCMirrorStatus.config:type_name -> peerdb_flow.FlowConnectionConfigs + 37, // 17: peerdb_route.CDCMirrorStatus.config:type_name -> peerdb_flow.FlowConnectionConfigs 30, // 18: peerdb_route.CDCMirrorStatus.snapshot_status:type_name -> peerdb_route.SnapshotStatus 18, // 19: peerdb_route.CDCMirrorStatus.cdc_syncs:type_name -> peerdb_route.CDCSyncStatus 17, // 20: peerdb_route.MirrorStatusResponse.qrep_status:type_name -> peerdb_route.QRepMirrorStatus @@ -2454,22 +2552,24 @@ var file_route_proto_depIdxs = []int32{ 7, // 34: peerdb_route.FlowService.ShutdownFlow:input_type -> peerdb_route.ShutdownRequest 33, // 35: peerdb_route.FlowService.FlowStateChange:input_type -> peerdb_route.FlowStateChangeRequest 15, // 36: peerdb_route.FlowService.MirrorStatus:input_type -> peerdb_route.MirrorStatusRequest - 13, // 37: peerdb_route.FlowService.ValidatePeer:output_type -> peerdb_route.ValidatePeerResponse - 14, // 38: peerdb_route.FlowService.CreatePeer:output_type -> peerdb_route.CreatePeerResponse - 12, // 39: peerdb_route.FlowService.DropPeer:output_type -> peerdb_route.DropPeerResponse - 4, // 40: peerdb_route.FlowService.CreateCDCFlow:output_type -> peerdb_route.CreateCDCFlowResponse - 6, // 41: peerdb_route.FlowService.CreateQRepFlow:output_type -> peerdb_route.CreateQRepFlowResponse - 19, // 42: peerdb_route.FlowService.GetSchemas:output_type -> peerdb_route.PeerSchemasResponse - 21, // 43: peerdb_route.FlowService.GetTablesInSchema:output_type -> peerdb_route.SchemaTablesResponse - 22, // 44: peerdb_route.FlowService.GetAllTables:output_type -> peerdb_route.AllTablesResponse - 24, // 45: peerdb_route.FlowService.GetColumns:output_type -> peerdb_route.TableColumnsResponse - 28, // 46: peerdb_route.FlowService.GetSlotInfo:output_type -> peerdb_route.PeerSlotResponse - 29, // 47: peerdb_route.FlowService.GetStatInfo:output_type -> peerdb_route.PeerStatResponse - 8, // 48: peerdb_route.FlowService.ShutdownFlow:output_type -> peerdb_route.ShutdownResponse - 34, // 49: peerdb_route.FlowService.FlowStateChange:output_type -> peerdb_route.FlowStateChangeResponse - 32, // 50: peerdb_route.FlowService.MirrorStatus:output_type -> peerdb_route.MirrorStatusResponse - 37, // [37:51] is the sub-list for method output_type - 23, // [23:37] is the sub-list for method input_type + 35, // 37: peerdb_route.FlowService.GetVersion:input_type -> peerdb_route.PeerDBVersionRequest + 13, // 38: peerdb_route.FlowService.ValidatePeer:output_type -> peerdb_route.ValidatePeerResponse + 14, // 39: peerdb_route.FlowService.CreatePeer:output_type -> peerdb_route.CreatePeerResponse + 12, // 40: peerdb_route.FlowService.DropPeer:output_type -> peerdb_route.DropPeerResponse + 4, // 41: peerdb_route.FlowService.CreateCDCFlow:output_type -> peerdb_route.CreateCDCFlowResponse + 6, // 42: peerdb_route.FlowService.CreateQRepFlow:output_type -> peerdb_route.CreateQRepFlowResponse + 19, // 43: peerdb_route.FlowService.GetSchemas:output_type -> peerdb_route.PeerSchemasResponse + 21, // 44: peerdb_route.FlowService.GetTablesInSchema:output_type -> peerdb_route.SchemaTablesResponse + 22, // 45: peerdb_route.FlowService.GetAllTables:output_type -> peerdb_route.AllTablesResponse + 24, // 46: peerdb_route.FlowService.GetColumns:output_type -> peerdb_route.TableColumnsResponse + 28, // 47: peerdb_route.FlowService.GetSlotInfo:output_type -> peerdb_route.PeerSlotResponse + 29, // 48: peerdb_route.FlowService.GetStatInfo:output_type -> peerdb_route.PeerStatResponse + 8, // 49: peerdb_route.FlowService.ShutdownFlow:output_type -> peerdb_route.ShutdownResponse + 34, // 50: peerdb_route.FlowService.FlowStateChange:output_type -> peerdb_route.FlowStateChangeResponse + 32, // 51: peerdb_route.FlowService.MirrorStatus:output_type -> peerdb_route.MirrorStatusResponse + 36, // 52: peerdb_route.FlowService.GetVersion:output_type -> peerdb_route.PeerDBVersionResponse + 38, // [38:53] is the sub-list for method output_type + 23, // [23:38] is the sub-list for method input_type 23, // [23:23] is the sub-list for extension type_name 23, // [23:23] is the sub-list for extension extendee 0, // [0:23] is the sub-list for field type_name @@ -2867,6 +2967,30 @@ func file_route_proto_init() { return nil } } + file_route_proto_msgTypes[32].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PeerDBVersionRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_route_proto_msgTypes[33].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PeerDBVersionResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_route_proto_msgTypes[29].OneofWrappers = []interface{}{ (*MirrorStatusResponse_QrepStatus)(nil), @@ -2878,7 +3002,7 @@ func file_route_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_route_proto_rawDesc, NumEnums: 3, - NumMessages: 32, + NumMessages: 34, NumExtensions: 0, NumServices: 1, }, diff --git a/flow/generated/protos/route.pb.gw.go b/flow/generated/protos/route.pb.gw.go index a54e9601e9..708d0adb4c 100644 --- a/flow/generated/protos/route.pb.gw.go +++ b/flow/generated/protos/route.pb.gw.go @@ -535,6 +535,24 @@ func local_request_FlowService_MirrorStatus_0(ctx context.Context, marshaler run } +func request_FlowService_GetVersion_0(ctx context.Context, marshaler runtime.Marshaler, client FlowServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq PeerDBVersionRequest + var metadata runtime.ServerMetadata + + msg, err := client.GetVersion(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_FlowService_GetVersion_0(ctx context.Context, marshaler runtime.Marshaler, server FlowServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq PeerDBVersionRequest + var metadata runtime.ServerMetadata + + msg, err := server.GetVersion(ctx, &protoReq) + return msg, metadata, err + +} + // RegisterFlowServiceHandlerServer registers the http handlers for service FlowService to "mux". // UnaryRPC :call FlowServiceServer directly. // StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. @@ -866,6 +884,31 @@ func RegisterFlowServiceHandlerServer(ctx context.Context, mux *runtime.ServeMux }) + mux.Handle("GET", pattern_FlowService_GetVersion_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/peerdb_route.FlowService/GetVersion", runtime.WithHTTPPathPattern("/v1/version")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_FlowService_GetVersion_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_FlowService_GetVersion_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + return nil } @@ -1193,6 +1236,28 @@ func RegisterFlowServiceHandlerClient(ctx context.Context, mux *runtime.ServeMux }) + mux.Handle("GET", pattern_FlowService_GetVersion_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/peerdb_route.FlowService/GetVersion", runtime.WithHTTPPathPattern("/v1/version")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_FlowService_GetVersion_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_FlowService_GetVersion_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + return nil } @@ -1222,6 +1287,8 @@ var ( pattern_FlowService_ShutdownFlow_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v1", "mirrors", "drop"}, "")) pattern_FlowService_MirrorStatus_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2}, []string{"v1", "mirrors", "flow_job_name"}, "")) + + pattern_FlowService_GetVersion_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "version"}, "")) ) var ( @@ -1250,4 +1317,6 @@ var ( forward_FlowService_ShutdownFlow_0 = runtime.ForwardResponseMessage forward_FlowService_MirrorStatus_0 = runtime.ForwardResponseMessage + + forward_FlowService_GetVersion_0 = runtime.ForwardResponseMessage ) diff --git a/flow/generated/protos/route_grpc.pb.go b/flow/generated/protos/route_grpc.pb.go index f92077ebf1..6d36c84fee 100644 --- a/flow/generated/protos/route_grpc.pb.go +++ b/flow/generated/protos/route_grpc.pb.go @@ -33,6 +33,7 @@ const ( FlowService_ShutdownFlow_FullMethodName = "/peerdb_route.FlowService/ShutdownFlow" FlowService_FlowStateChange_FullMethodName = "/peerdb_route.FlowService/FlowStateChange" FlowService_MirrorStatus_FullMethodName = "/peerdb_route.FlowService/MirrorStatus" + FlowService_GetVersion_FullMethodName = "/peerdb_route.FlowService/GetVersion" ) // FlowServiceClient is the client API for FlowService service. @@ -53,6 +54,7 @@ type FlowServiceClient interface { ShutdownFlow(ctx context.Context, in *ShutdownRequest, opts ...grpc.CallOption) (*ShutdownResponse, error) FlowStateChange(ctx context.Context, in *FlowStateChangeRequest, opts ...grpc.CallOption) (*FlowStateChangeResponse, error) MirrorStatus(ctx context.Context, in *MirrorStatusRequest, opts ...grpc.CallOption) (*MirrorStatusResponse, error) + GetVersion(ctx context.Context, in *PeerDBVersionRequest, opts ...grpc.CallOption) (*PeerDBVersionResponse, error) } type flowServiceClient struct { @@ -189,6 +191,15 @@ func (c *flowServiceClient) MirrorStatus(ctx context.Context, in *MirrorStatusRe return out, nil } +func (c *flowServiceClient) GetVersion(ctx context.Context, in *PeerDBVersionRequest, opts ...grpc.CallOption) (*PeerDBVersionResponse, error) { + out := new(PeerDBVersionResponse) + err := c.cc.Invoke(ctx, FlowService_GetVersion_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // FlowServiceServer is the server API for FlowService service. // All implementations must embed UnimplementedFlowServiceServer // for forward compatibility @@ -207,6 +218,7 @@ type FlowServiceServer interface { ShutdownFlow(context.Context, *ShutdownRequest) (*ShutdownResponse, error) FlowStateChange(context.Context, *FlowStateChangeRequest) (*FlowStateChangeResponse, error) MirrorStatus(context.Context, *MirrorStatusRequest) (*MirrorStatusResponse, error) + GetVersion(context.Context, *PeerDBVersionRequest) (*PeerDBVersionResponse, error) mustEmbedUnimplementedFlowServiceServer() } @@ -256,6 +268,9 @@ func (UnimplementedFlowServiceServer) FlowStateChange(context.Context, *FlowStat func (UnimplementedFlowServiceServer) MirrorStatus(context.Context, *MirrorStatusRequest) (*MirrorStatusResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method MirrorStatus not implemented") } +func (UnimplementedFlowServiceServer) GetVersion(context.Context, *PeerDBVersionRequest) (*PeerDBVersionResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetVersion not implemented") +} func (UnimplementedFlowServiceServer) mustEmbedUnimplementedFlowServiceServer() {} // UnsafeFlowServiceServer may be embedded to opt out of forward compatibility for this service. @@ -521,6 +536,24 @@ func _FlowService_MirrorStatus_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } +func _FlowService_GetVersion_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PeerDBVersionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(FlowServiceServer).GetVersion(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: FlowService_GetVersion_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FlowServiceServer).GetVersion(ctx, req.(*PeerDBVersionRequest)) + } + return interceptor(ctx, in, info, handler) +} + // FlowService_ServiceDesc is the grpc.ServiceDesc for FlowService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -584,6 +617,10 @@ var FlowService_ServiceDesc = grpc.ServiceDesc{ MethodName: "MirrorStatus", Handler: _FlowService_MirrorStatus_Handler, }, + { + MethodName: "GetVersion", + Handler: _FlowService_GetVersion_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "route.proto", diff --git a/nexus/pt/src/peerdb_route.rs b/nexus/pt/src/peerdb_route.rs index aeb9582dbb..f162fad0ef 100644 --- a/nexus/pt/src/peerdb_route.rs +++ b/nexus/pt/src/peerdb_route.rs @@ -278,6 +278,16 @@ pub struct FlowStateChangeResponse { #[prost(string, tag="2")] pub error_message: ::prost::alloc::string::String, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PeerDbVersionRequest { +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PeerDbVersionResponse { + #[prost(string, tag="1")] + pub version: ::prost::alloc::string::String, +} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum ValidatePeerStatus { diff --git a/nexus/pt/src/peerdb_route.serde.rs b/nexus/pt/src/peerdb_route.serde.rs index aea2c0a3dc..87b6bd8557 100644 --- a/nexus/pt/src/peerdb_route.serde.rs +++ b/nexus/pt/src/peerdb_route.serde.rs @@ -2035,6 +2035,173 @@ impl<'de> serde::Deserialize<'de> for PartitionStatus { deserializer.deserialize_struct("peerdb_route.PartitionStatus", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PeerDbVersionRequest { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let len = 0; + let struct_ser = serializer.serialize_struct("peerdb_route.PeerDBVersionRequest", len)?; + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PeerDbVersionRequest { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + __SkipField__, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + Ok(GeneratedField::__SkipField__) + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PeerDbVersionRequest; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct peerdb_route.PeerDBVersionRequest") + } + + fn visit_map(self, mut map: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + while map.next_key::()?.is_some() { + let _ = map.next_value::()?; + } + Ok(PeerDbVersionRequest { + }) + } + } + deserializer.deserialize_struct("peerdb_route.PeerDBVersionRequest", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for PeerDbVersionResponse { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.version.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("peerdb_route.PeerDBVersionResponse", len)?; + if !self.version.is_empty() { + struct_ser.serialize_field("version", &self.version)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PeerDbVersionResponse { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "version", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Version, + __SkipField__, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "version" => Ok(GeneratedField::Version), + _ => Ok(GeneratedField::__SkipField__), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PeerDbVersionResponse; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct peerdb_route.PeerDBVersionResponse") + } + + fn visit_map(self, mut map: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut version__ = None; + while let Some(k) = map.next_key()? { + match k { + GeneratedField::Version => { + if version__.is_some() { + return Err(serde::de::Error::duplicate_field("version")); + } + version__ = Some(map.next_value()?); + } + GeneratedField::__SkipField__ => { + let _ = map.next_value::()?; + } + } + } + Ok(PeerDbVersionResponse { + version: version__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("peerdb_route.PeerDBVersionResponse", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PeerSchemasResponse { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/nexus/pt/src/peerdb_route.tonic.rs b/nexus/pt/src/peerdb_route.tonic.rs index b606dcdc47..b6a2c9506d 100644 --- a/nexus/pt/src/peerdb_route.tonic.rs +++ b/nexus/pt/src/peerdb_route.tonic.rs @@ -451,6 +451,32 @@ pub mod flow_service_client { .insert(GrpcMethod::new("peerdb_route.FlowService", "MirrorStatus")); self.inner.unary(req, path, codec).await } + /// + pub async fn get_version( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/peerdb_route.FlowService/GetVersion", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("peerdb_route.FlowService", "GetVersion")); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -572,6 +598,14 @@ pub mod flow_service_server { tonic::Response, tonic::Status, >; + /// + async fn get_version( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } /// #[derive(Debug)] @@ -1297,6 +1331,50 @@ pub mod flow_service_server { }; Box::pin(fut) } + "/peerdb_route.FlowService/GetVersion" => { + #[allow(non_camel_case_types)] + struct GetVersionSvc(pub Arc); + impl< + T: FlowService, + > tonic::server::UnaryService + for GetVersionSvc { + type Response = super::PeerDbVersionResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { (*inner).get_version(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = GetVersionSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { Ok( diff --git a/protos/route.proto b/protos/route.proto index d32b70c1d7..db96121ca4 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -197,6 +197,13 @@ message FlowStateChangeResponse { string error_message = 2; } +message PeerDBVersionRequest { +} + +message PeerDBVersionResponse { + string version = 1; +} + service FlowService { rpc ValidatePeer(ValidatePeerRequest) returns (ValidatePeerResponse) { option (google.api.http) = { @@ -258,4 +265,8 @@ service FlowService { rpc MirrorStatus(MirrorStatusRequest) returns (MirrorStatusResponse) { option (google.api.http) = { get: "/v1/mirrors/{flow_job_name}" }; } + + rpc GetVersion(PeerDBVersionRequest) returns (PeerDBVersionResponse) { + option (google.api.http) = { get: "/v1/version" }; + } } diff --git a/stacks/flow.Dockerfile b/stacks/flow.Dockerfile index ca607170d1..6cb36b167a 100644 --- a/stacks/flow.Dockerfile +++ b/stacks/flow.Dockerfile @@ -5,7 +5,7 @@ RUN apt-get update && apt-get install -y gcc libgeos-dev WORKDIR /root/flow # first copy only go.mod and go.sum to cache dependencies -COPY flow/go.mod flow/go.sum . +COPY flow/go.mod flow/go.sum ./ # download all the dependencies RUN go mod download @@ -24,6 +24,10 @@ WORKDIR /root COPY --from=builder /root/peer-flow . FROM flow-base AS flow-api + +ARG PEERDB_VERSION_SHA_SHORT +ENV PEERDB_VERSION_SHA_SHORT=${PEERDB_VERSION_SHA_SHORT} + EXPOSE 8112 8113 ENTRYPOINT [\ "./peer-flow",\ diff --git a/ui/grpc_generated/route.ts b/ui/grpc_generated/route.ts index 72d6d844e5..36b0e8a2d0 100644 --- a/ui/grpc_generated/route.ts +++ b/ui/grpc_generated/route.ts @@ -309,6 +309,13 @@ export interface FlowStateChangeResponse { errorMessage: string; } +export interface PeerDBVersionRequest { +} + +export interface PeerDBVersionResponse { + version: string; +} + function createBaseCreateCDCFlowRequest(): CreateCDCFlowRequest { return { connectionConfigs: undefined, createCatalogEntry: false }; } @@ -2763,6 +2770,106 @@ export const FlowStateChangeResponse = { }, }; +function createBasePeerDBVersionRequest(): PeerDBVersionRequest { + return {}; +} + +export const PeerDBVersionRequest = { + encode(_: PeerDBVersionRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): PeerDBVersionRequest { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBasePeerDBVersionRequest(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(_: any): PeerDBVersionRequest { + return {}; + }, + + toJSON(_: PeerDBVersionRequest): unknown { + const obj: any = {}; + return obj; + }, + + create, I>>(base?: I): PeerDBVersionRequest { + return PeerDBVersionRequest.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>(_: I): PeerDBVersionRequest { + const message = createBasePeerDBVersionRequest(); + return message; + }, +}; + +function createBasePeerDBVersionResponse(): PeerDBVersionResponse { + return { version: "" }; +} + +export const PeerDBVersionResponse = { + encode(message: PeerDBVersionResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.version !== "") { + writer.uint32(10).string(message.version); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): PeerDBVersionResponse { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBasePeerDBVersionResponse(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.version = reader.string(); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): PeerDBVersionResponse { + return { version: isSet(object.version) ? String(object.version) : "" }; + }, + + toJSON(message: PeerDBVersionResponse): unknown { + const obj: any = {}; + if (message.version !== "") { + obj.version = message.version; + } + return obj; + }, + + create, I>>(base?: I): PeerDBVersionResponse { + return PeerDBVersionResponse.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>(object: I): PeerDBVersionResponse { + const message = createBasePeerDBVersionResponse(); + message.version = object.version ?? ""; + return message; + }, +}; + export type FlowServiceService = typeof FlowServiceService; export const FlowServiceService = { validatePeer: { @@ -2895,6 +3002,15 @@ export const FlowServiceService = { responseSerialize: (value: MirrorStatusResponse) => Buffer.from(MirrorStatusResponse.encode(value).finish()), responseDeserialize: (value: Buffer) => MirrorStatusResponse.decode(value), }, + getVersion: { + path: "/peerdb_route.FlowService/GetVersion", + requestStream: false, + responseStream: false, + requestSerialize: (value: PeerDBVersionRequest) => Buffer.from(PeerDBVersionRequest.encode(value).finish()), + requestDeserialize: (value: Buffer) => PeerDBVersionRequest.decode(value), + responseSerialize: (value: PeerDBVersionResponse) => Buffer.from(PeerDBVersionResponse.encode(value).finish()), + responseDeserialize: (value: Buffer) => PeerDBVersionResponse.decode(value), + }, } as const; export interface FlowServiceServer extends UntypedServiceImplementation { @@ -2912,6 +3028,7 @@ export interface FlowServiceServer extends UntypedServiceImplementation { shutdownFlow: handleUnaryCall; flowStateChange: handleUnaryCall; mirrorStatus: handleUnaryCall; + getVersion: handleUnaryCall; } export interface FlowServiceClient extends Client { @@ -3125,6 +3242,21 @@ export interface FlowServiceClient extends Client { options: Partial, callback: (error: ServiceError | null, response: MirrorStatusResponse) => void, ): ClientUnaryCall; + getVersion( + request: PeerDBVersionRequest, + callback: (error: ServiceError | null, response: PeerDBVersionResponse) => void, + ): ClientUnaryCall; + getVersion( + request: PeerDBVersionRequest, + metadata: Metadata, + callback: (error: ServiceError | null, response: PeerDBVersionResponse) => void, + ): ClientUnaryCall; + getVersion( + request: PeerDBVersionRequest, + metadata: Metadata, + options: Partial, + callback: (error: ServiceError | null, response: PeerDBVersionResponse) => void, + ): ClientUnaryCall; } export const FlowServiceClient = makeGenericClientConstructor( From 20d901c0e9247796ffa009203c26b0c8a193c5b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 5 Dec 2023 17:55:11 +0000 Subject: [PATCH 2/2] fix xmin ci (#753) Also fix poorly named method after marker changed from `txid_current()` to `txid_snapshot_min(txid_current_snapsnot())` Remove special casing for xmin in qrep code --- flow/activities/flowable.go | 6 ----- flow/connectors/postgres/qrep.go | 9 ++------ .../postgres/qrep_query_executor.go | 2 +- flow/e2e/snowflake/qrep_flow_sf_test.go | 2 +- flow/e2e/test_utils.go | 10 ++++++++- flow/workflows/xmin_flow.go | 22 ++++++------------- 6 files changed, 20 insertions(+), 31 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 33c944f993..bd460e9661 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -764,12 +764,6 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context, waitBetweenBatches = time.Duration(config.WaitBetweenBatchesSeconds) * time.Second } - if config.WatermarkColumn == "xmin" { - // for xmin we ignore the wait between batches, as seq scan time is - // extremely slow. - waitBetweenBatches = 10 * time.Second - } - srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer) if err != nil { return fmt.Errorf("failed to get qrep source connector: %w", err) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 5bde72370c..518d47686e 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -274,11 +274,6 @@ func (c *PostgresConnector) getMinMaxValues( func (c *PostgresConnector) CheckForUpdatedMaxValue(config *protos.QRepConfig, last *protos.QRepPartition) (bool, error) { - // for xmin lets always assume there are updates - if config.WatermarkColumn == "xmin" { - return true, nil - } - tx, err := c.pool.Begin(c.ctx) if err != nil { return false, fmt.Errorf("unable to begin transaction for getting max value: %w", err) @@ -571,9 +566,9 @@ func (c *PostgresConnector) PullXminRecordStream( var numRecords int if partition.Range != nil { - numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query, oldxid) + numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin(stream, query, oldxid) } else { - numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query) + numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin(stream, query) } if err != nil { return 0, currentSnapshotXmin, err diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 81c7095c66..85c0fb2cce 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -325,7 +325,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream( return totalRecordsFetched, err } -func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentTxid( +func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin( stream *model.QRecordStream, query string, args ...interface{}, diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index f39686d0bb..5f5b01404d 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -208,7 +208,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { qrepConfig.WatermarkColumn = "xmin" s.NoError(err) - e2e.RunQrepFlowWorkflow(env, qrepConfig) + e2e.RunXminFlowWorkflow(env, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index ac668ae2f5..2d561c74c2 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -20,7 +20,7 @@ import ( "go.temporal.io/sdk/testsuite" ) -// readFileToBytes reads a file to a byte array. +// ReadFileToBytes reads a file to a byte array. func ReadFileToBytes(path string) ([]byte, error) { var ret []byte @@ -49,6 +49,7 @@ func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment) { env.RegisterWorkflow(peerflow.SnapshotFlowWorkflow) env.RegisterWorkflow(peerflow.NormalizeFlowWorkflow) env.RegisterWorkflow(peerflow.QRepFlowWorkflow) + env.RegisterWorkflow(peerflow.XminFlowWorkflow) env.RegisterWorkflow(peerflow.QRepPartitionWorkflow) env.RegisterActivity(&activities.FlowableActivity{}) env.RegisterActivity(&activities.SnapshotActivity{}) @@ -303,6 +304,13 @@ func RunQrepFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos. env.ExecuteWorkflow(peerflow.QRepFlowWorkflow, config, state) } +func RunXminFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos.QRepConfig) { + state := peerflow.NewQRepFlowState() + state.LastPartition.PartitionId = uuid.New().String() + time.Sleep(5 * time.Second) + env.ExecuteWorkflow(peerflow.XminFlowWorkflow, config, state) +} + func GetOwnersSchema() *model.QRecordSchema { return &model.QRecordSchema{ Fields: []*model.QField{ diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index db09b7abc9..1a1d67a4c8 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -176,7 +176,7 @@ func (q *XminFlowExecution) consolidatePartitions(ctx workflow.Context) error { return fmt.Errorf("failed to cleanup qrep flow: %w", err) } - q.logger.Info("qrep flow cleaned up") + q.logger.Info("xmin flow cleaned up") return nil } @@ -186,14 +186,6 @@ func XminFlowWorkflow( config *protos.QRepConfig, state *protos.QRepFlowState, ) error { - // register a query to get the number of partitions processed - err := workflow.SetQueryHandler(ctx, "num-partitions-processed", func() (uint64, error) { - return state.NumPartitionsProcessed, nil - }) - if err != nil { - return fmt.Errorf("failed to register query handler: %w", err) - } - // get xmin run uuid via side-effect runUUIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { return uuid.New().String() @@ -206,7 +198,7 @@ func XminFlowWorkflow( q := NewXminFlowExecution(ctx, config, runUUID) - err = q.SetupWatermarkTableOnDestination(ctx) + err := q.SetupWatermarkTableOnDestination(ctx) if err != nil { return fmt.Errorf("failed to setup watermark table: %w", err) } @@ -238,9 +230,8 @@ func XminFlowWorkflow( return fmt.Errorf("xmin replication failed: %w", err) } - state.LastPartition = &protos.QRepPartition{ - PartitionId: q.runUUID, - Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}}, + if err = q.consolidatePartitions(ctx); err != nil { + return err } if config.InitialCopyOnly { @@ -253,8 +244,9 @@ func XminFlowWorkflow( return err } - if err = q.consolidatePartitions(ctx); err != nil { - return err + state.LastPartition = &protos.QRepPartition{ + PartitionId: q.runUUID, + Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}}, } workflow.GetLogger(ctx).Info("Continuing as new workflow",