Skip to content

Commit

Permalink
several fix
Browse files Browse the repository at this point in the history
  • Loading branch information
yinqiwen committed Mar 1, 2020
1 parent 8acc48c commit a859e1c
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 42 deletions.
41 changes: 6 additions & 35 deletions src/rmux/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -797,37 +797,6 @@ where
break;
},
}
// let recv_event = read_encrypt_event(&mut rctx, ri, recv_buf).await;
// match recv_event {
// Ok(Some(mut ev)) => {
// recv_session_state.io_active_unix_secs.store(
// SystemTime::now()
// .duration_since(UNIX_EPOCH)
// .unwrap()
// .as_secs() as u32,
// Ordering::SeqCst,
// );
// ev.remote = true;
// if FLAG_DATA != ev.header.flags() {
// info!(
// "[{}][{}][{}]remote recv event type:{}, len:{}",
// channel,
// tunnel_id,
// ev.header.stream_id,
// get_event_type_str(ev.header.flags()),
// ev.header.len(),
// );
// }
// let _ = handle_recv_event_tx.send(ev).await;
// }
// Ok(None) => {
// break;
// }
// Err(err) => {
// error!("Close remote recv since of error:{}", err);
// break;
// }
// }
}
handle_recv_session_state
.process_recv_state
Expand All @@ -836,8 +805,8 @@ where
handle_recv_session_state
.closed
.store(true, Ordering::SeqCst);
let shutdown_ev = new_shutdown_event(0, false);
let _ = handle_recv_event_tx.send(shutdown_ev).await;
//let shutdown_ev = new_shutdown_event(0, false);
//let _ = handle_recv_event_tx.send(shutdown_ev).await;
let _ = handle_recv_send_tx.send(Vec::new()).await;
handle_recv_session_state
.process_recv_state
Expand Down Expand Up @@ -937,8 +906,10 @@ where
.closed
.store(true, Ordering::SeqCst);
send_rx.close();
let _ = close_tx.send(());
//let _ = wi.shutdown().await;
let close_rc = close_tx.send(());
if close_rc.is_err() {
error!("[{}][{}]Close error:{:?}", channel, tunnel_id, close_rc);
}
let shutdown_ev = new_shutdown_event(0, false);
let _ = event_tx.send(shutdown_ev).await;
handle_send_session_state
Expand Down
25 changes: 19 additions & 6 deletions src/rmux/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ struct SharedIOState {
data_rx: Option<mpsc::Receiver<Vec<u8>>>,
}

impl SharedIOState {
fn try_close(&mut self) {
if let Some(tx) = &mut self.data_tx {
let empty = Vec::new();
let _ = tx.clone().try_send(empty);
}
}
}

impl MuxStreamState {
fn close(&self) {
self.closed.store(true, Ordering::SeqCst);
Expand Down Expand Up @@ -141,6 +150,7 @@ impl AsyncWrite for MuxStreamWriter {
io_state,
} = &mut *self;
if state.closed.load(Ordering::SeqCst) {
io_state.lock().unwrap().try_close();
return Poll::Ready(Err(make_io_error("closed")));
}
if state.send_buf_window.load(Ordering::SeqCst) < 0 {
Expand All @@ -166,11 +176,17 @@ impl AsyncWrite for MuxStreamWriter {
// }
match tx.poll_ready(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => return Poll::Ready(Err(make_io_error(e.description()))),
Poll::Ready(Err(e)) => {
io_state.lock().unwrap().try_close();
return Poll::Ready(Err(make_io_error(e.description())));
}
Poll::Ready(Ok(())) => {}
}
match tx.try_send(ev) {
Err(e) => Poll::Ready(Err(make_io_error(e.description()))),
Err(e) => {
io_state.lock().unwrap().try_close();
return Poll::Ready(Err(make_io_error(e.description())));
}
Ok(()) => {
state
.send_buf_window
Expand All @@ -190,10 +206,7 @@ impl AsyncWrite for MuxStreamWriter {
_cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
self.state.closed.store(true, Ordering::SeqCst);
if let Some(tx) = &self.io_state.lock().unwrap().data_tx {
let empty = Vec::new();
let _ = tx.clone().try_send(empty);
}
self.io_state.lock().unwrap().try_close();
Poll::Ready(Ok(()))
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/tunnel/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub async fn relay_connection(
relay_buf: Vec<u8>,
) -> Result<(), Box<dyn Error>> {
let (mut ri, mut wi) = inbound.split();
relay_stream(tunnel_id, &mut ri, &mut wi, target, cfg, relay_buf).await?;
let _ = relay_stream(tunnel_id, &mut ri, &mut wi, target, cfg, relay_buf).await;
let _ = inbound.shutdown(Shutdown::Both);
Ok(())
}
Expand Down

0 comments on commit a859e1c

Please sign in to comment.