diff --git a/capture/src/config.rs b/capture/src/config.rs
index 07b7f89..d91e7b7 100644
--- a/capture/src/config.rs
+++ b/capture/src/config.rs
@@ -13,6 +13,9 @@ pub struct Config {
     pub redis_url: String,
     pub otel_url: Option<String>,
 
+    #[envconfig(default = "false")]
+    pub overflow_enabled: bool,
+
     #[envconfig(default = "100")]
     pub overflow_per_second_limit: NonZeroU32,
 
diff --git a/capture/src/server.rs b/capture/src/server.rs
index 0704987..8585036 100644
--- a/capture/src/server.rs
+++ b/capture/src/server.rs
@@ -48,24 +48,30 @@ where
         .register("rdkafka".to_string(), Duration::seconds(30))
         .await;
 
-    let partition = OverflowLimiter::new(
-        config.overflow_per_second_limit,
-        config.overflow_burst_limit,
-        config.overflow_forced_keys,
-    );
-    if config.export_prometheus {
-        let partition = partition.clone();
-        tokio::spawn(async move {
-            partition.report_metrics().await;
-        });
-    }
-    {
-        // Ensure that the rate limiter state does not grow unbounded
-        let partition = partition.clone();
-        tokio::spawn(async move {
-            partition.clean_state().await;
-        });
-    }
+    let partition = match config.overflow_enabled {
+        false => None,
+        true => {
+            let partition = OverflowLimiter::new(
+                config.overflow_per_second_limit,
+                config.overflow_burst_limit,
+                config.overflow_forced_keys,
+            );
+            if config.export_prometheus {
+                let partition = partition.clone();
+                tokio::spawn(async move {
+                    partition.report_metrics().await;
+                });
+            }
+            {
+                // Ensure that the rate limiter state does not grow unbounded
+                let partition = partition.clone();
+                tokio::spawn(async move {
+                    partition.clean_state().await;
+                });
+            }
+            Some(partition)
+        }
+    };
 
     let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
         .expect("failed to start Kafka sink");
diff --git a/capture/src/sinks/kafka.rs b/capture/src/sinks/kafka.rs
index 945e581..b82d3c3 100644
--- a/capture/src/sinks/kafka.rs
+++ b/capture/src/sinks/kafka.rs
@@ -36,12 +36,11 @@ impl rdkafka::ClientContext for KafkaContext {
         for (topic, stats) in stats.topics {
             gauge!(
                 "capture_kafka_produce_avg_batch_size_bytes",
-                 "topic" => topic.clone()
+                "topic" => topic.clone()
             )
             .set(stats.batchsize.avg as f64);
             gauge!(
                 "capture_kafka_produce_avg_batch_size_events",
-
                 "topic" => topic
             )
             .set(stats.batchcnt.avg as f64);
@@ -49,30 +48,58 @@ impl rdkafka::ClientContext for KafkaContext {
         for (_, stats) in stats.brokers {
             let id_string = format!("{}", stats.nodeid);
 
+            if let Some(rtt) = stats.rtt {
+                gauge!(
+                    "capture_kafka_produce_rtt_latency_us",
+                    "quantile" => "p50",
+                    "broker" => id_string.clone()
+                )
+                .set(rtt.p50 as f64);
+                gauge!(
+                    "capture_kafka_produce_rtt_latency_us",
+                    "quantile" => "p90",
+                    "broker" => id_string.clone()
+                )
+                .set(rtt.p90 as f64);
+                gauge!(
+                    "capture_kafka_produce_rtt_latency_us",
+                    "quantile" => "p95",
+                    "broker" => id_string.clone()
+                )
+                .set(rtt.p95 as f64);
+                gauge!(
+                    "capture_kafka_produce_rtt_latency_us",
+                    "quantile" => "p99",
+                    "broker" => id_string.clone()
+                )
+                .set(rtt.p99 as f64);
+            }
+
             gauge!(
                 "capture_kafka_broker_requests_pending",
-
                 "broker" => id_string.clone()
             )
             .set(stats.outbuf_cnt as f64);
             gauge!(
                 "capture_kafka_broker_responses_awaiting",
-
                 "broker" => id_string.clone()
             )
             .set(stats.waitresp_cnt as f64);
             counter!(
                 "capture_kafka_broker_tx_errors_total",
-
                 "broker" => id_string.clone()
             )
             .absolute(stats.txerrs);
             counter!(
                 "capture_kafka_broker_rx_errors_total",
-
-                "broker" => id_string
+                "broker" => id_string.clone()
             )
             .absolute(stats.rxerrs);
+            counter!(
+                "capture_kafka_broker_request_timeouts",
+                "broker" => id_string
+            )
+            .absolute(stats.req_timeouts);
         }
     }
 }
@@ -80,7 +107,7 @@ impl rdkafka::ClientContext for KafkaContext {
 #[derive(Clone)]
 pub struct KafkaSink {
     producer: FutureProducer<KafkaContext>,
-    partition: OverflowLimiter,
+    partition: Option<OverflowLimiter>,
     main_topic: String,
     historical_topic: String,
 }
@@ -89,7 +116,7 @@ impl KafkaSink {
     pub fn new(
         config: KafkaConfig,
         liveness: HealthHandle,
-        partition: OverflowLimiter,
+        partition: Option<OverflowLimiter>,
     ) -> anyhow::Result<KafkaSink> {
         info!("connecting to Kafka brokers at {}...", config.kafka_hosts);
 
@@ -150,7 +177,11 @@ impl KafkaSink {
             DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
             DataType::AnalyticsMain => {
                 // TODO: deprecate capture-led overflow or move logic in handler
-                if self.partition.is_limited(&event_key) {
+                let is_limited = match &self.partition {
+                    None => false,
+                    Some(partition) => partition.is_limited(&event_key),
+                };
+                if is_limited {
                     (&self.main_topic, None) // Analytics overflow goes to the main topic without locality
                 } else {
                     (&self.main_topic, Some(event_key.as_str()))
@@ -280,11 +311,11 @@ mod tests {
         let handle = registry
             .register("one".to_string(), Duration::seconds(30))
             .await;
-        let limiter = OverflowLimiter::new(
+        let limiter = Some(OverflowLimiter::new(
             NonZeroU32::new(10).unwrap(),
             NonZeroU32::new(10).unwrap(),
             None,
-        );
+        ));
         let cluster = MockCluster::new(1).expect("failed to create mock brokers");
         let config = config::KafkaConfig {
             kafka_producer_linger_ms: 0,
diff --git a/capture/src/v0_endpoint.rs b/capture/src/v0_endpoint.rs
index 3849e29..ff4b90f 100644
--- a/capture/src/v0_endpoint.rs
+++ b/capture/src/v0_endpoint.rs
@@ -150,7 +150,14 @@ pub async fn event(
     tracing::debug!(context=?context, events=?events, "decoded request");
 
     if let Err(err) = process_events(state.sink.clone(), &events, &context).await {
-        report_dropped_events("process_events_error", events.len() as u64);
+        let cause = match err {
+            // TODO: automate this with a macro
+            CaptureError::EmptyDistinctId => "empty_distinct_id",
+            CaptureError::MissingDistinctId => "missing_distinct_id",
+            CaptureError::MissingEventName => "missing_event_name",
+            _ => "process_events_error",
+        };
+        report_dropped_events(cause, events.len() as u64);
         tracing::log::warn!("rejected invalid payload: {}", err);
         return Err(err);
     }
diff --git a/capture/tests/common.rs b/capture/tests/common.rs
index 788e6e2..868b27c 100644
--- a/capture/tests/common.rs
+++ b/capture/tests/common.rs
@@ -29,6 +29,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     print_sink: false,
     address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
     redis_url: "redis://localhost:6379/".to_string(),
+    overflow_enabled: false,
    overflow_burst_limit: NonZeroU32::new(5).unwrap(),
     overflow_per_second_limit: NonZeroU32::new(10).unwrap(),
     overflow_forced_keys: None,
diff --git a/capture/tests/events.rs b/capture/tests/events.rs
index 111b02c..7d2defc 100644
--- a/capture/tests/events.rs
+++ b/capture/tests/events.rs
@@ -174,6 +174,7 @@ async fn it_overflows_events_on_burst() -> Result<()> {
 
     let mut config = DEFAULT_CONFIG.clone();
     config.kafka.kafka_topic = topic.topic_name().to_string();
+    config.overflow_enabled = true;
     config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
     config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();
 
@@ -223,6 +224,7 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {
 
     let mut config = DEFAULT_CONFIG.clone();
     config.kafka.kafka_topic = topic.topic_name().to_string();
+    config.overflow_enabled = true;
     config.overflow_burst_limit = NonZeroU32::new(1).unwrap();
     config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();
 
@@ -254,6 +256,58 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn it_skips_overflows_when_disabled() -> Result<()> {
+    setup_tracing();
+
+    let token = random_string("token", 16);
+    let distinct_id = random_string("id", 16);
+
+    let topic = EphemeralTopic::new().await;
+
+    let mut config = DEFAULT_CONFIG.clone();
+    config.kafka.kafka_topic = topic.topic_name().to_string();
+    config.overflow_enabled = false;
+    config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
+    config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();
+
+    let server = ServerHandle::for_config(config).await;
+
+    let event = json!([{
+        "token": token,
+        "event": "event1",
+        "distinct_id": distinct_id
+    },{
+        "token": token,
+        "event": "event2",
+        "distinct_id": distinct_id
+    },{
+        "token": token,
+        "event": "event3",
+        "distinct_id": distinct_id
+    }]);
+
+    let res = server.capture_events(event.to_string()).await;
+    assert_eq!(StatusCode::OK, res.status());
+
+    assert_eq!(
+        topic.next_message_key()?.unwrap(),
+        format!("{}:{}", token, distinct_id)
+    );
+
+    assert_eq!(
+        topic.next_message_key()?.unwrap(),
+        format!("{}:{}", token, distinct_id)
+    );
+
+    // This third event would have triggered overflow if it was enabled; the key is kept
+    assert_eq!(
+        topic.next_message_key()?.unwrap(),
+        format!("{}:{}", token, distinct_id)
+    );
+    Ok(())
+}
+
 #[tokio::test]
 async fn it_trims_distinct_id() -> Result<()> {
     setup_tracing();
diff --git a/feature-flags/src/flag_definitions.rs b/feature-flags/src/flag_definitions.rs
index 04f39fe..5efa0d1 100644
--- a/feature-flags/src/flag_definitions.rs
+++ b/feature-flags/src/flag_definitions.rs
@@ -1,4 +1,4 @@
-use serde::{Deserialize, Serialize};
+use serde::Deserialize;
 use std::sync::Arc;
 use tracing::instrument;
 
@@ -13,44 +13,30 @@ pub const TEAM_FLAGS_CACHE_PREFIX: &str = "posthog:1:team_feature_flags_";
 
 // TODO: Hmm, revisit when dealing with groups, but seems like
 // ideal to just treat it as a u8 and do our own validation on top
-#[derive(Debug, Deserialize, Serialize)]
+#[derive(Debug, Deserialize)]
 pub enum GroupTypeIndex {}
 
-#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "snake_case")]
 pub enum OperatorType {
-    #[serde(rename = "exact")]
     Exact,
-    #[serde(rename = "is_not")]
     IsNot,
-    #[serde(rename = "icontains")]
     Icontains,
-    #[serde(rename = "not_icontains")]
     NotIcontains,
-    #[serde(rename = "regex")]
     Regex,
-    #[serde(rename = "not_regex")]
     NotRegex,
-    #[serde(rename = "gt")]
     Gt,
-    #[serde(rename = "lt")]
     Lt,
-    #[serde(rename = "gte")]
     Gte,
-    #[serde(rename = "lte")]
     Lte,
-    #[serde(rename = "is_set")]
     IsSet,
-    #[serde(rename = "is_not_set")]
     IsNotSet,
-    #[serde(rename = "is_date_exact")]
     IsDateExact,
-    #[serde(rename = "is_date_after")]
     IsDateAfter,
-    #[serde(rename = "is_date_before")]
     IsDateBefore,
 }
 
-#[derive(Debug, Clone, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize)]
 pub struct PropertyFilter {
     pub key: String,
     // TODO: Probably need a default for value?
@@ -63,28 +49,28 @@ pub struct PropertyFilter {
     pub group_type_index: Option<GroupTypeIndex>,
 }
 
-#[derive(Debug, Clone, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize)]
 pub struct FlagGroupType {
     pub properties: Option<Vec<PropertyFilter>>,
     pub rollout_percentage: Option<f64>,
     pub variant: Option<String>,
 }
 
-#[derive(Debug, Clone, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize)]
 pub struct MultivariateFlagVariant {
     pub key: String,
     pub name: Option<String>,
     pub rollout_percentage: f64,
 }
 
-#[derive(Debug, Clone, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize)]
 pub struct MultivariateFlagOptions {
     pub variants: Vec<MultivariateFlagVariant>,
 }
 
 // TODO: test name with https://www.fileformat.info/info/charset/UTF-16/list.htm values, like '𝖕𝖗𝖔𝖕𝖊𝖗𝖙𝖞': `𝓿𝓪𝓵𝓾𝓮`
-#[derive(Debug, Clone, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize)]
 pub struct FlagFilters {
     pub groups: Vec<FlagGroupType>,
     pub multivariate: Option<MultivariateFlagOptions>,
@@ -93,7 +79,7 @@
     pub super_groups: Option<Vec<FlagGroupType>>,
 }
 
-#[derive(Debug, Clone, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize)]
 pub struct FeatureFlag {
     pub id: i64,
     pub team_id: i64,
@@ -125,7 +111,7 @@ impl FeatureFlag {
     }
 }
 
-#[derive(Debug, Deserialize, Serialize)]
+#[derive(Debug, Deserialize)]
 pub struct FeatureFlagList {
     pub flags: Vec<FeatureFlag>,
 }
 
@@ -180,7 +166,9 @@ mod tests {
     async fn test_fetch_flags_from_redis() {
         let client = setup_redis_client(None);
 
-        let team = insert_new_team_in_redis(client.clone()).await.unwrap();
+        let team = insert_new_team_in_redis(client.clone())
+            .await
+            .expect("Failed to insert team");
 
         insert_flags_for_team_in_redis(client.clone(), team.id, None)
             .await
@@ -188,13 +176,20 @@ mod tests {
         let flags_from_redis = FeatureFlagList::from_redis(client.clone(), team.id)
             .await
-            .unwrap();
+            .expect("Failed to fetch flags from redis");
         assert_eq!(flags_from_redis.flags.len(), 1);
-        let flag = flags_from_redis.flags.get(0).unwrap();
+        let flag = flags_from_redis.flags.get(0).expect("Empty flags in redis");
         assert_eq!(flag.key, "flag1");
         assert_eq!(flag.team_id, team.id);
         assert_eq!(flag.filters.groups.len(), 1);
-        assert_eq!(flag.filters.groups[0].properties.as_ref().unwrap().len(), 1);
+        assert_eq!(
+            flag.filters.groups[0]
+                .properties
+                .as_ref()
+                .expect("Properties don't exist on flag")
+                .len(),
+            1
+        );
     }
 
     #[tokio::test]
diff --git a/feature-flags/src/flag_matching.rs b/feature-flags/src/flag_matching.rs
index d0693fa..510fc15 100644
--- a/feature-flags/src/flag_matching.rs
+++ b/feature-flags/src/flag_matching.rs
@@ -1,6 +1,6 @@
-use sha1::{Digest, Sha1};
-
 use crate::flag_definitions::{FeatureFlag, FlagGroupType};
+use sha1::{Digest, Sha1};
+use std::fmt::Write;
 
 #[derive(Debug, PartialEq, Eq)]
 pub struct FeatureFlagMatch {
@@ -135,11 +135,10 @@ impl FeatureFlagMatcher {
         hasher.update(hash_key.as_bytes());
         let result = hasher.finalize();
         // :TRICKY: Convert the first 15 characters of the digest to a hexadecimal string
-        // not sure if this is correct, padding each byte as 2 characters
-        let hex_str: String = result
-            .iter()
-            .map(|byte| format!("{:02x}", byte))
-            .collect::<String>()[..15]
+        let hex_str: String = result.iter().fold(String::new(), |mut acc, byte| {
+            let _ = write!(acc, "{:02x}", byte);
+            acc
+        })[..15]
             .to_string();
 
         let hash_val = u64::from_str_radix(&hex_str, 16).unwrap();
diff --git a/feature-flags/src/redis.rs b/feature-flags/src/redis.rs
index 3aeec47..89dde42 100644
--- a/feature-flags/src/redis.rs
+++ b/feature-flags/src/redis.rs
@@ -59,11 +59,6 @@ impl Client for RedisClient {
         Ok(fut?)
     }
 
-    // TODO: Ask Xavier if there's a better way to handle this.
-    // The problem: I want to match on the error type from this function, and do appropriate things like 400 or 500 response.
-    // Buuut, if I use anyhow::Error, I can't reverse-coerce into a NotFound or serde_pickle::Error.
-    // Thus, I need to create a custom error enum of all possible errors + my own custom not found, so I can match on it.
-    // Is this the canonical way?
     async fn get(&self, k: String) -> Result<String> {
         let mut conn = self.client.get_async_connection().await?;
 
diff --git a/feature-flags/tests/test_flag_matching_consistency.rs b/feature-flags/tests/test_flag_matching_consistency.rs
index 57cce35..4a24b0e 100644
--- a/feature-flags/tests/test_flag_matching_consistency.rs
+++ b/feature-flags/tests/test_flag_matching_consistency.rs
@@ -1,7 +1,6 @@
 /// These tests are common between all libraries doing local evaluation of feature flags.
 /// This ensures there are no mismatches between implementations.
 use feature_flags::flag_matching::{FeatureFlagMatch, FeatureFlagMatcher};
-// use feature_flags::flag_definitions::{FeatureFlag, FlagGroupType};
 use feature_flags::test_utils::create_flag_from_json;
 use serde_json::json;