From 88cf2117d249a8d1efeb98a18e8e0c64881b0b8c Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Tue, 5 Nov 2024 09:58:02 -0800 Subject: [PATCH] convert from reqwest blocking to non-blocking to conform to wasm target --- .vscode/settings.json | 3 +++ eppo_core/Cargo.toml | 3 ++- eppo_core/src/configuration_fetcher.rs | 26 ++++++++++++++------------ eppo_core/src/poller_thread.rs | 6 +++++- sdk-test-data | 2 +- 5 files changed, 25 insertions(+), 15 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..7b016a89 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "java.compile.nullAnalysis.mode": "automatic" +} \ No newline at end of file diff --git a/eppo_core/Cargo.toml b/eppo_core/Cargo.toml index 50db47ef..9275d6d4 100644 --- a/eppo_core/Cargo.toml +++ b/eppo_core/Cargo.toml @@ -26,11 +26,12 @@ log = { version = "0.4.21", features = ["kv", "kv_serde"] } md5 = "0.7.0" rand = "0.8.5" regex = "1.10.4" -reqwest = { version = "0.12.4", features = ["blocking", "json"] } +reqwest = { version = "0.12.4", features = ["json"] } semver = { version = "1.0.22", features = ["serde"] } serde = { version = "1.0.198", features = ["derive", "rc"] } serde_json = "1.0.116" thiserror = "1.0.60" +tokio = { version = "1.34.0", features = ["rt", "time"] } url = "2.5.0" # pyo3 dependencies diff --git a/eppo_core/src/configuration_fetcher.rs b/eppo_core/src/configuration_fetcher.rs index 4e446c09..9f91bf23 100644 --- a/eppo_core/src/configuration_fetcher.rs +++ b/eppo_core/src/configuration_fetcher.rs @@ -20,7 +20,7 @@ const BANDIT_ENDPOINT: &'static str = "/flag-config/v1/bandits"; /// A client that fetches Eppo configuration from the server. pub struct ConfigurationFetcher { // Client holds a connection pool internally, so we're reusing the client between requests. - client: reqwest::blocking::Client, + client: reqwest::Client, config: ConfigurationFetcherConfig, /// If we receive a 401 Unauthorized error during a request, it means the API key is not /// valid. We cache this error so we don't issue additional requests to the server. @@ -29,7 +29,7 @@ pub struct ConfigurationFetcher { impl ConfigurationFetcher { pub fn new(config: ConfigurationFetcherConfig) -> ConfigurationFetcher { - let client = reqwest::blocking::Client::new(); + let client = reqwest::Client::new(); ConfigurationFetcher { client, @@ -38,24 +38,24 @@ impl ConfigurationFetcher { } } - pub fn fetch_configuration(&mut self) -> Result { + pub async fn fetch_configuration(&mut self) -> Result { if self.unauthorized { return Err(Error::Unauthorized); } - let ufc = self.fetch_ufc_configuration()?; + let ufc = self.fetch_ufc_configuration().await?; let bandits = if ufc.compiled.flag_to_bandit_associations.is_empty() { // We don't need bandits configuration if there are no bandits. None } else { - Some(self.fetch_bandits_configuration()?) + Some(self.fetch_bandits_configuration().await?) }; Ok(Configuration::from_server_response(ufc, bandits)) } - fn fetch_ufc_configuration(&mut self) -> Result { + async fn fetch_ufc_configuration(&mut self) -> Result { let url = Url::parse_with_params( &format!("{}{}", self.config.base_url, UFC_ENDPOINT), &[ @@ -68,7 +68,7 @@ impl ConfigurationFetcher { .map_err(|err| Error::InvalidBaseUrl(err))?; log::debug!(target: "eppo", "fetching UFC flags configuration"); - let response = self.client.get(url).send()?; + let response = self.client.get(url).send().await?; let response = response.error_for_status().map_err(|err| { if err.status() == Some(StatusCode::UNAUTHORIZED) { @@ -82,15 +82,17 @@ impl ConfigurationFetcher { } })?; - let configuration = - UniversalFlagConfig::from_json(self.config.sdk_metadata, response.bytes()?.into())?; + let configuration = UniversalFlagConfig::from_json( + self.config.sdk_metadata, + response.bytes().await?.into(), + )?; log::debug!(target: "eppo", "successfully fetched UFC flags configuration"); Ok(configuration) } - fn fetch_bandits_configuration(&mut self) -> Result { + async fn fetch_bandits_configuration(&mut self) -> Result { let url = Url::parse_with_params( &format!("{}{}", self.config.base_url, BANDIT_ENDPOINT), &[ @@ -103,7 +105,7 @@ impl ConfigurationFetcher { .map_err(|err| Error::InvalidBaseUrl(err))?; log::debug!(target: "eppo", "fetching UFC bandits configuration"); - let response = self.client.get(url).send()?; + let response = self.client.get(url).send().await?; let response = response.error_for_status().map_err(|err| { if err.status() == Some(StatusCode::UNAUTHORIZED) { @@ -117,7 +119,7 @@ impl ConfigurationFetcher { } })?; - let configuration = response.json()?; + let configuration = response.json().await?; log::debug!(target: "eppo", "successfully fetched UFC bandits configuration"); diff --git a/eppo_core/src/poller_thread.rs b/eppo_core/src/poller_thread.rs index 3eef4ed9..2db0e24c 100644 --- a/eppo_core/src/poller_thread.rs +++ b/eppo_core/src/poller_thread.rs @@ -131,9 +131,13 @@ impl PollerThread { std::thread::Builder::new() .name("eppo-poller".to_owned()) .spawn(move || { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); loop { log::debug!(target: "eppo", "fetching new configuration"); - let result = fetcher.fetch_configuration(); + let result = runtime.block_on(fetcher.fetch_configuration()); match result { Ok(configuration) => { store.set_configuration(Arc::new(configuration)); diff --git a/sdk-test-data b/sdk-test-data index 11dace62..7db46318 160000 --- a/sdk-test-data +++ b/sdk-test-data @@ -1 +1 @@ -Subproject commit 11dace62a7ce97792bd54e48aaf354f62a034d63 +Subproject commit 7db46318cf74a3286a06afc448358cd379ae0cb9