From 4169b2904de1c969d36c73c35f812ee114d5fc02 Mon Sep 17 00:00:00 2001 From: Oleksii Shmalko Date: Wed, 3 Jul 2024 19:49:02 +0300 Subject: [PATCH] feat: bandits --- eppo_core/src/bandits/eval.rs | 421 +++++++++++++++++++++++++ eppo_core/src/bandits/event.rs | 22 ++ eppo_core/src/bandits/mod.rs | 7 + eppo_core/src/bandits/models.rs | 60 ++++ eppo_core/src/configuration.rs | 65 +++- eppo_core/src/configuration_fetcher.rs | 51 ++- eppo_core/src/configuration_store.rs | 19 +- eppo_core/src/context_attributes.rs | 74 +++++ eppo_core/src/lib.rs | 3 + eppo_core/src/ufc/assignment.rs | 10 + eppo_core/src/ufc/eval.rs | 62 +++- eppo_core/src/ufc/mod.rs | 2 +- eppo_core/src/ufc/models.rs | 17 + ruby-sdk/ext/eppo_rb/Cargo.toml | 2 +- ruby-sdk/ext/eppo_rb/src/client.rs | 106 +++---- ruby-sdk/ext/eppo_rb/src/lib.rs | 1 + ruby-sdk/lib/eppo_client/client.rb | 63 ++-- rust-sdk/src/client.rs | 49 +-- 18 files changed, 884 insertions(+), 150 deletions(-) create mode 100644 eppo_core/src/bandits/eval.rs create mode 100644 eppo_core/src/bandits/event.rs create mode 100644 eppo_core/src/bandits/mod.rs create mode 100644 eppo_core/src/bandits/models.rs create mode 100644 eppo_core/src/context_attributes.rs diff --git a/eppo_core/src/bandits/eval.rs b/eppo_core/src/bandits/eval.rs new file mode 100644 index 00000000..03c87906 --- /dev/null +++ b/eppo_core/src/bandits/eval.rs @@ -0,0 +1,421 @@ +use std::collections::HashMap; + +use chrono::Utc; +use serde::Deserialize; +use serde::Serialize; + +use crate::sharder::Md5Sharder; +use crate::sharder::Sharder; +use crate::ufc::Assignment; +use crate::ufc::AssignmentEvent; +use crate::ufc::AssignmentValue; +use crate::ufc::VariationType; +use crate::Configuration; +use crate::ContextAttributes; + +use super::event::BanditEvent; +use super::BanditCategoricalAttributeCoefficient; +use super::BanditModelData; +use super::BanditNumericAttributeCoefficient; + +#[derive(Debug)] +struct BanditEvaluationDetails { + pub flag_key: String, + pub 
subject_key: String, + pub subject_attributes: ContextAttributes, + /// Selected action. + pub action_key: String, + /// Attributes of the selected action. + pub action_attributes: ContextAttributes, + /// Score of the selected action. + pub action_score: f64, + pub action_weight: f64, + pub gamma: f64, + /// Distance between best and selected actions' scores. + pub optimality_gap: f64, +} + +struct Action<'a> { + key: &'a str, + attributes: &'a ContextAttributes, +} + +/// Result of evaluating a bandit. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BanditResult { + /// Selected variation from the feature flag. + variation: String, + /// Selected action if any. + action: Option, + /// Flag assignment event that needs to be logged to analytics storage. + assignment_event: Option, + /// Bandit assignment event that needs to be logged to analytics storage. + bandit_event: Option, +} + +impl Configuration { + /// Evaluate the specified string feature flag for the given subject. If resulting variation is + /// a bandit, evaluate the bandit to return the action. + pub fn get_bandit_action( + &self, + flag_key: &str, + subject_key: &str, + subject_attributes: &ContextAttributes, + actions: &HashMap, + default_variation: &str, + ) -> BanditResult { + let assignment = self + .get_assignment( + flag_key, + subject_key, + &subject_attributes.to_generic_attributes(), + Some(VariationType::String), + ) + .unwrap_or_default() + .unwrap_or_else(|| Assignment { + value: AssignmentValue::String(default_variation.to_owned()), + event: None, + }); + + let variation = assignment + .value + .to_string() + .expect("flag assignment in bandit evaluation is always a string"); + + let Some(bandit_key) = self.get_bandit_key(flag_key, &variation) else { + // It's not a bandit variation, just return it. 
+ return BanditResult { + variation, + action: None, + assignment_event: assignment.event, + bandit_event: None, + }; + }; + + let Some(bandit) = self.get_bandit(bandit_key) else { + // We've evaluated a flag that resulted in a bandit but now we cannot find the bandit + // configuration and we cannot proceed. + // + // This should normally never happen as it means that there's a mismatch between the + // general UFC config and bandits config. + // + // Abort evaluation and return default variant, ignoring `assignment.event` logging. + log::warn!(target: "eppo", bandit_key; "unable to find bandit configuration"); + return BanditResult { + variation: default_variation.to_owned(), + action: None, + assignment_event: None, + bandit_event: None, + }; + }; + + let Some(evaluation) = + bandit + .model_data + .evaluate(flag_key, subject_key, subject_attributes, actions) + else { + // We've evaluated a flag but now bandit evaluation failed. (Likely due to user supplying + // empty actions, or NaN attributes.) + // + // Abort evaluation and return default variant, ignoring `assignment.event` logging. 
+ return BanditResult { + variation: default_variation.to_owned(), + action: None, + assignment_event: None, + bandit_event: None, + }; + }; + + let bandit_event = BanditEvent { + flag_key: flag_key.to_owned(), + bandit_key: bandit_key.to_owned(), + subject: subject_key.to_owned(), + action: evaluation.action_key.clone(), + action_probability: evaluation.action_weight, + optimality_gap: evaluation.optimality_gap, + model_version: bandit.model_version.clone(), + timestamp: Utc::now().to_rfc3339(), + subject_numeric_attributes: evaluation.subject_attributes.numeric, + subject_categorical_attributes: evaluation.subject_attributes.categorical, + action_numeric_attributes: evaluation.action_attributes.numeric, + action_categorical_attributes: evaluation.action_attributes.categorical, + meta_data: [( + "eppoCoreVersion".to_owned(), + env!("CARGO_PKG_VERSION").to_owned(), + )] + .into(), + }; + + return BanditResult { + variation, + action: Some(evaluation.action_key), + assignment_event: assignment.event, + bandit_event: Some(bandit_event), + }; + } +} + +impl BanditModelData { + fn evaluate( + &self, + flag_key: &str, + subject_key: &str, + subject_attributes: &ContextAttributes, + actions: &HashMap, + ) -> Option { + // total_shards is not configurable at the moment. + const TOTAL_SHARDS: u64 = 10_000; + + let scores = actions + .iter() + .map(|(key, attributes)| { + ( + key.as_str(), + self.score_action(Action { key, attributes }, subject_attributes), + ) + }) + .filter(|&(_, score)| !score.is_nan()) + .collect::>(); + + let best = scores.iter().map(|(k, v)| (*k, *v)).max_by(|a, b| { + f64::partial_cmp(&a.1, &b.1) + .expect("action scores should be comparable as we filtered out any possible NaNs") + })?; + + let weights = self.weigh_actions(&scores, best); + + // Pseudo-random deterministic shuffle of actions. 
Shuffling is unique per subject, so when + // weights change slightly, large swaths of subjects are not reassigned from one action to + // the same other action (instead, if subject is pushed away from an action, it will get + // assigned to a pseudo-random other action). + let shuffled_actions = { + let mut shuffled_actions = actions.keys().map(|x| x.as_str()).collect::>(); + // Sort actions by their shard value. Use action key as tie breaker. + shuffled_actions.sort_by_cached_key(|&action_key| { + let hash = Md5Sharder.get_shard( + format!("{flag_key}-{subject_key}-{action_key}"), + TOTAL_SHARDS, + ); + (hash, action_key) + }); + shuffled_actions + }; + + let selection_hash = + (Md5Sharder.get_shard(format!("{flag_key}-{subject_key}"), TOTAL_SHARDS) as f64) + / (TOTAL_SHARDS as f64); + + let selected_action = { + let mut cumulative_weight = 0.0; + *shuffled_actions + .iter() + .find(|&action_key| { + cumulative_weight += weights[action_key]; + cumulative_weight > selection_hash + }) + .or_else(|| shuffled_actions.last())? + }; + + let optimality_gap = best.1 - scores[selected_action]; + + Some(BanditEvaluationDetails { + flag_key: flag_key.to_owned(), + subject_key: subject_key.to_owned(), + subject_attributes: subject_attributes.to_owned(), + action_key: selected_action.to_owned(), + action_attributes: actions[selected_action].to_owned(), + action_score: scores[selected_action], + action_weight: weights[selected_action], + gamma: self.gamma, + optimality_gap, + }) + } + + /// Weigh actions depending on their scores. Higher-scored actions receive more weight, except + /// the best action, which receives the remainder weight. 
+ fn weigh_actions<'a>( + &self, + scores: &HashMap<&'a str, f64>, + (best_action, best_score): (&'a str, f64), + ) -> HashMap<&'a str, f64> { + let mut weights = HashMap::<&str, f64>::new(); + + let n_actions = scores.len() as f64; + + let mut remainder_weight = 1.0; + for (action, score) in scores { + if *action != best_action { + let min_probability = self.action_probability_floor / n_actions; + let weight = + min_probability.max(1.0 / (n_actions + self.gamma * (best_score - score))); + + weights.insert(action, weight); + remainder_weight -= weight; + } + } + + weights.insert(best_action, f64::max(remainder_weight, 0.0)); + + weights + } + + fn score_action(&self, action: Action, subject_attributes: &ContextAttributes) -> f64 { + let Some(coefficients) = self.coefficients.get(action.key) else { + return self.default_action_score; + }; + + coefficients.intercept + + score_attributes( + &action.attributes, + &coefficients.action_numeric_coefficients, + &coefficients.action_categorical_coefficients, + ) + + score_attributes( + subject_attributes, + &coefficients.subject_numeric_coefficients, + &coefficients.subject_categorical_coefficients, + ) + } +} + +fn score_attributes( + attributes: &ContextAttributes, + numeric_coefficients: &[BanditNumericAttributeCoefficient], + categorical_coefficients: &[BanditCategoricalAttributeCoefficient], +) -> f64 { + numeric_coefficients + .into_iter() + .map(|coef| { + attributes + .numeric + .get(&coef.attribute_key) + .map(|value| value * coef.coefficient) + .unwrap_or(coef.missing_value_coefficient) + }) + .chain(categorical_coefficients.into_iter().map(|coef| { + attributes + .categorical + .get(&coef.attribute_key) + .and_then(|value| coef.value_coefficients.get(value)) + .copied() + .unwrap_or(coef.missing_value_coefficient) + })) + .sum() +} + +#[cfg(test)] +mod tests { + use std::{ + collections::HashMap, + fs::{read_dir, File}, + }; + + use serde::{Deserialize, Serialize}; + + use crate::{Configuration, 
ContextAttributes}; + + #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "camelCase")] + struct TestFile { + flag: String, + default_value: String, + subjects: Vec, + } + + #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "camelCase")] + struct TestSubject { + subject_key: String, + subject_attributes: TestContextAttributes, + actions: Vec, + assignment: TestAssignment, + } + + #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "camelCase")] + struct TestContextAttributes { + numeric_attributes: HashMap, + categorical_attributes: HashMap, + } + impl From for ContextAttributes { + fn from(value: TestContextAttributes) -> ContextAttributes { + ContextAttributes { + numeric: value.numeric_attributes, + categorical: value.categorical_attributes, + } + } + } + + #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "camelCase")] + struct TestAction { + action_key: String, + #[serde(flatten)] + attributes: TestContextAttributes, + } + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + #[serde(rename_all = "camelCase")] + struct TestAssignment { + variation: String, + action: Option, + } + + #[test] + fn sdk_test_data() { + let config = + serde_json::from_reader(File::open("tests/data/ufc/bandit-flags-v1.json").unwrap()) + .unwrap(); + let bandits = + serde_json::from_reader(File::open("tests/data/ufc/bandit-models-v1.json").unwrap()) + .unwrap(); + + let config = Configuration::new(Some(config), Some(bandits)); + + for entry in read_dir("tests/data/ufc/bandit-tests/").unwrap() { + let entry = entry.unwrap(); + println!("Processing test file: {:?}", entry.path()); + + if entry + .file_name() + .into_string() + .unwrap() + .ends_with(".dynamic-typing.json") + { + // Not applicable to Rust as it's strongly statically typed. 
+ continue; + } + + let test: TestFile = serde_json::from_reader(File::open(entry.path()).unwrap()) + .expect("cannot parse test file"); + + for subject in test.subjects { + print!("test subject {:?}... ", subject.subject_key); + + let actions = subject + .actions + .into_iter() + .map(|x| (x.action_key, x.attributes.into())) + .collect(); + + let result = config.get_bandit_action( + &test.flag, + &subject.subject_key, + &subject.subject_attributes.into(), + &actions, + &test.default_value, + ); + + assert_eq!( + TestAssignment { + variation: result.variation, + action: result.action + }, + subject.assignment + ); + + println!("ok") + } + } + } +} diff --git a/eppo_core/src/bandits/event.rs b/eppo_core/src/bandits/event.rs new file mode 100644 index 00000000..9fa428cc --- /dev/null +++ b/eppo_core/src/bandits/event.rs @@ -0,0 +1,22 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +/// Bandit evaluation event that needs to be logged to analytics storage. +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +#[serde(rename_all = "camelCase")] +pub struct BanditEvent { + pub flag_key: String, + pub bandit_key: String, + pub subject: String, + pub action: String, + pub action_probability: f64, + pub optimality_gap: f64, + pub model_version: String, + pub timestamp: String, + pub subject_numeric_attributes: HashMap, + pub subject_categorical_attributes: HashMap, + pub action_numeric_attributes: HashMap, + pub action_categorical_attributes: HashMap, + pub meta_data: HashMap, +} diff --git a/eppo_core/src/bandits/mod.rs b/eppo_core/src/bandits/mod.rs new file mode 100644 index 00000000..cb57d2eb --- /dev/null +++ b/eppo_core/src/bandits/mod.rs @@ -0,0 +1,7 @@ +mod eval; +mod event; +mod models; + +pub use eval::BanditResult; +pub use event::BanditEvent; +pub use models::*; diff --git a/eppo_core/src/bandits/models.rs b/eppo_core/src/bandits/models.rs new file mode 100644 index 00000000..d125184a --- /dev/null +++ 
b/eppo_core/src/bandits/models.rs @@ -0,0 +1,60 @@ +#![allow(missing_docs)] + +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +type Timestamp = chrono::DateTime; + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct BanditResponse { + pub bandits: HashMap, + pub updated_at: Timestamp, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct BanditConfiguration { + pub bandit_key: String, + pub model_name: String, + pub model_version: String, + pub model_data: BanditModelData, + pub updated_at: Timestamp, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct BanditModelData { + pub gamma: f64, + pub default_action_score: f64, + pub action_probability_floor: f64, + pub coefficients: HashMap, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct BanditCoefficients { + pub action_key: String, + pub intercept: f64, + pub subject_numeric_coefficients: Vec, + pub subject_categorical_coefficients: Vec, + pub action_numeric_coefficients: Vec, + pub action_categorical_coefficients: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct BanditNumericAttributeCoefficient { + pub attribute_key: String, + pub coefficient: f64, + pub missing_value_coefficient: f64, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct BanditCategoricalAttributeCoefficient { + pub attribute_key: String, + pub value_coefficients: HashMap, + pub missing_value_coefficient: f64, +} diff --git a/eppo_core/src/configuration.rs b/eppo_core/src/configuration.rs index 3c9527a4..f266e0a4 100644 --- a/eppo_core/src/configuration.rs +++ b/eppo_core/src/configuration.rs @@ -1,9 +1,66 @@ -use std::sync::Arc; +use std::collections::HashMap; -use crate::ufc::UniversalFlagConfig; +use crate::{ + 
bandits::{BanditConfiguration, BanditResponse}, + ufc::{BanditVariation, UniversalFlagConfig}, +}; +/// Remote configuration for the eppo client. It's a central piece that defines client behavior. #[derive(Default, Clone)] pub struct Configuration { - /// UFC configuration. - pub ufc: Option>, + /// Flags configuration. + pub flags: Option, + /// Bandits configuration. + pub bandits: Option, + /// Mapping from flag key to flag variation value to bandit variation. + pub flag_to_bandit_associations: + HashMap>, +} + +impl Configuration { + /// Create a new configuration from server responses. + pub fn new( + config: Option, + bandits: Option, + ) -> Configuration { + let flag_to_bandit_associations = config + .as_ref() + .map(get_flag_to_bandit_associations) + .unwrap_or_default(); + Configuration { + flags: config, + bandits, + flag_to_bandit_associations, + } + } + + /// Return a bandit variant for the specified flag key and string flag variation. + pub(crate) fn get_bandit_key<'a>(&'a self, flag_key: &str, variation: &str) -> Option<&'a str> { + self.flag_to_bandit_associations + .get(flag_key) + .and_then(|x| x.get(variation)) + .map(|variation| variation.key.as_str()) + } + + /// Return bandit configuration for the given key. + /// + /// Returns `None` if bandits are missing or bandit does not exist. 
+ pub(crate) fn get_bandit<'a>(&'a self, bandit_key: &str) -> Option<&'a BanditConfiguration> { + self.bandits.as_ref()?.bandits.get(bandit_key) + } +} + +fn get_flag_to_bandit_associations( + config: &UniversalFlagConfig, +) -> HashMap> { + config + .bandits + .iter() + .flat_map(|(_, bandits)| bandits.iter()) + .fold(HashMap::new(), |mut acc, variation| { + acc.entry(variation.flag_key.clone()) + .or_default() + .insert(variation.variation_value.clone(), variation.clone()); + acc + }) } diff --git a/eppo_core/src/configuration_fetcher.rs b/eppo_core/src/configuration_fetcher.rs index 0b349df1..640df72b 100644 --- a/eppo_core/src/configuration_fetcher.rs +++ b/eppo_core/src/configuration_fetcher.rs @@ -3,12 +3,13 @@ use std::sync::Arc; use reqwest::{StatusCode, Url}; -use crate::{ufc::UniversalFlagConfig, Configuration, Error, Result}; +use crate::{bandits::BanditResponse, ufc::UniversalFlagConfig, Configuration, Error, Result}; +#[derive(Debug, PartialEq, Eq)] pub struct ConfigurationFetcherConfig { pub base_url: String, pub api_key: String, - /// SDK name. Usually, language name. + /// SDK name. (Usually, language name.) pub sdk_name: String, /// Version of SDK. pub sdk_version: String, @@ -17,6 +18,7 @@ pub struct ConfigurationFetcherConfig { pub const DEFAULT_BASE_URL: &'static str = "https://fscdn.eppo.cloud/api"; const UFC_ENDPOINT: &'static str = "/flag-config/v1/config"; +const BANDIT_ENDPOINT: &'static str = "/flag-config/v1/bandits"; /// A client that fetches Eppo configuration from the server. pub struct ConfigurationFetcher { @@ -46,9 +48,14 @@ impl ConfigurationFetcher { let ufc = self.fetch_ufc_configuration()?; - Ok(Configuration { - ufc: Some(Arc::new(ufc)), - }) + let bandits = if ufc.bandits.is_empty() { + // We don't need bandits configuration if there are no bandits. + None + } else { + Some(self.fetch_bandits_configuration()?) 
+ }; + + Ok(Configuration::new(Some(ufc), bandits)) } fn fetch_ufc_configuration(&mut self) -> Result { @@ -84,4 +91,38 @@ impl ConfigurationFetcher { Ok(configuration) } + + fn fetch_bandits_configuration(&mut self) -> Result { + let url = Url::parse_with_params( + &format!("{}{}", self.config.base_url, BANDIT_ENDPOINT), + &[ + ("apiKey", &*self.config.api_key), + ("sdkName", &*self.config.sdk_name), + ("sdkVersion", &*self.config.sdk_version), + ("coreVersion", env!("CARGO_PKG_VERSION")), + ], + ) + .map_err(|err| Error::InvalidBaseUrl(err))?; + + log::debug!(target: "eppo", "fetching UFC configuration"); + let response = self.client.get(url).send()?; + + let response = response.error_for_status().map_err(|err| { + if err.status() == Some(StatusCode::UNAUTHORIZED) { + log::warn!(target: "eppo", "client is not authorized. Check your API key"); + self.unauthorized = true; + return Error::Unauthorized; + } else { + log::warn!(target: "eppo", "received non-200 response while fetching new configuration: {:?}", err); + return Error::from(err); + + } + })?; + + let configuration = response.json()?; + + log::debug!(target: "eppo", "successfully fetched UFC configuration"); + + Ok(configuration) + } } diff --git a/eppo_core/src/configuration_store.rs b/eppo_core/src/configuration_store.rs index 8bddab50..2e105437 100644 --- a/eppo_core/src/configuration_store.rs +++ b/eppo_core/src/configuration_store.rs @@ -1,7 +1,7 @@ //! A thread-safe in-memory storage for currently active configuration. [`ConfigurationStore`] //! provides a concurrent access for readers (e.g., flag evaluation) and writers (e.g., periodic //! configuration fetcher). -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; use crate::Configuration; @@ -11,7 +11,7 @@ use crate::Configuration; /// `Configuration` itself is always immutable and can only be replaced fully. 
#[derive(Default)] pub struct ConfigurationStore { - configuration: RwLock, + configuration: RwLock>, } impl ConfigurationStore { @@ -19,7 +19,7 @@ impl ConfigurationStore { ConfigurationStore::default() } - pub fn get_configuration(&self) -> Configuration { + pub fn get_configuration(&self) -> Arc { // self.configuration.read() should always return Ok(). Err() is possible only if the lock // is poisoned (writer panicked while holding the lock), which should never happen. let configuration = self @@ -32,6 +32,7 @@ impl ConfigurationStore { /// Set new configuration. pub fn set_configuration(&self, config: Configuration) { + let config = Arc::new(config); let mut configuration_slot = self .configuration .write() @@ -55,15 +56,17 @@ mod tests { { let store = store.clone(); let _ = std::thread::spawn(move || { - store.set_configuration(Configuration { - ufc: Some(Arc::new(UniversalFlagConfig { + store.set_configuration(Configuration::new( + Some(UniversalFlagConfig { flags: HashMap::new(), - })), - }); + bandits: HashMap::new(), + }), + None, + )) }) .join(); } - assert!(store.get_configuration().ufc.is_some()); + assert!(store.get_configuration().flags.is_some()); } } diff --git a/eppo_core/src/context_attributes.rs b/eppo_core/src/context_attributes.rs new file mode 100644 index 00000000..858f6c46 --- /dev/null +++ b/eppo_core/src/context_attributes.rs @@ -0,0 +1,74 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +use crate::{AttributeValue, Attributes}; + +/// `ContextAttributes` are subject or action attributes split by their semantics. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ContextAttributes { + /// Numeric attributes are quantitative (e.g., real numbers) and define a scale. + /// + /// Not all numbers are numeric attributes. If a number is used to represent an enumeration or + /// on/off values, it is a categorical attribute. 
+ pub numeric: HashMap, + /// Categorical attributes are attributes that have a finite set of values that are not directly + /// comparable (i.e., enumeration). + pub categorical: HashMap, +} + +impl From for ContextAttributes { + fn from(value: Attributes) -> Self { + ContextAttributes::from_iter(value) + } +} + +impl FromIterator<(K, V)> for ContextAttributes +where + K: ToOwned, + V: ToOwned, +{ + fn from_iter>(iter: T) -> Self { + iter.into_iter() + .fold(ContextAttributes::default(), |mut acc, (key, value)| { + match value.to_owned() { + AttributeValue::String(value) => { + acc.categorical.insert(key.to_owned(), value); + } + AttributeValue::Number(value) => { + acc.numeric.insert(key.to_owned(), value); + } + AttributeValue::Boolean(value) => { + // TBD: shall we ignore boolean attributes instead? + // + // One argument for including it here is that this basically guarantees that + // assignment evaluation inside bandit evaluation works the same way as if + // `get_assignment()` was called with generic `Attributes`. + // + // We can go a step further and remove `AttributeValue::Boolean` altogether, + // forcing it to be converted to a string before any evaluation. + acc.categorical.insert(key.to_owned(), value.to_string()); + } + AttributeValue::Null => { + // Nulls are missing values and are ignored. + } + } + acc + }) + } +} + +impl ContextAttributes { + /// Convert contextual attributes to generic `Attributes`. 
+ pub fn to_generic_attributes(&self) -> Attributes { + let mut result = HashMap::with_capacity(self.numeric.len() + self.categorical.capacity()); + for (key, value) in self.numeric.iter() { + result.insert(key.clone(), AttributeValue::Number(*value)); + } + for (key, value) in self.categorical.iter() { + result.insert(key.clone(), AttributeValue::String(value.clone())); + } + result + } +} diff --git a/eppo_core/src/lib.rs b/eppo_core/src/lib.rs index 952cd00e..84dcb5bd 100644 --- a/eppo_core/src/lib.rs +++ b/eppo_core/src/lib.rs @@ -15,6 +15,7 @@ #![warn(rustdoc::missing_crate_level_docs)] #![warn(missing_docs)] +pub mod bandits; pub mod configuration_fetcher; pub mod configuration_store; pub mod poller_thread; @@ -23,8 +24,10 @@ pub mod ufc; mod attributes; mod configuration; +mod context_attributes; mod error; pub use attributes::{AttributeValue, Attributes}; pub use configuration::Configuration; +pub use context_attributes::ContextAttributes; pub use error::{Error, Result}; diff --git a/eppo_core/src/ufc/assignment.rs b/eppo_core/src/ufc/assignment.rs index 7ffea626..9712a70c 100644 --- a/eppo_core/src/ufc/assignment.rs +++ b/eppo_core/src/ufc/assignment.rs @@ -4,6 +4,16 @@ use serde::{Deserialize, Serialize}; use crate::Attributes; +/// Result of assignment evaluation. +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct Assignment { + /// Assignment value that should be returned to the user. + pub value: AssignmentValue, + /// Optional assignment event that should be logged to storage. + pub event: Option, +} + /// Enum representing values assigned to a subject as a result of feature flag evaluation. 
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] diff --git a/eppo_core/src/ufc/eval.rs b/eppo_core/src/ufc/eval.rs index cc84d3d2..a19da242 100644 --- a/eppo_core/src/ufc/eval.rs +++ b/eppo_core/src/ufc/eval.rs @@ -4,14 +4,56 @@ use chrono::Utc; use crate::{ sharder::{Md5Sharder, Sharder}, - Attributes, Error, Result, + Attributes, Configuration, Error, Result, }; use super::{ - Allocation, AssignmentEvent, AssignmentValue, Flag, Shard, Split, Timestamp, TryParse, + Allocation, Assignment, AssignmentEvent, Flag, Shard, Split, Timestamp, TryParse, UniversalFlagConfig, VariationType, }; +impl Configuration { + /// Evaluate the specified feature flag for the given subject and return assigned variation and + /// an optional assignment event for logging. + pub fn get_assignment( + &self, + flag_key: &str, + subject_key: &str, + subject_attributes: &Attributes, + expected_type: Option, + ) -> Result> { + let Some(ufc) = &self.flags else { + log::warn!(target: "eppo", flag_key, subject_key; "evaluating a flag before Eppo configuration has been fetched"); + // We treat missing configuration (the poller has not fetched config) as a normal + // scenario. + return Ok(None); + }; + + let assignment = + match ufc.eval_flag(&flag_key, &subject_key, &subject_attributes, expected_type) { + Ok(result) => result, + Err(err) => { + log::warn!(target: "eppo", + flag_key, + subject_key, + subject_attributes:serde; + "error occurred while evaluating a flag: {:?}", err, + ); + return Err(err); + } + }; + + log::trace!(target: "eppo", + flag_key, + subject_key, + subject_attributes:serde, + assignment:serde = assignment.as_ref().map(|Assignment{value, ..}| value); + "evaluated a flag"); + + Ok(assignment) + } +} + impl UniversalFlagConfig { /// Evaluate the flag for the given subject, expecting `expected_type` type. 
/// @@ -28,7 +70,7 @@ impl UniversalFlagConfig { subject_key: &str, subject_attributes: &Attributes, expected_type: Option, - ) -> Result)>> { + ) -> Result> { let flag = self.get_flag(flag_key)?; if let Some(ty) = expected_type { @@ -65,7 +107,7 @@ impl Flag { subject_key: &str, subject_attributes: &Attributes, sharder: &impl Sharder, - ) -> Result)>> { + ) -> Result> { if !self.enabled { return Ok(None); } @@ -123,17 +165,21 @@ impl Flag { subject: subject_key.to_owned(), subject_attributes: subject_attributes.clone(), timestamp: now.to_rfc3339(), - meta_data: HashMap::from([( + meta_data: [( "eppoCoreVersion".to_owned(), env!("CARGO_PKG_VERSION").to_owned(), - )]), + )] + .into(), extra_logging: split.extra_logging.clone(), }) } else { None }; - Ok(Some((assignment_value, event))) + Ok(Some(Assignment { + value: assignment_value, + event, + })) } } @@ -258,7 +304,7 @@ mod tests { let result_assingment = result .as_ref() - .map(|(value, _event)| value) + .map(|assignment| &assignment.value) .unwrap_or(&default_assignment); let expected_assignment = to_value(subject.assignment) .to_assignment_value(test_file.variation_type) diff --git a/eppo_core/src/ufc/mod.rs b/eppo_core/src/ufc/mod.rs index e589adf2..8a89db87 100644 --- a/eppo_core/src/ufc/mod.rs +++ b/eppo_core/src/ufc/mod.rs @@ -4,5 +4,5 @@ mod eval; mod models; mod rules; -pub use assignment::{AssignmentEvent, AssignmentValue}; +pub use assignment::{Assignment, AssignmentEvent, AssignmentValue}; pub use models::*; diff --git a/eppo_core/src/ufc/models.rs b/eppo_core/src/ufc/models.rs index 291a65e1..8f2e29dc 100644 --- a/eppo_core/src/ufc/models.rs +++ b/eppo_core/src/ufc/models.rs @@ -17,6 +17,10 @@ pub struct UniversalFlagConfig { /// Value is wrapped in `TryParse` so that if we fail to parse one flag (e.g., new server /// format), we can still serve other flags. pub flags: HashMap>, + /// `bandits` field connects string feature flags to bandits. Actual bandits configuration is + /// served separately. 
+    #[serde(default)]
+    pub bandits: HashMap<String, Vec<BanditVariation>>,
 }
 
 /// `TryParse` allows the subfield to fail parsing without failing the parsing of the whole
