Skip to content

Commit

Permalink
Merge branch 'main' into 3045/add-submission-address-autopilot
Browse files Browse the repository at this point in the history
  • Loading branch information
m-lord-renkse authored Oct 25, 2024
2 parents 02c199b + 516a4e0 commit 09f5b75
Show file tree
Hide file tree
Showing 40 changed files with 530 additions and 176 deletions.
11 changes: 11 additions & 0 deletions crates/autopilot/src/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ pub struct Arguments {
/// to settle their winning orders at the same time.
pub max_winners_per_auction: usize,

#[clap(long, env, default_value = "3")]
/// The maximum allowed number of solutions to be proposed from a single
/// solver, per auction.
pub max_solutions_per_solver: usize,

/// Archive node URL used to index CoW AMM
#[clap(long, env)]
pub archive_node_url: Option<Url>,
Expand Down Expand Up @@ -286,6 +291,7 @@ impl std::fmt::Display for Arguments {
run_loop_native_price_timeout,
max_winners_per_auction,
archive_node_url,
max_solutions_per_solver,
} = self;

write!(f, "{}", shared)?;
Expand Down Expand Up @@ -365,6 +371,11 @@ impl std::fmt::Display for Arguments {
)?;
writeln!(f, "max_winners_per_auction: {:?}", max_winners_per_auction)?;
writeln!(f, "archive_node_url: {:?}", archive_node_url)?;
writeln!(
f,
"max_solutions_per_solver: {:?}",
max_solutions_per_solver
)?;
Ok(())
}
}
Expand Down
25 changes: 25 additions & 0 deletions crates/autopilot/src/boundary/events/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,26 @@
use {
anyhow::{Context, Result},
sqlx::PgPool,
};

pub mod settlement;

pub async fn write_last_block_to_db(db: &PgPool, last_block: u64, contract: &str) -> Result<()> {
let mut ex = db.acquire().await?;
database::last_indexed_blocks::update(
&mut ex,
contract,
i64::try_from(last_block).context("new value of counter is not i64")?,
)
.await?;
Ok(())
}

pub async fn read_last_block_from_db(db: &PgPool, contract: &str) -> Result<u64> {
let mut ex = db.acquire().await?;
database::last_indexed_blocks::fetch(&mut ex, contract)
.await?
.unwrap_or_default()
.try_into()
.context("last block is not u64")
}
10 changes: 8 additions & 2 deletions crates/autopilot/src/boundary/events/settlement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@ impl Indexer {
}
}

/// This name is used to store the latest indexed block in the db.
const INDEX_NAME: &str = "settlements";

#[async_trait::async_trait]
impl EventStoring<contracts::gpv2_settlement::Event> for Indexer {
async fn last_event_block(&self) -> Result<u64> {
let mut con = self.db.pool.acquire().await?;
crate::database::events::last_event_block(&mut con).await
super::read_last_block_from_db(&self.db.pool, INDEX_NAME).await
}

async fn persist_last_indexed_block(&mut self, latest_block: u64) -> Result<()> {
super::write_last_block_to_db(&self.db.pool, latest_block, INDEX_NAME).await
}

async fn replace_events(
Expand Down
11 changes: 8 additions & 3 deletions crates/autopilot/src/database/ethflow_events/event_storing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@ fn get_refunds(events: Vec<ethcontract::Event<EthFlowEvent>>) -> Result<Vec<Refu

type EthFlowEvent = contracts::cowswap_eth_flow::Event;

/// This name is used to store the latest indexed block in the db.
const INDEX_NAME: &str = "ethflow_refunds";

#[async_trait::async_trait]
impl EventStoring<EthFlowEvent> for Postgres {
async fn last_event_block(&self) -> Result<u64> {
let mut ex = self.pool.acquire().await?;
let block = database::ethflow_orders::last_indexed_block(&mut ex).await?;
Ok(block.unwrap_or_default() as u64)
crate::boundary::events::read_last_block_from_db(&self.pool, INDEX_NAME).await
}

async fn persist_last_indexed_block(&mut self, last_block: u64) -> Result<()> {
crate::boundary::events::write_last_block_to_db(&self.pool, last_block, INDEX_NAME).await
}

async fn append_events(&mut self, events: Vec<ethcontract::Event<EthFlowEvent>>) -> Result<()> {
Expand Down
13 changes: 0 additions & 13 deletions crates/autopilot/src/database/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use {
},
ethcontract::{Event as EthContractEvent, EventMetadata},
number::conversions::u256_to_big_decimal,
sqlx::PgConnection,
std::convert::TryInto,
};

Expand All @@ -43,18 +42,6 @@ pub fn contract_to_db_events(
.collect::<Result<Vec<_>>>()
}

pub async fn last_event_block(connection: &mut PgConnection) -> Result<u64> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["last_event_block"])
.start_timer();

let block_number = database::events::last_block(connection)
.await
.context("block_number_of_most_recent_event failed")?;
block_number.try_into().context("block number is negative")
}

pub async fn append_events(
transaction: &mut PgTransaction<'_>,
events: Vec<EthContractEvent<ContractEvent>>,
Expand Down
33 changes: 20 additions & 13 deletions crates/autopilot/src/database/onchain_order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,30 @@ where
) -> EventRow;
}

