diff --git a/Cargo.lock b/Cargo.lock
index fab963c..8e3b873 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -205,6 +205,7 @@ dependencies = [
  "axum-test-helper",
  "base64",
  "bytes",
+ "dashmap",
  "envconfig",
  "flate2",
  "governor",
diff --git a/Cargo.toml b/Cargo.toml
index 402243e..5a0d501 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@ tracing = "0.1"
 tracing-subscriber = "0.3"
 serde = { version = "1.0.160", features = ["derive"] }
 serde_json = "1.0.96"
-governor = "0.5.1"
+governor = {version = "0.5.1", features=["dashmap"]}
 tower_governor = "0.0.4"
 time = { version = "0.3.20", features = ["formatting", "macros", "parsing", "serde"] }
 tower-http = { version = "0.4.0", features = ["cors", "trace"] }
diff --git a/capture-server/tests/common.rs b/capture-server/tests/common.rs
index e9329cf..c0ee9ba 100644
--- a/capture-server/tests/common.rs
+++ b/capture-server/tests/common.rs
@@ -2,6 +2,7 @@
 use std::default::Default;
 use std::net::{SocketAddr, TcpListener};
+use std::num::NonZeroU32;
 use std::str::FromStr;
 use std::string::ToString;
 use std::sync::{Arc, Once};
@@ -26,6 +27,8 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     print_sink: false,
     address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
     redis_url: "redis://localhost:6379/".to_string(),
+    burst_limit: NonZeroU32::new(5).unwrap(),
+    per_second_limit: NonZeroU32::new(10).unwrap(),
     kafka: KafkaConfig {
         kafka_producer_linger_ms: 0, // Send messages as soon as possible
         kafka_producer_queue_mib: 10,
@@ -144,6 +147,24 @@
             None => bail!("kafka read timeout"),
         }
     }
