From 4680a622584bddb80d74d2544dc7949aff36d330 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= <88321181+rafal-ch@users.noreply.github.com> Date: Tue, 22 Oct 2024 09:44:36 +0200 Subject: [PATCH 1/8] Fix collection of tx pool insertion time metric (#2369) ## Description This PR fixes the collection of `transaction_insertion_time_in_thread_pool_milliseconds` metric. ### Before requesting review - [X] I have reviewed the code myself --- CHANGELOG.md | 1 + crates/metrics/src/txpool_metrics.rs | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e4e728c2881..fede191b13b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed - [2366](https://github.com/FuelLabs/fuel-core/pull/2366): The `importer_gas_price_for_block` metric is properly collected. +- [2369](https://github.com/FuelLabs/fuel-core/pull/2369): The `transaction_insertion_time_in_thread_pool_milliseconds` metric is properly collected. ### Added - [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`). diff --git a/crates/metrics/src/txpool_metrics.rs b/crates/metrics/src/txpool_metrics.rs index c5eb612022c..2978825560f 100644 --- a/crates/metrics/src/txpool_metrics.rs +++ b/crates/metrics/src/txpool_metrics.rs @@ -91,7 +91,9 @@ impl Default for TxPoolMetrics { registry.register( "txpool_insert_transaction_time_milliseconds", "The time it took to insert a transaction in the txpool in milliseconds", - metrics.select_transaction_time_nanoseconds.clone(), + metrics + .transaction_insertion_time_in_thread_pool_milliseconds + .clone(), ); metrics From bb6a42ccbbc0e045508c24bda462c3ef5e98c414 Mon Sep 17 00:00:00 2001 From: Andrea Cerone <22031682+acerone85@users.noreply.github.com> Date: Tue, 22 Oct 2024 13:46:42 +0100 Subject: [PATCH 2/8] Add versioning to request response protocols (#2362) ## Linked Issues/PRs Related issue: #1311 ## Context The P2P service uses the [request_response](https://docs.rs/libp2p-request-response/latest/libp2p_request_response/) protocol to let peer request information (e.g. block headers, or transactions in the tx pool) from other peers. Currently, the P2P service provides a protocol `/fuel/req_res/0.0.1`, which defines responses to have an `Option` value. When a request from a peer is unsuccessful, we send a `None` value as a response, wrapped around the relevant response type. In practice, we want to be able to send more meaningful information in responses when requests are unsuccessful, in the form of error codes. This requires implementing a new version of the protocol `/fuel/req_res/0.0.2` where Response messages are changed to contain error codes. 
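The shape change, in isolation, is roughly the following. This is a simplified sketch using a placeholder `Header` type and only the `SealedHeaders` variant; the real `V1ResponseMessage`/`V2ResponseMessage` types and their conversions live in `crates/services/p2p/src/request_response/messages.rs` in the diff below.

```rust
// Simplified sketch of the response shape change; `Header` stands in for the real
// `SealedBlockHeader`, and only the `SealedHeaders` variant is shown.
#[derive(Debug, Clone)]
struct Header;

#[derive(Debug, Clone)]
enum ErrorCode {
    // The peer sent an empty response using the legacy `/fuel/req_res/0.0.1` protocol.
    ProtocolV1EmptyResponse,
}

// `/fuel/req_res/0.0.1`: failure can only be signalled with `None`.
enum V1Response {
    SealedHeaders(Option<Vec<Header>>),
}

// `/fuel/req_res/0.0.2`: failures carry an explicit error code.
enum V2Response {
    SealedHeaders(Result<Vec<Header>, ErrorCode>),
}

impl From<V1Response> for V2Response {
    fn from(v1: V1Response) -> Self {
        match v1 {
            // An empty legacy response maps to the dedicated legacy error code.
            V1Response::SealedHeaders(headers) => {
                Self::SealedHeaders(headers.ok_or(ErrorCode::ProtocolV1EmptyResponse))
            }
        }
    }
}

impl From<V2Response> for V1Response {
    fn from(v2: V2Response) -> Self {
        match v2 {
            // Any error collapses back to `None`, which is all a 0.0.1 peer understands.
            V2Response::SealedHeaders(headers) => Self::SealedHeaders(headers.ok()),
        }
    }
}
```
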
Because we want to be backward compatible with peers that will only use the version `0.0.1` of the protocol, we must account for requests sent by, and responses sent to, those peers. ## Description - Rename the `ResponseMessage` type in the P2P service to `LegacyResponseMessage` - Add a `ResponseMessageErrorCode` enum, which for now only contains a `LegacyNodeEmptyResponse` enum. - Implement a new `ResponseMessage` type that returns a `Result<_, ResponseMessageErrorCode>` wrapped around enum variants - Change P2P service to use the neq `ResponseMessage` type. Responses previously returned as `None` are now returned as `Err`. - Change the `PostcardCodec` to serialize and deserialize according to two different versions of the `/fuel/req_res` protocol: `/fuel/req_res/0.0.1` and `/fuel/req_res/0.0.2`. - Serialization and deserialization of responses using `/fuel/req_res/0.0.1` are performed by first converting a `ResponseMessage` into a `LegacyResponseMessage` and vicecersa, thus preserving backward-compatibility - Change the req_res behaviour used by the `P2P` service to use both version of the req_res protocol. ### TODO (here or in different PRs) - [ ] Investigate how protocols are exchanged during peer handshakes. We don't want two nodes that both understand `/fuel/req_res/0.0.2` to communicate using `/fuel/req_res/0.0.1`. - [ ] Add more meaningful error codes. ## Checklist - [ ] Breaking changes are clearly marked as such in the PR description and changelog - [x] New behavior is reflected in tests - [ ] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [x] I have reviewed the code myself - [ ] I have created follow-up issues caused by this PR and linked them here ### After merging, notify other teams [Add or remove entries as needed] - [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/) - [ ] [Sway compiler](https://github.com/FuelLabs/sway/) - [ ] [Platform documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+) (for out-of-organization contributors, the person merging the PR will do this) - [ ] Someone else? --- CHANGELOG.md | 1 + Cargo.lock | 2 + crates/fuel-core/src/p2p_test_helpers.rs | 4 +- crates/services/p2p/Cargo.toml | 2 + crates/services/p2p/src/behavior.rs | 15 +- crates/services/p2p/src/codecs.rs | 10 +- crates/services/p2p/src/codecs/postcard.rs | 233 ++++++++++++++++-- crates/services/p2p/src/p2p_service.rs | 38 +-- .../p2p/src/request_response/messages.rs | 67 ++++- crates/services/p2p/src/service.rs | 82 +++--- 10 files changed, 377 insertions(+), 77 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fede191b13b..645237e21c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). 
### Added - [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`). +- [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Added a new request_response protocol version `/fuel/req_res/0.0.2`. In comparison with `/fuel/req/0.0.1`, which returns an empty response when a request cannot be fulfilled, this version returns more meaningful error codes. Nodes still support the version `0.0.1` of the protocol to guarantee backward compatibility with fuel-core nodes. Empty responses received from nodes using the old protocol `/fuel/req/0.0.1` are automatically converted into an error `ProtocolV1EmptyResponse` with error code 0, which is also the only error code implemented. More specific error codes will be added in the future. ## [Version 0.40.0] diff --git a/Cargo.lock b/Cargo.lock index 7574ea76ac7..d4d274e7f71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3558,6 +3558,8 @@ dependencies = [ "serde", "serde_with", "sha2 0.10.8", + "strum 0.25.0", + "strum_macros 0.25.3", "thiserror", "tokio", "tracing", diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs index fbc9f32afb4..3a5f6e78009 100644 --- a/crates/fuel-core/src/p2p_test_helpers.rs +++ b/crates/fuel-core/src/p2p_test_helpers.rs @@ -28,7 +28,7 @@ use fuel_core_p2p::{ p2p_service::FuelP2PEvent, request_response::messages::{ RequestMessage, - ResponseMessage, + V2ResponseMessage, }, service::to_message_acceptance, }; @@ -178,7 +178,7 @@ impl Bootstrap { if request_message == RequestMessage::TxPoolAllTransactionsIds { let _ = bootstrap.send_response_msg( request_id, - ResponseMessage::TxPoolAllTransactionsIds(Some(vec![])), + V2ResponseMessage::TxPoolAllTransactionsIds(Ok(vec![])), ); } } diff --git a/crates/services/p2p/Cargo.toml b/crates/services/p2p/Cargo.toml index 5455e83c5f3..09662d26311 100644 --- a/crates/services/p2p/Cargo.toml +++ b/crates/services/p2p/Cargo.toml @@ -48,6 +48,8 @@ rayon = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_with = { workspace = true } sha2 = "0.10" +strum = { workspace = true } +strum_macros = { workspace = true } thiserror = "1.0.47" tokio = { workspace = true, features = ["sync"] } tracing = { workspace = true } diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index 2b689eb3949..8fd6808e348 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -13,7 +13,7 @@ use crate::{ peer_report, request_response::messages::{ RequestMessage, - ResponseMessage, + V2ResponseMessage, }, }; use fuel_core_types::fuel_types::BlockHeight; @@ -112,15 +112,16 @@ impl FuelBehaviour { BlockHeight::default(), ); - let req_res_protocol = - core::iter::once((codec.get_req_res_protocol(), ProtocolSupport::Full)); + let req_res_protocol = codec + 
.get_req_res_protocols() + .map(|protocol| (protocol, ProtocolSupport::Full)); let req_res_config = request_response::Config::default() .with_request_timeout(p2p_config.set_request_timeout) .with_max_concurrent_streams(p2p_config.max_concurrent_streams); let request_response = request_response::Behaviour::with_codec( - codec, + codec.clone(), req_res_protocol, req_res_config, ); @@ -165,9 +166,9 @@ impl FuelBehaviour { pub fn send_response_msg( &mut self, - channel: ResponseChannel, - message: ResponseMessage, - ) -> Result<(), ResponseMessage> { + channel: ResponseChannel, + message: V2ResponseMessage, + ) -> Result<(), V2ResponseMessage> { self.request_response.send_response(channel, message) } diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index c22aacd5671..505cf40c9bf 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -8,7 +8,7 @@ use crate::{ }, request_response::messages::{ RequestMessage, - ResponseMessage, + V2ResponseMessage, }, }; use libp2p::request_response; @@ -28,18 +28,22 @@ pub trait GossipsubCodec { ) -> Result; } +// TODO: https://github.com/FuelLabs/fuel-core/issues/2368 +// Remove this trait /// Main Codec trait /// Needs to be implemented and provided to FuelBehaviour pub trait NetworkCodec: GossipsubCodec< RequestMessage = GossipsubBroadcastRequest, ResponseMessage = GossipsubMessage, - > + request_response::Codec + > + request_response::Codec + Clone + Send + 'static { /// Returns RequestResponse's Protocol /// Needed for initialization of RequestResponse Behaviour - fn get_req_res_protocol(&self) -> ::Protocol; + fn get_req_res_protocols( + &self, + ) -> impl Iterator::Protocol>; } diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 94f23cd6fd2..db3fe814e4f 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -10,8 +10,10 @@ use crate::{ }, request_response::messages::{ RequestMessage, - ResponseMessage, - REQUEST_RESPONSE_PROTOCOL_ID, + V1ResponseMessage, + V2ResponseMessage, + V1_REQUEST_RESPONSE_PROTOCOL_ID, + V2_REQUEST_RESPONSE_PROTOCOL_ID, }, }; use async_trait::async_trait; @@ -26,6 +28,8 @@ use serde::{ Serialize, }; use std::io; +use strum::IntoEnumIterator; +use strum_macros::EnumIter; /// Helper method for decoding data /// Reusable across `RequestResponseCodec` and `GossipsubCodec` @@ -69,13 +73,13 @@ impl PostcardCodec { /// run into a timeout waiting for the response. 
#[async_trait] impl request_response::Codec for PostcardCodec { - type Protocol = MessageExchangePostcardProtocol; + type Protocol = PostcardProtocol; type Request = RequestMessage; - type Response = ResponseMessage; + type Response = V2ResponseMessage; async fn read_request( &mut self, - _: &Self::Protocol, + _protocol: &Self::Protocol, socket: &mut T, ) -> io::Result where @@ -91,7 +95,7 @@ impl request_response::Codec for PostcardCodec { async fn read_response( &mut self, - _: &Self::Protocol, + protocol: &Self::Protocol, socket: &mut T, ) -> io::Result where @@ -103,7 +107,13 @@ impl request_response::Codec for PostcardCodec { .read_to_end(&mut response) .await?; - deserialize(&response) + match protocol { + PostcardProtocol::V1 => { + let v1_response = deserialize::(&response)?; + Ok(v1_response.into()) + } + PostcardProtocol::V2 => deserialize::(&response), + } } async fn write_request( @@ -122,14 +132,20 @@ impl request_response::Codec for PostcardCodec { async fn write_response( &mut self, - _protocol: &Self::Protocol, + protocol: &Self::Protocol, socket: &mut T, res: Self::Response, ) -> io::Result<()> where T: futures::AsyncWrite + Unpin + Send, { - let encoded_data = serialize(&res)?; + let encoded_data = match protocol { + PostcardProtocol::V1 => { + let v1_response: V1ResponseMessage = res.into(); + serialize(&v1_response)? + } + PostcardProtocol::V2 => serialize(&res)?, + }; socket.write_all(&encoded_data).await?; Ok(()) } @@ -161,24 +177,44 @@ impl GossipsubCodec for PostcardCodec { } impl NetworkCodec for PostcardCodec { - fn get_req_res_protocol(&self) -> ::Protocol { - MessageExchangePostcardProtocol {} + fn get_req_res_protocols( + &self, + ) -> impl Iterator::Protocol> { + // TODO: Iterating over versions in reverse order should prefer + // peers to use V2 over V1 for exchanging messages. However, this is + // not guaranteed by the specs for the `request_response` protocol. 
+ PostcardProtocol::iter().rev() } } -#[derive(Default, Debug, Clone)] -pub struct MessageExchangePostcardProtocol; +#[derive(Debug, Default, Clone, EnumIter)] +pub enum PostcardProtocol { + #[default] + V1, + V2, +} -impl AsRef for MessageExchangePostcardProtocol { +impl AsRef for PostcardProtocol { fn as_ref(&self) -> &str { - REQUEST_RESPONSE_PROTOCOL_ID + match self { + PostcardProtocol::V1 => V1_REQUEST_RESPONSE_PROTOCOL_ID, + PostcardProtocol::V2 => V2_REQUEST_RESPONSE_PROTOCOL_ID, + } } } #[cfg(test)] +#[allow(non_snake_case)] mod tests { + + use fuel_core_types::blockchain::SealedBlockHeader; + use request_response::Codec as _; + use super::*; - use crate::request_response::messages::MAX_REQUEST_SIZE; + use crate::request_response::messages::{ + ResponseMessageErrorCode, + MAX_REQUEST_SIZE, + }; #[test] fn test_request_size_fits() { @@ -186,4 +222,169 @@ mod tests { let m = RequestMessage::Transactions(arbitrary_range); assert!(postcard::to_stdvec(&m).unwrap().len() <= MAX_REQUEST_SIZE); } + + #[tokio::test] + async fn codec__serialization_roundtrip_using_v2_on_successful_response_returns_original_value( + ) { + // Given + let sealed_block_headers = vec![SealedBlockHeader::default()]; + let response = V2ResponseMessage::SealedHeaders(Ok(sealed_block_headers.clone())); + let mut codec = PostcardCodec::new(1024); + let mut buf = Vec::with_capacity(1024); + + // When + codec + .write_response(&PostcardProtocol::V2, &mut buf, response) + .await + .expect("Valid Vec should be serialized using v1"); + + let deserialized = codec + .read_response(&PostcardProtocol::V2, &mut buf.as_slice()) + .await + .expect("Valid Vec should be deserialized using v1"); + + // Then + assert!(matches!( + deserialized, + V2ResponseMessage::SealedHeaders(Ok(sealed_headers)) if sealed_headers == sealed_block_headers + )); + } + + #[tokio::test] + async fn codec__serialization_roundtrip_using_v1_on_successful_response_returns_original_value( + ) { + // Given + let sealed_block_headers = vec![SealedBlockHeader::default()]; + let response = V2ResponseMessage::SealedHeaders(Ok(sealed_block_headers.clone())); + let mut codec = PostcardCodec::new(1024); + let mut buf = Vec::with_capacity(1024); + + // When + codec + .write_response(&PostcardProtocol::V1, &mut buf, response) + .await + .expect("Valid Vec should be serialized using v1"); + + let deserialized = codec + .read_response(&PostcardProtocol::V1, &mut buf.as_slice()) + .await + .expect("Valid Vec should be deserialized using v1"); + + // Then + assert!( + matches!(deserialized, V2ResponseMessage::SealedHeaders(Ok(sealed_headers)) if sealed_headers == sealed_block_headers) + ); + } + + #[tokio::test] + async fn codec__serialization_roundtrip_using_v2_on_error_response_returns_original_value( + ) { + // Given + let response = V2ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse, + )); + let mut codec = PostcardCodec::new(1024); + let mut buf = Vec::with_capacity(1024); + + // When + codec + .write_response(&PostcardProtocol::V2, &mut buf, response.clone()) + .await + .expect("Valid Vec is serialized using v1"); + + let deserialized = codec + .read_response(&PostcardProtocol::V2, &mut buf.as_slice()) + .await + .expect("Valid Vec is deserialized using v1"); + + // Then + assert!(matches!( + deserialized, + V2ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse + )) + )); + } + + #[tokio::test] + async fn codec__serialzation_roundtrip_using_v1_on_error_response_returns_predefined_error_code( 
+ ) { + // Given + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Change this to a different ResponseMessageErrorCode once these have been implemented. + let response = V2ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse, + )); + let mut codec = PostcardCodec::new(1024); + let mut buf = Vec::with_capacity(1024); + + // When + codec + .write_response(&PostcardProtocol::V1, &mut buf, response.clone()) + .await + .expect("Valid Vec is serialized using v1"); + + let deserialized = codec + .read_response(&PostcardProtocol::V1, &mut buf.as_slice()) + .await + .expect("Valid Vec is deserialized using v1"); + + // Then + assert!(matches!( + deserialized, + V2ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse + )) + )); + } + + #[tokio::test] + async fn codec__write_response_is_backwards_compatible_with_v1() { + // Given + let response = V2ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse, + )); + let mut codec = PostcardCodec::new(1024); + let mut buf = Vec::with_capacity(1024); + + // When + codec + .write_response(&PostcardProtocol::V1, &mut buf, response.clone()) + .await + .expect("Valid Vec is serialized using v1"); + + let deserialized_as_v1 = + // We cannot access the codec trait from an old node here, + // so we deserialize directly using the `V1ResponseMessage` type. + deserialize::(&buf).expect("Deserialization as V1ResponseMessage should succeed"); + + // Then + assert!(matches!( + deserialized_as_v1, + V1ResponseMessage::SealedHeaders(None) + )); + } + + #[tokio::test] + async fn codec__read_response_is_backwards_compatible_with_v1() { + // Given + let response = V1ResponseMessage::SealedHeaders(None); + let mut codec = PostcardCodec::new(1024); + + // When + let buf = serialize(&response) + .expect("Serialization as V1ResponseMessage should succeed"); + let deserialized = codec + .read_response(&PostcardProtocol::V1, &mut buf.as_slice()) + .await + .expect("Valid Vec is deserialized using v1"); + + // Then + assert!(matches!( + deserialized, + V2ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse + )) + )); + } } diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 024cc006785..76f6bdbf615 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -30,9 +30,9 @@ use crate::{ RequestError, RequestMessage, ResponseError, - ResponseMessage, ResponseSendError, ResponseSender, + V2ResponseMessage, }, TryPeerId, }; @@ -119,7 +119,7 @@ pub struct FuelP2PService { /// Whenever we're done processing the request, it's removed from this table, /// and the channel is used to send the result to libp2p, which will forward it /// to the peer that requested it. 
- inbound_requests_table: HashMap>, + inbound_requests_table: HashMap>, /// NetworkCodec used as `` for encoding and decoding of Gossipsub messages network_codec: PostcardCodec, @@ -428,7 +428,7 @@ impl FuelP2PService { pub fn send_response_msg( &mut self, request_id: InboundRequestId, - message: ResponseMessage, + message: V2ResponseMessage, ) -> Result<(), ResponseSendError> { let Some(channel) = self.inbound_requests_table.remove(&request_id) else { debug!("ResponseChannel for {:?} does not exist!", request_id); @@ -646,7 +646,7 @@ impl FuelP2PService { fn handle_request_response_event( &mut self, - event: request_response::Event, + event: request_response::Event, ) -> Option { match event { request_response::Event::Message { peer, message } => match message { @@ -674,8 +674,10 @@ impl FuelP2PService { let send_ok = match channel { ResponseSender::SealedHeaders(c) => match response { - ResponseMessage::SealedHeaders(v) => { - c.send((peer, Ok(v))).is_ok() + V2ResponseMessage::SealedHeaders(v) => { + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Change type of ResponseSender and remove the .ok() here + c.send((peer, Ok(v.ok()))).is_ok() } _ => { warn!( @@ -686,8 +688,8 @@ impl FuelP2PService { } }, ResponseSender::Transactions(c) => match response { - ResponseMessage::Transactions(v) => { - c.send((peer, Ok(v))).is_ok() + V2ResponseMessage::Transactions(v) => { + c.send((peer, Ok(v.ok()))).is_ok() } _ => { warn!( @@ -698,8 +700,8 @@ impl FuelP2PService { } }, ResponseSender::TxPoolAllTransactionsIds(c) => match response { - ResponseMessage::TxPoolAllTransactionsIds(v) => { - c.send((peer, Ok(v))).is_ok() + V2ResponseMessage::TxPoolAllTransactionsIds(v) => { + c.send((peer, Ok(v.ok()))).is_ok() } _ => { warn!( @@ -710,8 +712,8 @@ impl FuelP2PService { } }, ResponseSender::TxPoolFullTransactions(c) => match response { - ResponseMessage::TxPoolFullTransactions(v) => { - c.send((peer, Ok(v))).is_ok() + V2ResponseMessage::TxPoolFullTransactions(v) => { + c.send((peer, Ok(v.ok()))).is_ok() } _ => { warn!( @@ -847,8 +849,8 @@ mod tests { request_response::messages::{ RequestMessage, ResponseError, - ResponseMessage, ResponseSender, + V2ResponseMessage, }, service::to_message_acceptance, }; @@ -1778,16 +1780,16 @@ mod tests { RequestMessage::SealedHeaders(range) => { let sealed_headers: Vec<_> = arbitrary_headers_for_range(range.clone()); - let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Some(sealed_headers))); + let _ = node_b.send_response_msg(*request_id, V2ResponseMessage::SealedHeaders(Ok(sealed_headers))); } RequestMessage::Transactions(_) => { let txs = (0..5).map(|_| Transaction::default_test_tx()).collect(); let transactions = vec![Transactions(txs)]; - let _ = node_b.send_response_msg(*request_id, ResponseMessage::Transactions(Some(transactions))); + let _ = node_b.send_response_msg(*request_id, V2ResponseMessage::Transactions(Ok(transactions))); } RequestMessage::TxPoolAllTransactionsIds => { let tx_ids = (0..5).map(|_| Transaction::default_test_tx().id(&ChainId::new(1))).collect(); - let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolAllTransactionsIds(Some(tx_ids))); + let _ = node_b.send_response_msg(*request_id, V2ResponseMessage::TxPoolAllTransactionsIds(Ok(tx_ids))); } RequestMessage::TxPoolFullTransactions(tx_ids) => { let txs = tx_ids.iter().enumerate().map(|(i, _)| { @@ -1797,7 +1799,7 @@ mod tests { Some(NetworkableTransactionPool::Transaction(Transaction::default_test_tx())) } }).collect(); - let _ = 
node_b.send_response_msg(*request_id, ResponseMessage::TxPoolFullTransactions(Some(txs))); + let _ = node_b.send_response_msg(*request_id, V2ResponseMessage::TxPoolFullTransactions(Ok(txs))); } } } @@ -1905,7 +1907,7 @@ mod tests { // 2. Node B receives the RequestMessage from Node A initiated by the NetworkOrchestrator if let Some(FuelP2PEvent::InboundRequestMessage{ request_id, request_message: _ }) = &node_b_event { let sealed_headers: Vec<_> = arbitrary_headers_for_range(1..3); - let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Some(sealed_headers))); + let _ = node_b.send_response_msg(*request_id, V2ResponseMessage::SealedHeaders(Ok(sealed_headers))); } tracing::info!("Node B Event: {:?}", node_b_event); diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 83f3f7a3a50..2a0e03ba2cd 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -18,7 +18,8 @@ use std::ops::Range; use thiserror::Error; use tokio::sync::oneshot; -pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1"; +pub(crate) const V1_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1"; +pub(crate) const V2_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.2"; /// Max Size in Bytes of the Request Message #[cfg(test)] @@ -32,14 +33,76 @@ pub enum RequestMessage { TxPoolFullTransactions(Vec), } +#[derive(Error, Debug, Clone, Serialize, Deserialize)] +pub enum ResponseMessageErrorCode { + /// The peer sent an empty response using protocol `/fuel/req_res/0.0.1` + #[error("Empty response sent by peer using legacy protocol /fuel/req_res/0.0.1")] + ProtocolV1EmptyResponse = 0, +} + #[derive(Debug, Clone, Serialize, Deserialize)] -pub enum ResponseMessage { +pub enum V1ResponseMessage { SealedHeaders(Option>), Transactions(Option>), TxPoolAllTransactionsIds(Option>), TxPoolFullTransactions(Option>>), } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum V2ResponseMessage { + SealedHeaders(Result, ResponseMessageErrorCode>), + Transactions(Result, ResponseMessageErrorCode>), + TxPoolAllTransactionsIds(Result, ResponseMessageErrorCode>), + TxPoolFullTransactions( + Result>, ResponseMessageErrorCode>, + ), +} + +impl From for V2ResponseMessage { + fn from(v1_response: V1ResponseMessage) -> Self { + match v1_response { + V1ResponseMessage::SealedHeaders(sealed_headers) => { + V2ResponseMessage::SealedHeaders( + sealed_headers + .ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse), + ) + } + V1ResponseMessage::Transactions(vec) => V2ResponseMessage::Transactions( + vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse), + ), + V1ResponseMessage::TxPoolAllTransactionsIds(vec) => { + V2ResponseMessage::TxPoolAllTransactionsIds( + vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse), + ) + } + V1ResponseMessage::TxPoolFullTransactions(vec) => { + V2ResponseMessage::TxPoolFullTransactions( + vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse), + ) + } + } + } +} + +impl From for V1ResponseMessage { + fn from(response: V2ResponseMessage) -> Self { + match response { + V2ResponseMessage::SealedHeaders(sealed_headers) => { + V1ResponseMessage::SealedHeaders(sealed_headers.ok()) + } + V2ResponseMessage::Transactions(transactions) => { + V1ResponseMessage::Transactions(transactions.ok()) + } + V2ResponseMessage::TxPoolAllTransactionsIds(tx_ids) => { + V1ResponseMessage::TxPoolAllTransactionsIds(tx_ids.ok()) 
+ } + V2ResponseMessage::TxPoolFullTransactions(tx_pool) => { + V1ResponseMessage::TxPoolFullTransactions(tx_pool.ok()) + } + } + } +} + pub type OnResponse = oneshot::Sender<(PeerId, Result)>; #[derive(Debug)] diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 3ff09da0beb..c85e1e3a6c8 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -22,8 +22,9 @@ use crate::{ request_response::messages::{ OnResponse, RequestMessage, - ResponseMessage, + ResponseMessageErrorCode, ResponseSender, + V2ResponseMessage, }, }; use anyhow::anyhow; @@ -136,19 +137,20 @@ pub enum TaskRequest { reporting_service: &'static str, }, DatabaseTransactionsLookUp { - response: Option>, + response: Result, ResponseMessageErrorCode>, request_id: InboundRequestId, }, DatabaseHeaderLookUp { - response: Option>, + response: Result, ResponseMessageErrorCode>, request_id: InboundRequestId, }, TxPoolAllTransactionsIds { - response: Option>, + response: Result, ResponseMessageErrorCode>, request_id: InboundRequestId, }, TxPoolFullTransactions { - response: Option>>, + response: + Result>, ResponseMessageErrorCode>, request_id: InboundRequestId, }, } @@ -223,7 +225,7 @@ pub trait TaskP2PService: Send { fn send_response_msg( &mut self, request_id: InboundRequestId, - message: ResponseMessage, + message: V2ResponseMessage, ) -> anyhow::Result<()>; fn report_message( @@ -297,7 +299,7 @@ impl TaskP2PService for FuelP2PService { fn send_response_msg( &mut self, request_id: InboundRequestId, - message: ResponseMessage, + message: V2ResponseMessage, ) -> anyhow::Result<()> { self.send_response_msg(request_id, message)?; Ok(()) @@ -532,8 +534,11 @@ where where DbLookUpFn: Fn(&V::LatestView, Range) -> anyhow::Result> + Send + 'static, - ResponseSenderFn: Fn(Option) -> ResponseMessage + Send + 'static, - TaskRequestFn: Fn(Option, InboundRequestId) -> TaskRequest + Send + 'static, + ResponseSenderFn: + Fn(Result) -> V2ResponseMessage + Send + 'static, + TaskRequestFn: Fn(Result, InboundRequestId) -> TaskRequest + + Send + + 'static, R: Send + 'static, { let instant = Instant::now(); @@ -549,8 +554,9 @@ where max_len, "Requested range is too big" ); - // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 - let response = None; + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Return helpful error message to requester. 
+ let response = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = self .p2p_service .send_response_msg(request_id, response_sender(response)); @@ -564,17 +570,25 @@ where return; } - let response = db_lookup(&view, range.clone()).ok().flatten(); + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Add new error code + let response = db_lookup(&view, range.clone()) + .ok() + .flatten() + .ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = response_channel .try_send(task_request(response, request_id)) .trace_err("Failed to send response to the request channel"); }); + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Handle error cases and return meaningful status codes if result.is_err() { + let err = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = self .p2p_service - .send_response_msg(request_id, response_sender(None)); + .send_response_msg(request_id, response_sender(err)); } Ok(()) @@ -588,7 +602,7 @@ where self.handle_db_request( range, request_id, - ResponseMessage::Transactions, + V2ResponseMessage::Transactions, |view, range| view.get_transactions(range).map_err(anyhow::Error::from), |response, request_id| TaskRequest::DatabaseTransactionsLookUp { response, @@ -606,7 +620,7 @@ where self.handle_db_request( range, request_id, - ResponseMessage::SealedHeaders, + V2ResponseMessage::SealedHeaders, |view, range| view.get_sealed_headers(range).map_err(anyhow::Error::from), |response, request_id| TaskRequest::DatabaseHeaderLookUp { response, @@ -624,8 +638,11 @@ where task_request: TaskRequestFn, ) -> anyhow::Result<()> where - ResponseSenderFn: Fn(Option) -> ResponseMessage + Send + 'static, - TaskRequestFn: Fn(Option, InboundRequestId) -> TaskRequest + Send + 'static, + ResponseSenderFn: + Fn(Result) -> V2ResponseMessage + Send + 'static, + TaskRequestFn: Fn(Result, InboundRequestId) -> TaskRequest + + Send + + 'static, F: Future> + Send + 'static, { let instant = Instant::now(); @@ -642,16 +659,20 @@ where return; }; - // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Return helpful error message to requester. let _ = response_channel - .try_send(task_request(Some(response), request_id)) + .try_send(task_request(Ok(response), request_id)) .trace_err("Failed to send response to the request channel"); }); if result.is_err() { + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Return better error code + let res = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = self .p2p_service - .send_response_msg(request_id, response_sender(None)); + .send_response_msg(request_id, response_sender(res)); } Ok(()) @@ -666,7 +687,7 @@ where self.handle_txpool_request( request_id, async move { tx_pool.get_tx_ids(max_txs).await }, - ResponseMessage::TxPoolAllTransactionsIds, + V2ResponseMessage::TxPoolAllTransactionsIds, |response, request_id| TaskRequest::TxPoolAllTransactionsIds { response, request_id, @@ -679,11 +700,14 @@ where tx_ids: Vec, request_id: InboundRequestId, ) -> anyhow::Result<()> { - // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Return helpful error message to requester. 
if tx_ids.len() > self.max_txs_per_request { self.p2p_service.send_response_msg( request_id, - ResponseMessage::TxPoolFullTransactions(None), + V2ResponseMessage::TxPoolFullTransactions(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse, + )), )?; return Ok(()); } @@ -691,7 +715,7 @@ where self.handle_txpool_request( request_id, async move { tx_pool.get_full_txs(tx_ids).await }, - ResponseMessage::TxPoolFullTransactions, + V2ResponseMessage::TxPoolFullTransactions, |response, request_id| TaskRequest::TxPoolFullTransactions { response, request_id, @@ -886,16 +910,16 @@ where let _ = channel.send(peers); } Some(TaskRequest::DatabaseTransactionsLookUp { response, request_id }) => { - let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::Transactions(response)); + let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::Transactions(response)); } Some(TaskRequest::DatabaseHeaderLookUp { response, request_id }) => { - let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::SealedHeaders(response)); + let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::SealedHeaders(response)); } Some(TaskRequest::TxPoolAllTransactionsIds { response, request_id }) => { - let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::TxPoolAllTransactionsIds(response)); + let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::TxPoolAllTransactionsIds(response)); } Some(TaskRequest::TxPoolFullTransactions { response, request_id }) => { - let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::TxPoolFullTransactions(response)); + let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::TxPoolFullTransactions(response)); } None => { tracing::error!("The P2P `Task` should be holder of the `Sender`"); @@ -1414,7 +1438,7 @@ pub mod tests { fn send_response_msg( &mut self, _request_id: InboundRequestId, - _message: ResponseMessage, + _message: V2ResponseMessage, ) -> anyhow::Result<()> { todo!() } From 293667c58834388d0c7c53d324e4805e65708dc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= <88321181+rafal-ch@users.noreply.github.com> Date: Tue, 22 Oct 2024 19:04:00 +0200 Subject: [PATCH 3/8] Return reason of why proof cant be generated (#2258) Closes https://github.com/FuelLabs/fuel-core/issues/1394 ## Description `message_proof()` API is modified to return either the proof or an error with the description why the proof couldn't be created. Previously the reason was discarded by returning `Ok(None)`. ### Before requesting review - [X] I have reviewed the code myself --------- Co-authored-by: Green Baneling --- crates/fuel-core/src/query/message.rs | 148 ++++++++++++--------- crates/fuel-core/src/query/message/test.rs | 12 +- crates/fuel-core/src/schema/message.rs | 7 +- 3 files changed, 96 insertions(+), 71 deletions(-) diff --git a/crates/fuel-core/src/query/message.rs b/crates/fuel-core/src/query/message.rs index c98b2358b2c..07d2cd9ab56 100644 --- a/crates/fuel-core/src/query/message.rs +++ b/crates/fuel-core/src/query/message.rs @@ -1,7 +1,4 @@ -use crate::fuel_core_graphql_api::{ - database::ReadView, - IntoApiResult, -}; +use crate::fuel_core_graphql_api::database::ReadView; use fuel_core_storage::{ iter::{ BoxedIter, @@ -162,15 +159,14 @@ impl MessageProofData for ReadView { } /// Generate an output proof. -// TODO: Do we want to return `Option` here? 
pub fn message_proof( database: &T, transaction_id: Bytes32, desired_nonce: Nonce, commit_block_height: BlockHeight, -) -> StorageResult> { - // Check if the receipts for this transaction actually contain this message id or exit. - let receipt = database +) -> StorageResult { + // Check if the receipts for this transaction actually contain this nonce or exit. + let (sender, recipient, nonce, amount, data) = database .receipts(&transaction_id)? .into_iter() .find_map(|r| match r { @@ -185,63 +181,83 @@ pub fn message_proof( Some((sender, recipient, nonce, amount, data)) } _ => None, - }); + }) + .ok_or::( + anyhow::anyhow!("Desired `nonce` missing in transaction receipts").into(), + )?; - let (sender, recipient, nonce, amount, data) = match receipt { - Some(r) => r, - None => return Ok(None), + let Some(data) = data else { + return Err(anyhow::anyhow!("Output message doesn't contain any `data`").into()) }; - let data = - data.ok_or(anyhow::anyhow!("Output message doesn't contain any `data`"))?; // Get the block id from the transaction status if it's ready. - let message_block_height = match database - .transaction_status(&transaction_id) - .into_api_result::( - )? { - Some(TransactionStatus::Success { block_height, .. }) => block_height, - _ => return Ok(None), + let message_block_height = match database.transaction_status(&transaction_id) { + Ok(TransactionStatus::Success { block_height, .. }) => block_height, + Ok(TransactionStatus::Submitted { .. }) => { + return Err(anyhow::anyhow!( + "Unable to obtain the message block height. The transaction has not been processed yet" + ) + .into()) + } + Ok(TransactionStatus::SqueezedOut { reason }) => { + return Err(anyhow::anyhow!( + "Unable to obtain the message block height. The transaction was squeezed out: {reason}" + ) + .into()) + } + Ok(TransactionStatus::Failed { .. }) => { + return Err(anyhow::anyhow!( + "Unable to obtain the message block height. The transaction failed" + ) + .into()) + } + Err(err) => { + return Err(anyhow::anyhow!( + "Unable to obtain the message block height: {err}" + ) + .into()) + } }; // Get the message fuel block header. - let (message_block_header, message_block_txs) = match database - .block(&message_block_height) - .into_api_result::()? - { - Some(t) => t.into_inner(), - None => return Ok(None), - }; + let (message_block_header, message_block_txs) = + match database.block(&message_block_height) { + Ok(message_block) => message_block.into_inner(), + Err(err) => { + return Err(anyhow::anyhow!( + "Unable to get the message block from the database: {err}" + ) + .into()) + } + }; let message_id = compute_message_id(&sender, &recipient, &nonce, amount, &data); - let message_proof = - match message_receipts_proof(database, message_id, &message_block_txs)? { - Some(proof) => proof, - None => return Ok(None), - }; + let message_proof = message_receipts_proof(database, message_id, &message_block_txs)?; // Get the commit fuel block header. - let commit_block_header = match database - .block(&commit_block_height) - .into_api_result::()? 
- { - Some(t) => t.into_inner().0, - None => return Ok(None), + let (commit_block_header, _) = match database.block(&commit_block_height) { + Ok(commit_block_header) => commit_block_header.into_inner(), + Err(err) => { + return Err(anyhow::anyhow!( + "Unable to get commit block header from database: {err}" + ) + .into()) + } }; - let block_height = *commit_block_header.height(); - if block_height == 0u32.into() { - // Cannot look beyond the genesis block - return Ok(None) - } - let verifiable_commit_block_height = - block_height.pred().expect("We checked the height above"); + let Some(verifiable_commit_block_height) = commit_block_header.height().pred() else { + return Err(anyhow::anyhow!( + "Impossible to generate proof beyond the genesis block" + ) + .into()) + }; let block_proof = database.block_history_proof( message_block_header.height(), &verifiable_commit_block_height, )?; - Ok(Some(MessageProof { + Ok(MessageProof { message_proof, block_proof, message_block_header, @@ -251,19 +267,18 @@ pub fn message_proof( nonce, amount, data, - })) + }) } fn message_receipts_proof( database: &T, message_id: MessageId, message_block_txs: &[Bytes32], -) -> StorageResult> { +) -> StorageResult { // Get the message receipts from the block. let leaves: Vec> = message_block_txs .iter() .map(|id| database.receipts(id)) - .filter_map(|result| result.into_api_result::<_, StorageError>().transpose()) .try_collect()?; let leaves = leaves.into_iter() // Flatten the receipts after filtering on output messages @@ -287,20 +302,27 @@ fn message_receipts_proof( tree.push(id.as_ref()); } - // If we found the leaf proof index then return the proof. - match proof_index { - Some(proof_index) => { - // Generate the actual merkle proof. - match tree.prove(proof_index) { - Some((_, proof_set)) => Ok(Some(MerkleProof { - proof_set, - proof_index, - })), - None => Ok(None), - } - } - None => Ok(None), - } + // Check if we found a leaf. + let Some(proof_index) = proof_index else { + return Err(anyhow::anyhow!( + "Unable to find the message receipt in the transaction to generate the proof" + ) + .into()) + }; + + // Get the proof set. + let Some((_, proof_set)) = tree.prove(proof_index) else { + return Err(anyhow::anyhow!( + "Unable to generate the Merkle proof for the message from its receipts" + ) + .into()); + }; + + // Return the proof. 
+ Ok(MerkleProof { + proof_set, + proof_index, + }) } pub fn message_status( diff --git a/crates/fuel-core/src/query/message/test.rs b/crates/fuel-core/src/query/message/test.rs index 43d6ecbef1a..447e1c01a78 100644 --- a/crates/fuel-core/src/query/message/test.rs +++ b/crates/fuel-core/src/query/message/test.rs @@ -1,10 +1,13 @@ use std::ops::Deref; use fuel_core_types::{ - blockchain::header::{ - ApplicationHeader, - ConsensusHeader, - PartialBlockHeader, + blockchain::{ + block::CompressedBlock, + header::{ + ApplicationHeader, + ConsensusHeader, + PartialBlockHeader, + }, }, entities::relayer::message::MerkleProof, fuel_tx::{ @@ -191,7 +194,6 @@ async fn can_build_message_proof() { nonce.to_owned(), *commit_block.header().height(), ) - .unwrap() .unwrap(); assert_eq!( proof.message_block_header.message_outbox_root, diff --git a/crates/fuel-core/src/schema/message.rs b/crates/fuel-core/src/schema/message.rs index ff5da25ab8f..4d9e5048bd2 100644 --- a/crates/fuel-core/src/schema/message.rs +++ b/crates/fuel-core/src/schema/message.rs @@ -150,13 +150,14 @@ impl MessageQuery { ))?, }; - Ok(crate::query::message_proof( + let proof = crate::query::message_proof( query.as_ref(), transaction_id.into(), nonce.into(), height, - )? - .map(MessageProof)) + )?; + + Ok(Some(MessageProof(proof))) } #[graphql(complexity = "query_costs().storage_read + child_complexity")] From 48344cafca1d817d5726d7fd700590f8a53cb65d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Yaz=C4=B1c=C4=B1?= <75089142+yaziciahmet@users.noreply.github.com> Date: Tue, 22 Oct 2024 21:40:46 +0200 Subject: [PATCH 4/8] p2p: use precalculated topic hash (#2378) ## Linked Issues/PRs None ## Description One of the main target of `GossipsubTopics`struct is to precalculate the topic hashes to avoid calculating them on each outgoing message. With the current implementation and usage of `get_gossipsub_topic()` method, `Topic` (more accurately `Sha256Topic`) is retrieved and then passed down to publish method of the gossipsub, where the conversion to `TopicHash` happens. Meaning that precalculated topic hashes are actually unused. This PR rewrites `get_gossipsub_topic()` method to `get_gossipsub_topic_hash()`, which allows benefiting from the precalculated topic hashes. ## Checklist - [ ] Breaking changes are clearly marked as such in the PR description and changelog - [ ] New behavior is reflected in tests - [ ] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [x] I have reviewed the code myself - [ ] I have created follow-up issues caused by this PR and linked them here ### After merging, notify other teams [Add or remove entries as needed] - [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/) - [ ] [Sway compiler](https://github.com/FuelLabs/sway/) - [ ] [Platform documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+) (for out-of-organization contributors, the person merging the PR will do this) - [ ] Someone else? 
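The gist of the change, as a minimal sketch (the real code is `GossipsubTopics` in `crates/services/p2p/src/gossipsub/topics.rs` below): hash the topic string once at construction time, keep only the resulting `TopicHash`, and hand that cached hash to gossipsub's `publish` (as the patch does in `publish_message`), so no hashing happens per outgoing message.

```rust
use libp2p::gossipsub::{Sha256Topic, Topic, TopicHash};

/// Hash the topic string exactly once, at construction time.
struct CachedTopic {
    new_tx_topic: TopicHash,
}

impl CachedTopic {
    fn new(network_name: &str) -> Self {
        let topic: Sha256Topic = Topic::new(format!("new_tx/{network_name}"));
        Self {
            new_tx_topic: topic.hash(),
        }
    }

    /// Cheap to call for every outgoing message: it only clones the cached hash.
    fn topic_hash(&self) -> TopicHash {
        self.new_tx_topic.clone()
    }
}
```
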
--------- Co-authored-by: Green Baneling --- CHANGELOG.md | 4 +++ crates/services/p2p/src/behavior.rs | 10 +++---- crates/services/p2p/src/gossipsub/config.rs | 8 ++---- crates/services/p2p/src/gossipsub/topics.rs | 32 ++++++++++----------- crates/services/p2p/src/p2p_service.rs | 18 ++++++------ 5 files changed, 35 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 645237e21c9..a67e169d9a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2366](https://github.com/FuelLabs/fuel-core/pull/2366): The `importer_gas_price_for_block` metric is properly collected. - [2369](https://github.com/FuelLabs/fuel-core/pull/2369): The `transaction_insertion_time_in_thread_pool_milliseconds` metric is properly collected. +### Changed + +- [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message. + ### Added - [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`). - [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Added a new request_response protocol version `/fuel/req_res/0.0.2`. In comparison with `/fuel/req/0.0.1`, which returns an empty response when a request cannot be fulfilled, this version returns more meaningful error codes. Nodes still support the version `0.0.1` of the protocol to guarantee backward compatibility with fuel-core nodes. Empty responses received from nodes using the old protocol `/fuel/req/0.0.1` are automatically converted into an error `ProtocolV1EmptyResponse` with error code 0, which is also the only error code implemented. More specific error codes will be added in the future. 
diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index 8fd6808e348..9fe58c5dec5 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -5,10 +5,7 @@ use crate::{ }, config::Config, discovery, - gossipsub::{ - config::build_gossipsub_behaviour, - topics::GossipTopic, - }, + gossipsub::config::build_gossipsub_behaviour, heartbeat, peer_report, request_response::messages::{ @@ -24,6 +21,7 @@ use libp2p::{ MessageAcceptance, MessageId, PublishError, + TopicHash, }, identify, request_response::{ @@ -150,10 +148,10 @@ impl FuelBehaviour { pub fn publish_message( &mut self, - topic: GossipTopic, + topic_hash: TopicHash, encoded_data: Vec, ) -> Result { - self.gossipsub.publish(topic, encoded_data) + self.gossipsub.publish(topic_hash, encoded_data) } pub fn send_request_msg( diff --git a/crates/services/p2p/src/gossipsub/config.rs b/crates/services/p2p/src/gossipsub/config.rs index d045c4b95d9..2f6973e8ee9 100644 --- a/crates/services/p2p/src/gossipsub/config.rs +++ b/crates/services/p2p/src/gossipsub/config.rs @@ -1,7 +1,4 @@ -use super::topics::{ - GossipTopic, - NEW_TX_GOSSIP_TOPIC, -}; +use super::topics::NEW_TX_GOSSIP_TOPIC; use crate::{ config::{ Config, @@ -18,6 +15,7 @@ use libp2p::gossipsub::{ MetricsConfig, PeerScoreParams, PeerScoreThresholds, + Sha256Topic, Topic, TopicScoreParams, }; @@ -228,7 +226,7 @@ fn initialize_gossipsub(gossipsub: &mut gossipsub::Behaviour, p2p_config: &Confi // subscribe to gossipsub topics with the network name suffix for (topic, weight) in topics { - let t: GossipTopic = Topic::new(format!("{}/{}", topic, p2p_config.network_name)); + let t: Sha256Topic = Topic::new(format!("{}/{}", topic, p2p_config.network_name)); gossipsub .set_topic_params(t.clone(), initialize_topic_score_params(weight)) diff --git a/crates/services/p2p/src/gossipsub/topics.rs b/crates/services/p2p/src/gossipsub/topics.rs index f56b2c38f0a..ac8e9efb105 100644 --- a/crates/services/p2p/src/gossipsub/topics.rs +++ b/crates/services/p2p/src/gossipsub/topics.rs @@ -9,23 +9,23 @@ use super::messages::{ GossipsubBroadcastRequest, }; -pub type GossipTopic = Sha256Topic; pub const NEW_TX_GOSSIP_TOPIC: &str = "new_tx"; /// Holds used Gossipsub Topics -/// Each field contains TopicHash and GossipTopic itself -/// in order to avoid converting GossipTopic to TopicHash on each received message +/// Each field contains TopicHash of existing topics +/// in order to avoid converting topics to TopicHash on each received message #[derive(Debug)] pub struct GossipsubTopics { - new_tx_topic: (TopicHash, GossipTopic), + new_tx_topic: TopicHash, } impl GossipsubTopics { pub fn new(network_name: &str) -> Self { - let new_tx_topic = Topic::new(format!("{NEW_TX_GOSSIP_TOPIC}/{network_name}")); + let new_tx_topic: Sha256Topic = + Topic::new(format!("{NEW_TX_GOSSIP_TOPIC}/{network_name}")); Self { - new_tx_topic: (new_tx_topic.hash(), new_tx_topic), + new_tx_topic: new_tx_topic.hash(), } } @@ -34,22 +34,20 @@ impl GossipsubTopics { &self, incoming_topic: &TopicHash, ) -> Option { - let GossipsubTopics { new_tx_topic } = &self; - match incoming_topic { - hash if hash == &new_tx_topic.0 => Some(GossipTopicTag::NewTx), + hash if hash == &self.new_tx_topic => Some(GossipTopicTag::NewTx), _ => None, } } - /// Given a `GossipsubBroadcastRequest` returns a `GossipTopic` + /// Given a `GossipsubBroadcastRequest` returns a `TopicHash` /// which is broadcast over the network with the serialized inner value of `GossipsubBroadcastRequest` - pub fn 
get_gossipsub_topic( + pub fn get_gossipsub_topic_hash( &self, outgoing_request: &GossipsubBroadcastRequest, - ) -> GossipTopic { + ) -> TopicHash { match outgoing_request { - GossipsubBroadcastRequest::NewTx(_) => self.new_tx_topic.1.clone(), + GossipsubBroadcastRequest::NewTx(_) => self.new_tx_topic.clone(), } } } @@ -64,13 +62,13 @@ mod tests { #[test] fn test_gossipsub_topics() { let network_name = "fuel_test_network"; - let new_tx_topic: GossipTopic = + let new_tx_topic: Sha256Topic = Topic::new(format!("{NEW_TX_GOSSIP_TOPIC}/{network_name}")); let gossipsub_topics = GossipsubTopics::new(network_name); // Test matching Topic Hashes - assert_eq!(gossipsub_topics.new_tx_topic.0, new_tx_topic.hash()); + assert_eq!(gossipsub_topics.new_tx_topic, new_tx_topic.hash()); // Test given a TopicHash that `get_gossipsub_tag()` returns matching `GossipTopicTag` assert_eq!( @@ -78,11 +76,11 @@ mod tests { Some(GossipTopicTag::NewTx) ); - // Test given a `GossipsubBroadcastRequest` that `get_gossipsub_topic()` returns matching `Topic` + // Test given a `GossipsubBroadcastRequest` that `get_gossipsub_topic_hash()` returns matching `TopicHash` let broadcast_req = GossipsubBroadcastRequest::NewTx(Arc::new(Transaction::default_test_tx())); assert_eq!( - gossipsub_topics.get_gossipsub_topic(&broadcast_req).hash(), + gossipsub_topics.get_gossipsub_topic_hash(&broadcast_req), new_tx_topic.hash() ); } diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 76f6bdbf615..05dd2ec1b38 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -375,17 +375,17 @@ impl FuelP2PService { &mut self, message: GossipsubBroadcastRequest, ) -> Result { - let topic = self + let topic_hash = self .network_metadata .gossipsub_data .topics - .get_gossipsub_topic(&message); + .get_gossipsub_topic_hash(&message); match self.network_codec.encode(message) { Ok(encoded_data) => self .swarm .behaviour_mut() - .publish_message(topic, encoded_data), + .publish_message(topic_hash, encoded_data), Err(e) => Err(PublishError::TransformFailed(e)), } } @@ -839,10 +839,7 @@ mod tests { GossipsubBroadcastRequest, GossipsubMessage, }, - topics::{ - GossipTopic, - NEW_TX_GOSSIP_TOPIC, - }, + topics::NEW_TX_GOSSIP_TOPIC, }, p2p_service::FuelP2PEvent, peer_manager::PeerInfo, @@ -881,7 +878,10 @@ mod tests { StreamExt, }; use libp2p::{ - gossipsub::Topic, + gossipsub::{ + Sha256Topic, + Topic, + }, identity::Keypair, swarm::{ ListenError, @@ -1528,7 +1528,7 @@ mod tests { ) { let mut p2p_config = Config::default_initialized("gossipsub_exchanges_messages"); - let selected_topic: GossipTopic = { + let selected_topic: Sha256Topic = { let topic = match broadcast_request { GossipsubBroadcastRequest::NewTx(_) => NEW_TX_GOSSIP_TOPIC, }; From 481d4bb12187ddc769eecfbb064e496fa1dcd97b Mon Sep 17 00:00:00 2001 From: AurelienFT <32803821+AurelienFT@users.noreply.github.com> Date: Wed, 23 Oct 2024 15:30:29 +0200 Subject: [PATCH 5/8] Remove ignore RUSTSEC-2024-0336 (#2384) ## Linked Issues/PRs Resolves https://github.com/FuelLabs/fuel-core/issues/1843 ## Description This has been fixed in https://github.com/FuelLabs/fuel-core/pull/1954. Verified it by running cargo audit on my own did had this warning. 
Co-authored-by: Green Baneling --- .cargo/audit.toml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.cargo/audit.toml b/.cargo/audit.toml index db18bdc5a3c..46ec91ce057 100644 --- a/.cargo/audit.toml +++ b/.cargo/audit.toml @@ -1,4 +1,2 @@ [advisories] -ignore = [ - "RUSTSEC-2024-0336" # https://github.com/FuelLabs/fuel-core/issues/1843 -] \ No newline at end of file +ignore = [] \ No newline at end of file From a14922eb17a9ce844132edc240b5157eb860199f Mon Sep 17 00:00:00 2001 From: Mitchell Turner Date: Thu, 24 Oct 2024 14:43:59 +0300 Subject: [PATCH 6/8] Deal with negative feed back loop in DA gas price (#2364) --- CHANGELOG.md | 1 + .../gas-price-analysis/src/simulation.rs | 7 +- crates/fuel-gas-price-algorithm/src/v1.rs | 174 +++++++- .../fuel-gas-price-algorithm/src/v1/tests.rs | 13 +- .../v1/tests/update_l2_block_data_tests.rs | 371 ++++++++++++++++-- 5 files changed, 508 insertions(+), 58 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a67e169d9a7..82018148012 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added - [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`). +- [2347](https://github.com/FuelLabs/fuel-core/pull/2364): Add activity concept in order to protect against infinitely increasing DA gas price scenarios - [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Added a new request_response protocol version `/fuel/req_res/0.0.2`. In comparison with `/fuel/req/0.0.1`, which returns an empty response when a request cannot be fulfilled, this version returns more meaningful error codes. Nodes still support the version `0.0.1` of the protocol to guarantee backward compatibility with fuel-core nodes. Empty responses received from nodes using the old protocol `/fuel/req/0.0.1` are automatically converted into an error `ProtocolV1EmptyResponse` with error code 0, which is also the only error code implemented. More specific error codes will be added in the future. 
## [Version 0.40.0] diff --git a/crates/fuel-gas-price-algorithm/gas-price-analysis/src/simulation.rs b/crates/fuel-gas-price-algorithm/gas-price-analysis/src/simulation.rs index 94d7c06faa3..590952207ba 100644 --- a/crates/fuel-gas-price-algorithm/gas-price-analysis/src/simulation.rs +++ b/crates/fuel-gas-price-algorithm/gas-price-analysis/src/simulation.rs @@ -1,5 +1,8 @@ use super::*; -use fuel_gas_price_algorithm::v1::AlgorithmUpdaterV1; +use fuel_gas_price_algorithm::v1::{ + AlgorithmUpdaterV1, + L2ActivityTracker, +}; use std::{ collections::BTreeMap, num::NonZeroU64, @@ -83,6 +86,7 @@ impl Simulator { ) -> AlgorithmUpdaterV1 { // Scales the gas price internally, value is arbitrary let gas_price_factor = 100; + let always_normal_activity = L2ActivityTracker::new_always_normal(); let updater = AlgorithmUpdaterV1 { min_exec_gas_price: 10, min_da_gas_price: 10, @@ -109,6 +113,7 @@ impl Simulator { da_d_component, last_profit: 0, second_to_last_profit: 0, + l2_activity: always_normal_activity, }; updater } diff --git a/crates/fuel-gas-price-algorithm/src/v1.rs b/crates/fuel-gas-price-algorithm/src/v1.rs index a1a5f7dd536..9a9da066fee 100644 --- a/crates/fuel-gas-price-algorithm/src/v1.rs +++ b/crates/fuel-gas-price-algorithm/src/v1.rs @@ -26,6 +26,8 @@ pub enum Error { L2BlockExpectedNotFound(u32), } +// TODO: separate exec gas price and DA gas price into newtypes for clarity +// https://github.com/FuelLabs/fuel-core/issues/2382 #[derive(Debug, Clone, PartialEq)] pub struct AlgorithmV1 { /// The gas price for to cover the execution of the next block @@ -118,13 +120,8 @@ pub struct AlgorithmUpdaterV1 { /// This is a percentage of the total capacity of the L2 block pub l2_block_fullness_threshold_percent: ClampedPercentage, // DA - /// The gas price for the DA portion of the last block. This can be used to calculate - /// the DA portion of the next block - // pub last_da_gas_price: u64, - /// The gas price (scaled by the `gas_price_factor`) to cover the DA commitment of the next block pub new_scaled_da_gas_price: u64, - /// Scale factor for the gas price. pub gas_price_factor: NonZeroU64, /// The lowest the algorithm allows the da gas price to go @@ -151,13 +148,136 @@ pub struct AlgorithmUpdaterV1 { pub second_to_last_profit: i128, /// The latest known cost per byte for recording blocks on the DA chain pub latest_da_cost_per_byte: u128, - + /// Activity of L2 + pub l2_activity: L2ActivityTracker, /// The unrecorded blocks that are used to calculate the projected cost of recording blocks pub unrecorded_blocks: BTreeMap, } -/// A value that represents a value between 0 and 100. Higher values are clamped to 100 +/// The `L2ActivityTracker` tracks the chain activity to determine a safety mode for setting the DA price. +/// +/// Because the DA gas price can increase even when no-one is using the network, there is a potential +/// for a negative feedback loop to occur where the gas price increases, further decreasing activity +/// and increasing the gas price. The `L2ActivityTracker` is used to moderate changes to the DA +/// gas price based on the activity of the L2 chain. +/// +/// The chain activity is a cumulative measure, updated whenever a new block is processed. +/// For each L2 block, the block usage is a percentage of the block capacity used. 
If the +/// block usage is below a certain threshold, the chain activity is decreased, if above the threshold, +/// the activity is increased The chain activity exists on a scale +/// between 0 and the sum of the normal, capped, and decrease buffers. +/// +/// e.g. if the decrease activity threshold is 20, the capped activity threshold is 80, and the max activity is 120, +/// we'd have the following ranges: +/// +/// 0 <-- decrease range -->20<-- capped range -->80<-- normal range -->120 +/// +/// The current chain activity determines the behavior of the DA gas price. +/// +/// For healthy behavior, the activity should be in the `normal` range. #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)] +pub struct L2ActivityTracker { + /// The maximum value the chain activity can hit + max_activity: u16, + /// The threshold if the block activity is below, the DA gas price will be held when it would otherwise be increased + capped_activity_threshold: u16, + /// If the chain activity falls below this value, the DA gas price will be decreased when it would otherwise be increased + decrease_activity_threshold: u16, + /// The current activity of the L2 chain + chain_activity: u16, + /// The threshold of block activity below which the chain activity will be decreased, + /// above or equal it will always increase + block_activity_threshold: ClampedPercentage, +} + +/// Designates the intended behavior of the DA gas price based on the activity of the L2 chain +pub enum DAGasPriceSafetyMode { + /// Should increase DA gas price freely + Normal, + /// Should not increase the DA gas price + Capped, + /// Should decrease the DA gas price always + AlwaysDecrease, +} + +impl L2ActivityTracker { + pub fn new_full( + normal_range_size: u16, + capped_range_size: u16, + decrease_range_size: u16, + block_activity_threshold: ClampedPercentage, + ) -> Self { + let decrease_activity_threshold = decrease_range_size; + let capped_activity_threshold = + decrease_range_size.saturating_add(capped_range_size); + let max_activity = capped_activity_threshold.saturating_add(normal_range_size); + let chain_activity = max_activity; + Self { + max_activity, + capped_activity_threshold, + decrease_activity_threshold, + chain_activity, + block_activity_threshold, + } + } + + pub fn new( + normal_range_size: u16, + capped_range_size: u16, + decrease_range_size: u16, + activity: u16, + block_activity_threshold: ClampedPercentage, + ) -> Self { + let mut tracker = Self::new_full( + normal_range_size, + capped_range_size, + decrease_range_size, + block_activity_threshold, + ); + tracker.chain_activity = activity.min(tracker.max_activity); + tracker + } + + pub fn new_always_normal() -> Self { + let normal_range_size = 100; + let capped_range_size = 0; + let decrease_range_size = 0; + let percentage = ClampedPercentage::new(0); + Self::new( + normal_range_size, + capped_range_size, + decrease_range_size, + 100, + percentage, + ) + } + + pub fn safety_mode(&self) -> DAGasPriceSafetyMode { + if self.chain_activity > self.capped_activity_threshold { + DAGasPriceSafetyMode::Normal + } else if self.chain_activity > self.decrease_activity_threshold { + DAGasPriceSafetyMode::Capped + } else { + DAGasPriceSafetyMode::AlwaysDecrease + } + } + + pub fn update(&mut self, block_usage: ClampedPercentage) { + if block_usage < self.block_activity_threshold { + self.chain_activity = self.chain_activity.saturating_sub(1); + } else { + self.chain_activity = + self.chain_activity.saturating_add(1).min(self.max_activity); + } + } + + 
pub fn current_activity(&self) -> u16 { + self.chain_activity + } +} + +/// A value that represents a value between 0 and 100. Higher values are clamped to 100 +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, PartialOrd)] pub struct ClampedPercentage { value: u8, } @@ -226,6 +346,9 @@ impl AlgorithmUpdaterV1 { let last_profit = rewards.saturating_sub(projected_total_da_cost); self.update_last_profit(last_profit); + // activity + self.update_activity(used, capacity); + // gas prices self.update_exec_gas_price(used, capacity); self.update_da_gas_price(); @@ -236,6 +359,12 @@ impl AlgorithmUpdaterV1 { } } + fn update_activity(&mut self, used: u64, capacity: NonZeroU64) { + let block_activity = used.saturating_mul(100).div(capacity); + let usage = ClampedPercentage::new(block_activity.try_into().unwrap_or(100)); + self.l2_activity.update(usage); + } + fn update_da_rewards(&mut self, fee_wei: u128) { let block_da_reward = self.da_portion_of_fee(fee_wei); self.total_da_rewards_excess = @@ -309,7 +438,8 @@ impl AlgorithmUpdaterV1 { fn update_da_gas_price(&mut self) { let p = self.p(); let d = self.d(); - let da_change = self.da_change(p, d); + let maybe_da_change = self.da_change(p, d); + let da_change = self.da_change_accounting_for_activity(maybe_da_change); let maybe_new_scaled_da_gas_price = i128::from(self.new_scaled_da_gas_price) .checked_add(da_change) .and_then(|x| u64::try_from(x).ok()) @@ -326,6 +456,20 @@ impl AlgorithmUpdaterV1 { ); } + fn da_change_accounting_for_activity(&self, maybe_da_change: i128) -> i128 { + if maybe_da_change > 0 { + match self.l2_activity.safety_mode() { + DAGasPriceSafetyMode::Normal => maybe_da_change, + DAGasPriceSafetyMode::Capped => 0, + DAGasPriceSafetyMode::AlwaysDecrease => { + self.max_change().saturating_mul(-1) + } + } + } else { + maybe_da_change + } + } + fn min_scaled_da_gas_price(&self) -> u64 { self.min_da_gas_price .saturating_mul(self.gas_price_factor.into()) @@ -348,14 +492,18 @@ impl AlgorithmUpdaterV1 { fn da_change(&self, p: i128, d: i128) -> i128 { let pd_change = p.saturating_add(d); + let max_change = self.max_change(); + let clamped_change = pd_change.saturating_abs().min(max_change); + pd_change.signum().saturating_mul(clamped_change) + } + + // Should always be positive + fn max_change(&self) -> i128 { let upcast_percent = self.max_da_gas_price_change_percent.into(); - let max_change = self - .new_scaled_da_gas_price + self.new_scaled_da_gas_price .saturating_mul(upcast_percent) .saturating_div(100) - .into(); - let clamped_change = pd_change.saturating_abs().min(max_change); - pd_change.signum().saturating_mul(clamped_change) + .into() } fn exec_change(&self, principle: u64) -> u64 { diff --git a/crates/fuel-gas-price-algorithm/src/v1/tests.rs b/crates/fuel-gas-price-algorithm/src/v1/tests.rs index 0aca738a3f0..e7150378428 100644 --- a/crates/fuel-gas-price-algorithm/src/v1/tests.rs +++ b/crates/fuel-gas-price-algorithm/src/v1/tests.rs @@ -2,7 +2,10 @@ #![allow(clippy::arithmetic_side_effects)] #![allow(clippy::cast_possible_truncation)] -use crate::v1::AlgorithmUpdaterV1; +use crate::v1::{ + AlgorithmUpdaterV1, + L2ActivityTracker, +}; #[cfg(test)] mod algorithm_v1_tests; @@ -40,6 +43,7 @@ pub struct UpdaterBuilder { last_profit: i128, second_to_last_profit: i128, da_gas_price_factor: u64, + l2_activity: L2ActivityTracker, } impl UpdaterBuilder { @@ -67,6 +71,7 @@ impl UpdaterBuilder { last_profit: 0, second_to_last_profit: 0, da_gas_price_factor: 1, + l2_activity: L2ActivityTracker::new_always_normal(), } 
} @@ -159,6 +164,11 @@ impl UpdaterBuilder { self } + fn with_activity(mut self, l2_activity: L2ActivityTracker) -> Self { + self.l2_activity = l2_activity; + self + } + fn build(self) -> AlgorithmUpdaterV1 { AlgorithmUpdaterV1 { min_exec_gas_price: self.min_exec_gas_price, @@ -190,6 +200,7 @@ impl UpdaterBuilder { .da_gas_price_factor .try_into() .expect("Should never be non-zero"), + l2_activity: self.l2_activity, } } } diff --git a/crates/fuel-gas-price-algorithm/src/v1/tests/update_l2_block_data_tests.rs b/crates/fuel-gas-price-algorithm/src/v1/tests/update_l2_block_data_tests.rs index 6d5e4cf2ce7..5cb56bbde2d 100644 --- a/crates/fuel-gas-price-algorithm/src/v1/tests/update_l2_block_data_tests.rs +++ b/crates/fuel-gas-price-algorithm/src/v1/tests/update_l2_block_data_tests.rs @@ -4,8 +4,64 @@ use crate::v1::{ UpdaterBuilder, }, Error, + L2ActivityTracker, }; +fn decrease_l2_activity() -> L2ActivityTracker { + let normal = 1; + let capped = 1; + let decrease = 100; + let activity = 50; + let threshold = 50.into(); + L2ActivityTracker::new(normal, capped, decrease, activity, threshold) +} + +fn negative_profit_updater_builder() -> UpdaterBuilder { + let starting_exec_gas_price = 100; + let starting_da_gas_price = 100; + let starting_cost = u128::MAX; + let latest_gas_per_byte = i32::MAX; // DA is very expensive + let da_p_component = 100; + let da_d_component = 10; + let last_profit = i128::MIN; + let last_last_profit = 0; + let smaller_starting_reward = 0; + UpdaterBuilder::new() + .with_starting_exec_gas_price(starting_exec_gas_price) + .with_starting_da_gas_price(starting_da_gas_price) + .with_da_p_component(da_p_component) + .with_da_d_component(da_d_component) + .with_total_rewards(smaller_starting_reward) + .with_known_total_cost(starting_cost) + .with_projected_total_cost(starting_cost) + .with_da_cost_per_byte(latest_gas_per_byte as u128) + .with_last_profit(last_profit, last_last_profit) +} + +fn positive_profit_updater_builder() -> UpdaterBuilder { + let starting_exec_gas_price = 100; + let last_da_gas_price = 100; + let starting_cost = 500; + let latest_gas_per_byte = 0; // DA is free + let da_p_component = 100; + let da_d_component = 10; + let last_profit = i128::MAX; + let last_last_profit = 0; + let large_reward = i128::MAX; + UpdaterBuilder::new() + .with_starting_exec_gas_price(starting_exec_gas_price) + .with_da_p_component(da_p_component) + .with_da_d_component(da_d_component) + .with_starting_da_gas_price(last_da_gas_price) + .with_total_rewards(large_reward as u128) + .with_known_total_cost(starting_cost as u128) + .with_projected_total_cost(starting_cost as u128) + .with_da_cost_per_byte(latest_gas_per_byte as u128) + .with_last_profit(last_profit, last_last_profit) + .with_da_max_change_percent(u16::MAX) + .with_exec_gas_price_change_percent(0) +} + #[test] fn update_l2_block_data__updates_l2_block() { // given @@ -268,32 +324,11 @@ fn update_l2_block_data__updates_last_and_last_last_profit() { #[test] fn update_l2_block_data__positive_profit_decrease_gas_price() { // given - let starting_exec_gas_price = 100; - let last_da_gas_price = 100; - let starting_cost = 500; - let latest_gas_per_byte = 0; // DA is free - let da_p_component = 100; - let da_d_component = 10; - let block_bytes = 500u64; - let last_profit = i128::MAX; - let last_last_profit = 0; - let large_reward = i128::MAX; - let mut updater = UpdaterBuilder::new() - .with_starting_exec_gas_price(starting_exec_gas_price) - .with_da_p_component(da_p_component) - .with_da_d_component(da_d_component) - 
.with_starting_da_gas_price(last_da_gas_price) - .with_total_rewards(large_reward as u128) - .with_known_total_cost(starting_cost as u128) - .with_projected_total_cost(starting_cost as u128) - .with_da_cost_per_byte(latest_gas_per_byte as u128) - .with_last_profit(last_profit, last_last_profit) - .with_da_max_change_percent(u16::MAX) - .with_exec_gas_price_change_percent(0) - .build(); + let mut updater = positive_profit_updater_builder().build(); let old_gas_price = updater.algorithm().calculate(); // when + let block_bytes = 500u64; updater .update_l2_block_data( updater.l2_block_height + 1, @@ -496,26 +531,7 @@ fn update_l2_block_data__even_profit_maintains_price() { #[test] fn update_l2_block_data__negative_profit_increase_gas_price() { // given - let starting_exec_gas_price = 100; - let starting_da_gas_price = 100; - let starting_cost = u128::MAX; - let latest_gas_per_byte = i32::MAX; // DA is very expensive - let da_p_component = 100; - let da_d_component = 10; - let last_profit = i128::MIN; - let last_last_profit = 0; - let smaller_starting_reward = 0; - let mut updater = UpdaterBuilder::new() - .with_starting_exec_gas_price(starting_exec_gas_price) - .with_starting_da_gas_price(starting_da_gas_price) - .with_da_p_component(da_p_component) - .with_da_d_component(da_d_component) - .with_total_rewards(smaller_starting_reward) - .with_known_total_cost(starting_cost) - .with_projected_total_cost(starting_cost) - .with_da_cost_per_byte(latest_gas_per_byte as u128) - .with_last_profit(last_profit, last_last_profit) - .build(); + let mut updater = negative_profit_updater_builder().build(); let algo = updater.algorithm(); let old_gas_price = algo.calculate(); @@ -610,3 +626,272 @@ fn update_l2_block_data__retains_existing_blocks_and_adds_l2_block_to_unrecorded .contains_key(&preexisting_block.height); assert!(contains_preexisting_block_bytes); } + +fn capped_l2_activity_tracker() -> L2ActivityTracker { + let normal = 1; + let capped = 100; + let decrease = 1; + let activity = 50; + let threshold = 50.into(); + L2ActivityTracker::new(normal, capped, decrease, activity, threshold) +} + +#[test] +fn update_l2_block_data__da_gas_price_wants_to_increase_will_hold_if_activity_in_hold_range( +) { + // given + let capped_activity = capped_l2_activity_tracker(); + let mut updater = negative_profit_updater_builder() + .with_activity(capped_activity) + .build(); + let algo = updater.algorithm(); + let old_gas_price = algo.calculate(); + + // when + let height = updater.l2_block_height + 1; + let used = 50; + let capacity = 100u64.try_into().unwrap(); + let block_bytes = 500u64; + let fee = 0; + updater + .update_l2_block_data(height, used, capacity, block_bytes, fee) + .unwrap(); + + // then + let algo = updater.algorithm(); + let new_gas_price = algo.calculate(); + assert_eq!(new_gas_price, old_gas_price,); +} + +#[test] +fn update_l2_block_data__da_gas_price_wants_to_decrease_will_decrease_if_activity_in_hold_range( +) { + // given + let capped_activity = capped_l2_activity_tracker(); + let mut updater = positive_profit_updater_builder() + .with_activity(capped_activity) + .build(); + let old_gas_price = updater.algorithm().calculate(); + + // when + let block_bytes = 500u64; + updater + .update_l2_block_data( + updater.l2_block_height + 1, + 50, + 100.try_into().unwrap(), + block_bytes, + 200, + ) + .unwrap(); + + // then + let new_gas_price = updater.algorithm().calculate(); + assert!( + new_gas_price < old_gas_price, + "{} !< {}", + old_gas_price, + new_gas_price + ); +} + +#[test] +fn 
update_l2_block_data__da_gas_price_wants_to_increase_will_decrease_if_activity_in_decrease_range( +) { + // given + let decrease_activity = decrease_l2_activity(); + let mut updater = negative_profit_updater_builder() + .with_activity(decrease_activity) + .build(); + let algo = updater.algorithm(); + let old_gas_price = algo.calculate(); + + // when + let height = updater.l2_block_height + 1; + let used = 50; + let capacity = 100u64.try_into().unwrap(); + let block_bytes = 500u64; + let fee = 0; + updater + .update_l2_block_data(height, used, capacity, block_bytes, fee) + .unwrap(); + + // then + let algo = updater.algorithm(); + let new_gas_price = algo.calculate(); + assert!( + new_gas_price < old_gas_price, + "{} !> {}", + new_gas_price, + old_gas_price + ); +} + +#[test] +fn update_l2_block_data__da_gas_price_wants_to_decrease_will_decrease_if_activity_in_decrease_range( +) { + // given + let decrease_activity = decrease_l2_activity(); + let mut updater = positive_profit_updater_builder() + .with_activity(decrease_activity) + .build(); + let old_gas_price = updater.algorithm().calculate(); + + // when + let block_bytes = 500u64; + updater + .update_l2_block_data( + updater.l2_block_height + 1, + 50, + 100.try_into().unwrap(), + block_bytes, + 200, + ) + .unwrap(); + + // then + let new_gas_price = updater.algorithm().calculate(); + assert!( + new_gas_price < old_gas_price, + "{} !< {}", + old_gas_price, + new_gas_price + ); +} + +#[test] +fn update_l2_block_data__above_threshold_increase_activity() { + // given + let starting_exec_gas_price = 100; + let exec_gas_price_increase_percent = 10; + let threshold = 50; + let starting_activity = 2; + let activity = L2ActivityTracker::new(1, 1, 1, starting_activity, 50.into()); + let mut updater = UpdaterBuilder::new() + .with_starting_exec_gas_price(starting_exec_gas_price) + .with_exec_gas_price_change_percent(exec_gas_price_increase_percent) + .with_l2_block_capacity_threshold(threshold) + .with_activity(activity) + .build(); + + let height = 1; + let used = 60; + let capacity = 100.try_into().unwrap(); + let block_bytes = 1000; + let fee = 200; + + // when + updater + .update_l2_block_data(height, used, capacity, block_bytes, fee) + .unwrap(); + + // then + let expected = starting_activity + 1; + let actual = updater.l2_activity.current_activity(); + assert_eq!(actual, expected); +} + +#[test] +fn update_l2_block_data__below_threshold_decrease_activity() { + // given + let starting_exec_gas_price = 100; + let exec_gas_price_increase_percent = 10; + let threshold = 50; + let starting_activity = 2; + let activity = L2ActivityTracker::new(1, 1, 1, starting_activity, 50.into()); + let mut updater = UpdaterBuilder::new() + .with_starting_exec_gas_price(starting_exec_gas_price) + .with_exec_gas_price_change_percent(exec_gas_price_increase_percent) + .with_l2_block_capacity_threshold(threshold) + .with_activity(activity) + .build(); + + let height = 1; + let used = 40; + let capacity = 100.try_into().unwrap(); + let block_bytes = 1000; + let fee = 200; + + // when + updater + .update_l2_block_data(height, used, capacity, block_bytes, fee) + .unwrap(); + + // then + let expected = starting_activity - 1; + let actual = updater.l2_activity.current_activity(); + assert_eq!(actual, expected); +} + +#[test] +fn update_l2_block_data__if_activity_at_max_will_stop_increasing() { + // given + let starting_exec_gas_price = 100; + let exec_gas_price_increase_percent = 10; + let threshold = 50; + let normal_range = 1; + let capped_range = 1; + let 
decrease_range = 1; + let starting_activity = normal_range + capped_range + decrease_range; + let activity = L2ActivityTracker::new( + normal_range, + capped_range, + decrease_range, + starting_activity, + 50.into(), + ); + let mut updater = UpdaterBuilder::new() + .with_starting_exec_gas_price(starting_exec_gas_price) + .with_exec_gas_price_change_percent(exec_gas_price_increase_percent) + .with_l2_block_capacity_threshold(threshold) + .with_activity(activity) + .build(); + + let height = 1; + let used = 60; + let capacity = 100.try_into().unwrap(); + let block_bytes = 1000; + let fee = 200; + + // when + updater + .update_l2_block_data(height, used, capacity, block_bytes, fee) + .unwrap(); + + // then + let expected = starting_activity; + let actual = updater.l2_activity.current_activity(); + assert_eq!(actual, expected); +} + +#[test] +fn update_l2_block_data__if_activity_is_zero_will_stop_decreasing() { + // given + let starting_exec_gas_price = 100; + let exec_gas_price_increase_percent = 10; + let threshold = 50; + let starting_activity = 0; + let activity = L2ActivityTracker::new(1, 1, 1, starting_activity, 50.into()); + let mut updater = UpdaterBuilder::new() + .with_starting_exec_gas_price(starting_exec_gas_price) + .with_exec_gas_price_change_percent(exec_gas_price_increase_percent) + .with_l2_block_capacity_threshold(threshold) + .with_activity(activity) + .build(); + + let height = 1; + let used = 40; + let capacity = 100.try_into().unwrap(); + let block_bytes = 1000; + let fee = 200; + + // when + updater + .update_l2_block_data(height, used, capacity, block_bytes, fee) + .unwrap(); + + // then + let expected = starting_activity; + let actual = updater.l2_activity.current_activity(); + assert_eq!(actual, expected); +} From f333ae80f18d6f4a8ba624bcb116056d90a96fe1 Mon Sep 17 00:00:00 2001 From: AurelienFT <32803821+AurelienFT@users.noreply.github.com> Date: Sat, 26 Oct 2024 23:36:56 +0200 Subject: [PATCH 7/8] Add new flag for maximum file descriptors in rocksdb. (#2386) ## Linked Issues/PRs Resolves: https://github.com/FuelLabs/fuel-core/issues/2093 ## Description Add a flag to define the maximum number of file descriptors that RocksDB can use. By default it's half of the OS limit. 
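As a usage sketch (the `--rocksdb-max-fds` flag name comes from the diff below; the surrounding `run` invocation and placeholder arguments are illustrative only):

```bash
# Cap RocksDB at 1024 open file descriptors; internally the budget is split
# between the on-chain, off-chain, relayer and gas-price databases.
fuel-core run --rocksdb-max-fds 1024 <other run options>

# A value of -1 applies no explicit cap and falls back to the OS limit
# (the `=` form avoids the leading dash being parsed as another flag).
fuel-core run --rocksdb-max-fds=-1 <other run options>
```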
## Checklist - [x] Breaking changes are clearly marked as such in the PR description and changelog - [x] New behavior is reflected in tests - [x] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [x] I have reviewed the code myself - [x] I have created follow-up issues caused by this PR and linked them here --------- Co-authored-by: Mitchell Turner --- CHANGELOG.md | 1 + Cargo.lock | 10 +++++ benches/benches/vm_set/blockchain.rs | 1 + benches/src/db_lookup_times_utils/mod.rs | 2 +- benches/src/db_lookup_times_utils/utils.rs | 2 +- bin/fuel-core/Cargo.toml | 1 + bin/fuel-core/src/cli/rollback.rs | 22 ++++++++++ bin/fuel-core/src/cli/run.rs | 27 ++++++++++++ bin/fuel-core/src/cli/snapshot.rs | 41 +++++++++++++++++-- crates/fuel-core/src/combined_database.rs | 20 +++++++-- crates/fuel-core/src/database.rs | 3 ++ crates/fuel-core/src/service/config.rs | 2 + .../fuel-core/src/state/historical_rocksdb.rs | 4 +- crates/fuel-core/src/state/rocks_db.rs | 27 ++++++++---- tests/tests/health.rs | 12 ++++-- tests/tests/poa.rs | 4 +- tests/tests/relayer.rs | 6 ++- 17 files changed, 157 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 82018148012..570a3130672 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`). - [2347](https://github.com/FuelLabs/fuel-core/pull/2364): Add activity concept in order to protect against infinitely increasing DA gas price scenarios - [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Added a new request_response protocol version `/fuel/req_res/0.0.2`. In comparison with `/fuel/req/0.0.1`, which returns an empty response when a request cannot be fulfilled, this version returns more meaningful error codes. Nodes still support the version `0.0.1` of the protocol to guarantee backward compatibility with fuel-core nodes. Empty responses received from nodes using the old protocol `/fuel/req/0.0.1` are automatically converted into an error `ProtocolV1EmptyResponse` with error code 0, which is also the only error code implemented. More specific error codes will be added in the future. +- [2386](https://github.com/FuelLabs/fuel-core/pull/2386): Add a flag to define the maximum number of file descriptors that RocksDB can use. By default it's half of the OS limit. 
## [Version 0.40.0] diff --git a/Cargo.lock b/Cargo.lock index d4d274e7f71..8e305a0c04a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3295,6 +3295,7 @@ dependencies = [ "pyroscope", "pyroscope_pprofrs", "rand", + "rlimit", "serde", "serde_json", "strum 0.25.0", @@ -7657,6 +7658,15 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" +[[package]] +name = "rlimit" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7043b63bd0cd1aaa628e476b80e6d4023a3b50eb32789f2728908107bd0c793a" +dependencies = [ + "libc", +] + [[package]] name = "rlp" version = "0.5.2" diff --git a/benches/benches/vm_set/blockchain.rs b/benches/benches/vm_set/blockchain.rs index 2d85a4f2028..cd51bf6dbc9 100644 --- a/benches/benches/vm_set/blockchain.rs +++ b/benches/benches/vm_set/blockchain.rs @@ -75,6 +75,7 @@ impl BenchDb { tmp_dir.path(), None, Default::default(), + -1, ) .unwrap(); let db = Arc::new(db); diff --git a/benches/src/db_lookup_times_utils/mod.rs b/benches/src/db_lookup_times_utils/mod.rs index 9bb12fb7169..0d001a3bc3e 100644 --- a/benches/src/db_lookup_times_utils/mod.rs +++ b/benches/src/db_lookup_times_utils/mod.rs @@ -27,7 +27,7 @@ mod tests { fn setup_test_db() -> RocksDb { let temp_dir = ShallowTempDir::new(); - RocksDb::default_open(temp_dir.path(), None).unwrap() + RocksDb::default_open(temp_dir.path(), None, -1).unwrap() } #[test] diff --git a/benches/src/db_lookup_times_utils/utils.rs b/benches/src/db_lookup_times_utils/utils.rs index 5845079e2b5..4db547a5094 100644 --- a/benches/src/db_lookup_times_utils/utils.rs +++ b/benches/src/db_lookup_times_utils/utils.rs @@ -39,7 +39,7 @@ pub fn get_random_block_height( pub fn open_rocks_db( path: &Path, ) -> Result> { - let db = RocksDb::default_open(path, None)?; + let db = RocksDb::default_open(path, None, -1)?; Ok(db) } diff --git a/bin/fuel-core/Cargo.toml b/bin/fuel-core/Cargo.toml index 76bbb6d0367..41d05c2b69a 100644 --- a/bin/fuel-core/Cargo.toml +++ b/bin/fuel-core/Cargo.toml @@ -35,6 +35,7 @@ hex = { workspace = true } humantime = "2.1" pyroscope = "0.5" pyroscope_pprofrs = "0.2" +rlimit = "0.10.2" serde_json = { workspace = true } tikv-jemallocator = { workspace = true } tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/bin/fuel-core/src/cli/rollback.rs b/bin/fuel-core/src/cli/rollback.rs index 2b85edd0e4f..12e18a0547b 100644 --- a/bin/fuel-core/src/cli/rollback.rs +++ b/bin/fuel-core/src/cli/rollback.rs @@ -5,6 +5,10 @@ use fuel_core::{ combined_database::CombinedDatabase, state::historical_rocksdb::StateRewindPolicy, }; +use rlimit::{ + getrlimit, + Resource, +}; use std::path::PathBuf; /// Rollbacks the state of the blockchain to a specific block height. @@ -19,11 +23,28 @@ pub struct Command { )] pub database_path: PathBuf, + /// Defines a specific number of file descriptors that RocksDB can use. + /// + /// If defined as -1 no limit will be applied and will use the OS limits. + /// If not defined the system default divided by two is used. + #[clap( + long = "rocksdb-max-fds", + env, + default_value = get_default_max_fds().to_string() + )] + pub rocksdb_max_fds: i32, + /// The path to the database. 
#[clap(long = "target-block-height")] pub target_block_height: u32, } +fn get_default_max_fds() -> i32 { + getrlimit(Resource::NOFILE) + .map(|(_, hard)| i32::try_from(hard.saturating_div(2)).unwrap_or(i32::MAX)) + .expect("Our supported platforms should return max FD.") +} + pub async fn exec(command: Command) -> anyhow::Result<()> { use crate::cli::ShutdownListener; @@ -32,6 +53,7 @@ pub async fn exec(command: Command) -> anyhow::Result<()> { path, 64 * 1024 * 1024, StateRewindPolicy::RewindFullRange, + command.rocksdb_max_fds, ) .map_err(Into::::into) .context(format!("failed to open combined database at path {path:?}"))?; diff --git a/bin/fuel-core/src/cli/run.rs b/bin/fuel-core/src/cli/run.rs index 256ecc1fe3b..06d00219b50 100644 --- a/bin/fuel-core/src/cli/run.rs +++ b/bin/fuel-core/src/cli/run.rs @@ -65,6 +65,10 @@ use pyroscope_pprofrs::{ pprof_backend, PprofConfig, }; +use rlimit::{ + getrlimit, + Resource, +}; use std::{ env, net, @@ -126,6 +130,18 @@ pub struct Command { )] pub database_type: DbType, + #[cfg(feature = "rocksdb")] + /// Defines a specific number of file descriptors that RocksDB can use. + /// + /// If defined as -1 no limit will be applied and will use the OS limits. + /// If not defined the system default divided by two is used. + #[clap( + long = "rocksdb-max-fds", + env, + default_value = get_default_max_fds().to_string() + )] + pub rocksdb_max_fds: i32, + #[cfg(feature = "rocksdb")] /// Defines the state rewind policy for the database when RocksDB is enabled. /// @@ -273,6 +289,8 @@ impl Command { database_path, database_type, #[cfg(feature = "rocksdb")] + rocksdb_max_fds, + #[cfg(feature = "rocksdb")] state_rewind_duration, db_prune, snapshot, @@ -441,6 +459,8 @@ impl Command { max_database_cache_size, #[cfg(feature = "rocksdb")] state_rewind_policy, + #[cfg(feature = "rocksdb")] + max_fds: rocksdb_max_fds, }; let block_importer = fuel_core::service::config::fuel_core_importer::Config::new( @@ -591,6 +611,13 @@ impl Command { } } +#[cfg(feature = "rocksdb")] +fn get_default_max_fds() -> i32 { + getrlimit(Resource::NOFILE) + .map(|(_, hard)| i32::try_from(hard.saturating_div(2)).unwrap_or(i32::MAX)) + .expect("Our supported platforms should return max FD.") +} + pub async fn get_service_with_shutdown_listeners( command: Command, ) -> anyhow::Result<(FuelService, ShutdownListener)> { diff --git a/bin/fuel-core/src/cli/snapshot.rs b/bin/fuel-core/src/cli/snapshot.rs index 0c03f73edfd..3f562e38733 100644 --- a/bin/fuel-core/src/cli/snapshot.rs +++ b/bin/fuel-core/src/cli/snapshot.rs @@ -10,6 +10,10 @@ use fuel_core::{ types::fuel_types::ContractId, }; use fuel_core_chain_config::ChainConfig; +use rlimit::{ + getrlimit, + Resource, +}; use std::path::{ Path, PathBuf, @@ -29,6 +33,17 @@ pub struct Command { )] pub database_path: PathBuf, + /// Defines a specific number of file descriptors that RocksDB can use. + /// + /// If defined as -1 no limit will be applied and will use the OS limits. + /// If not defined the system default divided by two is used. 
+ #[clap( + long = "rocksdb-max-fds", + env, + default_value = get_default_max_fds().to_string() + )] + pub rocksdb_max_fds: i32, + /// Where to save the snapshot #[arg(name = "OUTPUT_DIR", long = "output-directory")] pub output_dir: PathBuf, @@ -112,6 +127,12 @@ pub enum SubCommands { }, } +fn get_default_max_fds() -> i32 { + getrlimit(Resource::NOFILE) + .map(|(_, hard)| i32::try_from(hard.saturating_div(2)).unwrap_or(i32::MAX)) + .expect("Our supported platforms should return max FD.") +} + #[cfg(feature = "rocksdb")] pub async fn exec(command: Command) -> anyhow::Result<()> { use fuel_core::service::genesis::Exporter; @@ -125,6 +146,7 @@ pub async fn exec(command: Command) -> anyhow::Result<()> { let db = open_db( &command.database_path, Some(command.max_database_cache_size), + command.rocksdb_max_fds, )?; let output_dir = command.output_dir; let shutdown_listener = ShutdownListener::spawn(); @@ -180,11 +202,16 @@ fn load_chain_config_or_use_testnet(path: Option<&Path>) -> anyhow::Result) -> anyhow::Result { +fn open_db( + path: &Path, + capacity: Option, + max_fds: i32, +) -> anyhow::Result { CombinedDatabase::open( path, capacity.unwrap_or(1024 * 1024 * 1024), StateRewindPolicy::NoRewind, + max_fds, ) .map_err(Into::::into) .context(format!("failed to open combined database at path {path:?}",)) @@ -668,7 +695,8 @@ mod tests { let db_path = temp_dir.path().join("db"); std::fs::create_dir(&db_path)?; - let mut db = DbPopulator::new(open_db(&db_path, None)?, StdRng::seed_from_u64(2)); + let mut db = + DbPopulator::new(open_db(&db_path, None, 512)?, StdRng::seed_from_u64(2)); let state = db.given_persisted_data(); db.flush(); @@ -681,6 +709,7 @@ mod tests { chain_config: None, encoding_command: Some(EncodingCommand::Encoding { encoding }), }, + rocksdb_max_fds: 512, }); // Because the test_case macro doesn't work with async tests @@ -720,7 +749,8 @@ mod tests { let snapshot_dir = temp_dir.path().join("snapshot"); let db_path = temp_dir.path().join("db"); - let mut db = DbPopulator::new(open_db(&db_path, None)?, StdRng::seed_from_u64(2)); + let mut db = + DbPopulator::new(open_db(&db_path, None, 512)?, StdRng::seed_from_u64(2)); let state = db.given_persisted_data(); db.flush(); @@ -739,6 +769,7 @@ mod tests { }, }), }, + rocksdb_max_fds: 512, }); tokio::runtime::Runtime::new() @@ -763,7 +794,8 @@ mod tests { let snapshot_dir = temp_dir.path().join("snapshot"); let db_path = temp_dir.path().join("db"); - let mut db = DbPopulator::new(open_db(&db_path, None)?, StdRng::seed_from_u64(2)); + let mut db = + DbPopulator::new(open_db(&db_path, None, 512)?, StdRng::seed_from_u64(2)); let original_state = db.given_persisted_data().sorted().into_state_config(); @@ -789,6 +821,7 @@ mod tests { output_dir: snapshot_dir.clone(), max_database_cache_size: DEFAULT_DATABASE_CACHE_SIZE, subcommand: SubCommands::Contract { contract_id }, + rocksdb_max_fds: 512, }) .await?; diff --git a/crates/fuel-core/src/combined_database.rs b/crates/fuel-core/src/combined_database.rs index f69649d2605..8696a8b3969 100644 --- a/crates/fuel-core/src/combined_database.rs +++ b/crates/fuel-core/src/combined_database.rs @@ -39,6 +39,8 @@ pub struct CombinedDatabaseConfig { pub max_database_cache_size: usize, #[cfg(feature = "rocksdb")] pub state_rewind_policy: StateRewindPolicy, + #[cfg(feature = "rocksdb")] + pub max_fds: i32, } /// A database that combines the on-chain, off-chain and relayer databases into one entity. 
@@ -79,13 +81,22 @@ impl CombinedDatabase { path: &std::path::Path, capacity: usize, state_rewind_policy: StateRewindPolicy, + max_fds: i32, ) -> crate::database::Result { + // Split the fds in equitable manner between the databases + let max_fds = match max_fds { + -1 => -1, + _ => max_fds.saturating_div(4), + }; // TODO: Use different cache sizes for different databases - let on_chain = Database::open_rocksdb(path, capacity, state_rewind_policy)?; - let off_chain = Database::open_rocksdb(path, capacity, state_rewind_policy)?; + let on_chain = + Database::open_rocksdb(path, capacity, state_rewind_policy, max_fds)?; + let off_chain = + Database::open_rocksdb(path, capacity, state_rewind_policy, max_fds)?; let relayer = - Database::open_rocksdb(path, capacity, StateRewindPolicy::NoRewind)?; - let gas_price = Database::open_rocksdb(path, capacity, state_rewind_policy)?; + Database::open_rocksdb(path, capacity, StateRewindPolicy::NoRewind, max_fds)?; + let gas_price = + Database::open_rocksdb(path, capacity, state_rewind_policy, max_fds)?; Ok(Self { on_chain, off_chain, @@ -115,6 +126,7 @@ impl CombinedDatabase { &config.database_path, config.max_database_cache_size, config.state_rewind_policy, + config.max_fds, )? } } diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index d1d85dfe17e..5061692ed92 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -199,12 +199,14 @@ where path: &Path, capacity: impl Into>, state_rewind_policy: StateRewindPolicy, + max_fds: i32, ) -> Result { use anyhow::Context; let db = HistoricalRocksDB::::default_open( path, capacity.into(), state_rewind_policy, + max_fds, ) .map_err(Into::::into) .with_context(|| { @@ -1076,6 +1078,7 @@ mod tests { temp_dir.path(), 1024 * 1024 * 1024, Default::default(), + 512, ) .unwrap(); // rocks db fails diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index 2dd6602a979..0e093c2b8b6 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -126,6 +126,8 @@ impl Config { #[cfg(feature = "rocksdb")] state_rewind_policy: crate::state::historical_rocksdb::StateRewindPolicy::RewindFullRange, + #[cfg(feature = "rocksdb")] + max_fds: 512, }; let starting_gas_price = 0; let gas_price_change_percent = 0; diff --git a/crates/fuel-core/src/state/historical_rocksdb.rs b/crates/fuel-core/src/state/historical_rocksdb.rs index a95368ec12f..52887fdaf2e 100644 --- a/crates/fuel-core/src/state/historical_rocksdb.rs +++ b/crates/fuel-core/src/state/historical_rocksdb.rs @@ -101,8 +101,10 @@ where path: P, capacity: Option, state_rewind_policy: StateRewindPolicy, + max_fds: i32, ) -> DatabaseResult { - let db = RocksDb::>::default_open(path, capacity)?; + let db = + RocksDb::>::default_open(path, capacity, max_fds)?; Ok(Self { state_rewind_policy, db, diff --git a/crates/fuel-core/src/state/rocks_db.rs b/crates/fuel-core/src/state/rocks_db.rs index c54ee3765c1..c751ebe7c55 100644 --- a/crates/fuel-core/src/state/rocks_db.rs +++ b/crates/fuel-core/src/state/rocks_db.rs @@ -127,6 +127,7 @@ where path, enum_iterator::all::().collect::>(), capacity, + 512, ); let mut db = result?; @@ -146,11 +147,13 @@ where pub fn default_open>( path: P, capacity: Option, + max_fds: i32, ) -> DatabaseResult { Self::open( path, enum_iterator::all::().collect::>(), capacity, + max_fds, ) } @@ -165,8 +168,9 @@ where path: P, columns: Vec, capacity: Option, + max_fds: i32, ) -> DatabaseResult { - 
Self::open_with(DB::open_cf_descriptors, path, columns, capacity) + Self::open_with(DB::open_cf_descriptors, path, columns, capacity, max_fds) } pub fn open_read_only>( @@ -174,6 +178,7 @@ where columns: Vec, capacity: Option, error_if_log_file_exist: bool, + max_fds: i32, ) -> DatabaseResult { Self::open_with( |options, primary_path, cfs| { @@ -187,6 +192,7 @@ where path, columns, capacity, + max_fds, ) } @@ -195,6 +201,7 @@ where secondary_path: SecondaryPath, columns: Vec, capacity: Option, + max_fds: i32, ) -> DatabaseResult where PrimaryPath: AsRef, @@ -212,6 +219,7 @@ where path, columns, capacity, + max_fds, ) } @@ -220,6 +228,7 @@ where path: P, columns: Vec, capacity: Option, + max_fds: i32, ) -> DatabaseResult where F: Fn( @@ -279,9 +288,7 @@ where } opts.set_max_background_jobs(6); opts.set_bytes_per_sync(1048576); - - #[cfg(feature = "test-helpers")] - opts.set_max_open_files(512); + opts.set_max_open_files(max_fds); let existing_column_families = DB::list_cf(&opts, &path).unwrap_or_default(); @@ -880,7 +887,7 @@ mod tests { fn create_db() -> (RocksDb, TempDir) { let tmp_dir = TempDir::new().unwrap(); ( - RocksDb::default_open(tmp_dir.path(), None).unwrap(), + RocksDb::default_open(tmp_dir.path(), None, 512).unwrap(), tmp_dir, ) } @@ -893,7 +900,7 @@ mod tests { let old_columns = vec![Column::Coins, Column::Messages, Column::UploadedBytecodes]; let database_with_old_columns = - RocksDb::::open(tmp_dir.path(), old_columns.clone(), None) + RocksDb::::open(tmp_dir.path(), old_columns.clone(), None, 512) .expect("Failed to open database with old columns"); drop(database_with_old_columns); @@ -902,7 +909,7 @@ mod tests { new_columns.push(Column::ContractsAssets); new_columns.push(Column::Metadata); let database_with_new_columns = - RocksDb::::open(tmp_dir.path(), new_columns, None).map(|_| ()); + RocksDb::::open(tmp_dir.path(), new_columns, None, 512).map(|_| ()); // Then assert_eq!(Ok(()), database_with_new_columns); @@ -1071,7 +1078,7 @@ mod tests { // When let columns = enum_iterator::all::<::Column>() .collect::>(); - let result = RocksDb::::open(tmp_dir.path(), columns, None); + let result = RocksDb::::open(tmp_dir.path(), columns, None, 512); // Then assert!(result.is_err()); @@ -1090,6 +1097,7 @@ mod tests { old_columns.clone(), None, false, + 512, ) .map(|_| ()); @@ -1111,6 +1119,7 @@ mod tests { secondary_temp.path(), old_columns.clone(), None, + 512, ) .map(|_| ()); @@ -1203,7 +1212,7 @@ mod tests { .skip(1) .collect::>(); let open_with_part_of_columns = - RocksDb::::open(tmp_dir.path(), part_of_columns, None); + RocksDb::::open(tmp_dir.path(), part_of_columns, None, 512); // Then let _ = open_with_part_of_columns diff --git a/tests/tests/health.rs b/tests/tests/health.rs index 5cdcf5b8a39..d0b241fcb5a 100644 --- a/tests/tests/health.rs +++ b/tests/tests/health.rs @@ -29,7 +29,8 @@ async fn can_restart_node() { // start node once { let database = - Database::open_rocksdb(tmp_dir.path(), None, Default::default()).unwrap(); + Database::open_rocksdb(tmp_dir.path(), None, Default::default(), 512) + .unwrap(); let first_startup = FuelService::from_database(database, Config::local_node()) .await .unwrap(); @@ -41,7 +42,8 @@ async fn can_restart_node() { { let database = - Database::open_rocksdb(tmp_dir.path(), None, Default::default()).unwrap(); + Database::open_rocksdb(tmp_dir.path(), None, Default::default(), 512) + .unwrap(); let _second_startup = FuelService::from_database(database, Config::local_node()) .await .unwrap(); @@ -56,7 +58,8 @@ async fn 
can_restart_node_with_transactions() { { // Given let database = - CombinedDatabase::open(tmp_dir.path(), capacity, Default::default()).unwrap(); + CombinedDatabase::open(tmp_dir.path(), capacity, Default::default(), 512) + .unwrap(); let service = FuelService::from_combined_database(database, Config::local_node()) .await .unwrap(); @@ -74,7 +77,8 @@ async fn can_restart_node_with_transactions() { { // When let database = - CombinedDatabase::open(tmp_dir.path(), capacity, Default::default()).unwrap(); + CombinedDatabase::open(tmp_dir.path(), capacity, Default::default(), 512) + .unwrap(); let service = FuelService::from_combined_database(database, Config::local_node()) .await .unwrap(); diff --git a/tests/tests/poa.rs b/tests/tests/poa.rs index 792064fbade..0f4f5a4fb97 100644 --- a/tests/tests/poa.rs +++ b/tests/tests/poa.rs @@ -137,8 +137,8 @@ async fn can_get_sealed_block_from_poa_produced_block_when_signing_with_kms() { // stop the node and just grab the database let db_path = driver.kill().await; - let db = - CombinedDatabase::open(db_path.path(), 1024 * 1024, Default::default()).unwrap(); + let db = CombinedDatabase::open(db_path.path(), 1024 * 1024, Default::default(), 512) + .unwrap(); let view = db.on_chain().latest_view().unwrap(); diff --git a/tests/tests/relayer.rs b/tests/tests/relayer.rs index 28afbf03d89..9ea80b1e242 100644 --- a/tests/tests/relayer.rs +++ b/tests/tests/relayer.rs @@ -331,7 +331,8 @@ async fn can_restart_node_with_relayer_data() { { // Given let database = - CombinedDatabase::open(tmp_dir.path(), capacity, Default::default()).unwrap(); + CombinedDatabase::open(tmp_dir.path(), capacity, Default::default(), 512) + .unwrap(); let service = FuelService::from_combined_database(database, config.clone()) .await @@ -350,7 +351,8 @@ async fn can_restart_node_with_relayer_data() { { // When let database = - CombinedDatabase::open(tmp_dir.path(), capacity, Default::default()).unwrap(); + CombinedDatabase::open(tmp_dir.path(), capacity, Default::default(), 512) + .unwrap(); let service = FuelService::from_combined_database(database, config) .await .unwrap(); From 57b1d170dabd06b897e9e6d833465ef65318e116 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= <88321181+rafal-ch@users.noreply.github.com> Date: Tue, 29 Oct 2024 16:24:27 +0100 Subject: [PATCH 8/8] Add codeowners for gas price algorithm crate (#2404) ## Description This PRs adds a code owners for the gas price algorithm crate (@MitchTurner @rafal-ch). --- .github/CODEOWNERS | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 87fda006074..93428eaf29d 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,2 +1,5 @@ # Default code owners for fuel-core * @xgreenx @Dentosal @MitchTurner + +# Code owners for the gas price algorithm +crates/fuel-gas-price-algorithm @MitchTurner @rafal-ch \ No newline at end of file