Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/batch length #824

Merged
merged 7 commits into from
Oct 17, 2023
Merged

Fix/batch length #824

merged 7 commits into from
Oct 17, 2023

Conversation

samuelorji
Copy link
Contributor

Pre-review checklist

Modify the type of the number of queries in a batch statement from i16 to u16. As well as add guards to prevent making server calls when the number of queries is over u16::MAX

This is following this conversation in the issue

Fixes: #820

  • I have split my patch into logically separate commits.
  • All commit messages clearly explain what they change and why.
  • I added relevant tests for new features and bug fixes.
  • All commits compile, pass static checks and pass test.
  • PR description sums up the changes and reasons why they should be introduced.
  • I have provided docstrings for the public items that I want to introduce.
  • I have adjusted the documentation in ./docs/source/.
  • I added appropriate Fixes: annotations to PR description.

… from an i16 to a u16 according to the CQL protocol spec
…e server when the number of batch queries is greater than u16::MAX, as well as adding some tests
Copy link
Contributor

@cvybhu cvybhu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the contribution!
Left some comments, please take a look.

@@ -60,6 +60,7 @@ criterion = "0.4" # Note: v0.5 needs at least rust 1.70.0
tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }
assert_matches = "1.5.0"
rand_chacha = "0.3.1"
bcs = "0.1.5"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not add new dependencies when it isn't necessary.

let mut key = vec![0];
serialize_into(&mut key, &(i as usize)).unwrap();

Can be written as:

let mut key = vec![0];
key.extend(i.to_be_bytes().as_slice());

bcs looks like a big dependency, it'd be better not to pull in a whole serialization library just for this test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought so too, was trying to fully replicate the issue. Fixed now

