diff --git a/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs b/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs index 8b992084ba3..13c8a3c0606 100644 --- a/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs +++ b/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs @@ -11,7 +11,7 @@ use ockam::{node, Context, Result, TcpOutletOptions, TcpTransportExtension}; use ockam_api::authenticator::enrollment_tokens::TokenAcceptor; use ockam_api::authenticator::one_time_code::OneTimeCode; use ockam_api::nodes::NodeManager; -use ockam_api::{multiaddr_to_route, multiaddr_to_transport_route, DefaultAddress}; +use ockam_api::{multiaddr_to_route, multiaddr_to_transport_route}; use ockam_core::AsyncTryClone; use ockam_multiaddr::MultiAddr; @@ -80,11 +80,11 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime node.context().async_try_clone().await?, Arc::new(tcp.clone()), node.secure_channels(), - RemoteCredentialRetrieverInfo::new( + RemoteCredentialRetrieverInfo::create_for_project_member( project.authority_identifier(), project_authority_route, - DefaultAddress::CREDENTIAL_ISSUER.into(), ), + "test".to_string(), // FIXME LATER CRED )); // 3. create an access control policy checking the value of the "component" attribute of the caller diff --git a/examples/rust/get_started/examples/11-attribute-based-authentication-edge-plane.rs b/examples/rust/get_started/examples/11-attribute-based-authentication-edge-plane.rs index 4d48291f168..a48a29af5f9 100644 --- a/examples/rust/get_started/examples/11-attribute-based-authentication-edge-plane.rs +++ b/examples/rust/get_started/examples/11-attribute-based-authentication-edge-plane.rs @@ -9,7 +9,7 @@ use ockam::{route, Context, Result}; use ockam_api::authenticator::enrollment_tokens::TokenAcceptor; use ockam_api::authenticator::one_time_code::OneTimeCode; use ockam_api::nodes::NodeManager; -use ockam_api::{multiaddr_to_route, multiaddr_to_transport_route, DefaultAddress}; +use ockam_api::{multiaddr_to_route, multiaddr_to_transport_route}; use ockam_core::compat::sync::Arc; use ockam_core::AsyncTryClone; use ockam_multiaddr::MultiAddr; @@ -80,11 +80,11 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime node.context().async_try_clone().await?, Arc::new(tcp.clone()), node.secure_channels(), - RemoteCredentialRetrieverInfo::new( + RemoteCredentialRetrieverInfo::create_for_project_member( project.authority_identifier(), project_authority_route, - DefaultAddress::CREDENTIAL_ISSUER.into(), ), + "test".to_string(), // FIXME LATER CRED )); // 3. create an access control policy checking the value of the "component" attribute of the caller diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/trust.rs b/implementations/rust/ockam/ockam_api/src/cli_state/trust.rs index 3aab2fed08e..f0fa4284e65 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/trust.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/trust.rs @@ -1,5 +1,7 @@ +use crate::cloud::project::Project; use crate::nodes::service::{NodeManagerCredentialRetrieverOptions, NodeManagerTrustOptions}; -use crate::{multiaddr_to_transport_route, CliState, DefaultAddress}; +use crate::nodes::NodeManager; +use crate::{multiaddr_to_transport_route, CliState}; use ockam::identity::{IdentitiesVerification, RemoteCredentialRetrieverInfo}; use ockam_core::errcode::{Kind, Origin}; use ockam_core::{Error, Result}; @@ -7,6 +9,156 @@ use ockam_multiaddr::MultiAddr; use ockam_vault::SoftwareVaultForVerifyingSignatures; impl CliState { + async fn retrieve_trust_options_explicit_project_authority( + &self, + authority_identity: &str, + authority_route: &Option, + expect_cached_credential: bool, + ) -> Result { + let identities_verification = IdentitiesVerification::new( + self.change_history_repository(), + SoftwareVaultForVerifyingSignatures::create(), + ); + + let authority_identity = hex::decode(authority_identity).map_err(|_e| { + Error::new( + Origin::Api, + Kind::NotFound, + "Invalid authority identity hex", + ) + })?; + let authority_identifier = identities_verification + .import(None, &authority_identity) + .await?; + + if let Some(authority_multiaddr) = authority_route { + if expect_cached_credential { + return Err(Error::new( + Origin::Api, + Kind::NotFound, + "Authority address was provided but expect_cached_credential is true", + )); + } + + let authority_route = + multiaddr_to_transport_route(authority_multiaddr).ok_or(Error::new( + Origin::Api, + Kind::NotFound, + format!("Invalid authority route: {}", &authority_multiaddr), + ))?; + let info = RemoteCredentialRetrieverInfo::create_for_project_member( + authority_identifier.clone(), + authority_route, + ); + + let trust_options = NodeManagerTrustOptions::new( + NodeManagerCredentialRetrieverOptions::Remote { + info, + project_id: "unknown".to_string(), + }, + NodeManagerCredentialRetrieverOptions::None, + Some(authority_identifier.clone()), + NodeManagerCredentialRetrieverOptions::None, + ); + + info!( + "TrustOptions configured: Authority: {}. Credentials retrieved from Remote Authority: {}", + authority_identifier, authority_multiaddr + ); + + return Ok(trust_options); + } + + if expect_cached_credential { + let trust_options = NodeManagerTrustOptions::new( + NodeManagerCredentialRetrieverOptions::CacheOnly { + issuer: authority_identifier.clone(), + project_id: "unknown".to_string(), + }, + NodeManagerCredentialRetrieverOptions::None, + Some(authority_identifier.clone()), + NodeManagerCredentialRetrieverOptions::None, + ); + + info!( + "TrustOptions configured: Authority: {}. Expect credentials in cache", + authority_identifier + ); + + return Ok(trust_options); + } + + let trust_options = NodeManagerTrustOptions::new( + NodeManagerCredentialRetrieverOptions::None, + NodeManagerCredentialRetrieverOptions::None, + Some(authority_identifier.clone()), + NodeManagerCredentialRetrieverOptions::None, + ); + + info!( + "TrustOptions configured: Authority: {}. Only verifying credentials", + authority_identifier + ); + + Ok(trust_options) + } + + async fn retrieve_trust_options_with_project( + &self, + project: Project, + ) -> Result { + let authority_identifier = project.authority_identifier()?; + let authority_multiaddr = project.authority_multiaddr()?; + let authority_route = + multiaddr_to_transport_route(authority_multiaddr).ok_or(Error::new( + Origin::Api, + Kind::NotFound, + format!("Invalid authority route: {}", &authority_multiaddr), + ))?; + + let project_id = project.project_id().to_string(); + let project_member_retriever = NodeManagerCredentialRetrieverOptions::Remote { + info: RemoteCredentialRetrieverInfo::create_for_project_member( + authority_identifier.clone(), + authority_route, + ), + project_id: project_id.clone(), + }; + + let controller_identifier = NodeManager::load_controller_identifier()?; + let controller_transport_route = NodeManager::controller_route().await?; + + let project_admin_retriever = NodeManagerCredentialRetrieverOptions::Remote { + info: RemoteCredentialRetrieverInfo::create_for_project_admin( + controller_identifier.clone(), + controller_transport_route.clone(), + project_id.clone(), + ), + project_id: project_id.clone(), + }; + + let account_admin_retriever = NodeManagerCredentialRetrieverOptions::Remote { + info: RemoteCredentialRetrieverInfo::create_for_account_admin( + controller_identifier.clone(), + controller_transport_route, + ), + project_id: project_id.clone(), // FIXME CRED Should be account it + }; + + let trust_options = NodeManagerTrustOptions::new( + project_member_retriever, + project_admin_retriever, + Some(authority_identifier.clone()), + account_admin_retriever, + ); + + info!( + "TrustOptions configured: Authority: {}. Credentials retrieved from project: {}", + authority_identifier, authority_multiaddr + ); + Ok(trust_options) + } + /// Create [`NodeManagerTrustOptions`] depending on what trust information we possess /// 1. Either we explicitly know the Authority identity that we trust, and optionally route to its node to request /// a new credential @@ -35,82 +187,15 @@ impl CliState { )); } - if authority_route.is_some() && expect_cached_credential { - return Err(Error::new( - Origin::Api, - Kind::NotFound, - "Authority address was provided but expect_cached_credential is true", - )); - } - + // We're using explicitly specified authority instead of a project if let Some(authority_identity) = authority_identity { - let identities_verification = IdentitiesVerification::new( - self.change_history_repository(), - SoftwareVaultForVerifyingSignatures::create(), - ); - - let authority_identity = hex::decode(authority_identity).map_err(|_e| { - Error::new( - Origin::Api, - Kind::NotFound, - "Invalid authority identity hex", - ) - })?; - let authority_identifier = identities_verification - .import(None, &authority_identity) - .await?; - - let trust_options = if let Some(authority_multiaddr) = authority_route { - let authority_route = - multiaddr_to_transport_route(authority_multiaddr).ok_or(Error::new( - Origin::Api, - Kind::NotFound, - format!("Invalid authority route: {}", &authority_multiaddr), - ))?; - let info = RemoteCredentialRetrieverInfo::new( - authority_identifier.clone(), + return self + .retrieve_trust_options_explicit_project_authority( + authority_identity, authority_route, - DefaultAddress::CREDENTIAL_ISSUER.into(), - ); - - let trust_options = NodeManagerTrustOptions::new( - NodeManagerCredentialRetrieverOptions::Remote(info), - Some(authority_identifier.clone()), - ); - - info!( - "TrustOptions configured: Authority: {}. Credentials retrieved from Remote Authority: {}", - authority_identifier, authority_multiaddr - ); - - trust_options - } else if expect_cached_credential { - let trust_options = NodeManagerTrustOptions::new( - NodeManagerCredentialRetrieverOptions::CacheOnly(authority_identifier.clone()), - Some(authority_identifier.clone()), - ); - - info!( - "TrustOptions configured: Authority: {}. Expect credentials in cache", - authority_identifier - ); - - trust_options - } else { - let trust_options = NodeManagerTrustOptions::new( - NodeManagerCredentialRetrieverOptions::None, - Some(authority_identifier.clone()), - ); - - info!( - "TrustOptions configured: Authority: {}. Only verifying credentials", - authority_identifier - ); - - trust_options - }; - - return Ok(trust_options); + expect_cached_credential, + ) + .await; } let project = match project_name { @@ -123,35 +208,14 @@ impl CliState { None => { info!("TrustOptions configured: No Authority. No Credentials"); return Ok(NodeManagerTrustOptions::new( + NodeManagerCredentialRetrieverOptions::None, NodeManagerCredentialRetrieverOptions::None, None, + NodeManagerCredentialRetrieverOptions::None, )); } }; - let authority_identifier = project.authority_identifier()?; - let authority_multiaddr = project.authority_multiaddr()?; - let authority_route = - multiaddr_to_transport_route(authority_multiaddr).ok_or(Error::new( - Origin::Api, - Kind::NotFound, - format!("Invalid authority route: {}", &authority_multiaddr), - ))?; - let info = RemoteCredentialRetrieverInfo::new( - authority_identifier.clone(), - authority_route, - DefaultAddress::CREDENTIAL_ISSUER.into(), - ); - - let trust_options = NodeManagerTrustOptions::new( - NodeManagerCredentialRetrieverOptions::Remote(info), - Some(authority_identifier.clone()), - ); - - info!( - "TrustOptions configured: Authority: {}. Credentials retrieved from project: {}", - authority_identifier, authority_multiaddr - ); - Ok(trust_options) + self.retrieve_trust_options_with_project(project).await } } diff --git a/implementations/rust/ockam/ockam_api/src/cloud/secure_clients.rs b/implementations/rust/ockam/ockam_api/src/cloud/secure_clients.rs index 45bd33cc804..587eb1c9f6a 100644 --- a/implementations/rust/ockam/ockam_api/src/cloud/secure_clients.rs +++ b/implementations/rust/ockam/ockam_api/src/cloud/secure_clients.rs @@ -85,10 +85,10 @@ impl NodeManager { project_identifier: &Identifier, project_multiaddr: &MultiAddr, caller_identifier: &Identifier, - credentials_enabled: CredentialsEnabled, + credentials_enabled: CredentialsEnabled, // FIXME: Choose as a member or as an admin ) -> Result { let credential_retriever_creator = match credentials_enabled { - CredentialsEnabled::On => self.credential_retriever_creator.clone(), + CredentialsEnabled::On => self.credential_retriever_creators.project_member.clone(), CredentialsEnabled::Off => None, }; @@ -246,7 +246,7 @@ impl NodeManager { get_env_with_default::(OCKAM_CONTROLLER_ADDR, default_addr).unwrap() } - async fn controller_route() -> Result { + pub async fn controller_route() -> Result { let multiaddr = Self::controller_multiaddr(); multiaddr_to_transport_route(&multiaddr).ok_or_else(|| { ApiError::core(format!( diff --git a/implementations/rust/ockam/ockam_api/src/nodes/mod.rs b/implementations/rust/ockam/ockam_api/src/nodes/mod.rs index 78263c30981..be706e7c0c8 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/mod.rs @@ -7,7 +7,7 @@ pub use service::background_node_client::*; pub use service::in_memory_node::*; pub use service::policy::*; /// The main node-manager service running on remote nodes -pub use service::{IdentityOverride, NodeManager, NodeManagerWorker}; +pub use service::{NodeManager, NodeManagerWorker}; /// A const address to bind and send messages to pub const NODEMANAGER_ADDR: &str = "_internal.nodemanager"; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service.rs b/implementations/rust/ockam/ockam_api/src/nodes/service.rs index 72adbe941aa..9f87b566db8 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service.rs @@ -1,46 +1,10 @@ //! Node Manager (Node Man, the superhero that we deserve) -use std::collections::BTreeMap; -use std::error::Error as _; -use std::net::SocketAddr; -use std::path::PathBuf; -use std::time::Duration; +use minicbor::Encode; -use miette::IntoDiagnostic; -use minicbor::{Decoder, Encode}; - -use ockam::identity::models::CredentialAndPurposeKey; -use ockam::identity::{ - CachedCredentialRetrieverCreator, CredentialRetrieverCreator, MemoryCredentialRetrieverCreator, - RemoteCredentialRetrieverCreator, RemoteCredentialRetrieverInfo, -}; -use ockam::identity::{Identifier, SecureChannels}; -use ockam::{ - Address, Context, RelayService, RelayServiceOptions, Result, Routed, TcpTransport, Worker, -}; -use ockam_abac::expr::str; -use ockam_abac::{Action, Env, Expr, Resource}; -use ockam_core::api::{Method, RequestHeader, Response}; -use ockam_core::compat::{string::String, sync::Arc}; -use ockam_core::flow_control::FlowControlId; -use ockam_core::{AllowAll, AsyncTryClone, IncomingAccessControl}; -use ockam_multiaddr::MultiAddr; - -use crate::cli_state::CliState; -use crate::cloud::{AuthorityNodeClient, CredentialsEnabled, ProjectNodeClient}; -use crate::nodes::connection::{ - Connection, ConnectionBuilder, PlainTcpInstantiator, ProjectInstantiator, - SecureChannelInstantiator, -}; -use crate::nodes::models::policies::SetPolicyRequest; -use crate::nodes::models::portal::{OutletList, OutletStatus}; -use crate::nodes::models::transport::{TransportMode, TransportType}; -use crate::nodes::registry::KafkaServiceKind; -use crate::nodes::service::default_address::DefaultAddress; -use crate::nodes::{InMemoryNode, NODEMANAGER_ADDR}; -use crate::session::MedicHandle; - -use super::registry::Registry; +use ockam::{Address, Result}; +use ockam_core::api::{RequestHeader, Response}; +use ockam_core::compat::string::String; pub(crate) mod background_node_client; pub mod default_address; @@ -57,6 +21,14 @@ mod secure_channel; mod transport; pub mod workers; +mod manager; +mod trust; +mod worker; + +pub use manager::*; +pub use trust::*; +pub use worker::*; + const TARGET: &str = "ockam_api::nodemanager::service"; /// Generate a new alias for some user created extension @@ -77,729 +49,3 @@ pub(crate) fn encode_response>( Ok(v) } - -/// Node manager provides high-level operations to -/// - send messages -/// - create secure channels, inlet, outlet -/// - configure the trust -/// - manage persistent data -pub struct NodeManager { - pub(crate) cli_state: CliState, - node_name: String, - node_identifier: Identifier, - api_transport_flow_control_id: FlowControlId, - pub(crate) tcp_transport: TcpTransport, - pub(crate) secure_channels: Arc, - pub(crate) credential_retriever_creator: Option>, - project_authority: Option, - pub(crate) registry: Arc, - pub(crate) medic_handle: MedicHandle, -} - -impl NodeManager { - pub fn identifier(&self) -> Identifier { - self.node_identifier.clone() - } - - pub(crate) async fn get_identifier_by_name( - &self, - identity_name: Option, - ) -> Result { - if let Some(name) = identity_name { - Ok(self.cli_state.get_identifier_by_name(name.as_ref()).await?) - } else { - Ok(self.identifier()) - } - } - - pub fn credential_retriever_creator(&self) -> Option> { - self.credential_retriever_creator.clone() - } - - pub fn project_authority(&self) -> Option { - self.project_authority.clone() - } - - pub fn node_name(&self) -> String { - self.node_name.clone() - } - - pub fn tcp_transport(&self) -> &TcpTransport { - &self.tcp_transport - } - - pub async fn list_outlets(&self) -> OutletList { - OutletList::new( - self.registry - .outlets - .entries() - .await - .iter() - .map(|(_, info)| { - OutletStatus::new(info.socket_addr, info.worker_addr.clone(), None) - }) - .collect(), - ) - } - - /// Delete the current node data - pub async fn delete_node(&self) -> Result<()> { - self.cli_state.remove_node(&self.node_name).await?; - Ok(()) - } -} - -impl NodeManager { - pub async fn create_authority_client( - &self, - authority_identifier: &Identifier, - authority_route: &MultiAddr, - caller_identity_name: Option, - credential_retriever_creator: Option>, - ) -> miette::Result { - self.make_authority_node_client( - authority_identifier, - authority_route, - &self - .get_identifier_by_name(caller_identity_name) - .await - .into_diagnostic()?, - credential_retriever_creator, - ) - .await - .into_diagnostic() - } - - pub async fn create_project_client( - &self, - project_identifier: &Identifier, - project_multiaddr: &MultiAddr, - caller_identity_name: Option, - credentials_enabled: CredentialsEnabled, - ) -> miette::Result { - self.make_project_node_client( - project_identifier, - project_multiaddr, - &self - .get_identifier_by_name(caller_identity_name) - .await - .into_diagnostic()?, - credentials_enabled, - ) - .await - .into_diagnostic() - } -} - -#[derive(Clone)] -pub struct NodeManagerWorker { - pub node_manager: Arc, -} - -impl NodeManagerWorker { - pub fn new(node_manager: Arc) -> Self { - NodeManagerWorker { node_manager } - } - - pub async fn stop(&self, ctx: &Context) -> Result<()> { - self.node_manager.stop(ctx).await?; - ctx.stop_worker(NODEMANAGER_ADDR).await?; - Ok(()) - } -} - -pub struct IdentityOverride { - pub identity: Vec, - pub vault_path: PathBuf, -} - -impl NodeManager { - async fn access_control( - &self, - authority: Option, - resource: Resource, - action: Action, - expression: Option, - ) -> Result> { - let resource_name_str = resource.resource_name.as_str(); - let resource_type_str = resource.resource_type.to_string(); - let action_str = action.as_ref(); - if let Some(authority) = authority { - // Populate environment with known attributes: - let mut env = Env::new(); - env.put("resource.id", str(resource_name_str)); - env.put("action.id", str(action_str)); - - // Store policy for the given resource and action - let policies = self.cli_state.policies(); - if let Some(expression) = expression { - policies - .store_policy_for_resource_name(&resource.resource_name, &action, &expression) - .await?; - } - self.cli_state.store_resource(&resource).await?; - - // Create the policy access control - let policy_access_control = policies - .make_policy_access_control( - self.cli_state.identities_attributes(), - resource, - action, - env, - authority, - ) - .await?; - - cfg_if::cfg_if! { - if #[cfg(feature = "std")] { - let cached_policy_access_control = ockam_core::access_control::CachedIncomingAccessControl::new( - Box::new(policy_access_control)); - Ok(Arc::new(cached_policy_access_control)) - } else { - Ok(Arc::new(policy_access_control)) - } - } - } else { - warn! { - resource_name = resource_name_str, - resource_type = resource_type_str, - action = action_str, - "no policy access control set" - } - Ok(Arc::new(AllowAll)) - } - } -} - -#[derive(Debug)] -pub struct NodeManagerGeneralOptions { - cli_state: CliState, - node_name: String, - start_default_services: bool, - persistent: bool, -} - -impl NodeManagerGeneralOptions { - pub fn new( - cli_state: CliState, - node_name: String, - start_default_services: bool, - persistent: bool, - ) -> Self { - Self { - cli_state, - node_name, - start_default_services, - persistent, - } - } -} - -#[derive(Clone)] -/// Transport to build connection -pub struct ApiTransport { - /// Type of transport being requested - pub tt: TransportType, - /// Mode of transport being requested - pub tm: TransportMode, - /// Socket address - pub socket_address: SocketAddr, - /// Worker address - pub worker_address: String, - /// Processor address - pub processor_address: String, - /// FlowControlId - pub flow_control_id: FlowControlId, -} - -#[derive(Debug)] -pub struct NodeManagerTransportOptions { - api_transport_flow_control_id: FlowControlId, - tcp_transport: TcpTransport, -} - -impl NodeManagerTransportOptions { - pub fn new(api_transport_flow_control_id: FlowControlId, tcp_transport: TcpTransport) -> Self { - Self { - api_transport_flow_control_id, - tcp_transport, - } - } -} - -#[derive(Debug)] -pub enum NodeManagerCredentialRetrieverOptions { - None, - CacheOnly(Identifier), - Remote(RemoteCredentialRetrieverInfo), - InMemory(CredentialAndPurposeKey), -} - -pub struct NodeManagerTrustOptions { - credential_retriever_options: NodeManagerCredentialRetrieverOptions, - authority: Option, -} - -impl NodeManagerTrustOptions { - pub fn new( - credential_retriever_options: NodeManagerCredentialRetrieverOptions, - authority: Option, - ) -> Self { - Self { - credential_retriever_options, - authority, - } - } -} - -impl NodeManager { - /// Create a new NodeManager with the node name from the ockam CLI - #[instrument(name = "create_node_manager", skip_all, fields(node_name = general_options.node_name))] - pub async fn create( - ctx: &Context, - general_options: NodeManagerGeneralOptions, - transport_options: NodeManagerTransportOptions, - trust_options: NodeManagerTrustOptions, - ) -> Result { - debug!("create transports"); - let api_transport_id = random_alias(); - let mut transports = BTreeMap::new(); - transports.insert( - api_transport_id.clone(), - transport_options.api_transport_flow_control_id.clone(), - ); - - let mut cli_state = general_options.cli_state; - cli_state.set_node_name(general_options.node_name.clone()); - - let secure_channels = cli_state - .secure_channels(&general_options.node_name) - .await?; - - let registry = Arc::new(Registry::default()); - debug!("start the medic"); - let medic_handle = MedicHandle::start_medic(ctx, registry.clone()).await?; - - debug!("retrieve the node identifier"); - let node_identifier = cli_state - .get_node(&general_options.node_name) - .await? - .identifier(); - - debug!("create default resource type policies"); - cli_state - .policies() - .store_default_resource_type_policies() - .await?; - - let credential_retriever_creator: Option> = - match trust_options.credential_retriever_options { - NodeManagerCredentialRetrieverOptions::None => None, - NodeManagerCredentialRetrieverOptions::CacheOnly(issuer) => { - Some(Arc::new(CachedCredentialRetrieverCreator::new( - issuer.clone(), - secure_channels.identities().cached_credentials_repository(), - ))) - } - NodeManagerCredentialRetrieverOptions::Remote(info) => { - Some(Arc::new(RemoteCredentialRetrieverCreator::new( - ctx.async_try_clone().await?, - Arc::new(transport_options.tcp_transport.clone()), - secure_channels.clone(), - info.clone(), - ))) - } - NodeManagerCredentialRetrieverOptions::InMemory(credential) => { - Some(Arc::new(MemoryCredentialRetrieverCreator::new(credential))) - } - }; - - let mut s = Self { - cli_state, - node_name: general_options.node_name, - node_identifier, - api_transport_flow_control_id: transport_options.api_transport_flow_control_id, - tcp_transport: transport_options.tcp_transport, - secure_channels, - credential_retriever_creator, - project_authority: trust_options.authority, - registry, - medic_handle, - }; - - debug!("retrieve the node identifier"); - s.initialize_services(ctx, general_options.start_default_services) - .await?; - info!("created a node manager for the node: {}", s.node_name); - - Ok(s) - } - - async fn initialize_default_services( - &self, - ctx: &Context, - api_flow_control_id: &FlowControlId, - ) -> Result<()> { - // Start services - ctx.flow_controls() - .add_consumer(DefaultAddress::UPPERCASE_SERVICE, api_flow_control_id); - self.start_uppercase_service_impl(ctx, DefaultAddress::UPPERCASE_SERVICE.into()) - .await?; - - RelayService::create( - ctx, - DefaultAddress::RELAY_SERVICE, - RelayServiceOptions::new() - .service_as_consumer(api_flow_control_id) - .relay_as_consumer(api_flow_control_id), - ) - .await?; - - self.create_secure_channel_listener( - DefaultAddress::SECURE_CHANNEL_LISTENER.into(), - None, // Not checking identifiers here in favor of credential check - None, - ctx, - ) - .await?; - - Ok(()) - } - - async fn initialize_services( - &mut self, - ctx: &Context, - start_default_services: bool, - ) -> Result<()> { - let api_flow_control_id = self.api_transport_flow_control_id.clone(); - - if start_default_services { - self.initialize_default_services(ctx, &api_flow_control_id) - .await?; - } - - // Always start the echoer service as ockam_api::Medic assumes it will be - // started unconditionally on every node. It's used for liveliness checks. - ctx.flow_controls() - .add_consumer(DefaultAddress::ECHO_SERVICE, &api_flow_control_id); - self.start_echoer_service(ctx, DefaultAddress::ECHO_SERVICE.into()) - .await?; - - Ok(()) - } - - pub async fn make_connection( - &self, - ctx: Arc, - addr: &MultiAddr, - identifier: Identifier, - authorized: Option, - timeout: Option, - ) -> Result { - let authorized = authorized.map(|authorized| vec![authorized]); - self.connect(ctx, addr, identifier, authorized, timeout) - .await - } - - /// Resolve project ID (if any), create secure channel (if needed) and create a tcp connection - /// Returns [`Connection`] - async fn connect( - &self, - ctx: Arc, - addr: &MultiAddr, - identifier: Identifier, - authorized: Option>, - timeout: Option, - ) -> Result { - debug!(?timeout, "connecting to {}", &addr); - let connection = ConnectionBuilder::new(addr.clone()) - .instantiate( - ctx.clone(), - self, - ProjectInstantiator::new(identifier.clone(), timeout), - ) - .await? - .instantiate(ctx.clone(), self, PlainTcpInstantiator::new()) - .await? - .instantiate( - ctx.clone(), - self, - SecureChannelInstantiator::new(&identifier, timeout, authorized), - ) - .await? - .build(); - connection.add_default_consumers(ctx); - - debug!("connected to {connection:?}"); - Ok(connection) - } - - pub(crate) async fn resolve_project(&self, name: &str) -> Result<(MultiAddr, Identifier)> { - let project = self.cli_state.projects().get_project_by_name(name).await?; - Ok(( - project.project_multiaddr()?.clone(), - project.project_identifier()?, - )) - } -} - -impl NodeManagerWorker { - //////// Request matching and response handling //////// - - #[instrument(skip_all, fields(method = ?req.method(), path = req.path()))] - async fn handle_request( - &mut self, - ctx: &mut Context, - req: &RequestHeader, - dec: &mut Decoder<'_>, - ) -> Result> { - debug! { - target: TARGET, - id = %req.id(), - method = ?req.method(), - path = %req.path(), - body = %req.has_body(), - "request" - } - - use Method::*; - let path = req.path(); - let path_segments = req.path_segments::<5>(); - let method = match req.method() { - Some(m) => m, - None => todo!(), - }; - - let r = match (method, path_segments.as_slice()) { - // ==*== Basic node information ==*== - // TODO: create, delete, destroy remote nodes - (Get, ["node"]) => encode_response(req, self.get_node_status(ctx).await)?, - - // ==*== Tcp Connection ==*== - (Get, ["node", "tcp", "connection"]) => self.get_tcp_connections(req).await.to_vec()?, - (Get, ["node", "tcp", "connection", address]) => { - encode_response(req, self.get_tcp_connection(address.to_string()).await)? - } - (Post, ["node", "tcp", "connection"]) => { - encode_response(req, self.create_tcp_connection(ctx, dec.decode()?).await)? - } - (Delete, ["node", "tcp", "connection"]) => { - encode_response(req, self.delete_tcp_connection(dec.decode()?).await)? - } - - // ==*== Tcp Listeners ==*== - (Get, ["node", "tcp", "listener"]) => self.get_tcp_listeners(req).await.to_vec()?, - (Get, ["node", "tcp", "listener", address]) => { - encode_response(req, self.get_tcp_listener(address.to_string()).await)? - } - (Post, ["node", "tcp", "listener"]) => { - encode_response(req, self.create_tcp_listener(dec.decode()?).await)? - } - (Delete, ["node", "tcp", "listener"]) => { - encode_response(req, self.delete_tcp_listener(dec.decode()?).await)? - } - - // ==*== Secure channels ==*== - (Get, ["node", "secure_channel"]) => { - encode_response(req, self.list_secure_channels().await)? - } - (Get, ["node", "secure_channel_listener"]) => { - encode_response(req, self.list_secure_channel_listener().await)? - } - (Post, ["node", "secure_channel"]) => { - encode_response(req, self.create_secure_channel(dec.decode()?, ctx).await)? - } - (Delete, ["node", "secure_channel"]) => { - encode_response(req, self.delete_secure_channel(dec.decode()?, ctx).await)? - } - (Get, ["node", "show_secure_channel"]) => { - encode_response(req, self.show_secure_channel(dec.decode()?).await)? - } - (Post, ["node", "secure_channel_listener"]) => encode_response( - req, - self.create_secure_channel_listener(dec.decode()?, ctx) - .await, - )?, - (Delete, ["node", "secure_channel_listener"]) => encode_response( - req, - self.delete_secure_channel_listener(dec.decode()?, ctx) - .await, - )?, - (Get, ["node", "show_secure_channel_listener"]) => { - encode_response(req, self.show_secure_channel_listener(dec.decode()?).await)? - } - - // ==*== Services ==*== - (Post, ["node", "services", DefaultAddress::UPPERCASE_SERVICE]) => { - encode_response(req, self.start_uppercase_service(ctx, dec.decode()?).await)? - } - (Post, ["node", "services", DefaultAddress::ECHO_SERVICE]) => { - encode_response(req, self.start_echoer_service(ctx, dec.decode()?).await)? - } - (Post, ["node", "services", DefaultAddress::HOP_SERVICE]) => { - encode_response(req, self.start_hop_service(ctx, dec.decode()?).await)? - } - (Post, ["node", "services", DefaultAddress::KAFKA_OUTLET]) => encode_response( - req, - self.start_kafka_outlet_service(ctx, dec.decode()?).await, - )?, - (Delete, ["node", "services", DefaultAddress::KAFKA_OUTLET]) => encode_response( - req, - self.delete_kafka_service(ctx, dec.decode()?, KafkaServiceKind::Outlet) - .await, - )?, - (Post, ["node", "services", DefaultAddress::KAFKA_CONSUMER]) => encode_response( - req, - self.start_kafka_consumer_service(ctx, dec.decode()?).await, - )?, - (Delete, ["node", "services", DefaultAddress::KAFKA_CONSUMER]) => encode_response( - req, - self.delete_kafka_service(ctx, dec.decode()?, KafkaServiceKind::Consumer) - .await, - )?, - (Post, ["node", "services", DefaultAddress::KAFKA_PRODUCER]) => encode_response( - req, - self.start_kafka_producer_service(ctx, dec.decode()?).await, - )?, - (Delete, ["node", "services", DefaultAddress::KAFKA_PRODUCER]) => encode_response( - req, - self.delete_kafka_service(ctx, dec.decode()?, KafkaServiceKind::Producer) - .await, - )?, - (Post, ["node", "services", DefaultAddress::KAFKA_DIRECT]) => encode_response( - req, - self.start_kafka_direct_service(ctx, dec.decode()?).await, - )?, - (Delete, ["node", "services", DefaultAddress::KAFKA_DIRECT]) => encode_response( - req, - self.delete_kafka_service(ctx, dec.decode()?, KafkaServiceKind::Direct) - .await, - )?, - (Get, ["node", "services"]) => encode_response(req, self.list_services().await)?, - (Get, ["node", "services", service_type]) => { - encode_response(req, self.list_services_of_type(service_type).await)? - } - - // ==*== Relay commands ==*== - (Get, ["node", "relay", alias]) => { - encode_response(req, self.show_relay(req, alias).await)? - } - (Get, ["node", "relay"]) => encode_response(req, self.get_relays(req).await)?, - (Delete, ["node", "relay", alias]) => { - encode_response(req, self.delete_relay(req, alias).await)? - } - (Post, ["node", "relay"]) => { - encode_response(req, self.create_relay(ctx, req, dec.decode()?).await)? - } - - // ==*== Inlets & Outlets ==*== - (Get, ["node", "inlet"]) => encode_response(req, self.get_inlets().await)?, - (Get, ["node", "inlet", alias]) => encode_response(req, self.show_inlet(alias).await)?, - (Get, ["node", "outlet"]) => self.get_outlets(req).await.to_vec()?, - (Get, ["node", "outlet", addr]) => { - let addr: Address = addr.to_string().into(); - encode_response(req, self.show_outlet(&addr).await)? - } - (Post, ["node", "inlet"]) => { - encode_response(req, self.create_inlet(ctx, dec.decode()?).await)? - } - (Post, ["node", "outlet"]) => { - encode_response(req, self.create_outlet(ctx, dec.decode()?).await)? - } - (Delete, ["node", "outlet", addr]) => { - let addr: Address = addr.to_string().into(); - encode_response(req, self.delete_outlet(&addr).await)? - } - (Delete, ["node", "inlet", alias]) => { - encode_response(req, self.delete_inlet(alias).await)? - } - (Delete, ["node", "portal"]) => todo!(), - - // ==*== Flow Controls ==*== - (Post, ["node", "flow_controls", "add_consumer"]) => { - encode_response(req, self.add_consumer(ctx, dec.decode()?).await)? - } - - // ==*== Workers ==*== - (Get, ["node", "workers"]) => encode_response(req, self.list_workers(ctx).await)?, - - // ==*== Policies ==*== - (Post, ["policy", action]) => { - let payload: SetPolicyRequest = dec.decode()?; - encode_response( - req, - self.add_policy(action, payload.resource, payload.expression) - .await, - )? - } - (Get, ["policy", action]) => { - encode_response(req, self.get_policy(action, dec.decode()?).await)? - } - (Get, ["policy"]) => encode_response(req, self.list_policies(dec.decode()?).await)?, - (Delete, ["policy", action]) => { - encode_response(req, self.delete_policy(action, dec.decode()?).await)? - } - - // ==*== Messages ==*== - (Post, ["v0", "message"]) => { - encode_response(req, self.send_message(ctx, dec.decode()?).await)? - } - - // ==*== Catch-all for Unimplemented APIs ==*== - _ => { - warn!(%method, %path, "Called invalid endpoint"); - Response::bad_request(req, &format!("Invalid endpoint: {} {}", method, path)) - .to_vec()? - } - }; - Ok(r) - } -} - -#[ockam::worker] -impl Worker for NodeManagerWorker { - type Message = Vec; - type Context = Context; - - async fn shutdown(&mut self, ctx: &mut Self::Context) -> Result<()> { - self.node_manager.medic_handle.stop_medic(ctx).await - } - - async fn handle_message(&mut self, ctx: &mut Context, msg: Routed>) -> Result<()> { - let return_route = msg.return_route(); - let body = msg.into_body()?; - let mut dec = Decoder::new(&body); - let req: RequestHeader = match dec.decode() { - Ok(r) => r, - Err(e) => { - error!("Failed to decode request: {:?}", e); - return Ok(()); - } - }; - - let r = match self.handle_request(ctx, &req, &mut dec).await { - Ok(r) => r, - Err(err) => { - error! { - target: TARGET, - re = %req.id(), - method = ?req.method(), - path = %req.path(), - code = %err.code(), - cause = ?err.source(), - "failed to handle request" - } - Response::internal_error(&req, &format!("failed to handle request: {err} {req:?}")) - .to_vec()? - } - }; - debug! { - target: TARGET, - re = %req.id(), - method = ?req.method(), - path = %req.path(), - "responding" - } - ctx.send(return_route, r).await - } -} diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs new file mode 100644 index 00000000000..b1bbef4d762 --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs @@ -0,0 +1,517 @@ +use crate::cloud::email_address::EmailAddress; +use crate::cloud::project::Project; +use crate::cloud::share::RoleInShare; +use crate::cloud::{AuthorityNodeClient, CredentialsEnabled, ProjectNodeClient}; +use crate::nodes::connection::{ + Connection, ConnectionBuilder, PlainTcpInstantiator, ProjectInstantiator, + SecureChannelInstantiator, +}; +use crate::nodes::models::portal::{OutletList, OutletStatus}; +use crate::nodes::models::transport::{TransportMode, TransportType}; +use crate::nodes::registry::Registry; +use crate::nodes::service::{ + random_alias, CredentialRetrieverCreators, CredentialScope, + NodeManagerCredentialRetrieverOptions, NodeManagerTrustOptions, +}; +use crate::session::MedicHandle; +use crate::{CliState, DefaultAddress, EnrollmentStatus}; +use miette::IntoDiagnostic; +use ockam::identity::{ + CachedCredentialRetrieverCreator, CredentialRetrieverCreator, Identifier, + MemoryCredentialRetrieverCreator, RemoteCredentialRetrieverCreator, SecureChannels, +}; +use ockam::{RelayService, RelayServiceOptions}; +use ockam_abac::expr::str; +use ockam_abac::{Action, Env, Expr, Resource}; +use ockam_core::flow_control::FlowControlId; +use ockam_core::{AllowAll, AsyncTryClone, IncomingAccessControl}; +use ockam_multiaddr::MultiAddr; +use ockam_node::Context; +use ockam_transport_tcp::TcpTransport; +use std::collections::BTreeMap; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +/// Node manager provides high-level operations to +/// - send messages +/// - create secure channels, inlet, outlet +/// - configure the trust +/// - manage persistent data +pub struct NodeManager { + pub(crate) cli_state: CliState, + pub(super) node_name: String, + pub(super) node_identifier: Identifier, + pub(super) api_transport_flow_control_id: FlowControlId, + pub(crate) tcp_transport: TcpTransport, + pub(crate) secure_channels: Arc, + pub(crate) credential_retriever_creators: CredentialRetrieverCreators, + pub(super) project_authority: Option, + pub(crate) registry: Arc, + pub(crate) medic_handle: MedicHandle, +} + +impl NodeManager { + pub fn identifier(&self) -> Identifier { + self.node_identifier.clone() + } + + pub(crate) async fn get_identifier_by_name( + &self, + identity_name: Option, + ) -> ockam_core::Result { + if let Some(name) = identity_name { + Ok(self.cli_state.get_identifier_by_name(name.as_ref()).await?) + } else { + Ok(self.identifier()) + } + } + + pub fn credential_retriever_creators(&self) -> CredentialRetrieverCreators { + self.credential_retriever_creators.clone() + } + + pub fn project_authority(&self) -> Option { + self.project_authority.clone() + } + + pub fn node_name(&self) -> String { + self.node_name.clone() + } + + pub fn tcp_transport(&self) -> &TcpTransport { + &self.tcp_transport + } + + pub async fn list_outlets(&self) -> OutletList { + OutletList::new( + self.registry + .outlets + .entries() + .await + .iter() + .map(|(_, info)| { + OutletStatus::new(info.socket_addr, info.worker_addr.clone(), None) + }) + .collect(), + ) + } + + /// Delete the current node data + pub async fn delete_node(&self) -> ockam_core::Result<()> { + self.cli_state.remove_node(&self.node_name).await?; + Ok(()) + } +} + +impl NodeManager { + pub async fn create_authority_client( + &self, + project: &Project, + caller_identity_name: Option, + ) -> miette::Result { + let enrolled = self + .cli_state + .get_identity_enrollments(EnrollmentStatus::Enrolled) + .await?; + + let caller_identifier = self + .get_identifier_by_name(caller_identity_name) + .await + .into_diagnostic()?; + + let emails: Vec = enrolled + .iter() + .flat_map(|x| { + if x.identifier() == caller_identifier { + x.email() + } else { + None + } + }) + .collect(); + + // TODO: Move to a separate function + let is_project_admin = project + .model() + .user_roles + .iter() + .any(|u| u.role == RoleInShare::Admin && emails.contains(&u.email)); + + let credential_retriever_creator = if is_project_admin { + self.credential_retriever_creators.project_admin.clone() + } else { + None + }; + + self.make_authority_node_client( + &project.authority_identifier().into_diagnostic()?, + project.authority_multiaddr().into_diagnostic()?, + &caller_identifier, + credential_retriever_creator, + ) + .await + .into_diagnostic() + } + + pub async fn create_project_client( + &self, + project_identifier: &Identifier, + project_multiaddr: &MultiAddr, + caller_identity_name: Option, + credentials_enabled: CredentialsEnabled, + ) -> miette::Result { + self.make_project_node_client( + project_identifier, + project_multiaddr, + &self + .get_identifier_by_name(caller_identity_name) + .await + .into_diagnostic()?, + credentials_enabled, + ) + .await + .into_diagnostic() + } +} + +impl NodeManager { + pub(super) async fn access_control( + &self, + authority: Option, + resource: Resource, + action: Action, + expression: Option, + ) -> ockam_core::Result> { + let resource_name_str = resource.resource_name.as_str(); + let resource_type_str = resource.resource_type.to_string(); + let action_str = action.as_ref(); + if let Some(authority) = authority { + // Populate environment with known attributes: + let mut env = Env::new(); + env.put("resource.id", str(resource_name_str)); + env.put("action.id", str(action_str)); + + // Store policy for the given resource and action + let policies = self.cli_state.policies(); + if let Some(expression) = expression { + policies + .store_policy_for_resource_name(&resource.resource_name, &action, &expression) + .await?; + } + self.cli_state.store_resource(&resource).await?; + + // Create the policy access control + let policy_access_control = policies + .make_policy_access_control( + self.cli_state.identities_attributes(), + resource, + action, + env, + authority, + ) + .await?; + cfg_if::cfg_if! { + if #[cfg(feature = "std")] { + let cached_policy_access_control = ockam_core::access_control::CachedIncomingAccessControl::new( + Box::new(policy_access_control)); + Ok(Arc::new(cached_policy_access_control)) + } else { + Ok(Arc::new(policy_access_control)) + } + } + } else { + warn! { + resource_name = resource_name_str, + resource_type = resource_type_str, + action = action_str, + "no policy access control set" + } + Ok(Arc::new(AllowAll)) + } + } +} + +#[derive(Debug)] +pub struct NodeManagerGeneralOptions { + pub(super) cli_state: CliState, + pub(super) node_name: String, + pub(super) start_default_services: bool, + pub(super) persistent: bool, +} + +impl NodeManagerGeneralOptions { + pub fn new( + cli_state: CliState, + node_name: String, + start_default_services: bool, + persistent: bool, + ) -> Self { + Self { + cli_state, + node_name, + start_default_services, + persistent, + } + } +} + +#[derive(Clone)] +/// Transport to build connection +pub struct ApiTransport { + /// Type of transport being requested + pub tt: TransportType, + /// Mode of transport being requested + pub tm: TransportMode, + /// Socket address + pub socket_address: SocketAddr, + /// Worker address + pub worker_address: String, + /// Processor address + pub processor_address: String, + /// FlowControlId + pub flow_control_id: FlowControlId, +} + +#[derive(Debug)] +pub struct NodeManagerTransportOptions { + api_transport_flow_control_id: FlowControlId, + tcp_transport: TcpTransport, +} + +impl NodeManagerTransportOptions { + pub fn new(api_transport_flow_control_id: FlowControlId, tcp_transport: TcpTransport) -> Self { + Self { + api_transport_flow_control_id, + tcp_transport, + } + } +} + +impl NodeManager { + /// Create a new NodeManager with the node name from the ockam CLI + #[instrument(name = "create_node_manager", skip_all, fields(node_name = general_options.node_name))] + pub async fn create( + ctx: &Context, + general_options: NodeManagerGeneralOptions, + transport_options: NodeManagerTransportOptions, + trust_options: NodeManagerTrustOptions, + ) -> ockam_core::Result { + debug!("create transports"); + let api_transport_id = random_alias(); + let mut transports = BTreeMap::new(); + transports.insert( + api_transport_id.clone(), + transport_options.api_transport_flow_control_id.clone(), + ); + + let mut cli_state = general_options.cli_state; + cli_state.set_node_name(general_options.node_name.clone()); + + let secure_channels = cli_state + .secure_channels(&general_options.node_name) + .await?; + + let registry = Arc::new(Registry::default()); + debug!("start the medic"); + let medic_handle = MedicHandle::start_medic(ctx, registry.clone()).await?; + + debug!("retrieve the node identifier"); + let node_identifier = cli_state + .get_node(&general_options.node_name) + .await? + .identifier(); + + debug!("create default resource type policies"); + cli_state + .policies() + .store_default_resource_type_policies() + .await?; + + let project_member_credential_retriever_creator: Option< + Arc, + > = match trust_options.project_member_credential_retriever_options { + NodeManagerCredentialRetrieverOptions::None => None, + NodeManagerCredentialRetrieverOptions::CacheOnly { issuer, project_id } => { + Some(Arc::new(CachedCredentialRetrieverCreator::new( + issuer.clone(), + CredentialScope::ProjectMember { project_id }.to_string(), + secure_channels.identities().cached_credentials_repository(), + ))) + } + NodeManagerCredentialRetrieverOptions::Remote { info, project_id } => { + Some(Arc::new(RemoteCredentialRetrieverCreator::new( + ctx.async_try_clone().await?, + Arc::new(transport_options.tcp_transport.clone()), + secure_channels.clone(), + info.clone(), + CredentialScope::ProjectMember { project_id }.to_string(), + ))) + } + NodeManagerCredentialRetrieverOptions::InMemory(credential) => { + Some(Arc::new(MemoryCredentialRetrieverCreator::new(credential))) + } + }; + + let project_admin_credential_retriever_creator: Option< + Arc, + > = match trust_options.project_admin_credential_retriever_options { + NodeManagerCredentialRetrieverOptions::None => None, + NodeManagerCredentialRetrieverOptions::CacheOnly { issuer, project_id } => { + Some(Arc::new(CachedCredentialRetrieverCreator::new( + issuer.clone(), + CredentialScope::ProjectAdmin { project_id }.to_string(), + secure_channels.identities().cached_credentials_repository(), + ))) + } + NodeManagerCredentialRetrieverOptions::Remote { info, project_id } => { + Some(Arc::new(RemoteCredentialRetrieverCreator::new( + ctx.async_try_clone().await?, + Arc::new(transport_options.tcp_transport.clone()), + secure_channels.clone(), + info.clone(), + CredentialScope::ProjectAdmin { project_id }.to_string(), + ))) + } + NodeManagerCredentialRetrieverOptions::InMemory(credential) => { + Some(Arc::new(MemoryCredentialRetrieverCreator::new(credential))) + } + }; + + let credential_retriever_creators = CredentialRetrieverCreators { + project_member: project_member_credential_retriever_creator, + project_admin: project_admin_credential_retriever_creator, + _account_admin: None, + }; + + let mut s = Self { + cli_state, + node_name: general_options.node_name, + node_identifier, + api_transport_flow_control_id: transport_options.api_transport_flow_control_id, + tcp_transport: transport_options.tcp_transport, + secure_channels, + credential_retriever_creators, + project_authority: trust_options.project_authority, + registry, + medic_handle, + }; + + debug!("retrieve the node identifier"); + s.initialize_services(ctx, general_options.start_default_services) + .await?; + info!("created a node manager for the node: {}", s.node_name); + + Ok(s) + } + + async fn initialize_default_services( + &self, + ctx: &Context, + api_flow_control_id: &FlowControlId, + ) -> ockam_core::Result<()> { + // Start services + ctx.flow_controls() + .add_consumer(DefaultAddress::UPPERCASE_SERVICE, api_flow_control_id); + self.start_uppercase_service_impl(ctx, DefaultAddress::UPPERCASE_SERVICE.into()) + .await?; + + RelayService::create( + ctx, + DefaultAddress::RELAY_SERVICE, + RelayServiceOptions::new() + .service_as_consumer(api_flow_control_id) + .relay_as_consumer(api_flow_control_id), + ) + .await?; + + self.create_secure_channel_listener( + DefaultAddress::SECURE_CHANNEL_LISTENER.into(), + None, // Not checking identifiers here in favor of credential check + None, + ctx, + ) + .await?; + + Ok(()) + } + + async fn initialize_services( + &mut self, + ctx: &Context, + start_default_services: bool, + ) -> ockam_core::Result<()> { + let api_flow_control_id = self.api_transport_flow_control_id.clone(); + + if start_default_services { + self.initialize_default_services(ctx, &api_flow_control_id) + .await?; + } + + // Always start the echoer service as ockam_api::Medic assumes it will be + // started unconditionally on every node. It's used for liveliness checks. + ctx.flow_controls() + .add_consumer(DefaultAddress::ECHO_SERVICE, &api_flow_control_id); + self.start_echoer_service(ctx, DefaultAddress::ECHO_SERVICE.into()) + .await?; + + Ok(()) + } + + pub async fn make_connection( + &self, + ctx: Arc, + addr: &MultiAddr, + identifier: Identifier, + authorized: Option, + timeout: Option, + ) -> ockam_core::Result { + let authorized = authorized.map(|authorized| vec![authorized]); + self.connect(ctx, addr, identifier, authorized, timeout) + .await + } + + /// Resolve project ID (if any), create secure channel (if needed) and create a tcp connection + /// Returns [`Connection`] + async fn connect( + &self, + ctx: Arc, + addr: &MultiAddr, + identifier: Identifier, + authorized: Option>, + timeout: Option, + ) -> ockam_core::Result { + debug!(?timeout, "connecting to {}", &addr); + let connection = ConnectionBuilder::new(addr.clone()) + .instantiate( + ctx.clone(), + self, + ProjectInstantiator::new(identifier.clone(), timeout), + ) + .await? + .instantiate(ctx.clone(), self, PlainTcpInstantiator::new()) + .await? + .instantiate( + ctx.clone(), + self, + SecureChannelInstantiator::new(&identifier, timeout, authorized), + ) + .await? + .build(); + connection.add_default_consumers(ctx); + + debug!("connected to {connection:?}"); + Ok(connection) + } + + pub(crate) async fn resolve_project( + &self, + name: &str, + ) -> ockam_core::Result<(MultiAddr, Identifier)> { + let project = self.cli_state.projects().get_project_by_name(name).await?; + Ok(( + project.project_multiaddr()?.clone(), + project.project_identifier()?, + )) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs index ceb64395f60..53a311ca39a 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs @@ -207,7 +207,7 @@ impl NodeManager { None => options, }; - let options = match self.credential_retriever_creator.as_ref() { + let options = match self.credential_retriever_creators.project_member.as_ref() { None => options, Some(credential_retriever_creator) => { options.with_credential_retriever_creator(credential_retriever_creator.clone())? @@ -315,7 +315,7 @@ impl NodeManager { None => options, }; - let options = match self.credential_retriever_creator.as_ref() { + let options = match self.credential_retriever_creators.project_member.as_ref() { None => options, Some(credential_retriever_creator) => { options.with_credential_retriever_creator(credential_retriever_creator.clone())? diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/trust.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/trust.rs new file mode 100644 index 00000000000..cd06b82b1cb --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/trust.rs @@ -0,0 +1,71 @@ +use ockam::identity::models::CredentialAndPurposeKey; +use ockam::identity::{CredentialRetrieverCreator, Identifier, RemoteCredentialRetrieverInfo}; +use std::fmt::Display; +use std::sync::Arc; + +#[derive(Clone)] +pub struct CredentialRetrieverCreators { + pub(crate) project_member: Option>, + pub(crate) project_admin: Option>, + pub(crate) _account_admin: Option>, +} + +pub enum CredentialScope { + ProjectMember { project_id: String }, + ProjectAdmin { project_id: String }, + AccountAdmin { account_id: String }, +} + +impl Display for CredentialScope { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let str = match self { + CredentialScope::ProjectMember { project_id } => { + format!("project-member-{}", project_id) + } + CredentialScope::ProjectAdmin { project_id } => { + format!("project-admin-{}", project_id) + } + CredentialScope::AccountAdmin { account_id } => { + format!("account-admin-{}", account_id) + } + }; + write!(f, "{}", str) + } +} + +#[derive(Debug)] +pub enum NodeManagerCredentialRetrieverOptions { + None, + CacheOnly { + issuer: Identifier, + project_id: String, + }, + Remote { + info: RemoteCredentialRetrieverInfo, + project_id: String, + }, + InMemory(CredentialAndPurposeKey), +} + +pub struct NodeManagerTrustOptions { + pub(super) project_member_credential_retriever_options: NodeManagerCredentialRetrieverOptions, + pub(super) project_authority: Option, + pub(super) project_admin_credential_retriever_options: NodeManagerCredentialRetrieverOptions, + pub(super) _account_admin_credential_retriever_options: NodeManagerCredentialRetrieverOptions, +} + +impl NodeManagerTrustOptions { + pub fn new( + project_member_credential_retriever_options: NodeManagerCredentialRetrieverOptions, + project_admin_credential_retriever_options: NodeManagerCredentialRetrieverOptions, + project_authority: Option, + account_admin_credential_retriever_options: NodeManagerCredentialRetrieverOptions, + ) -> Self { + Self { + project_member_credential_retriever_options, + project_admin_credential_retriever_options, + project_authority, + _account_admin_credential_retriever_options: account_admin_credential_retriever_options, + } + } +} diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs new file mode 100644 index 00000000000..b8aec1fb7fd --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs @@ -0,0 +1,293 @@ +use crate::nodes::models::policies::SetPolicyRequest; +use crate::nodes::registry::KafkaServiceKind; +use crate::nodes::service::{encode_response, TARGET}; +use crate::nodes::{InMemoryNode, NODEMANAGER_ADDR}; +use crate::DefaultAddress; +use minicbor::Decoder; +use ockam_core::api::{RequestHeader, Response}; +use ockam_core::{Address, Routed, Worker}; +use ockam_node::Context; +use std::error::Error; +use std::sync::Arc; + +#[derive(Clone)] +pub struct NodeManagerWorker { + pub node_manager: Arc, +} + +impl NodeManagerWorker { + pub fn new(node_manager: Arc) -> Self { + NodeManagerWorker { node_manager } + } + + pub async fn stop(&self, ctx: &Context) -> ockam_core::Result<()> { + self.node_manager.stop(ctx).await?; + ctx.stop_worker(NODEMANAGER_ADDR).await?; + Ok(()) + } +} + +impl NodeManagerWorker { + //////// Request matching and response handling //////// + + #[instrument(skip_all, fields(method = ?req.method(), path = req.path()))] + async fn handle_request( + &mut self, + ctx: &mut Context, + req: &RequestHeader, + dec: &mut Decoder<'_>, + ) -> ockam_core::Result> { + debug! { + target: TARGET, + id = %req.id(), + method = ?req.method(), + path = %req.path(), + body = %req.has_body(), + "request" + } + + use ockam_core::api::Method::*; + let path = req.path(); + let path_segments = req.path_segments::<5>(); + let method = match req.method() { + Some(m) => m, + None => todo!(), + }; + + let r = match (method, path_segments.as_slice()) { + // ==*== Basic node information ==*== + // TODO: create, delete, destroy remote nodes + (Get, ["node"]) => encode_response(req, self.get_node_status(ctx).await)?, + + // ==*== Tcp Connection ==*== + (Get, ["node", "tcp", "connection"]) => self.get_tcp_connections(req).await.to_vec()?, + (Get, ["node", "tcp", "connection", address]) => { + encode_response(req, self.get_tcp_connection(address.to_string()).await)? + } + (Post, ["node", "tcp", "connection"]) => { + encode_response(req, self.create_tcp_connection(ctx, dec.decode()?).await)? + } + (Delete, ["node", "tcp", "connection"]) => { + encode_response(req, self.delete_tcp_connection(dec.decode()?).await)? + } + + // ==*== Tcp Listeners ==*== + (Get, ["node", "tcp", "listener"]) => self.get_tcp_listeners(req).await.to_vec()?, + (Get, ["node", "tcp", "listener", address]) => { + encode_response(req, self.get_tcp_listener(address.to_string()).await)? + } + (Post, ["node", "tcp", "listener"]) => { + encode_response(req, self.create_tcp_listener(dec.decode()?).await)? + } + (Delete, ["node", "tcp", "listener"]) => { + encode_response(req, self.delete_tcp_listener(dec.decode()?).await)? + } + + // ==*== Secure channels ==*== + (Get, ["node", "secure_channel"]) => { + encode_response(req, self.list_secure_channels().await)? + } + (Get, ["node", "secure_channel_listener"]) => { + encode_response(req, self.list_secure_channel_listener().await)? + } + (Post, ["node", "secure_channel"]) => { + encode_response(req, self.create_secure_channel(dec.decode()?, ctx).await)? + } + (Delete, ["node", "secure_channel"]) => { + encode_response(req, self.delete_secure_channel(dec.decode()?, ctx).await)? + } + (Get, ["node", "show_secure_channel"]) => { + encode_response(req, self.show_secure_channel(dec.decode()?).await)? + } + (Post, ["node", "secure_channel_listener"]) => encode_response( + req, + self.create_secure_channel_listener(dec.decode()?, ctx) + .await, + )?, + (Delete, ["node", "secure_channel_listener"]) => encode_response( + req, + self.delete_secure_channel_listener(dec.decode()?, ctx) + .await, + )?, + (Get, ["node", "show_secure_channel_listener"]) => { + encode_response(req, self.show_secure_channel_listener(dec.decode()?).await)? + } + + // ==*== Services ==*== + (Post, ["node", "services", DefaultAddress::UPPERCASE_SERVICE]) => { + encode_response(req, self.start_uppercase_service(ctx, dec.decode()?).await)? + } + (Post, ["node", "services", DefaultAddress::ECHO_SERVICE]) => { + encode_response(req, self.start_echoer_service(ctx, dec.decode()?).await)? + } + (Post, ["node", "services", DefaultAddress::HOP_SERVICE]) => { + encode_response(req, self.start_hop_service(ctx, dec.decode()?).await)? + } + (Post, ["node", "services", DefaultAddress::KAFKA_OUTLET]) => encode_response( + req, + self.start_kafka_outlet_service(ctx, dec.decode()?).await, + )?, + (Delete, ["node", "services", DefaultAddress::KAFKA_OUTLET]) => encode_response( + req, + self.delete_kafka_service(ctx, dec.decode()?, KafkaServiceKind::Outlet) + .await, + )?, + (Post, ["node", "services", DefaultAddress::KAFKA_CONSUMER]) => encode_response( + req, + self.start_kafka_consumer_service(ctx, dec.decode()?).await, + )?, + (Delete, ["node", "services", DefaultAddress::KAFKA_CONSUMER]) => encode_response( + req, + self.delete_kafka_service(ctx, dec.decode()?, KafkaServiceKind::Consumer) + .await, + )?, + (Post, ["node", "services", DefaultAddress::KAFKA_PRODUCER]) => encode_response( + req, + self.start_kafka_producer_service(ctx, dec.decode()?).await, + )?, + (Delete, ["node", "services", DefaultAddress::KAFKA_PRODUCER]) => encode_response( + req, + self.delete_kafka_service(ctx, dec.decode()?, KafkaServiceKind::Producer) + .await, + )?, + (Post, ["node", "services", DefaultAddress::KAFKA_DIRECT]) => encode_response( + req, + self.start_kafka_direct_service(ctx, dec.decode()?).await, + )?, + (Delete, ["node", "services", DefaultAddress::KAFKA_DIRECT]) => encode_response( + req, + self.delete_kafka_service(ctx, dec.decode()?, KafkaServiceKind::Direct) + .await, + )?, + (Get, ["node", "services"]) => encode_response(req, self.list_services().await)?, + (Get, ["node", "services", service_type]) => { + encode_response(req, self.list_services_of_type(service_type).await)? + } + + // ==*== Relay commands ==*== + (Get, ["node", "relay", alias]) => { + encode_response(req, self.show_relay(req, alias).await)? + } + (Get, ["node", "relay"]) => encode_response(req, self.get_relays(req).await)?, + (Delete, ["node", "relay", alias]) => { + encode_response(req, self.delete_relay(req, alias).await)? + } + (Post, ["node", "relay"]) => { + encode_response(req, self.create_relay(ctx, req, dec.decode()?).await)? + } + + // ==*== Inlets & Outlets ==*== + (Get, ["node", "inlet"]) => encode_response(req, self.get_inlets().await)?, + (Get, ["node", "inlet", alias]) => encode_response(req, self.show_inlet(alias).await)?, + (Get, ["node", "outlet"]) => self.get_outlets(req).await.to_vec()?, + (Get, ["node", "outlet", addr]) => { + let addr: Address = addr.to_string().into(); + encode_response(req, self.show_outlet(&addr).await)? + } + (Post, ["node", "inlet"]) => { + encode_response(req, self.create_inlet(ctx, dec.decode()?).await)? + } + (Post, ["node", "outlet"]) => { + encode_response(req, self.create_outlet(ctx, dec.decode()?).await)? + } + (Delete, ["node", "outlet", addr]) => { + let addr: Address = addr.to_string().into(); + encode_response(req, self.delete_outlet(&addr).await)? + } + (Delete, ["node", "inlet", alias]) => { + encode_response(req, self.delete_inlet(alias).await)? + } + (Delete, ["node", "portal"]) => todo!(), + + // ==*== Flow Controls ==*== + (Post, ["node", "flow_controls", "add_consumer"]) => { + encode_response(req, self.add_consumer(ctx, dec.decode()?).await)? + } + + // ==*== Workers ==*== + (Get, ["node", "workers"]) => encode_response(req, self.list_workers(ctx).await)?, + + // ==*== Policies ==*== + (Post, ["policy", action]) => { + let payload: SetPolicyRequest = dec.decode()?; + encode_response( + req, + self.add_policy(action, payload.resource, payload.expression) + .await, + )? + } + (Get, ["policy", action]) => { + encode_response(req, self.get_policy(action, dec.decode()?).await)? + } + (Get, ["policy"]) => encode_response(req, self.list_policies(dec.decode()?).await)?, + (Delete, ["policy", action]) => { + encode_response(req, self.delete_policy(action, dec.decode()?).await)? + } + + // ==*== Messages ==*== + (Post, ["v0", "message"]) => { + encode_response(req, self.send_message(ctx, dec.decode()?).await)? + } + + // ==*== Catch-all for Unimplemented APIs ==*== + _ => { + warn!(%method, %path, "Called invalid endpoint"); + Response::bad_request(req, &format!("Invalid endpoint: {} {}", method, path)) + .to_vec()? + } + }; + Ok(r) + } +} + +#[ockam::worker] +impl Worker for NodeManagerWorker { + type Message = Vec; + type Context = Context; + + async fn shutdown(&mut self, ctx: &mut Self::Context) -> ockam_core::Result<()> { + self.node_manager.medic_handle.stop_medic(ctx).await + } + + async fn handle_message( + &mut self, + ctx: &mut Context, + msg: Routed>, + ) -> ockam_core::Result<()> { + let return_route = msg.return_route(); + let body = msg.into_body()?; + let mut dec = Decoder::new(&body); + let req: RequestHeader = match dec.decode() { + Ok(r) => r, + Err(e) => { + error!("Failed to decode request: {:?}", e); + return Ok(()); + } + }; + + let r = match self.handle_request(ctx, &req, &mut dec).await { + Ok(r) => r, + Err(err) => { + error! { + target: TARGET, + re = %req.id(), + method = ?req.method(), + path = %req.path(), + code = %err.code(), + cause = ?err.source(), + "failed to handle request" + } + Response::internal_error(&req, &format!("failed to handle request: {err} {req:?}")) + .to_vec()? + } + }; + debug! { + target: TARGET, + re = %req.id(), + method = ?req.method(), + path = %req.path(), + "responding" + } + ctx.send(return_route, r).await + } +} diff --git a/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs b/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs index f14aec896e2..a20d68f8ba2 100644 --- a/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs @@ -101,7 +101,9 @@ pub async fn start_manager_for_tests( trust_options.unwrap_or_else(|| { NodeManagerTrustOptions::new( NodeManagerCredentialRetrieverOptions::InMemory(credential), + NodeManagerCredentialRetrieverOptions::None, Some(identifier), + NodeManagerCredentialRetrieverOptions::None, ) }), ) @@ -205,8 +207,10 @@ impl TestNode { &mut context, listen_addr, Some(NodeManagerTrustOptions::new( + NodeManagerCredentialRetrieverOptions::None, NodeManagerCredentialRetrieverOptions::None, None, + NodeManagerCredentialRetrieverOptions::None, )), ) .await diff --git a/implementations/rust/ockam/ockam_app_lib/src/enroll/enroll_user.rs b/implementations/rust/ockam/ockam_app_lib/src/enroll/enroll_user.rs index 781d5aeae98..40f92ce2b2d 100644 --- a/implementations/rust/ockam/ockam_app_lib/src/enroll/enroll_user.rs +++ b/implementations/rust/ockam/ockam_app_lib/src/enroll/enroll_user.rs @@ -136,7 +136,7 @@ impl AppState { let node = cli_state.get_node(NODE_NAME).await?; let identifier = node.identifier(); cli_state - .set_identifier_as_enrolled(&identifier) + .set_identifier_as_enrolled(&identifier, &user_info.email) .await .into_diagnostic()?; info!(%identifier, "User enrolled successfully"); diff --git a/implementations/rust/ockam/ockam_app_lib/src/projects/commands.rs b/implementations/rust/ockam/ockam_app_lib/src/projects/commands.rs index 138d80eaeae..703a9265725 100644 --- a/implementations/rust/ockam/ockam_app_lib/src/projects/commands.rs +++ b/implementations/rust/ockam/ockam_app_lib/src/projects/commands.rs @@ -29,11 +29,7 @@ impl AppState { .ok_or_else(|| Error::ProjectNotFound(project_id.to_owned()))? .clone(); let authority_node = self - .authority_node( - &project.authority_identifier().into_diagnostic()?, - project.authority_multiaddr().into_diagnostic()?, - None, - ) + .authority_node(&project, None) .await .into_diagnostic()?; let otc = authority_node diff --git a/implementations/rust/ockam/ockam_app_lib/src/state/mod.rs b/implementations/rust/ockam/ockam_app_lib/src/state/mod.rs index dea2fd192d4..92dc3ad0d9b 100644 --- a/implementations/rust/ockam/ockam_app_lib/src/state/mod.rs +++ b/implementations/rust/ockam/ockam_app_lib/src/state/mod.rs @@ -7,7 +7,6 @@ use tokio::sync::RwLock; use tracing::{error, info, trace, warn}; pub use kind::StateKind; -use ockam::identity::Identifier; use ockam::Context; use ockam::{NodeBuilder, TcpListenerOptions, TcpTransport}; use ockam_api::cli_state::CliState; @@ -19,7 +18,6 @@ use ockam_api::nodes::models::portal::OutletStatus; use ockam_api::nodes::service::{NodeManagerGeneralOptions, NodeManagerTransportOptions}; use ockam_api::nodes::{BackgroundNodeClient, InMemoryNode, NodeManagerWorker, NODEMANAGER_ADDR}; use ockam_core::AsyncTryClone; -use ockam_multiaddr::MultiAddr; use crate::api::notification::rust::{Notification, NotificationCallback}; use crate::api::state::rust::{ @@ -379,18 +377,12 @@ impl AppState { pub async fn authority_node( &self, - authority_identifier: &Identifier, - authority_route: &MultiAddr, + project: &Project, caller_identity_name: Option, ) -> Result { let node_manager = self.node_manager.read().await; Ok(node_manager - .create_authority_client( - authority_identifier, - authority_route, - caller_identity_name, - None, - ) + .create_authority_client(project, caller_identity_name) .await?) } diff --git a/implementations/rust/ockam/ockam_command/src/credential/store.rs b/implementations/rust/ockam/ockam_command/src/credential/store.rs index 7a9a6d3de1b..dba37ce3682 100644 --- a/implementations/rust/ockam/ockam_command/src/credential/store.rs +++ b/implementations/rust/ockam/ockam_command/src/credential/store.rs @@ -100,6 +100,7 @@ impl StoreCommand { .put( &subject, &purpose_key_data.subject, + "test", // FIXME LATER CRED, credential_data.expires_at, credential.clone(), ) diff --git a/implementations/rust/ockam/ockam_command/src/enroll/command.rs b/implementations/rust/ockam/ockam_command/src/enroll/command.rs index 3961f939c8b..6a19fdd8d9c 100644 --- a/implementations/rust/ockam/ockam_command/src/enroll/command.rs +++ b/implementations/rust/ockam/ockam_command/src/enroll/command.rs @@ -287,7 +287,7 @@ impl EnrollCommand { .await .wrap_err("Failed to enroll your local Identity with Ockam Orchestrator")?; opts.state - .set_identifier_as_enrolled(&node.identifier()) + .set_identifier_as_enrolled(&node.identifier(), &user_info.email) .await .wrap_err("Unable to set your local Identity as enrolled")?; diff --git a/implementations/rust/ockam/ockam_command/src/project/enroll.rs b/implementations/rust/ockam/ockam_command/src/project/enroll.rs index c5853ae323c..c1a69b053e6 100644 --- a/implementations/rust/ockam/ockam_command/src/project/enroll.rs +++ b/implementations/rust/ockam/ockam_command/src/project/enroll.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use clap::Args; use colorful::Colorful; +use miette::miette; use miette::Context as _; -use miette::{miette, IntoDiagnostic}; use ockam::Context; use ockam_api::cli_state::enrollments::EnrollmentTicket; @@ -72,12 +72,7 @@ impl Command for EnrollCommand { ) .await?; let authority_node_client = node - .create_authority_client( - &project.authority_identifier().into_diagnostic()?, - project.authority_multiaddr().into_diagnostic()?, - Some(identity.name()), - None, - ) + .create_authority_client(&project, Some(identity.name())) .await?; // Enroll diff --git a/implementations/rust/ockam/ockam_command/src/project/ticket.rs b/implementations/rust/ockam/ockam_command/src/project/ticket.rs index ef8bcae62d3..1c6774d1f89 100644 --- a/implementations/rust/ockam/ockam_command/src/project/ticket.rs +++ b/implementations/rust/ockam/ockam_command/src/project/ticket.rs @@ -125,12 +125,7 @@ impl TicketCommand { .await?; let authority_node_client = node - .create_authority_client( - &project.authority_identifier().into_diagnostic()?, - project.authority_multiaddr().into_diagnostic()?, - Some(identity), - None, - ) + .create_authority_client(&project, Some(identity)) .await?; let attributes = self.attributes()?; diff --git a/implementations/rust/ockam/ockam_command/src/project/util.rs b/implementations/rust/ockam/ockam_command/src/project/util.rs index 0ded58b39fb..f388194fdc5 100644 --- a/implementations/rust/ockam/ockam_command/src/project/util.rs +++ b/implementations/rust/ockam/ockam_command/src/project/util.rs @@ -223,14 +223,7 @@ async fn check_authority_node_accessible( retry_strategy: Take, spinner_option: Option, ) -> Result { - let authority_node = node - .create_authority_client( - &project.authority_identifier()?, - project.authority_multiaddr()?, - None, - None, - ) - .await?; + let authority_node = node.create_authority_client(&project, None).await?; if let Some(spinner) = spinner_option.as_ref() { spinner.set_message("Establishing secure channel to project authority..."); diff --git a/implementations/rust/ockam/ockam_command/src/project_member/mod.rs b/implementations/rust/ockam/ockam_command/src/project_member/mod.rs index 8e9d58b2e39..dcb7a74f582 100644 --- a/implementations/rust/ockam/ockam_command/src/project_member/mod.rs +++ b/implementations/rust/ockam/ockam_command/src/project_member/mod.rs @@ -6,7 +6,7 @@ use clap::Subcommand; use delete::DeleteCommand; use list::ListCommand; use list_ids::ListIdsCommand; -use miette::{miette, IntoDiagnostic}; +use miette::miette; use ockam_api::authenticator::direct::{ OCKAM_ROLE_ATTRIBUTE_ENROLLER_VALUE, OCKAM_ROLE_ATTRIBUTE_KEY, }; @@ -113,12 +113,7 @@ pub(super) async fn create_authority_client( .await?; Ok(node - .create_authority_client( - &project.authority_identifier().into_diagnostic()?, - project.authority_multiaddr().into_diagnostic()?, - Some(identity), - None, - ) + .create_authority_client(project, Some(identity)) .await?) } diff --git a/implementations/rust/ockam/ockam_core/src/api.rs b/implementations/rust/ockam/ockam_core/src/api.rs index fce07edcd56..1faaf677d13 100644 --- a/implementations/rust/ockam/ockam_core/src/api.rs +++ b/implementations/rust/ockam/ockam_core/src/api.rs @@ -566,7 +566,7 @@ impl Request { Request::build(Method::Patch, path) } - fn build>(method: Method, path: P) -> Request { + pub fn build>(method: Method, path: P) -> Request { Request { header: RequestHeader::new(method, path, false), body: None, diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/cache_retriever.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/cache_retriever.rs index f0ef29fb48a..a787034df86 100644 --- a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/cache_retriever.rs +++ b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/cache_retriever.rs @@ -6,6 +6,7 @@ use crate::{ }; use async_trait::async_trait; use ockam_core::compat::boxed::Box; +use ockam_core::compat::string::String; use ockam_core::compat::sync::Arc; use ockam_core::{Address, Result}; use tracing::{debug, error}; @@ -18,6 +19,7 @@ pub const DEFAULT_CREDENTIAL_CLOCK_SKEW_GAP: TimestampInSeconds = TimestampInSec pub struct CachedCredentialRetriever { issuer: Identifier, subject: Identifier, + scope: String, cache: Arc, } @@ -26,11 +28,13 @@ impl CachedCredentialRetriever { pub fn new( issuer: Identifier, subject: Identifier, + scope: String, cache: Arc, ) -> Self { Self { issuer, subject, + scope, cache, } } @@ -39,6 +43,7 @@ impl CachedCredentialRetriever { pub async fn retrieve_impl( issuer: &Identifier, for_identity: &Identifier, + scope: &str, now: TimestampInSeconds, cache: Arc, clock_skew_gap: TimestampInSeconds, @@ -49,7 +54,7 @@ impl CachedCredentialRetriever { ); // check if we have a valid cached credential - if let Some(cached_credential) = cache.get(for_identity, issuer).await? { + if let Some(cached_credential) = cache.get(for_identity, issuer, scope).await? { // add an extra minute to have a bit of leeway for clock skew if cached_credential.get_expires_at()? > now + clock_skew_gap { debug!("Found valid cached credential for: {}", for_identity); @@ -59,7 +64,7 @@ impl CachedCredentialRetriever { "Found expired cached credential for: {}. Deleting...", for_identity ); - let delete_res = cache.delete(for_identity, issuer).await; + let delete_res = cache.delete(for_identity, issuer, scope).await; if let Some(err) = delete_res.err() { error!( @@ -80,13 +85,18 @@ impl CachedCredentialRetriever { /// Creator for [`CachedCredentialRetriever`] pub struct CachedCredentialRetrieverCreator { issuer: Identifier, + scope: String, cache: Arc, } impl CachedCredentialRetrieverCreator { /// Constructor - pub fn new(issuer: Identifier, cache: Arc) -> Self { - Self { issuer, cache } + pub fn new(issuer: Identifier, scope: String, cache: Arc) -> Self { + Self { + issuer, + scope, + cache, + } } } @@ -96,6 +106,7 @@ impl CredentialRetrieverCreator for CachedCredentialRetrieverCreator { Ok(Arc::new(CachedCredentialRetriever::new( self.issuer.clone(), subject.clone(), + self.scope.clone(), self.cache.clone(), ))) } @@ -112,6 +123,7 @@ impl CredentialRetriever for CachedCredentialRetriever { match Self::retrieve_impl( &self.issuer, &self.subject, + &self.scope, now, self.cache.clone(), // We can't refresh the credential, so let's still present it even if it's diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/info.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/info.rs index d375ade74f7..c1fd2a753db 100644 --- a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/info.rs +++ b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/info.rs @@ -1,27 +1,72 @@ -use serde::{Deserialize, Serialize}; - +use ockam_core::api::Method; +use ockam_core::compat::string::{String, ToString}; use ockam_core::{Address, Route}; use crate::Identifier; /// Information necessary to connect to a remote credential retriever -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone)] pub struct RemoteCredentialRetrieverInfo { /// Issuer identity, used to validate retrieved credentials pub issuer: Identifier, /// Route used to establish a secure channel to the remote node pub route: Route, - /// Address of the credentials service on the remote node + /// Address of the credentials service on the remote node, e.g. "credential_issuer" or "accounts" pub service_address: Address, + /// Request path, e.g. "/" or "/v0/project/$project_id" + pub api_service_address: String, + /// Request method, e.g. Post or Get + pub request_method: Method, } impl RemoteCredentialRetrieverInfo { - /// Create new information for a credential retriever - pub fn new(issuer: Identifier, route: Route, service_address: Address) -> Self { + /// Create info for a project member credential that we get from Project Membership Authority + pub fn create_for_project_member(issuer: Identifier, route: Route) -> Self { + Self::new( + issuer, + route, + "credential_issuer".into(), + "/".to_string(), + Method::Post, + ) + } + + /// Create info for a project admin credential that we get from the Orchestrator + pub fn create_for_project_admin(issuer: Identifier, route: Route, project_id: String) -> Self { + Self::new( + issuer, + route, + "accounts".into(), + format!("/v0/project/{}", project_id), + Method::Get, + ) + } + + /// Create info for a account admin credential that we get from the Orchestrator + pub fn create_for_account_admin(issuer: Identifier, route: Route) -> Self { + Self::new( + issuer, + route, + "accounts".into(), + "/v0/account".to_string(), + Method::Get, + ) + } + + /// Constructor + pub fn new( + issuer: Identifier, + route: Route, + service_address: Address, + api_service_address: String, + request_method: Method, + ) -> Self { Self { issuer, route, service_address, + api_service_address, + request_method, } } } diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever.rs index ba560174faa..bcb009e4d08 100644 --- a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever.rs +++ b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever.rs @@ -2,6 +2,7 @@ use core::cmp::max; use tracing::{debug, error, info, trace, warn}; use ockam_core::api::Request; +use ockam_core::compat::string::{String, ToString}; use ockam_core::compat::sync::{Arc, RwLock}; use ockam_core::compat::time::Duration; use ockam_core::compat::vec::Vec; @@ -76,6 +77,7 @@ pub struct RemoteCredentialRetriever { secure_channels: Arc, pub(super) issuer_info: RemoteCredentialRetrieverInfo, pub(super) subject: Identifier, + scope: String, pub(super) timing_options: RemoteCredentialRetrieverTimingOptions, is_initialized: Arc>, @@ -92,6 +94,7 @@ impl RemoteCredentialRetriever { secure_channels: Arc, issuer_info: RemoteCredentialRetrieverInfo, subject: Identifier, + scope: String, timing_options: RemoteCredentialRetrieverTimingOptions, ) -> Self { debug!( @@ -105,6 +108,7 @@ impl RemoteCredentialRetriever { secure_channels, issuer_info, subject, + scope, timing_options, is_initialized: Arc::new(Mutex::new(false)), last_presented_credential: Arc::new(RwLock::new(None)), @@ -129,6 +133,7 @@ impl RemoteCredentialRetriever { let last_presented_credential = match CachedCredentialRetriever::retrieve_impl( &self.issuer_info.issuer, &self.subject, + &self.scope, now, self.secure_channels .identities @@ -285,7 +290,14 @@ impl RemoteCredentialRetriever { ); let credential = client - .ask(&self.ctx, "credential_issuer", Request::post("/")) + .ask( + &self.ctx, + &self.issuer_info.service_address.to_string(), // FIXME: Use Address type + Request::build( + self.issuer_info.request_method, + self.issuer_info.api_service_address.clone(), + ), + ) .await? .success()?; @@ -318,6 +330,7 @@ impl RemoteCredentialRetriever { .put( &self.subject, &self.issuer_info.issuer, + &self.scope, expires_at, credential, ) diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever_creator.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever_creator.rs index 616598b572a..c90c2a7b7ad 100644 --- a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever_creator.rs +++ b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever_creator.rs @@ -1,5 +1,6 @@ use ockam_core::compat::boxed::Box; use ockam_core::compat::collections::BTreeMap; +use ockam_core::compat::string::String; use ockam_core::compat::sync::Arc; use ockam_core::{async_trait, Address, AllowAll, DenyAll, Mailboxes, Result}; use ockam_node::compat::asynchronous::RwLock; @@ -18,6 +19,7 @@ pub struct RemoteCredentialRetrieverCreator { transport: Arc, secure_channels: Arc, info: RemoteCredentialRetrieverInfo, + scope: String, timing_options: RemoteCredentialRetrieverTimingOptions, // Should be only one retriever per subject Identifier @@ -31,12 +33,14 @@ impl RemoteCredentialRetrieverCreator { transport: Arc, secure_channels: Arc, info: RemoteCredentialRetrieverInfo, + scope: String, ) -> Self { Self { ctx, transport, secure_channels, info, + scope, timing_options: Default::default(), registry: Default::default(), } @@ -48,6 +52,7 @@ impl RemoteCredentialRetrieverCreator { transport: Arc, secure_channels: Arc, info: RemoteCredentialRetrieverInfo, + scope: String, timing_options: RemoteCredentialRetrieverTimingOptions, ) -> Self { Self { @@ -55,6 +60,7 @@ impl RemoteCredentialRetrieverCreator { transport, secure_channels, info, + scope, timing_options, registry: Default::default(), } @@ -105,6 +111,7 @@ impl CredentialRetrieverCreator for RemoteCredentialRetrieverCreator { self.secure_channels.clone(), self.info.clone(), subject.clone(), + self.scope.clone(), self.timing_options, ); debug!( diff --git a/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository.rs b/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository.rs index 7c0c2eb115a..1ce97a94b89 100644 --- a/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository.rs +++ b/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository.rs @@ -12,6 +12,7 @@ pub trait CredentialRepository: Send + Sync + 'static { &self, subject: &Identifier, issuer: &Identifier, + scope: &str, ) -> Result>; /// Put credential (overwriting) @@ -19,10 +20,11 @@ pub trait CredentialRepository: Send + Sync + 'static { &self, subject: &Identifier, issuer: &Identifier, + scope: &str, expires_at: TimestampInSeconds, credential: CredentialAndPurposeKey, ) -> Result<()>; /// Delete credential - async fn delete(&self, subject: &Identifier, issuer: &Identifier) -> Result<()>; + async fn delete(&self, subject: &Identifier, issuer: &Identifier, scope: &str) -> Result<()>; } diff --git a/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository_sql.rs b/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository_sql.rs index efd55fbf72f..7265bc2f9e7 100644 --- a/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository_sql.rs +++ b/implementations/rust/ockam/ockam_identity/src/identities/storage/credential_repository_sql.rs @@ -59,12 +59,14 @@ impl CredentialRepository for CredentialSqlxDatabase { &self, subject: &Identifier, issuer: &Identifier, + scope: &str, ) -> Result> { let query = query_as( - "SELECT credential FROM credential WHERE subject_identifier=$1 AND issuer_identifier=$2 AND node_name=$3" + "SELECT credential FROM credential WHERE subject_identifier=$1 AND issuer_identifier=$2 AND scope=$3 AND node_name=$4" ) .bind(subject.to_sql()) .bind(issuer.to_sql()) + .bind(scope.to_sql()) .bind(self.database.node_name()?.to_sql()); let cached_credential: Option = query .fetch_optional(&*self.database.pool) @@ -77,24 +79,27 @@ impl CredentialRepository for CredentialSqlxDatabase { &self, subject: &Identifier, issuer: &Identifier, + scope: &str, expires_at: TimestampInSeconds, credential: CredentialAndPurposeKey, ) -> Result<()> { let query = query( - "INSERT OR REPLACE INTO credential (subject_identifier, issuer_identifier, credential, expires_at, node_name) VALUES (?, ?, ?, ?, ?)" + "INSERT OR REPLACE INTO credential (subject_identifier, issuer_identifier, scope, credential, expires_at, node_name) VALUES (?, ?, ?, ?, ?, ?)" ) .bind(subject.to_sql()) .bind(issuer.to_sql()) + .bind(scope.to_sql()) .bind(credential.encode_as_cbor_bytes()?.to_sql()) .bind(expires_at.to_sql()) .bind(self.database.node_name()?.to_sql()); query.execute(&*self.database.pool).await.void() } - async fn delete(&self, subject: &Identifier, issuer: &Identifier) -> Result<()> { - let query = query("DELETE FROM credential WHERE subject_identifier=$1 AND issuer_identifier=$2 AND node_name=$3") + async fn delete(&self, subject: &Identifier, issuer: &Identifier, scope: &str) -> Result<()> { + let query = query("DELETE FROM credential WHERE subject_identifier=$1 AND issuer_identifier=$2 AND scope=$3 AND node_name=$4") .bind(subject.to_sql()) .bind(issuer.to_sql()) + .bind(scope.to_sql()) .bind(self.database.node_name()?.to_sql()); query.execute(&*self.database.pool).await.void() } @@ -125,6 +130,7 @@ mod tests { #[tokio::test] async fn test_cached_credential_repository() -> Result<()> { + let scope = "test".to_string(); let repository = create_repository().await?; let identities = identities().await?; @@ -145,11 +151,12 @@ mod tests { .put( &subject, &issuer, + &scope, credential1.get_credential_data()?.expires_at, credential1.clone(), ) .await?; - let credential2 = repository.get(&subject, &issuer).await?; + let credential2 = repository.get(&subject, &issuer, &scope).await?; assert_eq!(credential2, Some(credential1)); let attributes2 = AttributesBuilder::with_schema(CredentialSchemaIdentifier(1)) @@ -164,15 +171,16 @@ mod tests { .put( &subject, &issuer, + &scope, credential3.get_credential_data()?.expires_at, credential3.clone(), ) .await?; - let credential4 = repository.get(&subject, &issuer).await?; + let credential4 = repository.get(&subject, &issuer, &scope).await?; assert_eq!(credential4, Some(credential3)); - repository.delete(&subject, &issuer).await?; - let result = repository.get(&subject, &issuer).await?; + repository.delete(&subject, &issuer, &scope).await?; + let result = repository.get(&subject, &issuer, &scope).await?; assert_eq!(result, None); Ok(()) diff --git a/implementations/rust/ockam/ockam_identity/tests/credentials_refresh.rs b/implementations/rust/ockam/ockam_identity/tests/credentials_refresh.rs index bd57a7bdbc5..426f3556205 100644 --- a/implementations/rust/ockam/ockam_identity/tests/credentials_refresh.rs +++ b/implementations/rust/ockam/ockam_identity/tests/credentials_refresh.rs @@ -277,11 +277,11 @@ async fn init( ctx.async_try_clone().await?, Arc::new(tcp), client_secure_channels.clone(), - RemoteCredentialRetrieverInfo::new( + RemoteCredentialRetrieverInfo::create_for_project_member( authority.clone(), route!["authority_api"], - "credential_issuer".into(), ), + "test".to_string(), timing_options, )); diff --git a/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/sql/20240307100000_credential_add_scope.sql b/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/sql/20240307100000_credential_add_scope.sql new file mode 100644 index 00000000000..d45af4de5d8 --- /dev/null +++ b/implementations/rust/ockam/ockam_node/src/storage/database/migrations/node_migrations/sql/20240307100000_credential_add_scope.sql @@ -0,0 +1,26 @@ +DROP TABLE credential; + +CREATE TABLE credential +( + subject_identifier TEXT NOT NULL, + issuer_identifier TEXT NOT NULL, + scope TEXT NOT NULL, + credential TEXT NOT NULL, + expires_at INTEGER, + node_name TEXT NOT NULL -- node name to isolate credential that each node has +); + +CREATE UNIQUE INDEX credential_issuer_subject_scope_index ON credential (issuer_identifier, subject_identifier, scope); + +-- Drop unique index since we now allow multiple entries with the same issuer&subject pair +DROP INDEX identity_attributes_attested_by_index; + +-- FIXME: Add scope + +CREATE INDEX identity_attributes_expires_node_name_index ON identity_attributes (expires, node_name); + +CREATE INDEX identity_attributes_identifier_attested_by_node_name_index ON identity_attributes (identifier, attested_by, node_name); + +CREATE INDEX identity_identifier_index ON identity_attributes (identifier); + +CREATE INDEX identity_node_name_index ON identity_attributes (node_name);