Skip to content

Commit

Permalink
Fix pipeline impl
Browse files Browse the repository at this point in the history
  • Loading branch information
ohsayan committed Mar 7, 2024
1 parent 5df52bc commit 821652f
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 25 deletions.
10 changes: 5 additions & 5 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@
*/

//! # Configuration
//!
//!
//! This module provides items to help with database connection setup and configuration.
//!
//!
//! ## Example
//!
//!
//! ```no_run
//! use skytable::Config;
//!
//!
//! // establish a sync connection to 127.0.0.1:2003
//! let mut db = Config::new_default("username", "password").connect().unwrap();
//!
//!
//! // establish a connection to a specific host `subnetx2_db1` and port `2008`
//! let mut db = Config::new("subnetx2_db1", 2008, "username", "password").connect().unwrap();
//! ```
Expand Down
6 changes: 3 additions & 3 deletions src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

#[macro_export]
/// This macro can be used to create a [`Query`](struct@crate::Query), almost like a variadic function
///
///
/// ## Examples
/// ```
/// use skytable::query;
///
///
/// fn get_username() -> String { "sayan".to_owned() }
/// fn get_counter() -> u64 { 100 }
///
///
/// let query1 = query!("select * from myspace.mymodel WHERE username = ?", get_username());
/// assert_eq!(query1.param_cnt(), 1);
/// let query2 = query!("update myspace.mymodel set counter += ? WHERE username = ?", get_counter(), get_username());
Expand Down
63 changes: 47 additions & 16 deletions src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,9 @@ impl<'a> Decoder<'a> {
fn _creq(&self, b: u8) -> bool {
(self.b[core::cmp::min(self.i, self.b.len() - 1)] == b) & !self._cursor_eof()
}
fn _current(&self) -> &[u8] {
&self.b[self.i..]
}
}

trait DecodeDelimited {
Expand Down Expand Up @@ -510,24 +513,24 @@ impl<'a> Decoder<'a> {
}
}

#[test]
fn t_row() {
let mut decoder = Decoder::new(b"\x115\n\x00\x01\x01\x0D5\nsayan\x0220\n\x0E0\n", 0);
assert_eq!(
decoder.validate_response(RState::default()),
DecodeState::Completed(Response::Row(Row::new(vec![
Value::Null,
Value::Bool(true),
Value::String("sayan".into()),
Value::UInt8(20),
Value::List(vec![])
])))
);
}

#[test]
fn t_mrow() {
let mut decoder = Decoder::new(b"\x133\n5\n\x00\x01\x01\x0D5\nsayan\x0220\n\x0E0\n\x00\x01\x01\x0D5\nelana\x0221\n\x0E0\n\x00\x01\x01\x0D5\nemily\x0222\n\x0E0\n", 0);
const MROW_QUERY: &[u8] = b"\x133\n5\n\x00\x01\x01\x0D5\nsayan\x0220\n\x0E0\n\x00\x01\x01\x0D5\nelana\x0221\n\x0E0\n\x00\x01\x01\x0D5\nemily\x0222\n\x0E0\n";
for i in 1..MROW_QUERY.len() {
let mut decoder = Decoder::new(&MROW_QUERY[..i], 0);
if i == 1 {
assert!(matches!(
decoder.validate_response(RState::default()),
DecodeState::ChangeState(RState(_))
));
} else {
assert!(matches!(
decoder.validate_response(RState::default()),
DecodeState::ChangeState(RState(ResponseState::PMultiRow(_)))
));
}
}
let mut decoder = Decoder::new(MROW_QUERY, 0);
assert_eq!(
decoder.validate_response(RState::default()),
DecodeState::Completed(Response::Rows(vec![
Expand Down Expand Up @@ -555,3 +558,31 @@ fn t_mrow() {
]))
);
}
#[test]
fn t_num() {
const NUM: &[u8] = b"1234\n";
fn decoder(i: usize) -> Decoder<'static> {
Decoder::new(&NUM[..i], 0)
}
for (i, expected) in [1, 12, 123, 1234u64]
.iter()
.enumerate()
.map(|(a, b)| (a + 1, *b))
{
assert_eq!(
decoder(i)
.__resume_decode(0u64, ValueStateMeta::zero())
.unwrap(),
ValueDecodeStateAny::Pending(ValueState::new(
Value::UInt64(expected),
ValueStateMeta::zero()
))
);
}
assert_eq!(
decoder(NUM.len())
.__resume_decode(0u64, ValueStateMeta::zero())
.unwrap(),
ValueDecodeStateAny::Decoded(Value::UInt64(1234))
);
}
26 changes: 25 additions & 1 deletion src/protocol/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ impl MRespState {
if self.processed.len() as u64 == self.expected.val() {
return PipelineResult::Completed(self.processed);
}
if decoder._cursor_eof() {
return PipelineResult::Pending(self);
}
match decoder.validate_response(RState(
self.pending.take().unwrap_or(ResponseState::Initial),
)) {
Expand All @@ -71,10 +74,13 @@ impl<'a> Decoder<'a> {
}
}

#[cfg(test)]
const QUERY: &[u8] = b"P5\n\x12\x10\xFF\xFF\x115\n\x00\x01\x01\x0D5\nsayan\x0220\n\x0E0\n\x115\n\x00\x01\x01\x0D5\nelana\x0221\n\x0E0\n\x115\n\x00\x01\x01\x0D5\nemily\x0222\n\x0E0\n";

#[test]
fn t_pipe() {
use crate::response::{Response, Row, Value};
let mut decoder = Decoder::new(b"P5\n\x12\x10\xFF\xFF\x115\n\x00\x01\x01\x0D5\nsayan\x0220\n\x0E0\n\x115\n\x00\x01\x01\x0D5\nelana\x0221\n\x0E0\n\x115\n\x00\x01\x01\x0D5\nemily\x0222\n\x0E0\n", 0);
let mut decoder = Decoder::new(QUERY, 0);
assert_eq!(
decoder.validate_pipe(true, MRespState::default()),
PipelineResult::Completed(vec![
Expand Down Expand Up @@ -104,3 +110,21 @@ fn t_pipe() {
])
);
}

#[test]
fn t_pipe_staged() {
for i in Decoder::MIN_READBACK..QUERY.len() {
let mut dec = Decoder::new(&QUERY[..i], 0);
if i < 3 {
assert!(matches!(
dec.validate_pipe(true, MRespState::default()),
PipelineResult::Pending(_)
));
} else {
assert!(matches!(
dec.validate_pipe(true, MRespState::default()),
PipelineResult::Pending(p) if p.expected.val() == 5
));
}
}
}

0 comments on commit 821652f

Please sign in to comment.