diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index b3e8d9c29a..510d92b635 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/PeerDB-io/peer-flow/connectors" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" @@ -275,3 +276,115 @@ func (h *FlowRequestHandler) ListPeers( Peers: peers, }, nil } + +func (h *FlowRequestHandler) ValidatePeer( + ctx context.Context, + req *protos.ValidatePeerRequest, +) (*protos.ValidatePeerResponse, error) { + if req.Peer == nil { + return &protos.ValidatePeerResponse{ + Status: protos.ValidatePeerStatus_INVALID, + Message: "no peer provided", + }, nil + } + + if len(req.Peer.Name) == 0 { + return &protos.ValidatePeerResponse{ + Status: protos.ValidatePeerStatus_INVALID, + Message: "no peer name provided", + }, nil + } + + conn, err := connectors.GetConnector(ctx, req.Peer) + if err != nil { + return &protos.ValidatePeerResponse{ + Status: protos.ValidatePeerStatus_INVALID, + Message: fmt.Sprintf("peer type is missing or "+ + "your requested configuration for %s peer %s was invalidated: %s", + req.Peer.Type, req.Peer.Name, err), + }, nil + } + + status := conn.ConnectionActive() + if !status { + return &protos.ValidatePeerResponse{ + Status: protos.ValidatePeerStatus_INVALID, + Message: fmt.Sprintf("failed to establish active connection to %s peer %s.", + req.Peer.Type, req.Peer.Name), + }, nil + } + + return &protos.ValidatePeerResponse{ + Status: protos.ValidatePeerStatus_VALID, + Message: fmt.Sprintf("%s peer %s is valid", + req.Peer.Type, req.Peer.Name), + }, nil +} + +func (h *FlowRequestHandler) CreatePeer( + ctx context.Context, + req *protos.CreatePeerRequest, +) (*protos.CreatePeerResponse, error) { + status, validateErr := h.ValidatePeer(ctx, &protos.ValidatePeerRequest{Peer: req.Peer}) + if validateErr != nil { + return nil, validateErr + } + if status.Status != protos.ValidatePeerStatus_VALID { + return &protos.CreatePeerResponse{ + Status: protos.CreatePeerStatus_FAILED, + Message: status.Message, + }, nil + } + + config := req.Peer.Config + wrongConfigResponse := &protos.CreatePeerResponse{ + Status: protos.CreatePeerStatus_FAILED, + Message: fmt.Sprintf("invalid config for %s peer %s", + req.Peer.Type, req.Peer.Name), + } + var encodedConfig []byte + var encodingErr error + peerType := req.Peer.Type + switch peerType { + case protos.DBType_POSTGRES: + pgConfigObject, ok := config.(*protos.Peer_PostgresConfig) + if !ok { + return wrongConfigResponse, nil + } + pgConfig := pgConfigObject.PostgresConfig + + encodedConfig, encodingErr = proto.Marshal(pgConfig) + + case protos.DBType_SNOWFLAKE: + sfConfigObject, ok := config.(*protos.Peer_SnowflakeConfig) + if !ok { + return wrongConfigResponse, nil + } + sfConfig := sfConfigObject.SnowflakeConfig + encodedConfig, encodingErr = proto.Marshal(sfConfig) + + default: + return wrongConfigResponse, nil + } + if encodingErr != nil { + log.Errorf("failed to encode peer configuration for %s peer %s : %v", + req.Peer.Type, req.Peer.Name, encodingErr) + return nil, encodingErr + } + + _, err := h.pool.Exec(ctx, "INSERT INTO peers (name, type, options) VALUES ($1, $2, $3)", + req.Peer.Name, peerType, encodedConfig, + ) + if err != nil { + return &protos.CreatePeerResponse{ + Status: protos.CreatePeerStatus_FAILED, + Message: fmt.Sprintf("failed to insert into peers table for %s peer %s: %s", + req.Peer.Type, req.Peer.Name, err.Error()), + }, nil + } + + return &protos.CreatePeerResponse{ + Status: protos.CreatePeerStatus_CREATED, + Message: "", + }, nil +} diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 9810fe6672..fb908d2cb0 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -3,6 +3,7 @@ package connectors import ( "context" "errors" + "fmt" connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery" conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub" @@ -190,6 +191,40 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon } } +func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) { + inner := peer.Type + switch inner { + case protos.DBType_POSTGRES: + pgConfig := peer.GetPostgresConfig() + + if pgConfig == nil { + return nil, fmt.Errorf("missing postgres config for %s peer %s", peer.Type.String(), peer.Name) + } + return connpostgres.NewPostgresConnector(ctx, pgConfig) + case protos.DBType_BIGQUERY: + bqConfig := peer.GetBigqueryConfig() + if bqConfig == nil { + return nil, fmt.Errorf("missing bigquery config for %s peer %s", peer.Type.String(), peer.Name) + } + return connbigquery.NewBigQueryConnector(ctx, bqConfig) + + case protos.DBType_SNOWFLAKE: + sfConfig := peer.GetSnowflakeConfig() + if sfConfig == nil { + return nil, fmt.Errorf("missing snowflake config for %s peer %s", peer.Type.String(), peer.Name) + } + return connsnowflake.NewSnowflakeConnector(ctx, sfConfig) + // case protos.DBType_S3: + // return conns3.NewS3Connector(ctx, config.GetS3Config()) + // case protos.DBType_EVENTHUB: + // return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig()) + // case protos.DBType_SQLSERVER: + // return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig()) + default: + return nil, fmt.Errorf("unsupported peer type %s", peer.Type.String()) + } +} + func GetQRepConsolidateConnector(ctx context.Context, config *protos.Peer) (QRepConsolidateConnector, error) { inner := config.Config diff --git a/flow/generated/protos/route.pb.go b/flow/generated/protos/route.pb.go index d11e4f6e87..9d580676fb 100644 --- a/flow/generated/protos/route.pb.go +++ b/flow/generated/protos/route.pb.go @@ -21,6 +21,104 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type ValidatePeerStatus int32 + +const ( + ValidatePeerStatus_CREATION_UNKNOWN ValidatePeerStatus = 0 + ValidatePeerStatus_VALID ValidatePeerStatus = 1 + ValidatePeerStatus_INVALID ValidatePeerStatus = 2 +) + +// Enum value maps for ValidatePeerStatus. +var ( + ValidatePeerStatus_name = map[int32]string{ + 0: "CREATION_UNKNOWN", + 1: "VALID", + 2: "INVALID", + } + ValidatePeerStatus_value = map[string]int32{ + "CREATION_UNKNOWN": 0, + "VALID": 1, + "INVALID": 2, + } +) + +func (x ValidatePeerStatus) Enum() *ValidatePeerStatus { + p := new(ValidatePeerStatus) + *p = x + return p +} + +func (x ValidatePeerStatus) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ValidatePeerStatus) Descriptor() protoreflect.EnumDescriptor { + return file_route_proto_enumTypes[0].Descriptor() +} + +func (ValidatePeerStatus) Type() protoreflect.EnumType { + return &file_route_proto_enumTypes[0] +} + +func (x ValidatePeerStatus) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ValidatePeerStatus.Descriptor instead. +func (ValidatePeerStatus) EnumDescriptor() ([]byte, []int) { + return file_route_proto_rawDescGZIP(), []int{0} +} + +type CreatePeerStatus int32 + +const ( + CreatePeerStatus_VALIDATION_UNKNOWN CreatePeerStatus = 0 + CreatePeerStatus_CREATED CreatePeerStatus = 1 + CreatePeerStatus_FAILED CreatePeerStatus = 2 +) + +// Enum value maps for CreatePeerStatus. +var ( + CreatePeerStatus_name = map[int32]string{ + 0: "VALIDATION_UNKNOWN", + 1: "CREATED", + 2: "FAILED", + } + CreatePeerStatus_value = map[string]int32{ + "VALIDATION_UNKNOWN": 0, + "CREATED": 1, + "FAILED": 2, + } +) + +func (x CreatePeerStatus) Enum() *CreatePeerStatus { + p := new(CreatePeerStatus) + *p = x + return p +} + +func (x CreatePeerStatus) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (CreatePeerStatus) Descriptor() protoreflect.EnumDescriptor { + return file_route_proto_enumTypes[1].Descriptor() +} + +func (CreatePeerStatus) Type() protoreflect.EnumType { + return &file_route_proto_enumTypes[1] +} + +func (x CreatePeerStatus) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use CreatePeerStatus.Descriptor instead. +func (CreatePeerStatus) EnumDescriptor() ([]byte, []int) { + return file_route_proto_rawDescGZIP(), []int{1} +} + type CreateCDCFlowRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -420,6 +518,210 @@ func (x *ListPeersResponse) GetPeers() []*Peer { return nil } +type ValidatePeerRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Peer *Peer `protobuf:"bytes,1,opt,name=peer,proto3" json:"peer,omitempty"` +} + +func (x *ValidatePeerRequest) Reset() { + *x = ValidatePeerRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_route_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ValidatePeerRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ValidatePeerRequest) ProtoMessage() {} + +func (x *ValidatePeerRequest) ProtoReflect() protoreflect.Message { + mi := &file_route_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ValidatePeerRequest.ProtoReflect.Descriptor instead. +func (*ValidatePeerRequest) Descriptor() ([]byte, []int) { + return file_route_proto_rawDescGZIP(), []int{8} +} + +func (x *ValidatePeerRequest) GetPeer() *Peer { + if x != nil { + return x.Peer + } + return nil +} + +type CreatePeerRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Peer *Peer `protobuf:"bytes,1,opt,name=peer,proto3" json:"peer,omitempty"` +} + +func (x *CreatePeerRequest) Reset() { + *x = CreatePeerRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_route_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CreatePeerRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreatePeerRequest) ProtoMessage() {} + +func (x *CreatePeerRequest) ProtoReflect() protoreflect.Message { + mi := &file_route_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreatePeerRequest.ProtoReflect.Descriptor instead. +func (*CreatePeerRequest) Descriptor() ([]byte, []int) { + return file_route_proto_rawDescGZIP(), []int{9} +} + +func (x *CreatePeerRequest) GetPeer() *Peer { + if x != nil { + return x.Peer + } + return nil +} + +type ValidatePeerResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Status ValidatePeerStatus `protobuf:"varint,1,opt,name=status,proto3,enum=peerdb_route.ValidatePeerStatus" json:"status,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *ValidatePeerResponse) Reset() { + *x = ValidatePeerResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_route_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ValidatePeerResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ValidatePeerResponse) ProtoMessage() {} + +func (x *ValidatePeerResponse) ProtoReflect() protoreflect.Message { + mi := &file_route_proto_msgTypes[10] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ValidatePeerResponse.ProtoReflect.Descriptor instead. +func (*ValidatePeerResponse) Descriptor() ([]byte, []int) { + return file_route_proto_rawDescGZIP(), []int{10} +} + +func (x *ValidatePeerResponse) GetStatus() ValidatePeerStatus { + if x != nil { + return x.Status + } + return ValidatePeerStatus_CREATION_UNKNOWN +} + +func (x *ValidatePeerResponse) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +type CreatePeerResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Status CreatePeerStatus `protobuf:"varint,1,opt,name=status,proto3,enum=peerdb_route.CreatePeerStatus" json:"status,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *CreatePeerResponse) Reset() { + *x = CreatePeerResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_route_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CreatePeerResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreatePeerResponse) ProtoMessage() {} + +func (x *CreatePeerResponse) ProtoReflect() protoreflect.Message { + mi := &file_route_proto_msgTypes[11] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreatePeerResponse.ProtoReflect.Descriptor instead. +func (*CreatePeerResponse) Descriptor() ([]byte, []int) { + return file_route_proto_rawDescGZIP(), []int{11} +} + +func (x *CreatePeerResponse) GetStatus() CreatePeerStatus { + if x != nil { + return x.Status + } + return CreatePeerStatus_VALIDATION_UNKNOWN +} + +func (x *CreatePeerResponse) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + var File_route_proto protoreflect.FileDescriptor var file_route_proto_rawDesc = []byte{ @@ -469,38 +771,79 @@ var file_route_proto_rawDesc = []byte{ 0x0a, 0x11, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x28, 0x0a, 0x05, 0x70, 0x65, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, - 0x73, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x05, 0x70, 0x65, 0x65, 0x72, 0x73, 0x32, 0xe9, 0x02, - 0x0a, 0x0b, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4e, 0x0a, - 0x09, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x73, 0x12, 0x1e, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x65, - 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x65, - 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5a, 0x0a, - 0x0d, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x22, - 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, - 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, - 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x0e, 0x43, 0x72, 0x65, - 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x23, 0x2e, 0x70, 0x65, + 0x73, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x05, 0x70, 0x65, 0x65, 0x72, 0x73, 0x22, 0x3d, 0x0a, + 0x13, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x26, 0x0a, 0x04, 0x70, 0x65, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, + 0x73, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x04, 0x70, 0x65, 0x65, 0x72, 0x22, 0x3b, 0x0a, 0x11, + 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x26, 0x0a, 0x04, 0x70, 0x65, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x12, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, + 0x65, 0x65, 0x72, 0x52, 0x04, 0x70, 0x65, 0x65, 0x72, 0x22, 0x6a, 0x0a, 0x14, 0x56, 0x61, 0x6c, + 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x38, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0e, 0x32, 0x20, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, + 0x2e, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x66, 0x0a, 0x12, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, + 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x06, 0x73, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, - 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x24, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, + 0x65, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2a, 0x42, 0x0a, + 0x12, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x12, 0x14, 0x0a, 0x10, 0x43, 0x52, 0x45, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, + 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x56, 0x41, 0x4c, + 0x49, 0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, + 0x02, 0x2a, 0x43, 0x0a, 0x10, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x53, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x16, 0x0a, 0x12, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x41, 0x54, + 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, + 0x07, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, + 0x49, 0x4c, 0x45, 0x44, 0x10, 0x02, 0x32, 0x95, 0x04, 0x0a, 0x0b, 0x46, 0x6c, 0x6f, 0x77, 0x53, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4e, 0x0a, 0x09, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x65, + 0x65, 0x72, 0x73, 0x12, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, + 0x74, 0x65, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, + 0x74, 0x65, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0c, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, + 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x12, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, + 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, + 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, + 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, + 0x51, 0x0a, 0x0a, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x12, 0x1f, 0x2e, + 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, + 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, + 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, + 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x00, 0x12, 0x5a, 0x0a, 0x0d, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, + 0x6c, 0x6f, 0x77, 0x12, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, + 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, + 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, + 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5d, + 0x0a, 0x0e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, + 0x12, 0x23, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4f, 0x0a, 0x0c, 0x53, 0x68, 0x75, 0x74, - 0x64, 0x6f, 0x77, 0x6e, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, - 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, - 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x7c, 0x0a, 0x10, 0x63, 0x6f, 0x6d, - 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x42, 0x0a, 0x52, - 0x6f, 0x75, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, - 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, - 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, - 0x65, 0xca, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xe2, - 0x02, 0x17, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x5c, 0x47, 0x50, - 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, - 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, + 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, + 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4f, 0x0a, + 0x0c, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x1d, 0x2e, + 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, + 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, 0x74, + 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x7c, + 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, + 0x74, 0x65, 0x42, 0x0a, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, + 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xca, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, + 0x6f, 0x75, 0x74, 0x65, 0xe2, 0x02, 0x17, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, + 0x74, 0x65, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, + 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -515,39 +858,54 @@ func file_route_proto_rawDescGZIP() []byte { return file_route_proto_rawDescData } -var file_route_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_route_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_route_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_route_proto_goTypes = []interface{}{ - (*CreateCDCFlowRequest)(nil), // 0: peerdb_route.CreateCDCFlowRequest - (*CreateCDCFlowResponse)(nil), // 1: peerdb_route.CreateCDCFlowResponse - (*CreateQRepFlowRequest)(nil), // 2: peerdb_route.CreateQRepFlowRequest - (*CreateQRepFlowResponse)(nil), // 3: peerdb_route.CreateQRepFlowResponse - (*ShutdownRequest)(nil), // 4: peerdb_route.ShutdownRequest - (*ShutdownResponse)(nil), // 5: peerdb_route.ShutdownResponse - (*ListPeersRequest)(nil), // 6: peerdb_route.ListPeersRequest - (*ListPeersResponse)(nil), // 7: peerdb_route.ListPeersResponse - (*FlowConnectionConfigs)(nil), // 8: peerdb_flow.FlowConnectionConfigs - (*QRepConfig)(nil), // 9: peerdb_flow.QRepConfig - (*Peer)(nil), // 10: peerdb_peers.Peer + (ValidatePeerStatus)(0), // 0: peerdb_route.ValidatePeerStatus + (CreatePeerStatus)(0), // 1: peerdb_route.CreatePeerStatus + (*CreateCDCFlowRequest)(nil), // 2: peerdb_route.CreateCDCFlowRequest + (*CreateCDCFlowResponse)(nil), // 3: peerdb_route.CreateCDCFlowResponse + (*CreateQRepFlowRequest)(nil), // 4: peerdb_route.CreateQRepFlowRequest + (*CreateQRepFlowResponse)(nil), // 5: peerdb_route.CreateQRepFlowResponse + (*ShutdownRequest)(nil), // 6: peerdb_route.ShutdownRequest + (*ShutdownResponse)(nil), // 7: peerdb_route.ShutdownResponse + (*ListPeersRequest)(nil), // 8: peerdb_route.ListPeersRequest + (*ListPeersResponse)(nil), // 9: peerdb_route.ListPeersResponse + (*ValidatePeerRequest)(nil), // 10: peerdb_route.ValidatePeerRequest + (*CreatePeerRequest)(nil), // 11: peerdb_route.CreatePeerRequest + (*ValidatePeerResponse)(nil), // 12: peerdb_route.ValidatePeerResponse + (*CreatePeerResponse)(nil), // 13: peerdb_route.CreatePeerResponse + (*FlowConnectionConfigs)(nil), // 14: peerdb_flow.FlowConnectionConfigs + (*QRepConfig)(nil), // 15: peerdb_flow.QRepConfig + (*Peer)(nil), // 16: peerdb_peers.Peer } var file_route_proto_depIdxs = []int32{ - 8, // 0: peerdb_route.CreateCDCFlowRequest.connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs - 9, // 1: peerdb_route.CreateQRepFlowRequest.qrep_config:type_name -> peerdb_flow.QRepConfig - 10, // 2: peerdb_route.ShutdownRequest.source_peer:type_name -> peerdb_peers.Peer - 10, // 3: peerdb_route.ShutdownRequest.destination_peer:type_name -> peerdb_peers.Peer - 10, // 4: peerdb_route.ListPeersResponse.peers:type_name -> peerdb_peers.Peer - 6, // 5: peerdb_route.FlowService.ListPeers:input_type -> peerdb_route.ListPeersRequest - 0, // 6: peerdb_route.FlowService.CreateCDCFlow:input_type -> peerdb_route.CreateCDCFlowRequest - 2, // 7: peerdb_route.FlowService.CreateQRepFlow:input_type -> peerdb_route.CreateQRepFlowRequest - 4, // 8: peerdb_route.FlowService.ShutdownFlow:input_type -> peerdb_route.ShutdownRequest - 7, // 9: peerdb_route.FlowService.ListPeers:output_type -> peerdb_route.ListPeersResponse - 1, // 10: peerdb_route.FlowService.CreateCDCFlow:output_type -> peerdb_route.CreateCDCFlowResponse - 3, // 11: peerdb_route.FlowService.CreateQRepFlow:output_type -> peerdb_route.CreateQRepFlowResponse - 5, // 12: peerdb_route.FlowService.ShutdownFlow:output_type -> peerdb_route.ShutdownResponse - 9, // [9:13] is the sub-list for method output_type - 5, // [5:9] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 14, // 0: peerdb_route.CreateCDCFlowRequest.connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs + 15, // 1: peerdb_route.CreateQRepFlowRequest.qrep_config:type_name -> peerdb_flow.QRepConfig + 16, // 2: peerdb_route.ShutdownRequest.source_peer:type_name -> peerdb_peers.Peer + 16, // 3: peerdb_route.ShutdownRequest.destination_peer:type_name -> peerdb_peers.Peer + 16, // 4: peerdb_route.ListPeersResponse.peers:type_name -> peerdb_peers.Peer + 16, // 5: peerdb_route.ValidatePeerRequest.peer:type_name -> peerdb_peers.Peer + 16, // 6: peerdb_route.CreatePeerRequest.peer:type_name -> peerdb_peers.Peer + 0, // 7: peerdb_route.ValidatePeerResponse.status:type_name -> peerdb_route.ValidatePeerStatus + 1, // 8: peerdb_route.CreatePeerResponse.status:type_name -> peerdb_route.CreatePeerStatus + 8, // 9: peerdb_route.FlowService.ListPeers:input_type -> peerdb_route.ListPeersRequest + 10, // 10: peerdb_route.FlowService.ValidatePeer:input_type -> peerdb_route.ValidatePeerRequest + 11, // 11: peerdb_route.FlowService.CreatePeer:input_type -> peerdb_route.CreatePeerRequest + 2, // 12: peerdb_route.FlowService.CreateCDCFlow:input_type -> peerdb_route.CreateCDCFlowRequest + 4, // 13: peerdb_route.FlowService.CreateQRepFlow:input_type -> peerdb_route.CreateQRepFlowRequest + 6, // 14: peerdb_route.FlowService.ShutdownFlow:input_type -> peerdb_route.ShutdownRequest + 9, // 15: peerdb_route.FlowService.ListPeers:output_type -> peerdb_route.ListPeersResponse + 12, // 16: peerdb_route.FlowService.ValidatePeer:output_type -> peerdb_route.ValidatePeerResponse + 13, // 17: peerdb_route.FlowService.CreatePeer:output_type -> peerdb_route.CreatePeerResponse + 3, // 18: peerdb_route.FlowService.CreateCDCFlow:output_type -> peerdb_route.CreateCDCFlowResponse + 5, // 19: peerdb_route.FlowService.CreateQRepFlow:output_type -> peerdb_route.CreateQRepFlowResponse + 7, // 20: peerdb_route.FlowService.ShutdownFlow:output_type -> peerdb_route.ShutdownResponse + 15, // [15:21] is the sub-list for method output_type + 9, // [9:15] is the sub-list for method input_type + 9, // [9:9] is the sub-list for extension type_name + 9, // [9:9] is the sub-list for extension extendee + 0, // [0:9] is the sub-list for field type_name } func init() { file_route_proto_init() } @@ -654,19 +1012,68 @@ func file_route_proto_init() { return nil } } + file_route_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ValidatePeerRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_route_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CreatePeerRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_route_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ValidatePeerResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_route_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CreatePeerResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_route_proto_rawDesc, - NumEnums: 0, - NumMessages: 8, + NumEnums: 2, + NumMessages: 12, NumExtensions: 0, NumServices: 1, }, GoTypes: file_route_proto_goTypes, DependencyIndexes: file_route_proto_depIdxs, + EnumInfos: file_route_proto_enumTypes, MessageInfos: file_route_proto_msgTypes, }.Build() File_route_proto = out.File diff --git a/flow/generated/protos/route_grpc.pb.go b/flow/generated/protos/route_grpc.pb.go index 9efd24b251..ca9e1662bb 100644 --- a/flow/generated/protos/route_grpc.pb.go +++ b/flow/generated/protos/route_grpc.pb.go @@ -20,6 +20,8 @@ const _ = grpc.SupportPackageIsVersion7 const ( FlowService_ListPeers_FullMethodName = "/peerdb_route.FlowService/ListPeers" + FlowService_ValidatePeer_FullMethodName = "/peerdb_route.FlowService/ValidatePeer" + FlowService_CreatePeer_FullMethodName = "/peerdb_route.FlowService/CreatePeer" FlowService_CreateCDCFlow_FullMethodName = "/peerdb_route.FlowService/CreateCDCFlow" FlowService_CreateQRepFlow_FullMethodName = "/peerdb_route.FlowService/CreateQRepFlow" FlowService_ShutdownFlow_FullMethodName = "/peerdb_route.FlowService/ShutdownFlow" @@ -30,6 +32,8 @@ const ( // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type FlowServiceClient interface { ListPeers(ctx context.Context, in *ListPeersRequest, opts ...grpc.CallOption) (*ListPeersResponse, error) + ValidatePeer(ctx context.Context, in *ValidatePeerRequest, opts ...grpc.CallOption) (*ValidatePeerResponse, error) + CreatePeer(ctx context.Context, in *CreatePeerRequest, opts ...grpc.CallOption) (*CreatePeerResponse, error) CreateCDCFlow(ctx context.Context, in *CreateCDCFlowRequest, opts ...grpc.CallOption) (*CreateCDCFlowResponse, error) CreateQRepFlow(ctx context.Context, in *CreateQRepFlowRequest, opts ...grpc.CallOption) (*CreateQRepFlowResponse, error) ShutdownFlow(ctx context.Context, in *ShutdownRequest, opts ...grpc.CallOption) (*ShutdownResponse, error) @@ -52,6 +56,24 @@ func (c *flowServiceClient) ListPeers(ctx context.Context, in *ListPeersRequest, return out, nil } +func (c *flowServiceClient) ValidatePeer(ctx context.Context, in *ValidatePeerRequest, opts ...grpc.CallOption) (*ValidatePeerResponse, error) { + out := new(ValidatePeerResponse) + err := c.cc.Invoke(ctx, FlowService_ValidatePeer_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *flowServiceClient) CreatePeer(ctx context.Context, in *CreatePeerRequest, opts ...grpc.CallOption) (*CreatePeerResponse, error) { + out := new(CreatePeerResponse) + err := c.cc.Invoke(ctx, FlowService_CreatePeer_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *flowServiceClient) CreateCDCFlow(ctx context.Context, in *CreateCDCFlowRequest, opts ...grpc.CallOption) (*CreateCDCFlowResponse, error) { out := new(CreateCDCFlowResponse) err := c.cc.Invoke(ctx, FlowService_CreateCDCFlow_FullMethodName, in, out, opts...) @@ -84,6 +106,8 @@ func (c *flowServiceClient) ShutdownFlow(ctx context.Context, in *ShutdownReques // for forward compatibility type FlowServiceServer interface { ListPeers(context.Context, *ListPeersRequest) (*ListPeersResponse, error) + ValidatePeer(context.Context, *ValidatePeerRequest) (*ValidatePeerResponse, error) + CreatePeer(context.Context, *CreatePeerRequest) (*CreatePeerResponse, error) CreateCDCFlow(context.Context, *CreateCDCFlowRequest) (*CreateCDCFlowResponse, error) CreateQRepFlow(context.Context, *CreateQRepFlowRequest) (*CreateQRepFlowResponse, error) ShutdownFlow(context.Context, *ShutdownRequest) (*ShutdownResponse, error) @@ -97,6 +121,12 @@ type UnimplementedFlowServiceServer struct { func (UnimplementedFlowServiceServer) ListPeers(context.Context, *ListPeersRequest) (*ListPeersResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ListPeers not implemented") } +func (UnimplementedFlowServiceServer) ValidatePeer(context.Context, *ValidatePeerRequest) (*ValidatePeerResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ValidatePeer not implemented") +} +func (UnimplementedFlowServiceServer) CreatePeer(context.Context, *CreatePeerRequest) (*CreatePeerResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method CreatePeer not implemented") +} func (UnimplementedFlowServiceServer) CreateCDCFlow(context.Context, *CreateCDCFlowRequest) (*CreateCDCFlowResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method CreateCDCFlow not implemented") } @@ -137,6 +167,42 @@ func _FlowService_ListPeers_Handler(srv interface{}, ctx context.Context, dec fu return interceptor(ctx, in, info, handler) } +func _FlowService_ValidatePeer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ValidatePeerRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(FlowServiceServer).ValidatePeer(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: FlowService_ValidatePeer_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FlowServiceServer).ValidatePeer(ctx, req.(*ValidatePeerRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _FlowService_CreatePeer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CreatePeerRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(FlowServiceServer).CreatePeer(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: FlowService_CreatePeer_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FlowServiceServer).CreatePeer(ctx, req.(*CreatePeerRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _FlowService_CreateCDCFlow_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(CreateCDCFlowRequest) if err := dec(in); err != nil { @@ -202,6 +268,14 @@ var FlowService_ServiceDesc = grpc.ServiceDesc{ MethodName: "ListPeers", Handler: _FlowService_ListPeers_Handler, }, + { + MethodName: "ValidatePeer", + Handler: _FlowService_ValidatePeer_Handler, + }, + { + MethodName: "CreatePeer", + Handler: _FlowService_CreatePeer_Handler, + }, { MethodName: "CreateCDCFlow", Handler: _FlowService_CreateCDCFlow_Handler, diff --git a/nexus/pt/src/peerdb_route.rs b/nexus/pt/src/peerdb_route.rs index fb51949ea6..f564f070e4 100644 --- a/nexus/pt/src/peerdb_route.rs +++ b/nexus/pt/src/peerdb_route.rs @@ -53,6 +53,92 @@ pub struct ListPeersResponse { #[prost(message, repeated, tag="1")] pub peers: ::prost::alloc::vec::Vec, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ValidatePeerRequest { + #[prost(message, optional, tag="1")] + pub peer: ::core::option::Option, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CreatePeerRequest { + #[prost(message, optional, tag="1")] + pub peer: ::core::option::Option, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ValidatePeerResponse { + #[prost(enumeration="ValidatePeerStatus", tag="1")] + pub status: i32, + #[prost(string, tag="2")] + pub message: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CreatePeerResponse { + #[prost(enumeration="CreatePeerStatus", tag="1")] + pub status: i32, + #[prost(string, tag="2")] + pub message: ::prost::alloc::string::String, +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ValidatePeerStatus { + CreationUnknown = 0, + Valid = 1, + Invalid = 2, +} +impl ValidatePeerStatus { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + ValidatePeerStatus::CreationUnknown => "CREATION_UNKNOWN", + ValidatePeerStatus::Valid => "VALID", + ValidatePeerStatus::Invalid => "INVALID", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "CREATION_UNKNOWN" => Some(Self::CreationUnknown), + "VALID" => Some(Self::Valid), + "INVALID" => Some(Self::Invalid), + _ => None, + } + } +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum CreatePeerStatus { + ValidationUnknown = 0, + Created = 1, + Failed = 2, +} +impl CreatePeerStatus { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + CreatePeerStatus::ValidationUnknown => "VALIDATION_UNKNOWN", + CreatePeerStatus::Created => "CREATED", + CreatePeerStatus::Failed => "FAILED", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "VALIDATION_UNKNOWN" => Some(Self::ValidationUnknown), + "CREATED" => Some(Self::Created), + "FAILED" => Some(Self::Failed), + _ => None, + } + } +} include!("peerdb_route.tonic.rs"); include!("peerdb_route.serde.rs"); // @@protoc_insertion_point(module) \ No newline at end of file diff --git a/nexus/pt/src/peerdb_route.serde.rs b/nexus/pt/src/peerdb_route.serde.rs index e636634484..685aefd6f5 100644 --- a/nexus/pt/src/peerdb_route.serde.rs +++ b/nexus/pt/src/peerdb_route.serde.rs @@ -191,6 +191,291 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowResponse { deserializer.deserialize_struct("peerdb_route.CreateCDCFlowResponse", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for CreatePeerRequest { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.peer.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("peerdb_route.CreatePeerRequest", len)?; + if let Some(v) = self.peer.as_ref() { + struct_ser.serialize_field("peer", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for CreatePeerRequest { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "peer", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Peer, + __SkipField__, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "peer" => Ok(GeneratedField::Peer), + _ => Ok(GeneratedField::__SkipField__), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = CreatePeerRequest; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct peerdb_route.CreatePeerRequest") + } + + fn visit_map(self, mut map: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut peer__ = None; + while let Some(k) = map.next_key()? { + match k { + GeneratedField::Peer => { + if peer__.is_some() { + return Err(serde::de::Error::duplicate_field("peer")); + } + peer__ = map.next_value()?; + } + GeneratedField::__SkipField__ => { + let _ = map.next_value::()?; + } + } + } + Ok(CreatePeerRequest { + peer: peer__, + }) + } + } + deserializer.deserialize_struct("peerdb_route.CreatePeerRequest", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for CreatePeerResponse { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.status != 0 { + len += 1; + } + if !self.message.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("peerdb_route.CreatePeerResponse", len)?; + if self.status != 0 { + let v = CreatePeerStatus::from_i32(self.status) + .ok_or_else(|| serde::ser::Error::custom(format!("Invalid variant {}", self.status)))?; + struct_ser.serialize_field("status", &v)?; + } + if !self.message.is_empty() { + struct_ser.serialize_field("message", &self.message)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for CreatePeerResponse { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "status", + "message", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Status, + Message, + __SkipField__, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "status" => Ok(GeneratedField::Status), + "message" => Ok(GeneratedField::Message), + _ => Ok(GeneratedField::__SkipField__), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = CreatePeerResponse; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct peerdb_route.CreatePeerResponse") + } + + fn visit_map(self, mut map: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut status__ = None; + let mut message__ = None; + while let Some(k) = map.next_key()? { + match k { + GeneratedField::Status => { + if status__.is_some() { + return Err(serde::de::Error::duplicate_field("status")); + } + status__ = Some(map.next_value::()? as i32); + } + GeneratedField::Message => { + if message__.is_some() { + return Err(serde::de::Error::duplicate_field("message")); + } + message__ = Some(map.next_value()?); + } + GeneratedField::__SkipField__ => { + let _ = map.next_value::()?; + } + } + } + Ok(CreatePeerResponse { + status: status__.unwrap_or_default(), + message: message__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("peerdb_route.CreatePeerResponse", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for CreatePeerStatus { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let variant = match self { + Self::ValidationUnknown => "VALIDATION_UNKNOWN", + Self::Created => "CREATED", + Self::Failed => "FAILED", + }; + serializer.serialize_str(variant) + } +} +impl<'de> serde::Deserialize<'de> for CreatePeerStatus { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "VALIDATION_UNKNOWN", + "CREATED", + "FAILED", + ]; + + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = CreatePeerStatus; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + fn visit_i64(self, v: i64) -> std::result::Result + where + E: serde::de::Error, + { + use std::convert::TryFrom; + i32::try_from(v) + .ok() + .and_then(CreatePeerStatus::from_i32) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self) + }) + } + + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + use std::convert::TryFrom; + i32::try_from(v) + .ok() + .and_then(CreatePeerStatus::from_i32) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) + }) + } + + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "VALIDATION_UNKNOWN" => Ok(CreatePeerStatus::ValidationUnknown), + "CREATED" => Ok(CreatePeerStatus::Created), + "FAILED" => Ok(CreatePeerStatus::Failed), + _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), + } + } + } + deserializer.deserialize_any(GeneratedVisitor) + } +} impl serde::Serialize for CreateQRepFlowRequest { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -813,3 +1098,288 @@ impl<'de> serde::Deserialize<'de> for ShutdownResponse { deserializer.deserialize_struct("peerdb_route.ShutdownResponse", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for ValidatePeerRequest { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.peer.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("peerdb_route.ValidatePeerRequest", len)?; + if let Some(v) = self.peer.as_ref() { + struct_ser.serialize_field("peer", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for ValidatePeerRequest { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "peer", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Peer, + __SkipField__, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "peer" => Ok(GeneratedField::Peer), + _ => Ok(GeneratedField::__SkipField__), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = ValidatePeerRequest; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct peerdb_route.ValidatePeerRequest") + } + + fn visit_map(self, mut map: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut peer__ = None; + while let Some(k) = map.next_key()? { + match k { + GeneratedField::Peer => { + if peer__.is_some() { + return Err(serde::de::Error::duplicate_field("peer")); + } + peer__ = map.next_value()?; + } + GeneratedField::__SkipField__ => { + let _ = map.next_value::()?; + } + } + } + Ok(ValidatePeerRequest { + peer: peer__, + }) + } + } + deserializer.deserialize_struct("peerdb_route.ValidatePeerRequest", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for ValidatePeerResponse { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.status != 0 { + len += 1; + } + if !self.message.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("peerdb_route.ValidatePeerResponse", len)?; + if self.status != 0 { + let v = ValidatePeerStatus::from_i32(self.status) + .ok_or_else(|| serde::ser::Error::custom(format!("Invalid variant {}", self.status)))?; + struct_ser.serialize_field("status", &v)?; + } + if !self.message.is_empty() { + struct_ser.serialize_field("message", &self.message)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for ValidatePeerResponse { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "status", + "message", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Status, + Message, + __SkipField__, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "status" => Ok(GeneratedField::Status), + "message" => Ok(GeneratedField::Message), + _ => Ok(GeneratedField::__SkipField__), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = ValidatePeerResponse; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct peerdb_route.ValidatePeerResponse") + } + + fn visit_map(self, mut map: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut status__ = None; + let mut message__ = None; + while let Some(k) = map.next_key()? { + match k { + GeneratedField::Status => { + if status__.is_some() { + return Err(serde::de::Error::duplicate_field("status")); + } + status__ = Some(map.next_value::()? as i32); + } + GeneratedField::Message => { + if message__.is_some() { + return Err(serde::de::Error::duplicate_field("message")); + } + message__ = Some(map.next_value()?); + } + GeneratedField::__SkipField__ => { + let _ = map.next_value::()?; + } + } + } + Ok(ValidatePeerResponse { + status: status__.unwrap_or_default(), + message: message__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("peerdb_route.ValidatePeerResponse", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for ValidatePeerStatus { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let variant = match self { + Self::CreationUnknown => "CREATION_UNKNOWN", + Self::Valid => "VALID", + Self::Invalid => "INVALID", + }; + serializer.serialize_str(variant) + } +} +impl<'de> serde::Deserialize<'de> for ValidatePeerStatus { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "CREATION_UNKNOWN", + "VALID", + "INVALID", + ]; + + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = ValidatePeerStatus; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + fn visit_i64(self, v: i64) -> std::result::Result + where + E: serde::de::Error, + { + use std::convert::TryFrom; + i32::try_from(v) + .ok() + .and_then(ValidatePeerStatus::from_i32) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self) + }) + } + + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + use std::convert::TryFrom; + i32::try_from(v) + .ok() + .and_then(ValidatePeerStatus::from_i32) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) + }) + } + + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "CREATION_UNKNOWN" => Ok(ValidatePeerStatus::CreationUnknown), + "VALID" => Ok(ValidatePeerStatus::Valid), + "INVALID" => Ok(ValidatePeerStatus::Invalid), + _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), + } + } + } + deserializer.deserialize_any(GeneratedVisitor) + } +} diff --git a/nexus/pt/src/peerdb_route.tonic.rs b/nexus/pt/src/peerdb_route.tonic.rs index 1f75c48893..9c251014ec 100644 --- a/nexus/pt/src/peerdb_route.tonic.rs +++ b/nexus/pt/src/peerdb_route.tonic.rs @@ -112,6 +112,58 @@ pub mod flow_service_client { self.inner.unary(req, path, codec).await } /// + pub async fn validate_peer( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/peerdb_route.FlowService/ValidatePeer", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("peerdb_route.FlowService", "ValidatePeer")); + self.inner.unary(req, path, codec).await + } + /// + pub async fn create_peer( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/peerdb_route.FlowService/CreatePeer", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("peerdb_route.FlowService", "CreatePeer")); + self.inner.unary(req, path, codec).await + } + /// pub async fn create_cdc_flow( &mut self, request: impl tonic::IntoRequest, @@ -207,6 +259,22 @@ pub mod flow_service_server { tonic::Status, >; /// + async fn validate_peer( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// + async fn create_peer( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// async fn create_cdc_flow( &self, request: tonic::Request, @@ -355,6 +423,96 @@ pub mod flow_service_server { }; Box::pin(fut) } + "/peerdb_route.FlowService/ValidatePeer" => { + #[allow(non_camel_case_types)] + struct ValidatePeerSvc(pub Arc); + impl< + T: FlowService, + > tonic::server::UnaryService + for ValidatePeerSvc { + type Response = super::ValidatePeerResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).validate_peer(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = ValidatePeerSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/peerdb_route.FlowService/CreatePeer" => { + #[allow(non_camel_case_types)] + struct CreatePeerSvc(pub Arc); + impl< + T: FlowService, + > tonic::server::UnaryService + for CreatePeerSvc { + type Response = super::CreatePeerResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { (*inner).create_peer(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = CreatePeerSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/peerdb_route.FlowService/CreateCDCFlow" => { #[allow(non_camel_case_types)] struct CreateCDCFlowSvc(pub Arc); diff --git a/protos/route.proto b/protos/route.proto index d7cd10f08e..96dd65aaaa 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -41,8 +41,40 @@ message ListPeersResponse { repeated peerdb_peers.Peer peers = 1; } +message ValidatePeerRequest { + peerdb_peers.Peer peer = 1; +} + +message CreatePeerRequest { + peerdb_peers.Peer peer = 1; +} + +enum ValidatePeerStatus { + CREATION_UNKNOWN = 0; + VALID = 1; + INVALID = 2; +} + +enum CreatePeerStatus { + VALIDATION_UNKNOWN = 0; + CREATED = 1; + FAILED = 2; +} + +message ValidatePeerResponse { + ValidatePeerStatus status = 1; + string message = 2; +} + +message CreatePeerResponse { + CreatePeerStatus status = 1; + string message = 2; +} + service FlowService { rpc ListPeers(ListPeersRequest) returns (ListPeersResponse) {} + rpc ValidatePeer(ValidatePeerRequest) returns (ValidatePeerResponse) {} + rpc CreatePeer(CreatePeerRequest) returns (CreatePeerResponse) {} rpc CreateCDCFlow(CreateCDCFlowRequest) returns (CreateCDCFlowResponse) {} rpc CreateQRepFlow(CreateQRepFlowRequest) returns (CreateQRepFlowResponse) {} rpc ShutdownFlow(ShutdownRequest) returns (ShutdownResponse) {} diff --git a/ui/app/connectors/create/page.tsx b/ui/app/connectors/create/page.tsx index f55ba52cb4..30c458489e 100644 --- a/ui/app/connectors/create/page.tsx +++ b/ui/app/connectors/create/page.tsx @@ -7,15 +7,15 @@ import { LayoutMain, RowWithSelect } from '@/lib/Layout'; import { Panel } from '@/lib/Panel'; import { Select } from '@/lib/Select'; -export default function CreateConnector() { +export default function CreatePeer() { return ( - }>Learn about connectors + }>Learn about peers