From a34e3531b22c7758bb130dfd183d787a3526ce86 Mon Sep 17 00:00:00 2001 From: junderw Date: Sun, 8 Oct 2023 00:34:14 -0700 Subject: [PATCH] Feat: Log client IP during REST and Electrum requests --- Cargo.lock | 39 ++++ Cargo.toml | 2 + src/bin/electrs.rs | 9 + src/config.rs | 16 ++ src/electrum/server.rs | 400 +++++++++++++++++++++++++++++++++-------- src/new_index/fetch.rs | 8 +- src/rest.rs | 43 ++++- src/util/mod.rs | 31 ++-- start | 13 ++ 9 files changed, 460 insertions(+), 101 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 23b2bb57..5409cd00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -391,6 +391,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "either" version = "1.6.1" @@ -420,9 +426,11 @@ dependencies = [ "lazy_static", "libc", "log", + "memchr", "num_cpus", "page_size", "prometheus", + "proxy-protocol", "rayon", "rocksdb", "serde", @@ -1098,6 +1106,16 @@ version = "2.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf7e6d18738ecd0902d30d1ad232c9125985a3422929b16c65517b38adc14f96" +[[package]] +name = "proxy-protocol" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e50c72c21c738f5c5f350cc33640aee30bf7cd20f9d9da20ed41bce2671d532" +dependencies = [ + "bytes", + "snafu", +] + [[package]] name = "quote" version = "1.0.31" @@ -1512,6 +1530,27 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +[[package]] +name = "snafu" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eab12d3c261b2308b0d80c26fffb58d17eba81a4be97890101f416b478c79ca7" +dependencies = [ + "doc-comment", + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1508efa03c362e23817f96cde18abed596a25219a8b2c66e8db33c03543d315b" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.91", +] + [[package]] name = "socket2" version = "0.4.4" diff --git a/Cargo.toml b/Cargo.toml index 5872d6ba..7b6bad12 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,8 +35,10 @@ libc = "0.2.81" log = "0.4.11" socket2 = { version = "0.4", features = ["all"] } num_cpus = "1.12.0" +memchr = "2.4.1" page_size = "0.4.2" prometheus = "0.13" +proxy-protocol = { version = "0.5", features = ["always_exhaustive"] } rayon = "1.5.0" rocksdb = "0.21.0" serde = "1.0.118" diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 68fa47ff..aa71dacb 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -106,6 +106,15 @@ fn run_server(config: Arc) -> Result<()> { ); } + if std::env::var("ELECTRS_PERIODIC_THREAD_LOGGER").is_ok() { + electrs::util::spawn_thread("periodic_thread_logger", || loop { + electrs::util::with_spawned_threads(|threads| { + debug!("THREADS: {:?}", threads); + }); + std::thread::sleep(std::time::Duration::from_millis(5000)); + }); + } + loop { if let Err(err) = signal.wait(Duration::from_millis(500), true) { info!("stopping server: {}", err); diff --git a/src/config.rs b/src/config.rs index ff81d4ad..ae36d80e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -39,6 +39,7 @@ pub struct Config { pub daemon_rpc_addr: SocketAddr, pub cookie: Option, pub electrum_rpc_addr: SocketAddr, + pub electrum_proxy_depth: usize, pub http_addr: SocketAddr, pub http_socket_file: Option, pub rpc_socket_file: Option, @@ -139,6 +140,15 @@ impl Config { .help("Electrum server JSONRPC 'addr:port' to listen on (default: '127.0.0.1:50001' for mainnet, '127.0.0.1:60001' for testnet and '127.0.0.1:60401' for regtest)") .takes_value(true), ) + .arg( + Arg::with_name("electrum_proxy_depth") + .long("electrum-proxy-depth") + .help("Electrum server's PROXY protocol header depth. \ + ie. a value of 2 means the 2nd closest hop's PROXY header \ + will be used to find the source IP. A value of 0 means all \ + IPs are ignored. (default: 0)") + .takes_value(true), + ) .arg( Arg::with_name("http_addr") .long("http-addr") @@ -393,6 +403,11 @@ impl Config { .unwrap_or(&format!("127.0.0.1:{}", default_electrum_port)), "Electrum RPC", ); + let electrum_proxy_depth = m + .value_of("electrum_proxy_depth") + .unwrap_or("0") + .parse::() + .expect("invalid electrum_proxy_depth"); let http_addr: SocketAddr = str_to_socketaddr( m.value_of("http_addr") .unwrap_or(&format!("127.0.0.1:{}", default_http_port)), @@ -465,6 +480,7 @@ impl Config { cookie, utxos_limit: value_t_or_exit!(m, "utxos_limit", usize), electrum_rpc_addr, + electrum_proxy_depth, electrum_txs_limit: value_t_or_exit!(m, "electrum_txs_limit", usize), electrum_banner, http_addr, diff --git a/src/electrum/server.rs b/src/electrum/server.rs index 1b44ee99..3bb2f9ba 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -1,3 +1,4 @@ +use std::cell::{Cell, RefCell}; use std::collections::HashMap; use std::convert::TryInto; use std::fs; @@ -9,14 +10,15 @@ use std::os::unix::fs::FileTypeExt; use std::os::unix::net::{UnixListener, UnixStream}; use std::path::Path; use std::sync::atomic::AtomicBool; -use std::sync::mpsc::{Receiver, Sender}; +use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex}; use std::thread; use bitcoin::hashes::sha256d::Hash as Sha256dHash; use error_chain::ChainedError; use hex; -use serde_json::{from_str, Value}; +use proxy_protocol::{version1, version2, ProxyHeader}; +use serde_json::Value; use sha2::{Digest, Sha256}; #[cfg(not(feature = "liquid"))] @@ -100,39 +102,78 @@ fn get_status_hash(txs: Vec<(Txid, Option)>, query: &Query) -> Option>>, + electrum_proxy_depth: usize, + // Chain info related query: Arc, last_header_entry: Option, status_hashes: HashMap, // ScriptHash -> StatusHash - stream: ConnectionStream, - chan: SyncChannel, stats: Arc, txs_limit: usize, - die_please: Option>, + // Stream related + stream: ConnectionStream, + _arc_stream: Arc, // Needs to be kept alive until drop + reader: RefCell>>, + // Channel related + message_chan: SyncChannel, + shutdown_replies: crossbeam_channel::Receiver<()>, // For reply select branch + shutdown_send: crossbeam_channel::Sender<()>, // For Drop. Kills properly-die thread + // Discovery related #[cfg(feature = "electrum-discovery")] discovery: Option>, } +impl Drop for Connection { + fn drop(&mut self) { + let _ = self.shutdown_send.send(()); + } +} + impl Connection { pub fn new( query: Arc, stream: ConnectionStream, stats: Arc, - txs_limit: usize, - die_please: Receiver<()>, + (txs_limit, electrum_proxy_depth): (usize, usize), + shutdown: SyncChannel<()>, #[cfg(feature = "electrum-discovery")] discovery: Option>, ) -> Connection { - Connection { + // Channels + let (reply_killer, shutdown_replies) = crossbeam_channel::unbounded(); + let shutdown_send = shutdown.sender(); + + // Using this Arc to prevent any thread leaks from keeping the stream alive + let _arc_stream = Arc::new(stream.try_clone().expect("failed to clone TcpStream")); + let maybe_stream = Arc::downgrade(&_arc_stream); + + spawn_thread("properly-die", move || { + let _ = shutdown.receiver().map(|c| c.recv()); + let _ = maybe_stream.upgrade().map(|s| s.shutdown(Shutdown::Both)); + let _ = reply_killer.send(()); + }); + + let ret = Connection { + proxy_proto_addr: Cell::new(None), + electrum_proxy_depth, query, last_header_entry: None, // disable header subscription for now status_hashes: HashMap::new(), + reader: RefCell::new(Some(BufReader::new( + stream.try_clone().expect("failed to clone TcpStream"), + ))), stream, - chan: SyncChannel::new(10), + _arc_stream, + message_chan: SyncChannel::new(10), stats, txs_limit, - die_please: Some(die_please), + shutdown_replies, + shutdown_send, #[cfg(feature = "electrum-discovery")] discovery, - } + }; + // Wait for first request to find + ret.get_source_addr(); + ret } fn blockchain_headers_subscribe(&mut self) -> Result { @@ -354,8 +395,12 @@ impl Connection { let tx = params.get(0).chain_err(|| "missing tx")?; let tx = tx.as_str().chain_err(|| "non-string tx")?.to_string(); let txid = self.query.broadcast_raw(&tx)?; - if let Err(e) = self.chan.sender().try_send(Message::PeriodicUpdate) { - warn!("failed to issue PeriodicUpdate after broadcast: {}", e); + if let Err(e) = self.message_chan.sender().try_send(Message::PeriodicUpdate) { + warn!( + "[{}] failed to issue PeriodicUpdate after broadcast: {}", + self.get_source_addr_str(), + e + ); } Ok(json!(txid)) } @@ -455,7 +500,8 @@ impl Connection { Ok(result) => json!({"jsonrpc": "2.0", "id": id, "result": result}), Err(e) => { warn!( - "rpc #{} {} {:?} failed: {}", + "[{}] rpc #{} {} {:?} failed: {}", + self.get_source_addr_str(), id, method, params, @@ -512,16 +558,48 @@ impl Connection { Ok(()) } - fn handle_replies(&mut self, shutdown: crossbeam_channel::Receiver<()>) -> Result<()> { + fn get_source_addr_str(&self) -> String { + self.get_source_addr() + .map(|s| s.to_string()) + .unwrap_or_else(|| self.stream.addr_string()) + } + + /// This will only check the PROXY protocol once + /// and store the result in the first Option. + /// Some(None) means "we checked, but there was no address" + /// The inner option is returned as a Copy. + fn get_source_addr(&self) -> Option { + // Option is Copy + if let Some(v) = self.proxy_proto_addr.get() { + v + } else { + let v = self + .reader + .borrow_mut() + .as_mut() + .and_then(|r| r.fill_buf().ok()) + .and_then(|mut available| { + parse_proxy_headers(&mut available, self.electrum_proxy_depth).0 + }) + .map(|addr| { + trace!("RPC Received PROXY Protocol address: {}", addr); + addr + }); + self.proxy_proto_addr.set(Some(v)); + v + } + } + + fn handle_replies(&mut self) -> Result<()> { let empty_params = json!([]); + let addr_str = self.get_source_addr_str(); loop { crossbeam_channel::select! { - recv(self.chan.receiver()) -> msg => { - let msg = msg.chain_err(|| "channel closed")?; - trace!("RPC {:?}", msg); + recv(self.message_chan.receiver().chain_err(|| format!("[{addr_str}] channel closed"))?) -> msg => { + let msg = msg.chain_err(|| format!("[{addr_str}] channel closed"))?; + trace!("RPC [{addr_str}] {:?}", msg); match msg { - Message::Request(line) => { - let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?; + Message::Request(cmd) => { let reply = match ( cmd.get("method"), cmd.get("params").unwrap_or(&empty_params), @@ -530,24 +608,24 @@ impl Connection { (Some(Value::String(method)), Value::Array(params), Some(id)) => { self.handle_command(method, params, id)? } - _ => bail!("invalid command: {}", cmd), + _ => bail!("[{addr_str}] invalid command: {}", cmd), }; self.send_values(&[reply])? } Message::PeriodicUpdate => { let values = self .update_subscriptions() - .chain_err(|| "failed to update subscriptions")?; + .chain_err(|| format!("[{addr_str}] failed to update subscriptions"))?; self.send_values(&values)? } Message::Done => { - self.chan.close(); + self.message_chan.close(); return Ok(()); } } } - recv(shutdown) -> _ => { - self.chan.close(); + recv(self.shutdown_replies) -> _ => { + self.message_chan.close(); return Ok(()); } } @@ -556,29 +634,32 @@ impl Connection { fn handle_requests( mut reader: BufReader, - tx: crossbeam_channel::Sender, + tx: &crossbeam_channel::Sender, + addr_str: &str, ) -> Result<()> { loop { - let mut line = Vec::::new(); - reader - .read_until(b'\n', &mut line) - .chain_err(|| "failed to read a request")?; - if line.is_empty() { - tx.send(Message::Done).chain_err(|| "channel closed")?; + let mut recv_data = Vec::::new(); + match read_until(&mut reader, b'\n', &mut recv_data) { + Ok(bytes) => trace!("[{addr_str}] Read {bytes} bytes from connection"), + Err(e) => bail!("[{addr_str}] Failed to read: {}", e), + } + if recv_data.is_empty() { return Ok(()); } else { - if line.starts_with(&[22, 3, 1]) { + if recv_data.starts_with(&[22, 3, 1]) { // (very) naive SSL handshake detection - let _ = tx.send(Message::Done); - bail!("invalid request - maybe SSL-encrypted data?: {:?}", line) + bail!( + "[{addr_str}] invalid request - maybe SSL-encrypted data?: {:?}", + recv_data + ) } - match String::from_utf8(line) { + match serde_json::from_slice(&recv_data) { Ok(req) => tx .send(Message::Request(req)) - .chain_err(|| "channel closed")?, + .chain_err(|| format!("[{}] channel closed", addr_str))?, Err(err) => { let _ = tx.send(Message::Done); - bail!("invalid UTF8: {}", err) + bail!("[{}] invalid UTF8: {}", addr_str, err) } } } @@ -587,29 +668,36 @@ impl Connection { pub fn run(mut self) { self.stats.clients.inc(); - let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream")); - let tx = self.chan.sender(); - - let die_please = self.die_please.take().unwrap(); - let (reply_killer, reply_receiver) = crossbeam_channel::unbounded(); - - // We create a clone of the stream and put it in an Arc - // This will drop at the end of the function. - let arc_stream = Arc::new(self.stream.try_clone().expect("failed to clone TcpStream")); - // We don't want to keep the stream alive until SIGINT - // It should drop (close) no matter what. - let maybe_stream = Arc::downgrade(&arc_stream); - spawn_thread("properly-die", move || { - let _ = die_please.recv(); - let _ = maybe_stream.upgrade().map(|s| s.shutdown(Shutdown::Both)); - let _ = reply_killer.send(()); + let reader = self.reader.take().unwrap(); + let tx = self.message_chan.sender(); + + let rpc_addr = self.get_source_addr(); + let addr_str = self.get_source_addr_str(); + let shutdown_send = self.shutdown_send.clone(); + let child = spawn_thread("reader", move || { + let addr_str = rpc_addr + .map(|a| a.to_string()) + .unwrap_or_else(|| reader.get_ref().addr_string()); + let result = + std::panic::catch_unwind(|| Connection::handle_requests(reader, &tx, &addr_str)) + .unwrap_or_else(|e| { + Err(format!( + "[{}] RPC Panic in request handler: {}", + addr_str, + parse_panic_error(&e) + ) + .into()) + }); + // This shuts down the other graceful shutdown thread, + // which also shuts down the handle_replies loop + // regardless of panic, Err, or Ok + let _ = shutdown_send.send(()); + result }); - - let child = spawn_thread("reader", || Connection::handle_requests(reader, tx)); - if let Err(e) = self.handle_replies(reply_receiver) { + if let Err(e) = self.handle_replies() { error!( "[{}] connection handling failed: {}", - self.stream.addr_string(), + addr_str, e.display_chain().to_string() ); } @@ -618,13 +706,11 @@ impl Connection { .subscriptions .sub(self.status_hashes.len() as i64); - let addr = self.stream.addr_string(); - debug!("[{}] shutting down connection", addr); - // Drop the Arc so that the stream properly closes. - drop(arc_stream); + debug!("[{}] shutting down connection", addr_str); let _ = self.stream.shutdown(Shutdown::Both); - if let Err(err) = child.join().expect("receiver panicked") { - error!("[{}] receiver failed: {}", addr, err); + self.message_chan.close(); + if let Err(err) = child.join().expect("receiver can't panic") { + error!("[{}] receiver failed: {}", addr_str, err); } } } @@ -651,7 +737,7 @@ struct GetHistoryResult { #[derive(Debug)] pub enum Message { - Request(String), + Request(Value), PeriodicUpdate, Done, } @@ -767,6 +853,7 @@ impl RPC { }); let txs_limit = config.electrum_txs_limit; + let electrum_proxy_depth = config.electrum_proxy_depth; RPC { notification: notification.sender(), @@ -788,7 +875,6 @@ impl RPC { let (garbage_sender, garbage_receiver) = crossbeam_channel::unbounded(); while let Some(stream) = acceptor.receiver().recv().unwrap() { - let addr = stream.addr_string(); // explicitely scope the shadowed variables for the new thread let query = Arc::clone(&query); let senders = Arc::clone(&senders); @@ -796,36 +882,38 @@ impl RPC { let garbage_sender = garbage_sender.clone(); // Kill the peers properly - let (killer, peace_receiver) = std::sync::mpsc::channel(); - let killer_clone = killer.clone(); + let shutdown_channel = SyncChannel::new(1); + let shutdown_sender = shutdown_channel.sender(); #[cfg(feature = "electrum-discovery")] let discovery = discovery.clone(); let spawned = spawn_thread("peer", move || { - let addr = stream.addr_string(); - info!("[{}] connected peer", addr); + let shutdown_sender = shutdown_channel.sender(); + info!("connected peer. waiting for first request..."); let conn = Connection::new( query, stream, stats, - txs_limit, - peace_receiver, + (txs_limit, electrum_proxy_depth), + shutdown_channel, #[cfg(feature = "electrum-discovery")] discovery, ); - senders.lock().unwrap().push(conn.chan.sender()); + let addr = conn.get_source_addr_str(); + info!("[{}] connected peer", addr); + senders.lock().unwrap().push(conn.message_chan.sender()); conn.run(); info!("[{}] disconnected peer", addr); - let _ = killer_clone.send(()); + let _ = shutdown_sender.send(()); let _ = garbage_sender.send(std::thread::current().id()); }); - trace!("[{}] spawned {:?}", addr, spawned.thread().id()); - threads.insert(spawned.thread().id(), (spawned, killer)); + trace!("spawned {:?}", spawned.thread().id()); + threads.insert(spawned.thread().id(), (spawned, shutdown_sender)); while let Ok(id) = garbage_receiver.try_recv() { if let Some((thread, killer)) = threads.remove(&id) { - trace!("[{}] joining {:?}", addr, id); + trace!("joining {:?}", id); let _ = killer.send(()); if let Err(error) = thread.join() { error!("failed to join {:?}: {:?}", id, error); @@ -989,7 +1077,7 @@ impl ConnectionStream { fn addr_string(&self) -> String { match self { ConnectionStream::Tcp(_, a) => format!("{a}"), - ConnectionStream::Unix(_, a, _) => format!("{a:?}"), + ConnectionStream::Unix(_, _, _) => "(Unix socket)".to_string(), } } @@ -1047,3 +1135,159 @@ impl Read for ConnectionStream { } } } + +/// This is a slightly modified version of read_until from standard library BufRead trait. +/// After every read we check if there's a PROXY protocol header at the beginning of the read. +fn read_until( + r: &mut BufReader, + delim: u8, + buf: &mut Vec, +) -> std::io::Result { + let mut read = 0; + let mut carry_over_arr = [0_u8; 256]; + let mut carrying_over = 0; + loop { + let (done, used) = { + let mut available = match r.fill_buf() { + Ok(n) => n, + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + }; + + let skipped_count = if carrying_over > 0 { + process_carry_over(&mut carrying_over, &mut carry_over_arr, &mut available) + } else { + // Added to read_until impl: Try parsing PROXY headers after every read. + let (_, skipped_count) = parse_proxy_headers(&mut available, 0); + skipped_count + }; + + match memchr::memchr(delim, available) { + Some(i) => { + buf.extend_from_slice(&available[..=i]); + (true, i + 1 + skipped_count) + } + None => { + // Added: carry over + carrying_over = available.len(); + carry_over_arr[..carrying_over].copy_from_slice(available); + + buf.extend_from_slice(available); + (false, available.len() + skipped_count) + } + } + }; + r.consume(used); + read += used; + if done || used == 0 { + return Ok(read); + } + } +} + +fn proxy_header_to_source_socket_addr(p_header: ProxyHeader) -> Option { + match p_header { + ProxyHeader::Version1 { + addresses: version1::ProxyAddresses::Ipv4 { source, .. }, + } => Some(SocketAddr::V4(source)), + ProxyHeader::Version1 { + addresses: version1::ProxyAddresses::Ipv6 { source, .. }, + } => Some(SocketAddr::V6(source)), + ProxyHeader::Version2 { + addresses: version2::ProxyAddresses::Ipv4 { source, .. }, + .. + } => Some(SocketAddr::V4(source)), + ProxyHeader::Version2 { + addresses: version2::ProxyAddresses::Ipv6 { source, .. }, + .. + } => Some(SocketAddr::V6(source)), + _ => None, + } +} + +fn parse_proxy_headers( + buf: &mut &[u8], + electrum_proxy_depth: usize, +) -> (Option, usize) { + trace!("Starting parse PROXY headers: {:?}", &buf[..12]); + let mut addr = None; + let mut current_header_index = 0; + let before_len = buf.len(); + // The last header is the outer-most proxy + // Warning do not early return. ONLY break the loop. + loop { + let p_header = match proxy_protocol::parse(buf) { + Ok(h) => h, + Err(proxy_protocol::ParseError::NotProxyHeader) => break, + // This won't move the buf cursor forward + // and will most likely end in an error higher in the call stack + // This means "PROXY protocol was used, but it was in an unknown format" + // (Maybe someday if nginx/etc. uses a new version of the protocol + // and we don't update the dependency to a version that handles the new + // version, it might break.) + Err(_) => break, + }; + trace!("Parsed PROXY protocol header: {:?}", p_header); + // Increment from 0 to 1 before the first check + current_header_index += 1; + // 0 should always continue + // 1 should only get the 1st header's IP address etc. + if current_header_index != electrum_proxy_depth { + continue; + } + // The address is only attempted to be + // parsed when the 1 based index is equal + addr = proxy_header_to_source_socket_addr(p_header); + } + (addr, before_len - buf.len()) +} + +fn parse_panic_error(e: &(dyn std::any::Any + Send)) -> &str { + if let Some(s) = e.downcast_ref::<&str>() { + s + } else if let Some(s) = e.downcast_ref::() { + s + } else { + "Unknown panic" + } +} + +/// The goal of this function is to take the carried over bytes from the last loop +/// and connect them with the first bytes of the next read, then check if it's a header. +/// This is to prevent headers from straddling the BufReader's buffer end. +/// A simple static array should be quick and easy. +fn process_carry_over( + carrying_over: &mut usize, + carry_over: &mut [u8], + available: &mut &[u8], +) -> usize { + // How much space do we have left in the array? + let empty_space = carry_over.len() - *carrying_over; + // How many bytes should we copy over? + let copy_bytes = available.len().min(empty_space); + // Copy over the bytes to join with the carried over bytes + carry_over[*carrying_over..*carrying_over + copy_bytes] + .copy_from_slice(&available[..copy_bytes]); + + // Figure out if it was a proxy header or not. + #[allow(clippy::redundant_slicing)] + let mut cursor = &carry_over[..]; + let before_len = cursor.len(); + let was_proxy = proxy_protocol::parse(&mut cursor).is_ok(); + let skipped_count = before_len - cursor.len(); + + let skip_count = if was_proxy { + // We only want to skip the amount in the new buffer + skipped_count.saturating_sub(*carrying_over) + } else { + 0 + }; + + // Move the available cursor + *available = &available[skip_count..]; + // Reset carrying over (writing 0s to the array is unnecessary) + *carrying_over = 0; + + // Return the skip count so we can call consume later + skip_count +} diff --git a/src/new_index/fetch.rs b/src/new_index/fetch.rs index ec4c2cb6..6d06ce2e 100644 --- a/src/new_index/fetch.rs +++ b/src/new_index/fetch.rs @@ -74,7 +74,7 @@ fn bitcoind_fetcher( let chan = SyncChannel::new(1); let sender = chan.sender(); Ok(Fetcher::from( - chan.into_receiver(), + chan.into_receiver().expect("not closed"), spawn_thread("bitcoind_fetcher", move || { for entries in new_headers.chunks(100) { let blockhashes: Vec = entries.iter().map(|e| *e.hash()).collect(); @@ -115,7 +115,7 @@ fn blkfiles_fetcher( let parser = blkfiles_parser(blkfiles_reader(blk_files), magic); Ok(Fetcher::from( - chan.into_receiver(), + chan.into_receiver().expect("not closed"), spawn_thread("blkfiles_fetcher", move || { parser.map(|sizedblocks| { let block_entries: Vec = sizedblocks @@ -151,7 +151,7 @@ fn blkfiles_reader(blk_files: Vec) -> Fetcher> { let sender = chan.sender(); Fetcher::from( - chan.into_receiver(), + chan.into_receiver().expect("not closed"), spawn_thread("blkfiles_reader", move || { for path in blk_files { trace!("reading {:?}", path); @@ -170,7 +170,7 @@ fn blkfiles_parser(blobs: Fetcher>, magic: u32) -> Fetcher) -> Option { + headers + .get_all("X-Forwarded-For") + .iter() + .filter_map(|v| v.to_str().ok()) + .join(",") + .split(',') + .filter_map(|ip| ip.trim().parse::().ok()) + .next() +} + +fn get_client_ip_str() -> String { + get_rest_addr() + .map(|a| a.to_string()) + .unwrap_or_else(|| String::from("Unknown IP")) +} + #[tokio::main] async fn run_server(config: Arc, query: Arc, rx: oneshot::Receiver<()>) { let addr = &config.http_addr; @@ -565,14 +588,18 @@ async fn run_server(config: Arc, query: Arc, rx: oneshot::Receive let query = Arc::clone(&query); let config = Arc::clone(&config); - async move { + REST_CLIENT_ADDR.scope(Cell::new(None), async move { let method = req.method().clone(); let uri = req.uri().clone(); + + // Set the task local IP addr from X-Forwarded-For + set_rest_addr(get_client_ip(req.headers())); + let body = hyper::body::to_bytes(req.into_body()).await?; let mut resp = handle_request(method, uri, body, &query, &config) .unwrap_or_else(|err| { - warn!("{:?}", err); + warn!("[{}] {:?}", get_client_ip_str(), err); Response::builder() .status(err.0) .header("Content-Type", "text/plain") @@ -585,7 +612,7 @@ async fn run_server(config: Arc, query: Arc, rx: oneshot::Receive .insert("Access-Control-Allow-Origin", origins.parse().unwrap()); } Ok::<_, hyper::Error>(resp) - } + }) })) } }; @@ -669,7 +696,7 @@ fn handle_request( None => HashMap::new(), }; - info!("handle {:?} {:?}", method, uri); + info!("[{}] handle {:?} {:?}", get_client_ip_str(), method, uri); match ( &method, path.first(), diff --git a/src/util/mod.rs b/src/util/mod.rs index 22775911..bb55706c 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -14,6 +14,7 @@ pub use self::transaction::{ sigops::transaction_sigop_count, TransactionStatus, TxInput, }; +use std::cell::Cell; use std::collections::HashMap; use std::sync::atomic::AtomicUsize; use std::sync::mpsc::{channel, Receiver, Sender}; @@ -38,29 +39,26 @@ pub fn full_hash(hash: &[u8]) -> FullHash { } pub struct SyncChannel { - tx: Option>, + tx: crossbeam_channel::Sender, rx: Option>, } impl SyncChannel { pub fn new(size: usize) -> SyncChannel { let (tx, rx) = crossbeam_channel::bounded(size); - SyncChannel { - tx: Some(tx), - rx: Some(rx), - } + SyncChannel { tx, rx: Some(rx) } } pub fn sender(&self) -> crossbeam_channel::Sender { - self.tx.as_ref().expect("No Sender").clone() + self.tx.clone() } - pub fn receiver(&self) -> &crossbeam_channel::Receiver { - self.rx.as_ref().expect("No Receiver") + pub fn receiver(&self) -> Option<&crossbeam_channel::Receiver> { + self.rx.as_ref() } - pub fn into_receiver(self) -> crossbeam_channel::Receiver { - self.rx.expect("No Receiver") + pub fn into_receiver(self) -> Option> { + self.rx } /// This drops the sender and receiver, causing all other methods to panic. @@ -68,7 +66,6 @@ impl SyncChannel { /// Use only when you know that the channel will no longer be used. /// ie. shutdown. pub fn close(&mut self) -> Option> { - self.tx.take(); self.rx.take() } } @@ -225,3 +222,15 @@ pub mod serde_hex { } } } + +pub(crate) fn get_rest_addr() -> Option { + REST_CLIENT_ADDR.with(|addr| addr.get()) +} +pub(crate) fn set_rest_addr(input: Option) { + REST_CLIENT_ADDR.with(|addr| { + addr.set(input); + }); +} +tokio::task_local! { + pub(crate) static REST_CLIENT_ADDR: Cell>; +} diff --git a/start b/start index 1305a865..f3ce55f7 100755 --- a/start +++ b/start @@ -8,6 +8,18 @@ DB_FOLDER=/electrs NODENAME=$(hostname|cut -d . -f1) LOCATION=$(hostname|cut -d . -f2) +# since we know that our nginx will always be the first +# (closest) proxy from electrs, we set it to 1. +# If some servers prefer to disable PROXY protocol, +# set to 0. If the client themselves is proxying the +# electrum RPCs from other clients, this should be set to +# the hop that you want to trust to source IP addresses. +# If that hop doesn't exist, TcpStream's local_addr or +# "Unix socket" is used, and the PROXY headers are discarded. +# Image: +# [electrs] <> [proxy "1"] <> [proxy "2"] <> [electrum client] +PROXY_PROTOCOL_DEPTH=1 + # load rust if necessary if [ -e "${HOME}/.cargo/env" ];then source "${HOME}/.cargo/env" @@ -136,6 +148,7 @@ do --address-search \ --utxos-limit "${UTXOS_LIMIT}" \ --electrum-txs-limit "${ELECTRUM_TXS_LIMIT}" \ + --electrum-proxy-depth "${PROXY_PROTOCOL_DEPTH}" \ -vvv sleep 1 done