Skip to content

Commit

Permalink
Merge pull request #568 from dora-rs/ignore-quicker-pending-drop-token
Browse files Browse the repository at this point in the history
Ignore-quicker-pending-drop-token
  • Loading branch information
haixuanTao authored Jul 3, 2024
2 parents 320ef26 + 1f06034 commit be1618a
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 4 deletions.
4 changes: 2 additions & 2 deletions apis/rust/node/src/event_stream/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,14 @@ fn report_remaining_drop_tokens(

let mut still_pending = Vec::new();
for (token, rx, since, _) in pending_drop_tokens.drain(..) {
match rx.recv_timeout(Duration::from_millis(100)) {
match rx.recv_timeout(Duration::from_millis(50)) {
Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")),
Err(flume::RecvTimeoutError::Disconnected) => {
// the event was dropped -> add the drop token to the list
drop_tokens.push(token);
}
Err(flume::RecvTimeoutError::Timeout) => {
let duration = Duration::from_secs(30);
let duration = Duration::from_millis(200);
if since.elapsed() > duration {
tracing::warn!(
"timeout: node finished, but token {token:?} was still not \
Expand Down
2 changes: 1 addition & 1 deletion apis/rust/node/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ impl Drop for DoraNode {
);
}

match self.drop_stream.recv_timeout(Duration::from_secs(10)) {
match self.drop_stream.recv_timeout(Duration::from_millis(500)) {
Ok(token) => {
self.sent_out_shared_memory.remove(&token);
}
Expand Down
2 changes: 1 addition & 1 deletion binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1567,7 +1567,7 @@ impl RunningDataflow {
let running_nodes = self.running_nodes.clone();
let grace_duration_kills = self.grace_duration_kills.clone();
tokio::spawn(async move {
let duration = grace_duration.unwrap_or(Duration::from_millis(500));
let duration = grace_duration.unwrap_or(Duration::from_millis(2000));
tokio::time::sleep(duration).await;
let mut system = sysinfo::System::new();
system.refresh_processes();
Expand Down

0 comments on commit be1618a

Please sign in to comment.