+    pub fn next_message_key(&self) -> anyhow::Result<Option<String>> {
+        match self.consumer.poll(self.read_timeout) {
+            Some(Ok(message)) => {
+                let key = message.key();
+
+                if let Some(key) = key {
+                    let key = std::str::from_utf8(key)?;
+                    let key = String::from_str(key)?;
+
+                    Ok(Some(key))
+                } else {
+                    Ok(None)
+                }
+            }
+            Some(Err(err)) => bail!("kafka read error: {}", err),
+            None => bail!("kafka read timeout"),
+        }
+    }
 
     pub fn topic_name(&self) -> &str {
         &self.topic_name
diff --git a/capture-server/tests/events.rs b/capture-server/tests/events.rs
index 42facd8..27db9a7 100644
--- a/capture-server/tests/events.rs
+++ b/capture-server/tests/events.rs
@@ -1,3 +1,5 @@
+use std::num::NonZeroU32;
+
 use anyhow::Result;
 use assert_json_diff::assert_json_include;
 use reqwest::StatusCode;
@@ -73,3 +75,94 @@ async fn it_captures_a_batch() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn it_is_limited_with_burst() -> Result<()> {
+    setup_tracing();
+
+    let token = random_string("token", 16);
+    let distinct_id = random_string("id", 16);
+
+    let topic = EphemeralTopic::new().await;
+
+    let mut config = DEFAULT_CONFIG.clone();
+    config.kafka.kafka_topic = topic.topic_name().to_string();
+    config.burst_limit = NonZeroU32::new(2).unwrap();
+    config.per_second_limit = NonZeroU32::new(1).unwrap();
+
+    let server = ServerHandle::for_config(config);
+
+    let event = json!([{
+        "token": token,
+        "event": "event1",
+        "distinct_id": distinct_id
+    },{
+        "token": token,
+        "event": "event2",
+        "distinct_id": distinct_id
+    },{
+        "token": token,
+        "event": "event3",
+        "distinct_id": distinct_id
+    }]);
+
+    let res = server.capture_events(event.to_string()).await;
+    assert_eq!(StatusCode::OK, res.status());
+
+    assert_eq!(
+        topic.next_message_key()?.unwrap(),
+        format!("{}:{}", token, distinct_id)
+    );
+
+    assert_eq!(
+        topic.next_message_key()?.unwrap(),
+        format!("{}:{}", token, distinct_id)
+    );
+
+    assert_eq!(topic.next_message_key()?, None);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn it_does_not_partition_limit_different_ids() -> Result<()> {
+    setup_tracing();
+
+    let token = random_string("token", 16);
+    let distinct_id = random_string("id", 16);
+    let distinct_id2 = random_string("id", 16);
+
+    let topic = EphemeralTopic::new().await;
+
+    let mut config = DEFAULT_CONFIG.clone();
+    config.kafka.kafka_topic = topic.topic_name().to_string();
+    config.burst_limit = NonZeroU32::new(1).unwrap();
+    config.per_second_limit = NonZeroU32::new(1).unwrap();
+
+    let server = ServerHandle::for_config(config);
+
+    let event = json!([{
+        "token": token,
+        "event": "event1",
+        "distinct_id": distinct_id
+    },{
+        "token": token,
+        "event": "event2",
+        "distinct_id": distinct_id2
+    }]);
+
+    let res = server.capture_events(event.to_string()).await;
+    assert_eq!(StatusCode::OK, res.status());
+
+    assert_eq!(
+        topic.next_message_key()?.unwrap(),
+        format!("{}:{}", token, distinct_id)
+    );
+
+    assert_eq!(
+        topic.next_message_key()?.unwrap(),
+        format!("{}:{}", token, distinct_id2)
+    );
+
+    Ok(())
+}
diff --git a/capture/Cargo.toml b/capture/Cargo.toml
index 6bdfc39..3b2b100 100644
--- a/capture/Cargo.toml
+++ b/capture/Cargo.toml
@@ -31,6 +31,7 @@ metrics-exporter-prometheus = { workspace = true }
 thiserror = { workspace = true }
 redis = { version="0.23.3", features=["tokio-comp", "cluster", "cluster-async"] }
 envconfig = { workspace = true }
+dashmap = "5.5.3"
 
 [dev-dependencies]
 assert-json-diff = { workspace = true }
diff --git a/capture/src/capture.rs b/capture/src/capture.rs
index dbff970..83466eb 100644
--- a/capture/src/capture.rs
+++ b/capture/src/capture.rs
@@ -78,12 +78,12 @@ pub async fn event(
         client_ip: ip.to_string(),
     };
 
-    let limited = state
+    let billing_limited = state
         .billing
         .is_limited(context.token.as_str(), QuotaResource::Events)
         .await;
 
-    if limited {
+    if billing_limited {
         report_dropped_events("over_quota", 1);
 
         // for v0 we want to just return ok 🙃
diff --git a/capture/src/config.rs b/capture/src/config.rs
index da8377a..8a471b3 100644
--- a/capture/src/config.rs
+++ b/capture/src/config.rs
@@ -1,4 +1,4 @@
-use std::net::SocketAddr;
+use std::{net::SocketAddr, num::NonZeroU32};
 
 use envconfig::Envconfig;
 
@@ -6,16 +6,25 @@
 pub struct Config {
     #[envconfig(default = "false")]
     pub print_sink: bool,
+
     #[envconfig(default = "127.0.0.1:3000")]
     pub address: SocketAddr,
+
     pub redis_url: String,
+    pub otel_url: Option<String>,
+
+    #[envconfig(default = "100")]
+    pub per_second_limit: NonZeroU32,
+
+    #[envconfig(default = "1000")]
+    pub burst_limit: NonZeroU32,
 
     #[envconfig(nested = true)]
     pub kafka: KafkaConfig,
 
-    pub otel_url: Option<String>,
     #[envconfig(default = "1.0")]
     pub otel_sampling_rate: f64,
+
     #[envconfig(default = "true")]
     pub export_prometheus: bool,
 }
diff --git a/capture/src/lib.rs b/capture/src/lib.rs
index 50f6705..eea915c 100644
--- a/capture/src/lib.rs
+++ b/capture/src/lib.rs
@@ -4,6 +4,7 @@ pub mod capture;
 pub mod config;
 pub mod event;
 pub mod health;
+pub mod partition_limits;
 pub mod prometheus;
 pub mod redis;
 pub mod router;
diff --git a/capture/src/partition_limits.rs b/capture/src/partition_limits.rs
new file mode 100644
index 0000000..fe63ec1
--- /dev/null
+++ b/capture/src/partition_limits.rs
@@ -0,0 +1,58 @@
+/// When a customer is writing too often to the same key, we get hot partitions. This negatively
+/// affects our write latency and cluster health. We try to provide ordering guarantees wherever
+/// possible, but this does require that we map key -> partition.
+///
+/// If the write-rate reaches a certain amount, we need to be able to handle the hot partition
+/// before it causes a negative impact. In this case, instead of passing the error to the customer
+/// with a 429, we relax our ordering constraints and temporarily override the key, meaning the
+/// customer's data will be spread across all partitions.
+use std::{num::NonZeroU32, sync::Arc};
+
+use governor::{clock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter};
+
+// See: https://docs.rs/governor/latest/governor/_guide/index.html#usage-in-multiple-threads
+#[derive(Clone)]
+pub struct PartitionLimiter {
+    limiter: Arc<RateLimiter<String, DefaultKeyedStateStore<String>, clock::DefaultClock>>,
+}
+
+impl PartitionLimiter {
+    pub fn new(per_second: NonZeroU32, burst: NonZeroU32) -> Self {
+        let quota = Quota::per_second(per_second).allow_burst(burst);
+        let limiter = Arc::new(governor::RateLimiter::dashmap(quota));
+
+        PartitionLimiter { limiter }
+    }
+
+    pub fn is_limited(&self, key: &String) -> bool {
+        self.limiter.check_key(key).is_err()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::partition_limits::PartitionLimiter;
+    use std::num::NonZeroU32;
+
+    #[tokio::test]
+    async fn low_limits() {
+        let limiter =
+            PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(1).unwrap());
+        let token = String::from("test");
+
+        assert!(!limiter.is_limited(&token));
+        assert!(limiter.is_limited(&token));
+    }
+
+    #[tokio::test]
+    async fn bursting() {
+        let limiter =
+            PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(3).unwrap());
+        let token = String::from("test");
+
+        assert!(!limiter.is_limited(&token));
+        assert!(!limiter.is_limited(&token));
+        assert!(!limiter.is_limited(&token));
+        assert!(limiter.is_limited(&token));
+    }
+}
diff --git a/capture/src/prometheus.rs b/capture/src/prometheus.rs
index d9dbea8..f225520 100644
--- a/capture/src/prometheus.rs
+++ b/capture/src/prometheus.rs
@@ -9,6 +9,11 @@ use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};
 pub fn report_dropped_events(cause: &'static str, quantity: u64) {
     counter!("capture_events_dropped_total", quantity, "cause" => cause);
 }
+
+pub fn report_overflow_partition(quantity: u64) {
+    counter!("capture_partition_key_capacity_exceeded_total", quantity);
+}
+
 pub fn setup_metrics_recorder() -> PrometheusHandle {
     // Ok I broke it at the end, but the limit on our ingress is 60 and that's a nicer way of reaching it
     const EXPONENTIAL_SECONDS: &[f64] = &[
diff --git a/capture/src/server.rs b/capture/src/server.rs
index 8c40fd3..c84bd20 100644
--- a/capture/src/server.rs
+++ b/capture/src/server.rs
@@ -7,8 +7,10 @@ use time::Duration;
 use crate::billing_limits::BillingLimiter;
 use crate::config::Config;
 use crate::health::{ComponentStatus, HealthRegistry};
+use crate::partition_limits::PartitionLimiter;
 use crate::redis::RedisClient;
 use crate::{router, sink};
+
 pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
 where
     F: Future<Output = ()>,
@@ -28,6 +30,7 @@
             .await
             .report_status(ComponentStatus::Unhealthy)
             .await;
+
         router::router(
             crate::time::SystemTime {},
             liveness,
@@ -40,7 +43,10 @@
         let sink_liveness = liveness
             .register("rdkafka".to_string(), Duration::seconds(30))
             .await;
-        let sink = sink::KafkaSink::new(config.kafka, sink_liveness).unwrap();
+
+        let partition = PartitionLimiter::new(config.per_second_limit, config.burst_limit);
+        let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition).unwrap();
+
         router::router(
             crate::time::SystemTime {},
             liveness,
diff --git a/capture/src/sink.rs b/capture/src/sink.rs
index a7be458..9d915b0 100644
--- a/capture/src/sink.rs
+++ b/capture/src/sink.rs
@@ -14,6 +14,7 @@ use crate::api::CaptureError;
 use crate::config::KafkaConfig;
 use crate::event::ProcessedEvent;
 use crate::health::HealthHandle;
+use crate::partition_limits::PartitionLimiter;
 use crate::prometheus::report_dropped_events;
 
 #[async_trait]
@@ -111,10 +112,15 @@ impl rdkafka::ClientContext for KafkaContext {
 pub struct KafkaSink {
     producer: FutureProducer<KafkaContext>,
     topic: String,
+    partition: PartitionLimiter,
 }
 
 impl KafkaSink {
-    pub fn new(config: KafkaConfig, liveness: HealthHandle) -> anyhow::Result<KafkaSink> {
+    pub fn new(
+        config: KafkaConfig,
+        liveness: HealthHandle,
+        partition: PartitionLimiter,
+    ) -> anyhow::Result<KafkaSink> {
         info!("connecting to Kafka brokers at {}...", config.kafka_hosts);
 
         let mut client_config = ClientConfig::new();
@@ -147,6 +153,7 @@
 
         Ok(KafkaSink {
             producer,
+            partition,
             topic: config.kafka_topic,
         })
     }
@@ -157,6 +164,7 @@
         producer: FutureProducer<KafkaContext>,
         topic: String,
         event: ProcessedEvent,
+        limited: bool,
     ) -> Result<(), CaptureError> {
         let payload = serde_json::to_string(&event).map_err(|e| {
             tracing::error!("failed to serialize event: {}", e);
@@ -164,12 +172,13 @@
         })?;
 
         let key = event.key();
+        let partition_key = if limited { None } else { Some(key.as_str()) };
 
         match producer.send_result(FutureRecord {
             topic: topic.as_str(),
             payload: Some(&payload),
             partition: None,
-            key: Some(&key),
+            key: partition_key,
             timestamp: None,
             headers: None,
         }) {
@@ -194,10 +203,12 @@
 impl EventSink for KafkaSink {
     #[instrument(skip_all)]
     async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
-        Self::kafka_send(self.producer.clone(), self.topic.clone(), event).await?;
+        let limited = self.partition.is_limited(&event.key());
+        Self::kafka_send(self.producer.clone(), self.topic.clone(), event, limited).await?;
 
         histogram!("capture_event_batch_size", 1.0);
         counter!("capture_events_ingested_total", 1);
+
         Ok(())
     }
 
@@ -209,7 +220,8 @@
             let producer = self.producer.clone();
             let topic = self.topic.clone();
 
-            set.spawn(Self::kafka_send(producer, topic, event));
+            let limited = self.partition.is_limited(&event.key());
+            set.spawn(Self::kafka_send(producer, topic, event, limited));
         }
 
         // Await on all the produce promises