Skip to content

Commit

Permalink
Init autopilot infra (#2206)
Browse files Browse the repository at this point in the history
# Description
Autopilot `RunLoop` is slowly becoming a new `Driver` and `Solver` class
with a million arguments. Let's try to classify things into `infra` for
the start. Initially, I recognized `blockchain`, `config` and `driver`
as something we already have.
https://github.com/cowprotocol/services/pull/2200/files introduces s3
which should also go to `infra`.

- [x] Added `blockchain::Ethereum` for everything blockchain related
  • Loading branch information
sunce86 authored Dec 27, 2023
1 parent d8ab692 commit f362b37
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 15 deletions.
16 changes: 16 additions & 0 deletions crates/autopilot/src/boundary/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
pub use shared::ethrpc::Web3;
use url::Url;
/// Builds a web3 client that bufferes requests and sends them in a
/// batch call.
pub fn buffered_web3_client(ethrpc: &Url) -> Web3 {
let ethrpc_args = shared::ethrpc::Arguments {
ethrpc_max_batch_size: 20,
ethrpc_max_concurrent_requests: 10,
ethrpc_batch_delay: Default::default(),
};
let http_factory =
shared::http_client::HttpClientFactory::new(&shared::http_client::Arguments {
http_timeout: std::time::Duration::from_secs(10),
});
shared::ethrpc::web3(&ethrpc_args, &http_factory, ethrpc, "base")
}
126 changes: 126 additions & 0 deletions crates/autopilot/src/infra/blockchain/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use {
crate::boundary,
ethcontract::dyns::DynWeb3,
ethrpc::current_block::CurrentBlockStream,
primitive_types::{H256, U256},
std::sync::Arc,
thiserror::Error,
web3::types::TransactionReceipt,
};

/// Chain ID as defined by EIP-155.
///
/// https://eips.ethereum.org/EIPS/eip-155
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ChainId(pub U256);

impl From<U256> for ChainId {
fn from(value: U256) -> Self {
Self(value)
}
}

#[derive(Debug, Clone)]
pub struct NetworkId(pub String);

impl From<String> for NetworkId {
fn from(value: String) -> Self {
Self(value)
}
}

/// An Ethereum RPC connection.
pub struct Rpc {
web3: DynWeb3,
network: Network,
}

/// Network information for an Ethereum blockchain connection.
#[derive(Clone, Debug)]
pub struct Network {
pub id: NetworkId,
pub chain: ChainId,
}

impl Rpc {
/// Instantiate an RPC client to an Ethereum (or Ethereum-compatible) node
/// at the specifed URL.
pub async fn new(url: &url::Url) -> Result<Self, Error> {
let web3 = boundary::buffered_web3_client(url);
let id = web3.net().version().await?.into();
let chain = web3.eth().chain_id().await?.into();

Ok(Self {
web3,
network: Network { id, chain },
})
}

/// Returns the network information for the RPC connection.
pub fn network(&self) -> &Network {
&self.network
}

/// Returns a reference to the underlying web3 client.
pub fn web3(&self) -> &DynWeb3 {
&self.web3
}
}

/// The Ethereum blockchain.
#[derive(Clone)]
pub struct Ethereum {
web3: DynWeb3,
network: Network,
current_block: CurrentBlockStream,
}

impl Ethereum {
/// Access the Ethereum blockchain through an RPC API.
///
/// # Panics
///
/// Since this type is essential for the program this method will panic on
/// any initialization error.
pub async fn new(rpc: Rpc) -> Self {
let Rpc { web3, network } = rpc;

Self {
current_block: ethrpc::current_block::current_block_stream(
Arc::new(web3.clone()),
std::time::Duration::from_millis(500),
)
.await
.expect("couldn't initialize current block stream"),
web3,
network,
}
}

pub fn network(&self) -> &Network {
&self.network
}

/// Returns a stream that monitors the block chain to inform about the
/// current and new blocks.
pub fn current_block(&self) -> &CurrentBlockStream {
&self.current_block
}

pub async fn transaction_receipt(
&self,
hash: H256,
) -> Result<Option<TransactionReceipt>, Error> {
self.web3
.eth()
.transaction_receipt(hash)
.await
.map_err(Into::into)
}
}

#[derive(Debug, Error)]
pub enum Error {
#[error("web3 error: {0:?}")]
Web3(#[from] web3::error::Error),
}
1 change: 1 addition & 0 deletions crates/autopilot/src/infra/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod blockchain;
2 changes: 2 additions & 0 deletions crates/autopilot/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
pub mod arguments;
pub mod boundary;
pub mod database;
pub mod decoded_settlement;
pub mod driver_api;
pub mod driver_model;
pub mod event_updater;
pub mod infra;
pub mod on_settlement_event_updater;
pub mod periodic_db_cleanup;
pub mod protocol;
Expand Down
24 changes: 16 additions & 8 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use {
},
driver_api::Driver,
event_updater::{EventUpdater, GPv2SettlementContract},
infra::blockchain,
protocol,
run_loop::RunLoop,
shadow,
Expand Down Expand Up @@ -60,6 +61,7 @@ use {
},
std::{collections::HashSet, sync::Arc, time::Duration},
tracing::Instrument,
url::Url,
};

struct Liveness {
Expand All @@ -75,6 +77,16 @@ impl LivenessChecking for Liveness {
}
}

async fn ethrpc(url: &Url) -> blockchain::Rpc {
blockchain::Rpc::new(url)
.await
.expect("connect ethereum RPC")
}

async fn ethereum(ethrpc: blockchain::Rpc) -> blockchain::Ethereum {
blockchain::Ethereum::new(ethrpc).await
}

pub async fn start(args: impl Iterator<Item = String>) {
let args = Arguments::parse_from(args);
observe::tracing::initialize(
Expand Down Expand Up @@ -177,11 +189,6 @@ pub async fn run(args: Arguments) {
.await
.expect("Failed to retrieve network version ID");
let network_name = shared::network::network_name(&network, chain_id);
let network_time_between_blocks = args
.shared
.network_block_interval
.or_else(|| shared::network::block_interval(&network, chain_id))
.expect("unknown network block interval");

let signature_validator = signature_validator::validator(
&web3,
Expand Down Expand Up @@ -620,13 +627,14 @@ pub async fn run(args: Arguments) {
// updated in background task
let market_makable_token_list =
AutoUpdatingTokenList::from_configuration(market_makable_token_list_configuration).await;

let ethrpc = ethrpc(&args.shared.node_url).await;
let eth = ethereum(ethrpc).await;
let run = RunLoop {
eth,
solvable_orders_cache,
database: Arc::new(db),
drivers: args.drivers.into_iter().map(Driver::new).collect(),
current_block: current_block_stream,
web3,
network_block_interval: network_time_between_blocks,
market_makable_token_list,
submission_deadline: args.submission_deadline as u64,
additional_deadline_for_rewards: args.additional_deadline_for_rewards as u64,
Expand Down
12 changes: 5 additions & 7 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ use {
settle,
solve::{self, Class, TradedAmounts},
},
infra::blockchain::Ethereum,
protocol::fee,
solvable_orders::SolvableOrdersCache,
},
anyhow::Result,
chrono::Utc,
database::order_events::OrderEventLabel,
ethrpc::{current_block::CurrentBlockStream, Web3},
itertools::Itertools,
model::{
auction::{Auction, AuctionId, AuctionWithId},
Expand Down Expand Up @@ -45,12 +45,10 @@ use {
};

pub struct RunLoop {
pub eth: Ethereum,
pub solvable_orders_cache: Arc<SolvableOrdersCache>,
pub database: Arc<Postgres>,
pub drivers: Vec<Driver>,
pub current_block: CurrentBlockStream,
pub web3: Web3,
pub network_block_interval: Duration,
pub market_makable_token_list: AutoUpdatingTokenList,
pub submission_deadline: u64,
pub additional_deadline_for_rewards: u64,
Expand All @@ -67,7 +65,7 @@ impl RunLoop {
let mut last_block = None;
loop {
if let Some(AuctionWithId { id, auction }) = self.next_auction().await {
let current_block = self.current_block.borrow().hash;
let current_block = self.eth.current_block().borrow().hash;
// Only run the solvers if the auction or block has changed.
if last_auction_id.replace(id) != Some(id)
|| last_block.replace(current_block) != Some(current_block)
Expand Down Expand Up @@ -134,7 +132,7 @@ impl RunLoop {
solutions.sort_unstable_by_key(|participant| participant.solution.score);
solutions
};
let competition_simulation_block = self.current_block.borrow().number;
let competition_simulation_block = self.eth.current_block().borrow().number;

// TODO: Keep going with other solutions until some deadline.
if let Some(Participant { driver, solution }) = solutions.last() {
Expand Down Expand Up @@ -470,7 +468,7 @@ impl RunLoop {
/// to fill an order a second time.
async fn remove_in_flight_orders(&self, mut auction: Auction) -> Auction {
let prev_settlement = self.in_flight_orders.lock().unwrap().tx_hash;
let tx_receipt = self.web3.eth().transaction_receipt(prev_settlement).await;
let tx_receipt = self.eth.transaction_receipt(prev_settlement).await;

let prev_settlement_block = match tx_receipt {
Ok(Some(TransactionReceipt {
Expand Down

0 comments on commit f362b37

Please sign in to comment.