Skip to content

Commit

Permalink
protocol: improve and fix decode routines
Browse files Browse the repository at this point in the history
  • Loading branch information
ohsayan committed Jun 23, 2024
1 parent 15761f7 commit c171681
Show file tree
Hide file tree
Showing 5 changed files with 796 additions and 751 deletions.
16 changes: 8 additions & 8 deletions src/io/aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use {
error::{ClientResult, ConnectionSetupError, Error},
protocol::{
handshake::{ClientHandshake, ServerHandshake},
state_init::{DecodeState, MRespState, PipelineResult, RState},
Decoder,
DecodeState, Decoder, MRespState, PipelineResult, RState,
},
query::Pipeline,
response::{FromResponse, Response},
Expand Down Expand Up @@ -168,11 +167,12 @@ impl<C: AsyncWriteExt + AsyncReadExt + Unpin> TcpConnection<C> {
return Err(Error::IoError(std::io::ErrorKind::ConnectionReset.into()));
}
self.buf.extend_from_slice(&buf[..n]);
let mut decoder = Decoder::new(&self.buf, cursor);
match decoder.validate_pipe(pipeline.query_count(), state) {
let (_state, _position) =
Decoder::new(&self.buf, cursor).validate_pipe(pipeline.query_count(), state);
match _state {
PipelineResult::Completed(r) => return Ok(r),
PipelineResult::Pending(_state) => {
cursor = decoder.position();
cursor = _position;
state = _state;
}
PipelineResult::Error(e) => return Err(e.into()),
Expand All @@ -198,13 +198,13 @@ impl<C: AsyncWriteExt + AsyncReadExt + Unpin> TcpConnection<C> {
continue;
}
self.buf.extend_from_slice(&buf[..n]);
let mut decoder = Decoder::new(&self.buf, cursor);
match decoder.validate_response(state) {
let (_state, _position) = Decoder::new(&self.buf, cursor).validate_response(state);
match _state {
DecodeState::Completed(resp) => return Ok(resp),
DecodeState::ChangeState(_state) => {
expected = 1;
state = _state;
cursor = decoder.position();
cursor = _position;
}
DecodeState::Error(e) => return Err(Error::ProtocolError(e)),
}
Expand Down
25 changes: 12 additions & 13 deletions src/io/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ use {
error::{ClientResult, ConnectionSetupError, Error},
protocol::{
handshake::{ClientHandshake, ServerHandshake},
state_init::{DecodeState, MRespState, PipelineResult, RState},
Decoder,
DecodeState, Decoder, MRespState, PipelineResult, RState,
},
query::Pipeline,
response::{FromResponse, Response},
Expand Down Expand Up @@ -163,11 +162,12 @@ impl<C: Write + Read> TcpConnection<C> {
return Err(Error::IoError(std::io::ErrorKind::ConnectionReset.into()));
}
self.buf.extend_from_slice(&buf[..n]);
let mut decoder = Decoder::new(&self.buf, cursor);
match decoder.validate_pipe(pipeline.query_count(), state) {
let (_state, _position) =
Decoder::new(&self.buf, cursor).validate_pipe(pipeline.query_count(), state);
match _state {
PipelineResult::Completed(r) => return Ok(r),
PipelineResult::Pending(_state) => {
cursor = decoder.position();
cursor = _position;
state = _state;
}
PipelineResult::Error(e) => return Err(e.into()),
Expand All @@ -189,15 +189,14 @@ impl<C: Write + Read> TcpConnection<C> {
return Err(Error::IoError(std::io::ErrorKind::ConnectionReset.into()));
}
self.buf.extend_from_slice(&buf[..n]);
let mut decoder = Decoder::new(&self.buf, cursor);
match decoder.validate_response(state) {
DecodeState::ChangeState(new_state) => {
state = new_state;
cursor = decoder.position();
continue;
}
let (_state, _position) = Decoder::new(&self.buf, cursor).validate_response(state);
match _state {
DecodeState::Completed(resp) => return Ok(resp),
DecodeState::Error(e) => return Err(e.into()),
DecodeState::ChangeState(_state) => {
state = _state;
cursor = _position;
}
DecodeState::Error(e) => return Err(Error::ProtocolError(e)),
}
}
}
Expand Down
Loading

0 comments on commit c171681

Please sign in to comment.