From b002eb73a49a83e64ca1094faa3feb75a4f2856d Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Wed, 8 Nov 2023 00:26:49 +0530 Subject: [PATCH] committing before context switch --- flow/cmd/handler.go | 33 +- flow/generated/protos/route.pb.go | 516 ++++++++++++++----------- flow/generated/protos/route_grpc.pb.go | 30 +- flow/shared/constants.go | 1 + flow/workflows/cdc_flow.go | 48 +-- flow/workflows/qrep_flow.go | 48 +-- nexus/analyzer/src/lib.rs | 4 + nexus/flow-rs/src/grpc.rs | 15 +- nexus/pt/src/peerdb_route.rs | 36 +- nexus/pt/src/peerdb_route.serde.rs | 496 ++++++++++++++---------- nexus/pt/src/peerdb_route.tonic.rs | 42 +- nexus/server/src/main.rs | 63 ++- protos/route.proto | 14 +- ui/grpc_generated/route.ts | 138 +++++-- 14 files changed, 902 insertions(+), 582 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 0f824186b2..c4a0e86891 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -350,22 +350,33 @@ func (h *FlowRequestHandler) ShutdownFlow( }, nil } -func (h *FlowRequestHandler) PauseFlow( +func (h *FlowRequestHandler) FlowStateChange( ctx context.Context, - req *protos.PauseRequest, -) (*protos.PauseResponse, error) { - err := h.temporalClient.SignalWorkflow( - ctx, - req.WorkflowId, - "", - shared.CDCFlowSignalName, - shared.PauseSignal, - ) + req *protos.FlowStateChangeRequest, +) (*protos.FlowStateChangeResponse, error) { + var err error + if req.RequestedFlowState == protos.FlowState_STATE_PAUSED { + err = h.temporalClient.SignalWorkflow( + ctx, + req.WorkflowId, + "", + shared.CDCFlowSignalName, + shared.PauseSignal, + ) + } else if req.RequestedFlowState == protos.FlowState_STATE_RUNNING { + err = h.temporalClient.SignalWorkflow( + ctx, + req.WorkflowId, + "", + shared.CDCFlowSignalName, + shared.NoopSignal, + ) + } if err != nil { return nil, fmt.Errorf("unable to signal PeerFlow workflow: %w", err) } - return &protos.PauseResponse{ + return &protos.FlowStateChangeResponse{ Ok: true, }, nil } diff --git a/flow/generated/protos/route.pb.go b/flow/generated/protos/route.pb.go index d6f8053aa6..60755e99f6 100644 --- a/flow/generated/protos/route.pb.go +++ b/flow/generated/protos/route.pb.go @@ -120,6 +120,56 @@ func (CreatePeerStatus) EnumDescriptor() ([]byte, []int) { return file_route_proto_rawDescGZIP(), []int{1} } +// in the future, consider moving DropFlow to this and reduce route surface +type FlowState int32 + +const ( + FlowState_STATE_UNKNOWN FlowState = 0 + FlowState_STATE_RUNNING FlowState = 1 + FlowState_STATE_PAUSED FlowState = 2 +) + +// Enum value maps for FlowState. +var ( + FlowState_name = map[int32]string{ + 0: "STATE_UNKNOWN", + 1: "STATE_RUNNING", + 2: "STATE_PAUSED", + } + FlowState_value = map[string]int32{ + "STATE_UNKNOWN": 0, + "STATE_RUNNING": 1, + "STATE_PAUSED": 2, + } +) + +func (x FlowState) Enum() *FlowState { + p := new(FlowState) + *p = x + return p +} + +func (x FlowState) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (FlowState) Descriptor() protoreflect.EnumDescriptor { + return file_route_proto_enumTypes[2].Descriptor() +} + +func (FlowState) Type() protoreflect.EnumType { + return &file_route_proto_enumTypes[2] +} + +func (x FlowState) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use FlowState.Descriptor instead. +func (FlowState) EnumDescriptor() ([]byte, []int) { + return file_route_proto_rawDescGZIP(), []int{2} +} + type CreateCDCFlowRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1789,17 +1839,18 @@ func (*MirrorStatusResponse_QrepStatus) isMirrorStatusResponse_Status() {} func (*MirrorStatusResponse_CdcStatus) isMirrorStatusResponse_Status() {} -type PauseRequest struct { +type FlowStateChangeRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - WorkflowId string `protobuf:"bytes,1,opt,name=workflow_id,json=workflowId,proto3" json:"workflow_id,omitempty"` - FlowJobName string `protobuf:"bytes,2,opt,name=flow_job_name,json=flowJobName,proto3" json:"flow_job_name,omitempty"` + WorkflowId string `protobuf:"bytes,1,opt,name=workflow_id,json=workflowId,proto3" json:"workflow_id,omitempty"` + FlowJobName string `protobuf:"bytes,2,opt,name=flow_job_name,json=flowJobName,proto3" json:"flow_job_name,omitempty"` + RequestedFlowState FlowState `protobuf:"varint,3,opt,name=requested_flow_state,json=requestedFlowState,proto3,enum=peerdb_route.FlowState" json:"requested_flow_state,omitempty"` } -func (x *PauseRequest) Reset() { - *x = PauseRequest{} +func (x *FlowStateChangeRequest) Reset() { + *x = FlowStateChangeRequest{} if protoimpl.UnsafeEnabled { mi := &file_route_proto_msgTypes[29] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -1807,13 +1858,13 @@ func (x *PauseRequest) Reset() { } } -func (x *PauseRequest) String() string { +func (x *FlowStateChangeRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*PauseRequest) ProtoMessage() {} +func (*FlowStateChangeRequest) ProtoMessage() {} -func (x *PauseRequest) ProtoReflect() protoreflect.Message { +func (x *FlowStateChangeRequest) ProtoReflect() protoreflect.Message { mi := &file_route_proto_msgTypes[29] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -1825,26 +1876,33 @@ func (x *PauseRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use PauseRequest.ProtoReflect.Descriptor instead. -func (*PauseRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use FlowStateChangeRequest.ProtoReflect.Descriptor instead. +func (*FlowStateChangeRequest) Descriptor() ([]byte, []int) { return file_route_proto_rawDescGZIP(), []int{29} } -func (x *PauseRequest) GetWorkflowId() string { +func (x *FlowStateChangeRequest) GetWorkflowId() string { if x != nil { return x.WorkflowId } return "" } -func (x *PauseRequest) GetFlowJobName() string { +func (x *FlowStateChangeRequest) GetFlowJobName() string { if x != nil { return x.FlowJobName } return "" } -type PauseResponse struct { +func (x *FlowStateChangeRequest) GetRequestedFlowState() FlowState { + if x != nil { + return x.RequestedFlowState + } + return FlowState_STATE_UNKNOWN +} + +type FlowStateChangeResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -1853,8 +1911,8 @@ type PauseResponse struct { ErrorMessage string `protobuf:"bytes,2,opt,name=error_message,json=errorMessage,proto3" json:"error_message,omitempty"` } -func (x *PauseResponse) Reset() { - *x = PauseResponse{} +func (x *FlowStateChangeResponse) Reset() { + *x = FlowStateChangeResponse{} if protoimpl.UnsafeEnabled { mi := &file_route_proto_msgTypes[30] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -1862,13 +1920,13 @@ func (x *PauseResponse) Reset() { } } -func (x *PauseResponse) String() string { +func (x *FlowStateChangeResponse) String() string { return protoimpl.X.MessageStringOf(x) } -func (*PauseResponse) ProtoMessage() {} +func (*FlowStateChangeResponse) ProtoMessage() {} -func (x *PauseResponse) ProtoReflect() protoreflect.Message { +func (x *FlowStateChangeResponse) ProtoReflect() protoreflect.Message { mi := &file_route_proto_msgTypes[30] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -1880,19 +1938,19 @@ func (x *PauseResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use PauseResponse.ProtoReflect.Descriptor instead. -func (*PauseResponse) Descriptor() ([]byte, []int) { +// Deprecated: Use FlowStateChangeResponse.ProtoReflect.Descriptor instead. +func (*FlowStateChangeResponse) Descriptor() ([]byte, []int) { return file_route_proto_rawDescGZIP(), []int{30} } -func (x *PauseResponse) GetOk() bool { +func (x *FlowStateChangeResponse) GetOk() bool { if x != nil { return x.Ok } return false } -func (x *PauseResponse) GetErrorMessage() string { +func (x *FlowStateChangeResponse) GetErrorMessage() string { if x != nil { return x.ErrorMessage } @@ -2109,13 +2167,19 @@ var file_route_proto_rawDesc = []byte{ 0x64, 0x63, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x08, 0x0a, - 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x53, 0x0a, 0x0c, 0x50, 0x61, 0x75, 0x73, 0x65, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x77, 0x6f, 0x72, 0x6b, 0x66, - 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x77, 0x6f, - 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12, 0x22, 0x0a, 0x0d, 0x66, 0x6c, 0x6f, 0x77, - 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0b, 0x66, 0x6c, 0x6f, 0x77, 0x4a, 0x6f, 0x62, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x44, 0x0a, 0x0d, - 0x50, 0x61, 0x75, 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, + 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0xa8, 0x01, 0x0a, 0x16, 0x46, 0x6c, 0x6f, 0x77, + 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, + 0x77, 0x49, 0x64, 0x12, 0x22, 0x0a, 0x0d, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, + 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x66, 0x6c, 0x6f, 0x77, + 0x4a, 0x6f, 0x62, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x49, 0x0a, 0x14, 0x72, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x65, 0x64, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x17, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, + 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x12, + 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x65, 0x64, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, + 0x74, 0x65, 0x22, 0x4e, 0x0a, 0x17, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, + 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x02, 0x6f, 0x6b, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, @@ -2127,110 +2191,116 @@ var file_route_proto_rawDesc = []byte{ 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x16, 0x0a, 0x12, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, - 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x02, 0x32, 0xea, 0x0b, 0x0a, 0x0b, - 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x74, 0x0a, 0x0c, 0x56, - 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x12, 0x21, 0x2e, 0x70, 0x65, - 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x69, 0x64, - 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, - 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x56, 0x61, - 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x1d, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x17, 0x3a, 0x01, 0x2a, 0x22, 0x12, 0x2f, - 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, - 0x65, 0x12, 0x6c, 0x0a, 0x0a, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x12, - 0x1f, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, - 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x20, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, - 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x1b, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x15, 0x3a, 0x01, 0x2a, 0x22, 0x10, 0x2f, - 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x12, - 0x64, 0x0a, 0x08, 0x44, 0x72, 0x6f, 0x70, 0x50, 0x65, 0x65, 0x72, 0x12, 0x1d, 0x2e, 0x70, 0x65, - 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x44, 0x72, 0x6f, 0x70, 0x50, - 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x44, 0x72, 0x6f, 0x70, 0x50, 0x65, - 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x19, 0x82, 0xd3, 0xe4, 0x93, - 0x02, 0x13, 0x3a, 0x01, 0x2a, 0x22, 0x0e, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, - 0x2f, 0x64, 0x72, 0x6f, 0x70, 0x12, 0x79, 0x0a, 0x0d, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, - 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, - 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, - 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, - 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x1f, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x19, 0x3a, 0x01, 0x2a, 0x22, 0x14, 0x2f, 0x76, 0x31, 0x2f, - 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x2f, 0x63, 0x64, 0x63, 0x2f, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, - 0x12, 0x7d, 0x0a, 0x0e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, - 0x6f, 0x77, 0x12, 0x23, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, - 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, - 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, - 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x20, 0x82, - 0xd3, 0xe4, 0x93, 0x02, 0x1a, 0x3a, 0x01, 0x2a, 0x22, 0x15, 0x2f, 0x76, 0x31, 0x2f, 0x66, 0x6c, - 0x6f, 0x77, 0x73, 0x2f, 0x71, 0x72, 0x65, 0x70, 0x2f, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x12, - 0x79, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x73, 0x12, 0x2d, 0x2e, - 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, 0x6f, 0x73, - 0x74, 0x67, 0x72, 0x65, 0x73, 0x50, 0x65, 0x65, 0x72, 0x41, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, - 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x70, - 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, 0x65, 0x65, 0x72, - 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x19, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x13, 0x12, 0x11, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, - 0x72, 0x73, 0x2f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x73, 0x12, 0x74, 0x0a, 0x11, 0x47, 0x65, - 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x49, 0x6e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, - 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, - 0x63, 0x68, 0x65, 0x6d, 0x61, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, - 0x65, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x18, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x12, 0x12, 0x10, - 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, - 0x12, 0x6e, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x12, 0x21, - 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x54, 0x61, - 0x62, 0x6c, 0x65, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, - 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x19, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x13, 0x12, 0x11, 0x2f, - 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, - 0x12, 0x81, 0x01, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x53, 0x6c, 0x6f, 0x74, 0x49, 0x6e, 0x66, 0x6f, - 0x12, 0x2d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, - 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x50, 0x65, 0x65, 0x72, 0x41, 0x63, 0x74, 0x69, - 0x76, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, - 0x65, 0x65, 0x72, 0x53, 0x6c, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x12, 0x1b, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, - 0x72, 0x73, 0x2f, 0x73, 0x6c, 0x6f, 0x74, 0x73, 0x2f, 0x7b, 0x70, 0x65, 0x65, 0x72, 0x5f, 0x6e, - 0x61, 0x6d, 0x65, 0x7d, 0x12, 0x81, 0x01, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, - 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x2d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, - 0x75, 0x74, 0x65, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x50, 0x65, 0x65, 0x72, - 0x41, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, - 0x74, 0x65, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x12, 0x1b, 0x2f, 0x76, 0x31, - 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x2f, 0x7b, 0x70, 0x65, - 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x7d, 0x12, 0x6a, 0x0a, 0x0c, 0x53, 0x68, 0x75, 0x74, - 0x64, 0x6f, 0x77, 0x6e, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, - 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, - 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, + 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x43, 0x0a, 0x09, 0x46, + 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x54, 0x41, 0x54, + 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, 0x53, + 0x54, 0x41, 0x54, 0x45, 0x5f, 0x52, 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x10, + 0x0a, 0x0c, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x50, 0x41, 0x55, 0x53, 0x45, 0x44, 0x10, 0x02, + 0x32, 0x84, 0x0c, 0x0a, 0x0b, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x12, 0x74, 0x0a, 0x0c, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, + 0x12, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, + 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, + 0x74, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1d, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x17, 0x3a, + 0x01, 0x2a, 0x22, 0x12, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x76, 0x61, + 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x12, 0x6c, 0x0a, 0x0a, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, + 0x50, 0x65, 0x65, 0x72, 0x12, 0x1f, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, + 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, + 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1b, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x15, 0x3a, - 0x01, 0x2a, 0x22, 0x10, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x69, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x2f, - 0x64, 0x72, 0x6f, 0x70, 0x12, 0x46, 0x0a, 0x09, 0x50, 0x61, 0x75, 0x73, 0x65, 0x46, 0x6c, 0x6f, - 0x77, 0x12, 0x1a, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, - 0x2e, 0x50, 0x61, 0x75, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, - 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, 0x61, 0x75, - 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x7a, 0x0a, 0x0c, - 0x4d, 0x69, 0x72, 0x72, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x21, 0x2e, 0x70, - 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x4d, 0x69, 0x72, 0x72, - 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x4d, - 0x69, 0x72, 0x72, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x12, 0x1b, 0x2f, 0x76, 0x31, - 0x2f, 0x6d, 0x69, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x2f, 0x7b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x6a, - 0x6f, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x7d, 0x42, 0x7c, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, - 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x42, 0x0a, 0x52, 0x6f, - 0x75, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, - 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, - 0x58, 0x58, 0xaa, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, - 0xca, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xe2, 0x02, - 0x17, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x5c, 0x47, 0x50, 0x42, - 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, - 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x01, 0x2a, 0x22, 0x10, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x63, 0x72, + 0x65, 0x61, 0x74, 0x65, 0x12, 0x64, 0x0a, 0x08, 0x44, 0x72, 0x6f, 0x70, 0x50, 0x65, 0x65, 0x72, + 0x12, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, + 0x44, 0x72, 0x6f, 0x70, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x44, + 0x72, 0x6f, 0x70, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x19, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x13, 0x3a, 0x01, 0x2a, 0x22, 0x0e, 0x2f, 0x76, 0x31, 0x2f, + 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x64, 0x72, 0x6f, 0x70, 0x12, 0x79, 0x0a, 0x0d, 0x43, 0x72, + 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x22, 0x2e, 0x70, 0x65, + 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, + 0x65, 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x23, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1f, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x19, 0x3a, 0x01, 0x2a, 0x22, + 0x14, 0x2f, 0x76, 0x31, 0x2f, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x2f, 0x63, 0x64, 0x63, 0x2f, 0x63, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x12, 0x7d, 0x0a, 0x0e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, + 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x23, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, + 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, + 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x70, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, + 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x20, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1a, 0x3a, 0x01, 0x2a, 0x22, 0x15, 0x2f, + 0x76, 0x31, 0x2f, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x2f, 0x71, 0x72, 0x65, 0x70, 0x2f, 0x63, 0x72, + 0x65, 0x61, 0x74, 0x65, 0x12, 0x79, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, + 0x61, 0x73, 0x12, 0x2d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, + 0x65, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x50, 0x65, 0x65, 0x72, 0x41, 0x63, + 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, + 0x2e, 0x50, 0x65, 0x65, 0x72, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x19, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x13, 0x12, 0x11, 0x2f, 0x76, + 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x73, 0x12, + 0x74, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x49, 0x6e, 0x53, 0x63, + 0x68, 0x65, 0x6d, 0x61, 0x12, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, + 0x75, 0x74, 0x65, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, + 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x54, 0x61, 0x62, + 0x6c, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x18, 0x82, 0xd3, 0xe4, + 0x93, 0x02, 0x12, 0x12, 0x10, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x74, + 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x6e, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6c, 0x75, + 0x6d, 0x6e, 0x73, 0x12, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, + 0x74, 0x65, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, + 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x43, 0x6f, 0x6c, 0x75, 0x6d, + 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x19, 0x82, 0xd3, 0xe4, 0x93, + 0x02, 0x13, 0x12, 0x11, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x63, 0x6f, + 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x12, 0x81, 0x01, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x53, 0x6c, 0x6f, + 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x2d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, + 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x50, 0x65, 0x65, + 0x72, 0x41, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, + 0x75, 0x74, 0x65, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x53, 0x6c, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x12, 0x1b, 0x2f, 0x76, + 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x73, 0x6c, 0x6f, 0x74, 0x73, 0x2f, 0x7b, 0x70, + 0x65, 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x7d, 0x12, 0x81, 0x01, 0x0a, 0x0b, 0x47, 0x65, + 0x74, 0x53, 0x74, 0x61, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x2d, 0x2e, 0x70, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, + 0x73, 0x50, 0x65, 0x65, 0x72, 0x41, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, + 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, + 0x12, 0x1b, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2f, 0x73, 0x74, 0x61, 0x74, + 0x73, 0x2f, 0x7b, 0x70, 0x65, 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x7d, 0x12, 0x6a, 0x0a, + 0x0c, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x1d, 0x2e, + 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, + 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, 0x74, + 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1b, 0x82, 0xd3, + 0xe4, 0x93, 0x02, 0x15, 0x3a, 0x01, 0x2a, 0x22, 0x10, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x69, 0x72, + 0x72, 0x6f, 0x72, 0x73, 0x2f, 0x64, 0x72, 0x6f, 0x70, 0x12, 0x60, 0x0a, 0x0f, 0x46, 0x6c, 0x6f, + 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x24, 0x2e, 0x70, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x46, 0x6c, 0x6f, 0x77, + 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, + 0x65, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, + 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x7a, 0x0a, 0x0c, 0x4d, + 0x69, 0x72, 0x72, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x21, 0x2e, 0x70, 0x65, + 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x4d, 0x69, 0x72, 0x72, 0x6f, + 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, + 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x4d, 0x69, + 0x72, 0x72, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x12, 0x1b, 0x2f, 0x76, 0x31, 0x2f, + 0x6d, 0x69, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x2f, 0x7b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x6a, 0x6f, + 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x7d, 0x42, 0x7c, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, 0x70, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x42, 0x0a, 0x52, 0x6f, 0x75, + 0x74, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, + 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, + 0x58, 0xaa, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xca, + 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xe2, 0x02, 0x17, + 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x5c, 0x47, 0x50, 0x42, 0x4d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, + 0x52, 0x6f, 0x75, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -2245,101 +2315,103 @@ func file_route_proto_rawDescGZIP() []byte { return file_route_proto_rawDescData } -var file_route_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_route_proto_enumTypes = make([]protoimpl.EnumInfo, 3) var file_route_proto_msgTypes = make([]protoimpl.MessageInfo, 31) var file_route_proto_goTypes = []interface{}{ (ValidatePeerStatus)(0), // 0: peerdb_route.ValidatePeerStatus (CreatePeerStatus)(0), // 1: peerdb_route.CreatePeerStatus - (*CreateCDCFlowRequest)(nil), // 2: peerdb_route.CreateCDCFlowRequest - (*CreateCDCFlowResponse)(nil), // 3: peerdb_route.CreateCDCFlowResponse - (*CreateQRepFlowRequest)(nil), // 4: peerdb_route.CreateQRepFlowRequest - (*CreateQRepFlowResponse)(nil), // 5: peerdb_route.CreateQRepFlowResponse - (*ShutdownRequest)(nil), // 6: peerdb_route.ShutdownRequest - (*ShutdownResponse)(nil), // 7: peerdb_route.ShutdownResponse - (*ValidatePeerRequest)(nil), // 8: peerdb_route.ValidatePeerRequest - (*CreatePeerRequest)(nil), // 9: peerdb_route.CreatePeerRequest - (*DropPeerRequest)(nil), // 10: peerdb_route.DropPeerRequest - (*DropPeerResponse)(nil), // 11: peerdb_route.DropPeerResponse - (*ValidatePeerResponse)(nil), // 12: peerdb_route.ValidatePeerResponse - (*CreatePeerResponse)(nil), // 13: peerdb_route.CreatePeerResponse - (*MirrorStatusRequest)(nil), // 14: peerdb_route.MirrorStatusRequest - (*PartitionStatus)(nil), // 15: peerdb_route.PartitionStatus - (*QRepMirrorStatus)(nil), // 16: peerdb_route.QRepMirrorStatus - (*CDCSyncStatus)(nil), // 17: peerdb_route.CDCSyncStatus - (*PeerSchemasResponse)(nil), // 18: peerdb_route.PeerSchemasResponse - (*SchemaTablesRequest)(nil), // 19: peerdb_route.SchemaTablesRequest - (*SchemaTablesResponse)(nil), // 20: peerdb_route.SchemaTablesResponse - (*TableColumnsRequest)(nil), // 21: peerdb_route.TableColumnsRequest - (*TableColumnsResponse)(nil), // 22: peerdb_route.TableColumnsResponse - (*PostgresPeerActivityInfoRequest)(nil), // 23: peerdb_route.PostgresPeerActivityInfoRequest - (*SlotInfo)(nil), // 24: peerdb_route.SlotInfo - (*StatInfo)(nil), // 25: peerdb_route.StatInfo - (*PeerSlotResponse)(nil), // 26: peerdb_route.PeerSlotResponse - (*PeerStatResponse)(nil), // 27: peerdb_route.PeerStatResponse - (*SnapshotStatus)(nil), // 28: peerdb_route.SnapshotStatus - (*CDCMirrorStatus)(nil), // 29: peerdb_route.CDCMirrorStatus - (*MirrorStatusResponse)(nil), // 30: peerdb_route.MirrorStatusResponse - (*PauseRequest)(nil), // 31: peerdb_route.PauseRequest - (*PauseResponse)(nil), // 32: peerdb_route.PauseResponse - (*FlowConnectionConfigs)(nil), // 33: peerdb_flow.FlowConnectionConfigs - (*QRepConfig)(nil), // 34: peerdb_flow.QRepConfig - (*Peer)(nil), // 35: peerdb_peers.Peer - (*timestamppb.Timestamp)(nil), // 36: google.protobuf.Timestamp + (FlowState)(0), // 2: peerdb_route.FlowState + (*CreateCDCFlowRequest)(nil), // 3: peerdb_route.CreateCDCFlowRequest + (*CreateCDCFlowResponse)(nil), // 4: peerdb_route.CreateCDCFlowResponse + (*CreateQRepFlowRequest)(nil), // 5: peerdb_route.CreateQRepFlowRequest + (*CreateQRepFlowResponse)(nil), // 6: peerdb_route.CreateQRepFlowResponse + (*ShutdownRequest)(nil), // 7: peerdb_route.ShutdownRequest + (*ShutdownResponse)(nil), // 8: peerdb_route.ShutdownResponse + (*ValidatePeerRequest)(nil), // 9: peerdb_route.ValidatePeerRequest + (*CreatePeerRequest)(nil), // 10: peerdb_route.CreatePeerRequest + (*DropPeerRequest)(nil), // 11: peerdb_route.DropPeerRequest + (*DropPeerResponse)(nil), // 12: peerdb_route.DropPeerResponse + (*ValidatePeerResponse)(nil), // 13: peerdb_route.ValidatePeerResponse + (*CreatePeerResponse)(nil), // 14: peerdb_route.CreatePeerResponse + (*MirrorStatusRequest)(nil), // 15: peerdb_route.MirrorStatusRequest + (*PartitionStatus)(nil), // 16: peerdb_route.PartitionStatus + (*QRepMirrorStatus)(nil), // 17: peerdb_route.QRepMirrorStatus + (*CDCSyncStatus)(nil), // 18: peerdb_route.CDCSyncStatus + (*PeerSchemasResponse)(nil), // 19: peerdb_route.PeerSchemasResponse + (*SchemaTablesRequest)(nil), // 20: peerdb_route.SchemaTablesRequest + (*SchemaTablesResponse)(nil), // 21: peerdb_route.SchemaTablesResponse + (*TableColumnsRequest)(nil), // 22: peerdb_route.TableColumnsRequest + (*TableColumnsResponse)(nil), // 23: peerdb_route.TableColumnsResponse + (*PostgresPeerActivityInfoRequest)(nil), // 24: peerdb_route.PostgresPeerActivityInfoRequest + (*SlotInfo)(nil), // 25: peerdb_route.SlotInfo + (*StatInfo)(nil), // 26: peerdb_route.StatInfo + (*PeerSlotResponse)(nil), // 27: peerdb_route.PeerSlotResponse + (*PeerStatResponse)(nil), // 28: peerdb_route.PeerStatResponse + (*SnapshotStatus)(nil), // 29: peerdb_route.SnapshotStatus + (*CDCMirrorStatus)(nil), // 30: peerdb_route.CDCMirrorStatus + (*MirrorStatusResponse)(nil), // 31: peerdb_route.MirrorStatusResponse + (*FlowStateChangeRequest)(nil), // 32: peerdb_route.FlowStateChangeRequest + (*FlowStateChangeResponse)(nil), // 33: peerdb_route.FlowStateChangeResponse + (*FlowConnectionConfigs)(nil), // 34: peerdb_flow.FlowConnectionConfigs + (*QRepConfig)(nil), // 35: peerdb_flow.QRepConfig + (*Peer)(nil), // 36: peerdb_peers.Peer + (*timestamppb.Timestamp)(nil), // 37: google.protobuf.Timestamp } var file_route_proto_depIdxs = []int32{ - 33, // 0: peerdb_route.CreateCDCFlowRequest.connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs - 34, // 1: peerdb_route.CreateQRepFlowRequest.qrep_config:type_name -> peerdb_flow.QRepConfig - 35, // 2: peerdb_route.ShutdownRequest.source_peer:type_name -> peerdb_peers.Peer - 35, // 3: peerdb_route.ShutdownRequest.destination_peer:type_name -> peerdb_peers.Peer - 35, // 4: peerdb_route.ValidatePeerRequest.peer:type_name -> peerdb_peers.Peer - 35, // 5: peerdb_route.CreatePeerRequest.peer:type_name -> peerdb_peers.Peer + 34, // 0: peerdb_route.CreateCDCFlowRequest.connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs + 35, // 1: peerdb_route.CreateQRepFlowRequest.qrep_config:type_name -> peerdb_flow.QRepConfig + 36, // 2: peerdb_route.ShutdownRequest.source_peer:type_name -> peerdb_peers.Peer + 36, // 3: peerdb_route.ShutdownRequest.destination_peer:type_name -> peerdb_peers.Peer + 36, // 4: peerdb_route.ValidatePeerRequest.peer:type_name -> peerdb_peers.Peer + 36, // 5: peerdb_route.CreatePeerRequest.peer:type_name -> peerdb_peers.Peer 0, // 6: peerdb_route.ValidatePeerResponse.status:type_name -> peerdb_route.ValidatePeerStatus 1, // 7: peerdb_route.CreatePeerResponse.status:type_name -> peerdb_route.CreatePeerStatus - 36, // 8: peerdb_route.PartitionStatus.start_time:type_name -> google.protobuf.Timestamp - 36, // 9: peerdb_route.PartitionStatus.end_time:type_name -> google.protobuf.Timestamp - 34, // 10: peerdb_route.QRepMirrorStatus.config:type_name -> peerdb_flow.QRepConfig - 15, // 11: peerdb_route.QRepMirrorStatus.partitions:type_name -> peerdb_route.PartitionStatus - 36, // 12: peerdb_route.CDCSyncStatus.start_time:type_name -> google.protobuf.Timestamp - 36, // 13: peerdb_route.CDCSyncStatus.end_time:type_name -> google.protobuf.Timestamp - 24, // 14: peerdb_route.PeerSlotResponse.slot_data:type_name -> peerdb_route.SlotInfo - 25, // 15: peerdb_route.PeerStatResponse.stat_data:type_name -> peerdb_route.StatInfo - 16, // 16: peerdb_route.SnapshotStatus.clones:type_name -> peerdb_route.QRepMirrorStatus - 33, // 17: peerdb_route.CDCMirrorStatus.config:type_name -> peerdb_flow.FlowConnectionConfigs - 28, // 18: peerdb_route.CDCMirrorStatus.snapshot_status:type_name -> peerdb_route.SnapshotStatus - 17, // 19: peerdb_route.CDCMirrorStatus.cdc_syncs:type_name -> peerdb_route.CDCSyncStatus - 16, // 20: peerdb_route.MirrorStatusResponse.qrep_status:type_name -> peerdb_route.QRepMirrorStatus - 29, // 21: peerdb_route.MirrorStatusResponse.cdc_status:type_name -> peerdb_route.CDCMirrorStatus - 8, // 22: peerdb_route.FlowService.ValidatePeer:input_type -> peerdb_route.ValidatePeerRequest - 9, // 23: peerdb_route.FlowService.CreatePeer:input_type -> peerdb_route.CreatePeerRequest - 10, // 24: peerdb_route.FlowService.DropPeer:input_type -> peerdb_route.DropPeerRequest - 2, // 25: peerdb_route.FlowService.CreateCDCFlow:input_type -> peerdb_route.CreateCDCFlowRequest - 4, // 26: peerdb_route.FlowService.CreateQRepFlow:input_type -> peerdb_route.CreateQRepFlowRequest - 23, // 27: peerdb_route.FlowService.GetSchemas:input_type -> peerdb_route.PostgresPeerActivityInfoRequest - 19, // 28: peerdb_route.FlowService.GetTablesInSchema:input_type -> peerdb_route.SchemaTablesRequest - 21, // 29: peerdb_route.FlowService.GetColumns:input_type -> peerdb_route.TableColumnsRequest - 23, // 30: peerdb_route.FlowService.GetSlotInfo:input_type -> peerdb_route.PostgresPeerActivityInfoRequest - 23, // 31: peerdb_route.FlowService.GetStatInfo:input_type -> peerdb_route.PostgresPeerActivityInfoRequest - 6, // 32: peerdb_route.FlowService.ShutdownFlow:input_type -> peerdb_route.ShutdownRequest - 31, // 33: peerdb_route.FlowService.PauseFlow:input_type -> peerdb_route.PauseRequest - 14, // 34: peerdb_route.FlowService.MirrorStatus:input_type -> peerdb_route.MirrorStatusRequest - 12, // 35: peerdb_route.FlowService.ValidatePeer:output_type -> peerdb_route.ValidatePeerResponse - 13, // 36: peerdb_route.FlowService.CreatePeer:output_type -> peerdb_route.CreatePeerResponse - 11, // 37: peerdb_route.FlowService.DropPeer:output_type -> peerdb_route.DropPeerResponse - 3, // 38: peerdb_route.FlowService.CreateCDCFlow:output_type -> peerdb_route.CreateCDCFlowResponse - 5, // 39: peerdb_route.FlowService.CreateQRepFlow:output_type -> peerdb_route.CreateQRepFlowResponse - 18, // 40: peerdb_route.FlowService.GetSchemas:output_type -> peerdb_route.PeerSchemasResponse - 20, // 41: peerdb_route.FlowService.GetTablesInSchema:output_type -> peerdb_route.SchemaTablesResponse - 22, // 42: peerdb_route.FlowService.GetColumns:output_type -> peerdb_route.TableColumnsResponse - 26, // 43: peerdb_route.FlowService.GetSlotInfo:output_type -> peerdb_route.PeerSlotResponse - 27, // 44: peerdb_route.FlowService.GetStatInfo:output_type -> peerdb_route.PeerStatResponse - 7, // 45: peerdb_route.FlowService.ShutdownFlow:output_type -> peerdb_route.ShutdownResponse - 32, // 46: peerdb_route.FlowService.PauseFlow:output_type -> peerdb_route.PauseResponse - 30, // 47: peerdb_route.FlowService.MirrorStatus:output_type -> peerdb_route.MirrorStatusResponse - 35, // [35:48] is the sub-list for method output_type - 22, // [22:35] is the sub-list for method input_type - 22, // [22:22] is the sub-list for extension type_name - 22, // [22:22] is the sub-list for extension extendee - 0, // [0:22] is the sub-list for field type_name + 37, // 8: peerdb_route.PartitionStatus.start_time:type_name -> google.protobuf.Timestamp + 37, // 9: peerdb_route.PartitionStatus.end_time:type_name -> google.protobuf.Timestamp + 35, // 10: peerdb_route.QRepMirrorStatus.config:type_name -> peerdb_flow.QRepConfig + 16, // 11: peerdb_route.QRepMirrorStatus.partitions:type_name -> peerdb_route.PartitionStatus + 37, // 12: peerdb_route.CDCSyncStatus.start_time:type_name -> google.protobuf.Timestamp + 37, // 13: peerdb_route.CDCSyncStatus.end_time:type_name -> google.protobuf.Timestamp + 25, // 14: peerdb_route.PeerSlotResponse.slot_data:type_name -> peerdb_route.SlotInfo + 26, // 15: peerdb_route.PeerStatResponse.stat_data:type_name -> peerdb_route.StatInfo + 17, // 16: peerdb_route.SnapshotStatus.clones:type_name -> peerdb_route.QRepMirrorStatus + 34, // 17: peerdb_route.CDCMirrorStatus.config:type_name -> peerdb_flow.FlowConnectionConfigs + 29, // 18: peerdb_route.CDCMirrorStatus.snapshot_status:type_name -> peerdb_route.SnapshotStatus + 18, // 19: peerdb_route.CDCMirrorStatus.cdc_syncs:type_name -> peerdb_route.CDCSyncStatus + 17, // 20: peerdb_route.MirrorStatusResponse.qrep_status:type_name -> peerdb_route.QRepMirrorStatus + 30, // 21: peerdb_route.MirrorStatusResponse.cdc_status:type_name -> peerdb_route.CDCMirrorStatus + 2, // 22: peerdb_route.FlowStateChangeRequest.requested_flow_state:type_name -> peerdb_route.FlowState + 9, // 23: peerdb_route.FlowService.ValidatePeer:input_type -> peerdb_route.ValidatePeerRequest + 10, // 24: peerdb_route.FlowService.CreatePeer:input_type -> peerdb_route.CreatePeerRequest + 11, // 25: peerdb_route.FlowService.DropPeer:input_type -> peerdb_route.DropPeerRequest + 3, // 26: peerdb_route.FlowService.CreateCDCFlow:input_type -> peerdb_route.CreateCDCFlowRequest + 5, // 27: peerdb_route.FlowService.CreateQRepFlow:input_type -> peerdb_route.CreateQRepFlowRequest + 24, // 28: peerdb_route.FlowService.GetSchemas:input_type -> peerdb_route.PostgresPeerActivityInfoRequest + 20, // 29: peerdb_route.FlowService.GetTablesInSchema:input_type -> peerdb_route.SchemaTablesRequest + 22, // 30: peerdb_route.FlowService.GetColumns:input_type -> peerdb_route.TableColumnsRequest + 24, // 31: peerdb_route.FlowService.GetSlotInfo:input_type -> peerdb_route.PostgresPeerActivityInfoRequest + 24, // 32: peerdb_route.FlowService.GetStatInfo:input_type -> peerdb_route.PostgresPeerActivityInfoRequest + 7, // 33: peerdb_route.FlowService.ShutdownFlow:input_type -> peerdb_route.ShutdownRequest + 32, // 34: peerdb_route.FlowService.FlowStateChange:input_type -> peerdb_route.FlowStateChangeRequest + 15, // 35: peerdb_route.FlowService.MirrorStatus:input_type -> peerdb_route.MirrorStatusRequest + 13, // 36: peerdb_route.FlowService.ValidatePeer:output_type -> peerdb_route.ValidatePeerResponse + 14, // 37: peerdb_route.FlowService.CreatePeer:output_type -> peerdb_route.CreatePeerResponse + 12, // 38: peerdb_route.FlowService.DropPeer:output_type -> peerdb_route.DropPeerResponse + 4, // 39: peerdb_route.FlowService.CreateCDCFlow:output_type -> peerdb_route.CreateCDCFlowResponse + 6, // 40: peerdb_route.FlowService.CreateQRepFlow:output_type -> peerdb_route.CreateQRepFlowResponse + 19, // 41: peerdb_route.FlowService.GetSchemas:output_type -> peerdb_route.PeerSchemasResponse + 21, // 42: peerdb_route.FlowService.GetTablesInSchema:output_type -> peerdb_route.SchemaTablesResponse + 23, // 43: peerdb_route.FlowService.GetColumns:output_type -> peerdb_route.TableColumnsResponse + 27, // 44: peerdb_route.FlowService.GetSlotInfo:output_type -> peerdb_route.PeerSlotResponse + 28, // 45: peerdb_route.FlowService.GetStatInfo:output_type -> peerdb_route.PeerStatResponse + 8, // 46: peerdb_route.FlowService.ShutdownFlow:output_type -> peerdb_route.ShutdownResponse + 33, // 47: peerdb_route.FlowService.FlowStateChange:output_type -> peerdb_route.FlowStateChangeResponse + 31, // 48: peerdb_route.FlowService.MirrorStatus:output_type -> peerdb_route.MirrorStatusResponse + 36, // [36:49] is the sub-list for method output_type + 23, // [23:36] is the sub-list for method input_type + 23, // [23:23] is the sub-list for extension type_name + 23, // [23:23] is the sub-list for extension extendee + 0, // [0:23] is the sub-list for field type_name } func init() { file_route_proto_init() } @@ -2699,7 +2771,7 @@ func file_route_proto_init() { } } file_route_proto_msgTypes[29].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PauseRequest); i { + switch v := v.(*FlowStateChangeRequest); i { case 0: return &v.state case 1: @@ -2711,7 +2783,7 @@ func file_route_proto_init() { } } file_route_proto_msgTypes[30].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PauseResponse); i { + switch v := v.(*FlowStateChangeResponse); i { case 0: return &v.state case 1: @@ -2732,7 +2804,7 @@ func file_route_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_route_proto_rawDesc, - NumEnums: 2, + NumEnums: 3, NumMessages: 31, NumExtensions: 0, NumServices: 1, diff --git a/flow/generated/protos/route_grpc.pb.go b/flow/generated/protos/route_grpc.pb.go index 4631b2046b..4e2be9f12d 100644 --- a/flow/generated/protos/route_grpc.pb.go +++ b/flow/generated/protos/route_grpc.pb.go @@ -30,7 +30,7 @@ const ( FlowService_GetSlotInfo_FullMethodName = "/peerdb_route.FlowService/GetSlotInfo" FlowService_GetStatInfo_FullMethodName = "/peerdb_route.FlowService/GetStatInfo" FlowService_ShutdownFlow_FullMethodName = "/peerdb_route.FlowService/ShutdownFlow" - FlowService_PauseFlow_FullMethodName = "/peerdb_route.FlowService/PauseFlow" + FlowService_FlowStateChange_FullMethodName = "/peerdb_route.FlowService/FlowStateChange" FlowService_MirrorStatus_FullMethodName = "/peerdb_route.FlowService/MirrorStatus" ) @@ -49,7 +49,7 @@ type FlowServiceClient interface { GetSlotInfo(ctx context.Context, in *PostgresPeerActivityInfoRequest, opts ...grpc.CallOption) (*PeerSlotResponse, error) GetStatInfo(ctx context.Context, in *PostgresPeerActivityInfoRequest, opts ...grpc.CallOption) (*PeerStatResponse, error) ShutdownFlow(ctx context.Context, in *ShutdownRequest, opts ...grpc.CallOption) (*ShutdownResponse, error) - PauseFlow(ctx context.Context, in *PauseRequest, opts ...grpc.CallOption) (*PauseResponse, error) + FlowStateChange(ctx context.Context, in *FlowStateChangeRequest, opts ...grpc.CallOption) (*FlowStateChangeResponse, error) MirrorStatus(ctx context.Context, in *MirrorStatusRequest, opts ...grpc.CallOption) (*MirrorStatusResponse, error) } @@ -160,9 +160,9 @@ func (c *flowServiceClient) ShutdownFlow(ctx context.Context, in *ShutdownReques return out, nil } -func (c *flowServiceClient) PauseFlow(ctx context.Context, in *PauseRequest, opts ...grpc.CallOption) (*PauseResponse, error) { - out := new(PauseResponse) - err := c.cc.Invoke(ctx, FlowService_PauseFlow_FullMethodName, in, out, opts...) +func (c *flowServiceClient) FlowStateChange(ctx context.Context, in *FlowStateChangeRequest, opts ...grpc.CallOption) (*FlowStateChangeResponse, error) { + out := new(FlowStateChangeResponse) + err := c.cc.Invoke(ctx, FlowService_FlowStateChange_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -193,7 +193,7 @@ type FlowServiceServer interface { GetSlotInfo(context.Context, *PostgresPeerActivityInfoRequest) (*PeerSlotResponse, error) GetStatInfo(context.Context, *PostgresPeerActivityInfoRequest) (*PeerStatResponse, error) ShutdownFlow(context.Context, *ShutdownRequest) (*ShutdownResponse, error) - PauseFlow(context.Context, *PauseRequest) (*PauseResponse, error) + FlowStateChange(context.Context, *FlowStateChangeRequest) (*FlowStateChangeResponse, error) MirrorStatus(context.Context, *MirrorStatusRequest) (*MirrorStatusResponse, error) mustEmbedUnimplementedFlowServiceServer() } @@ -235,8 +235,8 @@ func (UnimplementedFlowServiceServer) GetStatInfo(context.Context, *PostgresPeer func (UnimplementedFlowServiceServer) ShutdownFlow(context.Context, *ShutdownRequest) (*ShutdownResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ShutdownFlow not implemented") } -func (UnimplementedFlowServiceServer) PauseFlow(context.Context, *PauseRequest) (*PauseResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method PauseFlow not implemented") +func (UnimplementedFlowServiceServer) FlowStateChange(context.Context, *FlowStateChangeRequest) (*FlowStateChangeResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method FlowStateChange not implemented") } func (UnimplementedFlowServiceServer) MirrorStatus(context.Context, *MirrorStatusRequest) (*MirrorStatusResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method MirrorStatus not implemented") @@ -452,20 +452,20 @@ func _FlowService_ShutdownFlow_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } -func _FlowService_PauseFlow_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(PauseRequest) +func _FlowService_FlowStateChange_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(FlowStateChangeRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(FlowServiceServer).PauseFlow(ctx, in) + return srv.(FlowServiceServer).FlowStateChange(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: FlowService_PauseFlow_FullMethodName, + FullMethod: FlowService_FlowStateChange_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(FlowServiceServer).PauseFlow(ctx, req.(*PauseRequest)) + return srv.(FlowServiceServer).FlowStateChange(ctx, req.(*FlowStateChangeRequest)) } return interceptor(ctx, in, info, handler) } @@ -540,8 +540,8 @@ var FlowService_ServiceDesc = grpc.ServiceDesc{ Handler: _FlowService_ShutdownFlow_Handler, }, { - MethodName: "PauseFlow", - Handler: _FlowService_PauseFlow_Handler, + MethodName: "FlowStateChange", + Handler: _FlowService_FlowStateChange_Handler, }, { MethodName: "MirrorStatus", diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 245ee18005..afeb8f7fc5 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -13,6 +13,7 @@ const ( NoopSignal CDCFlowSignal = iota ShutdownSignal PauseSignal + EnableMetricsKey ContextKey = "enableMetrics" CDCMirrorMonitorKey ContextKey = "cdcMirrorMonitor" ) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index ccf8cd242f..1ea18dd9eb 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -8,6 +8,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" + util "github.com/PeerDB-io/peer-flow/utils" "github.com/google/uuid" "github.com/hashicorp/go-multierror" "go.temporal.io/api/enums/v1" @@ -147,30 +148,13 @@ func GetChildWorkflowID( // CDCFlowWorkflowResult is the result of the PeerFlowWorkflow. type CDCFlowWorkflowResult = CDCFlowWorkflowState -func (w *CDCFlowWorkflowExecution) signalHandler(state *CDCFlowWorkflowState, v shared.CDCFlowSignal) { - w.logger.Info("received signal - ", v) - if v == shared.ShutdownSignal { - w.logger.Info("received shutdown signal") - state.ActiveSignal = v - } else if v == shared.PauseSignal { - w.logger.Info("received pause signal") - if state.ActiveSignal == shared.NoopSignal { - w.logger.Info("workflow was running, pausing it") - state.ActiveSignal = shared.PauseSignal - } else if state.ActiveSignal == shared.PauseSignal { - w.logger.Info("workflow was paused, resuming it") - state.ActiveSignal = shared.NoopSignal - } - } -} - func (w *CDCFlowWorkflowExecution) receiveAndHandleSignal(ctx workflow.Context, state *CDCFlowWorkflowState) { signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) var signalVal shared.CDCFlowSignal ok := signalChan.ReceiveAsync(&signalVal) if ok { - w.signalHandler(state, signalVal) + state.ActiveSignal = util.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) } } @@ -294,27 +278,25 @@ func CDCFlowWorkflowWithConfig( // check and act on signals before a fresh flow starts. w.receiveAndHandleSignal(ctx, state) - // check if the peer flow has been shutdown - if state.ActiveSignal == shared.ShutdownSignal { - w.logger.Info("peer flow has been shutdown") - return state, nil - } - if state.ActiveSignal == shared.PauseSignal { startTime := time.Now() + signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) + var signalVal shared.CDCFlowSignal + for state.ActiveSignal == shared.PauseSignal { - err = workflow.Sleep(ctx, 1*time.Minute) - if err != nil { - return state, err - } w.logger.Info("mirror has been paused for ", time.Since(startTime)) - w.receiveAndHandleSignal(ctx, state) - } - if state.ActiveSignal == shared.ShutdownSignal { - // handling going from paused to shutdown - continue + // only place we block on receive, so signal processing is immediate + ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) + if ok { + state.ActiveSignal = util.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) + } } } + // check if the peer flow has been shutdown + if state.ActiveSignal == shared.ShutdownSignal { + w.logger.Info("peer flow has been shutdown") + return state, nil + } // check if total sync flows have been completed // since this happens immediately after we check for signals, the case of a signal being missed diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 91fb276ad9..b195b55b79 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -8,6 +8,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" + util "github.com/PeerDB-io/peer-flow/utils" "github.com/google/uuid" "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/log" @@ -342,30 +343,13 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta return nil } -func (q *QRepFlowExecution) signalHandler(v shared.CDCFlowSignal) { - q.logger.Info("received signal - ", v) - if v == shared.ShutdownSignal { - q.logger.Info("received shutdown signal") - q.activeSignal = v - } else if v == shared.PauseSignal { - q.logger.Info("received pause signal") - if q.activeSignal == shared.NoopSignal { - q.logger.Info("workflow was running, pausing it") - q.activeSignal = shared.PauseSignal - } else if q.activeSignal == shared.PauseSignal { - q.logger.Info("workflow was paused, resuming it") - q.activeSignal = shared.NoopSignal - } - } -} - func (q *QRepFlowExecution) receiveAndHandleSignal(ctx workflow.Context) { signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) var signalVal shared.CDCFlowSignal ok := signalChan.ReceiveAsync(&signalVal) if ok { - q.signalHandler(signalVal) + q.activeSignal = util.FlowSignalHandler(q.activeSignal, signalVal, q.logger) } } @@ -453,7 +437,7 @@ func QRepFlowWorkflow( state.NumPartitionsProcessed += uint64(len(partitions.Partitions)) if len(partitions.Partitions) > 0 { - lastPartition = partitions.Partitions[len(partitions.Partitions)-1] + state.LastPartition = partitions.Partitions[len(partitions.Partitions)-1] } // sleep for a while and continue the workflow @@ -469,26 +453,24 @@ func QRepFlowWorkflow( // here, we handle signals after the end of the flow because a new workflow does not inherit the signals // and the chance of missing a signal is much higher if the check is before the time consuming parts run q.receiveAndHandleSignal(ctx) - if q.activeSignal == shared.ShutdownSignal { - q.logger.Info("terminating workflow - ", config.FlowJobName) - return nil - } if q.activeSignal == shared.PauseSignal { startTime := time.Now() + signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) + var signalVal shared.CDCFlowSignal + for q.activeSignal == shared.PauseSignal { - err = workflow.Sleep(ctx, 1*time.Minute) - if err != nil { - return err - } q.logger.Info("mirror has been paused for ", time.Since(startTime)) - q.receiveAndHandleSignal(ctx) - } - if q.activeSignal == shared.ShutdownSignal { - // handling going from paused to shutdown - q.logger.Info("terminating workflow - ", config.FlowJobName) - return nil + // only place we block on receive, so signal processing is immediate + ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) + if ok { + q.activeSignal = util.FlowSignalHandler(q.activeSignal, signalVal, q.logger) + } } } + if q.activeSignal == shared.ShutdownSignal { + q.logger.Info("terminating workflow - ", config.FlowJobName) + return nil + } // here, we handle signals after the end of the flow because a new workflow does not inherit the signals // and the chance of missing a signal is much higher if the check is before the time consuming parts run diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index a36a6a2997..c06a34bec9 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -131,6 +131,10 @@ pub enum PeerDDL { PauseMirror { if_exists: bool, flow_job_name: String, + }, + ResumeMirror { + if_exists: bool, + flow_job_name: String, } } diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index eaf27ee0b9..e149d16eb5 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -129,7 +129,7 @@ impl FlowGrpcClient { workflow_id: workflow_details.workflow_id, source_peer: Some(workflow_details.source_peer), destination_peer: Some(workflow_details.destination_peer), - remove_flow_entry:false + remove_flow_entry: false, }; let response = self.client.shutdown_flow(shutdown_flow_req).await?; let shutdown_response = response.into_inner(); @@ -159,16 +159,21 @@ impl FlowGrpcClient { } } - pub async fn pause_flow_job( + pub async fn flow_state_change( &mut self, flow_job_name: &str, workflow_id: &str, + pause: bool, ) -> anyhow::Result<()> { - let pause_flow_req = pt::peerdb_route::PauseRequest{ + let pause_flow_req = pt::peerdb_route::FlowStateChangeRequest { flow_job_name: flow_job_name.to_owned(), - workflow_id: workflow_id.to_owned() + workflow_id: workflow_id.to_owned(), + requested_flow_state: match pause { + true => pt::peerdb_route::FlowState::StatePaused.into(), + false => pt::peerdb_route::FlowState::StateRunning.into(), + }, }; - let response = self.client.pause_flow(pause_flow_req).await?; + let response = self.client.flow_state_change(pause_flow_req).await?; let pause_response = response.into_inner(); if pause_response.ok { Ok(()) diff --git a/nexus/pt/src/peerdb_route.rs b/nexus/pt/src/peerdb_route.rs index 8dcc77c3fd..5a46ba2f58 100644 --- a/nexus/pt/src/peerdb_route.rs +++ b/nexus/pt/src/peerdb_route.rs @@ -256,15 +256,17 @@ pub mod mirror_status_response { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct PauseRequest { +pub struct FlowStateChangeRequest { #[prost(string, tag="1")] pub workflow_id: ::prost::alloc::string::String, #[prost(string, tag="2")] pub flow_job_name: ::prost::alloc::string::String, + #[prost(enumeration="FlowState", tag="3")] + pub requested_flow_state: i32, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct PauseResponse { +pub struct FlowStateChangeResponse { #[prost(bool, tag="1")] pub ok: bool, #[prost(string, tag="2")] @@ -328,6 +330,36 @@ impl CreatePeerStatus { } } } +/// in the future, consider moving DropFlow to this and reduce route surface +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum FlowState { + StateUnknown = 0, + StateRunning = 1, + StatePaused = 2, +} +impl FlowState { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + FlowState::StateUnknown => "STATE_UNKNOWN", + FlowState::StateRunning => "STATE_RUNNING", + FlowState::StatePaused => "STATE_PAUSED", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "STATE_UNKNOWN" => Some(Self::StateUnknown), + "STATE_RUNNING" => Some(Self::StateRunning), + "STATE_PAUSED" => Some(Self::StatePaused), + _ => None, + } + } +} include!("peerdb_route.tonic.rs"); include!("peerdb_route.serde.rs"); // @@protoc_insertion_point(module) \ No newline at end of file diff --git a/nexus/pt/src/peerdb_route.serde.rs b/nexus/pt/src/peerdb_route.serde.rs index 641acbb4fc..3c086248d6 100644 --- a/nexus/pt/src/peerdb_route.serde.rs +++ b/nexus/pt/src/peerdb_route.serde.rs @@ -1218,7 +1218,83 @@ impl<'de> serde::Deserialize<'de> for DropPeerResponse { deserializer.deserialize_struct("peerdb_route.DropPeerResponse", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for MirrorStatusRequest { +impl serde::Serialize for FlowState { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let variant = match self { + Self::StateUnknown => "STATE_UNKNOWN", + Self::StateRunning => "STATE_RUNNING", + Self::StatePaused => "STATE_PAUSED", + }; + serializer.serialize_str(variant) + } +} +impl<'de> serde::Deserialize<'de> for FlowState { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "STATE_UNKNOWN", + "STATE_RUNNING", + "STATE_PAUSED", + ]; + + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = FlowState; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + fn visit_i64(self, v: i64) -> std::result::Result + where + E: serde::de::Error, + { + use std::convert::TryFrom; + i32::try_from(v) + .ok() + .and_then(FlowState::from_i32) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self) + }) + } + + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + use std::convert::TryFrom; + i32::try_from(v) + .ok() + .and_then(FlowState::from_i32) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) + }) + } + + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "STATE_UNKNOWN" => Ok(FlowState::StateUnknown), + "STATE_RUNNING" => Ok(FlowState::StateRunning), + "STATE_PAUSED" => Ok(FlowState::StatePaused), + _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), + } + } + } + deserializer.deserialize_any(GeneratedVisitor) + } +} +impl serde::Serialize for FlowStateChangeRequest { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result where @@ -1226,30 +1302,50 @@ impl serde::Serialize for MirrorStatusRequest { { use serde::ser::SerializeStruct; let mut len = 0; + if !self.workflow_id.is_empty() { + len += 1; + } if !self.flow_job_name.is_empty() { len += 1; } - let mut struct_ser = serializer.serialize_struct("peerdb_route.MirrorStatusRequest", len)?; + if self.requested_flow_state != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("peerdb_route.FlowStateChangeRequest", len)?; + if !self.workflow_id.is_empty() { + struct_ser.serialize_field("workflowId", &self.workflow_id)?; + } if !self.flow_job_name.is_empty() { struct_ser.serialize_field("flowJobName", &self.flow_job_name)?; } + if self.requested_flow_state != 0 { + let v = FlowState::from_i32(self.requested_flow_state) + .ok_or_else(|| serde::ser::Error::custom(format!("Invalid variant {}", self.requested_flow_state)))?; + struct_ser.serialize_field("requestedFlowState", &v)?; + } struct_ser.end() } } -impl<'de> serde::Deserialize<'de> for MirrorStatusRequest { +impl<'de> serde::Deserialize<'de> for FlowStateChangeRequest { #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ + "workflow_id", + "workflowId", "flow_job_name", "flowJobName", + "requested_flow_state", + "requestedFlowState", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { + WorkflowId, FlowJobName, + RequestedFlowState, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -1272,7 +1368,9 @@ impl<'de> serde::Deserialize<'de> for MirrorStatusRequest { E: serde::de::Error, { match value { + "workflowId" | "workflow_id" => Ok(GeneratedField::WorkflowId), "flowJobName" | "flow_job_name" => Ok(GeneratedField::FlowJobName), + "requestedFlowState" | "requested_flow_state" => Ok(GeneratedField::RequestedFlowState), _ => Ok(GeneratedField::__SkipField__), } } @@ -1282,39 +1380,55 @@ impl<'de> serde::Deserialize<'de> for MirrorStatusRequest { } struct GeneratedVisitor; impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = MirrorStatusRequest; + type Value = FlowStateChangeRequest; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct peerdb_route.MirrorStatusRequest") + formatter.write_str("struct peerdb_route.FlowStateChangeRequest") } - fn visit_map(self, mut map: V) -> std::result::Result + fn visit_map(self, mut map: V) -> std::result::Result where V: serde::de::MapAccess<'de>, { + let mut workflow_id__ = None; let mut flow_job_name__ = None; + let mut requested_flow_state__ = None; while let Some(k) = map.next_key()? { match k { + GeneratedField::WorkflowId => { + if workflow_id__.is_some() { + return Err(serde::de::Error::duplicate_field("workflowId")); + } + workflow_id__ = Some(map.next_value()?); + } GeneratedField::FlowJobName => { if flow_job_name__.is_some() { return Err(serde::de::Error::duplicate_field("flowJobName")); } flow_job_name__ = Some(map.next_value()?); } + GeneratedField::RequestedFlowState => { + if requested_flow_state__.is_some() { + return Err(serde::de::Error::duplicate_field("requestedFlowState")); + } + requested_flow_state__ = Some(map.next_value::()? as i32); + } GeneratedField::__SkipField__ => { let _ = map.next_value::()?; } } } - Ok(MirrorStatusRequest { + Ok(FlowStateChangeRequest { + workflow_id: workflow_id__.unwrap_or_default(), flow_job_name: flow_job_name__.unwrap_or_default(), + requested_flow_state: requested_flow_state__.unwrap_or_default(), }) } } - deserializer.deserialize_struct("peerdb_route.MirrorStatusRequest", FIELDS, GeneratedVisitor) + deserializer.deserialize_struct("peerdb_route.FlowStateChangeRequest", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for MirrorStatusResponse { +impl serde::Serialize for FlowStateChangeResponse { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result where @@ -1322,58 +1436,38 @@ impl serde::Serialize for MirrorStatusResponse { { use serde::ser::SerializeStruct; let mut len = 0; - if !self.flow_job_name.is_empty() { + if self.ok { len += 1; } if !self.error_message.is_empty() { len += 1; } - if self.status.is_some() { - len += 1; - } - let mut struct_ser = serializer.serialize_struct("peerdb_route.MirrorStatusResponse", len)?; - if !self.flow_job_name.is_empty() { - struct_ser.serialize_field("flowJobName", &self.flow_job_name)?; + let mut struct_ser = serializer.serialize_struct("peerdb_route.FlowStateChangeResponse", len)?; + if self.ok { + struct_ser.serialize_field("ok", &self.ok)?; } if !self.error_message.is_empty() { struct_ser.serialize_field("errorMessage", &self.error_message)?; } - if let Some(v) = self.status.as_ref() { - match v { - mirror_status_response::Status::QrepStatus(v) => { - struct_ser.serialize_field("qrepStatus", v)?; - } - mirror_status_response::Status::CdcStatus(v) => { - struct_ser.serialize_field("cdcStatus", v)?; - } - } - } struct_ser.end() } } -impl<'de> serde::Deserialize<'de> for MirrorStatusResponse { +impl<'de> serde::Deserialize<'de> for FlowStateChangeResponse { #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ - "flow_job_name", - "flowJobName", + "ok", "error_message", "errorMessage", - "qrep_status", - "qrepStatus", - "cdc_status", - "cdcStatus", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { - FlowJobName, + Ok, ErrorMessage, - QrepStatus, - CdcStatus, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -1396,10 +1490,8 @@ impl<'de> serde::Deserialize<'de> for MirrorStatusResponse { E: serde::de::Error, { match value { - "flowJobName" | "flow_job_name" => Ok(GeneratedField::FlowJobName), + "ok" => Ok(GeneratedField::Ok), "errorMessage" | "error_message" => Ok(GeneratedField::ErrorMessage), - "qrepStatus" | "qrep_status" => Ok(GeneratedField::QrepStatus), - "cdcStatus" | "cdc_status" => Ok(GeneratedField::CdcStatus), _ => Ok(GeneratedField::__SkipField__), } } @@ -1409,26 +1501,25 @@ impl<'de> serde::Deserialize<'de> for MirrorStatusResponse { } struct GeneratedVisitor; impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = MirrorStatusResponse; + type Value = FlowStateChangeResponse; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct peerdb_route.MirrorStatusResponse") + formatter.write_str("struct peerdb_route.FlowStateChangeResponse") } - fn visit_map(self, mut map: V) -> std::result::Result + fn visit_map(self, mut map: V) -> std::result::Result where V: serde::de::MapAccess<'de>, { - let mut flow_job_name__ = None; + let mut ok__ = None; let mut error_message__ = None; - let mut status__ = None; while let Some(k) = map.next_key()? { match k { - GeneratedField::FlowJobName => { - if flow_job_name__.is_some() { - return Err(serde::de::Error::duplicate_field("flowJobName")); + GeneratedField::Ok => { + if ok__.is_some() { + return Err(serde::de::Error::duplicate_field("ok")); } - flow_job_name__ = Some(map.next_value()?); + ok__ = Some(map.next_value()?); } GeneratedField::ErrorMessage => { if error_message__.is_some() { @@ -1436,36 +1527,21 @@ impl<'de> serde::Deserialize<'de> for MirrorStatusResponse { } error_message__ = Some(map.next_value()?); } - GeneratedField::QrepStatus => { - if status__.is_some() { - return Err(serde::de::Error::duplicate_field("qrepStatus")); - } - status__ = map.next_value::<::std::option::Option<_>>()?.map(mirror_status_response::Status::QrepStatus) -; - } - GeneratedField::CdcStatus => { - if status__.is_some() { - return Err(serde::de::Error::duplicate_field("cdcStatus")); - } - status__ = map.next_value::<::std::option::Option<_>>()?.map(mirror_status_response::Status::CdcStatus) -; - } GeneratedField::__SkipField__ => { let _ = map.next_value::()?; } } } - Ok(MirrorStatusResponse { - flow_job_name: flow_job_name__.unwrap_or_default(), + Ok(FlowStateChangeResponse { + ok: ok__.unwrap_or_default(), error_message: error_message__.unwrap_or_default(), - status: status__, }) } } - deserializer.deserialize_struct("peerdb_route.MirrorStatusResponse", FIELDS, GeneratedVisitor) + deserializer.deserialize_struct("peerdb_route.FlowStateChangeResponse", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for PartitionStatus { +impl serde::Serialize for MirrorStatusRequest { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result where @@ -1473,57 +1549,30 @@ impl serde::Serialize for PartitionStatus { { use serde::ser::SerializeStruct; let mut len = 0; - if !self.partition_id.is_empty() { - len += 1; - } - if self.start_time.is_some() { - len += 1; - } - if self.end_time.is_some() { - len += 1; - } - if self.num_rows != 0 { + if !self.flow_job_name.is_empty() { len += 1; } - let mut struct_ser = serializer.serialize_struct("peerdb_route.PartitionStatus", len)?; - if !self.partition_id.is_empty() { - struct_ser.serialize_field("partitionId", &self.partition_id)?; - } - if let Some(v) = self.start_time.as_ref() { - struct_ser.serialize_field("startTime", v)?; - } - if let Some(v) = self.end_time.as_ref() { - struct_ser.serialize_field("endTime", v)?; - } - if self.num_rows != 0 { - struct_ser.serialize_field("numRows", &self.num_rows)?; + let mut struct_ser = serializer.serialize_struct("peerdb_route.MirrorStatusRequest", len)?; + if !self.flow_job_name.is_empty() { + struct_ser.serialize_field("flowJobName", &self.flow_job_name)?; } struct_ser.end() } } -impl<'de> serde::Deserialize<'de> for PartitionStatus { +impl<'de> serde::Deserialize<'de> for MirrorStatusRequest { #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ - "partition_id", - "partitionId", - "start_time", - "startTime", - "end_time", - "endTime", - "num_rows", - "numRows", + "flow_job_name", + "flowJobName", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { - PartitionId, - StartTime, - EndTime, - NumRows, + FlowJobName, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -1546,10 +1595,7 @@ impl<'de> serde::Deserialize<'de> for PartitionStatus { E: serde::de::Error, { match value { - "partitionId" | "partition_id" => Ok(GeneratedField::PartitionId), - "startTime" | "start_time" => Ok(GeneratedField::StartTime), - "endTime" | "end_time" => Ok(GeneratedField::EndTime), - "numRows" | "num_rows" => Ok(GeneratedField::NumRows), + "flowJobName" | "flow_job_name" => Ok(GeneratedField::FlowJobName), _ => Ok(GeneratedField::__SkipField__), } } @@ -1559,65 +1605,39 @@ impl<'de> serde::Deserialize<'de> for PartitionStatus { } struct GeneratedVisitor; impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = PartitionStatus; + type Value = MirrorStatusRequest; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct peerdb_route.PartitionStatus") + formatter.write_str("struct peerdb_route.MirrorStatusRequest") } - fn visit_map(self, mut map: V) -> std::result::Result + fn visit_map(self, mut map: V) -> std::result::Result where V: serde::de::MapAccess<'de>, { - let mut partition_id__ = None; - let mut start_time__ = None; - let mut end_time__ = None; - let mut num_rows__ = None; + let mut flow_job_name__ = None; while let Some(k) = map.next_key()? { match k { - GeneratedField::PartitionId => { - if partition_id__.is_some() { - return Err(serde::de::Error::duplicate_field("partitionId")); - } - partition_id__ = Some(map.next_value()?); - } - GeneratedField::StartTime => { - if start_time__.is_some() { - return Err(serde::de::Error::duplicate_field("startTime")); - } - start_time__ = map.next_value()?; - } - GeneratedField::EndTime => { - if end_time__.is_some() { - return Err(serde::de::Error::duplicate_field("endTime")); - } - end_time__ = map.next_value()?; - } - GeneratedField::NumRows => { - if num_rows__.is_some() { - return Err(serde::de::Error::duplicate_field("numRows")); + GeneratedField::FlowJobName => { + if flow_job_name__.is_some() { + return Err(serde::de::Error::duplicate_field("flowJobName")); } - num_rows__ = - Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) - ; + flow_job_name__ = Some(map.next_value()?); } GeneratedField::__SkipField__ => { let _ = map.next_value::()?; } } } - Ok(PartitionStatus { - partition_id: partition_id__.unwrap_or_default(), - start_time: start_time__, - end_time: end_time__, - num_rows: num_rows__.unwrap_or_default(), + Ok(MirrorStatusRequest { + flow_job_name: flow_job_name__.unwrap_or_default(), }) } } - deserializer.deserialize_struct("peerdb_route.PartitionStatus", FIELDS, GeneratedVisitor) + deserializer.deserialize_struct("peerdb_route.MirrorStatusRequest", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for PauseRequest { +impl serde::Serialize for MirrorStatusResponse { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result where @@ -1625,39 +1645,58 @@ impl serde::Serialize for PauseRequest { { use serde::ser::SerializeStruct; let mut len = 0; - if !self.workflow_id.is_empty() { + if !self.flow_job_name.is_empty() { len += 1; } - if !self.flow_job_name.is_empty() { + if !self.error_message.is_empty() { len += 1; } - let mut struct_ser = serializer.serialize_struct("peerdb_route.PauseRequest", len)?; - if !self.workflow_id.is_empty() { - struct_ser.serialize_field("workflowId", &self.workflow_id)?; + if self.status.is_some() { + len += 1; } + let mut struct_ser = serializer.serialize_struct("peerdb_route.MirrorStatusResponse", len)?; if !self.flow_job_name.is_empty() { struct_ser.serialize_field("flowJobName", &self.flow_job_name)?; } + if !self.error_message.is_empty() { + struct_ser.serialize_field("errorMessage", &self.error_message)?; + } + if let Some(v) = self.status.as_ref() { + match v { + mirror_status_response::Status::QrepStatus(v) => { + struct_ser.serialize_field("qrepStatus", v)?; + } + mirror_status_response::Status::CdcStatus(v) => { + struct_ser.serialize_field("cdcStatus", v)?; + } + } + } struct_ser.end() } } -impl<'de> serde::Deserialize<'de> for PauseRequest { +impl<'de> serde::Deserialize<'de> for MirrorStatusResponse { #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ - "workflow_id", - "workflowId", "flow_job_name", "flowJobName", + "error_message", + "errorMessage", + "qrep_status", + "qrepStatus", + "cdc_status", + "cdcStatus", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { - WorkflowId, FlowJobName, + ErrorMessage, + QrepStatus, + CdcStatus, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -1680,8 +1719,10 @@ impl<'de> serde::Deserialize<'de> for PauseRequest { E: serde::de::Error, { match value { - "workflowId" | "workflow_id" => Ok(GeneratedField::WorkflowId), "flowJobName" | "flow_job_name" => Ok(GeneratedField::FlowJobName), + "errorMessage" | "error_message" => Ok(GeneratedField::ErrorMessage), + "qrepStatus" | "qrep_status" => Ok(GeneratedField::QrepStatus), + "cdcStatus" | "cdc_status" => Ok(GeneratedField::CdcStatus), _ => Ok(GeneratedField::__SkipField__), } } @@ -1691,47 +1732,63 @@ impl<'de> serde::Deserialize<'de> for PauseRequest { } struct GeneratedVisitor; impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = PauseRequest; + type Value = MirrorStatusResponse; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct peerdb_route.PauseRequest") + formatter.write_str("struct peerdb_route.MirrorStatusResponse") } - fn visit_map(self, mut map: V) -> std::result::Result + fn visit_map(self, mut map: V) -> std::result::Result where V: serde::de::MapAccess<'de>, { - let mut workflow_id__ = None; let mut flow_job_name__ = None; + let mut error_message__ = None; + let mut status__ = None; while let Some(k) = map.next_key()? { match k { - GeneratedField::WorkflowId => { - if workflow_id__.is_some() { - return Err(serde::de::Error::duplicate_field("workflowId")); - } - workflow_id__ = Some(map.next_value()?); - } GeneratedField::FlowJobName => { if flow_job_name__.is_some() { return Err(serde::de::Error::duplicate_field("flowJobName")); } flow_job_name__ = Some(map.next_value()?); } + GeneratedField::ErrorMessage => { + if error_message__.is_some() { + return Err(serde::de::Error::duplicate_field("errorMessage")); + } + error_message__ = Some(map.next_value()?); + } + GeneratedField::QrepStatus => { + if status__.is_some() { + return Err(serde::de::Error::duplicate_field("qrepStatus")); + } + status__ = map.next_value::<::std::option::Option<_>>()?.map(mirror_status_response::Status::QrepStatus) +; + } + GeneratedField::CdcStatus => { + if status__.is_some() { + return Err(serde::de::Error::duplicate_field("cdcStatus")); + } + status__ = map.next_value::<::std::option::Option<_>>()?.map(mirror_status_response::Status::CdcStatus) +; + } GeneratedField::__SkipField__ => { let _ = map.next_value::()?; } } } - Ok(PauseRequest { - workflow_id: workflow_id__.unwrap_or_default(), + Ok(MirrorStatusResponse { flow_job_name: flow_job_name__.unwrap_or_default(), + error_message: error_message__.unwrap_or_default(), + status: status__, }) } } - deserializer.deserialize_struct("peerdb_route.PauseRequest", FIELDS, GeneratedVisitor) + deserializer.deserialize_struct("peerdb_route.MirrorStatusResponse", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for PauseResponse { +impl serde::Serialize for PartitionStatus { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result where @@ -1739,38 +1796,57 @@ impl serde::Serialize for PauseResponse { { use serde::ser::SerializeStruct; let mut len = 0; - if self.ok { + if !self.partition_id.is_empty() { len += 1; } - if !self.error_message.is_empty() { + if self.start_time.is_some() { len += 1; } - let mut struct_ser = serializer.serialize_struct("peerdb_route.PauseResponse", len)?; - if self.ok { - struct_ser.serialize_field("ok", &self.ok)?; + if self.end_time.is_some() { + len += 1; } - if !self.error_message.is_empty() { - struct_ser.serialize_field("errorMessage", &self.error_message)?; + if self.num_rows != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("peerdb_route.PartitionStatus", len)?; + if !self.partition_id.is_empty() { + struct_ser.serialize_field("partitionId", &self.partition_id)?; + } + if let Some(v) = self.start_time.as_ref() { + struct_ser.serialize_field("startTime", v)?; + } + if let Some(v) = self.end_time.as_ref() { + struct_ser.serialize_field("endTime", v)?; + } + if self.num_rows != 0 { + struct_ser.serialize_field("numRows", &self.num_rows)?; } struct_ser.end() } } -impl<'de> serde::Deserialize<'de> for PauseResponse { +impl<'de> serde::Deserialize<'de> for PartitionStatus { #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ - "ok", - "error_message", - "errorMessage", + "partition_id", + "partitionId", + "start_time", + "startTime", + "end_time", + "endTime", + "num_rows", + "numRows", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { - Ok, - ErrorMessage, + PartitionId, + StartTime, + EndTime, + NumRows, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -1793,8 +1869,10 @@ impl<'de> serde::Deserialize<'de> for PauseResponse { E: serde::de::Error, { match value { - "ok" => Ok(GeneratedField::Ok), - "errorMessage" | "error_message" => Ok(GeneratedField::ErrorMessage), + "partitionId" | "partition_id" => Ok(GeneratedField::PartitionId), + "startTime" | "start_time" => Ok(GeneratedField::StartTime), + "endTime" | "end_time" => Ok(GeneratedField::EndTime), + "numRows" | "num_rows" => Ok(GeneratedField::NumRows), _ => Ok(GeneratedField::__SkipField__), } } @@ -1804,44 +1882,62 @@ impl<'de> serde::Deserialize<'de> for PauseResponse { } struct GeneratedVisitor; impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = PauseResponse; + type Value = PartitionStatus; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct peerdb_route.PauseResponse") + formatter.write_str("struct peerdb_route.PartitionStatus") } - fn visit_map(self, mut map: V) -> std::result::Result + fn visit_map(self, mut map: V) -> std::result::Result where V: serde::de::MapAccess<'de>, { - let mut ok__ = None; - let mut error_message__ = None; + let mut partition_id__ = None; + let mut start_time__ = None; + let mut end_time__ = None; + let mut num_rows__ = None; while let Some(k) = map.next_key()? { match k { - GeneratedField::Ok => { - if ok__.is_some() { - return Err(serde::de::Error::duplicate_field("ok")); + GeneratedField::PartitionId => { + if partition_id__.is_some() { + return Err(serde::de::Error::duplicate_field("partitionId")); } - ok__ = Some(map.next_value()?); + partition_id__ = Some(map.next_value()?); } - GeneratedField::ErrorMessage => { - if error_message__.is_some() { - return Err(serde::de::Error::duplicate_field("errorMessage")); + GeneratedField::StartTime => { + if start_time__.is_some() { + return Err(serde::de::Error::duplicate_field("startTime")); } - error_message__ = Some(map.next_value()?); + start_time__ = map.next_value()?; + } + GeneratedField::EndTime => { + if end_time__.is_some() { + return Err(serde::de::Error::duplicate_field("endTime")); + } + end_time__ = map.next_value()?; + } + GeneratedField::NumRows => { + if num_rows__.is_some() { + return Err(serde::de::Error::duplicate_field("numRows")); + } + num_rows__ = + Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; } GeneratedField::__SkipField__ => { let _ = map.next_value::()?; } } } - Ok(PauseResponse { - ok: ok__.unwrap_or_default(), - error_message: error_message__.unwrap_or_default(), + Ok(PartitionStatus { + partition_id: partition_id__.unwrap_or_default(), + start_time: start_time__, + end_time: end_time__, + num_rows: num_rows__.unwrap_or_default(), }) } } - deserializer.deserialize_struct("peerdb_route.PauseResponse", FIELDS, GeneratedVisitor) + deserializer.deserialize_struct("peerdb_route.PartitionStatus", FIELDS, GeneratedVisitor) } } impl serde::Serialize for PeerSchemasResponse { diff --git a/nexus/pt/src/peerdb_route.tonic.rs b/nexus/pt/src/peerdb_route.tonic.rs index cdc690b9e4..7c479c1064 100644 --- a/nexus/pt/src/peerdb_route.tonic.rs +++ b/nexus/pt/src/peerdb_route.tonic.rs @@ -374,10 +374,13 @@ pub mod flow_service_client { self.inner.unary(req, path, codec).await } /// - pub async fn pause_flow( + pub async fn flow_state_change( &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -389,11 +392,11 @@ pub mod flow_service_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/peerdb_route.FlowService/PauseFlow", + "/peerdb_route.FlowService/FlowStateChange", ); let mut req = request.into_request(); req.extensions_mut() - .insert(GrpcMethod::new("peerdb_route.FlowService", "PauseFlow")); + .insert(GrpcMethod::new("peerdb_route.FlowService", "FlowStateChange")); self.inner.unary(req, path, codec).await } /// @@ -520,10 +523,13 @@ pub mod flow_service_server { tonic::Status, >; /// - async fn pause_flow( + async fn flow_state_change( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// async fn mirror_status( &self, @@ -1117,22 +1123,26 @@ pub mod flow_service_server { }; Box::pin(fut) } - "/peerdb_route.FlowService/PauseFlow" => { + "/peerdb_route.FlowService/FlowStateChange" => { #[allow(non_camel_case_types)] - struct PauseFlowSvc(pub Arc); - impl tonic::server::UnaryService - for PauseFlowSvc { - type Response = super::PauseResponse; + struct FlowStateChangeSvc(pub Arc); + impl< + T: FlowService, + > tonic::server::UnaryService + for FlowStateChangeSvc { + type Response = super::FlowStateChangeResponse; type Future = BoxFuture< tonic::Response, tonic::Status, >; fn call( &mut self, - request: tonic::Request, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { (*inner).pause_flow(request).await }; + let fut = async move { + (*inner).flow_state_change(request).await + }; Box::pin(fut) } } @@ -1143,7 +1153,7 @@ pub mod flow_service_server { let inner = self.inner.clone(); let fut = async move { let inner = inner.0; - let method = PauseFlowSvc(inner); + let method = FlowStateChangeSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 504d6e7d55..578c0f5f4f 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -681,7 +681,7 @@ impl NexusBackend { if let Some(workflow_details) = workflow_details { let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; flow_handler - .pause_flow_job(flow_job_name, &workflow_details.workflow_id) + .flow_state_change(flow_job_name, &workflow_details.workflow_id, true) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -706,6 +706,67 @@ impl NexusBackend { format!("no such mirror: {:?}", flow_job_name), )))) } + }, + PeerDDL::ResumeMirror { + if_exists, + flow_job_name, + } => { + if self.flow_handler.is_none() { + return Err(PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: "flow service is not configured".to_owned(), + }))); + } + + let catalog = self.catalog.lock().await; + tracing::info!( + "[RESUME MIRROR] mirror_name: {}, if_exists: {}", + flow_job_name, + if_exists + ); + let workflow_details = catalog + .get_workflow_details_for_flow_job(flow_job_name) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "unable to query catalog for job metadata: {:?}", + err + ), + })) + })?; + tracing::info!( + "[RESUME MIRROR] got workflow id: {:?}", + workflow_details.as_ref().map(|w| &w.workflow_id) + ); + + if let Some(workflow_details) = workflow_details { + let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; + flow_handler + .flow_state_change(flow_job_name, &workflow_details.workflow_id, false) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to shutdown flow job: {:?}", err), + })) + })?; + let drop_mirror_success = format!("RESUME MIRROR {}", flow_job_name); + Ok(vec![Response::Execution(Tag::new_for_execution( + &drop_mirror_success, + None, + ))]) + } else if *if_exists { + let no_mirror_success = "NO SUCH MIRROR"; + Ok(vec![Response::Execution(Tag::new_for_execution( + no_mirror_success, + None, + ))]) + } else { + Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "error".to_owned(), + format!("no such mirror: {:?}", flow_job_name), + )))) + } } }, NexusStatement::PeerQuery { stmt, assoc } => { diff --git a/protos/route.proto b/protos/route.proto index 500060c02b..d02a30bc89 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -175,12 +175,20 @@ message MirrorStatusResponse { string error_message = 4; } -message PauseRequest { +// in the future, consider moving DropFlow to this and reduce route surface +enum FlowState { + STATE_UNKNOWN = 0; + STATE_RUNNING = 1; + STATE_PAUSED = 2; +} + +message FlowStateChangeRequest { string workflow_id = 1; string flow_job_name = 2; + FlowState requested_flow_state = 3; } -message PauseResponse { +message FlowStateChangeResponse { bool ok = 1; string error_message = 2; } @@ -238,7 +246,7 @@ service FlowService { rpc ShutdownFlow(ShutdownRequest) returns (ShutdownResponse) { option (google.api.http) = { post: "/v1/mirrors/drop", body: "*" }; } - rpc PauseFlow(PauseRequest) returns (PauseResponse) {} + rpc FlowStateChange(FlowStateChangeRequest) returns (FlowStateChangeResponse) {} rpc MirrorStatus(MirrorStatusRequest) returns (MirrorStatusResponse) { option (google.api.http) = { get: "/v1/mirrors/{flow_job_name}" }; } diff --git a/ui/grpc_generated/route.ts b/ui/grpc_generated/route.ts index 3ce421da32..08d133fa83 100644 --- a/ui/grpc_generated/route.ts +++ b/ui/grpc_generated/route.ts @@ -97,6 +97,46 @@ export function createPeerStatusToJSON(object: CreatePeerStatus): string { } } +/** in the future, consider moving DropFlow to this and reduce route surface */ +export enum FlowState { + STATE_UNKNOWN = 0, + STATE_RUNNING = 1, + STATE_PAUSED = 2, + UNRECOGNIZED = -1, +} + +export function flowStateFromJSON(object: any): FlowState { + switch (object) { + case 0: + case "STATE_UNKNOWN": + return FlowState.STATE_UNKNOWN; + case 1: + case "STATE_RUNNING": + return FlowState.STATE_RUNNING; + case 2: + case "STATE_PAUSED": + return FlowState.STATE_PAUSED; + case -1: + case "UNRECOGNIZED": + default: + return FlowState.UNRECOGNIZED; + } +} + +export function flowStateToJSON(object: FlowState): string { + switch (object) { + case FlowState.STATE_UNKNOWN: + return "STATE_UNKNOWN"; + case FlowState.STATE_RUNNING: + return "STATE_RUNNING"; + case FlowState.STATE_PAUSED: + return "STATE_PAUSED"; + case FlowState.UNRECOGNIZED: + default: + return "UNRECOGNIZED"; + } +} + export interface CreateCDCFlowRequest { connectionConfigs: FlowConnectionConfigs | undefined; createCatalogEntry: boolean; @@ -254,12 +294,13 @@ export interface MirrorStatusResponse { errorMessage: string; } -export interface PauseRequest { +export interface FlowStateChangeRequest { workflowId: string; flowJobName: string; + requestedFlowState: FlowState; } -export interface PauseResponse { +export interface FlowStateChangeResponse { ok: boolean; errorMessage: string; } @@ -2498,25 +2539,28 @@ export const MirrorStatusResponse = { }, }; -function createBasePauseRequest(): PauseRequest { - return { workflowId: "", flowJobName: "" }; +function createBaseFlowStateChangeRequest(): FlowStateChangeRequest { + return { workflowId: "", flowJobName: "", requestedFlowState: 0 }; } -export const PauseRequest = { - encode(message: PauseRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { +export const FlowStateChangeRequest = { + encode(message: FlowStateChangeRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { if (message.workflowId !== "") { writer.uint32(10).string(message.workflowId); } if (message.flowJobName !== "") { writer.uint32(18).string(message.flowJobName); } + if (message.requestedFlowState !== 0) { + writer.uint32(24).int32(message.requestedFlowState); + } return writer; }, - decode(input: _m0.Reader | Uint8Array, length?: number): PauseRequest { + decode(input: _m0.Reader | Uint8Array, length?: number): FlowStateChangeRequest { const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); let end = length === undefined ? reader.len : reader.pos + length; - const message = createBasePauseRequest(); + const message = createBaseFlowStateChangeRequest(); while (reader.pos < end) { const tag = reader.uint32(); switch (tag >>> 3) { @@ -2534,6 +2578,13 @@ export const PauseRequest = { message.flowJobName = reader.string(); continue; + case 3: + if (tag !== 24) { + break; + } + + message.requestedFlowState = reader.int32() as any; + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -2543,14 +2594,15 @@ export const PauseRequest = { return message; }, - fromJSON(object: any): PauseRequest { + fromJSON(object: any): FlowStateChangeRequest { return { workflowId: isSet(object.workflowId) ? String(object.workflowId) : "", flowJobName: isSet(object.flowJobName) ? String(object.flowJobName) : "", + requestedFlowState: isSet(object.requestedFlowState) ? flowStateFromJSON(object.requestedFlowState) : 0, }; }, - toJSON(message: PauseRequest): unknown { + toJSON(message: FlowStateChangeRequest): unknown { const obj: any = {}; if (message.workflowId !== "") { obj.workflowId = message.workflowId; @@ -2558,26 +2610,30 @@ export const PauseRequest = { if (message.flowJobName !== "") { obj.flowJobName = message.flowJobName; } + if (message.requestedFlowState !== 0) { + obj.requestedFlowState = flowStateToJSON(message.requestedFlowState); + } return obj; }, - create, I>>(base?: I): PauseRequest { - return PauseRequest.fromPartial(base ?? ({} as any)); + create, I>>(base?: I): FlowStateChangeRequest { + return FlowStateChangeRequest.fromPartial(base ?? ({} as any)); }, - fromPartial, I>>(object: I): PauseRequest { - const message = createBasePauseRequest(); + fromPartial, I>>(object: I): FlowStateChangeRequest { + const message = createBaseFlowStateChangeRequest(); message.workflowId = object.workflowId ?? ""; message.flowJobName = object.flowJobName ?? ""; + message.requestedFlowState = object.requestedFlowState ?? 0; return message; }, }; -function createBasePauseResponse(): PauseResponse { +function createBaseFlowStateChangeResponse(): FlowStateChangeResponse { return { ok: false, errorMessage: "" }; } -export const PauseResponse = { - encode(message: PauseResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { +export const FlowStateChangeResponse = { + encode(message: FlowStateChangeResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { if (message.ok === true) { writer.uint32(8).bool(message.ok); } @@ -2587,10 +2643,10 @@ export const PauseResponse = { return writer; }, - decode(input: _m0.Reader | Uint8Array, length?: number): PauseResponse { + decode(input: _m0.Reader | Uint8Array, length?: number): FlowStateChangeResponse { const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); let end = length === undefined ? reader.len : reader.pos + length; - const message = createBasePauseResponse(); + const message = createBaseFlowStateChangeResponse(); while (reader.pos < end) { const tag = reader.uint32(); switch (tag >>> 3) { @@ -2617,14 +2673,14 @@ export const PauseResponse = { return message; }, - fromJSON(object: any): PauseResponse { + fromJSON(object: any): FlowStateChangeResponse { return { ok: isSet(object.ok) ? Boolean(object.ok) : false, errorMessage: isSet(object.errorMessage) ? String(object.errorMessage) : "", }; }, - toJSON(message: PauseResponse): unknown { + toJSON(message: FlowStateChangeResponse): unknown { const obj: any = {}; if (message.ok === true) { obj.ok = message.ok; @@ -2635,11 +2691,11 @@ export const PauseResponse = { return obj; }, - create, I>>(base?: I): PauseResponse { - return PauseResponse.fromPartial(base ?? ({} as any)); + create, I>>(base?: I): FlowStateChangeResponse { + return FlowStateChangeResponse.fromPartial(base ?? ({} as any)); }, - fromPartial, I>>(object: I): PauseResponse { - const message = createBasePauseResponse(); + fromPartial, I>>(object: I): FlowStateChangeResponse { + const message = createBaseFlowStateChangeResponse(); message.ok = object.ok ?? false; message.errorMessage = object.errorMessage ?? ""; return message; @@ -2750,14 +2806,14 @@ export const FlowServiceService = { responseSerialize: (value: ShutdownResponse) => Buffer.from(ShutdownResponse.encode(value).finish()), responseDeserialize: (value: Buffer) => ShutdownResponse.decode(value), }, - pauseFlow: { - path: "/peerdb_route.FlowService/PauseFlow", + flowStateChange: { + path: "/peerdb_route.FlowService/FlowStateChange", requestStream: false, responseStream: false, - requestSerialize: (value: PauseRequest) => Buffer.from(PauseRequest.encode(value).finish()), - requestDeserialize: (value: Buffer) => PauseRequest.decode(value), - responseSerialize: (value: PauseResponse) => Buffer.from(PauseResponse.encode(value).finish()), - responseDeserialize: (value: Buffer) => PauseResponse.decode(value), + requestSerialize: (value: FlowStateChangeRequest) => Buffer.from(FlowStateChangeRequest.encode(value).finish()), + requestDeserialize: (value: Buffer) => FlowStateChangeRequest.decode(value), + responseSerialize: (value: FlowStateChangeResponse) => Buffer.from(FlowStateChangeResponse.encode(value).finish()), + responseDeserialize: (value: Buffer) => FlowStateChangeResponse.decode(value), }, mirrorStatus: { path: "/peerdb_route.FlowService/MirrorStatus", @@ -2782,7 +2838,7 @@ export interface FlowServiceServer extends UntypedServiceImplementation { getSlotInfo: handleUnaryCall; getStatInfo: handleUnaryCall; shutdownFlow: handleUnaryCall; - pauseFlow: handleUnaryCall; + flowStateChange: handleUnaryCall; mirrorStatus: handleUnaryCall; } @@ -2952,20 +3008,20 @@ export interface FlowServiceClient extends Client { options: Partial, callback: (error: ServiceError | null, response: ShutdownResponse) => void, ): ClientUnaryCall; - pauseFlow( - request: PauseRequest, - callback: (error: ServiceError | null, response: PauseResponse) => void, + flowStateChange( + request: FlowStateChangeRequest, + callback: (error: ServiceError | null, response: FlowStateChangeResponse) => void, ): ClientUnaryCall; - pauseFlow( - request: PauseRequest, + flowStateChange( + request: FlowStateChangeRequest, metadata: Metadata, - callback: (error: ServiceError | null, response: PauseResponse) => void, + callback: (error: ServiceError | null, response: FlowStateChangeResponse) => void, ): ClientUnaryCall; - pauseFlow( - request: PauseRequest, + flowStateChange( + request: FlowStateChangeRequest, metadata: Metadata, options: Partial, - callback: (error: ServiceError | null, response: PauseResponse) => void, + callback: (error: ServiceError | null, response: FlowStateChangeResponse) => void, ): ClientUnaryCall; mirrorStatus( request: MirrorStatusRequest,