Skip to content

Commit

Permalink
fix: fix UTXO handling
Browse files Browse the repository at this point in the history
  • Loading branch information
zensh committed Dec 24, 2023
1 parent 4856955 commit 30db587
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 37 deletions.
6 changes: 5 additions & 1 deletion crates/ns-indexer/src/db/model_inscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,11 @@ impl Inscription {
};

let _ = db
.batch(statements[start..end].to_vec(), &values[start..end])
.batch(
scylladb::BatchType::Logged,
statements[start..end].to_vec(),
&values[start..end],
)
.await?;
start = end;
}
Expand Down
12 changes: 9 additions & 3 deletions crates/ns-indexer/src/db/model_name_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ impl NameState {
statements.push(query.as_str());
values.push((state.0, state.1 as i64));
}
let _ = db.batch(statements, values).await?;
let _ = db
.batch(scylladb::BatchType::Unlogged, statements, values)
.await?;
Ok(())
}

Expand All @@ -220,7 +222,9 @@ impl NameState {
statements.push(query);
values.push((state.0, state.1));
}
let _ = db.batch(statements, values).await?;
let _ = db
.batch(scylladb::BatchType::Unlogged, statements, values)
.await?;
Ok(())
}

Expand Down Expand Up @@ -248,7 +252,9 @@ impl NameState {
statements.push(query.as_str());
values.push((state.0, state.1));
}
let _ = db.batch(statements, values).await?;
let _ = db
.batch(scylladb::BatchType::Unlogged, statements, values)
.await?;
Ok(())
}

Expand Down
58 changes: 31 additions & 27 deletions crates/ns-indexer/src/db/model_utxo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,66 +40,70 @@ impl Utxo {
spent: &Vec<utxo::UTXO>,
unspent: &Vec<(Vec<u8>, utxo::UTXO)>,
) -> anyhow::Result<()> {
// delete spent utxos
let mut start = 0;
while start < spent.len() {
let end = if start + 1000 > spent.len() {
spent.len()
while start < unspent.len() {
let end = if start + 1000 > unspent.len() {
unspent.len()
} else {
start + 1000
};
let mut statements: Vec<&str> = Vec::with_capacity(end - start);
let mut values: Vec<Vec<CqlValue>> = Vec::with_capacity(end - start);
let query = "DELETE FROM utxo WHERE txid=? AND vout=?";
let mut statements: Vec<&str> = Vec::with_capacity(unspent.len());
let mut values: Vec<Vec<CqlValue>> = Vec::with_capacity(unspent.len());
let query = "INSERT INTO utxo (txid,vout,amount,address) VALUES (?,?,?,?)";

for tx in &spent[start..end] {
for tx in &unspent[start..end] {
statements.push(query);
values.push(vec![tx.txid.to_cql(), (tx.vout as i32).to_cql()]);
let tx = Self::from_utxo(tx.0.clone(), &tx.1);
values.push(vec![
tx.txid.to_cql(),
tx.vout.to_cql(),
tx.amount.to_cql(),
tx.address.to_cql(),
]);
}

if statements.len() > 500 {
log::info!(target: "ns-indexer",
action = "handle_spent_utxos",
action = "handle_unspent_utxos",
statements = statements.len();
"",
);
}

let _ = db.batch(statements, values).await?;
let _ = db
.batch(scylladb::BatchType::Unlogged, statements, values)
.await?;
start = end;
}

// delete spent utxos after insert unspent utxos
let mut start = 0;
while start < unspent.len() {
let end = if start + 1000 > unspent.len() {
unspent.len()
while start < spent.len() {
let end = if start + 1000 > spent.len() {
spent.len()
} else {
start + 1000
};
let mut statements: Vec<&str> = Vec::with_capacity(unspent.len());
let mut values: Vec<Vec<CqlValue>> = Vec::with_capacity(unspent.len());
let query = "INSERT INTO utxo (txid,vout,amount,address) VALUES (?,?,?,?)";
let mut statements: Vec<&str> = Vec::with_capacity(end - start);
let mut values: Vec<Vec<CqlValue>> = Vec::with_capacity(end - start);
let query = "DELETE FROM utxo WHERE txid=? AND vout=?";

for tx in &unspent[start..end] {
for tx in &spent[start..end] {
statements.push(query);
let tx = Self::from_utxo(tx.0.clone(), &tx.1);
values.push(vec![
tx.txid.to_cql(),
tx.vout.to_cql(),
tx.amount.to_cql(),
tx.address.to_cql(),
]);
values.push(vec![tx.txid.to_cql(), (tx.vout as i32).to_cql()]);
}

if statements.len() > 500 {
log::info!(target: "ns-indexer",
action = "handle_unspent_utxos",
action = "handle_spent_utxos",
statements = statements.len();
"",
);
}

let _ = db.batch(statements, values).await?;
let _ = db
.batch(scylladb::BatchType::Unlogged, statements, values)
.await?;
start = end;
}

Expand Down
13 changes: 7 additions & 6 deletions crates/ns-indexer/src/db/scylladb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use scylla::{
use std::{sync::Arc, time::Duration};

pub use scylla::{
batch::Batch,
batch::{Batch, BatchStatement, BatchType},
frame::response::result::{ColumnType, Row},
query::Query,
Bytes,
Expand Down Expand Up @@ -97,13 +97,14 @@ impl ScyllaDB {
// BATCH with conditions cannot span multiple tables
pub async fn batch(
&self,
statements: Vec<&str>,
batch_type: BatchType,
statements: Vec<impl Into<BatchStatement>>,
values: impl BatchValues,
) -> anyhow::Result<QueryResult> {
let mut batch: Batch = Default::default();
for statement in statements {
batch.append_statement(statement);
}
let batch = Batch::new_with_statements(
batch_type,
statements.into_iter().map(|s| s.into()).collect(),
);
let res = self.session.batch(&batch, values).await?;
Ok(res)
}
Expand Down

0 comments on commit 30db587

Please sign in to comment.