/// This name is used to store the latest indexed block in the db.
const INDEX_NAME: &str = "onchain_orders";

#[async_trait::async_trait]
impl<T: Sync + Send + Clone, W: Sync + Send + Clone> EventStoring<ContractEvent>
for OnchainOrderParser<T, W>
{
async fn last_event_block(&self) -> Result<u64> {
let _timer = DatabaseMetrics::get()
.database_queries
.with_label_values(&["read_last_block_onchain_orders"])
.start_timer();
crate::boundary::events::read_last_block_from_db(&self.db.pool, INDEX_NAME).await
}

async fn persist_last_indexed_block(&mut self, latest_block: u64) -> Result<()> {
let _timer = DatabaseMetrics::get()
.database_queries
.with_label_values(&["update_last_block_onchain_orders"])
.start_timer();
crate::boundary::events::write_last_block_to_db(&self.db.pool, latest_block, INDEX_NAME)
.await
}

async fn replace_events(
&mut self,
events: Vec<EthContractEvent<ContractEvent>>,
Expand Down Expand Up @@ -183,19 +203,6 @@ impl<T: Sync + Send + Clone, W: Sync + Send + Clone> EventStoring<ContractEvent>

Ok(())
}

async fn last_event_block(&self) -> Result<u64> {
let _timer = DatabaseMetrics::get()
.database_queries
.with_label_values(&["last_event_block"])
.start_timer();

let mut con = self.db.pool.acquire().await?;
let block_number = database::onchain_broadcasted_orders::last_block(&mut con)
.await
.context("block_number_of_most_recent_event failed")?;
block_number.try_into().context("block number is negative")
}
}

impl<T: Send + Sync + Clone, W: Send + Sync> OnchainOrderParser<T, W> {
Expand Down
30 changes: 30 additions & 0 deletions crates/autopilot/src/domain/eth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ use {
derive_more::{Display, From, Into},
};

/// ERC20 token address for ETH. In reality, ETH is not an ERC20 token because
/// it does not implement the ERC20 interface, but this address is used by
/// convention across the Ethereum ecosystem whenever ETH is treated like an
/// ERC20 token.
/// Same address is also used for XDAI on Gnosis Chain.
pub const NATIVE_TOKEN: TokenAddress = TokenAddress(H160([0xee; 20]));

/// An address. Can be an EOA or a smart contract address.
#[derive(
Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, From, Into, Display,
Expand Down Expand Up @@ -33,6 +40,29 @@ pub struct TxId(pub H256);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, From, Into)]
pub struct TokenAddress(pub H160);

impl TokenAddress {
/// If the token is ETH/XDAI, return WETH/WXDAI, thereby converting it to
/// erc20.
pub fn as_erc20(self, wrapped: WrappedNativeToken) -> Self {
if self == NATIVE_TOKEN {
wrapped.into()
} else {
self
}
}
}

/// ERC20 representation of the chain's native token (e.g. WETH on mainnet,
/// WXDAI on Gnosis Chain).
#[derive(Debug, Clone, Copy, From, Into)]
pub struct WrappedNativeToken(TokenAddress);

impl From<H160> for WrappedNativeToken {
fn from(value: H160) -> Self {
WrappedNativeToken(value.into())
}
}

/// An ERC20 token amount.
///
/// https://eips.ethereum.org/EIPS/eip-20
Expand Down
6 changes: 6 additions & 0 deletions crates/autopilot/src/infra/blockchain/contracts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ impl Contracts {
&self.weth
}

/// Wrapped version of the native token (e.g. WETH for Ethereum, WXDAI for
/// Gnosis Chain)
pub fn wrapped_native_token(&self) -> domain::eth::WrappedNativeToken {
self.weth.address().into()
}

pub fn authenticator(&self) -> &contracts::GPv2AllowListAuthentication {
&self.authenticator
}
Expand Down
43 changes: 33 additions & 10 deletions crates/autopilot/src/maintenance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use {
event_updater::EventUpdater,
},
anyhow::Result,
ethrpc::block_stream::BlockInfo,
ethrpc::block_stream::{into_stream, BlockInfo, CurrentBlockWatcher},
futures::StreamExt,
prometheus::{
core::{AtomicU64, GenericGauge},
HistogramVec,
Expand Down Expand Up @@ -89,15 +90,6 @@ impl Maintenance {
),
Self::timed_future("db_cleanup", self.db_cleanup.run_maintenance()),
Self::timed_future("ethflow_indexer", self.index_ethflow_orders()),
Self::timed_future(
"cow_amm_indexer",
futures::future::try_join_all(
self.cow_amm_indexer
.iter()
.cloned()
.map(|indexer| async move { indexer.run_maintenance().await }),
)
),
)?;

