Skip to content

Commit

Permalink
bug: wrap poller thread in std::panic::catch_unwind (FF-3468) (#115)
Browse files Browse the repository at this point in the history
* bug: wrap poller thread in `std::panic::catch_unwind` (FF-3468)

* Return from the thread when the sender disconnects.

* grammar in comments

* python 4.1.3
  • Loading branch information
leoromanovsky authored Dec 10, 2024
1 parent 9c74cfa commit a6b1778
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 43 deletions.
92 changes: 51 additions & 41 deletions eppo_core/src/poller_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ impl PollerThread {
) -> std::io::Result<PollerThread> {
// Using `sync_channel` here as it makes `stop_sender` `Sync` (shareable between
// threads). Buffer size of 1 should be enough for our use case as we're sending a stop
// command and we can simply `try_send()` and ignore if the buffer is full (another thread
// has send a stop command already).
// command, and we can simply `try_send()` and ignore if the buffer is full (another thread
// has sent a stop command already).
let (stop_sender, stop_receiver) = std::sync::mpsc::sync_channel::<()>(1);

let result = Arc::new((Mutex::new(None), Condvar::new()));
Expand All @@ -131,51 +131,61 @@ impl PollerThread {
std::thread::Builder::new()
.name("eppo-poller".to_owned())
.spawn(move || {
let runtime = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(runtime) => runtime,
Err(err) => {
update_result(Err(Error::from(err)));
return;
}
};
loop {
log::debug!(target: "eppo", "fetching new configuration");
let result = runtime.block_on(fetcher.fetch_configuration());
match result {
Ok(configuration) => {
store.set_configuration(Arc::new(configuration));
update_result(Ok(()))
}
Err(err @ (Error::Unauthorized | Error::InvalidBaseUrl(_))) => {
// Unrecoverable errors
update_result(Err(err));
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let runtime = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(runtime) => runtime,
Err(err) => {
update_result(Err(Error::from(err)));
return;
}
_ => {
// Other errors are retriable.
}
};

let timeout = jitter(config.interval, config.jitter);
match stop_receiver.recv_timeout(timeout) {
Err(RecvTimeoutError::Timeout) => {
// Timed out. Loop to fetch new configuration.
}
Ok(()) => {
log::debug!(target: "eppo", "poller thread received stop command");
// The other end asked us to stop the poller thread.
return;
}
Err(RecvTimeoutError::Disconnected) => {
// When the other end of channel disconnects, calls to
// .recv_timeout() return immediately. Use normal thread sleep in
// this case.
std::thread::sleep(timeout);
loop {
log::debug!(target: "eppo", "fetching new configuration");
let result = runtime.block_on(fetcher.fetch_configuration());
match result {
Ok(configuration) => {
store.set_configuration(Arc::new(configuration));
update_result(Ok(()))
}
Err(err @ (Error::Unauthorized | Error::InvalidBaseUrl(_))) => {
// Unrecoverable errors
update_result(Err(err));
return;
}
_ => {
// Other errors are retrievable.
}
};

let timeout = jitter(config.interval, config.jitter);
match stop_receiver.recv_timeout(timeout) {
Err(RecvTimeoutError::Timeout) => {
// Timed out. Loop back to fetch a new configuration.
}
Ok(()) => {
log::debug!(target: "eppo", "poller thread received stop command");
// Stop command received, break out of the loop to end the thread.
return;
}
Err(RecvTimeoutError::Disconnected) => {
// When the other end of channel disconnects, calls to
// .recv_timeout() return immediately.
// Stop the thread.
log::debug!(target: "eppo", "poller thread received disconnected");
return;
}
}
}
}));

// If catch_unwind returns Err, it means a panic occurred.
if let Err(_panic_info) = result {
// Handle the panic gracefully by updating the result with an error.
update_result(Err(Error::PollerThreadPanicked));
}
})?
};
Expand Down
2 changes: 1 addition & 1 deletion python-sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "eppo_py"
version = "4.1.2"
version = "4.1.3"
edition = "2021"
publish = false

Expand Down

0 comments on commit a6b1778

Please sign in to comment.