Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge solutions not settlements #2471

Merged
merged 11 commits into from
Mar 27, 2024
7 changes: 0 additions & 7 deletions crates/driver/src/boundary/settlement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,6 @@ impl Settlement {
Ok(eth::U256::from_big_rational(&quality)?.into())
}

pub fn merge(self, other: Self) -> Result<Self> {
self.inner.merge(other.inner).map(|inner| Self {
inner,
solver: self.solver,
})
}

pub fn clearing_prices(&self) -> HashMap<eth::TokenAddress, eth::TokenAmount> {
self.inner
.clearing_prices()
Expand Down
94 changes: 52 additions & 42 deletions crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
},
util::Bytes,
},
futures::{stream::FuturesUnordered, Stream, StreamExt},
futures::{stream::FuturesUnordered, StreamExt},
itertools::Itertools,
std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -83,7 +83,7 @@ impl Competition {
// Discard solutions that don't have unique ID.
let mut ids = HashSet::new();
let solutions = solutions.into_iter().filter(|solution| {
if !ids.insert(solution.id()) {
if !ids.insert(solution.id().clone()) {
observe::duplicated_solution_id(self.solver.name(), solution.id());
notify::duplicated_solution_id(&self.solver, auction.id(), solution.id());
false
Expand All @@ -96,37 +96,46 @@ impl Competition {
let solutions = solutions.filter(|solution| {
if solution.is_empty() {
observe::empty_solution(self.solver.name(), solution.id());
notify::empty_solution(&self.solver, auction.id(), solution.id());
notify::empty_solution(&self.solver, auction.id(), solution.id().clone());
false
} else {
true
}
});

let merged = merge(solutions, auction);

// Encode solutions into settlements (streamed).
let encoded = solutions
let encoded = merged
.into_iter()
.map(|solution| async move {
let id = solution.id();
observe::encoding(id);
let id = solution.id().clone();
observe::encoding(&id);
let settlement = solution.encode(auction, &self.eth, &self.simulator).await;
(id, settlement)
})
.collect::<FuturesUnordered<_>>()
.filter_map(|(id, result)| async move {
result
.tap_err(|err| {
observe::encoding_failed(self.solver.name(), id, err);
notify::encoding_failed(&self.solver, auction.id(), id, err);
observe::encoding_failed(self.solver.name(), &id, err);
notify::encoding_failed(&self.solver, auction.id(), &id, err);
})
.ok()
});

// Merge settlements as they arrive until there are no more new settlements or
// timeout is reached.
let mut settlements = Vec::new();
let future = async {
let mut encoded = std::pin::pin!(encoded);
while let Some(settlement) = encoded.next().await {
settlements.push(settlement);
}
};
if tokio::time::timeout(
auction.deadline().driver().remaining().unwrap_or_default(),
merge_settlements(&mut settlements, encoded, &self.eth, &self.simulator),
future,
)
.await
.is_err()
Expand Down Expand Up @@ -157,7 +166,7 @@ impl Competition {
notify::scoring_failed(
&self.solver,
auction.id(),
settlement.notify_id(),
settlement.solution(),
err,
);
})
Expand Down Expand Up @@ -211,15 +220,13 @@ impl Competition {
observe::winner_voided(block, &err);
*score_ref = None;
*self.settlement.lock().unwrap() = None;
if let Some(id) = settlement.notify_id() {
notify::simulation_failed(
&self.solver,
auction.id(),
id,
&infra::simulator::Error::Revert(err),
true,
);
}
notify::simulation_failed(
&self.solver,
auction.id(),
settlement.solution(),
&infra::simulator::Error::Revert(err),
true,
);
return;
}
}
Expand Down Expand Up @@ -268,7 +275,7 @@ impl Competition {
notify::executed(
&self.solver,
settlement.auction_id,
settlement.notify_id(),
settlement.solution(),
&executed,
);

Expand Down Expand Up @@ -322,34 +329,37 @@ impl Competition {
}
}

/// Tries to merge the incoming stream of new settlements into existing ones.
/// Always adds the new settlement by itself.
async fn merge_settlements(
merged: &mut Vec<Settlement>,
new: impl Stream<Item = Settlement>,
eth: &Ethereum,
simulator: &Simulator,
) {
let eth = eth.with_metric_label("mergeSettlements".into());
let mut new = std::pin::pin!(new);
while let Some(settlement) = new.next().await {
// Try to merge [`settlement`] into some settlements.
for other in merged.iter_mut() {
match other.merge(&settlement, &eth, simulator).await {
Ok(m) => {
*other = m;
observe::merged(&settlement, other);
// could possibly break here if we want to avoid merging
// into multiple settlements
/// Creates a vector with all possible combinations of the given solutions.
/// The result is sorted by the number of merges, so the first elements are the
/// original solutions.
fn merge(solutions: impl Iterator<Item = Solution>, auction: &Auction) -> Vec<Solution> {
let mut merged: Vec<Solution> = Vec::new();
for solution in solutions {
let mut extension = vec![];
for already_merged in merged.iter() {
match solution.merge(already_merged) {
Ok(merged) => {
observe::merged(&solution, already_merged, &merged);
extension.push(merged);
}
Err(err) => {
observe::not_merged(&settlement, other, err);
observe::not_merged(&solution, already_merged, err);
}
}
}
// add [`settlement`] by itself
merged.push(settlement);
// At least insert the current solution
extension.push(solution);
merged.extend(extension);
}

// Sort merged solutions descending by score.
merged.sort_by_key(|solution| {
solution
.scoring(&auction.prices())
.map(|score| score.0)
.unwrap_or_default()
});
merged
}

/// Solution information sent to the protocol by the driver before the solution
Expand Down
Loading
Loading