From e61951f30551d1c1a636c41d34f35a007c593eac Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Thu, 11 Jan 2024 10:56:24 -0700 Subject: [PATCH] Capsicum (#481) Capsicum is a security technology that restricts processes from accessing any global namespaces. After entering capsicum mode, a process may no longer use a syscall like open(); instead it's restricted to openat(). This PR: * Modifies unftp-sbe-fs to work in capability mode, using cap-std * Modifies libunftp to provide hooks for Capsicum-using consumers * Extends libunftp-auth-jsonfile to allow per-user home directories * Adds an example to unftp-sbe-fs demonstrating Capsicum mode. Fixes #475 --- Cargo.toml | 8 +- .../examples/jsonfile_auth.rs | 12 +- crates/unftp-auth-rest/examples/rest.rs | 13 +- crates/unftp-sbe-fs/Cargo.toml | 10 + crates/unftp-sbe-fs/examples/basic.rs | 4 +- .../unftp-sbe-fs/examples/cap-ftpd-worker.rs | 267 ++++++++++++++++ crates/unftp-sbe-fs/examples/cap-ftpd.rs | 29 ++ crates/unftp-sbe-fs/examples/proxyprotocol.rs | 5 +- crates/unftp-sbe-fs/src/cap_fs.rs | 106 ++++++ crates/unftp-sbe-fs/src/ext.rs | 6 +- crates/unftp-sbe-fs/src/lib.rs | 190 +++++------ crates/unftp-sbe-fs/src/tests.rs | 7 + crates/unftp-sbe-fs/tests/main.rs | 6 +- crates/unftp-sbe-gcs/examples/gcs.rs | 10 +- crates/unftp-sbe-gcs/src/ext.rs | 9 +- crates/unftp-sbe-gcs/src/lib.rs | 10 +- crates/unftp-sbe-gcs/tests/main.rs | 7 +- src/auth/user.rs | 13 +- src/lib.rs | 7 +- src/notification/event.rs | 6 +- src/notification/mod.rs | 4 +- src/server/controlchan/commands/pass.rs | 26 +- src/server/controlchan/commands/pasv.rs | 29 +- src/server/controlchan/commands/user.rs | 23 +- src/server/controlchan/control_loop.rs | 16 +- src/server/ftpserver.rs | 301 +++++++++++++++--- src/server/ftpserver/chosen.rs | 4 + src/server/ftpserver/listen.rs | 36 ++- src/server/ftpserver/options.rs | 33 +- src/server/session.rs | 8 + src/storage/storage_backend.rs | 9 + tests/common.rs | 5 +- 32 files changed, 988 insertions(+), 231 deletions(-) create mode 100644 crates/unftp-sbe-fs/examples/cap-ftpd-worker.rs create mode 100644 crates/unftp-sbe-fs/examples/cap-ftpd.rs create mode 100644 crates/unftp-sbe-fs/src/cap_fs.rs diff --git a/Cargo.toml b/Cargo.toml index 1dedd98e..a7c605df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ getrandom = "0.2.11" lazy_static = "1.4.0" md-5 = "0.10.6" moka = { version = "0.11.3", default-features = false, features = ["sync"] } +nix = { version = "0.26.2", default-features = false, features = ["fs"] } prometheus = { version = "0.13.3", default-features = false } proxy-protocol = "0.5.0" rustls = "0.21.10" @@ -47,7 +48,7 @@ rustls-pemfile = "1.0.4" slog = { version = "2.7.0", features = ["max_level_trace", "release_max_level_info"] } slog-stdlog = "4.1.1" thiserror = "1.0.51" -tokio = { version = "1.35.1", features = ["macros", "rt", "net", "sync", "io-util", "time"] } +tokio = { version = "1.35.1", features = ["macros", "rt", "net", "process", "sync", "io-util", "time"] } tokio-rustls = "0.24.1" tokio-util = { version = "0.7.10", features = ["codec"] } tracing = { version = "0.1.40", default-features = false } @@ -61,3 +62,8 @@ libc = "0.2" pretty_assertions = "1.4.0" tokio = { version = "1.35.1", features = ["macros", "rt-multi-thread"] } unftp-sbe-fs = { path = "../libunftp/crates/unftp-sbe-fs"} + + +[patch.crates-io] +capsicum = { git = "https://github.com/asomers/capsicum-rs", rev = "24330ee"} +casper-sys = { git = "https://github.com/asomers/capsicum-rs", rev = "24330ee"} diff --git a/crates/unftp-auth-jsonfile/examples/jsonfile_auth.rs b/crates/unftp-auth-jsonfile/examples/jsonfile_auth.rs index 06b44ce0..7f84fddf 100644 --- a/crates/unftp-auth-jsonfile/examples/jsonfile_auth.rs +++ b/crates/unftp-auth-jsonfile/examples/jsonfile_auth.rs @@ -2,17 +2,21 @@ use std::sync::Arc; use unftp_auth_jsonfile::JsonFileAuthenticator; use unftp_sbe_fs::ServerExt; -pub fn main() -> Result<(), Box> { +#[tokio::main(flavor = "current_thread")] +pub async fn main() -> Result<(), Box> { pretty_env_logger::init(); let authenticator = JsonFileAuthenticator::from_file(String::from("credentials.json"))?; let addr = "127.0.0.1:2121"; - let server = libunftp::Server::with_fs(std::env::temp_dir()).authenticator(Arc::new(authenticator)); + let server = libunftp::Server::with_fs(std::env::temp_dir()) + .authenticator(Arc::new(authenticator)) + .build() + .await + .unwrap(); println!("Starting ftp server on {}", addr); - let runtime = tokio::runtime::Builder::new_current_thread().enable_io().enable_time().build().unwrap(); - runtime.block_on(server.listen(addr))?; + server.listen(addr).await?; Ok(()) } diff --git a/crates/unftp-auth-rest/examples/rest.rs b/crates/unftp-auth-rest/examples/rest.rs index f52513ed..0fe921ed 100644 --- a/crates/unftp-auth-rest/examples/rest.rs +++ b/crates/unftp-auth-rest/examples/rest.rs @@ -1,10 +1,10 @@ use std::env; use std::sync::Arc; -use tokio::runtime::Builder as TokioBuilder; use unftp_auth_rest::{Builder, RestAuthenticator}; use unftp_sbe_fs::ServerExt; -pub fn main() -> Result<(), Box> { +#[tokio::main(flavor = "current_thread")] +pub async fn main() -> Result<(), Box> { pretty_env_logger::init(); let _args: Vec = env::args().collect(); @@ -21,10 +21,13 @@ pub fn main() -> Result<(), Box> { .build()?; let addr = "127.0.0.1:2121"; - let server = libunftp::Server::with_fs(std::env::temp_dir()).authenticator(Arc::new(authenticator)); + let server = libunftp::Server::with_fs(std::env::temp_dir()) + .authenticator(Arc::new(authenticator)) + .build() + .await + .unwrap(); println!("Starting ftp server on {}", addr); - let runtime = TokioBuilder::new_current_thread().enable_io().enable_time().build()?; - runtime.block_on(server.listen(addr))?; + server.listen(addr).await?; Ok(()) } diff --git a/crates/unftp-sbe-fs/Cargo.toml b/crates/unftp-sbe-fs/Cargo.toml index fe08477d..aebaa1b2 100644 --- a/crates/unftp-sbe-fs/Cargo.toml +++ b/crates/unftp-sbe-fs/Cargo.toml @@ -21,7 +21,9 @@ readme = "README.md" [dependencies] async-trait = "0.1.75" cfg-if = "1.0" +cap-std = "2.0" futures = { version = "0.3.29", default-features = false, features = ["std"] } +lazy_static = "1.4.0" libunftp = { version="0.19.1", path="../../"} path_abs = "0.5.1" tokio = { version = "1.35.1", features = ["rt", "net", "sync", "io-util", "time", "fs"] } @@ -31,13 +33,21 @@ tracing-attributes = "0.1.27" [dev-dependencies] async_ftp = "6.0.0" +async-trait = "0.1.73" more-asserts = "0.3.1" pretty_assertions = "1.4.0" pretty_env_logger = "0.5.0" rstest = "0.18.2" +serde = { version = "1.0.188", features = ["derive"] } +serde_json = "1.0.107" slog-async = "2.8.0" slog-term = "2.9.0" tempfile = "3.8.1" tokio = { version = "1.35.1", features = ["macros", "rt-multi-thread"] } tracing-subscriber = "0.3.18" getrandom = "0.2.11" + +[target.'cfg(target_os = "freebsd")'.dev-dependencies] +capsicum = { version = "0.3.0", features = ["casper"] } +capsicum-net = { version = "0.1.0", features = ["tokio"], git = "https://github.com/asomers/capsicum-net", rev = "c6fc574" } + diff --git a/crates/unftp-sbe-fs/examples/basic.rs b/crates/unftp-sbe-fs/examples/basic.rs index 6fdbb358..e4b62122 100644 --- a/crates/unftp-sbe-fs/examples/basic.rs +++ b/crates/unftp-sbe-fs/examples/basic.rs @@ -1,11 +1,11 @@ use unftp_sbe_fs::ServerExt; -#[tokio::main] +#[tokio::main(flavor = "current_thread")] pub async fn main() { pretty_env_logger::init(); let addr = "127.0.0.1:2121"; - let server = libunftp::Server::with_fs(std::env::temp_dir()); + let server = libunftp::Server::with_fs(std::env::temp_dir()).build().await.unwrap(); println!("Starting ftp server on {}", addr); server.listen(addr).await.unwrap(); diff --git a/crates/unftp-sbe-fs/examples/cap-ftpd-worker.rs b/crates/unftp-sbe-fs/examples/cap-ftpd-worker.rs new file mode 100644 index 00000000..7c57cdb5 --- /dev/null +++ b/crates/unftp-sbe-fs/examples/cap-ftpd-worker.rs @@ -0,0 +1,267 @@ +//! A libexec helper for cap-std. It takes an int as $1 which is interpreted as +//! a file descriptor for an already-connected an authenticated control socket. +//! Do not invoke this program directly. Rather, invoke it by examples/cap-ftpd +use std::{ + env, + os::fd::{FromRawFd, RawFd}, + process::exit, + sync::{Arc, Mutex}, +}; + +use cfg_if::cfg_if; + +use tokio::net::TcpStream; + +use libunftp::Server; +use unftp_sbe_fs::Filesystem; + +mod auth { + use std::{ + collections::HashMap, + fmt, fs, + io::Read, + path::{Path, PathBuf}, + time::Duration, + }; + + use async_trait::async_trait; + use libunftp::auth::{AuthenticationError, Authenticator, DefaultUser, UserDetail}; + use serde::Deserialize; + use tokio::time::sleep; + + #[derive(Debug)] + pub struct User { + username: String, + home: Option, + } + + #[derive(Deserialize, Clone, Debug)] + #[serde(untagged)] + enum Credentials { + Plaintext { + username: String, + password: Option, + home: Option, + }, + } + + #[derive(Clone, Debug)] + struct UserCreds { + pub password: Option, + pub home: Option, + } + + impl User { + fn new(username: &str, home: &Option) -> Self { + User { + username: username.to_owned(), + home: home.clone(), + } + } + } + + impl UserDetail for User { + fn home(&self) -> Option<&Path> { + match &self.home { + None => None, + Some(p) => Some(p.as_path()), + } + } + } + + impl fmt::Display for User { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.username.as_str()) + } + } + + /// This structure implements the libunftp `Authenticator` trait + #[derive(Clone, Debug)] + pub struct JsonFileAuthenticator { + credentials_map: HashMap, + } + + impl JsonFileAuthenticator { + /// Initialize a new [`JsonFileAuthenticator`] from file. + pub fn from_file>(filename: P) -> Result> { + let mut f = fs::File::open(&filename)?; + + let mut json = String::new(); + f.read_to_string(&mut json)?; + + Self::from_json(json) + } + + /// Initialize a new [`JsonFileAuthenticator`] from json string. + pub fn from_json>(json: T) -> Result> { + let credentials_list: Vec = serde_json::from_str::>(&json.into())?; + let map: Result, _> = credentials_list.into_iter().map(Self::list_entry_to_map_entry).collect(); + Ok(JsonFileAuthenticator { credentials_map: map? }) + } + + fn list_entry_to_map_entry(user_info: Credentials) -> Result<(String, UserCreds), Box> { + let map_entry = match user_info { + Credentials::Plaintext { username, password, home } => (username.clone(), UserCreds { password, home }), + }; + Ok(map_entry) + } + + fn check_password(given_password: &str, actual_password: &Option) -> Result<(), ()> { + if let Some(pwd) = actual_password { + if pwd == given_password { + Ok(()) + } else { + Err(()) + } + } else { + Err(()) + } + } + } + + #[async_trait] + impl Authenticator for JsonFileAuthenticator { + #[tracing_attributes::instrument] + async fn authenticate(&self, username: &str, creds: &libunftp::auth::Credentials) -> Result { + let res = if let Some(actual_creds) = self.credentials_map.get(username) { + let pass_check_result = match &creds.password { + Some(ref given_password) => { + if Self::check_password(given_password, &actual_creds.password).is_ok() { + Some(Ok(User::new(username, &actual_creds.home))) + } else { + Some(Err(AuthenticationError::BadPassword)) + } + } + None => None, + }; + + match pass_check_result { + None => Err(AuthenticationError::BadPassword), + Some(pass_res) => { + if pass_res.is_ok() { + Ok(User::new(username, &actual_creds.home)) + } else { + pass_res + } + } + } + } else { + Err(AuthenticationError::BadUser) + }; + + if res.is_err() { + sleep(Duration::from_millis(1500)).await; + } + + res + } + + fn name(&self) -> &str { + std::any::type_name::() + } + } + + #[async_trait] + impl Authenticator for JsonFileAuthenticator { + #[tracing_attributes::instrument] + async fn authenticate(&self, username: &str, creds: &libunftp::auth::Credentials) -> Result { + let _: User = self.authenticate(username, creds).await?; + Ok(DefaultUser {}) + } + } +} + +use auth::{JsonFileAuthenticator, User}; + +cfg_if! { + if #[cfg(target_os = "freebsd")] { + use std::{ + io, + net::IpAddr, + ops::Range + }; + use async_trait::async_trait; + use capsicum::casper::Casper; + use capsicum_net::{CapNetAgent, CasperExt, tokio::TcpSocketExt}; + use tokio::net::TcpSocket; + + #[derive(Debug)] + struct CapBinder { + agent: CapNetAgent + } + + impl CapBinder { + fn new(agent: CapNetAgent) -> Self { + Self{agent} + } + } + + #[async_trait] + impl libunftp::options::Binder for CapBinder { + async fn bind(&mut self, local_addr: IpAddr, passive_ports: Range) -> io::Result { + const BIND_RETRIES: u8 = 10; + + for _ in 1..BIND_RETRIES { + let mut data = [0u8; 2]; + getrandom::getrandom(&mut data).expect("Error generating random port"); + let r16 = u16::from_ne_bytes(data); + let p = passive_ports.start + r16 % (passive_ports.end - passive_ports.start); + let socket = TcpSocket::new_v4()?; + let addr = std::net::SocketAddr::new(local_addr, p); + match socket.cap_bind(&mut self.agent, addr) { + Ok(()) => return Ok(socket), + Err(_) => todo!() + } + } + panic!() + } + } + } +} + +#[tokio::main(flavor = "current_thread")] +#[allow(unused_mut)] // Not unused on all OSes. +async fn main() { + println!("Starting helper"); + let args: Vec = env::args().collect(); + + if args.len() != 3 { + eprintln!("Usage: {} ", args[0]); + exit(2); + } + let fd: RawFd = if let Ok(fd) = args[2].parse() { + fd + } else { + eprintln!("Usage: {} \nFD must be numeric", args[0]); + exit(2) + }; + + let std_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) }; + + let control_sock = TcpStream::from_std(std_stream).unwrap(); + + let auth = Arc::new(JsonFileAuthenticator::from_file(args[1].clone()).unwrap()); + // XXX This would be a lot easier if the libunftp API allowed creating the + // storage just before calling service. + let storage = Mutex::new(Some(Filesystem::new(std::env::temp_dir()))); + let sgen = Box::new(move || storage.lock().unwrap().take().unwrap()); + + let mut sb = libunftp::ServerBuilder::with_authenticator(sgen, auth); + cfg_if! { + if #[cfg(target_os = "freebsd")] { + // Safe because we're single-threaded + let mut casper = unsafe { Casper::new().unwrap() }; + + let cap_net = casper.net().unwrap(); + let binder = CapBinder::new(cap_net); + sb = sb.binder(binder); + } + } + let server: Server = sb.build().await.unwrap(); + cfg_if! { + if #[cfg(target_os = "freebsd")] { + capsicum::enter().unwrap(); + } + } + server.service(control_sock).await.unwrap() +} diff --git a/crates/unftp-sbe-fs/examples/cap-ftpd.rs b/crates/unftp-sbe-fs/examples/cap-ftpd.rs new file mode 100644 index 00000000..414c28a2 --- /dev/null +++ b/crates/unftp-sbe-fs/examples/cap-ftpd.rs @@ -0,0 +1,29 @@ +//! A server that jails each connected session with Capsicum. +use std::{ffi::OsString, path::Path, str::FromStr}; + +use unftp_sbe_fs::ServerExt; + +#[tokio::main(flavor = "current_thread")] +pub async fn main() { + let addr = "127.0.0.1:2121"; + + let args: Vec = std::env::args().collect(); + + if args.len() != 2 { + eprintln!("Usage: {} ", args[0]); + std::process::exit(2); + } + let auth_file = &args[1]; + + let args: Vec = std::env::args().collect(); + let helper = Path::new(&args[0]).parent().unwrap().join("cap-ftpd-worker"); + let helper_args = vec![OsString::from_str(auth_file).unwrap()]; + let server = libunftp::Server::with_fs(std::env::temp_dir()) + .connection_helper(helper.into(), helper_args) + .build() + .await + .unwrap(); + + println!("Starting ftp server on {}", addr); + server.listen(addr).await.unwrap(); +} diff --git a/crates/unftp-sbe-fs/examples/proxyprotocol.rs b/crates/unftp-sbe-fs/examples/proxyprotocol.rs index 3f21327b..baf8c944 100644 --- a/crates/unftp-sbe-fs/examples/proxyprotocol.rs +++ b/crates/unftp-sbe-fs/examples/proxyprotocol.rs @@ -7,7 +7,10 @@ pub async fn main() { let addr = "127.0.0.1:2121"; let server = libunftp::Server::with_fs(std::env::temp_dir()) .proxy_protocol_mode(2121) - .passive_ports(5000..5005); + .passive_ports(5000..5005) + .build() + .await + .unwrap(); println!("Starting ftp server with proxy protocol on {}", addr); server.listen(addr).await.unwrap(); diff --git a/crates/unftp-sbe-fs/src/cap_fs.rs b/crates/unftp-sbe-fs/src/cap_fs.rs new file mode 100644 index 00000000..5d3f1280 --- /dev/null +++ b/crates/unftp-sbe-fs/src/cap_fs.rs @@ -0,0 +1,106 @@ +//! A capabilities-friendly workalike of tokio::fs +// Most of these functions are copied almost verbatim from tokio::fs, but with the std parts +// replaced by cap_std. + +use std::{io, path::Path, sync::Arc}; + +use tokio::{sync::mpsc, task::spawn_blocking}; +use tokio_stream::wrappers::ReceiverStream; + +/// Exact copy of tokio::fs::asyncify +async fn asyncify(f: F) -> io::Result +where + F: FnOnce() -> io::Result + Send + 'static, + T: Send + 'static, +{ + match spawn_blocking(f).await { + Ok(res) => res, + Err(_) => Err(io::Error::new(io::ErrorKind::Other, "background task failed")), + } +} + +/// Create a new directory somewhere under this one +pub async fn create_dir>(root: Arc, path: P) -> io::Result<()> { + let path = path.as_ref().to_owned(); + asyncify(move || root.create_dir(path)).await +} + +pub async fn open>(root: Arc, path: P) -> io::Result { + let path = path.as_ref().to_owned(); + asyncify(move || root.open(path)).await +} + +pub async fn open_with>(root: Arc, path: P, options: cap_std::fs::OpenOptions) -> io::Result { + let path = path.as_ref().to_owned(); + asyncify(move || root.open_with(path, &options)).await +} + +/// Returns a stream over the entries within a directory. +/// +/// This is a capabilties-based, async version of [`std::fs::read_dir`](std::fs::read_dir) +/// +/// This operation is implemented by running the equivalent blocking +/// operation on a separate thread pool using [`spawn_blocking`]. +/// +/// [`spawn_blocking`]: tokio::task::spawn_blocking +pub fn read_dir(root: Arc, path: impl AsRef) -> ReceiverStream> { + const CHUNKSIZE: usize = 32; + + let path = path.as_ref().to_owned(); + let (tx, rx) = mpsc::channel(CHUNKSIZE); + tokio::spawn(spawn_blocking(move || { + let r = root.read_dir(path); + match r { + Ok(rd) => { + for entry in rd { + tx.blocking_send(entry).unwrap() + } + } + Err(e) => tx.blocking_send(Err(e)).unwrap(), + } + })); + ReceiverStream::new(rx) +} + +/// Removes an existing, empty directory. +/// +/// This is a capability-based, async version of +/// [`std::fs::remove_dir`](std::fs::remove_dir) +pub async fn remove_dir(root: Arc, path: impl AsRef) -> io::Result<()> { + let path = path.as_ref().to_owned(); + asyncify(move || root.remove_dir(path)).await +} + +/// Removes a file from the filesystem. +/// +/// Note that there is no guarantee that the file is immediately deleted (e.g. +/// depending on platform, other open file descriptors may prevent immediate +/// removal). +/// +/// This is a capabilities-based, async version of [`std::fs::remove_file`][std] +/// +/// [std]: std::fs::remove_file +pub async fn remove_file(root: Arc, path: impl AsRef) -> io::Result<()> { + let path = path.as_ref().to_owned(); + asyncify(move || root.remove_file(path)).await +} + +/// Renames a file or directory to a new name, replacing the original file if +/// `to` already exists. +/// +/// This will not work if the new name is on a different mount point. +/// +/// This is a capabilities-based async version of +/// [`std::fs::rename`](std::fs::rename) +pub async fn rename(root: Arc, from: impl AsRef, to: impl AsRef) -> io::Result<()> { + let from = from.as_ref().to_owned(); + let to = to.as_ref().to_owned(); + + asyncify(move || root.rename(from, &root, to)).await +} + +/// Queries the file system metadata for a path. +pub async fn symlink_metadata>(root: Arc, path: P) -> io::Result { + let path = path.as_ref().to_owned(); + asyncify(move || root.symlink_metadata(path)).await +} diff --git a/crates/unftp-sbe-fs/src/ext.rs b/crates/unftp-sbe-fs/src/ext.rs index e627eb08..c55feadd 100644 --- a/crates/unftp-sbe-fs/src/ext.rs +++ b/crates/unftp-sbe-fs/src/ext.rs @@ -1,6 +1,6 @@ use crate::Filesystem; use libunftp::auth::DefaultUser; -use libunftp::Server; +use libunftp::{Server, ServerBuilder}; use std::path::PathBuf; /// Extension trait purely for construction convenience. @@ -15,9 +15,9 @@ pub trait ServerExt { /// /// let server = Server::with_fs("/srv/ftp"); /// ``` - fn with_fs + Send + 'static>(path: P) -> Server { + fn with_fs + Send + 'static>(path: P) -> ServerBuilder { let p = path.into(); - libunftp::Server::new(Box::new(move || { + libunftp::ServerBuilder::new(Box::new(move || { let p = &p.clone(); Filesystem::new(p) })) diff --git a/crates/unftp-sbe-fs/src/lib.rs b/crates/unftp-sbe-fs/src/lib.rs index 93774590..835fb7c7 100644 --- a/crates/unftp-sbe-fs/src/lib.rs +++ b/crates/unftp-sbe-fs/src/lib.rs @@ -11,7 +11,10 @@ //! let ftp_home = std::env::temp_dir(); //! let server = libunftp::Server::with_fs(ftp_home) //! .greeting("Welcome to my FTP server") -//! .passive_ports(50000..65535); +//! .passive_ports(50000..65535) +//! .build() +//! .await +//! .unwrap(); //! //! server.listen("127.0.0.1:2121").await; //! } @@ -20,23 +23,25 @@ mod ext; pub use ext::ServerExt; +mod cap_fs; + use async_trait::async_trait; use cfg_if::cfg_if; +use futures::{future::TryFutureExt, stream::TryStreamExt}; +use lazy_static::lazy_static; use libunftp::auth::UserDetail; use libunftp::storage::{Error, ErrorKind, Fileinfo, Metadata, Result, StorageBackend}; use std::{ fmt::Debug, + io, path::{Path, PathBuf}, + sync::Arc, time::SystemTime, }; +use tokio::io::AsyncSeekExt; -cfg_if! { - if #[cfg(target_os = "linux")] { - use std::os::linux::fs::MetadataExt; - } else if #[cfg(target_os = "unix")] { - use std::os::unix::fs::MetadataExt; - } -} +#[cfg(target_os = "unix")] +use std::os::unix::fs::MetadataExt; /// The Filesystem struct is an implementation of the StorageBackend trait that keeps its files /// inside a specific root directory on local disk. @@ -44,23 +49,30 @@ cfg_if! { /// [`Filesystem`]: ./trait.Filesystem.html #[derive(Debug)] pub struct Filesystem { + // The Arc is necessary so we can pass it to async closures. Which is of dubious utility + // anyway, since most of those closures execute functions like fstatfs that are faster than the + // cost of switching a thread. + root_fd: Arc, root: PathBuf, } #[derive(Debug)] pub struct Meta { - inner: std::fs::Metadata, + inner: cap_std::fs::Metadata, } -/// Returns the canonical path corresponding to the input path, sequences like '../' resolved. -/// -/// I may decide to make this part of just the Filesystem implementation, because strictly speaking -/// '../' is only special on the context of a filesystem. Then again, FTP does kind of imply a -/// filesystem... hmm... -fn canonicalize>(path: P) -> Result { - use path_abs::PathAbs; - let p = PathAbs::new(path).map_err(|_| Error::from(ErrorKind::FileNameNotAllowedError))?; - Ok(p.as_path().to_path_buf()) +/// Strip the "/" prefix, if any, from a path. Suitable for preprocessing the input pathnames +/// supplied by the FTP client. +fn strip_prefixes(path: &Path) -> &Path { + lazy_static! { + static ref DOT: PathBuf = PathBuf::from("."); + static ref SLASH: PathBuf = PathBuf::from("/"); + } + if path == SLASH.as_path() { + DOT.as_path() + } else { + path.strip_prefix("/").unwrap_or(path) + } } impl Filesystem { @@ -69,32 +81,9 @@ impl Filesystem { /// asks for `hello.txt`, the server will send it `/srv/ftp/hello.txt`. pub fn new>(root: P) -> Self { let path = root.into(); - Filesystem { - root: canonicalize(&path).unwrap_or(path), - } - } - - /// Returns the full, absolute and canonical path corresponding to the (relative to FTP root) - /// input path, resolving symlinks and sequences like '../'. - async fn full_path>(&self, path: P) -> Result { - // `path.join(other_path)` replaces `path` with `other_path` if `other_path` is absolute, - // so we have to check for it. - let path = path.as_ref(); - let full_path = if path.starts_with("/") { - self.root.join(path.strip_prefix("/").unwrap()) - } else { - self.root.join(path) - }; - - let real_full_path = tokio::task::spawn_blocking(move || canonicalize(full_path)) - .await - .map_err(|e| Error::new(ErrorKind::LocalError, e))??; - - if real_full_path.starts_with(&self.root) { - Ok(real_full_path) - } else { - Err(Error::from(ErrorKind::PermanentFileNotAvailable)) - } + let aa = cap_std::ambient_authority(); + let root_fd = Arc::new(cap_std::fs::Dir::open_ambient_dir(&path, aa).unwrap()); + Filesystem { root_fd, root: path } } } @@ -102,15 +91,25 @@ impl Filesystem { impl StorageBackend for Filesystem { type Metadata = Meta; + fn enter(&mut self, user_detail: &User) -> io::Result<()> { + if let Some(path) = user_detail.home() { + let relpath = match path.strip_prefix(self.root.as_path()) { + Ok(r) => r, + Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "Path not a descendant of the previous root")), + }; + self.root_fd = Arc::new(self.root_fd.open_dir(relpath)?); + } + Ok(()) + } + fn supported_features(&self) -> u32 { libunftp::storage::FEATURE_RESTART | libunftp::storage::FEATURE_SITEMD5 } #[tracing_attributes::instrument] async fn metadata + Send + Debug>(&self, _user: &User, path: P) -> Result { - let full_path = self.full_path(path).await?; - - let fs_meta = tokio::fs::symlink_metadata(full_path) + let path = strip_prefixes(path.as_ref()); + let fs_meta = cap_fs::symlink_metadata(self.root_fd.clone(), &path) .await .map_err(|_| Error::from(ErrorKind::PermanentFileNotAvailable))?; Ok(Meta { inner: fs_meta }) @@ -123,32 +122,27 @@ impl StorageBackend for Filesystem { P: AsRef + Send + Debug, >::Metadata: Metadata, { - let full_path: PathBuf = self.full_path(path).await?; - - let prefix: PathBuf = self.root.clone(); - - let mut rd: tokio::fs::ReadDir = tokio::fs::read_dir(full_path).await?; - - let mut fis: Vec> = vec![]; - while let Ok(Some(dir_entry)) = rd.next_entry().await { - let prefix = prefix.clone(); - let path = dir_entry.path(); - let relpath = path.strip_prefix(prefix).unwrap(); - let relpath: PathBuf = std::path::PathBuf::from(relpath); - let metadata = tokio::fs::symlink_metadata(dir_entry.path()).await?; - let meta: Self::Metadata = Meta { inner: metadata }; - fis.push(Fileinfo { path: relpath, metadata: meta }) - } + let path = strip_prefixes(path.as_ref()); + + let fis: Vec> = cap_fs::read_dir(self.root_fd.clone(), path) + .and_then(|dirent| { + let entry_path: PathBuf = dirent.file_name().into(); + cap_fs::symlink_metadata(self.root_fd.clone(), path.join(entry_path.clone())).map_ok(move |meta| { + let metadata = Meta { inner: meta }; + Fileinfo { path: entry_path, metadata } + }) + }) + .try_collect::>() + .await?; Ok(fis) } //#[tracing_attributes::instrument] async fn get + Send + Debug>(&self, _user: &User, path: P, start_pos: u64) -> Result> { - use tokio::io::AsyncSeekExt; - - let full_path = self.full_path(path).await?; - let mut file = tokio::fs::File::open(full_path).await?; + let path = strip_prefixes(path.as_ref()); + let file = cap_fs::open(self.root_fd.clone(), path).await?; + let mut file = tokio::fs::File::from_std(file.into_std()); if start_pos > 0 { file.seek(std::io::SeekFrom::Start(start_pos)).await?; } @@ -163,16 +157,13 @@ impl StorageBackend for Filesystem { path: P, start_pos: u64, ) -> Result { - use tokio::io::AsyncSeekExt; // TODO: Add permission checks - let path = path.as_ref(); - let full_path = if path.starts_with("/") { - self.root.join(path.strip_prefix("/").unwrap()) - } else { - self.root.join(path) - }; - - let mut file = tokio::fs::OpenOptions::new().write(true).create(true).open(full_path).await?; + + let path = strip_prefixes(path.as_ref()); + let mut oo = cap_std::fs::OpenOptions::new(); + oo.write(true).create(true); + let file = cap_fs::open_with(self.root_fd.clone(), path, oo).await?; + let mut file = tokio::fs::File::from_std(file.into_std()); file.set_len(start_pos).await?; file.seek(std::io::SeekFrom::Start(start_pos)).await?; @@ -185,35 +176,38 @@ impl StorageBackend for Filesystem { #[tracing_attributes::instrument] async fn del + Send + Debug>(&self, _user: &User, path: P) -> Result<()> { - let full_path = self.full_path(path).await?; - tokio::fs::remove_file(full_path).await.map_err(|error: std::io::Error| error.into()) + let path = strip_prefixes(path.as_ref()); + cap_fs::remove_file(self.root_fd.clone(), path) + .await + .map_err(|error: std::io::Error| error.into()) } #[tracing_attributes::instrument] async fn rmd + Send + Debug>(&self, _user: &User, path: P) -> Result<()> { - let full_path = self.full_path(path).await?; - tokio::fs::remove_dir(full_path).await.map_err(|error: std::io::Error| error.into()) + let path = strip_prefixes(path.as_ref()); + cap_fs::remove_dir(self.root_fd.clone(), path) + .await + .map_err(|error: std::io::Error| error.into()) } #[tracing_attributes::instrument] async fn mkd + Send + Debug>(&self, _user: &User, path: P) -> Result<()> { - tokio::fs::create_dir(self.full_path(path).await?) + let path = strip_prefixes(path.as_ref()); + cap_fs::create_dir(self.root_fd.clone(), path) .await .map_err(|error: std::io::Error| error.into()) } #[tracing_attributes::instrument] async fn rename + Send + Debug>(&self, _user: &User, from: P, to: P) -> Result<()> { - let from = self.full_path(from).await?; - let to = self.full_path(to).await?; - - let from_rename = from.clone(); + let from = from.as_ref().strip_prefix("/").unwrap_or(from.as_ref()); + let to = to.as_ref().strip_prefix("/").unwrap_or(to.as_ref()); - let r = tokio::fs::symlink_metadata(from).await; + let r = cap_fs::symlink_metadata(self.root_fd.clone(), &from).await; match r { Ok(metadata) => { if metadata.is_file() || metadata.is_dir() { - let r = tokio::fs::rename(from_rename, to).await; + let r = cap_fs::rename(self.root_fd.clone(), from, to).await; match r { Ok(_) => Ok(()), Err(e) => Err(Error::new(ErrorKind::PermanentFileNotAvailable, e)), @@ -227,9 +221,8 @@ impl StorageBackend for Filesystem { } #[tracing_attributes::instrument] - async fn cwd + Send + Debug>(&self, _user: &User, path: P) -> Result<()> { - let full_path = self.full_path(path).await?; - tokio::fs::read_dir(full_path).await.map_err(|error: std::io::Error| error.into()).map(|_| ()) + async fn cwd + Send + Debug>(&self, user: &User, path: P) -> Result<()> { + self.list(user, path).await.map(drop) } } @@ -251,15 +244,12 @@ impl Metadata for Meta { } fn modified(&self) -> Result { - self.inner.modified().map_err(|e| e.into()) + self.inner.modified().map(cap_std::time::SystemTime::into_std).map_err(|e| e.into()) } fn gid(&self) -> u32 { cfg_if! { - if #[cfg(target_os = "linux")] { - self.inner.st_gid() - } - else if #[cfg(target_os = "unix")] { + if #[cfg(target_os = "unix")] { self.inner.gid() } else { 0 @@ -269,10 +259,7 @@ impl Metadata for Meta { fn uid(&self) -> u32 { cfg_if! { - if #[cfg(target_os = "linux")] { - self.inner.st_uid() - } - else if #[cfg(target_os = "unix")] { + if #[cfg(target_os = "unix")] { self.inner.uid() } else { 0 @@ -282,10 +269,7 @@ impl Metadata for Meta { fn links(&self) -> u64 { cfg_if! { - if #[cfg(target_os = "linux")] { - self.inner.st_nlink() - } - else if #[cfg(target_os = "unix")] { + if #[cfg(target_os = "unix")] { self.inner.nlink() } else { 1 diff --git a/crates/unftp-sbe-fs/src/tests.rs b/crates/unftp-sbe-fs/src/tests.rs index f249a10a..e3b108f2 100644 --- a/crates/unftp-sbe-fs/src/tests.rs +++ b/crates/unftp-sbe-fs/src/tests.rs @@ -6,6 +6,13 @@ use std::io::prelude::*; use std::io::Write; use tokio::runtime::Runtime; +#[test] +fn fs_strip_prefixes() { + assert_eq!(strip_prefixes(Path::new("foo/bar")), Path::new("foo/bar")); + assert_eq!(strip_prefixes(Path::new("/foo/bar")), Path::new("foo/bar")); + assert_eq!(strip_prefixes(Path::new("/")), Path::new(".")); +} + #[test] fn fs_stat() { let root = std::env::temp_dir(); diff --git a/crates/unftp-sbe-fs/tests/main.rs b/crates/unftp-sbe-fs/tests/main.rs index cd522c9d..3c8d9e75 100644 --- a/crates/unftp-sbe-fs/tests/main.rs +++ b/crates/unftp-sbe-fs/tests/main.rs @@ -1,5 +1,5 @@ use async_ftp::{types::Result, FtpStream}; -use libunftp::{auth::DefaultUser, options::FtpsRequired, Server}; +use libunftp::{auth::DefaultUser, options::FtpsRequired, ServerBuilder}; use pretty_assertions::assert_eq; use rstest::{fixture, rstest}; use std::fmt::Debug; @@ -32,14 +32,14 @@ struct Harness { async fn custom_server_harness(s: S) -> Harness where - S: Fn(PathBuf) -> Server, + S: Fn(PathBuf) -> ServerBuilder, { let port = TESTPORT.fetch_add(1, Ordering::Relaxed); let addr = format!("127.0.0.1:{}", port); let tempdir = tempfile::TempDir::new().unwrap(); let root = tempdir.path().to_path_buf(); - let server = s(root.clone()).listen(addr.clone()); + let server = s(root.clone()).build().await.unwrap().listen(addr.clone()); tokio::spawn(server); while async_ftp::FtpStream::connect(&addr).await.is_err() { diff --git a/crates/unftp-sbe-gcs/examples/gcs.rs b/crates/unftp-sbe-gcs/examples/gcs.rs index d825e93d..71421ecc 100644 --- a/crates/unftp-sbe-gcs/examples/gcs.rs +++ b/crates/unftp-sbe-gcs/examples/gcs.rs @@ -87,16 +87,22 @@ pub async fn main() -> Result<(), Box> { let ftps_key_file = matches .value_of(FTPS_KEY_FILE) .ok_or("Internal error: use of an undefined command line parameter")?; - libunftp::Server::new(Box::new(move || { + libunftp::ServerBuilder::new(Box::new(move || { unftp_sbe_gcs::CloudStorage::with_api_base(&gcs_base_url, &bucket_name, PathBuf::new(), service_account_key.clone()) })) .ftps(ftps_certs_file, ftps_key_file) + .build() + .await + .unwrap() .listen(BIND_ADDRESS) .await?; } else { - libunftp::Server::new(Box::new(move || { + libunftp::ServerBuilder::new(Box::new(move || { unftp_sbe_gcs::CloudStorage::with_api_base(&gcs_base_url, &bucket_name, PathBuf::new(), service_account_key.clone()) })) + .build() + .await + .unwrap() .listen(BIND_ADDRESS) .await?; } diff --git a/crates/unftp-sbe-gcs/src/ext.rs b/crates/unftp-sbe-gcs/src/ext.rs index 8294b60b..a7a9ee32 100644 --- a/crates/unftp-sbe-gcs/src/ext.rs +++ b/crates/unftp-sbe-gcs/src/ext.rs @@ -1,7 +1,7 @@ use crate::options::AuthMethod; use crate::CloudStorage; use libunftp::auth::DefaultUser; -use libunftp::Server; +use libunftp::{Server, ServerBuilder}; use std::path::PathBuf; /// Extension trait purely for construction convenience. @@ -15,16 +15,17 @@ pub trait ServerExt { /// use unftp_sbe_gcs::{ServerExt, options::AuthMethod}; /// use std::path::PathBuf; /// - /// let server = Server::with_gcs("my-bucket", PathBuf::from("/unftp"), AuthMethod::WorkloadIdentity(None)); + /// let server = Server::with_gcs("my-bucket", PathBuf::from("/unftp"), AuthMethod::WorkloadIdentity(None)) + /// .build(); /// ``` - fn with_gcs(bucket: Str, root: PathBuf, auth: AuthHow) -> Server + fn with_gcs(bucket: Str, root: PathBuf, auth: AuthHow) -> ServerBuilder where Str: Into, AuthHow: Into, { let s = bucket.into(); let a = auth.into(); - libunftp::Server::new(Box::new(move || CloudStorage::with_bucket_root(s.clone(), root.clone(), a.clone()))) + libunftp::ServerBuilder::new(Box::new(move || CloudStorage::with_bucket_root(s.clone(), root.clone(), a.clone()))) } } diff --git a/crates/unftp-sbe-gcs/src/lib.rs b/crates/unftp-sbe-gcs/src/lib.rs index eb3d2ce5..b34b1544 100644 --- a/crates/unftp-sbe-gcs/src/lib.rs +++ b/crates/unftp-sbe-gcs/src/lib.rs @@ -28,7 +28,10 @@ //! pub async fn main() { //! let server = Server::with_gcs("my-bucket", PathBuf::from("/unftp"), AuthMethod::WorkloadIdentity(None)) //! .greeting("Welcome to my FTP server") -//! .passive_ports(50000..65535); +//! .passive_ports(50000..65535) +//! .build() +//! .await +//! .unwrap(); //! //! server.listen("127.0.0.1:2121").await; //! } @@ -48,7 +51,10 @@ //! Box::new(move || CloudStorage::with_bucket_root("my-bucket", PathBuf::from("/ftp-root"), AuthMethod::WorkloadIdentity(None))) //! ) //! .greeting("Welcome to my FTP server") -//! .passive_ports(50000..65535); +//! .passive_ports(50000..65535) +//! .build() +//! .await +//! .unwrap(); //! //! server.listen("127.0.0.1:2121").await; //! } diff --git a/crates/unftp-sbe-gcs/tests/main.rs b/crates/unftp-sbe-gcs/tests/main.rs index e9ebceb7..4527773f 100644 --- a/crates/unftp-sbe-gcs/tests/main.rs +++ b/crates/unftp-sbe-gcs/tests/main.rs @@ -1,6 +1,6 @@ use async_ftp::FtpStream; use lazy_static::*; -use libunftp::Server; +use libunftp::ServerBuilder; use unftp_sbe_gcs::CloudStorage; use more_asserts::assert_ge; @@ -279,10 +279,13 @@ async fn run_test(test: impl Future) { let drain = slog_async::Async::new(drain).build().fuse(); tokio::spawn( - Server::new(Box::new(move || { + ServerBuilder::new(Box::new(move || { CloudStorage::with_api_base(GCS_BASE_URL, GCS_BUCKET, PathBuf::from("/"), AuthMethod::None) })) .logger(Some(Logger::root(drain, o!()))) + .build() + .await + .unwrap() .listen(ADDR), ); diff --git a/src/auth/user.rs b/src/auth/user.rs index 905a9138..b6da5f91 100644 --- a/src/auth/user.rs +++ b/src/auth/user.rs @@ -1,4 +1,7 @@ -use std::fmt::{self, Debug, Display, Formatter}; +use std::{ + fmt::{self, Debug, Display, Formatter}, + path::Path, +}; /// UserDetail defines the requirements for implementations that hold _Security Subject_ /// information for use by the server. @@ -16,6 +19,14 @@ pub trait UserDetail: Send + Sync + Display + Debug { fn account_enabled(&self) -> bool { true } + + /// Returns the user's home directory, if any. If the user has a home directory, then their + /// sessions will be restricted to this directory. + /// + /// The path should be absolute. + fn home(&self) -> Option<&Path> { + None + } } /// DefaultUser is a default implementation of the `UserDetail` trait that doesn't hold any user diff --git a/src/lib.rs b/src/lib.rs index 2be2decf..30e7e2c6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,7 +34,10 @@ //! let ftp_home = std::env::temp_dir(); //! let server = libunftp::Server::with_fs(ftp_home) //! .greeting("Welcome to my FTP server") -//! .passive_ports(50000..65535); +//! .passive_ports(50000..65535) +//! .build() +//! .await +//! .unwrap(); //! //! server.listen("127.0.0.1:2121").await; //! } @@ -50,6 +53,6 @@ pub mod notification; pub(crate) mod server; pub mod storage; -pub use crate::server::ftpserver::{error::ServerError, options, Server}; +pub use crate::server::ftpserver::{error::ServerError, options, Server, ServerBuilder}; type BoxError = Box; diff --git a/src/notification/event.rs b/src/notification/event.rs index 88fea758..100db878 100644 --- a/src/notification/event.rs +++ b/src/notification/event.rs @@ -70,7 +70,7 @@ pub struct EventMeta { } /// An listener for [`DataEvent`](crate::notification::DataEvent)s. Implementations can -/// be passed to [`Server::notify_data`](crate::Server::notify_data) +/// be passed to [`ServerBuilder::notify_data`](crate::ServerBuilder::notify_data) /// in order to receive notifications. #[async_trait] pub trait DataListener: Sync + Send + Debug { @@ -79,8 +79,8 @@ pub trait DataListener: Sync + Send + Debug { async fn receive_data_event(&self, e: DataEvent, m: EventMeta); } -/// An listener for [`PresenceEvent`](crate::notification::PresenceEvent)s. Implementations can -/// be passed to [`Server::notify_presence`](crate::Server::notify_presence) +/// A listener for [`PresenceEvent`](crate::notification::PresenceEvent)s. Implementations can +/// be passed to [`ServerBuilder::notify_presence`](crate::ServerBuilder::notify_presence) /// in order to receive notifications. #[async_trait] pub trait PresenceListener: Sync + Send + Debug { diff --git a/src/notification/mod.rs b/src/notification/mod.rs index 780a3be0..691ee55f 100644 --- a/src/notification/mod.rs +++ b/src/notification/mod.rs @@ -3,11 +3,11 @@ //! Allows users to listen to events emitted by libunftp. //! //! To listen for changes in data implement the [`DataListener`] -//! trait and use the [`Server::notify_data`](crate::Server::notify_data) method +//! trait and use the [`ServerBuilder::notify_data`](crate::ServerBuilder::notify_data) method //! to make libunftp notify it. //! //! To listen to logins and logouts implement the [`PresenceListener`] -//! trait and use the [`Server::notify_presence`](crate::Server::notify_data) method +//! trait and use the [`ServerBuilder::notify_presence`](crate::ServerBuilder::notify_data) method //! to make libunftp use it. //! diff --git a/src/server/controlchan/commands/pass.rs b/src/server/controlchan/commands/pass.rs index 9c8dd97e..4bbbc917 100644 --- a/src/server/controlchan/commands/pass.rs +++ b/src/server/controlchan/commands/pass.rs @@ -104,11 +104,27 @@ where ControlChanMsg::AuthFailed } else if user.account_enabled() { let mut session = session2clone.lock().await; - slog::info!(logger, "PASS: User {} logged in", user); - session.user = Arc::new(Some(user)); - ControlChanMsg::AuthSuccess { - username, - trace_id: session.trace_id, + // Using Arc::get_mut means that this won't work if the Session is + // currently servicing multiple commands concurrently. But it + // shouldn't ever be servicing PASS at the same time as another + // command. + match Arc::get_mut(&mut session.storage).map(|s| s.enter(&user)) { + Some(Err(e)) => { + slog::error!(logger, "{}", e); + ControlChanMsg::AuthFailed + } + None => { + slog::error!(logger, "Failed to lock Session::storage during PASS."); + ControlChanMsg::AuthFailed + } + Some(Ok(())) => { + slog::info!(logger, "PASS: User {} logged in", user); + session.user = Arc::new(Some(user)); + ControlChanMsg::AuthSuccess { + username, + trace_id: session.trace_id, + } + } } } else { slog::warn!(logger, "PASS: User {} authenticated but account is disabled", user); diff --git a/src/server/controlchan/commands/pasv.rs b/src/server/controlchan/commands/pasv.rs index ae55f6cf..71c2b2dd 100644 --- a/src/server/controlchan/commands/pasv.rs +++ b/src/server/controlchan/commands/pasv.rs @@ -24,8 +24,11 @@ use crate::{ }; use async_trait::async_trait; use std::{io, net::SocketAddr, ops::Range}; -use std::{net::Ipv4Addr, time::Duration}; -use tokio::net::TcpListener; +use std::{ + net::{IpAddr, Ipv4Addr}, + time::Duration, +}; +use tokio::net::TcpSocket; use tokio::sync::mpsc::{channel, Receiver, Sender}; const BIND_RETRIES: u8 = 10; @@ -39,10 +42,10 @@ impl Pasv { } #[tracing_attributes::instrument] - async fn try_port_range(local_addr: SocketAddr, passive_ports: Range) -> io::Result { + fn try_port_range(local_addr: IpAddr, passive_ports: Range) -> io::Result { let rng_length = passive_ports.end - passive_ports.start + 1; - let mut listener: io::Result = Err(io::Error::new(io::ErrorKind::InvalidInput, "Bind retries cannot be 0")); + let mut socket: io::Result = Err(io::Error::new(io::ErrorKind::InvalidInput, "Bind retries cannot be 0")); for _ in 1..BIND_RETRIES { let random_u32 = { @@ -52,13 +55,14 @@ impl Pasv { }; let port = random_u32 % rng_length as u32 + passive_ports.start as u32; - listener = TcpListener::bind(std::net::SocketAddr::new(local_addr.ip(), port as u16)).await; - if listener.is_ok() { + let s = TcpSocket::new_v4()?; + if s.bind(std::net::SocketAddr::new(local_addr, port as u16)).is_ok() { + socket = Ok(s); break; } } - listener + socket } // modifies the session by adding channels that are used to communicate with the data connection @@ -107,12 +111,16 @@ impl Pasv { } }; - let listener = Pasv::try_port_range(args.local_addr, args.passive_ports).await; - + let listener = if let Some(ref mut binder) = session.lock().await.binder { + binder.bind(args.local_addr.ip(), args.passive_ports).await + } else { + Pasv::try_port_range(args.local_addr.ip(), args.passive_ports) + }; let listener = match listener { Err(_) => return Ok(Reply::new(ReplyCode::CantOpenDataConnection, "No data connection established")), Ok(l) => l, }; + let listener = listener.listen(1024).unwrap(); let port = listener.local_addr()?.port(); @@ -127,7 +135,8 @@ impl Pasv { // We cannot await this since we first need to let the client know where to connect :-) tokio::spawn(async move { // Timeout if the client doesn't connect to the socket in a while, to avoid leaving the socket hanging open permanently. - match tokio::time::timeout(Duration::from_secs(15), listener.accept()).await { + let r = tokio::time::timeout(Duration::from_secs(15), listener.accept()).await; + match r { Ok(Ok((socket, _socket_addr))) => datachan::spawn_processing(logger, session, socket).await, Ok(Err(e)) => slog::error!(logger, "Error waiting for data connection: {}", e), Err(_) => slog::warn!(logger, "Client did not connect to data port in time"), diff --git a/src/server/controlchan/commands/user.rs b/src/server/controlchan/commands/user.rs index e4554948..6532697d 100644 --- a/src/server/controlchan/commands/user.rs +++ b/src/server/controlchan/commands/user.rs @@ -54,10 +54,25 @@ where match auth_result { Ok(user_detail) => { let user = username_str; - session.username = Some(user.to_string()); - session.state = SessionState::WaitCmd; - session.user = Arc::new(Some(user_detail)); - Ok(Reply::new(ReplyCode::UserLoggedInViaCert, "User logged in")) + // Using Arc::get_mut means that this won't work if the Session is + // currently servicing multiple commands concurrently. But it shouldn't + // ever be servicing USER at the same time as another command. + match Arc::get_mut(&mut session.storage).map(|s| s.enter(&user_detail)) { + Some(Err(e)) => { + slog::error!(args.logger, "{}", e); + Ok(Reply::new(ReplyCode::NotLoggedIn, "Invalid credentials")) + } + None => { + slog::error!(args.logger, "Failed to lock Session::storage during USER."); + Ok(Reply::new(ReplyCode::NotLoggedIn, "Temporarily unavailable")) + } + Some(Ok(())) => { + session.username = Some(user.to_string()); + session.state = SessionState::WaitCmd; + session.user = Arc::new(Some(user_detail)); + Ok(Reply::new(ReplyCode::UserLoggedInViaCert, "User logged in")) + } + } } Err(_e) => Ok(Reply::new(ReplyCode::NotLoggedIn, "Invalid credentials")), } diff --git a/src/server/controlchan/control_loop.rs b/src/server/controlchan/control_loop.rs index 2effe13a..202cec80 100644 --- a/src/server/controlchan/control_loop.rs +++ b/src/server/controlchan/control_loop.rs @@ -41,6 +41,7 @@ use tokio::{ mpsc::{channel, Receiver, Sender}, Mutex, }, + task::JoinHandle, }; use tokio_util::codec::{Decoder, Framed}; @@ -69,6 +70,7 @@ where pub data_listener: Arc, pub presence_listener: Arc, pub active_passive_mode: ActivePassiveMode, + pub binder: Arc>>>, } /// Does TCP processing when an FTP client connects @@ -80,7 +82,7 @@ pub(crate) async fn spawn( proxyloop_msg_tx: Option>, mut shutdown: shutdown::Listener, failed_logins: Option>, -) -> Result<(), ControlChanError> +) -> Result, ControlChanError> where User: UserDetail + 'static, Storage: StorageBackend + 'static, @@ -101,25 +103,29 @@ where data_listener, presence_listener, active_passive_mode, + binder, .. } = config; let tls_configured = matches!(ftps_config, FtpsConfig::On { .. }); let storage_features = storage.supported_features(); let (control_msg_tx, mut control_msg_rx): (Sender, Receiver) = channel(1); - let session: Session = Session::new(Arc::new(storage), tcp_stream.peer_addr()?) + let local_addr = tcp_stream.local_addr()?; + let mut session: Session = Session::new(Arc::new(storage), tcp_stream.peer_addr()?) .ftps(ftps_config.clone()) .metrics(collect_metrics) .control_msg_tx(control_msg_tx.clone()) .proxy_connection(proxy_connection) .failed_logins(failed_logins); + if let Some(b) = binder.lock().unwrap().take() { + session = session.binder(b); + } let mut logger = logger.new( slog::o!("trace-id" => format!("{}", session.trace_id), "source" => format!("{}", session.proxy_control.map(|p| p.source).unwrap_or(session.source))), ); let shared_session: SharedSession = Arc::new(Mutex::new(session)); - let local_addr = tcp_stream.local_addr()?; let event_chain = PrimaryEventHandler { logger: logger.clone(), @@ -177,7 +183,7 @@ where reply_sink.send(Reply::new(ReplyCode::ServiceReady, config.greeting)).await?; reply_sink.flush().await?; - tokio::spawn(async move { + let jh = tokio::spawn(async move { // The control channel event loop slog::info!(logger, "Starting control loop"); loop { @@ -292,7 +298,7 @@ where } }); - Ok(()) + Ok(jh) } // gets the reply to be sent to the client and tells if the connection should be closed. diff --git a/src/server/ftpserver.rs b/src/server/ftpserver.rs index eef2a249..c7274332 100644 --- a/src/server/ftpserver.rs +++ b/src/server/ftpserver.rs @@ -25,7 +25,7 @@ use crate::{ }; use options::{PassiveHost, DEFAULT_GREETING, DEFAULT_IDLE_SESSION_TIMEOUT_SECS}; use slog::*; -use std::{fmt::Debug, future::Future, net::SocketAddr, ops::Range, path::PathBuf, pin::Pin, sync::Arc, time::Duration}; +use std::{ffi::OsString, fmt::Debug, future::Future, net::SocketAddr, ops::Range, path::PathBuf, pin::Pin, sync::Arc, time::Duration}; /// An instance of an FTP(S) server. It aggregates an [`Authenticator`](crate::auth::Authenticator) /// implementation that will be used for authentication, and a [`StorageBackend`](crate::storage::StorageBackend) @@ -41,15 +41,44 @@ use std::{fmt::Debug, future::Future, net::SocketAddr, ops::Range, path::PathBuf /// use tokio::runtime::Runtime; /// /// let mut rt = Runtime::new().unwrap(); -/// let server = Server::with_fs("/srv/ftp"); -/// rt.spawn(server.listen("127.0.0.1:2121")); -/// // ... -/// drop(rt); +/// rt.spawn(async { +/// let server = Server::with_fs("/srv/ftp").build().await.unwrap(); +/// server.listen("127.0.0.1:2121").await.unwrap() +/// }); /// ``` /// /// [`Authenticator`]: auth::Authenticator /// [`StorageBackend`]: storage/trait.StorageBackend.html pub struct Server +where + Storage: StorageBackend, + User: UserDetail, +{ + storage: Arc Storage) + Send + Sync>, + greeting: &'static str, + authenticator: Arc>, + data_listener: Arc, + presence_listener: Arc, + passive_ports: Range, + passive_host: PassiveHost, + collect_metrics: bool, + ftps_mode: FtpsConfig, + ftps_required_control_chan: FtpsRequired, + ftps_required_data_chan: FtpsRequired, + idle_session_timeout: std::time::Duration, + proxy_protocol_mode: ProxyMode, + logger: slog::Logger, + site_md5: SiteMd5, + shutdown: Pin + Send + Sync>>, + failed_logins_policy: Option, + active_passive_mode: ActivePassiveMode, + connection_helper: Option, + connection_helper_args: Vec, + binder: Arc>>>, +} + +/// Used to create [`Server`]s. +pub struct ServerBuilder where Storage: StorageBackend, User: UserDetail, @@ -75,17 +104,20 @@ where shutdown: Pin + Send + Sync>>, failed_logins_policy: Option, active_passive_mode: ActivePassiveMode, + connection_helper: Option, + connection_helper_args: Vec, + binder: Option>, } -impl Server +impl ServerBuilder where Storage: StorageBackend + 'static, Storage::Metadata: Metadata, User: UserDetail + 'static, { - /// Construct a new [`Server`] with the given [`StorageBackend`] generator and an [`AnonymousAuthenticator`] + /// Construct a new [`ServerBuilder`] with the given [`StorageBackend`] generator and an [`AnonymousAuthenticator`] /// - /// [`Server`]: struct.Server.html + /// [`ServerBuilder`]: struct.ServerBuilder.html /// [`StorageBackend`]: ../storage/trait.StorageBackend.html /// [`AnonymousAuthenticator`]: ../auth/struct.AnonymousAuthenticator.html pub fn new(sbe_generator: Box Storage) + Send + Sync>) -> Self @@ -95,19 +127,20 @@ where Self::with_authenticator(sbe_generator, Arc::new(AnonymousAuthenticator {})) } - /// Construct a new [`Server`] with the given [`StorageBackend`] generator and [`Authenticator`]. The other parameters will be set to defaults. + /// Construct a new [`ServerBuilder`] with the given [`StorageBackend`] generator and [`Authenticator`]. The other parameters will be set to defaults. /// - /// [`Server`]: struct.Server.html + /// [`ServerBuilder`]: struct.ServerBuilder.html /// [`StorageBackend`]: ../storage/trait.StorageBackend.html /// [`Authenticator`]: ../auth/trait.Authenticator.html pub fn with_authenticator(sbe_generator: Box Storage) + Send + Sync>, authenticator: Arc + Send + Sync>) -> Self { - Server { + let passive_ports = options::DEFAULT_PASSIVE_PORTS; + ServerBuilder { storage: Arc::from(sbe_generator), greeting: DEFAULT_GREETING, authenticator, data_listener: Arc::new(NopListener {}), presence_listener: Arc::new(NopListener {}), - passive_ports: options::DEFAULT_PASSIVE_PORTS, + passive_ports, passive_host: options::DEFAULT_PASSIVE_HOST, ftps_mode: FtpsConfig::Off, collect_metrics: false, @@ -123,6 +156,9 @@ where shutdown: Box::pin(futures_util::future::pending()), failed_logins_policy: None, active_passive_mode: ActivePassiveMode::default(), + connection_helper: None, + connection_helper_args: Vec::new(), + binder: None, } } @@ -136,8 +172,9 @@ where /// use std::sync::Arc; /// /// // Use it in a builder-like pattern: - /// let mut server = Server::with_fs("/tmp") - /// .authenticator(Arc::new(auth::AnonymousAuthenticator{})); + /// let server = Server::with_fs("/tmp") + /// .authenticator(Arc::new(auth::AnonymousAuthenticator{})) + /// .build(); /// ``` /// /// [`Authenticator`]: ../auth/trait.Authenticator.html @@ -159,13 +196,49 @@ where /// use unftp_sbe_fs::ServerExt; /// /// let server = Server::with_fs("/tmp") - /// .active_passive_mode(ActivePassiveMode::ActiveAndPassive); + /// .active_passive_mode(ActivePassiveMode::ActiveAndPassive) + /// .build(); /// ``` pub fn active_passive_mode>(mut self, mode: M) -> Self { self.active_passive_mode = mode.into(); self } + /// Finalize the options and build a [`Server`]. + pub async fn build(self) -> std::result::Result, ServerError> { + let ftps_mode = match self.ftps_mode { + FtpsConfig::Off => FtpsConfig::Off, + FtpsConfig::Building { certs_file, key_file } => FtpsConfig::On { + tls_config: tls::new_config(certs_file, key_file, self.ftps_tls_flags, self.ftps_client_auth, self.ftps_trust_store.clone())?, + }, + FtpsConfig::On { tls_config } => FtpsConfig::On { tls_config }, + }; + let binder = Arc::new(std::sync::Mutex::new(self.binder)); + Ok(Server { + storage: self.storage, + greeting: self.greeting, + authenticator: self.authenticator, + data_listener: self.data_listener, + presence_listener: self.presence_listener, + passive_ports: self.passive_ports, + passive_host: self.passive_host, + collect_metrics: self.collect_metrics, + ftps_mode, + ftps_required_control_chan: self.ftps_required_control_chan, + ftps_required_data_chan: self.ftps_required_data_chan, + idle_session_timeout: self.idle_session_timeout, + proxy_protocol_mode: self.proxy_protocol_mode, + logger: self.logger, + site_md5: self.site_md5, + shutdown: self.shutdown, + failed_logins_policy: self.failed_logins_policy, + active_passive_mode: self.active_passive_mode, + connection_helper: self.connection_helper, + connection_helper_args: self.connection_helper_args, + binder, + }) + } + /// Enables FTPS by configuring the path to the certificates file and the private key file. Both /// should be in PEM format. /// @@ -187,7 +260,7 @@ where } /// Allows switching on Mutual TLS. For this to work the trust anchors also needs to be set using - /// the [ftps_trust_store](crate::Server::ftps_trust_store) method. + /// the [ftps_trust_store](crate::ServerBuilder::ftps_trust_store) method. /// /// # Example /// @@ -222,7 +295,7 @@ where /// Sets the certificates to use when verifying client certificates in Mutual TLS mode. This /// should point to certificates in a PEM formatted file. For this to have any effect MTLS needs - /// to be switched on via the [ftps_client_auth](crate::Server::ftps_client_auth) method. + /// to be switched on via the [ftps_client_auth](crate::ServerBuilder::ftps_client_auth) method. /// /// # Example /// @@ -273,11 +346,13 @@ where /// use unftp_sbe_fs::ServerExt; /// /// // Use it in a builder-like pattern: - /// let mut server = Server::with_fs("/tmp").greeting("Welcome to my FTP Server"); - /// + /// let server = Server::with_fs("/tmp") + /// .greeting("Welcome to my FTP Server") + /// .build(); + // /// // Or instead if you prefer: /// let mut server = Server::with_fs("/tmp"); - /// server.greeting("Welcome to my FTP Server"); + /// server.greeting("Welcome to my FTP Server").build(); /// ``` pub fn greeting(mut self, greeting: &'static str) -> Self { self.greeting = greeting; @@ -319,11 +394,11 @@ where /// use unftp_sbe_fs::ServerExt; /// /// // Use it in a builder-like pattern: - /// let mut server = Server::with_fs("/tmp").metrics(); + /// let mut builder = Server::with_fs("/tmp").metrics(); /// /// // Or instead if you prefer: - /// let mut server = Server::with_fs("/tmp"); - /// server.metrics(); + /// let mut builder = Server::with_fs("/tmp"); + /// builder.metrics(); /// ``` pub fn metrics(mut self) -> Self { self.collect_metrics = true; @@ -356,7 +431,8 @@ where /// use unftp_sbe_fs::ServerExt; /// /// let server = Server::with_fs("/tmp") - /// .passive_host([127,0,0,1]); + /// .passive_host([127,0,0,1]) + /// .build(); /// ``` /// Or the same but more explicitly: /// @@ -366,7 +442,8 @@ where /// use std::net::Ipv4Addr; /// /// let server = Server::with_fs("/tmp") - /// .passive_host(options::PassiveHost::Ip(Ipv4Addr::new(127, 0, 0, 1))); + /// .passive_host(options::PassiveHost::Ip(Ipv4Addr::new(127, 0, 0, 1))) + /// .build(); /// ``` /// /// To determine the passive IP from the incoming control connection: @@ -376,7 +453,8 @@ where /// use unftp_sbe_fs::ServerExt; /// /// let server = Server::with_fs("/tmp") - /// .passive_host(options::PassiveHost::FromConnection); + /// .passive_host(options::PassiveHost::FromConnection) + /// .build(); /// ``` /// /// Get the IP by resolving a DNS name: @@ -386,13 +464,23 @@ where /// use unftp_sbe_fs::ServerExt; /// /// let server = Server::with_fs("/tmp") - /// .passive_host("ftp.myserver.org"); + /// .passive_host("ftp.myserver.org") + /// .build(); /// ``` pub fn passive_host>(mut self, host_option: H) -> Self { self.passive_host = host_option.into(); self } + /// Set a callback for binding sockets + /// + /// If present, this helper will be used for binding sockets instead of the standard routines + /// in std or tokio. It can be useful in capability mode. + pub fn binder(mut self, b: B) -> Self { + self.binder = Some(Box::new(b)); + self + } + /// Sets the range of passive ports that we'll use for passive connections. /// /// # Example @@ -402,12 +490,12 @@ where /// use unftp_sbe_fs::ServerExt; /// /// // Use it in a builder-like pattern: - /// let server = Server::with_fs("/tmp") + /// let builder = Server::with_fs("/tmp") /// .passive_ports(49152..65535); /// /// // Or instead if you prefer: - /// let mut server = Server::with_fs("/tmp"); - /// server.passive_ports(49152..65535); + /// let mut builder = Server::with_fs("/tmp"); + /// builder.passive_ports(49152..65535); /// ``` pub fn passive_ports(mut self, range: Range) -> Self { self.passive_ports = range; @@ -438,7 +526,9 @@ where /// use unftp_sbe_fs::ServerExt; /// /// // Use it in a builder-like pattern: - /// let mut server = Server::with_fs("/tmp").proxy_protocol_mode(2121); + /// let mut server = Server::with_fs("/tmp") + /// .proxy_protocol_mode(2121) + /// .build(); /// ``` pub fn proxy_protocol_mode(mut self, external_control_port: u16) -> Self { self.proxy_protocol_mode = external_control_port.into(); @@ -457,11 +547,13 @@ where /// use libunftp::Server; /// use unftp_sbe_fs::ServerExt; /// - /// let mut server = Server::with_fs("/tmp").shutdown_indicator(async { - /// tokio::time::sleep(Duration::from_secs(10)).await; // Shut the server down after 10 seconds. - /// libunftp::options::Shutdown::new() - /// .grace_period(Duration::from_secs(5)) // Allow 5 seconds to shutdown gracefully - /// }); + /// let server = Server::with_fs("/tmp") + /// .shutdown_indicator(async { + /// // Shut the server down after 10 seconds. + /// tokio::time::sleep(Duration::from_secs(10)).await; + /// libunftp::options::Shutdown::new() + /// .grace_period(Duration::from_secs(5)) // Allow 5 seconds to shutdown gracefully + /// }).build(); /// ``` pub fn shutdown_indicator(mut self, indicator: I) -> Self where @@ -486,13 +578,33 @@ where /// use unftp_sbe_fs::ServerExt; /// /// // Use it in a builder-like pattern: - /// let mut server = Server::with_fs("/tmp").sitemd5(SiteMd5::None); + /// let server = Server::with_fs("/tmp") + /// .sitemd5(SiteMd5::None) + /// .build(); /// ``` pub fn sitemd5>(mut self, sitemd5_option: M) -> Self { self.site_md5 = sitemd5_option.into(); self } + /// Assign a connection helper to the server. + /// + /// Rather than listening for and servicing connections in the same binary, this option allows + /// accepted connections to be serviced by a different program. After accepting a connection, + /// the Server will execute the provided helper process. Any provided arguments will be passed + /// to the helper process. After those arguments, the Server will pass an integer, which is + /// the file descriptor number of the connected socket. + /// + /// # Arguments + /// + /// - `path` - Path to the helper executable + /// - `args` - Optional arguments to pass to the helper executable. + pub fn connection_helper(mut self, path: OsString, args: Vec) -> Self { + self.connection_helper = Some(path); + self.connection_helper_args = args; + self + } + /// Enables a password guessing protection policy /// /// Policy used to temporarily block an account, source IP or the @@ -523,18 +635,44 @@ where /// use unftp_sbe_fs::ServerExt; /// /// // With default policy - /// let server = Server::with_fs("/tmp").failed_logins_policy(FailedLoginsPolicy::default()); + /// let server = + /// Server::with_fs("/tmp") + /// .failed_logins_policy(FailedLoginsPolicy::default()) + /// .build(); /// /// // Or choose a specific policy like based on source IP and /// // longer block (maximum 3 attempts, 5 minutes, IP based /// // blocking) /// use std::time::Duration; - /// let server = Server::with_fs("/tmp").failed_logins_policy(FailedLoginsPolicy::new(3, Duration::from_secs(300), FailedLoginsBlock::IP)); + /// let server = Server::with_fs("/tmp") + /// .failed_logins_policy(FailedLoginsPolicy::new(3, Duration::from_secs(300), FailedLoginsBlock::IP)) + /// .build(); /// ``` pub fn failed_logins_policy(mut self, policy: FailedLoginsPolicy) -> Self { self.failed_logins_policy = Some(policy); self } +} + +impl Server +where + Storage: StorageBackend + 'static, + Storage::Metadata: Metadata, + User: UserDetail + 'static, +{ + /// Construct a new [`ServerBuilder`] with the given [`StorageBackend`] generator and an [`AnonymousAuthenticator`] + /// + /// [`ServerBuilder`]: struct.ServerBuilder.html + /// [`StorageBackend`]: ../storage/trait.StorageBackend.html + /// [`AnonymousAuthenticator`]: ../auth/struct.AnonymousAuthenticator.html + #[deprecated(since = "0.19.0", note = "use ServerBuilder::new instead")] + #[allow(clippy::new_ret_no_self)] + pub fn new(sbe_generator: Box Storage) + Send + Sync>) -> ServerBuilder + where + AnonymousAuthenticator: Authenticator, + { + ServerBuilder::new(sbe_generator) + } /// Runs the main FTP process asynchronously. Should be started in a async runtime context. /// @@ -546,22 +684,16 @@ where /// use tokio::runtime::Runtime; /// /// let mut rt = Runtime::new().unwrap(); - /// let server = Server::with_fs("/srv/ftp"); - /// rt.spawn(server.listen("127.0.0.1:2121")); + /// rt.spawn(async { + /// let server = Server::with_fs("/srv/ftp").build().await.unwrap(); + /// server.listen("127.0.0.1:2121").await + /// }); /// // ... /// drop(rt); /// ``` /// #[tracing_attributes::instrument] - pub async fn listen + Debug>(mut self, bind_address: T) -> std::result::Result<(), ServerError> { - self.ftps_mode = match self.ftps_mode { - FtpsConfig::Off => FtpsConfig::Off, - FtpsConfig::Building { certs_file, key_file } => FtpsConfig::On { - tls_config: tls::new_config(certs_file, key_file, self.ftps_tls_flags, self.ftps_client_auth, self.ftps_trust_store.clone())?, - }, - FtpsConfig::On { tls_config } => FtpsConfig::On { tls_config }, - }; - + pub async fn listen + Debug>(self, bind_address: T) -> std::result::Result<(), ServerError> { let logger = self.logger.clone(); let bind_address: SocketAddr = bind_address.into().parse()?; let shutdown_notifier = Arc::new(shutdown::Notifier::new()); @@ -588,6 +720,8 @@ where options: (&self).into(), shutdown_topic: shutdown_notifier.clone(), failed_logins: failed_logins.clone(), + connection_helper: self.connection_helper.clone(), + connection_helper_args: self.connection_helper_args.clone(), } .listen(), ) as Pin> + Send>>, @@ -611,6 +745,30 @@ where } } + /// Service a newly established connection as a control connection. + /// + /// Use this method instead of [`listen`](Server::listen) if you want to listen for and accept + /// new connections yourself, instead of using libunftp to do it. + pub async fn service(self, tcp_stream: tokio::net::TcpStream) -> std::result::Result<(), crate::server::ControlChanError> { + let failed_logins = self.failed_logins_policy.as_ref().map(|policy| FailedLoginsCache::new(policy.clone())); + let options: chosen::OptionsHolder = (&self).into(); + let shutdown_notifier = Arc::new(shutdown::Notifier::new()); + let shutdown_listener = shutdown_notifier.subscribe().await; + slog::debug!(self.logger, "Servicing control connection from"); + let result = controlchan::spawn_loop::((&options).into(), tcp_stream, None, None, shutdown_listener, failed_logins.clone()).await; + match result { + Err(err) => { + slog::error!(self.logger, "Could not spawn control channel loop: {:?}", err); + } + Ok(jh) => { + if let Err(e) = jh.await { + slog::error!(self.logger, "Control loop failed to complete: {:?}", e); + } + } + } + Ok(()) + } + // Waits for sub-components to shut down gracefully or aborts if the grace period expires async fn shutdown_linger(logger: slog::Logger, shutdown_notifier: Arc, grace_period: Duration) -> std::result::Result<(), ServerError> { let timeout = Box::pin(tokio::time::sleep(grace_period)); @@ -625,6 +783,19 @@ where } // TODO: Implement feature where we keep on listening for a while i.e. GracefulAcceptingConnections } + + /// Construct a new [`ServerBuilder`] with the given [`StorageBackend`] generator and [`Authenticator`]. The other parameters will be set to defaults. + /// + /// [`ServerBuilder`]: struct.ServerBuilder.html + /// [`StorageBackend`]: ../storage/trait.StorageBackend.html + /// [`Authenticator`]: ../auth/trait.Authenticator.html + #[deprecated(since = "0.19.0", note = "use ServerBuilder::with_authenticator instead")] + pub fn with_authenticator( + sbe_generator: Box Storage) + Send + Sync>, + authenticator: Arc + Send + Sync>, + ) -> ServerBuilder { + ServerBuilder::with_authenticator(sbe_generator, authenticator) + } } impl From<&Server> for chosen::OptionsHolder @@ -650,17 +821,18 @@ where data_listener: server.data_listener.clone(), presence_listener: server.presence_listener.clone(), active_passive_mode: server.active_passive_mode, + binder: server.binder.clone(), } } } -impl Debug for Server +impl Debug for ServerBuilder where Storage: StorageBackend, User: UserDetail, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Server") + f.debug_struct("ServerBuilder") .field("authenticator", &self.authenticator) .field("collect_metrics", &self.collect_metrics) .field("active_passive_mode", &self.active_passive_mode) @@ -681,3 +853,28 @@ where .finish() } } + +impl Debug for Server +where + Storage: StorageBackend, + User: UserDetail, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ServerBuilder") + .field("authenticator", &self.authenticator) + .field("collect_metrics", &self.collect_metrics) + .field("active_passive_mode", &self.active_passive_mode) + .field("greeting", &self.greeting) + .field("logger", &self.logger) + .field("metrics", &self.collect_metrics) + .field("passive_ports", &self.passive_ports) + .field("passive_host", &self.passive_host) + .field("ftps_mode", &self.ftps_mode) + .field("ftps_required_control_chan", &self.ftps_required_control_chan) + .field("ftps_required_data_chan", &self.ftps_required_data_chan) + .field("idle_session_timeout", &self.idle_session_timeout) + .field("proxy_protocol_mode", &self.proxy_protocol_mode) + .field("failed_logins_policy", &self.failed_logins_policy) + .finish() + } +} diff --git a/src/server/ftpserver/chosen.rs b/src/server/ftpserver/chosen.rs index 3cc18546..80e8ff8a 100644 --- a/src/server/ftpserver/chosen.rs +++ b/src/server/ftpserver/chosen.rs @@ -34,6 +34,7 @@ where pub data_listener: Arc, pub presence_listener: Arc, pub active_passive_mode: ActivePassiveMode, + pub binder: Arc>>>, } impl From<&OptionsHolder> for controlchan::LoopConfig @@ -43,6 +44,8 @@ where Storage::Metadata: Metadata, { fn from(server: &OptionsHolder) -> Self { + // So this is when you create a new storage backend? + // XXX Shouldn't instantiate storage until _after_ successful auth. controlchan::LoopConfig { authenticator: server.authenticator.clone(), storage: (server.storage)(), @@ -59,6 +62,7 @@ where data_listener: server.data_listener.clone(), presence_listener: server.presence_listener.clone(), active_passive_mode: server.active_passive_mode, + binder: server.binder.clone(), } } } diff --git a/src/server/ftpserver/listen.rs b/src/server/ftpserver/listen.rs index 0737fceb..7b7f8bc8 100644 --- a/src/server/ftpserver/listen.rs +++ b/src/server/ftpserver/listen.rs @@ -4,7 +4,9 @@ use super::{chosen::OptionsHolder, ServerError}; use crate::server::failed_logins::FailedLoginsCache; use crate::server::shutdown; use crate::{auth::UserDetail, server::controlchan, storage::StorageBackend}; +use std::ffi::OsString; use std::net::SocketAddr; +use std::os::fd::AsRawFd; use std::sync::Arc; use tokio::net::TcpListener; @@ -20,6 +22,8 @@ where pub options: OptionsHolder, pub shutdown_topic: Arc, pub failed_logins: Option>, + pub connection_helper: Option, + pub connection_helper_args: Vec, } impl Listener @@ -35,6 +39,8 @@ where options, shutdown_topic, failed_logins, + connection_helper, + connection_helper_args, } = self; let listener = TcpListener::bind(bind_address).await?; loop { @@ -42,10 +48,32 @@ where match listener.accept().await { Ok((tcp_stream, socket_addr)) => { slog::info!(logger, "Incoming control connection from {:?}", socket_addr); - let result = - controlchan::spawn_loop::((&options).into(), tcp_stream, None, None, shutdown_listener, failed_logins.clone()).await; - if let Err(err) = result { - slog::error!(logger, "Could not spawn control channel loop for connection from {:?}: {:?}", socket_addr, err) + if let Some(helper) = connection_helper.as_ref() { + slog::info!(logger, "Spawning {:?}", helper); + let fd = tcp_stream.as_raw_fd(); + nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_SETFD(nix::fcntl::FdFlag::empty())).unwrap(); + let result = tokio::process::Command::new(helper) + .args(connection_helper_args.iter()) + .arg(fd.to_string()) + .spawn(); + let logger2 = logger.clone(); + match result { + Ok(mut child) => { + tokio::spawn(async move { + let child_status = child.wait().await; + slog::debug!(logger2, "helper process exited {:?}", child_status); + }); + } + Err(err) => { + slog::error!(logger, "Could not spawn helper process for connection from {:?}: {:?}", socket_addr, err); + } + } + } else { + let result = + controlchan::spawn_loop::((&options).into(), tcp_stream, None, None, shutdown_listener, failed_logins.clone()).await; + if let Err(err) = result { + slog::error!(logger, "Could not spawn control channel loop for connection from {:?}: {:?}", socket_addr, err); + } } } Err(err) => { diff --git a/src/server/ftpserver/options.rs b/src/server/ftpserver/options.rs index a541add3..cb10b049 100644 --- a/src/server/ftpserver/options.rs +++ b/src/server/ftpserver/options.rs @@ -1,13 +1,16 @@ -//! Contains code pertaining to the setup options that can be given to the [`Server`](crate::Server) +//! Contains code pertaining to the setup options that can be given to the [`ServerBuilder`](crate::ServerBuilder) +use async_trait::async_trait; use bitflags::bitflags; use std::time::Duration; use std::{ fmt::Formatter, fmt::{self, Debug, Display}, + io, net::{IpAddr, Ipv4Addr}, ops::Range, }; +use tokio::net::TcpSocket; // Once we're sure about the types of these I think its good to expose it to the API user so that // he/she can see what our server defaults are. @@ -18,7 +21,15 @@ pub(crate) const DEFAULT_PASSIVE_PORTS: Range = 49152..65535; pub(crate) const DEFAULT_FTPS_REQUIRE: FtpsRequired = FtpsRequired::None; pub(crate) const DEFAULT_FTPS_TRUST_STORE: &str = "./trusted.pem"; -/// The option to [Server.passive_host](crate::Server::passive_host). It allows the user to specify how the IP address +/// A helper trait to customize how the server binds to ports +#[async_trait] +pub trait Binder: Debug + Send { + /// Create a [`tokio::net::TcpSocket`] and bind it to the given address, with a port in the + /// given range. + async fn bind(&mut self, local_addr: IpAddr, passive_ports: Range) -> io::Result; +} + +/// The option to [ServerBuilder::passive_host](crate::ServerBuilder::passive_host). It allows the user to specify how the IP address /// communicated in the _PASV_ response is determined. #[derive(Debug, PartialEq, Clone, Default)] pub enum PassiveHost { @@ -57,7 +68,7 @@ impl From<&str> for PassiveHost { } } -/// The option to [Server.ftps_required](crate::Server::ftps_required). It allows the user to specify whether clients are required +/// The option to [ServerBuilder::ftps_required](crate::ServerBuilder::ftps_required). It allows the user to specify whether clients are required /// to upgrade a to secure TLS connection i.e. use FTPS. #[derive(Debug, PartialEq, Clone, Copy)] pub enum FtpsRequired { @@ -119,7 +130,7 @@ impl Default for TlsFlags { } } -/// The option to [Server.ftps_client_auth](crate::Server::ftps_client_auth). Tells if and how mutual TLS (client certificate +/// The option to [ServerBuilder::ftps_client_auth](crate::ServerBuilder::ftps_client_auth). Tells if and how mutual TLS (client certificate /// authentication) should be handled. #[derive(Debug, PartialEq, Clone, Copy, Default)] pub enum FtpsClientAuth { @@ -129,11 +140,11 @@ pub enum FtpsClientAuth { Off, /// Mutual TLS is on and whilst the server will request a certificate it will still proceed /// without one. If a certificate is sent by the client it will be validated against the - /// configured trust anchors (see [Server::ftps_trust_store](crate::Server::ftps_trust_store)). + /// configured trust anchors (see [ServerBuilder::ftps_trust_store](crate::ServerBuilder::ftps_trust_store)). Request, /// Mutual TLS is on, the server will request a certificate and it won't proceed without a /// client certificate that validates against the configured trust anchors (see - /// [Server::ftps_trust_store](crate::Server::ftps_trust_store)). + /// [ServerBuilder::ftps_trust_store](crate::ServerBuilder::ftps_trust_store)). Require, } @@ -148,7 +159,7 @@ impl From for FtpsClientAuth { } } -/// The options for [Server.sitemd5](crate::Server::sitemd5). +/// The options for [ServerBuilder::sitemd5](crate::ServerBuilder::sitemd5). /// Allow MD5 either to be used by all, logged in users only or no one. #[derive(Debug, PartialEq, Eq, Clone, Copy, Default)] pub enum SiteMd5 { @@ -162,7 +173,8 @@ pub enum SiteMd5 { } /// Tells how graceful shutdown should happen. An instance of this struct should be returned from -/// the future passed to [Server.shutdown_indicator](crate::Server::shutdown_indicator). +/// the future passed to +/// [ServerBuilder::shutdown_indicator](crate::ServerBuilder::shutdown_indicator). pub struct Shutdown { pub(crate) grace_period: Duration, //pub(crate) handle_new_connections: bool, @@ -242,8 +254,9 @@ impl Default for FailedLoginsPolicy { } } -/// The options for [Server.active_passive_mode](crate::Server::active_passive_mode). -/// This allows to switch active / passive mode on or off. +/// The options for +/// [ServerBuilder::active_passive_mode](crate::ServerBuilder::active_passive_mode). This allows +/// to switch active / passive mode on or off. #[derive(Debug, PartialEq, Eq, Clone, Copy, Default)] pub enum ActivePassiveMode { /// Only passive mode is enabled diff --git a/src/server/session.rs b/src/server/session.rs index 9017df03..db3c439f 100644 --- a/src/server/session.rs +++ b/src/server/session.rs @@ -113,6 +113,8 @@ where pub cert_chain: Option>, // The failed logins cache can monitor successive failed logins and apply a policy to deter brute force attacks. pub failed_logins: Option>, + // An optional functor that can bind a socket + pub binder: Option>, } impl Session @@ -146,6 +148,7 @@ where data_busy: false, cert_chain: None, failed_logins: None, + binder: None, } } @@ -162,6 +165,11 @@ where self } + pub fn binder(mut self, binder: Box) -> Self { + self.binder = Some(binder); + self + } + pub fn control_msg_tx(mut self, sender: Sender) -> Self { self.control_msg_tx = Some(sender); self diff --git a/src/storage/storage_backend.rs b/src/storage/storage_backend.rs index 1593eb86..284630ce 100644 --- a/src/storage/storage_backend.rs +++ b/src/storage/storage_backend.rs @@ -12,6 +12,7 @@ use libc; use md5::{Digest, Md5}; use std::{ fmt::{self, Debug, Formatter, Write}, + io, path::Path, result, time::SystemTime, @@ -164,6 +165,14 @@ pub trait StorageBackend: Send + Sync + Debug { /// The concrete type of the _metadata_ used by this storage backend. type Metadata: Metadata + Sync + Send; + /// Restrict the backend's capabilities commensurate with the provided + /// [`UserDetail`](crate::auth::UserDetail). + /// + /// Once restricted, it may never be unrestricted. + fn enter(&mut self, _user_detail: &User) -> io::Result<()> { + Ok(()) + } + /// Implement to set the name of the storage back-end. By default it returns the type signature. fn name(&self) -> &str { std::any::type_name::() diff --git a/tests/common.rs b/tests/common.rs index e8022f70..da837d58 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -15,7 +15,10 @@ pub async fn run_with_auth() { let server = libunftp::Server::with_fs(std::env::temp_dir()) .authenticator(Arc::new(TestAuthenticator {})) .greeting("Welcome test") - .failed_logins_policy(FailedLoginsPolicy::new(3, std::time::Duration::new(5, 0), FailedLoginsBlock::User)); + .failed_logins_policy(FailedLoginsPolicy::new(3, std::time::Duration::new(5, 0), FailedLoginsBlock::User)) + .build() + .await + .unwrap(); server.listen(addr).await.unwrap(); }