From b949ec0065dc0fb55180b572cfb3ca62fcefc172 Mon Sep 17 00:00:00 2001 From: Chris Czub Date: Tue, 30 Apr 2024 16:01:01 -0400 Subject: [PATCH] Extend arbitrage routing to route against recently opened positions and recently swapped assets --- .../dex/src/component/action_handler/swap.rs | 8 ++++++- .../core/component/dex/src/component/dex.rs | 20 ++++++++++++---- .../dex/src/component/position_manager.rs | 23 +++++++++++++++++++ crates/core/component/dex/src/state_key.rs | 4 ++++ 4 files changed, 49 insertions(+), 6 deletions(-) diff --git a/crates/core/component/dex/src/component/action_handler/swap.rs b/crates/core/component/dex/src/component/action_handler/swap.rs index 74848317fc..cbffd75b4d 100644 --- a/crates/core/component/dex/src/component/action_handler/swap.rs +++ b/crates/core/component/dex/src/component/action_handler/swap.rs @@ -7,7 +7,9 @@ use penumbra_proto::StateWriteProto; use penumbra_sct::component::source::SourceContext; use crate::{ - component::{metrics, StateReadExt, StateWriteExt, SwapManager}, + component::{ + metrics, position_manager::PositionManager as _, StateReadExt, StateWriteExt, SwapManager, + }, event, swap::{proof::SwapProofPublic, Swap}, }; @@ -65,6 +67,10 @@ impl ActionHandler for Swap { .add_swap_payload(self.body.payload.clone(), source) .await; + // Mark the assets for the swap's trading pair as accessed during this block. + state.add_recently_accessed_asset(swap.body.trading_pair.asset_1()); + state.add_recently_accessed_asset(swap.body.trading_pair.asset_2()); + metrics::histogram!(crate::component::metrics::DEX_SWAP_DURATION) .record(swap_start.elapsed()); state.record_proto(event::swap(self)); diff --git a/crates/core/component/dex/src/component/dex.rs b/crates/core/component/dex/src/component/dex.rs index 0172e34c62..833b5dc9e7 100644 --- a/crates/core/component/dex/src/component/dex.rs +++ b/crates/core/component/dex/src/component/dex.rs @@ -17,7 +17,7 @@ use crate::{ use super::{ router::{HandleBatchSwaps, RoutingParams}, - Arbitrage, PositionManager, ValueCircuitBreaker, + Arbitrage, PositionManager, PositionRead as _, ValueCircuitBreaker, }; pub struct Dex {} @@ -83,12 +83,22 @@ impl Component for Dex { // For arbitrage, we extend the path search by 2 hops to allow a path out of the // staking token and back. - // TODO: Build an extended candidate set with: - // - both ends of all trading pairs for which there were swaps in the block - // - both ends of all trading pairs for which positions were opened + // Extend the fixed candidate set to include recently accessed assets, to have + // more arbitrage execution against newly opened positions. + let fixed_candidates = Arc::new( + routing_params + .fixed_candidates + .iter() + .cloned() + // Limit the inclusion of recently accessed assets to 10 to avoid + // potentially blowing up routing time. + .chain(state.recently_accessed_assets().iter().take(10).cloned()) + .collect::>(), + ); + let arb_routing_params = RoutingParams { max_hops: routing_params.max_hops + 2, - fixed_candidates: routing_params.fixed_candidates.clone(), + fixed_candidates, price_limit: Some(1u64.into()), }; diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index cce1925989..35cc234daa 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -142,6 +142,12 @@ pub trait PositionRead: StateRead { }) .boxed() } + + /// Fetch the list of assets interacted with during this block. + fn recently_accessed_assets(&self) -> im::Vector { + self.object_get(state_key::recently_accessed_assets()) + .unwrap_or_default() + } } impl PositionRead for T {} @@ -244,6 +250,11 @@ pub trait PositionManager: StateWrite + PositionRead { self.vcb_credit(position.reserves_1()).await?; self.vcb_credit(position.reserves_2()).await?; + // Add the asset IDs from the new position's trading pair + // to the candidate set for this block. + self.add_recently_accessed_asset(position.phi.pair.asset_1()); + self.add_recently_accessed_asset(position.phi.pair.asset_2()); + // Finally, record the new position state. self.record_proto(event::position_open(&position)); self.update_position(None, position).await?; @@ -251,6 +262,18 @@ pub trait PositionManager: StateWrite + PositionRead { Ok(()) } + /// Adds an asset ID to the list of recently accessed assets, + /// making it a candidate for the current block's arbitrage routing. + /// + /// This ensures that assets associated with recently active positions + /// will be eligible for arbitrage if mispriced positions are opened. + #[tracing::instrument(level = "debug", skip_all)] + fn add_recently_accessed_asset(&mut self, asset_id: asset::Id) { + let mut assets = self.recently_accessed_assets(); + assets.push_back(asset_id); + self.object_put(state_key::recently_accessed_assets(), assets); + } + /// Record execution against an opened position. /// /// The `context` parameter records the global context of the path in which diff --git a/crates/core/component/dex/src/state_key.rs b/crates/core/component/dex/src/state_key.rs index 20a616401b..a9d34ffbfe 100644 --- a/crates/core/component/dex/src/state_key.rs +++ b/crates/core/component/dex/src/state_key.rs @@ -67,6 +67,10 @@ pub fn pending_position_closures() -> &'static str { "dex/pending_position_closures" } +pub fn recently_accessed_assets() -> &'static str { + "dex/recently_accessed_assets" +} + pub fn pending_payloads() -> &'static str { "dex/pending_payloads" }