Skip to content

Commit

Permalink
fix #20: lost files between diode-receive and diode-receive-file
Browse files Browse the repository at this point in the history
server socket must never be closed or we can loose client connection
  • Loading branch information
CrabeDeFrance committed Dec 15, 2024
1 parent 6975e0f commit 8cdf1bb
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 36 deletions.
2 changes: 1 addition & 1 deletion features/steps/diode.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def start_diode_send_dir(context):
stdout = subprocess.PIPE
stderr = subprocess.STDOUT

diode_send_dir_command = [f'{context.bin_dir}/diode-send-dir', '--log-config', context.log_config_diode_send_dir, '--maximum-delay', '200', '--to-tcp', '127.0.0.1:5000', context.send_dir.name]
diode_send_dir_command = [f'{context.bin_dir}/diode-send-dir', '--log-config', context.log_config_diode_send_dir, '--maximum-files', '1', '--to-tcp', '127.0.0.1:5000', context.send_dir.name]

context.proc_diode_send_dir = subprocess.Popen(
diode_send_dir_command,
Expand Down
66 changes: 42 additions & 24 deletions src/file/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
fs,
hash::Hash,
io::{Read, Write},
net,
net::{self, TcpStream},
os::unix::fs::PermissionsExt,
path,
};
Expand All @@ -23,33 +23,51 @@ pub fn receive_files(config: &file::Config, output_dir: &path::Path) -> Result<(
}

fn receive_tcp_loop(config: &file::Config, output_dir: &path::Path) -> Result<(), file::Error> {
loop {
// quit loop in case of error to force reconnecting
log::trace!("new tcp receive file");
let server = net::TcpListener::bind(config.diode)?;
receive_tcp_socket(config, output_dir, server)?;
let (tx, rx) = crossbeam_channel::bounded::<TcpStream>(100);

let server = net::TcpListener::bind(config.diode)?;
if let Err(e) = std::thread::Builder::new()
.name("lidi_rx_file_bind".to_string())
.spawn(move || {
loop {
let (client, client_addr) = match server.accept() {
Ok(ret) => ret,
Err(e) => {
log::warn!("Can't accept new client: {e}");
continue;
}
};
log::debug!("new client ({client_addr}) connected");
// quit loop in case of error to force reconnecting
log::trace!("new tcp receive file");
if let Err(e) = tx.send(client) {
log::warn!("Can't send new client: {e}");
}
}
})
{
log::error!("Can't start new thread: {e}");
}
}

fn receive_tcp_socket(
config: &file::Config,
output_dir: &path::Path,
server: net::TcpListener,
) -> Result<(), file::Error> {
let (mut client, client_addr) = server.accept()?;
log::debug!("new Unix client ({client_addr}) connected");
drop(server);
loop {
match receive_file(config, &mut client, output_dir) {
Ok((filename, total, stream_end)) => {
log::info!("{filename} received, {total} bytes");
if stream_end {
return Ok(());
}
}
let mut client = match rx.recv() {
Ok(client) => client,
Err(e) => {
log::error!("failed to receive file: {e}");
return Err(e);
log::warn!("Can't get new client: {e}");
continue;
}
};

// try to read files until diode-receive disconnects
loop {
match receive_file(config, &mut client, output_dir) {
Ok((filename, total, _stream_end)) => {
log::info!("{filename} received, {total} bytes");
}
Err(e) => {
log::error!("failed to receive file: {e}");
break;
}
}
}
}
Expand Down
32 changes: 21 additions & 11 deletions src/receive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,18 +275,23 @@ impl ReceiverConfig {
}
// connect and reconnect on error
if let Ok(client) = TcpStream::connect(tcp_to) {
log::info!("tcp: connected to diode-receive");
// reset states
log::info!(
"tcp: connected to diode-receive (from: {:?})",
client.local_addr()
);

// initialize tcp session properly
let mut tcp = Tcp::new(client, tcp_buffer_size);

// if we have a first block try to send it
let current_session = match first_block {
Some(first_block) => {
ReceiverConfig::tcp_send_first_block(&mut tcp, first_block)
}
None => None,
};

// main loop. may return the first block of a session in some corner case
first_block = ReceiverConfig::tcp_send_inner_loop(&for_send, tcp, current_session);

send_log_once = true;
Expand Down Expand Up @@ -386,17 +391,22 @@ impl ReceiverConfig {
"probably due to a network interrupt"
};

if block.session_id == 0 {
log::warn!(
"changed session ! {} != expected {}: {}",
block.session_id,
current_session,
extra_info
);
}
log::warn!(
"changed session ! {} != expected {}: {}",
block.session_id,
current_session,
extra_info
);

// we changed session without receiving last message
// need to close this session and restart
return Some(block);
if block.flags.contains(MessageType::Start) {
// this is the first block of the new session
return Some(block);
} else {
// disconnect, we will wait for the start of the next session
return None;
}
}
}
}
Expand Down

0 comments on commit 8cdf1bb

Please sign in to comment.