Comment on lines 101 to 105
impl From<Infallible> for ParseError {
fn from(_: Infallible) -> Self {
ParseError::BadIncomingData("Unexpected Infallible Error".to_string())
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain what this trait implementation does?
AFAIU Infallible is for things that can never happen, so why do we want to convert it to a ParseError?

https://doc.rust-lang.org/std/convert/enum.Infallible.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's for the conversion of u16 to a usize. I wanted to do a simple as but didn't want to modify the existing code as much. I think converting from the previous i16 to a usize would have failed with the TryFromIntError error, but with u16 -> usize, it really is in fact infallible

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah okay, I understand now.
So we have a few pieces of code like this one:

let a: i16 = 123;
let b: usize = a.try_into()?;

And after changing the i16 to u16 they no longer compile because try_into() has an Infallible error type.
We can implement a conversion from Infallible to ParseError to make it compile, but it's a bit hacky.

I think it would be better to replace the try_into()?with into(), like this:

let a: u16 = 123;
let b: usize = a.into();

There is a an implentation of From<u16> for usize, so we can just use into() here.

Comment on lines 188 to 194
pub fn write_short(v: i16, buf: &mut impl BufMut) {
buf.put_i16(v);
}

pub fn write_u16(v: u16, buf: &mut impl BufMut) {
buf.put_u16(v);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should change write_short to accept u16.
AFAIU short refers to the CQL short type, which is a u16.
But we can do that in a separate PR as it will be a large change. Let's keep this one focused on the problem with Batches.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree about changing write_short to accept u16. There is only a small number of uses of this function in the code (I counted only six) so it should be easy to fix it right now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The switch wasn't so simple, i had to change the type in some other structs that were linked in a way I don't understand yet. example. But as long as the tests pass, it should be fine.

Comment on lines 352 to 353
#[error("Number of Queries in Batch Statement has exceeded the max value of 65,536")]
TooManyQueriesInBatchStatement,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The maximum value of u16 is 65,535 - one less than 65,536.
https://doc.rust-lang.org/std/primitive.u16.html#associatedconstant.MAX

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd also be nice to display the number of queries that the user has passed. This can be done by adding (usize) to the enum variant and displaying it with {0}.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice Catch. Fixed

async fn create_test_session(session: Session, ks: &String) -> Session {
session
.query(
format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }}",ks),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change SimpleStrategy to NetworkTopologyStrategy.
We are currently phasing out SimpleStrategy, in the future only NetworkTopologyStrategy will be allowed.
The query syntax is the same, only the name has to replaced.


let err = write_batch(&session, too_many_queries).await;

assert!(err.is_err());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make sure that the error is actually TooManyQueriesInBatchStatement.
I think this can be done using assert_matches!.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, knew there was a way to do this without having to implement PartialEq on everything 😢

}
keys
}

Copy link
Contributor

@cvybhu cvybhu Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I don't really follow the logic of key prefixes.
Would it be possible to just insert numbers 1..n and then read them all from the table?
I'm a fan of keeping things as simple as possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modified the tests to be simpler, but still show the intent of batch queries

Copy link
Contributor

@cvybhu cvybhu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the CI has failed.
Please make sure that cargo check --examples --tests and cargo clippy --examples --tests don't produce any errors or warnings.

@samuelorji
Copy link
Contributor Author

@cvybhu. I've made the change

@samuelorji samuelorji force-pushed the fix/batch-length branch 3 times, most recently from 5bc57b5 to 64c719d Compare October 10, 2023 14:32
@samuelorji
Copy link
Contributor Author

@cvybhu , any help trying to get the CI to work, it seems to be failing for a myriad of reasons when it works perfectly on my machine.

now, I've added an #![allow(unused_imports)] to prevent that failure amongst other things

@cvybhu
Copy link
Contributor

cvybhu commented Oct 10, 2023

@cvybhu , any help trying to get the CI to work, it seems to be failing for a myriad of reasons when it works perfectly on my machine.

I saw that the CI is failing on cargo clippy. It's possible that you have an older version of rust installed and because of that it doesn't produce some of the warnings that the latest version does. CI always uses the latest version of stable rust (1.73.0 at the moment)

Please make sure that cargo --version and cargo clippy --version print 1.73.0:

$ cargo --version
cargo 1.73.0 (9c4383fb5 2023-08-26)
$ cargo clippy --version
clippy 0.1.73 (cc66ad4 2023-10-03)

If it doesn't, you have to update rust to get the latest warnings. If you installed it using rustup you can just run rustup update.

Another thing is that you have to pass the flags --examples --tests to cargo clippy. Without them it ignores all the code in tests and examples.

Make sure that you are running the same commands as the workflow does: https://github.com/scylladb/scylla-rust-driver/blob/e6d6d3e5bd97ec746e12640a3b861e3cbdffd404/.github/workflows/rust.yml#L29C1-L29C1

Although you can probably skip the --verbose flag to reduce the output size.

@cvybhu
Copy link
Contributor

cvybhu commented Oct 10, 2023

now, I've added an #![allow(unused_imports)] to prevent that failure amongst other things

It'd be better to remove the unused imports that clippy is complaining about.

@samuelorji
Copy link
Contributor Author

@cvybhu , Thanks for the feedback, I upgraded my cargo and cargo clippy versions, and I think I was able to fix the clippy errors. Now, it seems like the test I wrote is a little flakey, although I don't know why. In the previous run, it ran fine, but now, it's failing on this quite simple assertion that runs fine locally for me

@cvybhu
Copy link
Contributor

cvybhu commented Oct 10, 2023

Now, it seems like the test I wrote is a little flakey, although I don't know why. In the previous run, it ran fine, but now, it's failing on this quite simple assertion that runs fine locally for me

It looks like the test passes on Scylla, but fails on Cassandra. scylla-rust-driver supports both databases, so we have to make sure that it passes on both of them.

You can try running this test against Cassandra instead of Scylla.

You can start a single Cassandra node using docker:

docker run --rm -it -p 9042:9042 -p 19042:19042 cassandra

and then run your test on this node and see what goes wrong.

@samuelorji
Copy link
Contributor Author

Now, it seems like the test I wrote is a little flakey, although I don't know why. In the previous run, it ran fine, but now, it's failing on this quite simple assertion that runs fine locally for me

It looks like the test passes on Scylla, but fails on Cassandra. scylla-rust-driver supports both databases, so we have to make sure that it passes on both of them.

You can try running this test against Cassandra instead of Scylla.

You can start a single Cassandra node using docker:

docker run --rm -it -p 9042:9042 -p 19042:19042 cassandra

and then run your test on this node and see what goes wrong.

Thanks, will do now

@samuelorji
Copy link
Contributor Author

@cvybhu , I think I found the issue, the batch write result sometimes result in a write timeout error:

DbError(WriteTimeout { consistency: LocalQuorum, received: 0, required: 1, write_type: Simple }, "Operation timed out - received only 0 responses.")

The only way I could replicate this was by running a cassandra cluster instead of a single instance, and even then, it happened infrequently, but the times when the batch write didn't succeed, it was due to the write timeout, which could be caused by the large size of the data or busy nodes.

WDYT

@cvybhu
Copy link
Contributor

cvybhu commented Oct 10, 2023

@cvybhu , I think I found the issue, the batch write result sometimes result in a write timeout error:

Ah that makes sense, Scylla is the faster (better) database after all ;) 60k requests can take a moment to complete.

