From 676d644c40a97b023616f2969976c33d9a969fbe Mon Sep 17 00:00:00 2001 From: Oleksii Shmalko Date: Wed, 21 Aug 2024 20:30:58 +0300 Subject: [PATCH] refactor(python): split into multiple files The one file was growing too big. Split it into multiple files for easier maintenance. --- eppo_core/src/context_attributes.rs | 7 +- eppo_core/src/pyo3.rs | 1 + python-sdk/src/assignment_logger.rs | 20 + python-sdk/src/client.rs | 863 +++++++++++----------------- python-sdk/src/config.rs | 72 +++ python-sdk/src/init.rs | 98 ++++ python-sdk/src/lib.rs | 16 + 7 files changed, 548 insertions(+), 529 deletions(-) create mode 100644 python-sdk/src/assignment_logger.rs create mode 100644 python-sdk/src/config.rs create mode 100644 python-sdk/src/init.rs diff --git a/eppo_core/src/context_attributes.rs b/eppo_core/src/context_attributes.rs index 6506499e..c2ea856d 100644 --- a/eppo_core/src/context_attributes.rs +++ b/eppo_core/src/context_attributes.rs @@ -42,14 +42,13 @@ where acc.numeric.insert(key.to_owned(), value); } AttributeValue::Boolean(value) => { - // TBD: shall we ignore boolean attributes instead? - // // One argument for including it here is that this basically guarantees that // assignment evaluation inside bandit evaluation works the same way as if // `get_assignment()` was called with generic `Attributes`. // - // We can go a step further and remove `AttributeValue::Boolean` altogether, - // forcing it to be converted to a string before any evaluation. + // We can go a step further and remove `AttributeValue::Boolean` altogether + // (from `eppo_core`), forcing it to be converted to a string before any + // evaluation. acc.categorical.insert(key.to_owned(), value.to_string()); } AttributeValue::Null => { diff --git a/eppo_core/src/pyo3.rs b/eppo_core/src/pyo3.rs index 7fb8bc86..7e07e586 100644 --- a/eppo_core/src/pyo3.rs +++ b/eppo_core/src/pyo3.rs @@ -1,3 +1,4 @@ +//! Helpers for Python SDK implementation. use pyo3::prelude::*; /// Similar to [`pyo3::ToPyObject`] but allows the conversion to fail. diff --git a/python-sdk/src/assignment_logger.rs b/python-sdk/src/assignment_logger.rs new file mode 100644 index 00000000..9c6ffda0 --- /dev/null +++ b/python-sdk/src/assignment_logger.rs @@ -0,0 +1,20 @@ +use pyo3::prelude::*; +use pyo3::types::PyDict; + +#[derive(Debug, Clone)] +#[pyclass(frozen, subclass, module = "eppo_client")] +pub struct AssignmentLogger {} + +#[pymethods] +impl AssignmentLogger { + #[new] + fn new() -> AssignmentLogger { + AssignmentLogger {} + } + + #[allow(unused_variables)] + fn log_assignment(slf: Bound, event: Bound) {} + + #[allow(unused_variables)] + fn log_bandit_action(slf: Bound, event: Bound) {} +} diff --git a/python-sdk/src/client.rs b/python-sdk/src/client.rs index 6a607fcd..0f2b4515 100644 --- a/python-sdk/src/client.rs +++ b/python-sdk/src/client.rs @@ -1,575 +1,388 @@ -use std::{ - sync::{Arc, Mutex, RwLock}, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; -use pyo3::prelude::*; use pyo3::{ - exceptions::PyException, - types::{PyDict, PyString}, + exceptions::PyRuntimeError, + intern, + prelude::*, + types::{PyBool, PyFloat, PyInt, PyString}, + PyTraverseError, PyVisit, }; use eppo_core::{ - configuration_fetcher::{ConfigurationFetcher, DEFAULT_BASE_URL}, + configuration_fetcher::ConfigurationFetcher, configuration_store::ConfigurationStore, - eval::get_assignment, + eval::{eval_details::EvaluationResultWithDetails, get_assignment, get_assignment_details}, + events::AssignmentEvent, poller_thread::{PollerThread, PollerThreadConfig}, pyo3::TryToPyObject, ufc::VariationType, Attributes, }; -#[pymodule(module = "eppo_client", name = "_eppo_client")] -mod eppo_client { - static CLIENT_INSTANCE: RwLock>> = RwLock::new(None); - - use eppo_core::{eval::get_assignment_details, events::AssignmentEvent}; - use pyo3::{ - exceptions::{PyRuntimeError, PyValueError}, - intern, - types::{PyBool, PyFloat, PyInt}, - PyTraverseError, PyVisit, - }; - - use super::*; +use crate::{assignment_logger::AssignmentLogger, config::Config}; - #[pyclass(get_all, set_all)] - struct Config { - api_key: String, - base_url: String, - assignment_logger: Option>, - is_graceful_mode: bool, - poll_interval_seconds: u64, - poll_jitter_seconds: u64, +#[pyclass(frozen, get_all, module = "eppo_client")] +pub struct EvaluationResult { + variation: Py, + action: Option>, + /// Optional evaluation details. + evaluation_details: Py, +} +#[pymethods] +impl EvaluationResult { + fn __repr__<'py>(&self, py: Python<'py>) -> PyResult> { + use pyo3::types::PyList; + + let pieces = PyList::new_bound( + py, + [ + intern!(py, "EvaluationResultWithDetails(variation=").clone(), + self.variation.bind(py).repr()?, + intern!(py, ", action=").clone(), + self.action.to_object(py).into_bound(py).repr()?, + intern!(py, ", evaluation_details=").clone(), + self.evaluation_details.bind(py).repr()?, + intern!(py, ")").clone(), + ], + ); + intern!(py, "").call_method1(intern!(py, "join"), (pieces,)) } +} +impl EvaluationResult { + fn from_details( + py: Python, + result: EvaluationResultWithDetails, + default: Py, + ) -> PyResult { + let EvaluationResultWithDetails { + variation, + action, + evaluation_details, + } = result; + + let variation = if let Some(variation) = variation { + variation.try_to_pyobject(py)? + } else { + default + }; - #[pymethods] - impl Config { - #[new] - #[pyo3(signature = ( - api_key, - *, - base_url=DEFAULT_BASE_URL.to_owned(), - assignment_logger, - is_graceful_mode=true, - poll_interval_seconds=PollerThreadConfig::DEFAULT_POLL_INTERVAL.as_secs(), - poll_jitter_seconds=PollerThreadConfig::DEFAULT_POLL_JITTER.as_secs(), - ))] - fn new( - api_key: String, - base_url: String, - assignment_logger: Py, - is_graceful_mode: bool, - poll_interval_seconds: u64, - poll_jitter_seconds: u64, - ) -> PyResult { - if api_key.is_empty() { - return Err(PyValueError::new_err( - "Invalid value for api_key: cannot be blank", - )); - } - - Ok(Config { - api_key, - base_url, - assignment_logger: Some(assignment_logger), - is_graceful_mode, - poll_interval_seconds, - poll_jitter_seconds, - }) - } - - // Overriding the default setter to make `assignment_logger` non-optional. - #[setter] - fn set_assignment_logger(&mut self, assignment_logger: Py) { - self.assignment_logger = Some(assignment_logger); - } - - // Implementing [Garbage Collector integration][1] in case user's `AssignmentLogger` holds a - // reference to `Config`. This will allow the GC to detect this cycle and break it. - // - // [1]: https://pyo3.rs/v0.22.2/class/protocols.html#garbage-collector-integration - fn __traverse__(&self, visit: PyVisit) -> Result<(), PyTraverseError> { - if let Some(assignment_logger) = &self.assignment_logger { - visit.call(assignment_logger)?; - } - Ok(()) - } - fn __clear__(&mut self) { - self.assignment_logger = None; - } + Ok(EvaluationResult { + variation, + action: action.map(|it| PyString::new_bound(py, &it).unbind()), + evaluation_details: evaluation_details.try_to_pyobject(py)?, + }) } +} - #[derive(Debug, Clone)] - #[pyclass(frozen, subclass)] - struct AssignmentLogger {} - #[pymethods] - impl AssignmentLogger { - #[new] - fn new() -> AssignmentLogger { - AssignmentLogger {} - } - - #[allow(unused_variables)] - fn log_assignment(slf: Bound, event: Bound) {} +#[pyclass(frozen, module = "eppo_client")] +pub struct EppoClient { + configuration_store: Arc, + poller_thread: PollerThread, + assignment_logger: Py, +} - #[allow(unused_variables)] - fn log_bandit_action(slf: Bound, event: Bound) {} +#[pymethods] +impl EppoClient { + fn get_string_assignment( + slf: &Bound, + flag_key: &str, + subject_key: &str, + subject_attributes: Attributes, + default: Py, + ) -> PyResult { + slf.get().get_assignment( + slf.py(), + flag_key, + subject_key, + subject_attributes, + Some(VariationType::String), + default.into_any(), + ) } - - #[pyclass(frozen, get_all)] - struct EvaluationResultWithDetails { - variation: Py, - action: Option>, - evaluation_details: Py, + fn get_integer_assignment( + slf: &Bound, + flag_key: &str, + subject_key: &str, + subject_attributes: Attributes, + default: Py, + ) -> PyResult { + slf.get().get_assignment( + slf.py(), + flag_key, + subject_key, + subject_attributes, + Some(VariationType::Integer), + default.into_any(), + ) } - #[pymethods] - impl EvaluationResultWithDetails { - fn __repr__<'py>(&self, py: Python<'py>) -> PyResult> { - use pyo3::types::PyList; - - let pieces = PyList::new_bound( - py, - [ - intern!(py, "EvaluationResultWithDetails(variation=").clone(), - self.variation.bind(py).repr()?, - intern!(py, ", action=").clone(), - self.action.to_object(py).into_bound(py).repr()?, - intern!(py, ", evaluation_details=").clone(), - self.evaluation_details.bind(py).repr()?, - intern!(py, ")").clone(), - ], - ); - intern!(py, "").call_method1(intern!(py, "join"), (pieces,)) - } + fn get_numeric_assignment( + slf: &Bound, + flag_key: &str, + subject_key: &str, + subject_attributes: Attributes, + default: Py, + ) -> PyResult { + slf.get().get_assignment( + slf.py(), + flag_key, + subject_key, + subject_attributes, + Some(VariationType::Numeric), + default.into_any(), + ) } - impl EvaluationResultWithDetails { - fn from_core( - py: Python, - result: eppo_core::eval::eval_details::EvaluationResultWithDetails, - default: Py, - ) -> PyResult { - let eppo_core::eval::eval_details::EvaluationResultWithDetails { - variation, - action, - evaluation_details, - } = result; - - let variation = if let Some(variation) = variation { - variation.try_to_pyobject(py)? - } else { - default - }; - - Ok(EvaluationResultWithDetails { - variation, - action: action.map(|it| PyString::new_bound(py, &it).unbind()), - evaluation_details: evaluation_details.try_to_pyobject(py)?, - }) - } + fn get_boolean_assignment( + slf: &Bound, + flag_key: &str, + subject_key: &str, + subject_attributes: Attributes, + default: Py, + ) -> PyResult { + slf.get().get_assignment( + slf.py(), + flag_key, + subject_key, + subject_attributes, + Some(VariationType::Boolean), + default.into_any(), + ) } - - #[pyclass(frozen)] - struct EppoClient { - configuration_store: Arc, - poller_thread: PollerThread, - assignment_logger: Py, + fn get_json_assignment( + slf: &Bound, + flag_key: &str, + subject_key: &str, + subject_attributes: Attributes, + default: PyObject, + ) -> PyResult { + slf.get().get_assignment( + slf.py(), + flag_key, + subject_key, + subject_attributes, + Some(VariationType::Json), + default.into_any(), + ) } - #[pymethods] - impl EppoClient { - fn get_string_assignment( - slf: &Bound, - flag_key: &str, - subject_key: &str, - subject_attributes: Attributes, - default: Py, - ) -> PyResult { - slf.get().get_assignment( - slf.py(), - flag_key, - subject_key, - subject_attributes, - Some(VariationType::String), - default.into_any(), - ) - } - fn get_integer_assignment( - slf: &Bound, - flag_key: &str, - subject_key: &str, - subject_attributes: Attributes, - default: Py, - ) -> PyResult { - slf.get().get_assignment( - slf.py(), - flag_key, - subject_key, - subject_attributes, - Some(VariationType::Integer), - default.into_any(), - ) - } - fn get_numeric_assignment( - slf: &Bound, - flag_key: &str, - subject_key: &str, - subject_attributes: Attributes, - default: Py, - ) -> PyResult { - slf.get().get_assignment( - slf.py(), - flag_key, - subject_key, - subject_attributes, - Some(VariationType::Numeric), - default.into_any(), - ) - } - fn get_boolean_assignment( - slf: &Bound, - flag_key: &str, - subject_key: &str, - subject_attributes: Attributes, - default: Py, - ) -> PyResult { - slf.get().get_assignment( - slf.py(), - flag_key, - subject_key, - subject_attributes, - Some(VariationType::Boolean), - default.into_any(), - ) - } - fn get_json_assignment( - slf: &Bound, - flag_key: &str, - subject_key: &str, - subject_attributes: Attributes, - default: PyObject, - ) -> PyResult { - slf.get().get_assignment( - slf.py(), - flag_key, - subject_key, - subject_attributes, - Some(VariationType::Json), - default.into_any(), - ) - } - - fn get_string_assignment_details( - slf: &Bound, - flag_key: &str, - subject_key: &str, - subject_attributes: Attributes, - default: Py, - ) -> PyResult { - slf.get().get_assignment_details( - slf.py(), - flag_key, - subject_key, - subject_attributes, - Some(VariationType::String), - default.into_any(), - ) - } - fn get_integer_assignment_details( - slf: &Bound, - flag_key: &str, - subject_key: &str, - subject_attributes: Attributes, - default: Py, - ) -> PyResult { - slf.get().get_assignment_details( - slf.py(), - flag_key, - subject_key, - subject_attributes, - Some(VariationType::Integer), - default.into_any(), - ) - } - fn get_numeric_assignment_details( - slf: &Bound, - flag_key: &str, - subject_key: &str, - subject_attributes: Attributes, - default: Py, - ) -> PyResult { - slf.get().get_assignment_details( - slf.py(), - flag_key, - subject_key, - subject_attributes, - Some(VariationType::Numeric), - default.into_any(), - ) - } - fn get_boolean_assignment_details( - slf: &Bound, - flag_key: &str, - subject_key: &str, - subject_attributes: Attributes, - default: Py, - ) -> PyResult { - slf.get().get_assignment_details( - slf.py(), - flag_key, - subject_key, - subject_attributes, - Some(VariationType::Boolean), - default.into_any(), - ) - } - fn get_json_assignment_details( - slf: &Bound, - flag_key: &str, - subject_key: &str, - subject_attributes: Attributes, - default: Py, - ) -> PyResult { - slf.get().get_assignment_details( - slf.py(), - flag_key, - subject_key, - subject_attributes, - Some(VariationType::Json), - default.into_any(), - ) - } + fn get_string_assignment_details( + slf: &Bound, + flag_key: &str, + subject_key: &str, + subject_attributes: Attributes, + default: Py, + ) -> PyResult { + slf.get().get_assignment_details( + slf.py(), + flag_key, + subject_key, + subject_attributes, + Some(VariationType::String), + default.into_any(), + ) + } + fn get_integer_assignment_details( + slf: &Bound, + flag_key: &str, + subject_key: &str, + subject_attributes: Attributes, + default: Py, + ) -> PyResult { + slf.get().get_assignment_details( + slf.py(), + flag_key, + subject_key, + subject_attributes, + Some(VariationType::Integer), + default.into_any(), + ) + } + fn get_numeric_assignment_details( + slf: &Bound, + flag_key: &str, + subject_key: &str, + subject_attributes: Attributes, + default: Py, + ) -> PyResult { + slf.get().get_assignment_details( + slf.py(), + flag_key, + subject_key, + subject_attributes, + Some(VariationType::Numeric), + default.into_any(), + ) + } + fn get_boolean_assignment_details( + slf: &Bound, + flag_key: &str, + subject_key: &str, + subject_attributes: Attributes, + default: Py, + ) -> PyResult { + slf.get().get_assignment_details( + slf.py(), + flag_key, + subject_key, + subject_attributes, + Some(VariationType::Boolean), + default.into_any(), + ) + } + fn get_json_assignment_details( + slf: &Bound, + flag_key: &str, + subject_key: &str, + subject_attributes: Attributes, + default: Py, + ) -> PyResult { + slf.get().get_assignment_details( + slf.py(), + flag_key, + subject_key, + subject_attributes, + Some(VariationType::Json), + default.into_any(), + ) + } - // Implementing [Garbage Collector integration][1] in case user's `AssignmentLogger` holds a - // reference to `EppoClient`. This will allow the GC to detect this cycle and break it. - // - // [1]: https://pyo3.rs/v0.22.2/class/protocols.html#garbage-collector-integration - fn __traverse__(&self, visit: PyVisit) -> Result<(), PyTraverseError> { - visit.call(&self.assignment_logger) - } - fn __clear__(&self) { - // We're frozen and don't hold mutable Python references, so there's nothing to clear. - } + // Implementing [Garbage Collector integration][1] in case user's `AssignmentLogger` holds a + // reference to `EppoClient`. This will allow the GC to detect this cycle and break it. + // + // [1]: https://pyo3.rs/v0.22.2/class/protocols.html#garbage-collector-integration + fn __traverse__(&self, visit: PyVisit) -> Result<(), PyTraverseError> { + visit.call(&self.assignment_logger) + } + fn __clear__(&self) { + // We're frozen and don't hold mutable Python references, so there's nothing to clear. } +} - // Rust-only methods - impl EppoClient { - fn new(py: Python, config: &Config) -> PyResult { - let configuration_store = Arc::new(ConfigurationStore::new()); - let poller_thread = PollerThread::start_with_config( - ConfigurationFetcher::new( - eppo_core::configuration_fetcher::ConfigurationFetcherConfig { - base_url: config.base_url.clone(), - api_key: config.api_key.clone(), - sdk_name: "python".to_owned(), - sdk_version: env!("CARGO_PKG_VERSION").to_owned(), - }, - ), - configuration_store.clone(), - PollerThreadConfig { - interval: Duration::from_secs(config.poll_interval_seconds), - jitter: Duration::from_secs(config.poll_jitter_seconds), +// Rust-only methods +impl EppoClient { + pub fn new(py: Python, config: &Config) -> PyResult { + let configuration_store = Arc::new(ConfigurationStore::new()); + let poller_thread = PollerThread::start_with_config( + ConfigurationFetcher::new( + eppo_core::configuration_fetcher::ConfigurationFetcherConfig { + base_url: config.base_url.clone(), + api_key: config.api_key.clone(), + sdk_name: "python".to_owned(), + sdk_version: env!("CARGO_PKG_VERSION").to_owned(), }, - ) - .map_err(|err| { - // This should normally never happen. - PyRuntimeError::new_err(format!("unable to start poller thread: {err}")) - })?; - Ok(EppoClient { - configuration_store, - poller_thread, - assignment_logger: config - .assignment_logger - .as_ref() - .ok_or_else(|| { - // This should never happen as assigment_logger setter requires a valid - // logger. - PyRuntimeError::new_err(format!("Config.assignment_logger is None")) - })? - .clone_ref(py), - }) - } - - fn get_assignment( - &self, - py: Python, - flag_key: &str, - subject_key: &str, - subject_attributes: Attributes, - expected_type: Option, - default: Py, - ) -> PyResult { - let config = self.configuration_store.get_configuration(); - - let result = get_assignment( - config.as_ref().map(AsRef::as_ref), - &flag_key, - &subject_key, - &subject_attributes, - expected_type, - ); - - let assignment = match result { - Ok(assignment) => assignment, - Err(err) => { - let graceful_mode = true; - if graceful_mode { - None - } else { - return Err(PyErr::new::(err.to_string())); - } - } - }; + ), + configuration_store.clone(), + PollerThreadConfig { + interval: Duration::from_secs(config.poll_interval_seconds), + jitter: Duration::from_secs(config.poll_jitter_seconds), + }, + ) + .map_err(|err| { + // This should normally never happen. + PyRuntimeError::new_err(format!("unable to start poller thread: {err}")) + })?; + Ok(EppoClient { + configuration_store, + poller_thread, + assignment_logger: config + .assignment_logger + .as_ref() + .ok_or_else(|| { + // This should never happen as assigment_logger setter requires a valid logger. + PyRuntimeError::new_err(format!("Config.assignment_logger is None")) + })? + .clone_ref(py), + }) + } - if let Some(assignment) = assignment { - if let Some(event) = assignment.event { - if let Err(err) = self.log_assignment_event(py, event) { - log::warn!(target: "eppo", "error logging assignment event: {err}") - } + fn get_assignment( + &self, + py: Python, + flag_key: &str, + subject_key: &str, + subject_attributes: Attributes, + expected_type: Option, + default: Py, + ) -> PyResult { + let config = self.configuration_store.get_configuration(); + + let result = get_assignment( + config.as_ref().map(AsRef::as_ref), + &flag_key, + &subject_key, + &subject_attributes, + expected_type, + ); + + let assignment = match result { + Ok(assignment) => assignment, + Err(err) => { + let graceful_mode = true; + if graceful_mode { + None + } else { + return Err(PyErr::new::(err.to_string())); } - - Ok(assignment.value.try_to_pyobject(py)?) - } else { - Ok(default) } - } - - fn get_assignment_details( - &self, - py: Python, - flag_key: &str, - subject_key: &str, - subject_attributes: Attributes, - expected_type: Option, - default: Py, - ) -> PyResult { - let config = self.configuration_store.get_configuration(); - - let (result, event) = get_assignment_details( - config.as_ref().map(AsRef::as_ref), - &flag_key, - &subject_key, - &subject_attributes, - expected_type, - ); + }; - if let Some(event) = event { + if let Some(assignment) = assignment { + if let Some(event) = assignment.event { if let Err(err) = self.log_assignment_event(py, event) { log::warn!(target: "eppo", "error logging assignment event: {err}") } } - EvaluationResultWithDetails::from_core(py, result, default) - } - - /// Try to log assignment event using `self.assignment_logger`. - fn log_assignment_event(&self, py: Python, event: AssignmentEvent) -> PyResult<()> { - let event = event.try_to_pyobject(py)?; - self.assignment_logger - .call_method1(py, intern!(py, "log_assignment"), (event,))?; - Ok(()) - } - - fn shutdown(&self) { - // Using `.stop()` instead of `.shutdown()` here because we don't need to wait for the - // poller thread to exit. - self.poller_thread.stop(); + Ok(assignment.value.try_to_pyobject(py)?) + } else { + Ok(default) } } - impl Drop for EppoClient { - fn drop(&mut self) { - self.shutdown(); + fn get_assignment_details( + &self, + py: Python, + flag_key: &str, + subject_key: &str, + subject_attributes: Attributes, + expected_type: Option, + default: Py, + ) -> PyResult { + let config = self.configuration_store.get_configuration(); + + let (result, event) = get_assignment_details( + config.as_ref().map(AsRef::as_ref), + &flag_key, + &subject_key, + &subject_attributes, + expected_type, + ); + + if let Some(event) = event { + if let Err(err) = self.log_assignment_event(py, event) { + log::warn!(target: "eppo", "error logging assignment event: {err}") + } } - } - /// Initializes a global Eppo client instance. - /// - /// This method should be called once on application startup. - /// If invoked more than once, it will re-initialize the global client instance. - /// Use the :func:`EppoClient.get_instance()` method to access the client instance. - /// - /// :param config: client configuration containing the API Key - /// :type config: Config - #[pyfunction] - fn init(config: Bound) -> PyResult> { - initialize_pyo3_log(); - - let py = config.py(); - - let client = Bound::new(py, EppoClient::new(py, &*config.borrow())?)?.unbind(); - - // minimizing the scope of holding the write lock - let existing = { - let client = Py::clone_ref(&client, py); - - let mut instance = CLIENT_INSTANCE.write().map_err(|err| { - // This should normally never happen as it signifies that another thread - // panicked while holding the lock. - PyException::new_err(format!("failed to acquire writer lock: {err}")) - })?; - std::mem::replace(&mut *instance, Some(client)) - }; - if let Some(existing) = existing { - existing.get().shutdown(); - existing.drop_ref(py); - } + EvaluationResult::from_details(py, result, default) + } - Ok(client) + /// Try to log assignment event using `self.assignment_logger`. + pub fn log_assignment_event(&self, py: Python, event: AssignmentEvent) -> PyResult<()> { + let event = event.try_to_pyobject(py)?; + self.assignment_logger + .call_method1(py, intern!(py, "log_assignment"), (event,))?; + Ok(()) } - /// Used to access an initialized client instance. - /// - /// Use this method to get a client instance for assigning variants. - /// This method may only be called after invocation of :func:`eppo_client.init()`, otherwise it - /// throws an exception. - /// - /// :return: a shared client instance - /// :rtype: EppoClient - #[pyfunction] - fn get_instance(py: Python) -> PyResult> { - let instance = CLIENT_INSTANCE.read().map_err(|err| { - // This should normally never happen as it signifies that another thread - // panicked while holding the lock. - PyException::new_err(format!("failed to acquire reader lock: {err}")) - })?; - if let Some(existing) = &*instance { - Ok(Py::clone_ref(existing, py)) - } else { - Err(PyException::new_err( - "init() must be called before get_instance()", - )) - } + pub fn shutdown(&self) { + // Using `.stop()` instead of `.shutdown()` here because we don't need to wait for the + // poller thread to exit. + self.poller_thread.stop(); } } -/// Initialize `pyo3_log` crate connecting Rust's `log` to Python's `logger`. -/// -/// If called multiple times, resets the pyo3_log cache. -fn initialize_pyo3_log() { - static LOG_RESET_HANDLE: Mutex> = Mutex::new(None); - { - if let Ok(mut reset_handle) = LOG_RESET_HANDLE.lock() { - if let Some(previous_handle) = &mut *reset_handle { - // There's a previous handle. Logging is already initialized, but we reset - // caches. - previous_handle.reset(); - } else { - if let Ok(new_handle) = pyo3_log::try_init() { - *reset_handle = Some(new_handle); - } else { - // This should not happen as initialization error signals that we already - // initialized logging. (In which case, `LOG_RESET_HANDLE` should contain - // `Some()`.) - debug_assert!(false, "tried to initialize pyo3_log second time"); - } - } - } else { - // This should normally never happen as it shows that another thread has panicked - // while holding `LOG_RESET_HANDLE`. - // - // That's probably not sever enough to throw an exception into user's code. - debug_assert!(false, "failed to acquire LOG_RESET_HANDLE lock"); - } +impl Drop for EppoClient { + fn drop(&mut self) { + self.shutdown(); } } diff --git a/python-sdk/src/config.rs b/python-sdk/src/config.rs new file mode 100644 index 00000000..17ec8f83 --- /dev/null +++ b/python-sdk/src/config.rs @@ -0,0 +1,72 @@ +use pyo3::{exceptions::PyValueError, prelude::*, PyTraverseError, PyVisit}; + +use eppo_core::{configuration_fetcher::DEFAULT_BASE_URL, poller_thread::PollerThreadConfig}; + +use crate::assignment_logger::AssignmentLogger; + +#[pyclass(module = "eppo_client", get_all, set_all)] +pub struct Config { + pub(crate) api_key: String, + pub(crate) base_url: String, + pub(crate) assignment_logger: Option>, + pub(crate) is_graceful_mode: bool, + pub(crate) poll_interval_seconds: u64, + pub(crate) poll_jitter_seconds: u64, +} + +#[pymethods] +impl Config { + #[new] + #[pyo3(signature = ( + api_key, + *, + base_url=DEFAULT_BASE_URL.to_owned(), + assignment_logger, + is_graceful_mode=true, + poll_interval_seconds=PollerThreadConfig::DEFAULT_POLL_INTERVAL.as_secs(), + poll_jitter_seconds=PollerThreadConfig::DEFAULT_POLL_JITTER.as_secs(), + ))] + fn new( + api_key: String, + base_url: String, + assignment_logger: Py, + is_graceful_mode: bool, + poll_interval_seconds: u64, + poll_jitter_seconds: u64, + ) -> PyResult { + if api_key.is_empty() { + return Err(PyValueError::new_err( + "Invalid value for api_key: cannot be blank", + )); + } + + Ok(Config { + api_key, + base_url, + assignment_logger: Some(assignment_logger), + is_graceful_mode, + poll_interval_seconds, + poll_jitter_seconds, + }) + } + + // Overriding the default setter to make `assignment_logger` non-optional. + #[setter] + fn set_assignment_logger(&mut self, assignment_logger: Py) { + self.assignment_logger = Some(assignment_logger); + } + + // Implementing [Garbage Collector integration][1] in case user's `AssignmentLogger` holds a + // reference to `Config`. This will allow the GC to detect this cycle and break it. + // + // [1]: https://pyo3.rs/v0.22.2/class/protocols.html#garbage-collector-integration + fn __traverse__(&self, visit: PyVisit) -> Result<(), PyTraverseError> { + if let Some(assignment_logger) = &self.assignment_logger { + visit.call(assignment_logger)?; + } + Ok(()) + } + fn __clear__(&mut self) { + self.assignment_logger = None; + } +} diff --git a/python-sdk/src/init.rs b/python-sdk/src/init.rs new file mode 100644 index 00000000..b74d6535 --- /dev/null +++ b/python-sdk/src/init.rs @@ -0,0 +1,98 @@ +use std::sync::{Mutex, RwLock}; + +use pyo3::{exceptions::PyException, prelude::*}; + +use crate::{client::EppoClient, config::Config}; + +// TODO: use `pyo3::sync::GILProtected` instead? +static CLIENT_INSTANCE: RwLock>> = RwLock::new(None); + +/// Initializes a global Eppo client instance. +/// +/// This method should be called once on application startup. +/// If invoked more than once, it will re-initialize the global client instance. +/// Use the :func:`EppoClient.get_instance()` method to access the client instance. +/// +/// :param config: client configuration containing the API Key +/// :type config: Config +#[pyfunction] +pub fn init(config: Bound) -> PyResult> { + initialize_pyo3_log(); + + let py = config.py(); + + let client = Bound::new(py, EppoClient::new(py, &*config.borrow())?)?.unbind(); + + // minimizing the scope of holding the write lock + let existing = { + let client = Py::clone_ref(&client, py); + + let mut instance = CLIENT_INSTANCE.write().map_err(|err| { + // This should normally never happen as it signifies that another thread + // panicked while holding the lock. + PyException::new_err(format!("failed to acquire writer lock: {err}")) + })?; + std::mem::replace(&mut *instance, Some(client)) + }; + if let Some(existing) = existing { + existing.get().shutdown(); + existing.drop_ref(py); + } + + Ok(client) +} + +/// Used to access an initialized client instance. +/// +/// Use this method to get a client instance for assigning variants. +/// This method may only be called after invocation of :func:`eppo_client.init()`, otherwise it +/// throws an exception. +/// +/// :return: a shared client instance +/// :rtype: EppoClient +#[pyfunction] +pub fn get_instance(py: Python) -> PyResult> { + let instance = CLIENT_INSTANCE.read().map_err(|err| { + // This should normally never happen as it signifies that another thread + // panicked while holding the lock. + PyException::new_err(format!("failed to acquire reader lock: {err}")) + })?; + if let Some(existing) = &*instance { + Ok(Py::clone_ref(existing, py)) + } else { + Err(PyException::new_err( + "init() must be called before get_instance()", + )) + } +} + +/// Initialize `pyo3_log` crate connecting Rust's `log` to Python's `logger`. +/// +/// If called multiple times, resets the pyo3_log cache. +fn initialize_pyo3_log() { + static LOG_RESET_HANDLE: Mutex> = Mutex::new(None); + { + if let Ok(mut reset_handle) = LOG_RESET_HANDLE.lock() { + if let Some(previous_handle) = &mut *reset_handle { + // There's a previous handle. Logging is already initialized, but we reset + // caches. + previous_handle.reset(); + } else { + if let Ok(new_handle) = pyo3_log::try_init() { + *reset_handle = Some(new_handle); + } else { + // This should not happen as initialization error signals that we already + // initialized logging. (In which case, `LOG_RESET_HANDLE` should contain + // `Some()`.) + debug_assert!(false, "tried to initialize pyo3_log second time"); + } + } + } else { + // This should normally never happen as it shows that another thread has panicked + // while holding `LOG_RESET_HANDLE`. + // + // That's probably not sever enough to throw an exception into user's code. + debug_assert!(false, "failed to acquire LOG_RESET_HANDLE lock"); + } + } +} diff --git a/python-sdk/src/lib.rs b/python-sdk/src/lib.rs index b79c47fc..b65fd9df 100644 --- a/python-sdk/src/lib.rs +++ b/python-sdk/src/lib.rs @@ -1 +1,17 @@ +use pyo3::prelude::*; + +mod assignment_logger; mod client; +mod config; +mod init; + +#[pymodule(module = "eppo_client", name = "_eppo_client")] +mod eppo_client { + #[pymodule_export] + use crate::{ + assignment_logger::AssignmentLogger, + client::{EppoClient, EvaluationResult}, + config::Config, + init::{get_instance, init}, + }; +}