Ok(())
Expand Down Expand Up @@ -128,6 +120,37 @@ impl Maintenance {
.start_timer();
fut.await
}

/// Spawns a background task that runs on every new block but also
/// at least after every `update_interval`.
pub fn spawn_cow_amm_indexing_task(self_: Arc<Self>, current_block: CurrentBlockWatcher) {
tokio::task::spawn(async move {
let mut stream = into_stream(current_block);
loop {
let _ = match stream.next().await {
Some(block) => {
metrics().last_seen_block.set(block.number);
block
}
None => panic!("block stream terminated unexpectedly"),
};

// TODO: move this back into `Self::update_inner()` once we
// store cow amms in the DB to avoid incredibly slow restarts.
let _ = Self::timed_future(
"cow_amm_indexer",
futures::future::try_join_all(
self_
.cow_amm_indexer
.iter()
.cloned()
.map(|indexer| async move { indexer.run_maintenance().await }),
),
)
.await;
}
});
}
}

type EthflowIndexer =
Expand Down
1 change: 1 addition & 0 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ pub async fn run(args: Arguments) {
solve_deadline: args.solve_deadline,
max_run_loop_delay: args.max_run_loop_delay,
max_winners_per_auction: args.max_winners_per_auction,
max_solutions_per_solver: args.max_solutions_per_solver,
};
let drivers_futures = args
.drivers
Expand Down
40 changes: 26 additions & 14 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub struct Config {
/// by waiting for the next block to appear.
pub max_run_loop_delay: Duration,
pub max_winners_per_auction: usize,
pub max_solutions_per_solver: usize,
}

pub struct RunLoop {
Expand Down Expand Up @@ -82,13 +83,6 @@ impl RunLoop {
liveness: Arc<Liveness>,
maintenance: Arc<Maintenance>,
) -> Self {
// Added to make sure no more than one winner is activated by accident
// Should be removed once we decide to activate "multiple winners per auction"
// feature.
assert_eq!(
config.max_winners_per_auction, 1,
"only one winner is supported"
);
Self {
config,
eth,
Expand All @@ -104,6 +98,10 @@ impl RunLoop {
}

pub async fn run_forever(self) -> ! {
Maintenance::spawn_cow_amm_indexing_task(
self.maintenance.clone(),
self.eth.current_block().clone(),
);
let mut last_auction = None;
let mut last_block = None;
let self_arc = Arc::new(self);
Expand Down Expand Up @@ -571,6 +569,16 @@ impl RunLoop {
})
.collect::<Vec<_>>();

// Limit the number of accepted solutions per solver. Do not alter the ordering
// of solutions
let mut counter = HashMap::new();
solutions.retain(|participant| {
let driver = participant.driver().name.clone();
let count = counter.entry(driver).or_insert(0);
*count += 1;
*count <= self.config.max_solutions_per_solver
});

// Filter out solutions that are not fair
let solutions = solutions
.iter()
Expand All @@ -589,8 +597,9 @@ impl RunLoop {

// Winners are selected one by one, starting from the best solution,
// until `max_winners_per_auction` are selected. The solution is a winner
// if it swaps tokens that are not yet swapped by any other already
// selected winner.
// if it swaps tokens that are not yet swapped by any previously processed
// solution.
let wrapped_native_token = self.eth.contracts().wrapped_native_token();
let mut already_swapped_tokens = HashSet::new();
let mut winners = 0;
let solutions = solutions
Expand All @@ -600,16 +609,19 @@ impl RunLoop {
.solution()
.orders()
.iter()
.flat_map(|(_, order)| [order.sell.token, order.buy.token])
.flat_map(|(_, order)| {
[
order.sell.token.as_erc20(wrapped_native_token),
order.buy.token.as_erc20(wrapped_native_token),
]
})
.collect::<HashSet<_>>();

let is_winner = swapped_tokens.is_disjoint(&already_swapped_tokens)
&& winners < self.config.max_winners_per_auction;

if is_winner {
already_swapped_tokens.extend(swapped_tokens);
winners += 1;
}
already_swapped_tokens.extend(swapped_tokens);
winners += usize::from(is_winner);

participant.rank(is_winner)
})
Expand Down
Loading

0 comments on commit 09f5b75

Please sign in to comment.