diff --git a/src/config.rs b/src/config.rs index ff81d4ad..ae36d80e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -39,6 +39,7 @@ pub struct Config { pub daemon_rpc_addr: SocketAddr, pub cookie: Option, pub electrum_rpc_addr: SocketAddr, + pub electrum_proxy_depth: usize, pub http_addr: SocketAddr, pub http_socket_file: Option, pub rpc_socket_file: Option, @@ -139,6 +140,15 @@ impl Config { .help("Electrum server JSONRPC 'addr:port' to listen on (default: '127.0.0.1:50001' for mainnet, '127.0.0.1:60001' for testnet and '127.0.0.1:60401' for regtest)") .takes_value(true), ) + .arg( + Arg::with_name("electrum_proxy_depth") + .long("electrum-proxy-depth") + .help("Electrum server's PROXY protocol header depth. \ + ie. a value of 2 means the 2nd closest hop's PROXY header \ + will be used to find the source IP. A value of 0 means all \ + IPs are ignored. (default: 0)") + .takes_value(true), + ) .arg( Arg::with_name("http_addr") .long("http-addr") @@ -393,6 +403,11 @@ impl Config { .unwrap_or(&format!("127.0.0.1:{}", default_electrum_port)), "Electrum RPC", ); + let electrum_proxy_depth = m + .value_of("electrum_proxy_depth") + .unwrap_or("0") + .parse::() + .expect("invalid electrum_proxy_depth"); let http_addr: SocketAddr = str_to_socketaddr( m.value_of("http_addr") .unwrap_or(&format!("127.0.0.1:{}", default_http_port)), @@ -465,6 +480,7 @@ impl Config { cookie, utxos_limit: value_t_or_exit!(m, "utxos_limit", usize), electrum_rpc_addr, + electrum_proxy_depth, electrum_txs_limit: value_t_or_exit!(m, "electrum_txs_limit", usize), electrum_banner, http_addr, diff --git a/src/electrum/server.rs b/src/electrum/server.rs index 3c0e1a69..0ef6423b 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -103,6 +103,7 @@ fn get_status_hash(txs: Vec<(Txid, Option)>, query: &Query) -> Option>>, + electrum_proxy_depth: usize, // Chain info related query: Arc, last_header_entry: Option, @@ -133,9 +134,8 @@ impl Connection { query: Arc, stream: ConnectionStream, stats: Arc, - txs_limit: usize, - shutdown_send: Sender<()>, - shutdown_recv: Receiver<()>, + (txs_limit, electrum_proxy_depth): (usize, usize), + (shutdown_send, shutdown_recv): (Sender<()>, Receiver<()>), #[cfg(feature = "electrum-discovery")] discovery: Option>, ) -> Connection { let (reply_killer, shutdown_replies) = crossbeam_channel::unbounded(); @@ -153,6 +153,7 @@ impl Connection { }); let ret = Connection { proxy_proto_addr: Cell::new(None), + electrum_proxy_depth, query, last_header_entry: None, // disable header subscription for now status_hashes: HashMap::new(), @@ -576,7 +577,9 @@ impl Connection { .borrow_mut() .as_mut() .and_then(|r| r.fill_buf().ok()) - .and_then(|mut available| parse_proxy_headers(&mut available)) + .and_then(|mut available| { + parse_proxy_headers(&mut available, self.electrum_proxy_depth) + }) .map(|addr| { trace!("RPC Received PROXY Protocol address: {}", addr); addr @@ -632,6 +635,7 @@ impl Connection { mut reader: BufReader, tx: &crossbeam_channel::Sender, addr: Option, + electrum_proxy_depth: usize, ) -> Result<()> { let addr_str = addr .map(|a| a.to_string()) @@ -671,7 +675,7 @@ impl Connection { // Step 1: Parse PROXY header if present and move the slice // cursor over to the next byte. - let new_addr = parse_proxy_headers(&mut slice); + let new_addr = parse_proxy_headers(&mut slice, electrum_proxy_depth); if new_addr != addr { trace!("PROXY addresses don't match between requests: Expected {addr:?} Got {new_addr:?}"); } @@ -705,15 +709,14 @@ impl Connection { let rpc_addr = self.get_source_addr(); let addr_str = self.get_source_addr_str(); let shutdown_send = self.shutdown_send.clone(); + let electrum_proxy_depth = self.electrum_proxy_depth; let child = spawn_thread("reader", move || { - let result = - std::panic::catch_unwind(|| Connection::handle_requests(reader, &tx, rpc_addr)) - .unwrap_or_else(|e| { - Err( - format!("RPC Panic in request handler: {}", parse_panic_error(&e)) - .into(), - ) - }); + let result = std::panic::catch_unwind(|| { + Connection::handle_requests(reader, &tx, rpc_addr, electrum_proxy_depth) + }) + .unwrap_or_else(|e| { + Err(format!("RPC Panic in request handler: {}", parse_panic_error(&e)).into()) + }); // This shuts down the other graceful shutdown thread, // which also shuts down the handle_replies loop // regardless of panic, Err, or Ok @@ -878,6 +881,7 @@ impl RPC { }); let txs_limit = config.electrum_txs_limit; + let electrum_proxy_depth = config.electrum_proxy_depth; RPC { notification: notification.sender(), @@ -906,8 +910,8 @@ impl RPC { let garbage_sender = garbage_sender.clone(); // Kill the peers properly - let (killer, peace_receiver) = std::sync::mpsc::channel(); - let killer_clone = killer.clone(); + let (shutdown_sender, shutdown_receiver) = std::sync::mpsc::channel(); + let shutdown_sender_clone = shutdown_sender.clone(); #[cfg(feature = "electrum-discovery")] let discovery = discovery.clone(); @@ -918,9 +922,8 @@ impl RPC { query, stream, stats, - txs_limit, - killer_clone.clone(), - peace_receiver, + (txs_limit, electrum_proxy_depth), + (shutdown_sender_clone.clone(), shutdown_receiver), #[cfg(feature = "electrum-discovery")] discovery, ); @@ -929,12 +932,12 @@ impl RPC { senders.lock().unwrap().push(conn.message_chan.sender()); conn.run(); info!("[{}] disconnected peer", addr); - let _ = killer_clone.send(()); + let _ = shutdown_sender_clone.send(()); let _ = garbage_sender.send(std::thread::current().id()); }); trace!("spawned {:?}", spawned.thread().id()); - threads.insert(spawned.thread().id(), (spawned, killer)); + threads.insert(spawned.thread().id(), (spawned, shutdown_sender)); while let Ok(id) = garbage_receiver.try_recv() { if let Some((thread, killer)) = threads.remove(&id) { trace!("joining {:?}", id); @@ -1256,9 +1259,10 @@ fn proxy_header_to_source_socket_addr(p_header: ProxyHeader) -> Option Option { +fn parse_proxy_headers(buf: &mut &[u8], electrum_proxy_depth: usize) -> Option { trace!("Starting parse PROXY headers: {:?}", &buf[..12]); let mut addr = None; + let mut current_header_index = 0; // The last header is the outer-most proxy loop { let p_header = match proxy_protocol::parse(buf) { @@ -1273,6 +1277,15 @@ fn parse_proxy_headers(buf: &mut &[u8]) -> Option { Err(_) => return None, }; trace!("Parsed PROXY protocol header: {:?}", p_header); + // Increment from 0 to 1 before the first check + current_header_index += 1; + // 0 should always continue + // 1 should only get the 1st header's IP address etc. + if current_header_index != electrum_proxy_depth { + continue; + } + // The address is only attempted to be + // parsed when the 1 based index is equal addr = proxy_header_to_source_socket_addr(p_header); } addr diff --git a/start b/start index 1305a865..f3ce55f7 100755 --- a/start +++ b/start @@ -8,6 +8,18 @@ DB_FOLDER=/electrs NODENAME=$(hostname|cut -d . -f1) LOCATION=$(hostname|cut -d . -f2) +# since we know that our nginx will always be the first +# (closest) proxy from electrs, we set it to 1. +# If some servers prefer to disable PROXY protocol, +# set to 0. If the client themselves is proxying the +# electrum RPCs from other clients, this should be set to +# the hop that you want to trust to source IP addresses. +# If that hop doesn't exist, TcpStream's local_addr or +# "Unix socket" is used, and the PROXY headers are discarded. +# Image: +# [electrs] <> [proxy "1"] <> [proxy "2"] <> [electrum client] +PROXY_PROTOCOL_DEPTH=1 + # load rust if necessary if [ -e "${HOME}/.cargo/env" ];then source "${HOME}/.cargo/env" @@ -136,6 +148,7 @@ do --address-search \ --utxos-limit "${UTXOS_LIMIT}" \ --electrum-txs-limit "${ELECTRUM_TXS_LIMIT}" \ + --electrum-proxy-depth "${PROXY_PROTOCOL_DEPTH}" \ -vvv sleep 1 done