Skip to content

Commit

Permalink
Integrating #918
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Apr 10, 2024
1 parent 7d9d57c commit 27063b6
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 108 deletions.
12 changes: 12 additions & 0 deletions commons/zenoh-buffers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,18 @@ pub mod reader {
fn rewind(&mut self, mark: Self::Mark) -> bool;
}

pub trait AdvanceableReader: Reader {
fn skip(&mut self, offset: usize) -> Result<(), DidntRead>;
fn backtrack(&mut self, offset: usize) -> Result<(), DidntRead>;
fn advance(&mut self, offset: isize) -> Result<(), DidntRead> {
if offset > 0 {
self.skip(offset as usize)
} else {
self.backtrack((-offset) as usize)
}
}
}

#[derive(Debug, Clone, Copy)]
pub struct DidntSiphon;

Expand Down
213 changes: 105 additions & 108 deletions commons/zenoh-buffers/src/zbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
use crate::ZSliceKind;
use crate::{
buffer::{Buffer, SplitBuffer},
reader::{BacktrackableReader, DidntRead, DidntSiphon, HasReader, Reader, SiphonableReader},
reader::{
AdvanceableReader, BacktrackableReader, DidntRead, DidntSiphon, HasReader, Reader,
SiphonableReader,
},
writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer},
ZSlice, ZSliceBuffer,
};
Expand Down Expand Up @@ -433,43 +436,74 @@ impl<'a> io::Read for ZBufReader<'a> {
}
}

#[cfg(feature = "std")]
impl<'a> io::Seek for ZBufReader<'a> {
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
// Compute the index
let len = self.inner.len();
let index = match pos {
io::SeekFrom::Start(pos) => pos.try_into().unwrap_or(i64::MAX),
io::SeekFrom::End(pos) => {
pos + i64::try_from(len)
.map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e))?
}
io::SeekFrom::Current(pos) => {
pos + i64::try_from(len - self.remaining())
.map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e))?
impl<'a> AdvanceableReader for ZBufReader<'a> {
fn skip(&mut self, offset: usize) -> Result<(), DidntRead> {
let mut remaining_offset = offset;
while remaining_offset > 0 {
let s = self.inner.slices.get(self.cursor.slice).ok_or(DidntRead)?;
let remains_in_current_slice = s.len() - self.cursor.byte;
let advance = remaining_offset.min(remains_in_current_slice);
remaining_offset -= advance;
self.cursor.byte += advance;
if self.cursor.byte == s.len() {
self.cursor.slice += 1;
self.cursor.byte = 0;
}
};
}
Ok(())
}

let index = usize::try_from(index)
.map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e))?
.min(len);

// Seek the position
let mut left = index;
let mut pos = ZBufPos { slice: 0, byte: 0 };
while let Some(slice) = self.inner.slices.get(pos.slice) {
let len = slice.len();
if len >= left {
pos.byte = left;
self.cursor = pos;
return Ok(index as u64);
} else {
left -= len;
fn backtrack(&mut self, offset: usize) -> Result<(), DidntRead> {
let mut remaining_offset = offset;
while remaining_offset > 0 {
let backtrack = remaining_offset.min(self.cursor.byte);
remaining_offset -= backtrack;
self.cursor.byte -= backtrack;
if self.cursor.byte == 0 {
if self.cursor.slice == 0 {
break;
}
self.cursor.slice -= 1;
self.cursor.byte = self
.inner
.slices
.get(self.cursor.slice)
.ok_or(DidntRead)?
.len();
}
pos.slice += 1;
}
if remaining_offset == 0 {
Ok(())
} else {
Err(DidntRead)
}
}
}

Err(io::ErrorKind::UnexpectedEof.into())
#[cfg(feature = "std")]
impl<'a> io::Seek for ZBufReader<'a> {
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
let current_pos = self
.inner
.slices()
.take(self.cursor.slice)
.fold(0, |acc, s| acc + s.len())
+ self.cursor.byte;
let current_pos = i64::try_from(current_pos)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e)))?;

let offset = match pos {
std::io::SeekFrom::Start(s) => i64::try_from(s).unwrap_or(i64::MAX) - current_pos,
std::io::SeekFrom::Current(s) => s,
std::io::SeekFrom::End(s) => self.inner.len() as i64 + s - current_pos,
};
match self.advance(offset as isize) {
Ok(()) => Ok((offset + current_pos) as u64),
Err(_) => Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"InvalidInput",
)),
}
}
}