In that case let's increase the timeout so that Cassandra can complete the work. There used to be set_request_timeout function, but AFAIR this was replaced by ExecutionProfiles. So you'll have to create an ExecutionProfile with larger timeout and make the Batch statement use this ExecutionProfile.

@samuelorji
Copy link
Contributor Author

@cvybhu , I think I found the issue, the batch write result sometimes result in a write timeout error:

Ah that makes sense, Scylla is the faster (better) database after all ;) 60k requests can take a moment to complete.

In that case let's increase the timeout so that Cassandra can complete the work. There used to be set_request_timeout function, but AFAIR this was replaced by ExecutionProfiles. So you'll have to create an ExecutionProfile with larger timeout and make the Batch statement use this ExecutionProfile.

Okay, lemme give that a try

@samuelorji
Copy link
Contributor Author

@cvybhu I've made the change. Added the change as a different commit that should be eventually squashed

@samuelorji
Copy link
Contributor Author

so weird how my cargo and cargo clippy versions are both:

$ cargo --version
cargo 1.73.0 (9c4383fb5 2023-08-26)
$ cargo clippy --version
clippy 0.1.73 (cc66ad4 2023-10-03)

and I'm running Clippy check an Format Check. and I'm not getting any error, but it's weirdly failing on CI 🤔

@cvybhu
Copy link
Contributor

cvybhu commented Oct 10, 2023

and I'm running Clippy check an Format Check. and I'm not getting any error, but it's weirdly failing on CI 🤔

Looks like everything is OK in the last version. Clippy check passes in the CI now.

It failed on Cassandra tests again. Maybe you have to make the timeout even larger.

Btw you could replace assert(batch_result.is_ok()) with batch_result.unwrap(). unwrap() prints the error message on failure, so we would be able to see the timeout message in CI logs.

@samuelorji
Copy link
Contributor Author

Still a write timeout error even with 240 seconds

thread 'transport::large_batch_statements_test::test_large_batch_statements' panicked at 'called `Result::unwrap()` on an `Err` value: DbError(WriteTimeout { consistency: LocalQuorum, received: 0, required: 1, write_type: Simple }, "Operation timed out - received only 0 responses.")', scylla/src/transport/large_batch_statements_test.rs:33:26

@piodul
Copy link
Collaborator

piodul commented Oct 10, 2023

Still a write timeout error even with 240 seconds

thread 'transport::large_batch_statements_test::test_large_batch_statements' panicked at 'called `Result::unwrap()` on an `Err` value: DbError(WriteTimeout { consistency: LocalQuorum, received: 0, required: 1, write_type: Simple }, "Operation timed out - received only 0 responses.")', scylla/src/transport/large_batch_statements_test.rs:33:26

Let's not introduce unit tests that are very long to execute. I suppose the goal is to just verify that batch serialization succeeds, right? Can we just call the serialization logic directly and test that it returns the expected error or succeeds?

Alternatively, you can see if using prepared statements in the batch helps - but a batch with 60k statements is abnormally large nonetheless, so I doubt the test will end in a reasonable time.

@piodul
Copy link
Collaborator

piodul commented Oct 10, 2023

One more thing - the timeout that you observed occurs on the DB side, i.e. Cassandra itself returns a timeout error. Increasing the timeout on the driver side will not help - it only affects how long the driver waits for response. You would have to modify the configuration used to run Cassandra in CI.

