Skip to content

Commit

Permalink
small cleanup and improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
TroyKomodo committed Jan 9, 2025
1 parent d0bd8a9 commit f91b832
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 9 deletions.
68 changes: 62 additions & 6 deletions crates/bytes-util/src/bytes_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,91 @@ pub trait BytesCursor {

impl BytesCursor for io::Cursor<Bytes> {
fn remaining(&self) -> usize {
self.get_ref().len() - self.position() as usize
// We have to use a saturating sub here because the position can be
// greater than the length of the bytes.
self.get_ref().len().saturating_sub(self.position() as usize)
}

fn extract_remaining(&mut self) -> Bytes {
self.extract_bytes(self.remaining())
.expect("somehow we read past the end of the file")
// We don't really care if we fail here since the desired behavior is
// to return all bytes remaining in the cursor. If we fail its because
// there are not enough bytes left in the cursor to read.
self.extract_bytes(self.remaining()).unwrap_or_default()
}

fn extract_bytes(&mut self, size: usize) -> io::Result<Bytes> {
let position = self.position() as usize;
if position + size > self.get_ref().len() {
// If the size is zero we can just return an empty bytes slice.
if size == 0 {
return Ok(Bytes::new());
}

// If the size is greater than the remaining bytes we can just return an
// error.
if size > self.remaining() {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "not enough bytes"));
}

let position = self.position() as usize;

// We slice bytes here which is a O(1) operation as it only modifies a few
// reference counters and does not copy the memory.
let slice = self.get_ref().slice(position..position + size);

// We advance the cursor because we have now "read" the bytes.
self.set_position((position + size) as u64);

Ok(slice)
}
}

#[cfg(test)]
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
mod tests {
use super::*;

#[test]
fn test_bytes_cursor() {
fn test_bytes_cursor_extract_remaining() {
let mut cursor = io::Cursor::new(Bytes::from_static(&[1, 2, 3, 4, 5]));
let remaining = cursor.extract_remaining();
assert_eq!(remaining, Bytes::from_static(&[1, 2, 3, 4, 5]));
}

#[test]
fn test_bytes_cursor_extract_bytes() {
let mut cursor = io::Cursor::new(Bytes::from_static(&[1, 2, 3, 4, 5]));
let bytes = cursor.extract_bytes(3).unwrap();
assert_eq!(bytes, Bytes::from_static(&[1, 2, 3]));
assert_eq!(cursor.remaining(), 2);

let bytes = cursor.extract_bytes(2).unwrap();
assert_eq!(bytes, Bytes::from_static(&[4, 5]));
assert_eq!(cursor.remaining(), 0);

let bytes = cursor.extract_bytes(1).unwrap_err();
assert_eq!(bytes.kind(), io::ErrorKind::UnexpectedEof);

let bytes = cursor.extract_bytes(0).unwrap();
assert_eq!(bytes, Bytes::from_static(&[]));
assert_eq!(cursor.remaining(), 0);

let bytes = cursor.extract_remaining();
assert_eq!(bytes, Bytes::from_static(&[]));
assert_eq!(cursor.remaining(), 0);
}

#[test]
fn seek_out_of_bounds() {
let mut cursor = io::Cursor::new(Bytes::from_static(&[1, 2, 3, 4, 5]));
cursor.set_position(10);
assert_eq!(cursor.remaining(), 0);

let bytes = cursor.extract_remaining();
assert_eq!(bytes, Bytes::from_static(&[]));

let bytes = cursor.extract_bytes(1);
assert_eq!(bytes.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);

let bytes = cursor.extract_bytes(0);
assert_eq!(bytes.unwrap(), Bytes::from_static(&[]));
}
}
3 changes: 0 additions & 3 deletions crates/rtmp/src/session/server_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,6 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin> Session<S> {
self.flush().await?;
}

println!("write_buf: cap: {}; len: {}", self.write_buf.capacity(), self.write_buf.len());
println!("read_buf: cap: {}; len: {}", self.read_buf.capacity(), self.read_buf.len());

// We should technically check the stream_map here
// However most clients just disconnect without cleanly stopping the subscrition
// streams (play streams) So we just check that all publishers have disconnected
Expand Down

0 comments on commit f91b832

Please sign in to comment.