Skip to content

Commit

Permalink
Update CLI to use new crate and fix discovered issues (#22)
Browse files Browse the repository at this point in the history
* Move lava-torrent to workspace

* Check length before restarting recv operation

* Improve disconnect flow

* Remove assinged pieces more aggressivly on timeout

* Get rid of O(n) search for piece hash and mark incorrect hashes as not inflight

* First integration with DHT crate

* Don't change timeout point unconditionally when receiving a piece + some cleanup

* Make some asserts debug asserts

* Only print progress if hash matches

---------

Co-authored-by: Nehliin <>
  • Loading branch information
Nehliin authored Jan 12, 2025
1 parent 31e6901 commit 780ef2a
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 168 deletions.
17 changes: 2 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ log = "0.4"
parking_lot = "0.12"
rand = "0.8.5"
env_logger = "0.10"
lava_torrent = "0.11"

[profile.release]
overflow-checks = true
Expand Down
2 changes: 1 addition & 1 deletion bittorrent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ arbitrary = { version = "1.1.3", features = ["derive"]}
log = "0.4"
socket2 = "0.5"
rand = "0.8"
lava_torrent = "0.11"
lava_torrent = { workspace = true }
thiserror = "2"
sha1 = "0.10.5"

Expand Down
139 changes: 75 additions & 64 deletions bittorrent/src/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,26 @@ pub fn push_connected_write(
}
}

fn restart_multishot_recv(
sq: &mut SubmissionQueue<'_>,
user_data: UserData,
fd: RawFd,
backlog: &mut VecDeque<io_uring::squeue::Entry>,
bgid: Bgid,
) {
log::warn!("Starting new recv multishot");
let read_op = opcode::RecvMulti::new(types::Fd(fd), bgid)
.build()
.user_data(user_data.as_u64())
.flags(io_uring::squeue::Flags::BUFFER_SELECT);
unsafe {
if sq.push(&read_op).is_err() {
log::warn!("SQ buffer full, pushing to backlog");
backlog.push_back(read_op);
}
}
}

