Skip to content

Commit

Permalink
Added electrum-proxy-depth command arg
Browse files Browse the repository at this point in the history
  • Loading branch information
junderw committed Oct 7, 2023
1 parent 54d054b commit be60629
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 21 deletions.
16 changes: 16 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct Config {
pub daemon_rpc_addr: SocketAddr,
pub cookie: Option<String>,
pub electrum_rpc_addr: SocketAddr,
pub electrum_proxy_depth: usize,
pub http_addr: SocketAddr,
pub http_socket_file: Option<PathBuf>,
pub rpc_socket_file: Option<PathBuf>,
Expand Down Expand Up @@ -139,6 +140,15 @@ impl Config {
.help("Electrum server JSONRPC 'addr:port' to listen on (default: '127.0.0.1:50001' for mainnet, '127.0.0.1:60001' for testnet and '127.0.0.1:60401' for regtest)")
.takes_value(true),
)
.arg(
Arg::with_name("electrum_proxy_depth")
.long("electrum-proxy-depth")
.help("Electrum server's PROXY protocol header depth. \
ie. a value of 2 means the 2nd closest hop's PROXY header \
will be used to find the source IP. A value of 0 means all \
IPs are ignored. (default: 0)")
.takes_value(true),
)
.arg(
Arg::with_name("http_addr")
.long("http-addr")
Expand Down Expand Up @@ -393,6 +403,11 @@ impl Config {
.unwrap_or(&format!("127.0.0.1:{}", default_electrum_port)),
"Electrum RPC",
);
let electrum_proxy_depth = m
.value_of("electrum_proxy_depth")
.unwrap_or("0")
.parse::<usize>()
.expect("invalid electrum_proxy_depth");
let http_addr: SocketAddr = str_to_socketaddr(
m.value_of("http_addr")
.unwrap_or(&format!("127.0.0.1:{}", default_http_port)),
Expand Down Expand Up @@ -465,6 +480,7 @@ impl Config {
cookie,
utxos_limit: value_t_or_exit!(m, "utxos_limit", usize),
electrum_rpc_addr,
electrum_proxy_depth,
electrum_txs_limit: value_t_or_exit!(m, "electrum_txs_limit", usize),
electrum_banner,
http_addr,
Expand Down
55 changes: 34 additions & 21 deletions src/electrum/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ fn get_status_hash(txs: Vec<(Txid, Option<BlockId>)>, query: &Query) -> Option<F

struct Connection {
proxy_proto_addr: Cell<Option<Option<SocketAddr>>>,
electrum_proxy_depth: usize,
// Chain info related
query: Arc<Query>,
last_header_entry: Option<HeaderEntry>,
Expand Down Expand Up @@ -133,9 +134,8 @@ impl Connection {
query: Arc<Query>,
stream: ConnectionStream,
stats: Arc<Stats>,
txs_limit: usize,
shutdown_send: Sender<()>,
shutdown_recv: Receiver<()>,
(txs_limit, electrum_proxy_depth): (usize, usize),
(shutdown_send, shutdown_recv): (Sender<()>, Receiver<()>),
#[cfg(feature = "electrum-discovery")] discovery: Option<Arc<DiscoveryManager>>,
) -> Connection {
let (reply_killer, shutdown_replies) = crossbeam_channel::unbounded();
Expand All @@ -153,6 +153,7 @@ impl Connection {
});
let ret = Connection {
proxy_proto_addr: Cell::new(None),
electrum_proxy_depth,
query,
last_header_entry: None, // disable header subscription for now
status_hashes: HashMap::new(),
Expand Down Expand Up @@ -576,7 +577,9 @@ impl Connection {
.borrow_mut()
.as_mut()
.and_then(|r| r.fill_buf().ok())
.and_then(|mut available| parse_proxy_headers(&mut available))
.and_then(|mut available| {
parse_proxy_headers(&mut available, self.electrum_proxy_depth)
})
.map(|addr| {
trace!("RPC Received PROXY Protocol address: {}", addr);
addr
Expand Down Expand Up @@ -632,6 +635,7 @@ impl Connection {
mut reader: BufReader<ConnectionStream>,
tx: &crossbeam_channel::Sender<Message>,
addr: Option<SocketAddr>,
electrum_proxy_depth: usize,
) -> Result<()> {
let addr_str = addr
.map(|a| a.to_string())
Expand Down Expand Up @@ -671,7 +675,7 @@ impl Connection {

// Step 1: Parse PROXY header if present and move the slice
// cursor over to the next byte.
let new_addr = parse_proxy_headers(&mut slice);
let new_addr = parse_proxy_headers(&mut slice, electrum_proxy_depth);
if new_addr != addr {
trace!("PROXY addresses don't match between requests: Expected {addr:?} Got {new_addr:?}");
}
Expand Down Expand Up @@ -705,15 +709,14 @@ impl Connection {
let rpc_addr = self.get_source_addr();
let addr_str = self.get_source_addr_str();
let shutdown_send = self.shutdown_send.clone();
let electrum_proxy_depth = self.electrum_proxy_depth;
let child = spawn_thread("reader", move || {
let result =
std::panic::catch_unwind(|| Connection::handle_requests(reader, &tx, rpc_addr))
.unwrap_or_else(|e| {
Err(
format!("RPC Panic in request handler: {}", parse_panic_error(&e))
.into(),
)
});
let result = std::panic::catch_unwind(|| {
Connection::handle_requests(reader, &tx, rpc_addr, electrum_proxy_depth)
})
.unwrap_or_else(|e| {
Err(format!("RPC Panic in request handler: {}", parse_panic_error(&e)).into())
});
// This shuts down the other graceful shutdown thread,
// which also shuts down the handle_replies loop
// regardless of panic, Err, or Ok
Expand Down Expand Up @@ -878,6 +881,7 @@ impl RPC {
});

let txs_limit = config.electrum_txs_limit;
let electrum_proxy_depth = config.electrum_proxy_depth;

RPC {
notification: notification.sender(),
Expand Down Expand Up @@ -906,8 +910,8 @@ impl RPC {
let garbage_sender = garbage_sender.clone();

// Kill the peers properly
let (killer, peace_receiver) = std::sync::mpsc::channel();
let killer_clone = killer.clone();
let (shutdown_sender, shutdown_receiver) = std::sync::mpsc::channel();
let shutdown_sender_clone = shutdown_sender.clone();

#[cfg(feature = "electrum-discovery")]
let discovery = discovery.clone();
Expand All @@ -918,9 +922,8 @@ impl RPC {
query,
stream,
stats,
txs_limit,
killer_clone.clone(),
peace_receiver,
(txs_limit, electrum_proxy_depth),
(shutdown_sender_clone.clone(), shutdown_receiver),
#[cfg(feature = "electrum-discovery")]
discovery,
);
Expand All @@ -929,12 +932,12 @@ impl RPC {
senders.lock().unwrap().push(conn.message_chan.sender());
conn.run();
info!("[{}] disconnected peer", addr);
let _ = killer_clone.send(());
let _ = shutdown_sender_clone.send(());
let _ = garbage_sender.send(std::thread::current().id());
});

trace!("spawned {:?}", spawned.thread().id());
threads.insert(spawned.thread().id(), (spawned, killer));
threads.insert(spawned.thread().id(), (spawned, shutdown_sender));
while let Ok(id) = garbage_receiver.try_recv() {
if let Some((thread, killer)) = threads.remove(&id) {
trace!("joining {:?}", id);
Expand Down Expand Up @@ -1256,9 +1259,10 @@ fn proxy_header_to_source_socket_addr(p_header: ProxyHeader) -> Option<SocketAdd
}
}

fn parse_proxy_headers(buf: &mut &[u8]) -> Option<SocketAddr> {
fn parse_proxy_headers(buf: &mut &[u8], electrum_proxy_depth: usize) -> Option<SocketAddr> {
trace!("Starting parse PROXY headers: {:?}", &buf[..12]);
let mut addr = None;
let mut current_header_index = 0;
// The last header is the outer-most proxy
loop {
let p_header = match proxy_protocol::parse(buf) {
Expand All @@ -1273,6 +1277,15 @@ fn parse_proxy_headers(buf: &mut &[u8]) -> Option<SocketAddr> {
Err(_) => return None,
};
trace!("Parsed PROXY protocol header: {:?}", p_header);
// Increment from 0 to 1 before the first check
current_header_index += 1;
// 0 should always continue
// 1 should only get the 1st header's IP address etc.
if current_header_index != electrum_proxy_depth {
continue;
}
// The address is only attempted to be
// parsed when the 1 based index is equal
addr = proxy_header_to_source_socket_addr(p_header);
}
addr
Expand Down
13 changes: 13 additions & 0 deletions start
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ DB_FOLDER=/electrs
NODENAME=$(hostname|cut -d . -f1)
LOCATION=$(hostname|cut -d . -f2)

# since we know that our nginx will always be the first
# (closest) proxy from electrs, we set it to 1.
# If some servers prefer to disable PROXY protocol,
# set to 0. If the client themselves is proxying the
# electrum RPCs from other clients, this should be set to
# the hop that you want to trust to source IP addresses.
# If that hop doesn't exist, TcpStream's local_addr or
# "Unix socket" is used, and the PROXY headers are discarded.
# Image:
# [electrs] <> [proxy "1"] <> [proxy "2"] <> [electrum client]
PROXY_PROTOCOL_DEPTH=1

# load rust if necessary
if [ -e "${HOME}/.cargo/env" ];then
source "${HOME}/.cargo/env"
Expand Down Expand Up @@ -136,6 +148,7 @@ do
--address-search \
--utxos-limit "${UTXOS_LIMIT}" \
--electrum-txs-limit "${ELECTRUM_TXS_LIMIT}" \
--electrum-proxy-depth "${PROXY_PROTOCOL_DEPTH}" \
-vvv
sleep 1
done

0 comments on commit be60629

Please sign in to comment.