Skip to content

Commit

Permalink
Merge pull request #792 from econia-labs/ECO-1997
Browse files Browse the repository at this point in the history
[ECO-1997] Add transaction version in MQTT price levels topic payload
  • Loading branch information
CRBl69 authored Jul 25, 2024
2 parents 96258fb + dc26c0f commit fe2ea34
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 3 deletions.
7 changes: 6 additions & 1 deletion doc/doc-site/docs/off-chain/dss/mqtt.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@ A payload for this event looks like this:
```json
{
"price": 12345,
"size": 12345
"size": 12345,
"txn_version": 12345
}
```

*Remember to set `MQTT_PRICE_LEVELS=yes` in the `.env` file for docker compose
(if running with docker compose, otherwise the `mqtt-publisher` executable must
have that variable exported) if you wish to get price levels over MQTT.*

### PlaceLimitOrderEvents

`place_limit_order/MARKET_ID/USER_ADDRESS/CUSTODIAN_ID/INTEGRATOR`
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
SELECT
txn_version
FROM
aggregator.user_history_last_indexed_txn;
16 changes: 14 additions & 2 deletions src/rust/mqtt-publisher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use bigdecimal::ToPrimitive;
use chrono::{DateTime, Utc};
use rumqttc::{AsyncClient, EventLoop, MqttOptions, QoS, Transport};
use serde::{Deserialize, Serialize};
use sqlx::{Executor, PgConnection};
use sqlx_postgres::{PgListener, PgPool};
use tokio::sync::RwLock;

Expand Down Expand Up @@ -136,16 +137,26 @@ async fn eventpoll_loop(mut eventloop: EventLoop) -> Result<()> {
struct PriceLevel {
price: u128,
size: u128,
txn_version: u128,
}

async fn price_level_loop(db_url: &str, mqtt_client: Arc<RwLock<AsyncClient>>) -> Result<()> {
let pool = PgPool::connect(&db_url).await?;

loop {
tokio::time::sleep(Duration::from_millis(50)).await;
let mut tx = pool.begin().await?;
tx.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;").await?;
let data = sqlx::query_file!("sqlx_queries/get_price_levels.sql")
.fetch_all(&pool)
.await?;
.fetch_all(&mut tx as &mut PgConnection)
.await?;
let txn_version = sqlx::query_file!("sqlx_queries/get_user_history_last_indexed_txn.sql")
.fetch_one(&mut tx as &mut PgConnection)
.await?
.txn_version
.to_u128()
.ok_or(anyhow!("txn_version is too big"))?;
tx.rollback().await?;
let mqtt_client = mqtt_client.read().await;
for row in data {
let topic = format!(
Expand All @@ -165,6 +176,7 @@ async fn price_level_loop(db_url: &str, mqtt_client: Arc<RwLock<AsyncClient>>) -
.ok_or(anyhow!("total_size is None"))?
.to_u128()
.ok_or(anyhow!("total_size is too big"))?,
txn_version,
};
mqtt_client
.publish(
Expand Down

0 comments on commit fe2ea34

Please sign in to comment.