Expand Down Expand Up @@ -745,80 +779,43 @@ mod tests {
#[cfg(feature = "std")]
#[test]
fn zbuf_seek() {
use crate::reader::HasReader;
use std::io::{Seek, SeekFrom};

use super::{ZBuf, ZSlice};

let slice: ZSlice = [0u8, 1, 2, 3, 4, 5, 6, 7].to_vec().into();

let mut zbuf = ZBuf::empty();
zbuf.push_zslice(slice.subslice(0, 1).unwrap());
zbuf.push_zslice(slice.subslice(1, 4).unwrap());
zbuf.push_zslice(slice.subslice(4, 8).unwrap());

let mut reader = zbuf.reader();

let index = reader.seek(SeekFrom::Start(0)).unwrap();
assert_eq!(index, 0);
assert_eq!(index, reader.stream_position().unwrap());

let index = reader.seek(SeekFrom::Start(4)).unwrap();
assert_eq!(index, 4);
assert_eq!(index, reader.stream_position().unwrap());

let index = reader.seek(SeekFrom::Start(8)).unwrap();
assert_eq!(index, 8);
assert_eq!(index, reader.stream_position().unwrap());

let index = reader.seek(SeekFrom::Start(u64::MAX)).unwrap();
assert_eq!(index, 8);
assert_eq!(index, reader.stream_position().unwrap());

let index = reader.seek(SeekFrom::End(0)).unwrap();
assert_eq!(index, 8);
assert_eq!(index, reader.stream_position().unwrap());

let index = reader.seek(SeekFrom::End(-4)).unwrap();
assert_eq!(index, 4);
assert_eq!(index, reader.stream_position().unwrap());

let index = reader.seek(SeekFrom::End(-8)).unwrap();
assert_eq!(index, 0);
assert_eq!(index, reader.stream_position().unwrap());

reader.seek(SeekFrom::End(i64::MIN)).unwrap_err();
assert_eq!(index, reader.stream_position().unwrap());

let index = reader.seek(SeekFrom::Start(0)).unwrap();
assert_eq!(index, 0);
assert_eq!(index, reader.stream_position().unwrap());

reader.seek(SeekFrom::Current(-1)).unwrap_err();
assert_eq!(index, reader.stream_position().unwrap());

let index = reader.seek(SeekFrom::Current(2)).unwrap();
assert_eq!(index, 2);
assert_eq!(index, reader.stream_position().unwrap());

let index = reader.seek(SeekFrom::Current(2)).unwrap();
assert_eq!(index, 4);
assert_eq!(index, reader.stream_position().unwrap());

let index = reader.seek(SeekFrom::Current(-2)).unwrap();
assert_eq!(index, 2);
assert_eq!(index, reader.stream_position().unwrap());

let index = reader.seek(SeekFrom::Current(-2)).unwrap();
assert_eq!(index, 0);
assert_eq!(index, reader.stream_position().unwrap());

let index = reader.seek(SeekFrom::Current(i64::MAX)).unwrap();
assert_eq!(index, 8);
assert_eq!(index, reader.stream_position().unwrap());

let index = reader.seek(SeekFrom::Current(-1)).unwrap();
assert_eq!(index, 7);
assert_eq!(index, reader.stream_position().unwrap());
use super::{HasReader, ZBuf};
use crate::reader::Reader;
use std::io::Seek;

let mut buf = ZBuf::empty();
buf.push_zslice([0u8, 1u8, 2u8, 3u8].into());
buf.push_zslice([4u8, 5u8, 6u8, 7u8, 8u8].into());
buf.push_zslice([9u8, 10u8, 11u8, 12u8, 13u8, 14u8].into());
let mut reader = buf.reader();

assert_eq!(reader.stream_position().unwrap(), 0);
assert_eq!(reader.read_u8().unwrap(), 0);
assert_eq!(reader.seek(std::io::SeekFrom::Current(6)).unwrap(), 7);
assert_eq!(reader.read_u8().unwrap(), 7);
assert_eq!(reader.seek(std::io::SeekFrom::Current(-5)).unwrap(), 3);
assert_eq!(reader.read_u8().unwrap(), 3);
assert_eq!(reader.seek(std::io::SeekFrom::Current(10)).unwrap(), 14);
assert_eq!(reader.read_u8().unwrap(), 14);
reader.seek(std::io::SeekFrom::Current(100)).unwrap_err();

assert_eq!(reader.seek(std::io::SeekFrom::Start(0)).unwrap(), 0);
assert_eq!(reader.read_u8().unwrap(), 0);
assert_eq!(reader.seek(std::io::SeekFrom::Start(12)).unwrap(), 12);
assert_eq!(reader.read_u8().unwrap(), 12);
assert_eq!(reader.seek(std::io::SeekFrom::Start(15)).unwrap(), 15);
reader.read_u8().unwrap_err();
reader.seek(std::io::SeekFrom::Start(100)).unwrap_err();

assert_eq!(reader.seek(std::io::SeekFrom::End(0)).unwrap(), 15);
reader.read_u8().unwrap_err();
assert_eq!(reader.seek(std::io::SeekFrom::End(-5)).unwrap(), 10);
assert_eq!(reader.read_u8().unwrap(), 10);
assert_eq!(reader.seek(std::io::SeekFrom::End(-15)).unwrap(), 0);
assert_eq!(reader.read_u8().unwrap(), 0);
reader.seek(std::io::SeekFrom::End(-20)).unwrap_err();

assert_eq!(reader.seek(std::io::SeekFrom::Start(10)).unwrap(), 10);
reader.seek(std::io::SeekFrom::Current(-100)).unwrap_err();
}
}

0 comments on commit 27063b6

Please sign in to comment.