Skip to content

Commit

Permalink
feat: swaps indexing (#4776)
Browse files Browse the repository at this point in the history
## Describe your changes
implements indexing of swaps to satisfy the `/api/swaps` endpoint

- [x] There's also an event for Swap, so we should name the event in the
code to BatchSwap to avoid confusion.
- [x] We probably want to record some more information in the swap
table, like having the trading pair there, the total amount of inputs
and outputs for each asset, the unfilled amounts, et.c

## Issue ticket number and link

fixes #4744

## Checklist before requesting a review

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

  Indexer-only changes

---------

Co-authored-by: Lucas Meier <[email protected]>
  • Loading branch information
vacekj and cronokirby authored Aug 12, 2024
1 parent c4db70a commit be0ef35
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 32 deletions.
33 changes: 33 additions & 0 deletions crates/bin/pindexer/src/dex/dex.sql
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,36 @@ CREATE TABLE IF NOT EXISTS dex_lp_execution (
-- The end asset for this execution.
context_end BYTEA NOT NULL
);

--- Represents instances where swap executions happened.
CREATE TABLE IF NOT EXISTS dex_batch_swap (
height BIGINT PRIMARY KEY,
trace12_start INTEGER REFERENCES dex_trace (id),
trace12_end INTEGER REFERENCES dex_trace (id),
trace21_start INTEGER REFERENCES dex_trace (id),
trace21_end INTEGER REFERENCES dex_trace (id),
asset1 BYTEA NOT NULL,
asset2 BYTEA NOT NULL,
unfilled1 Amount NOT NULL,
unfilled2 Amount NOT NULL,
delta1 Amount NOT NULL,
delta2 Amount NOT NULL,
lambda1 Amount NOT NULL,
lambda2 Amount NOT NULL
);

CREATE INDEX ON dex_batch_swap(height);
CREATE INDEX ON dex_batch_swap(asset1, height);
CREATE INDEX ON dex_batch_swap(asset2, height);

-- Represents instances of invididual swaps into the batch.
CREATE TABLE IF NOT EXISTS dex_swap (
id SERIAL PRIMARY KEY,
height BIGINT NOT NULL,
value1 Value,
value2 Value
);

CREATE INDEX ON dex_swap(height, id);
CREATE INDEX ON dex_swap(((value1).asset));
CREATE INDEX ON dex_swap(((value2).asset));
184 changes: 152 additions & 32 deletions crates/bin/pindexer/src/dex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,57 @@ use cometindex::async_trait;
use penumbra_asset::asset::Id as AssetId;
use penumbra_dex::lp::position::{Id, Position};
use penumbra_dex::lp::{self, TradingFunction};
use penumbra_dex::{DirectedTradingPair, SwapExecution};
use penumbra_dex::{BatchSwapOutputData, DirectedTradingPair, SwapExecution, TradingPair};
use penumbra_num::Amount;
use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb};
use sqlx::{PgPool, Postgres, Transaction};

use crate::sql::Sql;
use crate::{AppView, ContextualizedEvent, PgTransaction};