@samuelorji
Copy link
Contributor Author

One more thing - the timeout that you observed occurs on the DB side, i.e. Cassandra itself returns a timeout error. Increasing the timeout on the driver side will not help - it only affects how long the driver waits for response. You would have to modify the configuration used to run Cassandra in CI.

Okay, makes sense, because the check for the size of the statements occurs at the session level, will a single test that shows the expected error suffice ?

@piodul
Copy link
Collaborator

piodul commented Oct 10, 2023

Okay, makes sense, because the check for the size of the statements occurs at the session level, will a single test that shows the expected error suffice ?

I think yes. It would be better to also have a test which shows that serializing a large batch that is just under the limit works, but to me it's even better not to have long tests or flaky tests (due to timeouts). I don't suspect that regression here is likely, so if it's hard to write such a test then I won't insist.

@cvybhu
Copy link
Contributor

cvybhu commented Oct 11, 2023

Let's not introduce unit tests that are very long to execute. I suppose the goal is to just verify that batch serialization succeeds, right? Can we just call the serialization logic directly and test that it returns the expected error or succeeds?

@piodul I think it would be good to have a test that actually runs a batch with ~40k queries. Otherwise how do we know that it actually works? It will take a moment, but IMO it's better to have a few heavy tests rather than no tests at all.
In Pytest there are markers like @heavy_test and it's possible to run all tests except for the heavy ones for quick feedback. Maybe Rust has something similar?

Plus inserting 40k rows shouldn't take THAT long. AFAIR Scylla was capable of 100k inserts/s on a single shard, so it shouldn't take more than a few seconds to run this test. Cassandra might be worse, but we could just disable this test for Cassandra.
One more idea would be to insert the same row 40k times - that would all be a single mutation, so AFAIU there wouldn't be any memtable flushes, should be really fast.

@cvybhu
Copy link
Contributor

cvybhu commented Oct 11, 2023

Rust docs mention that there are two main types of tests - unit tests, which are next to the source code and integration tests, which are in the tests folder[1]. It's possible to run only unit tests using cargo test --lib [2].

I think we could add the test that runs a huge batch as an integration test. It matches the definition of an integration test, because it uses the library in the same way that an outside user would. Then we'd have a split - unit tests are quick, can be ran alone, while integration tests can be heavier, but can be skipped when we need only the quick tests.

[1] https://doc.rust-lang.org/book/ch11-03-test-organization.html#the-tests-directory
[2] rust-lang/cargo#8396

@piodul
Copy link
Collaborator

piodul commented Oct 12, 2023

Let's not introduce unit tests that are very long to execute. I suppose the goal is to just verify that batch serialization succeeds, right? Can we just call the serialization logic directly and test that it returns the expected error or succeeds?

@piodul I think it would be good to have a test that actually runs a batch with ~40k queries. Otherwise how do we know that it actually works? It will take a moment, but IMO it's better to have a few heavy tests rather than no tests at all.

I tried running the test locally and it actually doesn't take that long (<1s to insert a batch, both with Scylla and Cassandra). I was concerned about not making the tests too long for the developers to execute them locally, but I don't think this one sticks out too much (it spends most of the time on schema changes, actually - this seems to be a problem with most of our existing tests).

I'm actually more worried about introducing flakiness to the CI.

In Pytest there are markers like @heavy_test and it's possible to run all tests except for the heavy ones for quick feedback. Maybe Rust has something similar?

Rust's default testing framework, sadly, doesn't have too many features, so I doubt it.

Plus inserting 40k rows shouldn't take THAT long. AFAIR Scylla was capable of 100k inserts/s on a single shard, so it shouldn't take more than a few seconds to run this test. Cassandra might be worse, but we could just disable this test for Cassandra.

Disabling the test for Cassandra is also an option.

One more idea would be to insert the same row 40k times - that would all be a single mutation, so AFAIU there wouldn't be any memtable flushes, should be really fast.

The batch should already result in a single mutation, it only modifies a single partition. I don't know about Cassandra, but Scylla also seems to struggle with batches this large - I've seen some large stalls in the logs related to applying mutation to the memtable.

@samuelorji
Copy link
Contributor Author

