diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index d9a6e0a2c5..29777bdbdd 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -154,7 +154,7 @@ func (h *FlowRequestHandler) CreateCDCFlow( return nil, fmt.Errorf("unable to update flow config in catalog: %w", err) } - state := peerflow.NewCDCFlowState() + state := peerflow.NewCDCFlowWorkflowState() _, err = h.temporalClient.ExecuteWorkflow( ctx, // context workflowOptions, // workflow start options @@ -350,6 +350,37 @@ func (h *FlowRequestHandler) ShutdownFlow( }, nil } +func (h *FlowRequestHandler) FlowStateChange( + ctx context.Context, + req *protos.FlowStateChangeRequest, +) (*protos.FlowStateChangeResponse, error) { + var err error + if req.RequestedFlowState == protos.FlowState_STATE_PAUSED { + err = h.temporalClient.SignalWorkflow( + ctx, + req.WorkflowId, + "", + shared.CDCFlowSignalName, + shared.PauseSignal, + ) + } else if req.RequestedFlowState == protos.FlowState_STATE_RUNNING { + err = h.temporalClient.SignalWorkflow( + ctx, + req.WorkflowId, + "", + shared.CDCFlowSignalName, + shared.NoopSignal, + ) + } + if err != nil { + return nil, fmt.Errorf("unable to signal PeerFlow workflow: %w", err) + } + + return &protos.FlowStateChangeResponse{ + Ok: true, + }, nil +} + func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowID string) error { expBackoff := backoff.NewExponentialBackOff() expBackoff.InitialInterval = 3 * time.Second diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index fdca5abafa..923beb265b 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -2,7 +2,6 @@ package connpostgres import ( "context" - "database/sql" "fmt" "regexp" "time" @@ -839,7 +838,7 @@ func (c *PostgresConnector) SyncFlowCleanup(jobName string) error { } defer func() { deferErr := syncFlowCleanupTx.Rollback(c.ctx) - if deferErr != sql.ErrTxDone && deferErr != nil { + if deferErr != pgx.ErrTxClosed && deferErr != nil { log.WithFields(log.Fields{ "flowName": jobName, }).Errorf("unexpected error while rolling back transaction for flow cleanup: %v", deferErr) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 0abf3311d6..4f53e9a6a4 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -65,7 +65,7 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, connectionGen.FlowJobName, ) if err == nil { - var state peerflow.CDCFlowState + var state peerflow.CDCFlowWorkflowState err = response.Get(&state) if err != nil { log.Errorln(err) @@ -95,7 +95,7 @@ func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment, connectionGen.FlowJobName, ) if err == nil { - var state peerflow.CDCFlowState + var state peerflow.CDCFlowWorkflowState err = response.Get(&state) if err != nil { log.Errorln(err) diff --git a/flow/generated/protos/flow.pb.go b/flow/generated/protos/flow.pb.go index 1c38efbd13..0f3dcc8845 100644 --- a/flow/generated/protos/flow.pb.go +++ b/flow/generated/protos/flow.pb.go @@ -2595,7 +2595,7 @@ type QRepConfig struct { // this is the location where the avro files will be written // if this starts with gs:// then it will be written to GCS // if this starts with s3:// then it will be written to S3 - // if nothing is specified then it will be written to local disk + // if nothing is specified then it will be written to local disk, only supported in Snowflake // if using GCS or S3 make sure your instance has the correct permissions. StagingPath string `protobuf:"bytes,15,opt,name=staging_path,json=stagingPath,proto3" json:"staging_path,omitempty"` // This setting overrides batch_size_int and batch_duration_seconds diff --git a/flow/generated/protos/route.pb.go b/flow/generated/protos/route.pb.go index 3d393c16d2..60755e99f6 100644 --- a/flow/generated/protos/route.pb.go +++ b/flow/generated/protos/route.pb.go @@ -120,6 +120,56 @@ func (CreatePeerStatus) EnumDescriptor() ([]byte, []int) { return file_route_proto_rawDescGZIP(), []int{1} } +// in the future, consider moving DropFlow to this and reduce route surface +type FlowState int32 + +const ( + FlowState_STATE_UNKNOWN FlowState = 0 + FlowState_STATE_RUNNING FlowState = 1 + FlowState_STATE_PAUSED FlowState = 2 +) + +// Enum value maps for FlowState. +var ( + FlowState_name = map[int32]string{ + 0: "STATE_UNKNOWN", + 1: "STATE_RUNNING", + 2: "STATE_PAUSED", + } + FlowState_value = map[string]int32{ + "STATE_UNKNOWN": 0, + "STATE_RUNNING": 1, + "STATE_PAUSED": 2, + } +) + +func (x FlowState) Enum() *FlowState { + p := new(FlowState) + *p = x + return p +} + +func (x FlowState) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (FlowState) Descriptor() protoreflect.EnumDescriptor { + return file_route_proto_enumTypes[2].Descriptor() +} + +func (FlowState) Type() protoreflect.EnumType { + return &file_route_proto_enumTypes[2] +} + +func (x FlowState) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use FlowState.Descriptor instead. +func (FlowState) EnumDescriptor() ([]byte, []int) { + return file_route_proto_rawDescGZIP(), []int{2} +} + type CreateCDCFlowRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1789,6 +1839,124 @@ func (*MirrorStatusResponse_QrepStatus) isMirrorStatusResponse_Status() {} func (*MirrorStatusResponse_CdcStatus) isMirrorStatusResponse_Status() {} +type FlowStateChangeRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + WorkflowId string `protobuf:"bytes,1,opt,name=workflow_id,json=workflowId,proto3" json:"workflow_id,omitempty"` + FlowJobName string `protobuf:"bytes,2,opt,name=flow_job_name,json=flowJobName,proto3" json:"flow_job_name,omitempty"` + RequestedFlowState FlowState `protobuf:"varint,3,opt,name=requested_flow_state,json=requestedFlowState,proto3,enum=peerdb_route.FlowState" json:"requested_flow_state,omitempty"` +} + +func (x *FlowStateChangeRequest) Reset() { + *x = FlowStateChangeRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_route_proto_msgTypes[29] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *FlowStateChangeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FlowStateChangeRequest) ProtoMessage() {} + +func (x *FlowStateChangeRequest) ProtoReflect() protoreflect.Message { + mi := &file_route_proto_msgTypes[29] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FlowStateChangeRequest.ProtoReflect.Descriptor instead. +func (*FlowStateChangeRequest) Descriptor() ([]byte, []int) { + return file_route_proto_rawDescGZIP(), []int{29} +} + +func (x *FlowStateChangeRequest) GetWorkflowId() string { + if x != nil { + return x.WorkflowId + } + return "" +} + +func (x *FlowStateChangeRequest) GetFlowJobName() string { + if x != nil { + return x.FlowJobName + } + return "" +} + +func (x *FlowStateChangeRequest) GetRequestedFlowState() FlowState { + if x != nil { + return x.RequestedFlowState + } + return FlowState_STATE_UNKNOWN +} + +type FlowStateChangeResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Ok bool `protobuf:"varint,1,opt,name=ok,proto3" json:"ok,omitempty"` + ErrorMessage string `protobuf:"bytes,2,opt,name=error_message,json=errorMessage,proto3" json:"error_message,omitempty"` +} + +func (x *FlowStateChangeResponse) Reset() { + *x = FlowStateChangeResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_route_proto_msgTypes[30] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *FlowStateChangeResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FlowStateChangeResponse) ProtoMessage() {} + +func (x *FlowStateChangeResponse) ProtoReflect() protoreflect.Message { + mi := &file_route_proto_msgTypes[30] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FlowStateChangeResponse.ProtoReflect.Descriptor instead. +func (*FlowStateChangeResponse) Descriptor() ([]byte, []int) { + return file_route_proto_rawDescGZIP(), []int{30} +} + +func (x *FlowStateChangeResponse) GetOk() bool { + if x != nil { + return x.Ok + } + return false +} + +func (x *FlowStateChangeResponse) GetErrorMessage() string { + if x != nil { + return x.ErrorMessage + } + return "" +} + var File_route_proto protoreflect.FileDescriptor var file_route_proto_rawDesc = []byte{ @@ -1999,16 +2167,36 @@ var file_route_proto_rawDesc = []byte{ 0x64, 0x63, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x08, 0x0a, - 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2a, 0x42, 0x0a, 0x12, 0x56, 0x61, 0x6c, 0x69, 0x64, - 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x14, 0x0a, - 0x10, 0x43, 0x52, 0x45, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, - 0x4e, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0x01, 0x12, 0x0b, - 0x0a, 0x07, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0x02, 0x2a, 0x43, 0x0a, 0x10, 0x43, - 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, - 0x16, 0x0a, 0x12, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, - 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x52, 0x45, 0x41, 0x54, - 0x45, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x02, - 0x32, 0xa2, 0x0b, 0x0a, 0x0b, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0xa8, 0x01, 0x0a, 0x16, 0x46, 0x6c, 0x6f, 0x77, + 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, + 0x77, 0x49, 0x64, 0x12, 0x22, 0x0a, 0x0d, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, + 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x66, 0x6c, 0x6f, 0x77, + 0x4a, 0x6f, 0x62, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x49, 0x0a, 0x14, 0x72, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x65, 0x64, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x17, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, + 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x12, + 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x65, 0x64, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, + 0x74, 0x65, 0x22, 0x4e, 0x0a, 0x17, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, + 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, + 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x02, 0x6f, 0x6b, 0x12, 0x23, 0x0a, + 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x2a, 0x42, 0x0a, 0x12, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, + 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x14, 0x0a, 0x10, 0x43, 0x52, 0x45, 0x41, + 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x09, + 0x0a, 0x05, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x56, + 0x41, 0x4c, 0x49, 0x44, 0x10, 0x02, 0x2a, 0x43, 0x0a, 0x10, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, + 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x16, 0x0a, 0x12, 0x56, 0x41, + 0x4c, 0x49, 0x44, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, + 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, + 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x43, 0x0a, 0x09, 0x46, + 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x54, 0x41, 0x54, + 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, 0x53, + 0x54, 0x41, 0x54, 0x45, 0x5f, 0x52, 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x10, + 0x0a, 0x0c, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x50, 0x41, 0x55, 0x53, 0x45, 0x44, 0x10, 0x02, + 0x32, 0x84, 0x0c, 0x0a, 0x0b, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x74, 0x0a, 0x0c, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x12, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, @@ -2090,23 +2278,29 @@ var file_route_proto_rawDesc = []byte{ 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1b, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x15, 0x3a, 0x01, 0x2a, 0x22, 0x10, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x69, 0x72, - 0x72, 0x6f, 0x72, 0x73, 0x2f, 0x64, 0x72, 0x6f, 0x70, 0x12, 0x7a, 0x0a, 0x0c, 0x4d, 0x69, 0x72, - 0x72, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, - 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x4d, 0x69, 0x72, 0x72, 0x6f, 0x72, 0x53, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, - 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x4d, 0x69, 0x72, 0x72, - 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x12, 0x1b, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x69, - 0x72, 0x72, 0x6f, 0x72, 0x73, 0x2f, 0x7b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, - 0x6e, 0x61, 0x6d, 0x65, 0x7d, 0x42, 0x7c, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x42, 0x0a, 0x52, 0x6f, 0x75, 0x74, 0x65, - 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, - 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, - 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xca, 0x02, 0x0b, - 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xe2, 0x02, 0x17, 0x50, 0x65, - 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, - 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, - 0x75, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x6f, 0x72, 0x73, 0x2f, 0x64, 0x72, 0x6f, 0x70, 0x12, 0x60, 0x0a, 0x0f, 0x46, 0x6c, 0x6f, + 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x24, 0x2e, 0x70, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x46, 0x6c, 0x6f, 0x77, + 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, + 0x65, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, + 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x7a, 0x0a, 0x0c, 0x4d, + 0x69, 0x72, 0x72, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x21, 0x2e, 0x70, 0x65, + 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x4d, 0x69, 0x72, 0x72, 0x6f, + 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, + 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x4d, 0x69, + 0x72, 0x72, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x12, 0x1b, 0x2f, 0x76, 0x31, 0x2f, + 0x6d, 0x69, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x2f, 0x7b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x6a, 0x6f, + 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x7d, 0x42, 0x7c, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, 0x70, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x42, 0x0a, 0x52, 0x6f, 0x75, + 0x74, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, + 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, + 0x58, 0xaa, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xca, + 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xe2, 0x02, 0x17, + 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x5c, 0x47, 0x50, 0x42, 0x4d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, + 0x52, 0x6f, 0x75, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -2121,97 +2315,103 @@ func file_route_proto_rawDescGZIP() []byte { return file_route_proto_rawDescData } -var file_route_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_route_proto_msgTypes = make([]protoimpl.MessageInfo, 29) +var file_route_proto_enumTypes = make([]protoimpl.EnumInfo, 3) +var file_route_proto_msgTypes = make([]protoimpl.MessageInfo, 31) var file_route_proto_goTypes = []interface{}{ (ValidatePeerStatus)(0), // 0: peerdb_route.ValidatePeerStatus (CreatePeerStatus)(0), // 1: peerdb_route.CreatePeerStatus - (*CreateCDCFlowRequest)(nil), // 2: peerdb_route.CreateCDCFlowRequest - (*CreateCDCFlowResponse)(nil), // 3: peerdb_route.CreateCDCFlowResponse - (*CreateQRepFlowRequest)(nil), // 4: peerdb_route.CreateQRepFlowRequest - (*CreateQRepFlowResponse)(nil), // 5: peerdb_route.CreateQRepFlowResponse - (*ShutdownRequest)(nil), // 6: peerdb_route.ShutdownRequest - (*ShutdownResponse)(nil), // 7: peerdb_route.ShutdownResponse - (*ValidatePeerRequest)(nil), // 8: peerdb_route.ValidatePeerRequest - (*CreatePeerRequest)(nil), // 9: peerdb_route.CreatePeerRequest - (*DropPeerRequest)(nil), // 10: peerdb_route.DropPeerRequest - (*DropPeerResponse)(nil), // 11: peerdb_route.DropPeerResponse - (*ValidatePeerResponse)(nil), // 12: peerdb_route.ValidatePeerResponse - (*CreatePeerResponse)(nil), // 13: peerdb_route.CreatePeerResponse - (*MirrorStatusRequest)(nil), // 14: peerdb_route.MirrorStatusRequest - (*PartitionStatus)(nil), // 15: peerdb_route.PartitionStatus - (*QRepMirrorStatus)(nil), // 16: peerdb_route.QRepMirrorStatus - (*CDCSyncStatus)(nil), // 17: peerdb_route.CDCSyncStatus - (*PeerSchemasResponse)(nil), // 18: peerdb_route.PeerSchemasResponse - (*SchemaTablesRequest)(nil), // 19: peerdb_route.SchemaTablesRequest - (*SchemaTablesResponse)(nil), // 20: peerdb_route.SchemaTablesResponse - (*TableColumnsRequest)(nil), // 21: peerdb_route.TableColumnsRequest - (*TableColumnsResponse)(nil), // 22: peerdb_route.TableColumnsResponse - (*PostgresPeerActivityInfoRequest)(nil), // 23: peerdb_route.PostgresPeerActivityInfoRequest - (*SlotInfo)(nil), // 24: peerdb_route.SlotInfo - (*StatInfo)(nil), // 25: peerdb_route.StatInfo - (*PeerSlotResponse)(nil), // 26: peerdb_route.PeerSlotResponse - (*PeerStatResponse)(nil), // 27: peerdb_route.PeerStatResponse - (*SnapshotStatus)(nil), // 28: peerdb_route.SnapshotStatus - (*CDCMirrorStatus)(nil), // 29: peerdb_route.CDCMirrorStatus - (*MirrorStatusResponse)(nil), // 30: peerdb_route.MirrorStatusResponse - (*FlowConnectionConfigs)(nil), // 31: peerdb_flow.FlowConnectionConfigs - (*QRepConfig)(nil), // 32: peerdb_flow.QRepConfig - (*Peer)(nil), // 33: peerdb_peers.Peer - (*timestamppb.Timestamp)(nil), // 34: google.protobuf.Timestamp + (FlowState)(0), // 2: peerdb_route.FlowState + (*CreateCDCFlowRequest)(nil), // 3: peerdb_route.CreateCDCFlowRequest + (*CreateCDCFlowResponse)(nil), // 4: peerdb_route.CreateCDCFlowResponse + (*CreateQRepFlowRequest)(nil), // 5: peerdb_route.CreateQRepFlowRequest + (*CreateQRepFlowResponse)(nil), // 6: peerdb_route.CreateQRepFlowResponse + (*ShutdownRequest)(nil), // 7: peerdb_route.ShutdownRequest + (*ShutdownResponse)(nil), // 8: peerdb_route.ShutdownResponse + (*ValidatePeerRequest)(nil), // 9: peerdb_route.ValidatePeerRequest + (*CreatePeerRequest)(nil), // 10: peerdb_route.CreatePeerRequest + (*DropPeerRequest)(nil), // 11: peerdb_route.DropPeerRequest + (*DropPeerResponse)(nil), // 12: peerdb_route.DropPeerResponse + (*ValidatePeerResponse)(nil), // 13: peerdb_route.ValidatePeerResponse + (*CreatePeerResponse)(nil), // 14: peerdb_route.CreatePeerResponse + (*MirrorStatusRequest)(nil), // 15: peerdb_route.MirrorStatusRequest + (*PartitionStatus)(nil), // 16: peerdb_route.PartitionStatus + (*QRepMirrorStatus)(nil), // 17: peerdb_route.QRepMirrorStatus + (*CDCSyncStatus)(nil), // 18: peerdb_route.CDCSyncStatus + (*PeerSchemasResponse)(nil), // 19: peerdb_route.PeerSchemasResponse + (*SchemaTablesRequest)(nil), // 20: peerdb_route.SchemaTablesRequest + (*SchemaTablesResponse)(nil), // 21: peerdb_route.SchemaTablesResponse + (*TableColumnsRequest)(nil), // 22: peerdb_route.TableColumnsRequest + (*TableColumnsResponse)(nil), // 23: peerdb_route.TableColumnsResponse + (*PostgresPeerActivityInfoRequest)(nil), // 24: peerdb_route.PostgresPeerActivityInfoRequest + (*SlotInfo)(nil), // 25: peerdb_route.SlotInfo + (*StatInfo)(nil), // 26: peerdb_route.StatInfo + (*PeerSlotResponse)(nil), // 27: peerdb_route.PeerSlotResponse + (*PeerStatResponse)(nil), // 28: peerdb_route.PeerStatResponse + (*SnapshotStatus)(nil), // 29: peerdb_route.SnapshotStatus + (*CDCMirrorStatus)(nil), // 30: peerdb_route.CDCMirrorStatus + (*MirrorStatusResponse)(nil), // 31: peerdb_route.MirrorStatusResponse + (*FlowStateChangeRequest)(nil), // 32: peerdb_route.FlowStateChangeRequest + (*FlowStateChangeResponse)(nil), // 33: peerdb_route.FlowStateChangeResponse + (*FlowConnectionConfigs)(nil), // 34: peerdb_flow.FlowConnectionConfigs + (*QRepConfig)(nil), // 35: peerdb_flow.QRepConfig + (*Peer)(nil), // 36: peerdb_peers.Peer + (*timestamppb.Timestamp)(nil), // 37: google.protobuf.Timestamp } var file_route_proto_depIdxs = []int32{ - 31, // 0: peerdb_route.CreateCDCFlowRequest.connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs - 32, // 1: peerdb_route.CreateQRepFlowRequest.qrep_config:type_name -> peerdb_flow.QRepConfig - 33, // 2: peerdb_route.ShutdownRequest.source_peer:type_name -> peerdb_peers.Peer - 33, // 3: peerdb_route.ShutdownRequest.destination_peer:type_name -> peerdb_peers.Peer - 33, // 4: peerdb_route.ValidatePeerRequest.peer:type_name -> peerdb_peers.Peer - 33, // 5: peerdb_route.CreatePeerRequest.peer:type_name -> peerdb_peers.Peer + 34, // 0: peerdb_route.CreateCDCFlowRequest.connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs + 35, // 1: peerdb_route.CreateQRepFlowRequest.qrep_config:type_name -> peerdb_flow.QRepConfig + 36, // 2: peerdb_route.ShutdownRequest.source_peer:type_name -> peerdb_peers.Peer + 36, // 3: peerdb_route.ShutdownRequest.destination_peer:type_name -> peerdb_peers.Peer + 36, // 4: peerdb_route.ValidatePeerRequest.peer:type_name -> peerdb_peers.Peer + 36, // 5: peerdb_route.CreatePeerRequest.peer:type_name -> peerdb_peers.Peer 0, // 6: peerdb_route.ValidatePeerResponse.status:type_name -> peerdb_route.ValidatePeerStatus 1, // 7: peerdb_route.CreatePeerResponse.status:type_name -> peerdb_route.CreatePeerStatus - 34, // 8: peerdb_route.PartitionStatus.start_time:type_name -> google.protobuf.Timestamp - 34, // 9: peerdb_route.PartitionStatus.end_time:type_name -> google.protobuf.Timestamp - 32, // 10: peerdb_route.QRepMirrorStatus.config:type_name -> peerdb_flow.QRepConfig - 15, // 11: peerdb_route.QRepMirrorStatus.partitions:type_name -> peerdb_route.PartitionStatus - 34, // 12: peerdb_route.CDCSyncStatus.start_time:type_name -> google.protobuf.Timestamp - 34, // 13: peerdb_route.CDCSyncStatus.end_time:type_name -> google.protobuf.Timestamp - 24, // 14: peerdb_route.PeerSlotResponse.slot_data:type_name -> peerdb_route.SlotInfo - 25, // 15: peerdb_route.PeerStatResponse.stat_data:type_name -> peerdb_route.StatInfo - 16, // 16: peerdb_route.SnapshotStatus.clones:type_name -> peerdb_route.QRepMirrorStatus - 31, // 17: peerdb_route.CDCMirrorStatus.config:type_name -> peerdb_flow.FlowConnectionConfigs - 28, // 18: peerdb_route.CDCMirrorStatus.snapshot_status:type_name -> peerdb_route.SnapshotStatus - 17, // 19: peerdb_route.CDCMirrorStatus.cdc_syncs:type_name -> peerdb_route.CDCSyncStatus - 16, // 20: peerdb_route.MirrorStatusResponse.qrep_status:type_name -> peerdb_route.QRepMirrorStatus - 29, // 21: peerdb_route.MirrorStatusResponse.cdc_status:type_name -> peerdb_route.CDCMirrorStatus - 8, // 22: peerdb_route.FlowService.ValidatePeer:input_type -> peerdb_route.ValidatePeerRequest - 9, // 23: peerdb_route.FlowService.CreatePeer:input_type -> peerdb_route.CreatePeerRequest - 10, // 24: peerdb_route.FlowService.DropPeer:input_type -> peerdb_route.DropPeerRequest - 2, // 25: peerdb_route.FlowService.CreateCDCFlow:input_type -> peerdb_route.CreateCDCFlowRequest - 4, // 26: peerdb_route.FlowService.CreateQRepFlow:input_type -> peerdb_route.CreateQRepFlowRequest - 23, // 27: peerdb_route.FlowService.GetSchemas:input_type -> peerdb_route.PostgresPeerActivityInfoRequest - 19, // 28: peerdb_route.FlowService.GetTablesInSchema:input_type -> peerdb_route.SchemaTablesRequest - 21, // 29: peerdb_route.FlowService.GetColumns:input_type -> peerdb_route.TableColumnsRequest - 23, // 30: peerdb_route.FlowService.GetSlotInfo:input_type -> peerdb_route.PostgresPeerActivityInfoRequest - 23, // 31: peerdb_route.FlowService.GetStatInfo:input_type -> peerdb_route.PostgresPeerActivityInfoRequest - 6, // 32: peerdb_route.FlowService.ShutdownFlow:input_type -> peerdb_route.ShutdownRequest - 14, // 33: peerdb_route.FlowService.MirrorStatus:input_type -> peerdb_route.MirrorStatusRequest - 12, // 34: peerdb_route.FlowService.ValidatePeer:output_type -> peerdb_route.ValidatePeerResponse - 13, // 35: peerdb_route.FlowService.CreatePeer:output_type -> peerdb_route.CreatePeerResponse - 11, // 36: peerdb_route.FlowService.DropPeer:output_type -> peerdb_route.DropPeerResponse - 3, // 37: peerdb_route.FlowService.CreateCDCFlow:output_type -> peerdb_route.CreateCDCFlowResponse - 5, // 38: peerdb_route.FlowService.CreateQRepFlow:output_type -> peerdb_route.CreateQRepFlowResponse - 18, // 39: peerdb_route.FlowService.GetSchemas:output_type -> peerdb_route.PeerSchemasResponse - 20, // 40: peerdb_route.FlowService.GetTablesInSchema:output_type -> peerdb_route.SchemaTablesResponse - 22, // 41: peerdb_route.FlowService.GetColumns:output_type -> peerdb_route.TableColumnsResponse - 26, // 42: peerdb_route.FlowService.GetSlotInfo:output_type -> peerdb_route.PeerSlotResponse - 27, // 43: peerdb_route.FlowService.GetStatInfo:output_type -> peerdb_route.PeerStatResponse - 7, // 44: peerdb_route.FlowService.ShutdownFlow:output_type -> peerdb_route.ShutdownResponse - 30, // 45: peerdb_route.FlowService.MirrorStatus:output_type -> peerdb_route.MirrorStatusResponse - 34, // [34:46] is the sub-list for method output_type - 22, // [22:34] is the sub-list for method input_type - 22, // [22:22] is the sub-list for extension type_name - 22, // [22:22] is the sub-list for extension extendee - 0, // [0:22] is the sub-list for field type_name + 37, // 8: peerdb_route.PartitionStatus.start_time:type_name -> google.protobuf.Timestamp + 37, // 9: peerdb_route.PartitionStatus.end_time:type_name -> google.protobuf.Timestamp + 35, // 10: peerdb_route.QRepMirrorStatus.config:type_name -> peerdb_flow.QRepConfig + 16, // 11: peerdb_route.QRepMirrorStatus.partitions:type_name -> peerdb_route.PartitionStatus + 37, // 12: peerdb_route.CDCSyncStatus.start_time:type_name -> google.protobuf.Timestamp + 37, // 13: peerdb_route.CDCSyncStatus.end_time:type_name -> google.protobuf.Timestamp + 25, // 14: peerdb_route.PeerSlotResponse.slot_data:type_name -> peerdb_route.SlotInfo + 26, // 15: peerdb_route.PeerStatResponse.stat_data:type_name -> peerdb_route.StatInfo + 17, // 16: peerdb_route.SnapshotStatus.clones:type_name -> peerdb_route.QRepMirrorStatus + 34, // 17: peerdb_route.CDCMirrorStatus.config:type_name -> peerdb_flow.FlowConnectionConfigs + 29, // 18: peerdb_route.CDCMirrorStatus.snapshot_status:type_name -> peerdb_route.SnapshotStatus + 18, // 19: peerdb_route.CDCMirrorStatus.cdc_syncs:type_name -> peerdb_route.CDCSyncStatus + 17, // 20: peerdb_route.MirrorStatusResponse.qrep_status:type_name -> peerdb_route.QRepMirrorStatus + 30, // 21: peerdb_route.MirrorStatusResponse.cdc_status:type_name -> peerdb_route.CDCMirrorStatus + 2, // 22: peerdb_route.FlowStateChangeRequest.requested_flow_state:type_name -> peerdb_route.FlowState + 9, // 23: peerdb_route.FlowService.ValidatePeer:input_type -> peerdb_route.ValidatePeerRequest + 10, // 24: peerdb_route.FlowService.CreatePeer:input_type -> peerdb_route.CreatePeerRequest + 11, // 25: peerdb_route.FlowService.DropPeer:input_type -> peerdb_route.DropPeerRequest + 3, // 26: peerdb_route.FlowService.CreateCDCFlow:input_type -> peerdb_route.CreateCDCFlowRequest + 5, // 27: peerdb_route.FlowService.CreateQRepFlow:input_type -> peerdb_route.CreateQRepFlowRequest + 24, // 28: peerdb_route.FlowService.GetSchemas:input_type -> peerdb_route.PostgresPeerActivityInfoRequest + 20, // 29: peerdb_route.FlowService.GetTablesInSchema:input_type -> peerdb_route.SchemaTablesRequest + 22, // 30: peerdb_route.FlowService.GetColumns:input_type -> peerdb_route.TableColumnsRequest + 24, // 31: peerdb_route.FlowService.GetSlotInfo:input_type -> peerdb_route.PostgresPeerActivityInfoRequest + 24, // 32: peerdb_route.FlowService.GetStatInfo:input_type -> peerdb_route.PostgresPeerActivityInfoRequest + 7, // 33: peerdb_route.FlowService.ShutdownFlow:input_type -> peerdb_route.ShutdownRequest + 32, // 34: peerdb_route.FlowService.FlowStateChange:input_type -> peerdb_route.FlowStateChangeRequest + 15, // 35: peerdb_route.FlowService.MirrorStatus:input_type -> peerdb_route.MirrorStatusRequest + 13, // 36: peerdb_route.FlowService.ValidatePeer:output_type -> peerdb_route.ValidatePeerResponse + 14, // 37: peerdb_route.FlowService.CreatePeer:output_type -> peerdb_route.CreatePeerResponse + 12, // 38: peerdb_route.FlowService.DropPeer:output_type -> peerdb_route.DropPeerResponse + 4, // 39: peerdb_route.FlowService.CreateCDCFlow:output_type -> peerdb_route.CreateCDCFlowResponse + 6, // 40: peerdb_route.FlowService.CreateQRepFlow:output_type -> peerdb_route.CreateQRepFlowResponse + 19, // 41: peerdb_route.FlowService.GetSchemas:output_type -> peerdb_route.PeerSchemasResponse + 21, // 42: peerdb_route.FlowService.GetTablesInSchema:output_type -> peerdb_route.SchemaTablesResponse + 23, // 43: peerdb_route.FlowService.GetColumns:output_type -> peerdb_route.TableColumnsResponse + 27, // 44: peerdb_route.FlowService.GetSlotInfo:output_type -> peerdb_route.PeerSlotResponse + 28, // 45: peerdb_route.FlowService.GetStatInfo:output_type -> peerdb_route.PeerStatResponse + 8, // 46: peerdb_route.FlowService.ShutdownFlow:output_type -> peerdb_route.ShutdownResponse + 33, // 47: peerdb_route.FlowService.FlowStateChange:output_type -> peerdb_route.FlowStateChangeResponse + 31, // 48: peerdb_route.FlowService.MirrorStatus:output_type -> peerdb_route.MirrorStatusResponse + 36, // [36:49] is the sub-list for method output_type + 23, // [23:36] is the sub-list for method input_type + 23, // [23:23] is the sub-list for extension type_name + 23, // [23:23] is the sub-list for extension extendee + 0, // [0:23] is the sub-list for field type_name } func init() { file_route_proto_init() } @@ -2570,6 +2770,30 @@ func file_route_proto_init() { return nil } } + file_route_proto_msgTypes[29].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*FlowStateChangeRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_route_proto_msgTypes[30].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*FlowStateChangeResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_route_proto_msgTypes[28].OneofWrappers = []interface{}{ (*MirrorStatusResponse_QrepStatus)(nil), @@ -2580,8 +2804,8 @@ func file_route_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_route_proto_rawDesc, - NumEnums: 2, - NumMessages: 29, + NumEnums: 3, + NumMessages: 31, NumExtensions: 0, NumServices: 1, }, diff --git a/flow/generated/protos/route_grpc.pb.go b/flow/generated/protos/route_grpc.pb.go index 05136feccf..4e2be9f12d 100644 --- a/flow/generated/protos/route_grpc.pb.go +++ b/flow/generated/protos/route_grpc.pb.go @@ -30,6 +30,7 @@ const ( FlowService_GetSlotInfo_FullMethodName = "/peerdb_route.FlowService/GetSlotInfo" FlowService_GetStatInfo_FullMethodName = "/peerdb_route.FlowService/GetStatInfo" FlowService_ShutdownFlow_FullMethodName = "/peerdb_route.FlowService/ShutdownFlow" + FlowService_FlowStateChange_FullMethodName = "/peerdb_route.FlowService/FlowStateChange" FlowService_MirrorStatus_FullMethodName = "/peerdb_route.FlowService/MirrorStatus" ) @@ -48,6 +49,7 @@ type FlowServiceClient interface { GetSlotInfo(ctx context.Context, in *PostgresPeerActivityInfoRequest, opts ...grpc.CallOption) (*PeerSlotResponse, error) GetStatInfo(ctx context.Context, in *PostgresPeerActivityInfoRequest, opts ...grpc.CallOption) (*PeerStatResponse, error) ShutdownFlow(ctx context.Context, in *ShutdownRequest, opts ...grpc.CallOption) (*ShutdownResponse, error) + FlowStateChange(ctx context.Context, in *FlowStateChangeRequest, opts ...grpc.CallOption) (*FlowStateChangeResponse, error) MirrorStatus(ctx context.Context, in *MirrorStatusRequest, opts ...grpc.CallOption) (*MirrorStatusResponse, error) } @@ -158,6 +160,15 @@ func (c *flowServiceClient) ShutdownFlow(ctx context.Context, in *ShutdownReques return out, nil } +func (c *flowServiceClient) FlowStateChange(ctx context.Context, in *FlowStateChangeRequest, opts ...grpc.CallOption) (*FlowStateChangeResponse, error) { + out := new(FlowStateChangeResponse) + err := c.cc.Invoke(ctx, FlowService_FlowStateChange_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *flowServiceClient) MirrorStatus(ctx context.Context, in *MirrorStatusRequest, opts ...grpc.CallOption) (*MirrorStatusResponse, error) { out := new(MirrorStatusResponse) err := c.cc.Invoke(ctx, FlowService_MirrorStatus_FullMethodName, in, out, opts...) @@ -182,6 +193,7 @@ type FlowServiceServer interface { GetSlotInfo(context.Context, *PostgresPeerActivityInfoRequest) (*PeerSlotResponse, error) GetStatInfo(context.Context, *PostgresPeerActivityInfoRequest) (*PeerStatResponse, error) ShutdownFlow(context.Context, *ShutdownRequest) (*ShutdownResponse, error) + FlowStateChange(context.Context, *FlowStateChangeRequest) (*FlowStateChangeResponse, error) MirrorStatus(context.Context, *MirrorStatusRequest) (*MirrorStatusResponse, error) mustEmbedUnimplementedFlowServiceServer() } @@ -223,6 +235,9 @@ func (UnimplementedFlowServiceServer) GetStatInfo(context.Context, *PostgresPeer func (UnimplementedFlowServiceServer) ShutdownFlow(context.Context, *ShutdownRequest) (*ShutdownResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ShutdownFlow not implemented") } +func (UnimplementedFlowServiceServer) FlowStateChange(context.Context, *FlowStateChangeRequest) (*FlowStateChangeResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method FlowStateChange not implemented") +} func (UnimplementedFlowServiceServer) MirrorStatus(context.Context, *MirrorStatusRequest) (*MirrorStatusResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method MirrorStatus not implemented") } @@ -437,6 +452,24 @@ func _FlowService_ShutdownFlow_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } +func _FlowService_FlowStateChange_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(FlowStateChangeRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(FlowServiceServer).FlowStateChange(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: FlowService_FlowStateChange_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FlowServiceServer).FlowStateChange(ctx, req.(*FlowStateChangeRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _FlowService_MirrorStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(MirrorStatusRequest) if err := dec(in); err != nil { @@ -506,6 +539,10 @@ var FlowService_ServiceDesc = grpc.ServiceDesc{ MethodName: "ShutdownFlow", Handler: _FlowService_ShutdownFlow_Handler, }, + { + MethodName: "FlowStateChange", + Handler: _FlowService_FlowStateChange_Handler, + }, { MethodName: "MirrorStatus", Handler: _FlowService_MirrorStatus_Handler, diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 17efcfc452..afeb8f7fc5 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -12,6 +12,8 @@ type ContextKey string const ( NoopSignal CDCFlowSignal = iota ShutdownSignal + PauseSignal + EnableMetricsKey ContextKey = "enableMetrics" CDCMirrorMonitorKey ContextKey = "cdcMirrorMonitor" ) diff --git a/flow/utils/signals.go b/flow/utils/signals.go new file mode 100644 index 0000000000..94eddd6289 --- /dev/null +++ b/flow/utils/signals.go @@ -0,0 +1,27 @@ +package util + +import ( + "github.com/PeerDB-io/peer-flow/shared" + "go.temporal.io/sdk/log" +) + +func FlowSignalHandler(activeSignal shared.CDCFlowSignal, + v shared.CDCFlowSignal, logger log.Logger) shared.CDCFlowSignal { + if v == shared.ShutdownSignal { + logger.Info("received shutdown signal") + return v + } else if v == shared.PauseSignal { + logger.Info("received pause signal") + if activeSignal == shared.NoopSignal { + logger.Info("workflow was running, pausing it") + return v + } + } else if v == shared.NoopSignal { + logger.Info("received resume signal") + if activeSignal == shared.PauseSignal { + logger.Info("workflow was paused, resuming it") + return v + } + } + return activeSignal +} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 9ac0ade826..59d58c8803 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -8,6 +8,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" + util "github.com/PeerDB-io/peer-flow/utils" "github.com/google/uuid" "github.com/hashicorp/go-multierror" "go.temporal.io/api/enums/v1" @@ -34,25 +35,7 @@ type CDCFlowLimits struct { MaxBatchSize int } -type CDCFlowWorkflowInput struct { - CDCFlowLimits - // The JDBC URL for the catalog database. - CatalogJdbcURL string - // The name of the peer flow to execute. - PeerFlowName string - // Number of sync flows to execute in total. - // If 0, the number of sync flows will be continuously executed until the peer flow is cancelled. - // This is typically non-zero for testing purposes. - TotalSyncFlows int - // Number of normalize flows to execute in total. - // If 0, the number of sync flows will be continuously executed until the peer flow is cancelled. - // This is typically non-zero for testing purposes. - TotalNormalizeFlows int - // Maximum number of rows in a sync flow batch. - MaxBatchSize int -} - -type CDCFlowState struct { +type CDCFlowWorkflowState struct { // Progress events for the peer flow. Progress []string // Accumulates status for sync flows spawned. @@ -63,6 +46,8 @@ type CDCFlowState struct { ActiveSignal shared.CDCFlowSignal // SetupComplete indicates whether the peer flow setup has completed. SetupComplete bool + // SnapshotComplete indicates whether the initial snapshot workflow has completed. + SnapshotComplete bool // Errors encountered during child sync flow executions. SyncFlowErrors error // Errors encountered during child sync flow executions. @@ -73,8 +58,8 @@ type CDCFlowState struct { } // returns a new empty PeerFlowState -func NewCDCFlowState() *CDCFlowState { - return &CDCFlowState{ +func NewCDCFlowWorkflowState() *CDCFlowWorkflowState { + return &CDCFlowWorkflowState{ Progress: []string{"started"}, SyncFlowStatuses: nil, NormalizeFlowStatuses: nil, @@ -93,7 +78,7 @@ func NewCDCFlowState() *CDCFlowState { } // truncate the progress and other arrays to a max of 10 elements -func (s *CDCFlowState) TruncateProgress() { +func (s *CDCFlowWorkflowState) TruncateProgress() { if len(s.Progress) > 10 { s.Progress = s.Progress[len(s.Progress)-10:] } @@ -115,7 +100,7 @@ func (s *CDCFlowState) TruncateProgress() { } } -func (s *CDCFlowState) SendWALHeartbeat(ctx workflow.Context, cfg *protos.FlowConnectionConfigs) error { +func (s *CDCFlowWorkflowState) SendWALHeartbeat(ctx workflow.Context, cfg *protos.FlowConnectionConfigs) error { walHeartbeatCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, }) @@ -131,6 +116,7 @@ func (s *CDCFlowState) SendWALHeartbeat(ctx workflow.Context, cfg *protos.FlowCo type CDCFlowWorkflowExecution struct { flowExecutionID string logger log.Logger + ctx workflow.Context } // NewCDCFlowWorkflowExecution creates a new instance of PeerFlowWorkflowExecution. @@ -138,6 +124,7 @@ func NewCDCFlowWorkflowExecution(ctx workflow.Context) *CDCFlowWorkflowExecution return &CDCFlowWorkflowExecution{ flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID, logger: workflow.GetLogger(ctx), + ctx: ctx, } } @@ -159,16 +146,26 @@ func GetChildWorkflowID( } // CDCFlowWorkflowResult is the result of the PeerFlowWorkflow. -type CDCFlowWorkflowResult = CDCFlowState +type CDCFlowWorkflowResult = CDCFlowWorkflowState + +func (w *CDCFlowWorkflowExecution) receiveAndHandleSignalAsync(ctx workflow.Context, state *CDCFlowWorkflowState) { + signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) + + var signalVal shared.CDCFlowSignal + ok := signalChan.ReceiveAsync(&signalVal) + if ok { + state.ActiveSignal = util.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) + } +} func CDCFlowWorkflowWithConfig( ctx workflow.Context, cfg *protos.FlowConnectionConfigs, limits *CDCFlowLimits, - state *CDCFlowState, + state *CDCFlowWorkflowState, ) (*CDCFlowWorkflowResult, error) { if state == nil { - state = NewCDCFlowState() + state = NewCDCFlowWorkflowState() } if cfg == nil { @@ -182,28 +179,18 @@ func CDCFlowWorkflowWithConfig( } // Support a Query for the current state of the peer flow. - err := workflow.SetQueryHandler(ctx, CDCFlowStatusQuery, func(jobName string) (CDCFlowState, error) { + err := workflow.SetQueryHandler(ctx, CDCFlowStatusQuery, func(jobName string) (CDCFlowWorkflowState, error) { return *state, nil }) if err != nil { return state, fmt.Errorf("failed to set `%s` query handler: %w", CDCFlowStatusQuery, err) } - signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) - signalHandler := func(_ workflow.Context, v shared.CDCFlowSignal) { - w.logger.Info("received signal - ", v) - state.ActiveSignal = v - } - - // Support a signal to pause the peer flow. - selector := workflow.NewSelector(ctx) - selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) { - var signalVal shared.CDCFlowSignal - c.Receive(ctx, &signalVal) - signalHandler(ctx, signalVal) - }) - - if !state.SetupComplete { + // we cannot skip SetupFlow if SnapshotFlow did not complete in cases where Resync is enabled + // because Resync modifies TableMappings before Setup and also before Snapshot + // for safety, rely on the idempotency of SetupFlow instead + // also, no signals are being handled until the loop starts, so no PAUSE/DROP will take here. + if !(state.SetupComplete && state.SnapshotComplete) { // if resync is true, alter the table name schema mapping to temporarily add // a suffix to the table names. if cfg.Resync { @@ -232,6 +219,7 @@ func CDCFlowWorkflowWithConfig( if err := setupFlowFuture.Get(setupFlowCtx, &cfg); err != nil { return state, fmt.Errorf("failed to execute child workflow: %w", err) } + state.SetupComplete = true // next part of the setup is to snapshot-initial-copy and setup replication slots. snapshotFlowID, err := GetChildWorkflowID(ctx, "snapshot-flow", cfg.FlowJobName) @@ -276,7 +264,7 @@ func CDCFlowWorkflowWithConfig( } } - state.SetupComplete = true + state.SnapshotComplete = true state.Progress = append(state.Progress, "executed setup flow and snapshot flow") } @@ -287,6 +275,23 @@ func CDCFlowWorkflowWithConfig( currentSyncFlowNum := 0 for { + // check and act on signals before a fresh flow starts. + w.receiveAndHandleSignalAsync(ctx, state) + + if state.ActiveSignal == shared.PauseSignal { + startTime := time.Now() + signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) + var signalVal shared.CDCFlowSignal + + for state.ActiveSignal == shared.PauseSignal { + w.logger.Info("mirror has been paused for ", time.Since(startTime)) + // only place we block on receive, so signal processing is immediate + ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) + if ok { + state.ActiveSignal = util.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) + } + } + } // check if the peer flow has been shutdown if state.ActiveSignal == shared.ShutdownSignal { w.logger.Info("peer flow has been shutdown") @@ -294,6 +299,8 @@ func CDCFlowWorkflowWithConfig( } // check if total sync flows have been completed + // since this happens immediately after we check for signals, the case of a signal being missed + // due to a new workflow starting is vanishingly low, but possible if limits.TotalSyncFlows != 0 && currentSyncFlowNum == limits.TotalSyncFlows { w.logger.Info("All the syncflows have completed successfully, there was a"+ " limit on the number of syncflows to be executed: ", limits.TotalSyncFlows) @@ -390,6 +397,7 @@ func CDCFlowWorkflowWithConfig( cfg, ) + selector := workflow.NewSelector(ctx) selector.AddFuture(childNormalizeFlowFuture, func(f workflow.Future) { var childNormalizeFlowRes *model.NormalizeResponse if err := f.Get(ctx, &childNormalizeFlowRes); err != nil { diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 45f4fdab9f..f20d17951e 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -8,6 +8,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" + util "github.com/PeerDB-io/peer-flow/utils" "github.com/google/uuid" "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/log" @@ -20,6 +21,17 @@ type QRepFlowExecution struct { flowExecutionID string logger log.Logger runUUID string + // being tracked for future workflow signalling + childPartitionWorkflows []workflow.ChildWorkflowFuture + // Current signalled state of the peer flow. + activeSignal shared.CDCFlowSignal +} + +type QRepPartitionFlowExecution struct { + config *protos.QRepConfig + flowExecutionID string + logger log.Logger + runUUID string } // returns a new empty PeerFlowState @@ -37,6 +49,19 @@ func NewQRepFlowState() *protos.QRepFlowState { // NewQRepFlowExecution creates a new instance of QRepFlowExecution. func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string) *QRepFlowExecution { return &QRepFlowExecution{ + config: config, + flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID, + logger: workflow.GetLogger(ctx), + runUUID: runUUID, + childPartitionWorkflows: nil, + activeSignal: shared.NoopSignal, + } +} + +// NewQRepFlowExecution creates a new instance of QRepFlowExecution. +func NewQRepPartitionFlowExecution(ctx workflow.Context, + config *protos.QRepConfig, runUUID string) *QRepPartitionFlowExecution { + return &QRepPartitionFlowExecution{ config: config, flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID, logger: workflow.GetLogger(ctx), @@ -123,7 +148,8 @@ func (q *QRepFlowExecution) GetPartitions( } // ReplicatePartitions replicates the partition batch. -func (q *QRepFlowExecution) ReplicatePartitions(ctx workflow.Context, partitions *protos.QRepPartitionBatch) error { +func (q *QRepPartitionFlowExecution) ReplicatePartitions(ctx workflow.Context, + partitions *protos.QRepPartitionBatch) error { ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 24 * 5 * time.Hour, HeartbeatTimeout: 5 * time.Minute, @@ -156,7 +182,7 @@ func (q *QRepFlowExecution) getPartitionWorkflowID(ctx workflow.Context) (string // startChildWorkflow starts a single child workflow. func (q *QRepFlowExecution) startChildWorkflow( ctx workflow.Context, - partitions *protos.QRepPartitionBatch) (workflow.Future, error) { + partitions *protos.QRepPartitionBatch) (workflow.ChildWorkflowFuture, error) { wid, err := q.getPartitionWorkflowID(ctx) if err != nil { return nil, fmt.Errorf("failed to get child workflow ID: %w", err) @@ -198,7 +224,6 @@ func (q *QRepFlowExecution) processPartitions( q.logger.Info("processing partitions in batches", "num batches", len(batches)) - futures := make([]workflow.Future, 0) for i, parts := range batches { batch := &protos.QRepPartitionBatch{ Partitions: parts, @@ -209,16 +234,17 @@ func (q *QRepFlowExecution) processPartitions( return fmt.Errorf("failed to start child workflow: %w", err) } - futures = append(futures, future) + q.childPartitionWorkflows = append(q.childPartitionWorkflows, future) } // wait for all the child workflows to complete - for _, future := range futures { + for _, future := range q.childPartitionWorkflows { if err := future.Get(ctx, nil); err != nil { return fmt.Errorf("failed to wait for child workflow: %w", err) } } + q.childPartitionWorkflows = nil q.logger.Info("all partitions in batch processed") return nil } @@ -317,6 +343,16 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta return nil } +func (q *QRepFlowExecution) receiveAndHandleSignalAsync(ctx workflow.Context) { + signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) + + var signalVal shared.CDCFlowSignal + ok := signalChan.ReceiveAsync(&signalVal) + if ok { + q.activeSignal = util.FlowSignalHandler(q.activeSignal, signalVal, q.logger) + } +} + func QRepFlowWorkflow( ctx workflow.Context, config *protos.QRepConfig, @@ -335,21 +371,6 @@ func QRepFlowWorkflow( maxParallelWorkers = int(config.MaxParallelWorkers) } - // register a signal handler to terminate the workflow - terminateWorkflow := false - signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) - - s := workflow.NewSelector(ctx) - s.AddReceive(signalChan, func(c workflow.ReceiveChannel, _ bool) { - var signalVal shared.CDCFlowSignal - c.Receive(ctx, &signalVal) - logger.Info("received signal", "signal", signalVal) - if signalVal == shared.ShutdownSignal { - logger.Info("received shutdown signal") - terminateWorkflow = true - } - }) - // register a query to get the number of partitions processed err := workflow.SetQueryHandler(ctx, "num-partitions-processed", func() (uint64, error) { return state.NumPartitionsProcessed, nil @@ -419,14 +440,6 @@ func QRepFlowWorkflow( state.LastPartition = partitions.Partitions[len(partitions.Partitions)-1] } - s.AddDefault(func() {}) - - s.Select(ctx) - if terminateWorkflow { - q.logger.Info("terminating workflow - ", config.FlowJobName) - return nil - } - // sleep for a while and continue the workflow err = q.waitForNewRows(ctx, state.LastPartition) if err != nil { @@ -437,6 +450,28 @@ func QRepFlowWorkflow( "Last Partition", state.LastPartition, "Number of Partitions Processed", state.NumPartitionsProcessed) + // here, we handle signals after the end of the flow because a new workflow does not inherit the signals + // and the chance of missing a signal is much higher if the check is before the time consuming parts run + q.receiveAndHandleSignalAsync(ctx) + if q.activeSignal == shared.PauseSignal { + startTime := time.Now() + signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) + var signalVal shared.CDCFlowSignal + + for q.activeSignal == shared.PauseSignal { + q.logger.Info("mirror has been paused for ", time.Since(startTime)) + // only place we block on receive, so signal processing is immediate + ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) + if ok { + q.activeSignal = util.FlowSignalHandler(q.activeSignal, signalVal, q.logger) + } + } + } + if q.activeSignal == shared.ShutdownSignal { + q.logger.Info("terminating workflow - ", config.FlowJobName) + return nil + } + // Continue the workflow with new state return workflow.NewContinueAsNewError(ctx, QRepFlowWorkflow, config, state) } @@ -448,6 +483,6 @@ func QRepPartitionWorkflow( partitions *protos.QRepPartitionBatch, runUUID string, ) error { - q := NewQRepFlowExecution(ctx, config, runUUID) + q := NewQRepPartitionFlowExecution(ctx, config, runUUID) return q.ReplicatePartitions(ctx, partitions) } diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index b97077f576..82f7dd569d 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -128,6 +128,14 @@ pub enum PeerDDL { mirror_name: String, query_string: Option, }, + PauseMirror { + if_exists: bool, + flow_job_name: String, + }, + ResumeMirror { + if_exists: bool, + flow_job_name: String, + } } impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> { @@ -404,6 +412,20 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> { query_string, })) } + Statement::PauseMirror { + if_exists, + mirror_name, + } => Ok(Some(PeerDDL::PauseMirror { + if_exists: *if_exists, + flow_job_name: mirror_name.to_string().to_lowercase(), + })), + Statement::ResumeMirror { + if_exists, + mirror_name, + } => Ok(Some(PeerDDL::ResumeMirror { + if_exists: *if_exists, + flow_job_name: mirror_name.to_string().to_lowercase(), + })), _ => Ok(None), } } diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 6adae529b7..e149d16eb5 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -129,7 +129,7 @@ impl FlowGrpcClient { workflow_id: workflow_details.workflow_id, source_peer: Some(workflow_details.source_peer), destination_peer: Some(workflow_details.destination_peer), - remove_flow_entry:false + remove_flow_entry: false, }; let response = self.client.shutdown_flow(shutdown_flow_req).await?; let shutdown_response = response.into_inner(); @@ -159,6 +159,32 @@ impl FlowGrpcClient { } } + pub async fn flow_state_change( + &mut self, + flow_job_name: &str, + workflow_id: &str, + pause: bool, + ) -> anyhow::Result<()> { + let pause_flow_req = pt::peerdb_route::FlowStateChangeRequest { + flow_job_name: flow_job_name.to_owned(), + workflow_id: workflow_id.to_owned(), + requested_flow_state: match pause { + true => pt::peerdb_route::FlowState::StatePaused.into(), + false => pt::peerdb_route::FlowState::StateRunning.into(), + }, + }; + let response = self.client.flow_state_change(pause_flow_req).await?; + let pause_response = response.into_inner(); + if pause_response.ok { + Ok(()) + } else { + Err(anyhow::anyhow!(format!( + "failed to pause/unpause flow job: {:?}", + pause_response.error_message + ))) + } + } + pub async fn start_peer_flow_job( &mut self, job: &FlowJob, diff --git a/nexus/pt/src/peerdb_flow.rs b/nexus/pt/src/peerdb_flow.rs index 326ca71950..fe56e43f78 100644 --- a/nexus/pt/src/peerdb_flow.rs +++ b/nexus/pt/src/peerdb_flow.rs @@ -441,7 +441,7 @@ pub struct QRepConfig { /// this is the location where the avro files will be written /// if this starts with gs:// then it will be written to GCS /// if this starts with s3:// then it will be written to S3 - /// if nothing is specified then it will be written to local disk + /// if nothing is specified then it will be written to local disk, only supported in Snowflake /// if using GCS or S3 make sure your instance has the correct permissions. #[prost(string, tag="15")] pub staging_path: ::prost::alloc::string::String, diff --git a/nexus/pt/src/peerdb_route.rs b/nexus/pt/src/peerdb_route.rs index bfc3d51062..5a46ba2f58 100644 --- a/nexus/pt/src/peerdb_route.rs +++ b/nexus/pt/src/peerdb_route.rs @@ -254,6 +254,24 @@ pub mod mirror_status_response { CdcStatus(super::CdcMirrorStatus), } } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct FlowStateChangeRequest { + #[prost(string, tag="1")] + pub workflow_id: ::prost::alloc::string::String, + #[prost(string, tag="2")] + pub flow_job_name: ::prost::alloc::string::String, + #[prost(enumeration="FlowState", tag="3")] + pub requested_flow_state: i32, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct FlowStateChangeResponse { + #[prost(bool, tag="1")] + pub ok: bool, + #[prost(string, tag="2")] + pub error_message: ::prost::alloc::string::String, +} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum ValidatePeerStatus { @@ -312,6 +330,36 @@ impl CreatePeerStatus { } } } +/// in the future, consider moving DropFlow to this and reduce route surface +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum FlowState { + StateUnknown = 0, + StateRunning = 1, + StatePaused = 2, +} +impl FlowState { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + FlowState::StateUnknown => "STATE_UNKNOWN", + FlowState::StateRunning => "STATE_RUNNING", + FlowState::StatePaused => "STATE_PAUSED", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "STATE_UNKNOWN" => Some(Self::StateUnknown), + "STATE_RUNNING" => Some(Self::StateRunning), + "STATE_PAUSED" => Some(Self::StatePaused), + _ => None, + } + } +} include!("peerdb_route.tonic.rs"); include!("peerdb_route.serde.rs"); // @@protoc_insertion_point(module) \ No newline at end of file diff --git a/nexus/pt/src/peerdb_route.serde.rs b/nexus/pt/src/peerdb_route.serde.rs index b3f64e0b9c..3c086248d6 100644 --- a/nexus/pt/src/peerdb_route.serde.rs +++ b/nexus/pt/src/peerdb_route.serde.rs @@ -1218,6 +1218,329 @@ impl<'de> serde::Deserialize<'de> for DropPeerResponse { deserializer.deserialize_struct("peerdb_route.DropPeerResponse", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for FlowState { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let variant = match self { + Self::StateUnknown => "STATE_UNKNOWN", + Self::StateRunning => "STATE_RUNNING", + Self::StatePaused => "STATE_PAUSED", + }; + serializer.serialize_str(variant) + } +} +impl<'de> serde::Deserialize<'de> for FlowState { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "STATE_UNKNOWN", + "STATE_RUNNING", + "STATE_PAUSED", + ]; + + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = FlowState; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + fn visit_i64(self, v: i64) -> std::result::Result + where + E: serde::de::Error, + { + use std::convert::TryFrom; + i32::try_from(v) + .ok() + .and_then(FlowState::from_i32) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self) + }) + } + + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + use std::convert::TryFrom; + i32::try_from(v) + .ok() + .and_then(FlowState::from_i32) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) + }) + } + + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "STATE_UNKNOWN" => Ok(FlowState::StateUnknown), + "STATE_RUNNING" => Ok(FlowState::StateRunning), + "STATE_PAUSED" => Ok(FlowState::StatePaused), + _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), + } + } + } + deserializer.deserialize_any(GeneratedVisitor) + } +} +impl serde::Serialize for FlowStateChangeRequest { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.workflow_id.is_empty() { + len += 1; + } + if !self.flow_job_name.is_empty() { + len += 1; + } + if self.requested_flow_state != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("peerdb_route.FlowStateChangeRequest", len)?; + if !self.workflow_id.is_empty() { + struct_ser.serialize_field("workflowId", &self.workflow_id)?; + } + if !self.flow_job_name.is_empty() { + struct_ser.serialize_field("flowJobName", &self.flow_job_name)?; + } + if self.requested_flow_state != 0 { + let v = FlowState::from_i32(self.requested_flow_state) + .ok_or_else(|| serde::ser::Error::custom(format!("Invalid variant {}", self.requested_flow_state)))?; + struct_ser.serialize_field("requestedFlowState", &v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for FlowStateChangeRequest { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "workflow_id", + "workflowId", + "flow_job_name", + "flowJobName", + "requested_flow_state", + "requestedFlowState", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + WorkflowId, + FlowJobName, + RequestedFlowState, + __SkipField__, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "workflowId" | "workflow_id" => Ok(GeneratedField::WorkflowId), + "flowJobName" | "flow_job_name" => Ok(GeneratedField::FlowJobName), + "requestedFlowState" | "requested_flow_state" => Ok(GeneratedField::RequestedFlowState), + _ => Ok(GeneratedField::__SkipField__), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = FlowStateChangeRequest; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct peerdb_route.FlowStateChangeRequest") + } + + fn visit_map(self, mut map: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut workflow_id__ = None; + let mut flow_job_name__ = None; + let mut requested_flow_state__ = None; + while let Some(k) = map.next_key()? { + match k { + GeneratedField::WorkflowId => { + if workflow_id__.is_some() { + return Err(serde::de::Error::duplicate_field("workflowId")); + } + workflow_id__ = Some(map.next_value()?); + } + GeneratedField::FlowJobName => { + if flow_job_name__.is_some() { + return Err(serde::de::Error::duplicate_field("flowJobName")); + } + flow_job_name__ = Some(map.next_value()?); + } + GeneratedField::RequestedFlowState => { + if requested_flow_state__.is_some() { + return Err(serde::de::Error::duplicate_field("requestedFlowState")); + } + requested_flow_state__ = Some(map.next_value::()? as i32); + } + GeneratedField::__SkipField__ => { + let _ = map.next_value::()?; + } + } + } + Ok(FlowStateChangeRequest { + workflow_id: workflow_id__.unwrap_or_default(), + flow_job_name: flow_job_name__.unwrap_or_default(), + requested_flow_state: requested_flow_state__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("peerdb_route.FlowStateChangeRequest", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for FlowStateChangeResponse { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.ok { + len += 1; + } + if !self.error_message.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("peerdb_route.FlowStateChangeResponse", len)?; + if self.ok { + struct_ser.serialize_field("ok", &self.ok)?; + } + if !self.error_message.is_empty() { + struct_ser.serialize_field("errorMessage", &self.error_message)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for FlowStateChangeResponse { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "ok", + "error_message", + "errorMessage", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Ok, + ErrorMessage, + __SkipField__, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "ok" => Ok(GeneratedField::Ok), + "errorMessage" | "error_message" => Ok(GeneratedField::ErrorMessage), + _ => Ok(GeneratedField::__SkipField__), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = FlowStateChangeResponse; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct peerdb_route.FlowStateChangeResponse") + } + + fn visit_map(self, mut map: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut ok__ = None; + let mut error_message__ = None; + while let Some(k) = map.next_key()? { + match k { + GeneratedField::Ok => { + if ok__.is_some() { + return Err(serde::de::Error::duplicate_field("ok")); + } + ok__ = Some(map.next_value()?); + } + GeneratedField::ErrorMessage => { + if error_message__.is_some() { + return Err(serde::de::Error::duplicate_field("errorMessage")); + } + error_message__ = Some(map.next_value()?); + } + GeneratedField::__SkipField__ => { + let _ = map.next_value::()?; + } + } + } + Ok(FlowStateChangeResponse { + ok: ok__.unwrap_or_default(), + error_message: error_message__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("peerdb_route.FlowStateChangeResponse", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for MirrorStatusRequest { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/nexus/pt/src/peerdb_route.tonic.rs b/nexus/pt/src/peerdb_route.tonic.rs index f2143777c2..7c479c1064 100644 --- a/nexus/pt/src/peerdb_route.tonic.rs +++ b/nexus/pt/src/peerdb_route.tonic.rs @@ -374,6 +374,32 @@ pub mod flow_service_client { self.inner.unary(req, path, codec).await } /// + pub async fn flow_state_change( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/peerdb_route.FlowService/FlowStateChange", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("peerdb_route.FlowService", "FlowStateChange")); + self.inner.unary(req, path, codec).await + } + /// pub async fn mirror_status( &mut self, request: impl tonic::IntoRequest, @@ -497,6 +523,14 @@ pub mod flow_service_server { tonic::Status, >; /// + async fn flow_state_change( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// async fn mirror_status( &self, request: tonic::Request, @@ -1089,6 +1123,52 @@ pub mod flow_service_server { }; Box::pin(fut) } + "/peerdb_route.FlowService/FlowStateChange" => { + #[allow(non_camel_case_types)] + struct FlowStateChangeSvc(pub Arc); + impl< + T: FlowService, + > tonic::server::UnaryService + for FlowStateChangeSvc { + type Response = super::FlowStateChangeResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).flow_state_change(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = FlowStateChangeSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/peerdb_route.FlowService/MirrorStatus" => { #[allow(non_camel_case_types)] struct MirrorStatusSvc(pub Arc); diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 3352664b7f..578c0f5f4f 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -646,6 +646,128 @@ impl NexusBackend { } } } + PeerDDL::PauseMirror { + if_exists, + flow_job_name, + } => { + if self.flow_handler.is_none() { + return Err(PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: "flow service is not configured".to_owned(), + }))); + } + + let catalog = self.catalog.lock().await; + tracing::info!( + "[PAUSE MIRROR] mirror_name: {}, if_exists: {}", + flow_job_name, + if_exists + ); + let workflow_details = catalog + .get_workflow_details_for_flow_job(flow_job_name) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "unable to query catalog for job metadata: {:?}", + err + ), + })) + })?; + tracing::info!( + "[PAUSE MIRROR] got workflow id: {:?}", + workflow_details.as_ref().map(|w| &w.workflow_id) + ); + + if let Some(workflow_details) = workflow_details { + let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; + flow_handler + .flow_state_change(flow_job_name, &workflow_details.workflow_id, true) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to shutdown flow job: {:?}", err), + })) + })?; + let drop_mirror_success = format!("PAUSE MIRROR {}", flow_job_name); + Ok(vec![Response::Execution(Tag::new_for_execution( + &drop_mirror_success, + None, + ))]) + } else if *if_exists { + let no_mirror_success = "NO SUCH MIRROR"; + Ok(vec![Response::Execution(Tag::new_for_execution( + no_mirror_success, + None, + ))]) + } else { + Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "error".to_owned(), + format!("no such mirror: {:?}", flow_job_name), + )))) + } + }, + PeerDDL::ResumeMirror { + if_exists, + flow_job_name, + } => { + if self.flow_handler.is_none() { + return Err(PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: "flow service is not configured".to_owned(), + }))); + } + + let catalog = self.catalog.lock().await; + tracing::info!( + "[RESUME MIRROR] mirror_name: {}, if_exists: {}", + flow_job_name, + if_exists + ); + let workflow_details = catalog + .get_workflow_details_for_flow_job(flow_job_name) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "unable to query catalog for job metadata: {:?}", + err + ), + })) + })?; + tracing::info!( + "[RESUME MIRROR] got workflow id: {:?}", + workflow_details.as_ref().map(|w| &w.workflow_id) + ); + + if let Some(workflow_details) = workflow_details { + let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; + flow_handler + .flow_state_change(flow_job_name, &workflow_details.workflow_id, false) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to shutdown flow job: {:?}", err), + })) + })?; + let drop_mirror_success = format!("RESUME MIRROR {}", flow_job_name); + Ok(vec![Response::Execution(Tag::new_for_execution( + &drop_mirror_success, + None, + ))]) + } else if *if_exists { + let no_mirror_success = "NO SUCH MIRROR"; + Ok(vec![Response::Execution(Tag::new_for_execution( + no_mirror_success, + None, + ))]) + } else { + Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "error".to_owned(), + format!("no such mirror: {:?}", flow_job_name), + )))) + } + } }, NexusStatement::PeerQuery { stmt, assoc } => { // get the query executor diff --git a/nexus/sqlparser-rs b/nexus/sqlparser-rs index d03b760388..db7b94fed0 160000 --- a/nexus/sqlparser-rs +++ b/nexus/sqlparser-rs @@ -1 +1 @@ -Subproject commit d03b7603887554137de11a46ae52ba3318615293 +Subproject commit db7b94fed063ceab383f24c4894e5e3fddaa9468 diff --git a/protos/flow.proto b/protos/flow.proto index 4f31235ca2..a690845a46 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -297,7 +297,7 @@ message QRepConfig { // this is the location where the avro files will be written // if this starts with gs:// then it will be written to GCS // if this starts with s3:// then it will be written to S3 - // if nothing is specified then it will be written to local disk + // if nothing is specified then it will be written to local disk, only supported in Snowflake // if using GCS or S3 make sure your instance has the correct permissions. string staging_path = 15; diff --git a/protos/route.proto b/protos/route.proto index 0a9e1d23f9..d02a30bc89 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -175,6 +175,24 @@ message MirrorStatusResponse { string error_message = 4; } +// in the future, consider moving DropFlow to this and reduce route surface +enum FlowState { + STATE_UNKNOWN = 0; + STATE_RUNNING = 1; + STATE_PAUSED = 2; +} + +message FlowStateChangeRequest { + string workflow_id = 1; + string flow_job_name = 2; + FlowState requested_flow_state = 3; +} + +message FlowStateChangeResponse { + bool ok = 1; + string error_message = 2; +} + service FlowService { rpc ValidatePeer(ValidatePeerRequest) returns (ValidatePeerResponse) { option (google.api.http) = { @@ -228,6 +246,7 @@ service FlowService { rpc ShutdownFlow(ShutdownRequest) returns (ShutdownResponse) { option (google.api.http) = { post: "/v1/mirrors/drop", body: "*" }; } + rpc FlowStateChange(FlowStateChangeRequest) returns (FlowStateChangeResponse) {} rpc MirrorStatus(MirrorStatusRequest) returns (MirrorStatusResponse) { option (google.api.http) = { get: "/v1/mirrors/{flow_job_name}" }; } diff --git a/ui/grpc_generated/flow.ts b/ui/grpc_generated/flow.ts index 6cc5e8a0e1..4689b31370 100644 --- a/ui/grpc_generated/flow.ts +++ b/ui/grpc_generated/flow.ts @@ -408,7 +408,7 @@ export interface QRepConfig { * this is the location where the avro files will be written * if this starts with gs:// then it will be written to GCS * if this starts with s3:// then it will be written to S3 - * if nothing is specified then it will be written to local disk + * if nothing is specified then it will be written to local disk, only supported in Snowflake * if using GCS or S3 make sure your instance has the correct permissions. */ stagingPath: string; diff --git a/ui/grpc_generated/route.ts b/ui/grpc_generated/route.ts index f514ba550d..08d133fa83 100644 --- a/ui/grpc_generated/route.ts +++ b/ui/grpc_generated/route.ts @@ -97,6 +97,46 @@ export function createPeerStatusToJSON(object: CreatePeerStatus): string { } } +/** in the future, consider moving DropFlow to this and reduce route surface */ +export enum FlowState { + STATE_UNKNOWN = 0, + STATE_RUNNING = 1, + STATE_PAUSED = 2, + UNRECOGNIZED = -1, +} + +export function flowStateFromJSON(object: any): FlowState { + switch (object) { + case 0: + case "STATE_UNKNOWN": + return FlowState.STATE_UNKNOWN; + case 1: + case "STATE_RUNNING": + return FlowState.STATE_RUNNING; + case 2: + case "STATE_PAUSED": + return FlowState.STATE_PAUSED; + case -1: + case "UNRECOGNIZED": + default: + return FlowState.UNRECOGNIZED; + } +} + +export function flowStateToJSON(object: FlowState): string { + switch (object) { + case FlowState.STATE_UNKNOWN: + return "STATE_UNKNOWN"; + case FlowState.STATE_RUNNING: + return "STATE_RUNNING"; + case FlowState.STATE_PAUSED: + return "STATE_PAUSED"; + case FlowState.UNRECOGNIZED: + default: + return "UNRECOGNIZED"; + } +} + export interface CreateCDCFlowRequest { connectionConfigs: FlowConnectionConfigs | undefined; createCatalogEntry: boolean; @@ -254,6 +294,17 @@ export interface MirrorStatusResponse { errorMessage: string; } +export interface FlowStateChangeRequest { + workflowId: string; + flowJobName: string; + requestedFlowState: FlowState; +} + +export interface FlowStateChangeResponse { + ok: boolean; + errorMessage: string; +} + function createBaseCreateCDCFlowRequest(): CreateCDCFlowRequest { return { connectionConfigs: undefined, createCatalogEntry: false }; } @@ -2488,6 +2539,169 @@ export const MirrorStatusResponse = { }, }; +function createBaseFlowStateChangeRequest(): FlowStateChangeRequest { + return { workflowId: "", flowJobName: "", requestedFlowState: 0 }; +} + +export const FlowStateChangeRequest = { + encode(message: FlowStateChangeRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.workflowId !== "") { + writer.uint32(10).string(message.workflowId); + } + if (message.flowJobName !== "") { + writer.uint32(18).string(message.flowJobName); + } + if (message.requestedFlowState !== 0) { + writer.uint32(24).int32(message.requestedFlowState); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): FlowStateChangeRequest { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseFlowStateChangeRequest(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.workflowId = reader.string(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.flowJobName = reader.string(); + continue; + case 3: + if (tag !== 24) { + break; + } + + message.requestedFlowState = reader.int32() as any; + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): FlowStateChangeRequest { + return { + workflowId: isSet(object.workflowId) ? String(object.workflowId) : "", + flowJobName: isSet(object.flowJobName) ? String(object.flowJobName) : "", + requestedFlowState: isSet(object.requestedFlowState) ? flowStateFromJSON(object.requestedFlowState) : 0, + }; + }, + + toJSON(message: FlowStateChangeRequest): unknown { + const obj: any = {}; + if (message.workflowId !== "") { + obj.workflowId = message.workflowId; + } + if (message.flowJobName !== "") { + obj.flowJobName = message.flowJobName; + } + if (message.requestedFlowState !== 0) { + obj.requestedFlowState = flowStateToJSON(message.requestedFlowState); + } + return obj; + }, + + create, I>>(base?: I): FlowStateChangeRequest { + return FlowStateChangeRequest.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>(object: I): FlowStateChangeRequest { + const message = createBaseFlowStateChangeRequest(); + message.workflowId = object.workflowId ?? ""; + message.flowJobName = object.flowJobName ?? ""; + message.requestedFlowState = object.requestedFlowState ?? 0; + return message; + }, +}; + +function createBaseFlowStateChangeResponse(): FlowStateChangeResponse { + return { ok: false, errorMessage: "" }; +} + +export const FlowStateChangeResponse = { + encode(message: FlowStateChangeResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.ok === true) { + writer.uint32(8).bool(message.ok); + } + if (message.errorMessage !== "") { + writer.uint32(18).string(message.errorMessage); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): FlowStateChangeResponse { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseFlowStateChangeResponse(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 8) { + break; + } + + message.ok = reader.bool(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.errorMessage = reader.string(); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): FlowStateChangeResponse { + return { + ok: isSet(object.ok) ? Boolean(object.ok) : false, + errorMessage: isSet(object.errorMessage) ? String(object.errorMessage) : "", + }; + }, + + toJSON(message: FlowStateChangeResponse): unknown { + const obj: any = {}; + if (message.ok === true) { + obj.ok = message.ok; + } + if (message.errorMessage !== "") { + obj.errorMessage = message.errorMessage; + } + return obj; + }, + + create, I>>(base?: I): FlowStateChangeResponse { + return FlowStateChangeResponse.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>(object: I): FlowStateChangeResponse { + const message = createBaseFlowStateChangeResponse(); + message.ok = object.ok ?? false; + message.errorMessage = object.errorMessage ?? ""; + return message; + }, +}; + export type FlowServiceService = typeof FlowServiceService; export const FlowServiceService = { validatePeer: { @@ -2592,6 +2806,15 @@ export const FlowServiceService = { responseSerialize: (value: ShutdownResponse) => Buffer.from(ShutdownResponse.encode(value).finish()), responseDeserialize: (value: Buffer) => ShutdownResponse.decode(value), }, + flowStateChange: { + path: "/peerdb_route.FlowService/FlowStateChange", + requestStream: false, + responseStream: false, + requestSerialize: (value: FlowStateChangeRequest) => Buffer.from(FlowStateChangeRequest.encode(value).finish()), + requestDeserialize: (value: Buffer) => FlowStateChangeRequest.decode(value), + responseSerialize: (value: FlowStateChangeResponse) => Buffer.from(FlowStateChangeResponse.encode(value).finish()), + responseDeserialize: (value: Buffer) => FlowStateChangeResponse.decode(value), + }, mirrorStatus: { path: "/peerdb_route.FlowService/MirrorStatus", requestStream: false, @@ -2615,6 +2838,7 @@ export interface FlowServiceServer extends UntypedServiceImplementation { getSlotInfo: handleUnaryCall; getStatInfo: handleUnaryCall; shutdownFlow: handleUnaryCall; + flowStateChange: handleUnaryCall; mirrorStatus: handleUnaryCall; } @@ -2784,6 +3008,21 @@ export interface FlowServiceClient extends Client { options: Partial, callback: (error: ServiceError | null, response: ShutdownResponse) => void, ): ClientUnaryCall; + flowStateChange( + request: FlowStateChangeRequest, + callback: (error: ServiceError | null, response: FlowStateChangeResponse) => void, + ): ClientUnaryCall; + flowStateChange( + request: FlowStateChangeRequest, + metadata: Metadata, + callback: (error: ServiceError | null, response: FlowStateChangeResponse) => void, + ): ClientUnaryCall; + flowStateChange( + request: FlowStateChangeRequest, + metadata: Metadata, + options: Partial, + callback: (error: ServiceError | null, response: FlowStateChangeResponse) => void, + ): ClientUnaryCall; mirrorStatus( request: MirrorStatusRequest, callback: (error: ServiceError | null, response: MirrorStatusResponse) => void,