From f3124c2df8a5d5c5dd9756aff9ff105334426138 Mon Sep 17 00:00:00 2001 From: Jared Wolff Date: Sat, 15 May 2021 15:50:21 -0400 Subject: [PATCH] rumqttd: adding optional native-tls support. (#258) By using the `use-native-tls` feature, this crate can now use tokio-native-tls vs tokio-rustls. Changed: * Made certain rustls includes to be conditional in rumqttd * How errors are handled in main loop. Otherwise process loop exits silently. * Configuration .conf files to account for cert usage * Support for all 3 cases, Rustls, Native-TLS or none! * Changed CI to support different use cases of this library. Added: * Notes to Readme about adding native-tls * Added separate tls() function in rumqttd for native-tls * Added use of tokio-native-tls --- .github/workflows/features.yml | 12 ++ rumqttd/Cargo.toml | 8 +- rumqttd/README.md | 41 ++++++ rumqttd/config/rumqttd.conf | 10 +- rumqttd/config/rumqttd0.conf | 11 +- rumqttd/config/rumqttd1.conf | 4 +- rumqttd/config/rumqttd2.conf | 4 +- rumqttd/src/lib.rs | 257 +++++++++++++++++++++++++-------- 8 files changed, 279 insertions(+), 68 deletions(-) diff --git a/.github/workflows/features.yml b/.github/workflows/features.yml index d5d0bfe37..d95ff72a2 100644 --- a/.github/workflows/features.yml +++ b/.github/workflows/features.yml @@ -18,6 +18,18 @@ jobs: - uses: actions-rs/toolchain@v1 with: toolchain: stable + - uses: actions-rs/cargo@v1 + with: + command: test + args: --release --no-default-features + - uses: actions-rs/cargo@v1 + with: + command: test + args: --release --features use-rustls + - uses: actions-rs/cargo@v1 + with: + command: test + args: --release --no-default-features --features use-native-tls - uses: actions-rs/cargo@v1 with: command: test diff --git a/rumqttd/Cargo.toml b/rumqttd/Cargo.toml index 08fe1306b..e116bf618 100644 --- a/rumqttd/Cargo.toml +++ b/rumqttd/Cargo.toml @@ -18,13 +18,15 @@ name = "rumqttd" path = "src/bin.rs" [features] +default = ["use-rustls"] prof = ["pprof"] +use-rustls = ["tokio-rustls"] +use-native-tls = ["tokio-native-tls"] [dependencies] rumqttlog = { path = "../rumqttlog", version = "0.6"} mqttbytes = { path = "../mqttbytes", version = "0.3" } tokio = { version = "1.0", features = ["full"] } -tokio-rustls = "0.22" serde = { version = "1", features = ["derive"] } log = "0.4" thiserror = "1" @@ -36,5 +38,9 @@ warp = "0.3" futures-util = "0.3.8" pprof = { version = "0.4", features = ["flamegraph", "protobuf"], optional = true } +# Optional +tokio-rustls = { version = "0.22", optional = true } +tokio-native-tls = { version = "0.3", optional = true } + [target.'cfg(not(target_env = "msvc"))'.dependencies] jemallocator = "0.3" diff --git a/rumqttd/README.md b/rumqttd/README.md index fa9c840b3..bda3fc3b2 100644 --- a/rumqttd/README.md +++ b/rumqttd/README.md @@ -2,3 +2,44 @@ [![crates.io page](https://img.shields.io/crates/v/rumqttd.svg)](https://crates.io/crates/rumqttd) [![docs.rs page](https://docs.rs/rumqttd/badge.svg)](https://docs.rs/rumqttd) + +## `native-tls` support + +This crate, by default uses the `tokio-rustls` crate. There's also support for the `tokio-native-tls` crate. +Add it to your Cargo.toml like so: + +``` +rumqttd = { version = "0.5", default-features = false, features = ["use-native-tls"] } +``` + +Then in your config file make sure that you use the `pkcs12` entries under `certs` for your cert instead of `cert_path`, `key_path`, etc. + +```toml +[rumqtt.servers.1] +port = 8883 + +[servers.1.cert] +pkcs12_path = "/root/identity.pfx" +pkcs12_pass = "" +``` + +Here's what a Rustls config looks like: + +```toml +[servers.1] +port = 8883 + +[servers.1.cert] +cert_path = "tlsfiles/server.cert.pem" +key_path = "tlsfiles/server.key.pem" +ca_path = "tlsfiles/ca.cert.pem" +``` + + +You can generate the `.p12`/`.pfx` file using `openssl`: + +``` +openssl pkcs12 -export -out identity.pfx -inkey ~/pki/private/test.key -in ~/pki/issued/test.crt -certfile ~/pki/ca.crt +``` + +Make sure if you use a password it matches the entry in `pkcs12_pass`. If no password, use an empty string `""`. \ No newline at end of file diff --git a/rumqttd/config/rumqttd.conf b/rumqttd/config/rumqttd.conf index 3ea14d9e4..ce704a658 100644 --- a/rumqttd/config/rumqttd.conf +++ b/rumqttd/config/rumqttd.conf @@ -26,11 +26,13 @@ next_connection_delay_ms = 1 # Configuration of server and connections that it accepts [servers.2] listen = "0.0.0.0:8883" -cert_path = "tlsfiles/server.cert.pem" -key_path = "tlsfiles/server.key.pem" -ca_path = "tlsfiles/ca.cert.pem" next_connection_delay_ms = 10 - # Tls connections. ca_path enables client authentication + # Cert config + [servers.2.cert] + cert_path = "tlsfiles/server.cert.pem" + key_path = "tlsfiles/server.key.pem" + ca_path = "tlsfiles/ca.cert.pem" + # Connection parameters [servers.2.connections] connection_timeout_ms = 5000 max_client_id_len = 256 diff --git a/rumqttd/config/rumqttd0.conf b/rumqttd/config/rumqttd0.conf index b29cdc6f9..633f7191b 100644 --- a/rumqttd/config/rumqttd0.conf +++ b/rumqttd/config/rumqttd0.conf @@ -27,7 +27,12 @@ next_connection_delay_ms = 1 [servers.2] listen = "0.0.0.0:1883" next_connection_delay_ms = 10 - # Tls connections. ca_path enables client authentication + # Handling of Certs + [servers.2.cert] + cert_path = "tlsfiles/server.cert.pem" + key_path = "tlsfiles/server.key.pem" + ca_path = "tlsfiles/ca-chain.cert.pem" + # Connection parameters [servers.2.connections] connection_timeout_ms = 100 max_client_id_len = 256 @@ -35,9 +40,7 @@ next_connection_delay_ms = 10 max_payload_size = 2048 max_inflight_count = 100 max_inflight_size = 1024 - cert_path = "tlsfiles/server.cert.pem" - key_path = "tlsfiles/server.key.pem" - ca_path = "tlsfiles/ca-chain.cert.pem" + # Cluster configuration. Remote host and port to connect to. # Mesh is created based on ids. diff --git a/rumqttd/config/rumqttd1.conf b/rumqttd/config/rumqttd1.conf index 897765e15..9fd9c1fe0 100644 --- a/rumqttd/config/rumqttd1.conf +++ b/rumqttd/config/rumqttd1.conf @@ -27,7 +27,7 @@ next_connection_delay_ms = 1 [servers.2] listen = "0.0.0.0:1884" next_connection_delay_ms = 10 - # Tls connections. ca_path enables client authentication + # Connection parameters [servers.2.connections] connection_timeout_ms = 100 max_client_id_len = 256 @@ -35,6 +35,8 @@ next_connection_delay_ms = 10 max_payload_size = 2048 max_inflight_count = 100 max_inflight_size = 1024 + # Certs for connection + [servers.2.certs] cert_path = "tlsfiles/server.cert.pem" key_path = "tlsfiles/server.key.pem" ca_path = "tlsfiles/ca-chain.cert.pem" diff --git a/rumqttd/config/rumqttd2.conf b/rumqttd/config/rumqttd2.conf index 7dec3772b..465354ad5 100644 --- a/rumqttd/config/rumqttd2.conf +++ b/rumqttd/config/rumqttd2.conf @@ -27,7 +27,7 @@ next_connection_delay_ms = 1 [servers.2] listen = "0.0.0.0:1885" next_connection_delay_ms = 10 - # Tls connections. ca_path enables client authentication + # Connection parameters [servers.2.connections] connection_timeout_ms = 100 max_client_id_len = 256 @@ -35,6 +35,8 @@ next_connection_delay_ms = 10 max_payload_size = 2048 max_inflight_count = 100 max_inflight_size = 1024 + # Certs for connection + [servers.2.certs] cert_path = "tlsfiles/server.cert.pem" key_path = "tlsfiles/server.key.pem" ca_path = "tlsfiles/ca-chain.cert.pem" diff --git a/rumqttd/src/lib.rs b/rumqttd/src/lib.rs index 927041918..2b10cfe8f 100644 --- a/rumqttd/src/lib.rs +++ b/rumqttd/src/lib.rs @@ -15,12 +15,22 @@ use crate::remotelink::RemoteLink; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpListener; use tokio::{task, time}; + +// All requirements for `rustls` +#[cfg(feature = "use-rustls")] use tokio_rustls::rustls::internal::pemfile::{certs, rsa_private_keys}; +#[cfg(feature = "use-rustls")] use tokio_rustls::rustls::{ - AllowAnyAuthenticatedClient, NoClientAuth, RootCertStore, ServerConfig, TLSError, + AllowAnyAuthenticatedClient, RootCertStore, ServerConfig, TLSError as RustlsError, }; -use tokio_rustls::TlsAcceptor; +// All requirements for `native-tls` +#[cfg(feature = "use-native-tls")] +use std::io::Read; +#[cfg(feature = "use-native-tls")] +use tokio_native_tls::native_tls; +#[cfg(feature = "use-native-tls")] +use tokio_native_tls::native_tls::Error as NativeTlsError; pub mod async_locallink; mod consolelink; mod locallink; @@ -31,9 +41,13 @@ mod state; use crate::consolelink::ConsoleLink; pub use crate::locallink::{LinkError, LinkRx, LinkTx}; use crate::network::Network; +#[cfg(feature = "use-rustls")] use crate::Error::ServerKeyNotFound; use std::collections::HashMap; + +#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))] use std::fs::File; +#[cfg(feature = "use-rustls")] use std::io::BufReader; #[derive(Debug, thiserror::Error)] @@ -49,8 +63,12 @@ pub enum Error { Recv(#[from] RecvError), #[error("Channel send error")] Send(#[from] SendError<(Id, Event)>), - #[error("TLS error {0}")] - Tls(#[from] TLSError), + #[cfg(feature = "use-native-tls")] + #[error("Native TLS error {0}")] + NativeTls(#[from] NativeTlsError), + #[cfg(feature = "use-rustls")] + #[error("Rustls error {0}")] + Rustls(#[from] RustlsError), #[error("Server cert not provided")] ServerCertRequired, #[error("Server private key not provided")] @@ -67,6 +85,8 @@ pub enum Error { InvalidServerCert(String), #[error("Invalid server key file {0}")] InvalidServerKey(String), + RustlsNotEnabled, + NativeTlsNotEnabled, Disconnected, NetworkClosed, WrongPacket(Packet), @@ -84,12 +104,34 @@ pub struct Config { pub console: ConsoleSettings, } +#[allow(dead_code)] +enum ServerTLSAcceptor { + #[cfg(feature = "use-rustls")] + RustlsAcceptor { acceptor: tokio_rustls::TlsAcceptor }, + #[cfg(feature = "use-native-tls")] + NativeTLSAcceptor { + acceptor: tokio_native_tls::TlsAcceptor, + }, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(untagged)] +pub enum ServerCert { + RustlsCert { + ca_path: String, + cert_path: String, + key_path: String, + }, + NativeTlsCert { + pkcs12_path: String, + pkcs12_pass: String, + }, +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct ServerSettings { pub listen: SocketAddr, - pub ca_path: Option, - pub cert_path: Option, - pub key_path: Option, + pub cert: Option, pub next_connection_delay_ms: u64, pub connections: ConnectionSettings, } @@ -215,51 +257,99 @@ impl Server { } } - fn tls(&self) -> Result, Error> { - let (certs, key) = match self.config.cert_path.clone() { - Some(cert) => { - // Get certificates - let cert_file = File::open(&cert); - let cert_file = cert_file.map_err(|_| Error::ServerCertNotFound(cert.clone()))?; - let certs = certs(&mut BufReader::new(cert_file)); - let certs = certs.map_err(|_| Error::InvalidServerCert(cert))?; - - // Get private key - let key = self.config.key_path.as_ref(); - let key = key.ok_or(Error::ServerKeyRequired)?.clone(); - let key_file = File::open(&key); - let key_file = key_file.map_err(|_| ServerKeyNotFound(key.clone()))?; - let keys = rsa_private_keys(&mut BufReader::new(key_file)); - let keys = keys.map_err(|_| Error::InvalidServerKey(key.clone()))?; - - // Get the first key - let key = match keys.first() { - Some(k) => k.clone(), - None => return Err(Error::InvalidServerKey(key.clone())), - }; - - (certs, key) - } - None => return Ok(None), + #[cfg(feature = "use-native-tls")] + fn tls_native_tls( + &self, + pkcs12_path: &String, + pkcs12_pass: &String, + ) -> Result, Error> { + // Get certificates + let cert_file = File::open(&pkcs12_path); + let mut cert_file = + cert_file.map_err(|_| Error::ServerCertNotFound(pkcs12_path.clone()))?; + + // Read cert into memory + let mut buf = Vec::new(); + cert_file + .read_to_end(&mut buf) + .map_err(|_| Error::InvalidServerCert(pkcs12_path.clone()))?; + + // Get the identity + let identity = native_tls::Identity::from_pkcs12(&buf, &pkcs12_pass) + .map_err(|_| Error::InvalidServerCert(pkcs12_path.clone()))?; + + // Builder + let builder = native_tls::TlsAcceptor::builder(identity).build()?; + + // Create acceptor + let acceptor = tokio_native_tls::TlsAcceptor::from(builder); + Ok(Some(ServerTLSAcceptor::NativeTLSAcceptor { acceptor })) + } + + #[allow(dead_code)] + #[cfg(not(feature = "use-native-tls"))] + fn tls_native_tls( + &self, + _pkcs12_path: &String, + _pkcs12_pass: &String, + ) -> Result, Error> { + Err(Error::NativeTlsNotEnabled) + } + + #[cfg(feature = "use-rustls")] + fn tls_rustls( + &self, + cert_path: &String, + key_path: &String, + ca_path: &String, + ) -> Result, Error> { + let (certs, key) = { + // Get certificates + let cert_file = File::open(&cert_path); + let cert_file = cert_file.map_err(|_| Error::ServerCertNotFound(cert_path.clone()))?; + let certs = certs(&mut BufReader::new(cert_file)); + let certs = certs.map_err(|_| Error::InvalidServerCert(cert_path.to_string()))?; + + // Get private key + let key_file = File::open(&key_path); + let key_file = key_file.map_err(|_| ServerKeyNotFound(key_path.clone()))?; + let keys = rsa_private_keys(&mut BufReader::new(key_file)); + let keys = keys.map_err(|_| Error::InvalidServerKey(key_path.clone()))?; + + // Get the first key + let key = match keys.first() { + Some(k) => k.clone(), + None => return Err(Error::InvalidServerKey(key_path.clone())), + }; + + (certs, key) }; // client authentication with a CA. CA isn't required otherwise - let mut server_config = match self.config.ca_path.clone() { - Some(ca) => { - let ca_file = File::open(&ca); - let ca_file = ca_file.map_err(|_| Error::CaFileNotFound(ca.clone()))?; - let ca_file = &mut BufReader::new(ca_file); - let mut store = RootCertStore::empty(); - let o = store.add_pem_file(ca_file); - o.map_err(|_| Error::InvalidCACert(ca))?; - ServerConfig::new(AllowAnyAuthenticatedClient::new(store)) - } - None => ServerConfig::new(NoClientAuth::new()), + let mut server_config = { + let ca_file = File::open(ca_path); + let ca_file = ca_file.map_err(|_| Error::CaFileNotFound(ca_path.clone()))?; + let ca_file = &mut BufReader::new(ca_file); + let mut store = RootCertStore::empty(); + let o = store.add_pem_file(ca_file); + o.map_err(|_| Error::InvalidCACert(ca_path.to_string()))?; + ServerConfig::new(AllowAnyAuthenticatedClient::new(store)) }; server_config.set_single_cert(certs, key)?; - let acceptor = TlsAcceptor::from(Arc::new(server_config)); - Ok(Some(acceptor)) + let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config)); + Ok(Some(ServerTLSAcceptor::RustlsAcceptor { acceptor })) + } + + #[allow(dead_code)] + #[cfg(not(feature = "use-rustls"))] + fn tls_rustls( + &self, + _cert_path: &String, + _key_path: &String, + _ca_path: &String, + ) -> Result, Error> { + Err(Error::RustlsNotEnabled) } async fn start(&self) -> Result<(), Error> { @@ -268,7 +358,24 @@ impl Server { let mut count = 0; let config = Arc::new(self.config.connections.clone()); - let acceptor = self.tls()?; + + // Get the ServerTLSAcceptor which allow us to use either Rustls or Native TLS + #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))] + let acceptor = match &self.config.cert { + Some(c) => match c { + ServerCert::RustlsCert { + ca_path, + cert_path, + key_path, + } => self.tls_rustls(cert_path, key_path, ca_path)?, + ServerCert::NativeTlsCert { + pkcs12_path, + pkcs12_pass, + } => self.tls_native_tls(pkcs12_path, pkcs12_pass)?, + }, + None => None, + }; + let max_incoming_size = config.max_payload_size; info!( @@ -276,30 +383,66 @@ impl Server { self.config.listen, self.id ); loop { - let (stream, addr) = listener.accept().await?; + // Await new network connection. + let (stream, addr) = match listener.accept().await { + Ok((s, r)) => (s, r), + Err(_e) => { + error!("Unable to accept socket."); + continue; + } + }; + + // Depending on TLS or not create a new Network + #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))] let network = match &acceptor { - Some(acceptor) => { + Some(a) => { info!("{}. Accepting TLS connection from: {}", count, addr); - let stream = match acceptor.accept(stream).await { - Ok(v) => v, - Err(e) => { - error!("Failed to accept TLS connection. Error = {:?}", e); - continue; - } - }; - Network::new(stream, max_incoming_size) + // Depending on which acceptor we're using address accordingly.. + match a { + #[cfg(feature = "use-rustls")] + ServerTLSAcceptor::RustlsAcceptor { acceptor } => { + let stream = match acceptor.accept(stream).await { + Ok(v) => v, + Err(e) => { + error!("Failed to accept TLS connection using Rustls. Error = {:?}", e); + continue; + } + }; + + Network::new(stream, max_incoming_size) + } + #[cfg(feature = "use-native-tls")] + ServerTLSAcceptor::NativeTLSAcceptor { acceptor } => { + let stream = match acceptor.accept(stream).await { + Ok(v) => v, + Err(e) => { + error!("Failed to accept TLS connection using Native TLS. Error = {:?}", e); + continue; + } + }; + + Network::new(stream, max_incoming_size) + } + } } None => { info!("{}. Accepting TCP connection from: {}", count, addr); Network::new(stream, max_incoming_size) } }; + #[cfg(not(any(feature = "use-rustls", feature = "use-native-tls")))] + let network = { + info!("{}. Accepting TCP connection from: {}", count, addr); + Network::new(stream, max_incoming_size) + }; count += 1; let config = config.clone(); let router_tx = self.router_tx.clone(); + + // Spawn a new thread to handle this connection. task::spawn(async { let connector = Connector::new(config, router_tx); if let Err(e) = connector.new_connection(network).await {