Skip to content

Commit

Permalink
Merge branch 'next' into devel
Browse files Browse the repository at this point in the history
  • Loading branch information
ohsayan committed Jul 2, 2024
2 parents bed583a + cf86a55 commit 95b6e33
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 38 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All changes in this project will be noted in this file.

## 0.8.10

### Fixes

- Fixed decode of multi-row responses

## 0.8.9

### Additions
Expand Down
95 changes: 57 additions & 38 deletions src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,21 +219,17 @@ impl<T, U: ProtocolObjectState<Value = T>> ProtocolObjectDecodeState<T, U> {
decoder: &mut Decoder,
) -> ProtocolResult<ProtocolObjectDecodeState<T, U>> {
match self {
Self::Completed(c) => Ok(ProtocolObjectDecodeState::Completed(c)),
Self::Completed(c) => Ok(Self::Completed(c)),
Self::Pending(pv) => match pv.complete(decoder)? {
ProtocolObjectDecodeState::Completed(c) => {
Ok(ProtocolObjectDecodeState::Completed(c.into_value()))
}
ProtocolObjectDecodeState::Pending(pv) => {
Ok(ProtocolObjectDecodeState::Pending(pv))
}
ProtocolObjectDecodeState::Completed(c) => Ok(Self::Completed(c.into_value())),
ProtocolObjectDecodeState::Pending(pv) => Ok(Self::Pending(pv)),
},
}
}
}

