Skip to content

Commit

Permalink
Add support for batch queries
Browse files Browse the repository at this point in the history
Example of it's usage:

  pub async fn insert(db, i) {
    ...
    let stmt_keys = [];
    let stmt_values = [];
    ...
    stmt_keys.push(some_prepared_statement_name);
    stmt_values.push([
      user_id, user_name, permissions,
    ]);
    ...
    db.batch_prepared(stmt_keys, stmt_values).await?
    ...
  }

To reuse the same gathered values in non-batch call do following:

  pub async fn insert(db, i) {
    ...
    db.execute_prepared(stmt_keys[0], stmt_values[0]).await?
    ...
  }
  • Loading branch information
vponomaryov committed Nov 5, 2024
1 parent 0ad2000 commit 231541b
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 0 deletions.
63 changes: 63 additions & 0 deletions src/scripting/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use rand::prelude::ThreadRng;
use rand::random;
use rune::runtime::{Object, Shared};
use rune::{Any, Value};
use scylla::batch::{Batch, BatchType};
use scylla::frame::response::result::CqlValue;
use scylla::prepared_statement::PreparedStatement;
use scylla::query::Query;
use std::collections::HashMap;
Expand Down Expand Up @@ -655,6 +657,67 @@ impl Context {
}
}

pub async fn batch_prepared(
&self,
keys: Vec<&str>,
params: Vec<Value>,
) -> Result<(), CassError> {
let keys_len = keys.len();
let params_len = params.len();
if keys_len != params_len {
return Err(CassError(CassErrorKind::Error(format!(
"Number of prepared statements ({keys_len}) and values ({params_len}) must be equal"
))));
} else if keys_len == 0 {
return Err(CassError(CassErrorKind::Error("Empty batch".to_string())));
}
let mut batch: Batch = Batch::new(BatchType::Logged);
let mut batch_values: Vec<Vec<CqlValue>> = vec![];
for (i, key) in enumerate(keys) {
let statement = self.statements.get(key).ok_or_else(|| {
CassError(CassErrorKind::PreparedStatementNotFound(key.to_string()))
})?;
let statement_col_specs = statement.get_variable_col_specs();
batch.append_statement((**statement).clone());
batch_values.push(bind::to_scylla_query_params(
params.get(i).expect("REASON"),
statement_col_specs,
)?);
}
match &self.session {
Some(session) => {
let mut current_attempt_num = 0;
while current_attempt_num <= self.retry_number {
let start_time = self.stats.try_lock().unwrap().start_request();
let rs = session.batch(&batch, batch_values.clone()).await;
let duration = Instant::now() - start_time;
match rs {
Ok(_) => {
self.stats.try_lock().unwrap().complete_request(
duration,
Some(batch_values.len() as u64),
&rs,
);
return Ok(());
}
Err(e) => {
let current_error = CassError(CassErrorKind::Error(format!(
"batch execution failed: {e}"
)));
handle_retry_error(self, current_attempt_num, current_error).await;
current_attempt_num += 1;
continue;
}
}
}
Err(CassError::query_retries_exceeded(self.retry_number))
}
None => Err(CassError(CassErrorKind::Error(
"'session' is not defined".to_string(),
))),
}
}

pub fn elapsed_secs(&self) -> f64 {
self.start_time.try_lock().unwrap().elapsed().as_secs_f64()
}
Expand Down
10 changes: 10 additions & 0 deletions src/scripting/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,16 @@ pub async fn execute_prepared(
ctx.execute_prepared(&key, params).await
}

#[rune::function(instance)]
pub async fn batch_prepared(
ctx: Ref<Context>,
keys: Vec<Ref<str>>,
params: Vec<Value>,
) -> Result<(), CassError> {
ctx.batch_prepared(keys.iter().map(|k| k.deref()).collect(), params)
.await
}

#[rune::function(instance)]
pub async fn init_partition_row_distribution_preset(
mut ctx: Mut<Context>,
Expand Down
1 change: 1 addition & 0 deletions src/scripting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ fn try_install(
context_module.function_meta(functions::execute)?;
context_module.function_meta(functions::prepare)?;
context_module.function_meta(functions::execute_prepared)?;
context_module.function_meta(functions::batch_prepared)?;
context_module.function_meta(functions::init_partition_row_distribution_preset)?;
context_module.function_meta(functions::get_partition_idx)?;
context_module.function_meta(functions::get_datacenters)?;
Expand Down

0 comments on commit 231541b

Please sign in to comment.