@@ -279,6 +283,20 @@ impl ShardRange {
     }
 }
 
+/// `BanditVariation` associates a variation in feature flag with a bandit.
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct BanditVariation {
+    /// Key of the bandit this variation is associated with.
+    pub key: String,
+    /// Key of the flag.
+    pub flag_key: String,
+    /// Today it's the same as `variation_value`.
+    pub variation_key: String,
+    /// String variation value.
+    pub variation_value: String,
+}
+
 #[cfg(test)]
 mod tests {
     use std::{fs::File, io::BufReader};
diff --git a/ruby-sdk/ext/eppo_rb/Cargo.toml b/ruby-sdk/ext/eppo_rb/Cargo.toml
index b5e03ad6..1da3a0a2 100644
--- a/ruby-sdk/ext/eppo_rb/Cargo.toml
+++ b/ruby-sdk/ext/eppo_rb/Cargo.toml
@@ -10,7 +10,7 @@ rust-version = "1.65.0"
 crate-type = ["cdylib"]
 
 [dependencies]
-env_logger = { version = "0.11.3", features = ["unstable-kv"] }
+env_logger = { version = "0.11.3", default-features = false, features = ["unstable-kv"] }
 eppo_core = { version = "0.1.0", path = "../../../eppo_core" }
 log = { version = "0.4.21", features = ["kv_serde"] }
 magnus = { version = "0.6.2" }
diff --git a/ruby-sdk/ext/eppo_rb/src/client.rs b/ruby-sdk/ext/eppo_rb/src/client.rs
index 3dc00b68..f81f3f7e 100644
--- a/ruby-sdk/ext/eppo_rb/src/client.rs
+++ b/ruby-sdk/ext/eppo_rb/src/client.rs
@@ -1,18 +1,10 @@
 use std::{cell::RefCell, sync::Arc};
 
 use eppo_core::{
-    configuration_fetcher::ConfigurationFetcher,
-    configuration_store::ConfigurationStore,
-    poller_thread::PollerThread,
-    ufc::{AssignmentEvent, AssignmentValue, VariationType},
-    Attributes, Configuration,
-};
-use magnus::{
-    error::Result,
-    exception::{self, exception},
-    prelude::*,
-    Error, IntoValue, RHash, RString, Symbol, TryConvert, Value,
+    configuration_fetcher::ConfigurationFetcher, configuration_store::ConfigurationStore,
+    poller_thread::PollerThread, ufc::VariationType, Attributes,
ContextAttributes, }; +use magnus::{error::Result, exception, prelude::*, Error, TryConvert, Value}; #[derive(Debug)] #[magnus::wrap(class = "EppoClient::Core::Config", size, free_immediately)] @@ -30,25 +22,6 @@ impl TryConvert for Config { } } -#[derive(Debug, serde::Serialize)] -pub struct Assignment { - value: Option, - event: Option, -} -impl Assignment { - const fn empty() -> Assignment { - Assignment { - value: None, - event: None, - } - } -} -impl IntoValue for Assignment { - fn into_value_with(self, handle: &magnus::Ruby) -> Value { - serde_magnus::serialize(&self).expect("Assignment value should be serializable") - } -} - #[magnus::wrap(class = "EppoClient::Core::Client")] pub struct Client { configuration_store: Arc, @@ -89,51 +62,50 @@ impl Client { subject_key: String, subject_attributes: Value, expected_type: Value, - ) -> Result { + ) -> Result { let expected_type: VariationType = serde_magnus::deserialize(expected_type)?; let subject_attributes: Attributes = serde_magnus::deserialize(subject_attributes)?; - let Configuration { ufc: Some(ufc) } = self.configuration_store.get_configuration() else { - log::warn!(target: "eppo", flag_key, subject_key; "evaluating a flag before Eppo configuration has been fetched"); - // We treat missing configuration (the poller has not fetched config) as a normal - // scenario (at least for now). - return Ok(Assignment::empty()); - }; + let config = self.configuration_store.get_configuration(); + let result = config + .get_assignment( + &flag_key, + &subject_key, + &subject_attributes, + Some(expected_type), + ) + // TODO: maybe expose possible errors individually. 
+ .map_err(|err| Error::new(exception::runtime_error(), err.to_string()))?; - let evaluation = match ufc.eval_flag( + Ok(serde_magnus::serialize(&result).expect("assignment value should be serializable")) + } + + pub fn get_bandit_action( + &self, + flag_key: String, + subject_key: String, + subject_attributes: Value, + actions: Value, + default_variation: String, + ) -> Result { + let subject_attributes = + serde_magnus::deserialize::<_, ContextAttributes>(subject_attributes) + // Allow the user to pass generic Attributes instead of ContextAttributes + .or_else(|_err| { + serde_magnus::deserialize::<_, Attributes>(subject_attributes).map(|x| x.into()) + })?; + let actions = serde_magnus::deserialize(actions)?; + + let config = self.configuration_store.get_configuration(); + let result = config.get_bandit_action( &flag_key, &subject_key, &subject_attributes, - Some(expected_type), - ) { - Ok(result) => result, - Err(err) => { - log::warn!(target: "eppo", - flag_key, - subject_key, - subject_attributes:serde; - "error occurred while evaluating a flag: {:?}", err, - ); - return Err(Error::new(exception::runtime_error(), "blah")); - // return Err(err); - } - }; - - log::trace!(target: "eppo", - flag_key, - subject_key, - subject_attributes:serde, - assignment:serde = evaluation.as_ref().map(|(value, _event)| value); - "evaluated a flag"); - - let Some((value, event)) = evaluation else { - return Ok(Assignment::empty()); - }; + &actions, + &default_variation, + ); - Ok(Assignment { - value: Some(value), - event, - }) + serde_magnus::serialize(&result) } pub fn shutdown(&self) { diff --git a/ruby-sdk/ext/eppo_rb/src/lib.rs b/ruby-sdk/ext/eppo_rb/src/lib.rs index b1a62d46..9b73be8e 100644 --- a/ruby-sdk/ext/eppo_rb/src/lib.rs +++ b/ruby-sdk/ext/eppo_rb/src/lib.rs @@ -14,6 +14,7 @@ fn init(ruby: &Ruby) -> Result<(), Error> { let core_client = core.define_class("Client", magnus::class::object())?; core_client.define_singleton_method("new", function!(Client::new, 1))?; 
core_client.define_method("get_assignment", method!(Client::get_assignment, 4))?; + core_client.define_method("get_bandit_action", method!(Client::get_bandit_action, 5))?; core_client.define_method("shutdown", method!(Client::shutdown, 0))?; core.const_set( diff --git a/ruby-sdk/lib/eppo_client/client.rb b/ruby-sdk/lib/eppo_client/client.rb index a347a274..db29c2fa 100644 --- a/ruby-sdk/lib/eppo_client/client.rb +++ b/ruby-sdk/lib/eppo_client/client.rb @@ -48,32 +48,59 @@ def get_assignment_inner(flag_key, subject_key, subject_attributes, expected_typ logger = Logger.new($stdout) begin assignment = @core.get_assignment(flag_key, subject_key, subject_attributes, expected_type) - - event = assignment[:event] - if event - begin - event["metaData"]["sdkName"] = "ruby" - event["metaData"]["sdkVersion"] = EppoClient::VERSION - - @assignment_logger.log_assignment(event) - rescue EppoClient::AssignmentLoggerError - # Error means log_assignment was not set up. This is okay to ignore. - rescue StandardError => e - logger.error("[Eppo SDK] Error logging assignment event: #{e}") - end + if not assignment then + return default_value end - value = assignment[:value]&.[](expected_type) - value.nil? ? default_value : value - rescue StandardError + log_assignment(assignment[:event]) + + return assignment[:value][expected_type] + rescue StandardError => error logger.debug("[Eppo SDK] Failed to get assignment: #{error}") - # TODO: graceful mode? + # TODO: non-graceful mode? 
default_value
       end
     end
     # rubocop:enable Metrics/MethodLength
 
-    private :get_assignment_inner
+    def get_bandit_action(flag_key, subject_key, subject_attributes, actions, default_variation)
+      result = @core.get_bandit_action(flag_key, subject_key, subject_attributes, actions, default_variation)
+
+      log_assignment(result[:assignment_event])
+      log_bandit_action(result[:bandit_event])
+
+      { variation: result[:variation], action: result[:action] }
+    end
+
+    def log_assignment(event)
+      return unless event
+      begin
+        event["metaData"]["sdkName"] = "ruby"
+        event["metaData"]["sdkVersion"] = EppoClient::VERSION
+
+        @assignment_logger.log_assignment(event)
+      rescue EppoClient::AssignmentLoggerError
+        # Error means log_assignment was not set up. This is okay to ignore.
+      rescue StandardError => error
+        Logger.new($stdout).error("[Eppo SDK] Error logging assignment event: #{error}")
+      end
+    end
+
+    def log_bandit_action(event)
+      return unless event
+      begin
+        event["metaData"]["sdkName"] = "ruby"
+        event["metaData"]["sdkVersion"] = EppoClient::VERSION
+
+        @assignment_logger.log_bandit_action(event)
+      rescue EppoClient::AssignmentLoggerError
+        # Error means log_bandit_action was not set up. This is okay to ignore.
+      rescue StandardError => error
+        Logger.new($stdout).error("[Eppo SDK] Error logging bandit action event: #{error}")
+      end
+    end
+
+    private :get_assignment_inner, :log_assignment, :log_bandit_action
   end
 end
diff --git a/rust-sdk/src/client.rs b/rust-sdk/src/client.rs
index 86b161e5..f3df14f3 100644
--- a/rust-sdk/src/client.rs
+++ b/rust-sdk/src/client.rs
@@ -7,9 +7,8 @@ use crate::{
     AssignmentValue, Attributes, ClientConfig, Result,
 };
 
-use eppo_core::configuration_store::ConfigurationStore;
 use eppo_core::ufc::VariationType;
-use eppo_core::Configuration;
+use eppo_core::{configuration_store::ConfigurationStore, ufc::Assignment};
 
 /// A client for Eppo API.
/// @@ -377,39 +376,11 @@ impl<'a> Client<'a> { expected_type: Option, convert: impl FnOnce(AssignmentValue) -> T, ) -> Result> { - let Configuration { - ufc: Some(configuration), - } = self.configuration_store.get_configuration() - else { - log::warn!(target: "eppo", flag_key, subject_key; "evaluating a flag before Eppo configuration has been fetched"); - // We treat missing configuration (the poller has not fetched config) as a normal - // scenario (at least for now). - return Ok(None); - }; - - let evaluation = - match configuration.eval_flag(flag_key, subject_key, subject_attributes, expected_type) - { - Ok(result) => result, - Err(err) => { - log::warn!(target: "eppo", - flag_key, - subject_key, - subject_attributes:serde; - "error occurred while evaluating a flag: {:?}", err, - ); - return Err(err); - } - }; - - log::trace!(target: "eppo", - flag_key, - subject_key, - subject_attributes:serde, - assignment:serde = evaluation.as_ref().map(|(value, _event)| value); - "evaluated a flag"); + let config = self.configuration_store.get_configuration(); + let assignment = + config.get_assignment(flag_key, subject_key, subject_attributes, expected_type)?; - let Some((value, event)) = evaluation else { + let Some(Assignment { value, event }) = assignment else { return Ok(None); }; @@ -469,8 +440,8 @@ mod tests { ); // updating configuration after client is created - configuration_store.set_configuration(Configuration { - ufc: Some(Arc::new(UniversalFlagConfig { + configuration_store.set_configuration(Configuration::new( + Some(UniversalFlagConfig { flags: [( "flag".to_owned(), TryParse::Parsed(Flag { @@ -501,8 +472,10 @@ mod tests { }), )] .into(), - })), - }); + bandits: HashMap::new(), + }), + None, + )); assert_eq!( client