
Update CLI to use new crate and fix discovered issues #22

Merged
9 commits merged on Jan 12, 2025
17 changes: 2 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -16,6 +16,7 @@ log = "0.4"
parking_lot = "0.12"
rand = "0.8.5"
env_logger = "0.10"
lava_torrent = "0.11"

[profile.release]
overflow-checks = true
2 changes: 1 addition & 1 deletion bittorrent/Cargo.toml
@@ -17,7 +17,7 @@ arbitrary = { version = "1.1.3", features = ["derive"]}
log = "0.4"
socket2 = "0.5"
rand = "0.8"
lava_torrent = "0.11"
lava_torrent = { workspace = true }
thiserror = "2"
sha1 = "0.10.5"

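Note on the two manifest changes above: declaring the lava_torrent version once in the root Cargo.toml and referencing it with workspace = true is Cargo's workspace-dependency inheritance (stable since Rust 1.64), which keeps every member crate on the same version. A minimal sketch, assuming the root manifest declares the workspace and that the dependency table shown in the hunk above is [workspace.dependencies] (member names here are illustrative):

    # root Cargo.toml (assumed layout)
    [workspace]
    members = ["bittorrent", "cli"]

    [workspace.dependencies]
    lava_torrent = "0.11"

    # bittorrent/Cargo.toml
    [dependencies]
    lava_torrent = { workspace = true }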
139 changes: 75 additions & 64 deletions bittorrent/src/event_loop.rs
@@ -68,6 +68,26 @@ pub fn push_connected_write(
}
}

fn restart_multishot_recv(
sq: &mut SubmissionQueue<'_>,
user_data: UserData,
fd: RawFd,
backlog: &mut VecDeque<io_uring::squeue::Entry>,
bgid: Bgid,
) {
log::warn!("Starting new recv multishot");
let read_op = opcode::RecvMulti::new(types::Fd(fd), bgid)
.build()
.user_data(user_data.as_u64())
.flags(io_uring::squeue::Flags::BUFFER_SELECT);
unsafe {
if sq.push(&read_op).is_err() {
log::warn!("SQ buffer full, pushing to backlog");
backlog.push_back(read_op);
}
}
}
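A note on the fallback inside this new helper: pushing to an io_uring submission queue fails when the ring is full, so the entry is parked in a backlog and resubmitted once the kernel has consumed earlier entries. A minimal sketch of the pattern under the same assumptions as the surrounding code (io_uring crate; the backlog is drained elsewhere in the event loop):

    use std::collections::VecDeque;
    use io_uring::squeue;

    /// Push an SQE, parking it in the backlog if the submission queue is full.
    /// Safety: any buffers referenced by `entry` must stay alive until the
    /// matching completion is reaped.
    unsafe fn push_or_backlog(
        sq: &mut io_uring::SubmissionQueue<'_>,
        backlog: &mut VecDeque<squeue::Entry>,
        entry: squeue::Entry,
    ) {
        if sq.push(&entry).is_err() {
            backlog.push_back(entry);
        }
    }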

#[derive(Debug, PartialEq, Clone, Copy)]
pub struct UserData {
buffer_idx: Option<u32>,
@@ -163,8 +183,7 @@ fn event_error_handler(
Ok(())
}
_ => {
let event = events.remove(user_data.event_idx as _);
log::error!("Unhandled error event: {event:?}");
events.remove(user_data.event_idx as _);
let err = std::io::Error::from_raw_os_error(error_code as i32);
Err(err)
}
@@ -455,6 +474,15 @@ impl EventLoop {
Event::Recv { socket } => {
let fd = socket.as_raw_fd();
let len = ret as usize;
if len == 0 {
log::debug!("No more data when expecting handshake");
self.events.remove(user_data.event_idx as _);
return Ok(());
}
let is_more = io_uring::cqueue::more(cqe.flags());
if !is_more {
restart_multishot_recv(sq, user_data, fd, backlog, self.read_ring.bgid());
}
// We always have a buffer associated
let buffer = read_bid.map(|bid| self.read_ring.get(bid)).unwrap();
// Expect this to be the handshake response
@@ -506,78 +534,61 @@
// The event is reused and not replaced
std::mem::swap(&mut event, &mut self.events[user_data.event_idx as usize]);
let connection = &mut self.connections[connection_idx];
let len = ret as usize;
if len == 0 {
log::debug!("[PeerId: {}] No more data, mark as pending disconnect", connection.peer_id);
self.events.remove(user_data.event_idx as _);
connection.pending_disconnect = true;
return Ok(());
}
let is_more = io_uring::cqueue::more(cqe.flags());
if !is_more {
log::warn!("No more, starting new recv");
let read_op = opcode::RecvMulti::new(
types::Fd(connection.socket.as_raw_fd()),
self.read_ring.bgid(),
)
.build()
.user_data(user_data.as_u64())
.flags(io_uring::squeue::Flags::BUFFER_SELECT);
unsafe {
if sq.push(&read_op).is_err() {
log::warn!("SQ buffer full, pushing to backlog");
backlog.push_back(read_op);
}
}
// This event doesn't contain any data
return Ok(());
let fd = connection.socket.as_raw_fd();
restart_multishot_recv(sq, user_data, fd, backlog, self.read_ring.bgid());
}

let len = ret as usize;

// We always have a buffer associated
let buffer = read_bid.map(|bid| self.read_ring.get(bid)).unwrap();
let buffer = &buffer[..len];
if buffer.is_empty() {
log::warn!("READ 0");
self.events.remove(user_data.event_idx as _);
self.connections.remove(connection_idx as _);
log::warn!("shutting down connection");
// TODO graceful shutdown
} else {
connection.stateful_decoder.append_data(buffer);
let conn_fd = connection.socket.as_raw_fd();
while let Some(parse_result) = connection.stateful_decoder.next() {
match parse_result {
Ok(peer_message) => {
match connection.handle_message(
connection_idx,
peer_message,
torrent_state,
) {
Ok(outgoing_messages) => {
for outgoing in outgoing_messages {
// Buffers are returned in the event loop
let buffer = self.write_pool.get_buffer();
outgoing.message.encode(buffer.inner);
let size = outgoing.message.encoded_size();
push_connected_write(
connection_idx,
conn_fd,
&mut self.events,
sq,
buffer.index,
&buffer.inner[..size],
outgoing.ordered,
backlog,
)
}
}
Err(err @ Error::Disconnect(_)) => {
log::warn!("[Peer {}] {err}", connection.peer_id);
// TODO proper shutdown
}
Err(err) => {
log::error!("Failed handling message: {err}");
connection.stateful_decoder.append_data(buffer);
let conn_fd = connection.socket.as_raw_fd();
while let Some(parse_result) = connection.stateful_decoder.next() {
match parse_result {
Ok(peer_message) => {
match connection.handle_message(
connection_idx,
peer_message,
torrent_state,
) {
Ok(outgoing_messages) => {
for outgoing in outgoing_messages {
// Buffers are returned in the event loop
let buffer = self.write_pool.get_buffer();
outgoing.message.encode(buffer.inner);
let size = outgoing.message.encoded_size();
push_connected_write(
connection_idx,
conn_fd,
&mut self.events,
sq,
buffer.index,
&buffer.inner[..size],
outgoing.ordered,
backlog,
)
}
}
Err(err @ Error::Disconnect(_)) => {
log::warn!("[Peer {}] {err}", connection.peer_id);
connection.pending_disconnect = true;
}
Err(err) => {
log::error!("Failed handling message: {err}");
}
}
Err(err) => {
log::error!("Failed decoding message: {err}");
}
}
Err(err) => {
log::error!("Failed decoding message: {err}");
}
}
}
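The recv changes in this file follow io_uring's multishot-recv contract: a single RecvMulti submission yields many completions, a CQE without IORING_CQE_F_MORE set means the kernel has stopped the multishot operation and it must be re-armed (which restart_multishot_recv now centralizes), and a result of 0 bytes is EOF. A minimal sketch of that classification, assuming the io_uring crate's cqueue API and that error results are routed to the error handler before this point:

    use io_uring::cqueue;

    /// Classify a multishot-recv completion: bytes received (0 = peer closed)
    /// and whether a fresh RecvMulti must be submitted.
    fn classify_recv(cqe: &cqueue::Entry) -> (usize, bool) {
        let len = cqe.result().max(0) as usize; // 0 => EOF
        let rearm = !cqueue::more(cqe.flags()); // MORE flag cleared => re-arm
        (len, rearm)
    }

Note that the two EOF paths in the diff differ deliberately: during the handshake the event is simply dropped, while on an established connection pending_disconnect is set so tick() can release the peer's pieces before removal.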
70 changes: 34 additions & 36 deletions bittorrent/src/lib.rs
@@ -27,8 +27,8 @@ mod peer_connection;
mod peer_protocol;
mod piece_selector;

pub use peer_protocol::PeerId;
pub use peer_protocol::generate_peer_id;
pub use peer_protocol::PeerId;

#[derive(Error, Debug)]
pub enum Error {
@@ -117,7 +117,8 @@ impl TorrentState {
self.torrent_info.pieces.len()
}

pub(crate) fn on_piece_completed(&mut self, index: i32, data: Vec<u8>) {
// Returns if the piece is valid
pub(crate) fn on_piece_completed(&mut self, index: i32, data: Vec<u8>) -> bool {
let hash_time = Instant::now();
let mut hasher = sha1::Sha1::new();
hasher.update(&data);
@@ -126,34 +127,26 @@
log::info!("Piece hashed in: {} microsec", hash_time.as_micros());
// The hash can be provided to the data storage or the peer connection
// when the piece is requested so it can be used for validation later on
let position = self
.torrent_info
.pieces
.iter()
.position(|piece_hash| data_hash.as_slice() == piece_hash);
match position {
Some(piece_index) if piece_index == index as usize => {
log::info!("Piece hash matched downloaded data");
self.piece_selector.mark_complete(piece_index);
self.file_store.write_piece(index, &data).unwrap();
let expected_hash = &self.torrent_info.pieces[index as usize];
if expected_hash == data_hash.as_slice() {
log::info!("Piece hash matched downloaded data");
self.piece_selector.mark_complete(index as usize);
self.file_store.write_piece(index, &data).unwrap();

// Purge disconnected peers TODO move to tick instead
//self.peer_list.connections.retain(|_, peer| {
// peer.have(index).is_ok()
//});

if self.piece_selector.completed_all() {
let file_store = std::mem::replace(&mut self.file_store, FileStore::dummy());
file_store.close().unwrap();
self.is_complete = true;
}
}
Some(piece_index) => log::error!(
"Piece hash didn't match expected index! expected index: {index}, piece_index: {piece_index}"
),
None => {
log::error!("Piece sha1 hash not found!");
// Purge disconnected peers TODO move to tick instead
//self.peer_list.connections.retain(|_, peer| {
// peer.have(index).is_ok()
//});
if self.piece_selector.completed_all() {
let file_store = std::mem::replace(&mut self.file_store, FileStore::dummy());
file_store.close().unwrap();
self.is_complete = true;
}
true
} else {
log::error!("Piece hash didn't match expected hash!");
self.piece_selector.mark_not_inflight(index as usize);
false
}
}
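The rewrite above replaces a linear position() scan over every piece hash with a direct index lookup: the old code was O(pieces) per completed piece and needed a special "hash matched the wrong index" error case, while the new code compares only against the hash expected for this index and, on mismatch, hands the piece back to the selector via mark_not_inflight so it can be re-requested. A minimal sketch of the check, assuming pieces holds one 20-byte SHA-1 digest per piece (sha1 crate, as declared in bittorrent/Cargo.toml):

    use sha1::{Digest, Sha1};

    /// Verify downloaded piece data against the expected hash for its index.
    fn piece_matches(pieces: &[[u8; 20]], index: usize, data: &[u8]) -> bool {
        let digest = Sha1::digest(data);
        pieces[index].as_slice() == digest.as_slice()
    }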

@@ -172,28 +165,28 @@ fn tick(
// 2. Go through them in order
// 3. select pieces
// TODO use retain instead of iter mut in the loop below and get rid of this
//let mut disconnects = Vec::new();
let mut disconnects = Vec::new();
for (id, connection) in connections.iter_mut() {
// If this connection have no inflight 2 iterations in a row
// disconnect it and clear all pieces it was currently downloading
// this is not very granular and will need tweaking

if let Some(time) = connection.timeout_point {
if time.elapsed() > connection.request_timeout() {
connection.on_request_timeout();
log::warn!("TIMEOUT: {id}");
connection.on_request_timeout(torrent_state);
}
}

if connection.pending_disconnect && !connection.peer_choking && connection.queued.is_empty()
{
log::error!("Disconnect");
//disconnects.push(peer_key);
if connection.pending_disconnect {
log::debug!("Disconnect: {id}");
disconnects.push(id);
continue;
}

if !connection.peer_choking {
// slow start win size increase is handled in update_stats
if !connection.slow_start && !connection.pending_disconnect {
if !connection.slow_start {
// From the libtorrent impl, request queue time = 3
let new_queue_capacity =
3 * connection.throughput / piece_selector::SUBPIECE_SIZE as u64;
@@ -217,7 +210,7 @@
&& connection.throughput > 0
&& connection.throughput < connection.prev_throughput + 5000
{
log::error!("Exiting slow start");
log::debug!("[Peer {}] Exiting slow start", connection.peer_id);
connection.slow_start = false;
}
connection.prev_throughput = connection.throughput;
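For scale, the "request queue time = 3" rule above keeps roughly three seconds of transfer in flight once slow start ends. A worked example, assuming the conventional 16 KiB subpiece (block) size:

    // 3 s * 1 MiB/s / 16 KiB = 192 outstanding requests
    fn main() {
        const SUBPIECE_SIZE: u64 = 16 * 1024; // assumed block size
        let throughput: u64 = 1024 * 1024; // measured bytes per second
        let new_queue_capacity = 3 * throughput / SUBPIECE_SIZE;
        assert_eq!(new_queue_capacity, 192);
    }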
@@ -269,4 +262,9 @@
}
peer.fill_request_queue();
}

for id in disconnects {
let mut conn = connections.remove(id);
conn.release_pieces(torrent_state);
}
}
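The deferred disconnects Vec added above exists because connections can't be removed while iter_mut() holds the borrow; the TODO at the top of tick() points at retain as the eventual replacement. A hypothetical sketch of that, assuming connections is a slotmap-style container whose retain callback receives the key and a mutable reference:

    // Hypothetical retain-based cleanup replacing the disconnects Vec:
    connections.retain(|id, connection| {
        if connection.pending_disconnect {
            log::debug!("Disconnect: {id}");
            connection.release_pieces(torrent_state);
            return false; // drop this connection
        }
        true // keep it
    });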