-
Notifications
You must be signed in to change notification settings - Fork 110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix/batch length #824
Fix/batch length #824
Changes from 5 commits
4c013e8
772c46f
efb96fc
ec5cf68
a23e6eb
b35346b
6ecaa03
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,8 +5,8 @@ use byteorder::{BigEndian, ReadBytesExt}; | |
use bytes::{Buf, BufMut}; | ||
use num_enum::TryFromPrimitive; | ||
use std::collections::HashMap; | ||
use std::convert::TryFrom; | ||
use std::convert::TryInto; | ||
use std::convert::{Infallible, TryFrom}; | ||
use std::net::IpAddr; | ||
use std::net::SocketAddr; | ||
use std::str; | ||
|
@@ -16,7 +16,7 @@ use uuid::Uuid; | |
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, TryFromPrimitive)] | ||
#[cfg_attr(feature = "serde", derive(serde::Deserialize))] | ||
#[cfg_attr(feature = "serde", serde(rename_all = "SCREAMING_SNAKE_CASE"))] | ||
#[repr(i16)] | ||
#[repr(u16)] | ||
pub enum Consistency { | ||
Any = 0x0000, | ||
One = 0x0001, | ||
|
@@ -98,6 +98,12 @@ impl From<std::str::Utf8Error> for ParseError { | |
} | ||
} | ||
|
||
impl From<Infallible> for ParseError { | ||
fn from(_: Infallible) -> Self { | ||
ParseError::BadIncomingData("Unexpected Infallible Error".to_string()) | ||
} | ||
} | ||
|
||
impl From<std::array::TryFromSliceError> for ParseError { | ||
fn from(_err: std::array::TryFromSliceError) -> Self { | ||
ParseError::BadIncomingData("array try from slice failed".to_string()) | ||
|
@@ -169,13 +175,18 @@ fn type_long() { | |
} | ||
} | ||
|
||
pub fn read_short(buf: &mut &[u8]) -> Result<i16, ParseError> { | ||
let v = buf.read_i16::<BigEndian>()?; | ||
pub fn read_short(buf: &mut &[u8]) -> Result<u16, ParseError> { | ||
let v = buf.read_u16::<BigEndian>()?; | ||
Ok(v) | ||
} | ||
|
||
pub fn read_u16(buf: &mut &[u8]) -> Result<u16, ParseError> { | ||
let v = buf.read_u16::<BigEndian>()?; | ||
Ok(v) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the |
||
|
||
pub fn write_short(v: i16, buf: &mut impl BufMut) { | ||
buf.put_i16(v); | ||
pub fn write_short(v: u16, buf: &mut impl BufMut) { | ||
buf.put_u16(v); | ||
} | ||
|
||
pub(crate) fn read_short_length(buf: &mut &[u8]) -> Result<usize, ParseError> { | ||
|
@@ -185,14 +196,14 @@ pub(crate) fn read_short_length(buf: &mut &[u8]) -> Result<usize, ParseError> { | |
} | ||
|
||
fn write_short_length(v: usize, buf: &mut impl BufMut) -> Result<(), ParseError> { | ||
let v: i16 = v.try_into()?; | ||
let v: u16 = v.try_into()?; | ||
write_short(v, buf); | ||
Ok(()) | ||
} | ||
|
||
#[test] | ||
fn type_short() { | ||
let vals = [i16::MIN, -1, 0, 1, i16::MAX]; | ||
let vals: [u16; 3] = [0, 1, u16::MAX]; | ||
for val in vals.iter() { | ||
let mut buf = Vec::new(); | ||
write_short(*val, &mut buf); | ||
|
@@ -464,11 +475,11 @@ pub fn read_consistency(buf: &mut &[u8]) -> Result<Consistency, ParseError> { | |
} | ||
|
||
pub fn write_consistency(c: Consistency, buf: &mut impl BufMut) { | ||
write_short(c as i16, buf); | ||
write_short(c as u16, buf); | ||
} | ||
|
||
pub fn write_serial_consistency(c: SerialConsistency, buf: &mut impl BufMut) { | ||
write_short(c as i16, buf); | ||
write_short(c as u16, buf); | ||
} | ||
|
||
#[test] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,7 +63,7 @@ pub struct Time(pub Duration); | |
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] | ||
pub struct SerializedValues { | ||
serialized_values: Vec<u8>, | ||
values_num: i16, | ||
values_num: u16, | ||
contains_names: bool, | ||
} | ||
|
||
|
@@ -134,7 +134,7 @@ impl SerializedValues { | |
if self.contains_names { | ||
return Err(SerializeValuesError::MixingNamedAndNotNamedValues); | ||
} | ||
if self.values_num == i16::MAX { | ||
if self.values_num == u16::MAX { | ||
return Err(SerializeValuesError::TooManyValues); | ||
} | ||
Comment on lines
+137
to
139
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The message for #[error("Too many values to add, max 32 767 values can be sent in a request")]
TooManyValues, |
||
|
||
|
@@ -158,7 +158,7 @@ impl SerializedValues { | |
return Err(SerializeValuesError::MixingNamedAndNotNamedValues); | ||
} | ||
self.contains_names = true; | ||
if self.values_num == i16::MAX { | ||
if self.values_num == u16::MAX { | ||
return Err(SerializeValuesError::TooManyValues); | ||
} | ||
|
||
|
@@ -184,15 +184,15 @@ impl SerializedValues { | |
} | ||
|
||
pub fn write_to_request(&self, buf: &mut impl BufMut) { | ||
buf.put_i16(self.values_num); | ||
buf.put_u16(self.values_num); | ||
buf.put(&self.serialized_values[..]); | ||
} | ||
|
||
pub fn is_empty(&self) -> bool { | ||
self.values_num == 0 | ||
} | ||
|
||
pub fn len(&self) -> i16 { | ||
pub fn len(&self) -> u16 { | ||
self.values_num | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
use assert_matches::assert_matches; | ||
|
||
use scylla_cql::errors::{BadQuery, QueryError}; | ||
|
||
use crate::batch::BatchType; | ||
use crate::query::Query; | ||
use crate::{ | ||
batch::Batch, | ||
test_utils::{create_new_session_builder, unique_keyspace_name}, | ||
QueryResult, Session, | ||
}; | ||
|
||
#[tokio::test] | ||
async fn test_large_batch_statements() { | ||
let mut session = create_new_session_builder().build().await.unwrap(); | ||
|
||
let ks = unique_keyspace_name(); | ||
session = create_test_session(session, &ks).await; | ||
|
||
let max_queries = u16::MAX as usize; | ||
let batch_insert_result = write_batch(&session, max_queries, &ks).await; | ||
|
||
assert!(batch_insert_result.is_ok()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you change it to |
||
|
||
let too_many_queries = u16::MAX as usize + 1; | ||
let batch_insert_result = write_batch(&session, too_many_queries, &ks).await; | ||
assert_matches!( | ||
batch_insert_result.unwrap_err(), | ||
QueryError::BadQuery(BadQuery::TooManyQueriesInBatchStatement(_too_many_queries)) if _too_many_queries == too_many_queries | ||
) | ||
} | ||
|
||
async fn create_test_session(session: Session, ks: &String) -> Session { | ||
session | ||
.query( | ||
format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }}",ks), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can use |
||
&[], | ||
) | ||
.await.unwrap(); | ||
session | ||
.query(format!("DROP TABLE IF EXISTS {}.pairs;", ks), &[]) | ||
.await | ||
.unwrap(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not needed, the test will create a new keyspace because |
||
session | ||
.query( | ||
format!("CREATE TABLE IF NOT EXISTS {}.pairs (dummy int, k blob, v blob, primary key (dummy, k))", ks), | ||
&[], | ||
) | ||
.await.unwrap(); | ||
session | ||
} | ||
|
||
async fn write_batch(session: &Session, n: usize, ks: &String) -> Result<QueryResult, QueryError> { | ||
let mut batch_query = Batch::new(BatchType::Logged); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An |
||
let mut batch_values = Vec::new(); | ||
for i in 0..n { | ||
let mut key = vec![0]; | ||
key.extend(i.to_be_bytes().as_slice()); | ||
let value = key.clone(); | ||
let query = format!("INSERT INTO {}.pairs (dummy, k, v) VALUES (0, ?, ?)", ks); | ||
let values = vec![key, value]; | ||
batch_values.push(values); | ||
let query = Query::new(query); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about using prepared statements instead? Prepare the statement once before creating the batch and then use it in |
||
batch_query.append_statement(query); | ||
} | ||
session.batch(&batch_query, batch_values).await | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain what this trait implementation does?
AFAIU
Infallible
is for things that can never happen, so why do we want to convert it to aParseError
?https://doc.rust-lang.org/std/convert/enum.Infallible.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's for the conversion of
u16
to ausize
. I wanted to do a simpleas
but didn't want to modify the existing code as much. I think converting from the previousi16
to ausize
would have failed with theTryFromIntError
error, but withu16
->usize
, it really is in fact infallibleThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah okay, I understand now.
So we have a few pieces of code like this one:
And after changing the
i16
tou16
they no longer compile becausetry_into()
has anInfallible
error type.We can implement a conversion from
Infallible
toParseError
to make it compile, but it's a bit hacky.I think it would be better to replace the
try_into()?
withinto()
, like this:There is a an implentation of
From<u16> for usize
, so we can just useinto()
here.