@cvybhu @piodul to move forward, should reinstate the previous tests that does an actual write but then ignore the tests for cassandra as done here

@piodul
Copy link
Collaborator

piodul commented Oct 13, 2023

@cvybhu @piodul to move forward, should reinstate the previous tests that does an actual write but then ignore the tests for cassandra as done here

I'm fine with this approach. @cvybhu ?

@cvybhu
Copy link
Contributor

cvybhu commented Oct 13, 2023

@cvybhu @piodul to move forward, should reinstate the previous tests that does an actual write but then ignore the tests for cassandra as done here

Sounds good to me 👍

@samuelorji samuelorji requested a review from cvybhu October 13, 2023 12:49
Copy link
Contributor

@cvybhu cvybhu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, it looks nice.

I left a few more comments, please take a look, and then we can merge it :)

Comment on lines 178 to 186
pub fn read_short(buf: &mut &[u8]) -> Result<u16, ParseError> {
let v = buf.read_u16::<BigEndian>()?;
Ok(v)
}

pub fn read_u16(buf: &mut &[u8]) -> Result<u16, ParseError> {
let v = buf.read_u16::<BigEndian>()?;
Ok(v)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the read_u16 isn't needed anymore, we have read_short that does the same thing. Let's remove read_u16.

Comment on lines +137 to 139
if self.values_num == u16::MAX {
return Err(SerializeValuesError::TooManyValues);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message for TooManyValues has to be adjusted as well, currently it mentions i16::MAX requests.

#[error("Too many values to add, max 32 767 values can be sent in a request")]
TooManyValues,

}

async fn write_batch(session: &Session, n: usize, ks: &String) -> Result<QueryResult, QueryError> {
let mut batch_query = Batch::new(BatchType::Logged);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An Unlogged batch might be faster to execute, it could speed up the test a bit.

let max_queries = u16::MAX as usize;
let batch_insert_result = write_batch(&session, max_queries, &ks).await;

assert!(batch_insert_result.is_ok());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you change it to batch_insert_result.unwrap()? unwrap() will print the error message when the test fails. assert! will only give us assertion failed, which isn't very helpful.

Comment on lines 101 to 105
impl From<Infallible> for ParseError {
fn from(_: Infallible) -> Self {
ParseError::BadIncomingData("Unexpected Infallible Error".to_string())
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah okay, I understand now.
So we have a few pieces of code like this one:

let a: i16 = 123;
let b: usize = a.try_into()?;

And after changing the i16 to u16 they no longer compile because try_into() has an Infallible error type.
We can implement a conversion from Infallible to ParseError to make it compile, but it's a bit hacky.

I think it would be better to replace the try_into()?with into(), like this:

let a: u16 = 123;
let b: usize = a.into();

There is a an implentation of From<u16> for usize, so we can just use into() here.

@samuelorji samuelorji requested a review from cvybhu October 13, 2023 17:17
Copy link
Contributor

@cvybhu cvybhu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me.

@piodul do you want to take a final look? I'm ready to merge.

Copy link
Collaborator

@piodul piodul left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some last nitpicks, but after fixing them - LGTM

async fn create_test_session(session: Session, ks: &String) -> Session {
session
.query(
format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }}",ks),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use CREATE KEYSPACE without IF EXISTS, the ks name is guaranteed to be unique.

Comment on lines 40 to 43
session
.query(format!("DROP TABLE IF EXISTS {}.pairs;", ks), &[])
.await
.unwrap();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed, the test will create a new keyspace because ks is guaranteed to be unique.

let query = format!("INSERT INTO {}.pairs (dummy, k, v) VALUES (0, ?, ?)", ks);
let values = vec![key, value];
batch_values.push(values);
let query = Query::new(query);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using prepared statements instead? Prepare the statement once before creating the batch and then use it in append_statement: batch_query.append_statement(prepared.clone()). This should reduce the work needed by DB to process the batch.

@samuelorji
Copy link
Contributor Author

@piodul thanks for the review. I've made the change, can you please have another look?

Copy link
Contributor

@cvybhu cvybhu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@cvybhu cvybhu merged commit 82c1c99 into scylladb:main Oct 17, 2023
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Incorrect error for batching
3 participants