#[derive(Debug, PartialEq, Clone, Copy)]
pub struct UserData {
buffer_idx: Option<u32>,
Expand Down Expand Up @@ -163,8 +183,7 @@ fn event_error_handler(
Ok(())
}
_ => {
let event = events.remove(user_data.event_idx as _);
log::error!("Unhandled error event: {event:?}");
events.remove(user_data.event_idx as _);
let err = std::io::Error::from_raw_os_error(error_code as i32);
Err(err)
}
Expand Down Expand Up @@ -455,6 +474,15 @@ impl EventLoop {
Event::Recv { socket } => {
let fd = socket.as_raw_fd();
let len = ret as usize;
if len == 0 {
log::debug!("No more data when expecting handshake");
self.events.remove(user_data.event_idx as _);
return Ok(());
}
let is_more = io_uring::cqueue::more(cqe.flags());
if !is_more {
restart_multishot_recv(sq, user_data, fd, backlog, self.read_ring.bgid());
}
// We always have a buffer associated
let buffer = read_bid.map(|bid| self.read_ring.get(bid)).unwrap();
// Expect this to be the handshake response
Expand Down Expand Up @@ -506,78 +534,61 @@ impl EventLoop {
// The event is reused and not replaced
std::mem::swap(&mut event, &mut self.events[user_data.event_idx as usize]);
let connection = &mut self.connections[connection_idx];
let len = ret as usize;
if len == 0 {
log::debug!("[PeerId: {}] No more data, mark as pending disconnect", connection.peer_id);
self.events.remove(user_data.event_idx as _);
connection.pending_disconnect = true;
return Ok(());
}
let is_more = io_uring::cqueue::more(cqe.flags());
if !is_more {
log::warn!("No more, starting new recv");
let read_op = opcode::RecvMulti::new(
types::Fd(connection.socket.as_raw_fd()),
self.read_ring.bgid(),
)
.build()
.user_data(user_data.as_u64())
.flags(io_uring::squeue::Flags::BUFFER_SELECT);
unsafe {
if sq.push(&read_op).is_err() {
log::warn!("SQ buffer full, pushing to backlog");
backlog.push_back(read_op);
}
}
// This event doesn't contain any data
return Ok(());
let fd = connection.socket.as_raw_fd();
restart_multishot_recv(sq, user_data, fd, backlog, self.read_ring.bgid());
}

let len = ret as usize;

// We always have a buffer associated
let buffer = read_bid.map(|bid| self.read_ring.get(bid)).unwrap();
let buffer = &buffer[..len];
if buffer.is_empty() {
log::warn!("READ 0");
self.events.remove(user_data.event_idx as _);
self.connections.remove(connection_idx as _);
log::warn!("shutting down connection");
// TODO graceful shutdown
} else {
connection.stateful_decoder.append_data(buffer);
let conn_fd = connection.socket.as_raw_fd();
while let Some(parse_result) = connection.stateful_decoder.next() {
match parse_result {
Ok(peer_message) => {
match connection.handle_message(
connection_idx,
peer_message,
torrent_state,
) {
Ok(outgoing_messages) => {
for outgoing in outgoing_messages {
// Buffers are returned in the event loop
let buffer = self.write_pool.get_buffer();
outgoing.message.encode(buffer.inner);
let size = outgoing.message.encoded_size();
push_connected_write(
connection_idx,
conn_fd,
&mut self.events,
sq,
buffer.index,
&buffer.inner[..size],
outgoing.ordered,
backlog,
)
}
}
Err(err @ Error::Disconnect(_)) => {
log::warn!("[Peer {}] {err}", connection.peer_id);
// TODO proper shutdown
}
Err(err) => {
log::error!("Failed handling message: {err}");
connection.stateful_decoder.append_data(buffer);
let conn_fd = connection.socket.as_raw_fd();
while let Some(parse_result) = connection.stateful_decoder.next() {
match parse_result {
Ok(peer_message) => {
match connection.handle_message(
connection_idx,
peer_message,
torrent_state,
) {
Ok(outgoing_messages) => {
for outgoing in outgoing_messages {
// Buffers are returned in the event loop
let buffer = self.write_pool.get_buffer();
outgoing.message.encode(buffer.inner);
let size = outgoing.message.encoded_size();
push_connected_write(
connection_idx,
conn_fd,
&mut self.events,
sq,
buffer.index,
&buffer.inner[..size],
outgoing.ordered,
backlog,
)
}
}
Err(err @ Error::Disconnect(_)) => {
log::warn!("[Peer {}] {err}", connection.peer_id);
connection.pending_disconnect = true;
}
Err(err) => {
log::error!("Failed handling message: {err}");
}
}
Err(err) => {
log::error!("Failed decoding message: {err}");
}
}
Err(err) => {
log::error!("Failed decoding message: {err}");
}
}
}
Expand Down
70 changes: 34 additions & 36 deletions bittorrent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ mod peer_connection;
mod peer_protocol;
mod piece_selector;

pub use peer_protocol::PeerId;
pub use peer_protocol::generate_peer_id;
pub use peer_protocol::PeerId;

#[derive(Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -117,7 +117,8 @@ impl TorrentState {
self.torrent_info.pieces.len()
}

pub(crate) fn on_piece_completed(&mut self, index: i32, data: Vec<u8>) {
// Returns if the piece is valid
pub(crate) fn on_piece_completed(&mut self, index: i32, data: Vec<u8>) -> bool {
let hash_time = Instant::now();
let mut hasher = sha1::Sha1::new();
hasher.update(&data);
Expand All @@ -126,34 +127,26 @@ impl TorrentState {
log::info!("Piece hashed in: {} microsec", hash_time.as_micros());
// The hash can be provided to the data storage or the peer connection
// when the piece is requested so it can be used for validation later on
let position = self
.torrent_info
.pieces
.iter()
.position(|piece_hash| data_hash.as_slice() == piece_hash);
match position {
Some(piece_index) if piece_index == index as usize => {
log::info!("Piece hash matched downloaded data");
self.piece_selector.mark_complete(piece_index);
self.file_store.write_piece(index, &data).unwrap();
let expected_hash = &self.torrent_info.pieces[index as usize];
if expected_hash == data_hash.as_slice() {
log::info!("Piece hash matched downloaded data");
self.piece_selector.mark_complete(index as usize);
self.file_store.write_piece(index, &data).unwrap();

// Purge disconnected peers TODO move to tick instead
//self.peer_list.connections.retain(|_, peer| {
// peer.have(index).is_ok()
//});

if self.piece_selector.completed_all() {
let file_store = std::mem::replace(&mut self.file_store, FileStore::dummy());
file_store.close().unwrap();
self.is_complete = true;
}
}
Some(piece_index) => log::error!(
"Piece hash didn't match expected index! expected index: {index}, piece_index: {piece_index}"
),
None => {
log::error!("Piece sha1 hash not found!");
// Purge disconnected peers TODO move to tick instead
//self.peer_list.connections.retain(|_, peer| {
// peer.have(index).is_ok()
//});
if self.piece_selector.completed_all() {
let file_store = std::mem::replace(&mut self.file_store, FileStore::dummy());
file_store.close().unwrap();
self.is_complete = true;
}
true
} else {
log::error!("Piece hash didn't match expected hash!");
self.piece_selector.mark_not_inflight(index as usize);
false
}
}

Expand All @@ -172,28 +165,28 @@ fn tick(
// 2. Go through them in order
// 3. select pieces
// TODO use retain instead of iter mut in the loop below and get rid of this
//let mut disconnects = Vec::new();
let mut disconnects = Vec::new();
for (id, connection) in connections.iter_mut() {
// If this connection have no inflight 2 iterations in a row
// disconnect it and clear all pieces it was currently downloading
// this is not very granular and will need tweaking

if let Some(time) = connection.timeout_point {
if time.elapsed() > connection.request_timeout() {
connection.on_request_timeout();
log::warn!("TIMEOUT: {id}");
connection.on_request_timeout(torrent_state);
}
}

if connection.pending_disconnect && !connection.peer_choking && connection.queued.is_empty()
{
log::error!("Disconnect");
//disconnects.push(peer_key);
if connection.pending_disconnect {
log::debug!("Disconnect: {id}");
disconnects.push(id);
continue;
}

if !connection.peer_choking {
// slow start win size increase is handled in update_stats
if !connection.slow_start && !connection.pending_disconnect {
if !connection.slow_start {
// From the libtorrent impl, request queue time = 3
let new_queue_capacity =
3 * connection.throughput / piece_selector::SUBPIECE_SIZE as u64;
Expand All @@ -217,7 +210,7 @@ fn tick(
&& connection.throughput > 0
&& connection.throughput < connection.prev_throughput + 5000
{
log::error!("Exiting slow start");
log::debug!("[Peer {}] Exiting slow start", connection.peer_id);
connection.slow_start = false;
}
connection.prev_throughput = connection.throughput;
Expand Down Expand Up @@ -269,4 +262,9 @@ fn tick(
}
peer.fill_request_queue();
}

for id in disconnects {
let mut conn = connections.remove(id);
conn.release_pieces(torrent_state);
}
}
Loading

0 comments on commit 780ef2a

Please sign in to comment.