This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

feat: add partition limiter + overflow #48

Merged · 10 commits · Nov 13, 2023
Changes from 7 commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -13,7 +13,7 @@ tracing = "0.1"
tracing-subscriber = "0.3"
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.96"
governor = "0.5.1"
governor = {version = "0.5.1", features=["dashmap"]}
tower_governor = "0.0.4"
time = { version = "0.3.20", features = ["formatting", "macros", "parsing", "serde"] }
tower-http = { version = "0.4.0", features = ["cors", "trace"] }
21 changes: 21 additions & 0 deletions capture-server/tests/common.rs
@@ -2,6 +2,7 @@

use std::default::Default;
use std::net::{SocketAddr, TcpListener};
use std::num::NonZeroU32;
use std::str::FromStr;
use std::string::ToString;
use std::sync::{Arc, Once};
@@ -26,6 +27,8 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
print_sink: false,
address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
redis_url: "redis://localhost:6379/".to_string(),
burst_limit: NonZeroU32::new(5).unwrap(),
per_second_limit: NonZeroU32::new(10).unwrap(),
kafka: KafkaConfig {
kafka_producer_linger_ms: 0, // Send messages as soon as possible
kafka_producer_queue_mib: 10,
@@ -144,6 +147,24 @@ impl EphemeralTopic {
None => bail!("kafka read timeout"),
}
}
pub fn next_message_key(&self) -> anyhow::Result<Option<String>> {
match self.consumer.poll(self.read_timeout) {
Some(Ok(message)) => {
let key = message.key();

if let Some(key) = key {
let key = std::str::from_utf8(key)?;
let key = String::from_str(key)?;

Ok(Some(key))
} else {
Ok(None)
}
}
Some(Err(err)) => bail!("kafka read error: {}", err),
None => bail!("kafka read timeout"),
}
}

pub fn topic_name(&self) -> &str {
&self.topic_name
88 changes: 88 additions & 0 deletions capture-server/tests/events.rs
@@ -1,3 +1,5 @@
use std::num::NonZeroU32;

use anyhow::Result;
use assert_json_diff::assert_json_include;
use reqwest::StatusCode;
@@ -73,3 +75,89 @@ async fn it_captures_a_batch() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn it_is_limited() -> Result<()> {
setup_tracing();

let token = random_string("token", 16);
let distinct_id = random_string("id", 16);

let topic = EphemeralTopic::new().await;

let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
config.burst_limit = NonZeroU32::new(1).unwrap();
config.per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);

let event = json!([{
"token": token,
"event": "event1",
"distinct_id": distinct_id
},{
"token": token,
"event": "event2",
"distinct_id": distinct_id
}]);

let res = server.capture_events(event.to_string()).await;
assert_eq!(StatusCode::OK, res.status());

assert_eq!(
topic.next_message_key()?.unwrap(),
format!(
"{}:{}",
token,
distinct_id
)
);

assert_eq!(topic.next_message_key()?, None);

Ok(())
}

#[tokio::test]
async fn it_does_not_partition_limit_different_ids() -> Result<()> {
setup_tracing();

let token = random_string("token", 16);
let distinct_id = random_string("id", 16);
let distinct_id2 = random_string("id", 16);

let topic = EphemeralTopic::new().await;

let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
config.burst_limit = NonZeroU32::new(1).unwrap();
config.per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);

let event = json!([{
"token": token,
"event": "event1",
"distinct_id": distinct_id
},{
"token": token,
"event": "event2",
"distinct_id": distinct_id2
}]);

let res = server.capture_events(event.to_string()).await;
assert_eq!(StatusCode::OK, res.status());

assert_eq!(
topic.next_message_key()?.unwrap(),
format!("{}:{}", token, distinct_id)
);

assert_eq!(
topic.next_message_key()?.unwrap(),
format!("{}:{}", token, distinct_id2)
);

Ok(())
}
1 change: 1 addition & 0 deletions capture/Cargo.toml
@@ -31,6 +31,7 @@ metrics-exporter-prometheus = { workspace = true }
thiserror = { workspace = true }
redis = { version="0.23.3", features=["tokio-comp", "cluster", "cluster-async"] }
envconfig = { workspace = true }
dashmap = "5.5.3"

[dev-dependencies]
assert-json-diff = { workspace = true }
4 changes: 2 additions & 2 deletions capture/src/capture.rs
@@ -78,12 +78,12 @@ pub async fn event(
client_ip: ip.to_string(),
};

let limited = state
let billing_limited = state
.billing
.is_limited(context.token.as_str(), QuotaResource::Events)
.await;

if limited {
if billing_limited {
report_dropped_events("over_quota", 1);

// for v0 we want to just return ok 🙃
13 changes: 11 additions & 2 deletions capture/src/config.rs
@@ -1,21 +1,30 @@
use std::net::SocketAddr;
use std::{net::SocketAddr, num::NonZeroU32};

use envconfig::Envconfig;

#[derive(Envconfig, Clone)]
pub struct Config {
#[envconfig(default = "false")]
pub print_sink: bool,

#[envconfig(default = "127.0.0.1:3000")]
pub address: SocketAddr,

pub redis_url: String,
pub otel_url: Option<String>,

#[envconfig(default = "100")]
pub per_second_limit: NonZeroU32,

#[envconfig(default = "1000")]
pub burst_limit: NonZeroU32,

#[envconfig(nested = true)]
pub kafka: KafkaConfig,

pub otel_url: Option<String>,
#[envconfig(default = "1.0")]
pub otel_sampling_rate: f64,

#[envconfig(default = "true")]
pub export_prometheus: bool,
}
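
A minimal sketch of how the two new settings would be consumed, assuming the workspace's envconfig version exposes init_from_env (0.10+) and derives env var names by upper-casing field names; PER_SECOND_LIMIT and BURST_LIMIT are assumptions based on that convention and are not spelled out in this diff:

use capture::config::Config;
use envconfig::Envconfig;

// The rest of the capture configuration (REDIS_URL, the KAFKA_* settings, ...) still has to be
// present in the environment; only the two new knobs are shown here.
fn load_config() -> Config {
    std::env::set_var("PER_SECOND_LIMIT", "100"); // matches the #[envconfig(default = "100")]
    std::env::set_var("BURST_LIMIT", "1000");     // matches the #[envconfig(default = "1000")]
    Config::init_from_env().expect("invalid capture configuration")
}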
1 change: 1 addition & 0 deletions capture/src/lib.rs
@@ -4,6 +4,7 @@ pub mod capture;
pub mod config;
pub mod event;
pub mod health;
pub mod partition_limits;
pub mod prometheus;
pub mod redis;
pub mod router;
58 changes: 58 additions & 0 deletions capture/src/partition_limits.rs
@@ -0,0 +1,58 @@
/// When a customer is writing too often to the same key, we get hot partitions. This negatively
/// affects our write latency and cluster health. We try to provide ordering guarantees wherever
/// possible, but this does require that we map key -> partition.
///
/// If the write rate for a key exceeds a certain threshold, we need to be able to handle the hot
/// partition before it causes a negative impact. In this case, instead of passing the error to the
/// customer with a 429, we relax our ordering constraints and temporarily override the key, meaning
/// the customer's data will be written to random partitions.
use std::{num::NonZeroU32, sync::Arc};

use governor::{clock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter};

// See: https://docs.rs/governor/latest/governor/_guide/index.html#usage-in-multiple-threads
#[derive(Clone)]
pub struct PartitionLimiter {
limiter: Arc<RateLimiter<String, DefaultKeyedStateStore<String>, clock::DefaultClock>>,
}

impl PartitionLimiter {
pub fn new(per_second: NonZeroU32, burst: NonZeroU32) -> Self {
let quota = Quota::per_second(per_second).allow_burst(burst);
let limiter = Arc::new(governor::RateLimiter::dashmap(quota));

PartitionLimiter { limiter }
}

pub fn is_limited(&self, key: &String) -> bool {
self.limiter.check_key(key).is_err()
}
}

#[cfg(test)]
mod tests {
use crate::partition_limits::PartitionLimiter;
use std::num::NonZeroU32;

#[tokio::test]
async fn low_limits() {
let limiter =
PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(1).unwrap());
let token = String::from("test");

assert!(!limiter.is_limited(&token));
assert!(limiter.is_limited(&token));
}

#[tokio::test]
async fn bursting() {
let limiter =
PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(3).unwrap());
let token = String::from("test");

assert!(!limiter.is_limited(&token));
assert!(!limiter.is_limited(&token));
assert!(!limiter.is_limited(&token));
assert!(limiter.is_limited(&token));
}
}
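
The sink changes that consume the limiter are not shown in this diff, so the following is only a sketch of the overflow behaviour described in the module docs: the candidate key (token:distinct_id, as exercised by the integration tests above) is checked against the limiter, and once a key exceeds its budget the event is sent without a key so the producer picks a random partition, with the ordering relaxation counted via the report_overflow_partition helper added in prometheus.rs below. The function name and signature here are hypothetical.

use capture::partition_limits::PartitionLimiter;
use capture::prometheus::report_overflow_partition;

// Sketch only: returns the Kafka key to use for an event, or None to let the
// producer spread a hot key across random partitions.
fn partition_key(limiter: &PartitionLimiter, token: &str, distinct_id: &str) -> Option<String> {
    let candidate = format!("{}:{}", token, distinct_id);
    if limiter.is_limited(&candidate) {
        // Hot partition key: relax ordering instead of returning a 429 to the customer.
        report_overflow_partition(1);
        None
    } else {
        Some(candidate)
    }
}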
5 changes: 5 additions & 0 deletions capture/src/prometheus.rs
@@ -9,6 +9,11 @@ use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};
pub fn report_dropped_events(cause: &'static str, quantity: u64) {
counter!("capture_events_dropped_total", quantity, "cause" => cause);
}

pub fn report_overflow_partition(quantity: u64) {
counter!("capture_partition_key_capacity_exceeded_total", quantity);
}

pub fn setup_metrics_recorder() -> PrometheusHandle {
// Ok I broke it at the end, but the limit on our ingress is 60 and that's a nicer way of reaching it
const EXPONENTIAL_SECONDS: &[f64] = &[
8 changes: 7 additions & 1 deletion capture/src/server.rs
@@ -7,8 +7,10 @@ use time::Duration;
use crate::billing_limits::BillingLimiter;
use crate::config::Config;
use crate::health::{ComponentStatus, HealthRegistry};
use crate::partition_limits::PartitionLimiter;
use crate::redis::RedisClient;
use crate::{router, sink};

pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
where
F: Future<Output = ()>,
@@ -28,6 +30,7 @@ where
.await
.report_status(ComponentStatus::Unhealthy)
.await;

router::router(
crate::time::SystemTime {},
liveness,
@@ -40,7 +43,10 @@
let sink_liveness = liveness
.register("rdkafka".to_string(), Duration::seconds(30))
.await;
let sink = sink::KafkaSink::new(config.kafka, sink_liveness).unwrap();

let partition = PartitionLimiter::new(config.per_second_limit, config.burst_limit);
let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition).unwrap();

router::router(
crate::time::SystemTime {},
liveness,
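
One note on the wiring above: PartitionLimiter holds the governor RateLimiter behind an Arc, so the handle passed into KafkaSink::new shares the same keyed state store as any other clone. A small illustration, with the limits shrunk to 1/1 purely for the example:

use std::num::NonZeroU32;
use capture::partition_limits::PartitionLimiter;

fn main() {
    let limiter = PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(1).unwrap());
    let for_sink = limiter.clone(); // cheap: bumps the Arc, shares the same DashMap of budgets

    let key = String::from("token:distinct_id");
    assert!(!limiter.is_limited(&key)); // first event for this key passes
    assert!(for_sink.is_limited(&key)); // the clone sees the same budget and limits the key
}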