Skip to content

handshake: implement (togglable) pop verification #88

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5107,6 +5107,7 @@ dependencies = [
"arrayref",
"bytes",
"cid",
"fleek-crypto",
"fn-sdk",
"hex",
"lightning-schema",
Expand Down Expand Up @@ -8380,6 +8381,7 @@ dependencies = [
"lightning-types",
"lightning-workspace-hack",
"serde",
"sha2 0.10.8",
]

[[package]]
Expand Down
4 changes: 4 additions & 0 deletions core/handshake/benches/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ async fn perform_handshake(tx: &Sender<Bytes>, rx: &mut Receiver<Bytes>) {
schema::HandshakeRequestFrame::Handshake {
retry: None,
service: 1001,
expiry: u64::MAX,
nonce: 789,
pk: ClientPublicKey([1; 96]),
pop: ClientSignature([2; 48]),
}
Expand Down Expand Up @@ -121,6 +123,8 @@ fn run_clients(n: usize) -> Vec<JoinHandle<()>> {
schema::HandshakeRequestFrame::Handshake {
retry: None,
service: 1001,
expiry: u64::MAX,
nonce: 789,
pk: ClientPublicKey([1; 96]),
pop: ClientSignature([2; 48]),
}
Expand Down
7 changes: 7 additions & 0 deletions core/handshake/examples/tcp-stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::fs::File;
use std::io::Write;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::time::SystemTime;

use anyhow::{anyhow, Result};
use clap::Parser;
Expand Down Expand Up @@ -223,6 +224,12 @@ async fn run_request(
service: 1001,
pk: ClientPublicKey([1; 96]),
pop: ClientSignature([2; 48]),
expiry: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
+ 3600,
nonce: 0,
})
.await?;
data!(line, "handshake_sent", timer);
Expand Down
3 changes: 3 additions & 0 deletions core/handshake/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use crate::transports;
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct HandshakeConfig {
/// Enable validating client pop signatures
pub validate: bool,
/// List of transports to enable
#[serde(rename = "transport")]
pub transports: Vec<TransportConfig>,
Expand All @@ -24,6 +26,7 @@ pub struct HandshakeConfig {
impl Default for HandshakeConfig {
fn default() -> Self {
Self {
validate: false, // disabled by default until gateway is ready
transports: vec![
TransportConfig::WebRTC(Default::default()),
TransportConfig::WebTransport(Default::default()),
Expand Down
108 changes: 98 additions & 10 deletions core/handshake/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ use async_channel::{bounded, Sender};
use axum::{Extension, Router};
use axum_server::Handle;
use dashmap::DashMap;
use fleek_crypto::NodePublicKey;
use fleek_crypto::{NodePublicKey, NodeSecretKey, PublicKey, SecretKey};
use fn_sdk::header::{write_header, ConnectionHeader};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use lightning_interfaces::prelude::*;
use lightning_interfaces::schema::handshake::{HandshakeRequestFrame, TerminationReason};
use lightning_metrics::increment_counter;
use rand::RngCore;
use schema::handshake::{handshake_digest, HandshakeResponse};
use tracing::warn;
use triomphe::Arc;

Expand All @@ -32,7 +34,10 @@ pub struct Handshake<C: NodeComponents> {
}

struct Run<C: NodeComponents> {
ctx: Context<c![C::ServiceExecutorInterface::Provider]>,
ctx: Context<
c![C::ServiceExecutorInterface::Provider],
c![C::ApplicationInterface::SyncExecutor],
>,
// The axum_server Server API (TLS server) does not have a `with_graceful_shutdown`
// similarly to axum Server. The only way to shut it down gracefully is via its Handle API.
handle: Handle,
Expand All @@ -45,12 +50,22 @@ impl<C: NodeComponents> Handshake<C> {
config: &C::ConfigProviderInterface,
keystore: &C::KeystoreInterface,
service_executor: &C::ServiceExecutorInterface,
query_runner: &c!(C::ApplicationInterface::SyncExecutor),
fdi::Cloned(waiter): fdi::Cloned<ShutdownWaiter>,
) -> Self {
let config = config.get::<Self>();
let provider = service_executor.get_provider();
let pk = keystore.get_ed25519_pk();
let ctx = Context::new(provider, waiter, config.timeout);
let sk = keystore.get_ed25519_sk();
let ctx = Context::new(
pk,
sk,
provider,
query_runner.clone(),
waiter,
config.timeout,
config.validate,
);
let handle = Handle::new();

Self {
Expand Down Expand Up @@ -134,13 +149,17 @@ pub struct TokenState {

/// Shared context given to the transport listener tasks and the connection proxies.
#[derive(Clone)]
pub struct Context<P: ExecutorProviderInterface> {
pub struct Context<P: ExecutorProviderInterface, QR: SyncQueryRunnerInterface> {
pk: NodePublicKey,
sk: NodeSecretKey,
/// Service unix socket provider
provider: P,
query_runner: QR,
pub(crate) shutdown: ShutdownWaiter,
connection_counter: Arc<AtomicU64>,
connections: Arc<DashMap<u64, ConnectionEntry>>,
timeout: Duration,
pub(crate) validate: bool,
}

struct ConnectionEntry {
Expand All @@ -153,21 +172,33 @@ struct ConnectionEntry {
timeout: u128,
}

impl<P: ExecutorProviderInterface> Context<P> {
pub fn new(provider: P, waiter: ShutdownWaiter, timeout: Duration) -> Self {
impl<P: ExecutorProviderInterface, QR: SyncQueryRunnerInterface> Context<P, QR> {
pub fn new(
pk: NodePublicKey,
sk: NodeSecretKey,
provider: P,
query_runner: QR,
waiter: ShutdownWaiter,
timeout: Duration,
validate: bool,
) -> Self {
Self {
pk,
sk,
provider,
query_runner,
shutdown: waiter,
connection_counter: AtomicU64::new(0).into(),
connections: DashMap::new().into(),
timeout,
validate,
}
}

pub async fn handle_new_connection<S: TransportSender, R: TransportReceiver>(
&self,
request: HandshakeRequestFrame,
sender: S,
mut sender: S,
mut receiver: R,
) where
(S, R): Into<TransportPair>,
Expand All @@ -178,15 +209,64 @@ impl<P: ExecutorProviderInterface> Context<P> {
retry: None,
service,
pk,
..
expiry,
nonce,
pop,
} => {
// TODO: Verify proof of possession
// TODO: Send handshake response
if self.validate {
// 1. check the expiry
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
if expiry <= now {
sender.terminate(TerminationReason::InvalidHandshake).await;
return;
}

// 2. TODO: check the nonce against app state or zero if not found
let current_nonce = self.query_runner.get_client_nonce(&pk);
if nonce <= current_nonce {
sender.terminate(TerminationReason::InvalidHandshake).await;
return;
}

// 3. compute the handshake digest
let digest = handshake_digest(None, service, expiry, nonce, pk);

// 4. validate client signature
// TODO: we currently double hash this, should we just build the
// signature payload from the encoding directly ?
if pk.verify(&pop, &digest) != Ok(true) {
sender.terminate(TerminationReason::InvalidHandshake).await;
increment_counter!(
"handshake_invalid_signatures",
Some("Counter for rejected handshake signatures")
);
return;
}

// 5. Finally send the handshake response, signing the handshake digest from the
// client we just validated.
// TODO: again should we avoid double hashing here
sender
.send_handshake_response(HandshakeResponse {
pk: self.pk,
pop: self.sk.sign(&digest),
})
.await;
}

// Attempt to connect to the service, getting the unix socket.
let Some(mut socket) = self.provider.connect(service).await else {
sender.terminate(TerminationReason::InvalidService).await;
warn!("failed to connect to service {service}");
let service_id = service.to_string();
increment_counter!(
"handshake_service_socket_not_found",
Some("Number of times a service failed to connect"),
"service" => service_id.as_str()
);
return;
};

Expand All @@ -197,6 +277,14 @@ impl<P: ExecutorProviderInterface> Context<P> {

if let Err(e) = write_header(&header, &mut socket).await {
sender.terminate(TerminationReason::ServiceTerminated).await;
let service_id = service.to_string();
increment_counter!(
"handshake_connection_header_write_failed",
Some(
"Number of times services failed before connection header was successfully sent"
),
"service" => service_id.as_str()
);
warn!("failed to write connection header to service {service}: {e}");
return;
}
Expand Down
Loading