Skip to content

Commit

Permalink
Merge solutions not settlements (#2471)
Browse files Browse the repository at this point in the history
# Description
Currently merging multiple solutions into a larger one happens in the
`Settlement` type and reuses the legacy settlement encoder. As part of
our initiative to fully transition away from the legacy code base this
PR moves solution merging logic into our solution domain model.

This means we will now merge solutions before simulating and encoding
them. This can potentially lead to a lot more solution candidates being
generated and requiring simulation. I don't expect this to be an issue
in practice as encoding is already part of a stream which will yield the
best solution candidate found so far as soon as we approach the solving
deadline (thought may have to go into in which order solution candidate
should be attempted to be simulated).

# Changes
<!-- List of detailed changes (how the change is accomplished) -->

- [x] Implement `merge` method on `Solution`
- [x] Change the `Id` type on `Solution` to be able to represent when a
solution was generated from multiple sub-solutions.
- [x] Replace `settlement.merge` with `solution.merge` (this means
merging now happens before first simulation)
- [x] Change the way merge candidates are generated 
- [x] Sort stream of to encode settlements by "simples" ie least merged
solution (might want to use `score` instead once #2448 is merged)

Depending on how risky we think this change is, we can also keep the
existing merging logic and have a feature flag guard the two codepaths.

## How to test
Existing unit test

## Related Issues

Fixes #1478
  • Loading branch information
fleupold authored Mar 27, 2024
1 parent 200c7e7 commit 9e1eb58
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 203 deletions.
7 changes: 0 additions & 7 deletions crates/driver/src/boundary/settlement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,6 @@ impl Settlement {
Ok(eth::U256::from_big_rational(&quality)?.into())
}

pub fn merge(self, other: Self) -> Result<Self> {
self.inner.merge(other.inner).map(|inner| Self {
inner,
solver: self.solver,
})
}

pub fn clearing_prices(&self) -> HashMap<eth::TokenAddress, eth::TokenAmount> {
self.inner
.clearing_prices()
Expand Down
94 changes: 52 additions & 42 deletions crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
},
util::Bytes,
},
futures::{stream::FuturesUnordered, Stream, StreamExt},
futures::{stream::FuturesUnordered, StreamExt},
itertools::Itertools,
std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -83,7 +83,7 @@ impl Competition {
// Discard solutions that don't have unique ID.
let mut ids = HashSet::new();
let solutions = solutions.into_iter().filter(|solution| {
if !ids.insert(solution.id()) {
if !ids.insert(solution.id().clone()) {
observe::duplicated_solution_id(self.solver.name(), solution.id());
notify::duplicated_solution_id(&self.solver, auction.id(), solution.id());
false
Expand All @@ -96,37 +96,46 @@ impl Competition {
let solutions = solutions.filter(|solution| {
if solution.is_empty() {
observe::empty_solution(self.solver.name(), solution.id());
notify::empty_solution(&self.solver, auction.id(), solution.id());
notify::empty_solution(&self.solver, auction.id(), solution.id().clone());
false
} else {
true
}
});

let merged = merge(solutions, auction);

// Encode solutions into settlements (streamed).
let encoded = solutions
let encoded = merged
.into_iter()
.map(|solution| async move {
let id = solution.id();
observe::encoding(id);
let id = solution.id().clone();
observe::encoding(&id);
let settlement = solution.encode(auction, &self.eth, &self.simulator).await;
(id, settlement)
})
.collect::<FuturesUnordered<_>>()
.filter_map(|(id, result)| async move {
result
.tap_err(|err| {
observe::encoding_failed(self.solver.name(), id, err);
notify::encoding_failed(&self.solver, auction.id(), id, err);
observe::encoding_failed(self.solver.name(), &id, err);
notify::encoding_failed(&self.solver, auction.id(), &id, err);
})
.ok()
});

// Merge settlements as they arrive until there are no more new settlements or
// timeout is reached.
let mut settlements = Vec::new();
let future = async {
let mut encoded = std::pin::pin!(encoded);
while let Some(settlement) = encoded.next().await {
settlements.push(settlement);
}
};
if tokio::time::timeout(
auction.deadline().driver().remaining().unwrap_or_default(),
merge_settlements(&mut settlements, encoded, &self.eth, &self.simulator),
future,
)
.await
.is_err()
Expand Down Expand Up @@ -157,7 +166,7 @@ impl Competition {
notify::scoring_failed(
&self.solver,
auction.id(),
settlement.notify_id(),
settlement.solution(),
err,
);
})
Expand Down Expand Up @@ -211,15 +220,13 @@ impl Competition {
observe::winner_voided(block, &err);
*score_ref = None;
*self.settlement.lock().unwrap() = None;
if let Some(id) = settlement.notify_id() {
notify::simulation_failed(
&self.solver,
auction.id(),
id,
&infra::simulator::Error::Revert(err),
true,
);
}
notify::simulation_failed(
&self.solver,
auction.id(),
settlement.solution(),
&infra::simulator::Error::Revert(err),
true,
);
return;
}
}
Expand Down Expand Up @@ -268,7 +275,7 @@ impl Competition {
notify::executed(
&self.solver,
settlement.auction_id,
settlement.notify_id(),
settlement.solution(),
&executed,
);

Expand Down Expand Up @@ -322,34 +329,37 @@ impl Competition {
}
}

/// Tries to merge the incoming stream of new settlements into existing ones.
/// Always adds the new settlement by itself.
async fn merge_settlements(
merged: &mut Vec<Settlement>,
new: impl Stream<Item = Settlement>,
eth: &Ethereum,
simulator: &Simulator,
) {
let eth = eth.with_metric_label("mergeSettlements".into());
let mut new = std::pin::pin!(new);
while let Some(settlement) = new.next().await {
// Try to merge [`settlement`] into some settlements.
for other in merged.iter_mut() {
match other.merge(&settlement, &eth, simulator).await {
Ok(m) => {
*other = m;
observe::merged(&settlement, other);
// could possibly break here if we want to avoid merging
// into multiple settlements
/// Creates a vector with all possible combinations of the given solutions.
/// The result is sorted by the number of merges, so the first elements are the
/// original solutions.
fn merge(solutions: impl Iterator<Item = Solution>, auction: &Auction) -> Vec<Solution> {
let mut merged: Vec<Solution> = Vec::new();
for solution in solutions {
let mut extension = vec![];
for already_merged in merged.iter() {
match solution.merge(already_merged) {
Ok(merged) => {
observe::merged(&solution, already_merged, &merged);
extension.push(merged);
}
Err(err) => {
observe::not_merged(&settlement, other, err);
observe::not_merged(&solution, already_merged, err);
}
}
}
// add [`settlement`] by itself
merged.push(settlement);
// At least insert the current solution
extension.push(solution);
merged.extend(extension);
}

// Sort merged solutions descending by score.
merged.sort_by_key(|solution| {
solution
.scoring(&auction.prices())
.map(|score| score.0)
.unwrap_or_default()
});
merged
}

/// Solution information sent to the protocol by the driver before the solution
Expand Down
Loading

0 comments on commit 9e1eb58

Please sign in to comment.