#[cfg(test)]
impl<T: ProtocolObjectState> ProtocolObjectDecodeState<T> {
impl<T: ProtocolObjectState + core::fmt::Debug> ProtocolObjectDecodeState<T> {
fn into_completed(self) -> Option<T> {
match self {
Self::Completed(c) => Some(c),
Expand Down Expand Up @@ -280,7 +276,7 @@ impl<T: LfsObject> ProtocolObjectState for LfsValue<T> {
) -> ProtocolResult<ProtocolObjectDecodeState<Self>> {
let mut stop = decoder.cursor_eq(b'\n');
let mut error = false;
while !decoder.eof() && !error && !stop {
while (!decoder.eof()) && (!error) && (!stop) {
let byte = decoder.next();
error = !self.v.update(&mut self.state, byte);
stop = decoder.cursor_eq(b'\n');
Expand Down Expand Up @@ -588,32 +584,17 @@ impl AsValueStream for Row {

#[derive(Debug, PartialEq)]
pub(crate) struct ValueStream {
size: ProtocolObjectDecodeState<usize, LfsValue<usize>>,
element_count: ProtocolObjectDecodeState<usize, LfsValue<usize>>,
items: Vec<Value>,
pending: Option<Box<PendingValue>>,
}

impl ProtocolObjectState for ValueStream {
type Value = Vec<Value>;
fn initialize(decoder: &Decoder) -> Self {
Self {
size: ProtocolObjectDecodeState::Pending(LfsValue::initialize(decoder)),
items: vec![],
pending: None,
}
}
fn complete(
impl ValueStream {
fn _complete(
mut self,
decoder: &mut Decoder,
size: usize,
) -> ProtocolResult<ProtocolObjectDecodeState<Self>> {
let size = match self.size.try_complete(decoder)? {
ProtocolObjectDecodeState::Completed(c) => c,
ProtocolObjectDecodeState::Pending(pv) => {
self.size = ProtocolObjectDecodeState::Pending(pv);
return Ok(ProtocolObjectDecodeState::Pending(self));
}
};
self.size = ProtocolObjectDecodeState::Completed(size);
while self.items.len() != size {
if decoder.eof() {
return Ok(ProtocolObjectDecodeState::Pending(self));
Expand All @@ -634,6 +615,31 @@ impl ProtocolObjectState for ValueStream {
}
Ok(ProtocolObjectDecodeState::Completed(self))
}
}

impl ProtocolObjectState for ValueStream {
type Value = Vec<Value>;
fn initialize(decoder: &Decoder) -> Self {
Self {
element_count: ProtocolObjectDecodeState::Pending(LfsValue::initialize(decoder)),
items: vec![],
pending: None,
}
}
fn complete(
mut self,
decoder: &mut Decoder,
) -> ProtocolResult<ProtocolObjectDecodeState<Self>> {
let size = match self.element_count.try_complete(decoder)? {
ProtocolObjectDecodeState::Completed(c) => c,
ProtocolObjectDecodeState::Pending(pv) => {
self.element_count = ProtocolObjectDecodeState::Pending(pv);
return Ok(ProtocolObjectDecodeState::Pending(self));
}
};
self.element_count = ProtocolObjectDecodeState::Completed(size);
self._complete(decoder, size)
}
fn into_value(self) -> Self::Value {
self.items
}
Expand All @@ -645,7 +651,8 @@ impl ProtocolObjectState for ValueStream {

#[derive(Debug, PartialEq)]
pub(crate) struct MultiValueStream {
count: ProtocolObjectDecodeState<usize, LfsValue<usize>>,
stream_count: ProtocolObjectDecodeState<usize, LfsValue<usize>>,
stream_size: ProtocolObjectDecodeState<usize, LfsValue<usize>>,
items: Vec<Vec<Value>>,
pending: Option<ValueStream>,
}
Expand All @@ -654,7 +661,8 @@ impl ProtocolObjectState for MultiValueStream {
type Value = Vec<Vec<Value>>;
fn initialize(decoder: &Decoder) -> Self {
Self {
count: ProtocolObjectDecodeState::Pending(LfsValue::initialize(decoder)),
stream_count: ProtocolObjectDecodeState::Pending(LfsValue::initialize(decoder)),
stream_size: ProtocolObjectDecodeState::Pending(LfsValue::initialize(decoder)),
items: vec![],
pending: None,
}
Expand All @@ -663,18 +671,29 @@ impl ProtocolObjectState for MultiValueStream {
mut self,
decoder: &mut Decoder,
) -> ProtocolResult<ProtocolObjectDecodeState<Self>> {
let size = match self.count.try_complete(decoder)? {
// get number of streams
let stream_count = match self.stream_count.try_complete(decoder)? {
ProtocolObjectDecodeState::Completed(sz) => sz,
ProtocolObjectDecodeState::Pending(pv) => {
self.count = ProtocolObjectDecodeState::Pending(pv);
self.stream_count = ProtocolObjectDecodeState::Pending(pv);
return Ok(ProtocolObjectDecodeState::Pending(self));
}
};
self.count = ProtocolObjectDecodeState::Completed(size);
while self.items.len() != size {
self.stream_count = ProtocolObjectDecodeState::Completed(stream_count);
// get per stream size
let stream_size = match self.stream_size.try_complete(decoder)? {
ProtocolObjectDecodeState::Completed(sz) => sz,
ProtocolObjectDecodeState::Pending(pv) => {
self.stream_size = ProtocolObjectDecodeState::Pending(pv);
return Ok(ProtocolObjectDecodeState::Pending(self));
}
};
self.stream_size = ProtocolObjectDecodeState::Completed(stream_size);
// load items
while self.items.len() != stream_count {
match match self.pending.take() {
Some(pending_vs) => pending_vs.complete(decoder),
None => ValueStream::initialize(decoder).complete(decoder),
Some(pending_vs) => pending_vs._complete(decoder, stream_size),
None => ValueStream::initialize(decoder)._complete(decoder, stream_size),
}? {
ProtocolObjectDecodeState::Completed(vs) => {
self.items.push(vs.items);
Expand Down Expand Up @@ -851,8 +870,8 @@ fn decode_value_stream() {
#[test]
fn decode_multi_value_stream() {
let packet = [
b"5\n".to_vec(),
"8\n\x00\x01\x01\x0518446744073709551615\n\x09-9223372036854775808\n\x0A-3.141592654\n\x0C5\nabcde\x0D5\nfghij\x0E2\n\x0C5\nabcde\x0D5\nfghij".repeat(5).into_bytes()
b"5\n8\n".to_vec(),
"\x00\x01\x01\x0518446744073709551615\n\x09-9223372036854775808\n\x0A-3.141592654\n\x0C5\nabcde\x0D5\nfghij\x0E2\n\x0C5\nabcde\x0D5\nfghij".repeat(5).into_bytes()
].concat();
for i in 1..packet.len() {
let mut decoder = Decoder::new(&packet[..i], 0);
Expand Down

0 comments on commit 95b6e33

Please sign in to comment.