Skip to content

Commit

Permalink
Merge branch 'main' into propagate-protocol-fees-in-scoring
Browse files Browse the repository at this point in the history
  • Loading branch information
m-lord-renkse authored Mar 27, 2024
2 parents e40c923 + aac88df commit 9bfd4fa
Show file tree
Hide file tree
Showing 17 changed files with 370 additions and 261 deletions.
7 changes: 0 additions & 7 deletions crates/driver/src/boundary/settlement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,6 @@ impl Settlement {
Ok(eth::U256::from_big_rational(&quality)?.into())
}

pub fn merge(self, other: Self) -> Result<Self> {
self.inner.merge(other.inner).map(|inner| Self {
inner,
solver: self.solver,
})
}

pub fn clearing_prices(&self) -> HashMap<eth::TokenAddress, eth::TokenAmount> {
self.inner
.clearing_prices()
Expand Down
94 changes: 52 additions & 42 deletions crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
},
util::Bytes,
},
futures::{stream::FuturesUnordered, Stream, StreamExt},
futures::{stream::FuturesUnordered, StreamExt},
itertools::Itertools,
std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -83,7 +83,7 @@ impl Competition {
// Discard solutions that don't have unique ID.
let mut ids = HashSet::new();
let solutions = solutions.into_iter().filter(|solution| {
if !ids.insert(solution.id()) {
if !ids.insert(solution.id().clone()) {
observe::duplicated_solution_id(self.solver.name(), solution.id());
notify::duplicated_solution_id(&self.solver, auction.id(), solution.id());
false
Expand All @@ -96,37 +96,46 @@ impl Competition {
let solutions = solutions.filter(|solution| {
if solution.is_empty() {
observe::empty_solution(self.solver.name(), solution.id());
notify::empty_solution(&self.solver, auction.id(), solution.id());
notify::empty_solution(&self.solver, auction.id(), solution.id().clone());
false
} else {
true
}
});

let merged = merge(solutions, auction);

// Encode solutions into settlements (streamed).
let encoded = solutions
let encoded = merged
.into_iter()
.map(|solution| async move {
let id = solution.id();
observe::encoding(id);
let id = solution.id().clone();
observe::encoding(&id);
let settlement = solution.encode(auction, &self.eth, &self.simulator).await;
(id, settlement)
})
.collect::<FuturesUnordered<_>>()
.filter_map(|(id, result)| async move {
result
.tap_err(|err| {
observe::encoding_failed(self.solver.name(), id, err);
notify::encoding_failed(&self.solver, auction.id(), id, err);
observe::encoding_failed(self.solver.name(), &id, err);
notify::encoding_failed(&self.solver, auction.id(), &id, err);
})
.ok()
});

// Merge settlements as they arrive until there are no more new settlements or
// timeout is reached.
let mut settlements = Vec::new();
let future = async {
let mut encoded = std::pin::pin!(encoded);
while let Some(settlement) = encoded.next().await {
settlements.push(settlement);
}
};
if tokio::time::timeout(
auction.deadline().driver().remaining().unwrap_or_default(),
merge_settlements(&mut settlements, encoded, &self.eth, &self.simulator),
future,
)
.await
.is_err()
Expand Down Expand Up @@ -157,7 +166,7 @@ impl Competition {
notify::scoring_failed(
&self.solver,
auction.id(),
settlement.notify_id(),
settlement.solution(),
err,
);
})
Expand Down Expand Up @@ -211,15 +220,13 @@ impl Competition {
observe::winner_voided(block, &err);
*score_ref = None;
*self.settlement.lock().unwrap() = None;
if let Some(id) = settlement.notify_id() {
notify::simulation_failed(
&self.solver,
auction.id(),
id,
&infra::simulator::Error::Revert(err),
true,
);
}
notify::simulation_failed(
&self.solver,
auction.id(),
settlement.solution(),
&infra::simulator::Error::Revert(err),
true,
);
return;
}
}
Expand Down Expand Up @@ -268,7 +275,7 @@ impl Competition {
notify::executed(
&self.solver,
settlement.auction_id,
settlement.notify_id(),
settlement.solution(),
&executed,
);

Expand Down Expand Up @@ -322,34 +329,37 @@ impl Competition {
}
}

/// Tries to merge the incoming stream of new settlements into existing ones.
/// Always adds the new settlement by itself.
async fn merge_settlements(
merged: &mut Vec<Settlement>,
new: impl Stream<Item = Settlement>,
eth: &Ethereum,
simulator: &Simulator,
) {
let eth = eth.with_metric_label("mergeSettlements".into());
let mut new = std::pin::pin!(new);
while let Some(settlement) = new.next().await {
// Try to merge [`settlement`] into some settlements.
for other in merged.iter_mut() {
match other.merge(&settlement, &eth, simulator).await {
Ok(m) => {
*other = m;
observe::merged(&settlement, other);
// could possibly break here if we want to avoid merging
// into multiple settlements
/// Creates a vector with all possible combinations of the given solutions.
/// The result is sorted by the number of merges, so the first elements are the
/// original solutions.
fn merge(solutions: impl Iterator<Item = Solution>, auction: &Auction) -> Vec<Solution> {
let mut merged: Vec<Solution> = Vec::new();
for solution in solutions {
let mut extension = vec![];
for already_merged in merged.iter() {
match solution.merge(already_merged) {
Ok(merged) => {
observe::merged(&solution, already_merged, &merged);
extension.push(merged);
}
Err(err) => {
observe::not_merged(&settlement, other, err);
observe::not_merged(&solution, already_merged, err);
}
}
}
// add [`settlement`] by itself
merged.push(settlement);
// At least insert the current solution
extension.push(solution);
merged.extend(extension);
}

// Sort merged solutions descending by score.
merged.sort_by_key(|solution| {
solution
.scoring(&auction.prices())
.map(|score| score.0)
.unwrap_or_default()
});
merged
}

/// Solution information sent to the protocol by the driver before the solution
Expand Down
Loading

0 comments on commit 9bfd4fa

Please sign in to comment.