diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index c1ad731233..4868fc3e36 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -5,16 +5,22 @@ use cometindex::{ AppView, PgTransaction, }; use penumbra_asset::asset; +use penumbra_dex::lp::position::{Id as PositionId, Position}; use penumbra_dex::{ event::{ - EventCandlestickData, EventPositionExecution, EventPositionOpen, EventPositionWithdraw, + EventCandlestickData, EventPositionClose, EventPositionExecution, EventPositionOpen, + EventPositionWithdraw, EventQueuePositionClose, }, lp::Reserves, DirectedTradingPair, TradingPair, }; +use penumbra_num::Amount; use penumbra_proto::event::EventDomainType; +use penumbra_proto::DomainType; use penumbra_sct::event::EventBlockRoot; -use std::collections::{HashMap, HashSet}; +use sqlx::types::BigDecimal; +use sqlx::Row; +use std::collections::{BTreeMap, HashMap, HashSet}; type DateTime = sqlx::types::chrono::DateTime; @@ -648,16 +654,37 @@ struct PairMetrics { #[derive(Debug)] struct Events { time: Option, + height: i32, candles: HashMap, metrics: HashMap, + // Relevant positions. + positions: BTreeMap, + // Store events + position_opens: Vec, + position_executions: Vec, + position_closes: Vec, + position_withdrawals: Vec, + // Track transaction hashes by position ID + position_open_txs: BTreeMap, + position_close_txs: BTreeMap, + position_withdrawal_txs: BTreeMap, } impl Events { fn new() -> Self { Self { time: None, + height: 0, candles: HashMap::new(), metrics: HashMap::new(), + positions: BTreeMap::new(), + position_opens: Vec::new(), + position_executions: Vec::new(), + position_closes: Vec::new(), + position_withdrawals: Vec::new(), + position_open_txs: BTreeMap::new(), + position_close_txs: BTreeMap::new(), + position_withdrawal_txs: BTreeMap::new(), } } @@ -731,6 +758,8 @@ impl Events { pub fn extract(block: &BlockEvents) -> anyhow::Result { let mut out = Self::new(); + out.height = block.height as i32; + for event in &block.events { if let Ok(e) = EventCandlestickData::try_from_event(&event.event) { let candle = Candle::from_candlestick_data(&e.stick); @@ -751,6 +780,14 @@ impl Events { }, false, ); + if let Some(tx_hash) = event.tx_hash { + out.position_open_txs.insert(e.position_id, tx_hash); + } + // A newly opened position might be executed against in this block, + // but wouldn't already be in the database. Adding it here ensures + // it's available. + out.positions.insert(e.position_id, e.position.clone()); + out.position_opens.push(e); } else if let Ok(e) = EventPositionWithdraw::try_from_event(&event.event) { // TODO: use close positions to track liquidity more precisely, in practic I (ck) expect few // positions to close with being withdrawn. @@ -763,6 +800,10 @@ impl Events { }, true, ); + if let Some(tx_hash) = event.tx_hash { + out.position_withdrawal_txs.insert(e.position_id, tx_hash); + } + out.position_withdrawals.push(e); } else if let Ok(e) = EventPositionExecution::try_from_event(&event.event) { out.with_reserve_change( &e.trading_pair, @@ -788,10 +829,57 @@ impl Events { end: e.trading_pair.asset_1(), }); } + out.position_executions.push(e); + } else if let Ok(e) = EventPositionClose::try_from_event(&event.event) { + out.position_closes.push(e); + } else if let Ok(e) = EventQueuePositionClose::try_from_event(&event.event) { + // The position close event is emitted by the dex module at EOB, + // so we need to track it with the tx hash of the closure tx. + if let Some(tx_hash) = event.tx_hash { + out.position_close_txs.insert(e.position_id, tx_hash); + } } } Ok(out) } + + async fn load_positions(&mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> { + // Collect position IDs that we need but don't already have + let missing_positions: Vec<_> = self + .position_executions + .iter() + .map(|e| e.position_id) + .filter(|id| !self.positions.contains_key(id)) + .collect(); + + if missing_positions.is_empty() { + return Ok(()); + } + + // Load missing positions from database + let rows = sqlx::query( + "SELECT position_raw + FROM dex_ex_position_state + WHERE position_id = ANY($1)", + ) + .bind( + &missing_positions + .iter() + .map(|id| id.0.as_ref()) + .collect::>(), + ) + .fetch_all(dbtx.as_mut()) + .await?; + + // Decode and store each position + for row in rows { + let position_raw: Vec = row.get("position_raw"); + let position = Position::decode(position_raw.as_slice())?; + self.positions.insert(position.id(), position); + } + + Ok(()) + } } #[derive(Debug)] @@ -807,6 +895,248 @@ impl Component { min_liquidity, } } + + async fn record_position_open( + &self, + dbtx: &mut PgTransaction<'_>, + time: DateTime, + height: i32, + tx_hash: Option<[u8; 32]>, + event: &EventPositionOpen, + ) -> anyhow::Result<()> { + // Get effective prices by orienting the trading function in each direction + let effective_price_1_to_2: f64 = event + .position + .phi + .orient_start(event.trading_pair.asset_1()) + .expect("position trading pair matches") + .effective_price() + .into(); + + let effective_price_2_to_1: f64 = event + .position + .phi + .orient_start(event.trading_pair.asset_2()) + .expect("position trading pair matches") + .effective_price() + .into(); + + // First insert initial reserves and get the rowid + let opening_reserves_rowid = sqlx::query_scalar::<_, i32>( + "INSERT INTO dex_ex_position_reserves ( + position_id, + height, + time, + reserves_1, + reserves_2 + ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid", + ) + .bind(event.position_id.0) + .bind(height) + .bind(time) + .bind(BigDecimal::from(event.reserves_1.value())) + .bind(BigDecimal::from(event.reserves_2.value())) + .fetch_one(dbtx.as_mut()) + .await?; + + // Then insert position state with the opening_reserves_rowid + sqlx::query( + "INSERT INTO dex_ex_position_state ( + position_id, + asset_1, + asset_2, + p, + q, + close_on_fill, + fee_bps, + effective_price_1_to_2, + effective_price_2_to_1, + position_raw, + opening_time, + opening_height, + opening_tx, + opening_reserves_rowid + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)", + ) + .bind(event.position_id.0) + .bind(event.trading_pair.asset_1().to_bytes()) + .bind(event.trading_pair.asset_2().to_bytes()) + .bind(BigDecimal::from(event.position.phi.component.p.value())) + .bind(BigDecimal::from(event.position.phi.component.q.value())) + .bind(event.position.close_on_fill) + .bind(event.trading_fee as i32) + .bind(effective_price_1_to_2) + .bind(effective_price_2_to_1) + .bind(event.position.encode_to_vec()) + .bind(time) + .bind(height) + .bind(tx_hash.map(|h| h.as_ref().to_vec())) + .bind(opening_reserves_rowid) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } + + async fn record_position_execution( + &self, + dbtx: &mut PgTransaction<'_>, + time: DateTime, + height: i32, + event: &EventPositionExecution, + positions: &BTreeMap, + ) -> anyhow::Result<()> { + // Get the position that was executed against + let position = positions + .get(&event.position_id) + .expect("position must exist for execution"); + + // Determine trade direction and compute deltas + let (delta_1, delta_2, lambda_1, lambda_2) = if event.reserves_1 > event.prev_reserves_1 { + // Asset 1 was input + let delta_1 = event.reserves_1 - event.prev_reserves_1; + let lambda_2 = event.prev_reserves_2 - event.reserves_2; + (delta_1, Amount::zero(), Amount::zero(), lambda_2) + } else { + // Asset 2 was input + let delta_2 = event.reserves_2 - event.prev_reserves_2; + let lambda_1 = event.prev_reserves_1 - event.reserves_1; + (Amount::zero(), delta_2, lambda_1, Amount::zero()) + }; + + // Compute fees directly from input amounts using u128 arithmetic + let fee_bps = position.phi.component.fee as u128; + let fee_1 = (delta_1.value() * fee_bps) / 10_000u128; + let fee_2 = (delta_2.value() * fee_bps) / 10_000u128; + + // First insert the reserves and get the rowid + let reserves_rowid = sqlx::query_scalar::<_, i32>( + "INSERT INTO dex_ex_position_reserves ( + position_id, + height, + time, + reserves_1, + reserves_2 + ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid", + ) + .bind(event.position_id.0) + .bind(height) + .bind(time) + .bind(BigDecimal::from(event.reserves_1.value())) + .bind(BigDecimal::from(event.reserves_2.value())) + .fetch_one(dbtx.as_mut()) + .await?; + + // Then record the execution with the reserves_rowid + sqlx::query( + "INSERT INTO dex_ex_position_executions ( + position_id, + height, + time, + reserves_rowid, + delta_1, + delta_2, + lambda_1, + lambda_2, + fee_1, + fee_2, + context_asset_start, + context_asset_end + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)", + ) + .bind(event.position_id.0) + .bind(height) + .bind(time) + .bind(reserves_rowid) + .bind(BigDecimal::from(delta_1.value())) + .bind(BigDecimal::from(delta_2.value())) + .bind(BigDecimal::from(lambda_1.value())) + .bind(BigDecimal::from(lambda_2.value())) + .bind(BigDecimal::from(fee_1)) + .bind(BigDecimal::from(fee_2)) + .bind(event.context.start.to_bytes()) + .bind(event.context.end.to_bytes()) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } + + async fn record_position_close( + &self, + dbtx: &mut PgTransaction<'_>, + time: DateTime, + height: i32, + tx_hash: Option<[u8; 32]>, + event: &EventPositionClose, + ) -> anyhow::Result<()> { + sqlx::query( + "UPDATE dex_ex_position_state + SET closing_time = $1, + closing_height = $2, + closing_tx = $3 + WHERE position_id = $4", + ) + .bind(time) + .bind(height) + .bind(tx_hash.map(|h| h.as_ref().to_vec())) + .bind(event.position_id.0) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } + + async fn record_position_withdraw( + &self, + dbtx: &mut PgTransaction<'_>, + time: DateTime, + height: i32, + tx_hash: Option<[u8; 32]>, + event: &EventPositionWithdraw, + ) -> anyhow::Result<()> { + // First insert the final reserves state (zeros after withdrawal) + let reserves_rowid = sqlx::query_scalar::<_, i32>( + "INSERT INTO dex_ex_position_reserves ( + position_id, + height, + time, + reserves_1, + reserves_2 + ) VALUES ($1, $2, $3, $4, $4) RETURNING rowid", // Using $4 twice for zero values + ) + .bind(event.position_id.0) + .bind(height) + .bind(time) + .bind(BigDecimal::from(0)) // Both reserves become zero after withdrawal + .fetch_one(dbtx.as_mut()) + .await?; + + sqlx::query( + "INSERT INTO dex_ex_position_withdrawals ( + position_id, + height, + time, + withdrawal_tx, + sequence, + reserves_1, + reserves_2, + reserves_rowid + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", + ) + .bind(event.position_id.0) + .bind(height) + .bind(time) + .bind(tx_hash.map(|h| h.as_ref().to_vec())) + .bind(event.sequence as i32) + .bind(BigDecimal::from(event.reserves_1.value())) + .bind(BigDecimal::from(event.reserves_2.value())) + .bind(reserves_rowid) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } } #[async_trait] @@ -836,12 +1166,45 @@ impl AppView for Component { let mut snapshots = HashMap::new(); let mut last_time = None; for block in batch.by_height.iter() { - let events = Events::extract(&block)?; + let mut events = Events::extract(&block)?; let time = events .time .expect(&format!("no block root event at height {}", block.height)); last_time = Some(time); + // Load any missing positions before processing events + events.load_positions(dbtx).await?; + + // Record position opens + for event in &events.position_opens { + let tx_hash = events.position_open_txs.get(&event.position_id).copied(); + self.record_position_open(dbtx, time, events.height, tx_hash, event) + .await?; + } + + // Process position executions + for event in &events.position_executions { + self.record_position_execution(dbtx, time, events.height, event, &events.positions) + .await?; + } + + // Record position closes + for event in &events.position_closes { + let tx_hash = events.position_close_txs.get(&event.position_id).copied(); + self.record_position_close(dbtx, time, events.height, tx_hash, event) + .await?; + } + + // Record position withdrawals + for event in &events.position_withdrawals { + let tx_hash = events + .position_withdrawal_txs + .get(&event.position_id) + .copied(); + self.record_position_withdraw(dbtx, time, events.height, tx_hash, event) + .await?; + } + for (pair, candle) in &events.candles { for window in Window::all() { let key = (pair.start, pair.end, window); diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql index ddbf108887..de284b5e98 100644 --- a/crates/bin/pindexer/src/dex_ex/schema.sql +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -82,3 +82,105 @@ CREATE TABLE IF NOT EXISTS dex_ex_metadata ( -- The asset id to use for prices in places such as the aggregate summary. quote_asset_id BYTEA NOT NULL ); + +CREATE TABLE IF NOT EXISTS dex_ex_position_state ( + -- Call this rowid to distinguish it from the position ID. + rowid SERIAL PRIMARY KEY, + -- Immutable position data, defining the trading function. + position_id BYTEA NOT NULL UNIQUE, + asset_1 BYTEA NOT NULL, + asset_2 BYTEA NOT NULL, + p NUMERIC(39) NOT NULL, + q NUMERIC(39) NOT NULL, + close_on_fill BOOLEAN NOT NULL, + fee_bps INTEGER NOT NULL, + effective_price_1_to_2 FLOAT8 NOT NULL, + effective_price_2_to_1 FLOAT8 NOT NULL, + position_raw BYTEA NOT NULL, + -- The time and height at which the position was opened, and its initial reserves. + opening_time TIMESTAMPTZ NOT NULL, + opening_height INTEGER NOT NULL, + opening_tx BYTEA, + opening_reserves_rowid INTEGER NOT NULL, + -- The time and height at which the position was closed, if it was closed. + closing_time TIMESTAMPTZ, + closing_height INTEGER, + closing_tx BYTEA +); + +CREATE INDEX ON dex_ex_position_state (position_id); +CREATE INDEX ON dex_ex_position_state (opening_tx); + +CREATE TABLE IF NOT EXISTS dex_ex_position_reserves ( + rowid SERIAL PRIMARY KEY, + position_id BYTEA NOT NULL, + height INTEGER NOT NULL, + time TIMESTAMPTZ NOT NULL, + reserves_1 NUMERIC(39) NOT NULL, + reserves_2 NUMERIC(39) NOT NULL +); + +CREATE INDEX ON dex_ex_position_reserves (position_id, height, rowid); + +CREATE TABLE IF NOT EXISTS dex_ex_position_executions ( + rowid SERIAL PRIMARY KEY, + position_id BYTEA NOT NULL, + height INTEGER NOT NULL, + time TIMESTAMPTZ NOT NULL, + reserves_rowid INTEGER NOT NULL, + -- The input amount of asset 1. + delta_1 NUMERIC(39) NOT NULL, + -- The input amount of asset 2. + delta_2 NUMERIC(39) NOT NULL, + -- The output amount of asset 1. + lambda_1 NUMERIC(39) NOT NULL, + -- The output amount of asset 2. + lambda_2 NUMERIC(39) NOT NULL, + -- The fee amount paid in asset 1. + fee_1 NUMERIC(39) NOT NULL, + -- The fee amount paid in asset 2. + fee_2 NUMERIC(39) NOT NULL, + -- The context the execution happened in + context_asset_start BYTEA NOT NULL, + context_asset_end BYTEA NOT NULL +); + +CREATE INDEX ON dex_ex_position_executions (height); +CREATE INDEX ON dex_ex_position_executions (position_id, height, rowid); + +CREATE TABLE IF NOT EXISTS dex_ex_position_withdrawals ( + rowid SERIAL PRIMARY KEY, + position_id BYTEA NOT NULL, + height INTEGER NOT NULL, + time TIMESTAMPTZ NOT NULL, + withdrawal_tx BYTEA, + sequence INTEGER NOT NULL, + reserves_rowid INTEGER NOT NULL, + -- The amount of asset 1 withdrawn. + reserves_1 NUMERIC(39) NOT NULL, + -- The amount of asset 2 withdrawn. + reserves_2 NUMERIC(39) NOT NULL +); + +CREATE INDEX ON dex_ex_position_withdrawals (height); +CREATE INDEX ON dex_ex_position_withdrawals (position_id, height); + +ALTER TABLE dex_ex_position_executions + ADD CONSTRAINT fk_position_executions + FOREIGN KEY (position_id) REFERENCES dex_ex_position_state(position_id); + +ALTER TABLE dex_ex_position_withdrawals + ADD CONSTRAINT fk_position_withdrawals + FOREIGN KEY (position_id) REFERENCES dex_ex_position_state(position_id); + +ALTER TABLE dex_ex_position_executions + ADD CONSTRAINT fk_position_executions_reserves + FOREIGN KEY (reserves_rowid) REFERENCES dex_ex_position_reserves(rowid); + +ALTER TABLE dex_ex_position_state + ADD CONSTRAINT fk_position_state_reserves + FOREIGN KEY (opening_reserves_rowid) REFERENCES dex_ex_position_reserves(rowid); + +ALTER TABLE dex_ex_position_withdrawals + ADD CONSTRAINT fk_position_withdrawals_reserves + FOREIGN KEY (reserves_rowid) REFERENCES dex_ex_position_reserves(rowid); diff --git a/crates/core/asset/src/asset/id.rs b/crates/core/asset/src/asset/id.rs index 1a5417c2e2..f02e63b270 100644 --- a/crates/core/asset/src/asset/id.rs +++ b/crates/core/asset/src/asset/id.rs @@ -172,6 +172,8 @@ impl Id { #[cfg(test)] mod tests { use super::*; + use hex; + use serde_json; use std::str::FromStr; #[test] @@ -205,4 +207,39 @@ mod tests { assert_eq!(id4, id); assert_eq!(id5, id); } + + #[test] + fn hex_to_bech32() { + let hex_strings = [ + "cc0d3c9eef0c7ff4e225eca85a3094603691d289aeaf428ab0d87319ad93a302", // USDY + "a7a339f42e671b2db1de226d4483d3e63036661cad1554d75f5f76fe04ec1e00", // SHITMOS + "29ea9c2f3371f6a487e7e95c247041f4a356f983eb064e5d2b3bcf322ca96a10", // UM + "76b3e4b10681358c123b381f90638476b7789040e47802de879f0fb3eedc8d0b", // USDC + "2923a0a87b3a2421f165cc853dbf73a9bdafb5da0d948564b6059cb0217c4407", // OSMO + "07ef660132a4c3235fab272d43d9b9752a8337b2d108597abffaff5f246d0f0f", // ATOM + "5314b33eecfd5ca2e99c0b6d1e0ccafe3d2dd581c952d814fb64fdf51f85c411", // TIA + "516108d0d0bba3f76e1f982d0a7cde118833307b03c0cd4ccb94e882b53c1f0f", // WBTC + "414e723f74bd987c02ccbc997585ed52b196e2ffe75b3793aa68cc2996626910", // allBTC + "bf8b035dda339b6cda8f221e79773b0fd871f27a472920f84c4aa2b4f98a700d", // allUSDT + ]; + + for hex in hex_strings { + let bytes = hex::decode(hex).expect("valid hex string"); + let bytes_array: [u8; 32] = bytes.try_into().expect("hex is 32 bytes"); + + let id = Id::try_from(bytes_array).expect("valid asset ID bytes"); + let bech32_str = id.to_string(); + + println!("Asset ID for {}:", hex); + println!(" Bech32: {}", bech32_str); + + // Print Proto JSON encoding + let proto: pb::AssetId = id.into(); + println!(" Proto JSON: {}\n", serde_json::to_string(&proto).unwrap()); + + // Convert back to verify roundtrip + let id_decoded = Id::from_str(&bech32_str).expect("valid bech32 string"); + assert_eq!(id, id_decoded); + } + } }