Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update settlement contract balances in background task #2108

Merged
merged 5 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 67 additions & 2 deletions crates/driver/src/infra/tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ use {
domain::eth,
infra::{blockchain, Ethereum},
},
ethrpc::current_block::{self, CurrentBlockStream},
futures::StreamExt,
std::{
collections::{HashMap, HashSet},
sync::{Arc, RwLock},
},
tracing::Instrument,
};

#[derive(Clone, Debug)]
Expand All @@ -22,10 +25,16 @@ pub struct Fetcher(Arc<Inner>);

impl Fetcher {
pub fn new(eth: Ethereum) -> Self {
Self(Arc::new(Inner {
let block_stream = eth.current_block().clone();
let inner = Arc::new(Inner {
eth,
cache: RwLock::new(HashMap::new()),
}))
});
tokio::task::spawn(
update_task(block_stream, Arc::downgrade(&inner))
.instrument(tracing::info_span!("token_fetcher")),
);
Self(inner)
}

/// Returns the `Metadata` for the given tokens. Note that the result will
Expand All @@ -39,6 +48,62 @@ impl Fetcher {
}
}

/// Runs a single cache update cycle whenever a new block arrives until the
/// fetcher is dropped.
async fn update_task(blocks: CurrentBlockStream, inner: std::sync::Weak<Inner>) {
let mut stream = current_block::into_stream(blocks);
while stream.next().await.is_some() {
let inner = match inner.upgrade() {
Some(inner) => inner,
// Fetcher was dropped, stop update task.
None => break,
};
if let Err(err) = update_balances(inner).await {
tracing::warn!(?err, "error updating token cache");
}
}
}

/// Updates the settlement contract's balance for every cached token.
async fn update_balances(inner: Arc<Inner>) -> Result<(), blockchain::Error> {
let settlement = inner.eth.contracts().settlement().address().into();
let futures = {
let cache = inner.cache.read().unwrap();
let tokens = cache.keys().cloned().collect::<Vec<_>>();
tracing::debug!(
tokens = tokens.len(),
"updating settlement contract balances"
);
tokens.into_iter().map(|token| {
let erc20 = inner.eth.erc20(token);
async move {
Ok::<(eth::TokenAddress, eth::TokenAmount), blockchain::Error>((
token,
erc20.balance(settlement).await?,
))
}
})
};

// Don't hold on to the lock while fetching balances to allow concurrent
// updates. This may lead to new entries arriving in the meantime, however
// their balances should already be up-to-date.
let mut balances = futures::future::try_join_all(futures)
.await?
.into_iter()
.collect::<HashMap<_, _>>();

let mut cache = inner.cache.write().unwrap();
for (key, entry) in cache.iter_mut() {
if let Some(balance) = balances.remove(key) {
entry.balance = balance;
} else {
tracing::info!(?key, "key without balance update");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of the INFO level here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's supposed to help us to debug the rare race condition where we add a new token balance to the cache while the cache update is in flight.
The new item may not be updated (it should be up to date but in theory the block could also have just changed) and therefore we could pass a slightly outdated buffer list to the solvers. Again, I think this is extremely rare, but if it happens we can at least see if from the logs now.

}
}
Ok(())
}

/// Provides metadata of tokens.
struct Inner {
eth: Ethereum,
Expand Down
2 changes: 1 addition & 1 deletion crates/e2e/src/setup/colocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub async fn start_solver(weth: H160) -> Url {
r#"
weth = "{weth:?}"
base-tokens = []
max-hops = 0
max-hops = 1
max-partial-attempts = 5
risk-parameters = [0,0,0,0]
"#,
Expand Down
108 changes: 108 additions & 0 deletions crates/e2e/tests/e2e/colocation_buffers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use {
e2e::{setup::*, tx},
ethcontract::prelude::U256,
model::{
order::{OrderCreation, OrderKind},
signature::EcdsaSigningScheme,
},
secp256k1::SecretKey,
shared::ethrpc::Web3,
web3::signing::SecretKeyRef,
};

#[tokio::test]
#[ignore]
async fn local_node_buffers() {
run_test(onchain_settlement_without_liquidity).await;
}

async fn onchain_settlement_without_liquidity(web3: Web3) {
let mut onchain = OnchainComponents::deploy(web3).await;

let [solver] = onchain.make_solvers(to_wei(1)).await;
let [trader] = onchain.make_accounts(to_wei(1)).await;
let [token_a, token_b] = onchain
.deploy_tokens_with_weth_uni_v2_pools(to_wei(1_000), to_wei(1_000))
.await;

// Fund trader, settlement accounts, and pool creation
token_a.mint(trader.address(), to_wei(100)).await;
token_b
.mint(onchain.contracts().gp_settlement.address(), to_wei(5))
.await;
token_a.mint(solver.address(), to_wei(1000)).await;
fleupold marked this conversation as resolved.
Show resolved Hide resolved
token_b.mint(solver.address(), to_wei(1000)).await;

// Approve GPv2 for trading
tx!(
trader.account(),
token_a.approve(onchain.contracts().allowance, to_wei(100))
);

// Start system
let solver_endpoint = colocation::start_solver(onchain.contracts().weth.address()).await;
colocation::start_driver(onchain.contracts(), &solver_endpoint, &solver);

let services = Services::new(onchain.contracts()).await;
services.start_autopilot(vec![
"--enable-colocation=true".to_string(),
format!(
"--trusted-tokens={weth:#x},{token_a:#x},{token_b:#x}",
weth = onchain.contracts().weth.address(),
token_a = token_a.address(),
token_b = token_b.address()
),
"--drivers=test_solver|http://localhost:11088/test_solver".to_string(),
]);
services.start_api(vec![]).await;

// Place Order
let order = OrderCreation {
sell_token: token_a.address(),
sell_amount: to_wei(9),
fee_amount: to_wei(1),
buy_token: token_b.address(),
buy_amount: to_wei(5),
valid_to: model::time::now_in_epoch_seconds() + 300,
kind: OrderKind::Buy,
..Default::default()
}
.sign(
EcdsaSigningScheme::Eip712,
&onchain.contracts().domain_separator,
SecretKeyRef::from(&SecretKey::from_slice(trader.private_key()).unwrap()),
);
services.create_order(&order).await.unwrap();

tracing::info!("waiting for first trade");
let trade_happened =
|| async { token_b.balance_of(trader.address()).call().await.unwrap() == order.buy_amount };
wait_for_condition(TIMEOUT, trade_happened).await.unwrap();

// Check that settlement buffers were traded.
let settlement_contract_balance = token_b
.balance_of(onchain.contracts().gp_settlement.address())
.call()
.await
.unwrap();
// Check that internal buffers were used
assert!(settlement_contract_balance == 0.into());

// Same order can trade again with external liquidity
let order = OrderCreation {
valid_to: model::time::now_in_epoch_seconds() + 301,
..order
}
.sign(
EcdsaSigningScheme::Eip712,
&onchain.contracts().domain_separator,
SecretKeyRef::from(&SecretKey::from_slice(trader.private_key()).unwrap()),
);
services.create_order(&order).await.unwrap();

tracing::info!("waiting for second trade");
let trade_happened = || async {
token_b.balance_of(trader.address()).call().await.unwrap() == order.buy_amount * 2
};
wait_for_condition(TIMEOUT, trade_happened).await.unwrap();
}
1 change: 1 addition & 0 deletions crates/e2e/tests/e2e/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// Each of the following modules contains tests.
mod app_data;
mod app_data_signer;
mod colocation_buffers;
mod colocation_ethflow;
mod colocation_hooks;
mod colocation_partial_fill;
Expand Down
Loading