Skip to content

Commit

Permalink
fix: batcher timeout to prevent simple DoS (#1485)
Browse files Browse the repository at this point in the history
Co-authored-by: Mauro Toscano <[email protected]>
  • Loading branch information
avilagaston9 and MauroToscano authored Nov 26, 2024
1 parent 8644746 commit d57c028
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 11 deletions.
72 changes: 61 additions & 11 deletions batcher/aligned-batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@ use retry::batcher_retryables::{
get_user_nonce_from_ethereum_retryable, user_balance_is_unlocked_retryable,
};
use retry::{retry_function, RetryError};
use tokio::time::timeout;
use types::batch_state::BatchState;
use types::user_state::UserState;

use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use aligned_sdk::core::constants::{
ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF, AGGREGATOR_GAS_COST, BUMP_BACKOFF_FACTOR,
BUMP_MAX_RETRIES, BUMP_MAX_RETRY_DELAY, BUMP_MIN_RETRY_DELAY, CONSTANT_GAS_COST,
DEFAULT_AGGREGATOR_FEE_PERCENTAGE_MULTIPLIER, DEFAULT_MAX_FEE_PER_PROOF,
BUMP_MAX_RETRIES, BUMP_MAX_RETRY_DELAY, BUMP_MIN_RETRY_DELAY, CONNECTION_TIMEOUT,
CONSTANT_GAS_COST, DEFAULT_AGGREGATOR_FEE_PERCENTAGE_MULTIPLIER, DEFAULT_MAX_FEE_PER_PROOF,
ETHEREUM_CALL_BACKOFF_FACTOR, ETHEREUM_CALL_MAX_RETRIES, ETHEREUM_CALL_MAX_RETRY_DELAY,
ETHEREUM_CALL_MIN_RETRY_DELAY, GAS_PRICE_PERCENTAGE_MULTIPLIER, PERCENTAGE_DIVIDER,
RESPOND_TO_TASK_FEE_LIMIT_PERCENTAGE_MULTIPLIER,
Expand Down Expand Up @@ -265,13 +267,19 @@ impl Batcher {
.map_err(|e| BatcherError::TcpListenerError(e.to_string()))?;
info!("Listening on: {}", address);

// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
self.metrics.open_connections.inc();
let batcher = self.clone();
tokio::spawn(batcher.handle_connection(stream, addr));
loop {
match listener.accept().await {
Ok((stream, addr)) => {
let batcher = self.clone();
// Let's spawn the handling of each connection in a separate task.
tokio::spawn(batcher.handle_connection(stream, addr));
}
Err(e) => {
self.metrics.user_error(&["connection_accept_error", ""]);
error!("Couldn't accept new connection: {}", e);
}
}
}
Ok(())
}

/// Listen for Ethereum new blocks.
Expand Down Expand Up @@ -360,7 +368,24 @@ impl Batcher {
addr: SocketAddr,
) -> Result<(), BatcherError> {
info!("Incoming TCP connection from: {}", addr);
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
self.metrics.open_connections.inc();

let ws_stream_future = tokio_tungstenite::accept_async(raw_stream);
let ws_stream =
match timeout(Duration::from_secs(CONNECTION_TIMEOUT), ws_stream_future).await {
Ok(Ok(stream)) => stream,
Ok(Err(e)) => {
warn!("Error while establishing websocket connection: {}", e);
self.metrics.open_connections.dec();
return Ok(());
}
Err(e) => {
warn!("Error while establishing websocket connection: {}", e);
self.metrics.open_connections.dec();
self.metrics.user_error(&["user_timeout", ""]);
return Ok(());
}
};

debug!("WebSocket connection established: {}", addr);
let (outgoing, incoming) = ws_stream.split();
Expand All @@ -379,8 +404,33 @@ impl Batcher {
.send(Message::binary(serialized_protocol_version_msg))
.await?;

match incoming
.try_filter(|msg| future::ready(msg.is_binary()))
let mut incoming_filter = incoming.try_filter(|msg| future::ready(msg.is_binary()));
let future_msg = incoming_filter.try_next();

// timeout to prevent a DOS attack
match timeout(Duration::from_secs(CONNECTION_TIMEOUT), future_msg).await {
Ok(Ok(Some(msg))) => {
self.clone().handle_message(msg, outgoing.clone()).await?;
}
Err(elapsed) => {
warn!("[{}] {}", &addr, elapsed);
self.metrics.user_error(&["user_timeout", ""]);
self.metrics.open_connections.dec();
return Ok(());
}
Ok(Ok(None)) => {
info!("[{}] Connection closed by the other side", &addr);
self.metrics.open_connections.dec();
return Ok(());
}
Ok(Err(e)) => {
error!("Unexpected error: {}", e);
self.metrics.open_connections.dec();
return Ok(());
}
};

match incoming_filter
.try_for_each(|msg| self.clone().handle_message(msg, outgoing.clone()))
.await
{
Expand Down
1 change: 1 addition & 0 deletions batcher/aligned-sdk/src/core/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub const CONSTANT_GAS_COST: u128 =
+ BATCHER_SUBMISSION_BASE_GAS_COST;
pub const DEFAULT_MAX_FEE_PER_PROOF: u128 =
ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF * 100_000_000_000; // gas_price = 100 Gwei = 0.0000001 ether (high gas price)
pub const CONNECTION_TIMEOUT: u64 = 5; // 5 secs

// % modifiers: (100% is x1, 10% is x0.1, 1000% is x10)
pub const RESPOND_TO_TASK_FEE_LIMIT_PERCENTAGE_MULTIPLIER: u128 = 250; // fee_for_aggregator -> respondToTaskFeeLimit modifier
Expand Down

0 comments on commit d57c028

Please sign in to comment.