feat: add partition limiter + overflow #48

Merged · 10 commits · Nov 13, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -13,7 +13,7 @@ tracing = "0.1"
tracing-subscriber = "0.3"
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.96"
governor = "0.5.1"
governor = {version = "0.5.1", features=["dashmap"]}
tower_governor = "0.0.4"
time = { version = "0.3.20", features = ["formatting", "macros", "parsing", "serde"] }
tower-http = { version = "0.4.0", features = ["cors", "trace"] }
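
Not part of the diff, but for context: enabling governor's `dashmap` feature is what provides the keyed rate-limiter constructor (`RateLimiter::dashmap`) used by the new `partition_limits` module further down. A minimal standalone sketch of that API, independent of this PR's wiring:

```rust
use std::num::NonZeroU32;

use governor::{Quota, RateLimiter};

fn main() {
    // One token bucket per key, backed by a DashMap (requires the "dashmap" feature).
    let quota = Quota::per_second(NonZeroU32::new(1).unwrap())
        .allow_burst(NonZeroU32::new(2).unwrap());
    let limiter = RateLimiter::dashmap(quota);

    let key = "token:distinct_id".to_string();
    assert!(limiter.check_key(&key).is_ok()); // burst of 2 lets the first two through
    assert!(limiter.check_key(&key).is_ok());
    assert!(limiter.check_key(&key).is_err()); // third call within the same second is limited
}
```
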
21 changes: 21 additions & 0 deletions capture-server/tests/common.rs
@@ -2,6 +2,7 @@

use std::default::Default;
use std::net::{SocketAddr, TcpListener};
use std::num::NonZeroU32;
use std::str::FromStr;
use std::string::ToString;
use std::sync::{Arc, Once};
@@ -26,6 +27,8 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
print_sink: false,
address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
redis_url: "redis://localhost:6379/".to_string(),
burst_limit: NonZeroU32::new(5).unwrap(),
per_second_limit: NonZeroU32::new(10).unwrap(),
kafka: KafkaConfig {
kafka_producer_linger_ms: 0, // Send messages as soon as possible
kafka_producer_queue_mib: 10,
@@ -144,6 +147,24 @@ impl EphemeralTopic {
None => bail!("kafka read timeout"),
}
}
pub fn next_message_key(&self) -> anyhow::Result<Option<String>> {
match self.consumer.poll(self.read_timeout) {
Some(Ok(message)) => {
let key = message.key();

if let Some(key) = key {
let key = std::str::from_utf8(key)?;
let key = String::from_str(key)?;

Ok(Some(key))
} else {
Ok(None)
}
}
Some(Err(err)) => bail!("kafka read error: {}", err),
None => bail!("kafka read timeout"),
}
}

pub fn topic_name(&self) -> &str {
&self.topic_name
93 changes: 93 additions & 0 deletions capture-server/tests/events.rs
@@ -1,3 +1,5 @@
use std::num::NonZeroU32;

use anyhow::Result;
use assert_json_diff::assert_json_include;
use reqwest::StatusCode;
@@ -73,3 +75,94 @@ async fn it_captures_a_batch() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn it_is_limited_with_burst() -> Result<()> {
setup_tracing();

let token = random_string("token", 16);
let distinct_id = random_string("id", 16);

let topic = EphemeralTopic::new().await;

let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
config.burst_limit = NonZeroU32::new(2).unwrap();
config.per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);

let event = json!([{
"token": token,
"event": "event1",
"distinct_id": distinct_id
},{
"token": token,
"event": "event2",
"distinct_id": distinct_id
},{
"token": token,
"event": "event3",
"distinct_id": distinct_id
}]);

let res = server.capture_events(event.to_string()).await;
assert_eq!(StatusCode::OK, res.status());

assert_eq!(
topic.next_message_key()?.unwrap(),
format!("{}:{}", token, distinct_id)
);

assert_eq!(
topic.next_message_key()?.unwrap(),
format!("{}:{}", token, distinct_id)
);

assert_eq!(topic.next_message_key()?, None);

Ok(())
}

#[tokio::test]
async fn it_does_not_partition_limit_different_ids() -> Result<()> {
setup_tracing();

let token = random_string("token", 16);
let distinct_id = random_string("id", 16);
let distinct_id2 = random_string("id", 16);

let topic = EphemeralTopic::new().await;

let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
config.burst_limit = NonZeroU32::new(1).unwrap();
config.per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);

let event = json!([{
"token": token,
"event": "event1",
"distinct_id": distinct_id
},{
"token": token,
"event": "event2",
"distinct_id": distinct_id2
}]);

let res = server.capture_events(event.to_string()).await;
assert_eq!(StatusCode::OK, res.status());

assert_eq!(
topic.next_message_key()?.unwrap(),
format!("{}:{}", token, distinct_id)
);

assert_eq!(
topic.next_message_key()?.unwrap(),
format!("{}:{}", token, distinct_id2)
);

Ok(())
}
1 change: 1 addition & 0 deletions capture/Cargo.toml
@@ -31,6 +31,7 @@ metrics-exporter-prometheus = { workspace = true }
thiserror = { workspace = true }
redis = { version="0.23.3", features=["tokio-comp", "cluster", "cluster-async"] }
envconfig = { workspace = true }
dashmap = "5.5.3"

[dev-dependencies]
assert-json-diff = { workspace = true }
4 changes: 2 additions & 2 deletions capture/src/capture.rs
@@ -78,12 +78,12 @@ pub async fn event(
client_ip: ip.to_string(),
};

-let limited = state
+let billing_limited = state
.billing
.is_limited(context.token.as_str(), QuotaResource::Events)
.await;

-if limited {
+if billing_limited {
report_dropped_events("over_quota", 1);

// for v0 we want to just return ok 🙃
13 changes: 11 additions & 2 deletions capture/src/config.rs
@@ -1,21 +1,30 @@
-use std::net::SocketAddr;
+use std::{net::SocketAddr, num::NonZeroU32};

use envconfig::Envconfig;

#[derive(Envconfig, Clone)]
pub struct Config {
#[envconfig(default = "false")]
pub print_sink: bool,

#[envconfig(default = "127.0.0.1:3000")]
pub address: SocketAddr,

pub redis_url: String,
pub otel_url: Option<String>,

#[envconfig(default = "100")]
pub per_second_limit: NonZeroU32,

#[envconfig(default = "1000")]
pub burst_limit: NonZeroU32,

#[envconfig(nested = true)]
pub kafka: KafkaConfig,

pub otel_url: Option<String>,
#[envconfig(default = "1.0")]
pub otel_sampling_rate: f64,

#[envconfig(default = "true")]
pub export_prometheus: bool,
}
1 change: 1 addition & 0 deletions capture/src/lib.rs
@@ -4,6 +4,7 @@ pub mod capture;
pub mod config;
pub mod event;
pub mod health;
pub mod partition_limits;
pub mod prometheus;
pub mod redis;
pub mod router;
58 changes: 58 additions & 0 deletions capture/src/partition_limits.rs
@@ -0,0 +1,58 @@
/// When a customer is writing too often to the same key, we get hot partitions. This negatively
/// affects our write latency and cluster health. We try to provide ordering guarantees wherever
/// possible, but this does require that we map key -> partition.
///
/// If the write rate for a key exceeds the configured threshold, we need to handle the hot
/// partition before it has a negative impact. In that case, instead of returning a 429 to the
/// customer, we relax our ordering constraints and temporarily override the key, so the
/// customer's data is spread across all partitions.
use std::{num::NonZeroU32, sync::Arc};

use governor::{clock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter};

// See: https://docs.rs/governor/latest/governor/_guide/index.html#usage-in-multiple-threads
#[derive(Clone)]
pub struct PartitionLimiter {
limiter: Arc<RateLimiter<String, DefaultKeyedStateStore<String>, clock::DefaultClock>>,
}

impl PartitionLimiter {
pub fn new(per_second: NonZeroU32, burst: NonZeroU32) -> Self {
let quota = Quota::per_second(per_second).allow_burst(burst);
let limiter = Arc::new(governor::RateLimiter::dashmap(quota));

PartitionLimiter { limiter }
}

pub fn is_limited(&self, key: &String) -> bool {
self.limiter.check_key(key).is_err()
}
}

#[cfg(test)]
mod tests {
use crate::partition_limits::PartitionLimiter;
use std::num::NonZeroU32;

#[tokio::test]
async fn low_limits() {
let limiter =
PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(1).unwrap());
let token = String::from("test");

assert!(!limiter.is_limited(&token));
assert!(limiter.is_limited(&token));
}

#[tokio::test]
async fn bursting() {
let limiter =
PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(3).unwrap());
let token = String::from("test");

assert!(!limiter.is_limited(&token));
assert!(!limiter.is_limited(&token));
assert!(!limiter.is_limited(&token));
assert!(limiter.is_limited(&token));
}
}
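
The KafkaSink send-path changes that consume this limiter are not included in this excerpt, so the following is only a rough sketch of how the overflow could be wired in, assuming the sink holds an rdkafka `FutureProducer` and the `PartitionLimiter` handed over in `server.rs` below; the `enqueue_with_overflow` helper and its signature are hypothetical:

```rust
use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord};

use crate::partition_limits::PartitionLimiter;
use crate::prometheus::report_overflow_partition;

/// Sketch: keep per-key ordering until a key gets hot, then drop the key so
/// the default partitioner spreads the customer's events instead of
/// rejecting them with a 429.
fn enqueue_with_overflow(
    producer: &FutureProducer,
    topic: &str,
    limiter: &PartitionLimiter,
    key: String,
    payload: &str,
) -> Result<(), KafkaError> {
    let mut record: FutureRecord<'_, str, str> = FutureRecord::to(topic).payload(payload);

    if limiter.is_limited(&key) {
        // Overflow: count the event and send it without a key.
        report_overflow_partition(1);
    } else {
        record = record.key(&key);
    }

    // send_result only enqueues on the producer; delivery-report handling is elided.
    let _delivery_future = producer
        .send_result(record)
        .map_err(|(err, _record)| err)?;
    Ok(())
}
```

The design trade-off is availability over ordering: a hot key keeps flowing, just without a stable partition assignment, and the loss of ordering is surfaced via the `capture_partition_key_capacity_exceeded_total` counter added in `prometheus.rs` below.
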
5 changes: 5 additions & 0 deletions capture/src/prometheus.rs
@@ -9,6 +9,11 @@ use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};
pub fn report_dropped_events(cause: &'static str, quantity: u64) {
counter!("capture_events_dropped_total", quantity, "cause" => cause);
}

pub fn report_overflow_partition(quantity: u64) {
counter!("capture_partition_key_capacity_exceeded_total", quantity);
}

pub fn setup_metrics_recorder() -> PrometheusHandle {
// Ok I broke it at the end, but the limit on our ingress is 60 and that's a nicer way of reaching it
const EXPONENTIAL_SECONDS: &[f64] = &[
8 changes: 7 additions & 1 deletion capture/src/server.rs
@@ -7,8 +7,10 @@ use time::Duration;
use crate::billing_limits::BillingLimiter;
use crate::config::Config;
use crate::health::{ComponentStatus, HealthRegistry};
use crate::partition_limits::PartitionLimiter;
use crate::redis::RedisClient;
use crate::{router, sink};

pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
where
F: Future<Output = ()>,
@@ -28,6 +30,7 @@ where
.await
.report_status(ComponentStatus::Unhealthy)
.await;

router::router(
crate::time::SystemTime {},
liveness,
@@ -40,7 +43,10 @@
let sink_liveness = liveness
.register("rdkafka".to_string(), Duration::seconds(30))
.await;
-let sink = sink::KafkaSink::new(config.kafka, sink_liveness).unwrap();

+let partition = PartitionLimiter::new(config.per_second_limit, config.burst_limit);
+let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition).unwrap();

router::router(
crate::time::SystemTime {},
liveness,