Incorrect error for batching #820

Closed
MathieuDutSik opened this issue Oct 5, 2023 · 5 comments · Fixed by #824
Labels
good first issue Good for newcomers

Comments

MathieuDutSik commented Oct 5, 2023

I ran a batch with 30000 entries and it works correctly. With 40000 entries it does not work. I am fine with that limit; however, the error appears to be incorrect. The error that occurs is

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: InvalidMessage("Frame error: Could not deserialize frame: Integer conversion out of range")', src/main.rs:58:53

The code that is triggering it is

use bcs::serialize_into;
use scylla::query::Query;
use scylla::{IntoTypedRows, Session, SessionBuilder};

fn get_upper_bound_option(key_prefix: &[u8]) -> Option<Vec<u8>> {
    let len = key_prefix.len();
    for i in (0..len).rev() {
        let val = key_prefix[i];
        if val < u8::MAX {
            let mut upper_bound = key_prefix[0..i + 1].to_vec();
            upper_bound[i] += 1;
            return Some(upper_bound);
        }
    }
    None
}

async fn find_keys_by_prefix(session: &Session, key_prefix: Vec<u8>) -> Vec<Vec<u8>> {
    let len = key_prefix.len();
    let rows = match get_upper_bound_option(&key_prefix) {
        None => {
            let values = (key_prefix,);
            let query = "SELECT k FROM kv.pairs WHERE dummy = 0 AND k >= ? ALLOW FILTERING";
            session.query(query, values).await.unwrap()
        }
        Some(upper_bound) => {
            let values = (key_prefix, upper_bound);
            let query =
                "SELECT k FROM kv.pairs WHERE dummy = 0 AND k >= ? AND k < ? ALLOW FILTERING";
            session.query(query, values).await.unwrap()
        }
    };
    let mut keys = Vec::new();
    if let Some(rows) = rows.rows {
        for row in rows.into_typed::<(Vec<u8>,)>() {
            let key = row.unwrap();
            let short_key = key.0[len..].to_vec();
            keys.push(short_key);
        }
    }
    keys
}

async fn write_batch(session: &Session, n: usize) {
    let mut batch_query =
        scylla::statement::batch::Batch::new(scylla::frame::request::batch::BatchType::Logged);
    let mut batch_values = Vec::new();
    for i in 0..n {
        let mut key = vec![0];
        serialize_into(&mut key, &(i as usize)).unwrap();
        let value = key.clone();
        let query = "INSERT INTO kv.pairs (dummy, k, v) VALUES (0, ?, ?)";
        let values = vec![key, value];
        batch_values.push(values);
        let query = Query::new(query);
        batch_query.append_statement(query);
    }
    session.batch(&batch_query, batch_values).await.unwrap();
}

async fn create_test_session() -> Session {
    let session_builder = SessionBuilder::new().known_node("localhost:9042");
    let session = session_builder.build().await.unwrap();
    session
        .query(
            "CREATE KEYSPACE IF NOT EXISTS kv WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }",
            &[],
        )
        .await.unwrap();
    session
        .query("DROP TABLE IF EXISTS kv.pairs;", &[])
        .await
        .unwrap();
    session
        .query(
            "CREATE TABLE IF NOT EXISTS kv.pairs (dummy int, k blob, v blob, primary key (dummy, k))",
            &[],
        )
        .await.unwrap();
    session
}

#[tokio::main]
async fn main() {
    let session = create_test_session().await;

    let n = 40000;
    write_batch(&session, n).await;

    let key_prefix = vec![0];
    let keys = find_keys_by_prefix(&session, key_prefix.clone()).await;
    println!("key_prefix={:?} |keys|={}", key_prefix.clone(), keys.len());
}
@piodul piodul added the good first issue Good for newcomers label Oct 6, 2023
@samuelorji
Contributor

samuelorji commented Oct 6, 2023

@piodul @MathieuDutSik I did a little digging and found the root cause: it is the conversion of the number of statements into an i16, specifically the try_into? here:
https://github.com/scylladb/scylla-rust-driver/blob/main/scylla-cql/src/frame/request/batch.rs#L84

I tried your example with i16::MAX statements and that worked, but i16::MAX + 1 failed with the same error.
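To illustrate that boundary, here is a minimal sketch using only the standard library. The helper names `count_as_i16` and `count_as_u16` are hypothetical and are not taken from the driver; they only mimic a fallible conversion of a statement count into a 2-byte integer.

```rust
use std::convert::TryFrom;
use std::num::TryFromIntError;

/// Hypothetical helper (not driver code): convert a statement count to i16.
/// Succeeds only for counts up to i16::MAX (32767).
fn count_as_i16(n: usize) -> Result<i16, TryFromIntError> {
    i16::try_from(n)
}

/// Hypothetical helper (not driver code): the same conversion targeting u16.
/// Succeeds for counts up to u16::MAX (65535).
fn count_as_u16(n: usize) -> Result<u16, TryFromIntError> {
    u16::try_from(n)
}
```

With these helpers, a count of 30000 converts in both cases, while 40000 fails for i16 and succeeds for u16, which matches the behaviour reported above.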

It may be worthwhile to add this limitation to the documentation or enforce the limitation at the method call boundary 🤷
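Enforcing the limit at the call boundary could look roughly like the sketch below. Everything here is an assumption for illustration: `TooManyStatements` and `check_batch_len` are hypothetical names, not part of the scylla crate, and the maximum is passed in as a parameter because the correct limit is still under discussion in this thread.

```rust
use std::fmt;

/// Hypothetical error type (not part of the scylla crate).
#[derive(Debug, PartialEq, Eq)]
struct TooManyStatements {
    got: usize,
    max: usize,
}

impl fmt::Display for TooManyStatements {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "batch has {} statements, but at most {} are allowed",
            self.got, self.max
        )
    }
}

impl std::error::Error for TooManyStatements {}

/// Hypothetical check that runs before any serialization, so the caller
/// sees a descriptive error instead of a frame-level conversion failure.
fn check_batch_len(got: usize, max: usize) -> Result<(), TooManyStatements> {
    if got > max {
        Err(TooManyStatements { got, max })
    } else {
        Ok(())
    }
}
```

A check of this kind would report the statement count and the limit directly, rather than surfacing as a "Could not deserialize frame" message.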

Another option could be to just change it to an i32, but I don't know what the repercussions of that would be when serializing / deserializing. Happy to whip up a PR with some guidance as I'm a first-time contributor 😄

@samuelorji
Contributor

From the Java docs, it seems a batch statement can take 65536 statements.

@cvybhu
Contributor

cvybhu commented Oct 6, 2023

Thank you for the report! It looks like the i16 is a bug.

The CQL protocol spec says that the number of statements in a batch should be serialized as a 2-byte unsigned integer ([short]):

https://github.com/apache/cassandra/blob/865d7c30e4755e74c4e4d26205a7aed4cfb55710/doc/native_protocol_v4.spec#L451

https://github.com/apache/cassandra/blob/865d7c30e4755e74c4e4d26205a7aed4cfb55710/doc/native_protocol_v4.spec#L221C3-L221C3

We are serializing the length as an i16, which cuts the available range in half. I think it would be best to change the i16 to a u16. It's also possible that this bug is present in other places that (de)serialize [short] values, so it would be good to go and take a look at all of them.
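As a rough sketch of what treating [short] as a 2-byte unsigned, big-endian integer means in practice, the helpers below use only the standard library. The names `write_short`, `read_short`, and `read_short_as_i16` are hypothetical and do not claim to match the driver's internal API; the last one exists only to show how the same two bytes look when read as a signed value.

```rust
/// Hypothetical helper: append a CQL [short] (2-byte unsigned, big-endian).
fn write_short(buf: &mut Vec<u8>, v: u16) {
    buf.extend_from_slice(&v.to_be_bytes());
}

/// Hypothetical helper: read the first two bytes as an unsigned [short].
/// Returns None if fewer than two bytes are available.
fn read_short(buf: &[u8]) -> Option<u16> {
    let b = buf.get(..2)?;
    Some(u16::from_be_bytes([b[0], b[1]]))
}

/// For comparison only: the same two bytes interpreted as a signed i16.
/// Any value above i16::MAX comes out negative.
fn read_short_as_i16(buf: &[u8]) -> Option<i16> {
    let b = buf.get(..2)?;
    Some(i16::from_be_bytes([b[0], b[1]]))
}
```

Writing 40000 with `write_short` produces the bytes 0x9C 0x40. Read back as u16 they give 40000 again, whereas read as i16 they give a negative number, which is why a signed type only covers half of the range the spec allows.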

> Happy to whip up a PR with some guidance as I'm a first-time contributor 😄

That would be great, thank you!
It's a standard Rust project, there are some additional tips in CONTRIBUTING.md.

@samuelorji
Contributor

Ok, will take a stab at it

@samuelorji samuelorji mentioned this issue Oct 6, 2023
@samuelorji
Contributor

PR here: #824