/// Insert a swap execution into the database.
///
/// This returns the start and end indices of its trace.
async fn insert_swap_execution<'d>(
dbtx: &mut Transaction<'d, Postgres>,
execution: Option<&SwapExecution>,
) -> anyhow::Result<(Option<i32>, Option<i32>)> {
let execution = match execution {
None => return Ok((None, None)),
Some(e) => e,
};
let mut trace_start = None;
let mut trace_end = None;
for trace in &execution.traces {
let mut step_start = None;
let mut step_end = None;
for step in trace {
let (id,): (i32,) = sqlx::query_as(
r#"INSERT INTO dex_trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#,
)
.bind(step.amount.to_string())
.bind(Sql::from(step.asset_id))
.fetch_one(dbtx.as_mut())
.await?;
if let None = step_start {
step_start = Some(id);
}
step_end = Some(id);
}
let (id,): (i32,) =
sqlx::query_as(r#"INSERT INTO dex_trace VALUES (DEFAULT, $1, $2) RETURNING id;"#)
.bind(step_start)
.bind(step_end)
.fetch_one(dbtx.as_mut())
.await?;
if let None = trace_start {
trace_start = Some(id);
}
trace_end = Some(id);
}
Ok((trace_start, trace_end))
}

/// One of the possible events that we care about.
#[derive(Clone, Debug)]
enum Event {
Expand All @@ -33,6 +76,13 @@ enum Event {
height: u64,
execution: SwapExecution,
},
/// A parsed version of [pb::EventBatchSwap]
BatchSwap {
height: u64,
execution12: Option<SwapExecution>,
execution21: Option<SwapExecution>,
output_data: BatchSwapOutputData,
},
/// A parsed version of [pb::EventPositionOpen]
PositionOpen { height: u64, position: Position },
/// A parsed version of [pb::EventPositionWithdraw]
Expand All @@ -55,17 +105,26 @@ enum Event {
prev_reserves_2: Amount,
context: DirectedTradingPair,
},
/// A parsed version of [pb::EventSwap]
Swap {
height: u64,
trading_pair: TradingPair,
delta_1_i: Amount,
delta_2_i: Amount,
},
}

impl Event {
const NAMES: [&'static str; 7] = [
const NAMES: [&'static str; 9] = [
"penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit",
"penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit",
"penumbra.core.component.dex.v1.EventArbExecution",
"penumbra.core.component.dex.v1.EventPositionWithdraw",
"penumbra.core.component.dex.v1.EventPositionOpen",
"penumbra.core.component.dex.v1.EventPositionClose",
"penumbra.core.component.dex.v1.EventPositionExecution",
"penumbra.core.component.dex.v1.EventBatchSwap",
"penumbra.core.component.dex.v1.EventSwap",
];

/// Index this event, using the handle to the postgres transaction.
Expand Down Expand Up @@ -116,36 +175,7 @@ impl Event {
Ok(())
}
Event::ArbExecution { height, execution } => {
let mut trace_start = None;
let mut trace_end = None;
for trace in &execution.traces {
let mut step_start = None;
let mut step_end = None;
for step in trace {
let (id,): (i32,) = sqlx::query_as(
r#"INSERT INTO dex_trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#,
)
.bind(step.amount.to_string())
.bind(Sql::from(step.asset_id))
.fetch_one(dbtx.as_mut())
.await?;
if let None = step_start {
step_start = Some(id);
}
step_end = Some(id);
}
let (id,): (i32,) = sqlx::query_as(
r#"INSERT INTO dex_trace VALUES (DEFAULT, $1, $2) RETURNING id;"#,
)
.bind(step_start)
.bind(step_end)
.fetch_one(dbtx.as_mut())
.await?;
if let None = trace_start {
trace_start = Some(id);
}
trace_end = Some(id);
}
let (trace_start, trace_end) = insert_swap_execution(dbtx, Some(execution)).await?;
sqlx::query(r#"INSERT INTO dex_arb VALUES ($1, (CAST($2 AS Amount), $3), (CAST($4 AS Amount), $5), $6, $7);"#)
.bind(i64::try_from(*height)?)
.bind(execution.input.amount.to_string())
Expand Down Expand Up @@ -328,6 +358,50 @@ impl Event {
.await?;
Ok(())
}
Event::BatchSwap {
height,
execution12,
execution21,
output_data,
} => {
let (trace12_start, trace12_end) =
insert_swap_execution(dbtx, execution12.as_ref()).await?;
let (trace21_start, trace21_end) =
insert_swap_execution(dbtx, execution21.as_ref()).await?;
sqlx::query(r#"INSERT INTO dex_batch_swap VALUES ($1, $2, $3, $4, $5, $6, $7, CAST($8 AS Amount), CAST($9 AS Amount), CAST($10 AS Amount), CAST($11 AS Amount), CAST($12 AS Amount), CAST($13 AS Amount));"#)
.bind(i64::try_from(*height)?)
.bind(trace12_start)
.bind(trace12_end)
.bind(trace21_start)
.bind(trace21_end)
.bind(Sql::from(output_data.trading_pair.asset_1()))
.bind(Sql::from(output_data.trading_pair.asset_2()))
.bind(output_data.unfilled_1.to_string())
.bind(output_data.unfilled_2.to_string())
.bind(output_data.delta_1.to_string())
.bind(output_data.delta_2.to_string())
.bind(output_data.lambda_1.to_string())
.bind(output_data.lambda_2.to_string())
.execute(dbtx.as_mut())
.await?;
Ok(())
}
Event::Swap {
height,
trading_pair,
delta_1_i,
delta_2_i,
} => {
sqlx::query(r#"INSERT INTO dex_swap VALUES (DEFAULT, $1, (CAST($2 AS Amount), $3), (CAST($4 AS Amount), $5));"#)
.bind(i64::try_from(*height)?)
.bind(delta_1_i.to_string())
.bind(Sql::from(trading_pair.asset_1()))
.bind(delta_2_i.to_string())
.bind(Sql::from(trading_pair.asset_2()))
.execute(dbtx.as_mut())
.await?;
Ok(())
}
}
}
}
Expand Down Expand Up @@ -467,6 +541,52 @@ impl<'a> TryFrom<&'a ContextualizedEvent> for Event {
context,
})
}
// Batch Swap
x if x == Event::NAMES[7] => {
let pe = pb::EventBatchSwap::from_event(event.as_ref())?;
let height = event.block_height;
let output_data = pe
.batch_swap_output_data
.ok_or(anyhow!("missing swap execution"))?
.try_into()?;
let execution12 = pe
.swap_execution_1_for_2
.map(|x| x.try_into())
.transpose()?;
let execution21 = pe
.swap_execution_2_for_1
.map(|x| x.try_into())
.transpose()?;
Ok(Self::BatchSwap {
height,
execution12,
execution21,
output_data,
})
}
// Swap
x if x == Event::NAMES[8] => {
let pe = pb::EventSwap::from_event(event.as_ref())?;
let height = event.block_height;
let trading_pair = pe
.trading_pair
.expect("trading_pair should be present")
.try_into()?;
let delta_1_i = pe
.delta_1_i
.expect("delta_1_i should be present")
.try_into()?;
let delta_2_i = pe
.delta_2_i
.expect("delta_2_i should be present")
.try_into()?;
Ok(Self::Swap {
height,
trading_pair,
delta_1_i,
delta_2_i,
})
}
x => Err(anyhow!(format!("unrecognized event kind: {x}"))),
}
}
Expand Down

0 comments on commit be0ef35

Please sign in to comment.