Skip to content

Commit

Permalink
Merge pull request #286 from Kuadrant/no_lazy_static
Browse files Browse the repository at this point in the history
Removed the lazy static from the prometheus metrics
  • Loading branch information
eguzki authored Apr 9, 2024
2 parents 01bf010 + 265c870 commit ae41cd9
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 88 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 19 additions & 5 deletions limitador-server/src/envoy_rls/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use crate::envoy_rls::server::envoy::service::ratelimit::v3::rate_limit_service_
use crate::envoy_rls::server::envoy::service::ratelimit::v3::{
RateLimitRequest, RateLimitResponse,
};
use crate::{Limiter, PROMETHEUS_METRICS};
use crate::prometheus_metrics::PrometheusMetrics;
use crate::Limiter;

include!("envoy_types.rs");

Expand All @@ -32,13 +33,19 @@ pub enum RateLimitHeaders {
pub struct MyRateLimiter {
limiter: Arc<Limiter>,
rate_limit_headers: RateLimitHeaders,
metrics: Arc<PrometheusMetrics>,
}

impl MyRateLimiter {
pub fn new(limiter: Arc<Limiter>, rate_limit_headers: RateLimitHeaders) -> Self {
pub fn new(
limiter: Arc<Limiter>,
rate_limit_headers: RateLimitHeaders,
metrics: Arc<PrometheusMetrics>,
) -> Self {
Self {
limiter,
rate_limit_headers,
metrics,
}
}
}
Expand Down Expand Up @@ -124,11 +131,11 @@ impl RateLimitService for MyRateLimiter {

let mut rate_limited_resp = rate_limited_resp.unwrap();
let resp_code = if rate_limited_resp.limited {
PROMETHEUS_METRICS
self.metrics
.incr_limited_calls(&namespace, rate_limited_resp.limit_name.as_deref());
Code::OverLimit
} else {
PROMETHEUS_METRICS.incr_authorized_calls(&namespace);
self.metrics.incr_authorized_calls(&namespace);
Code::Ok
};

Expand Down Expand Up @@ -237,9 +244,10 @@ pub async fn run_envoy_rls_server(
address: String,
limiter: Arc<Limiter>,
rate_limit_headers: RateLimitHeaders,
metrics: Arc<PrometheusMetrics>,
grpc_reflection_service: bool,
) -> Result<(), transport::Error> {
let rate_limiter = MyRateLimiter::new(limiter, rate_limit_headers);
let rate_limiter = MyRateLimiter::new(limiter, rate_limit_headers, metrics);
let svc = RateLimitServiceServer::new(rate_limiter);

let reflection_service = match grpc_reflection_service {
Expand Down Expand Up @@ -303,6 +311,7 @@ mod tests {
let rate_limiter = MyRateLimiter::new(
Arc::new(Limiter::Blocking(limiter)),
RateLimitHeaders::DraftVersion03,
Arc::new(PrometheusMetrics::default()),
);

let req = RateLimitRequest {
Expand Down Expand Up @@ -361,6 +370,7 @@ mod tests {
let rate_limiter = MyRateLimiter::new(
Arc::new(Limiter::new(Configuration::default()).await.unwrap()),
RateLimitHeaders::DraftVersion03,
Arc::new(PrometheusMetrics::default()),
);

let req = RateLimitRequest {
Expand Down Expand Up @@ -391,6 +401,7 @@ mod tests {
let rate_limiter = MyRateLimiter::new(
Arc::new(Limiter::new(Configuration::default()).await.unwrap()),
RateLimitHeaders::DraftVersion03,
Arc::new(PrometheusMetrics::default()),
);

let req = RateLimitRequest {
Expand Down Expand Up @@ -433,6 +444,7 @@ mod tests {
let rate_limiter = MyRateLimiter::new(
Arc::new(Limiter::Blocking(limiter)),
RateLimitHeaders::DraftVersion03,
Arc::new(PrometheusMetrics::default()),
);

let req = RateLimitRequest {
Expand Down Expand Up @@ -491,6 +503,7 @@ mod tests {
let rate_limiter = MyRateLimiter::new(
Arc::new(Limiter::Blocking(limiter)),
RateLimitHeaders::DraftVersion03,
Arc::new(PrometheusMetrics::default()),
);

let req = RateLimitRequest {
Expand Down Expand Up @@ -556,6 +569,7 @@ mod tests {
let rate_limiter = MyRateLimiter::new(
Arc::new(Limiter::Blocking(limiter)),
RateLimitHeaders::DraftVersion03,
Arc::new(PrometheusMetrics::default()),
);

let req = RateLimitRequest {
Expand Down
73 changes: 51 additions & 22 deletions limitador-server/src/http_api/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::http_api::request_types::{CheckAndReportInfo, Counter, Limit};
use crate::{Limiter, PROMETHEUS_METRICS};
use crate::prometheus_metrics::PrometheusMetrics;
use crate::Limiter;
use actix_web::{http::StatusCode, ResponseError};
use actix_web::{App, HttpServer};
use paperclip::actix::{
Expand All @@ -13,6 +14,24 @@ use paperclip::actix::{
use std::fmt;
use std::sync::Arc;

struct RateLimitData {
limiter: Arc<Limiter>,
metrics: Arc<PrometheusMetrics>,
}

impl RateLimitData {
fn new(limiter: Arc<Limiter>, metrics: Arc<PrometheusMetrics>) -> Self {
Self { limiter, metrics }
}
fn limiter(&self) -> &Limiter {
self.limiter.as_ref()
}

fn metrics(&self) -> &PrometheusMetrics {
self.metrics.as_ref()
}
}

#[api_v2_errors(429, 500)]
#[derive(Debug)]
enum ErrorResponse {
Expand Down Expand Up @@ -44,20 +63,20 @@ async fn status() -> web::Json<()> {
Json(())
}

#[tracing::instrument(skip(_data))]
#[tracing::instrument(skip(data))]
#[api_v2_operation]
async fn metrics(_data: web::Data<Arc<Limiter>>) -> String {
PROMETHEUS_METRICS.gather_metrics()
async fn metrics(data: web::Data<RateLimitData>) -> String {
data.get_ref().metrics().gather_metrics()
}

#[api_v2_operation]
#[tracing::instrument(skip(data))]
async fn get_limits(
data: web::Data<Arc<Limiter>>,
data: web::Data<RateLimitData>,
namespace: web::Path<String>,
) -> Result<web::Json<Vec<Limit>>, ErrorResponse> {
let namespace = &namespace.into_inner().into();
let limits = match data.get_ref().as_ref() {
let limits = match data.get_ref().limiter() {
Limiter::Blocking(limiter) => limiter.get_limits(namespace),
Limiter::Async(limiter) => limiter.get_limits(namespace),
};
Expand All @@ -68,11 +87,11 @@ async fn get_limits(
#[tracing::instrument(skip(data))]
#[api_v2_operation]
async fn get_counters(
data: web::Data<Arc<Limiter>>,
data: web::Data<RateLimitData>,
namespace: web::Path<String>,
) -> Result<web::Json<Vec<Counter>>, ErrorResponse> {
let namespace = namespace.into_inner().into();
let get_counters_result = match data.get_ref().as_ref() {
let get_counters_result = match data.get_ref().limiter() {
Limiter::Blocking(limiter) => limiter.get_counters(&namespace),
Limiter::Async(limiter) => limiter.get_counters(&namespace).await,
};
Expand All @@ -92,7 +111,7 @@ async fn get_counters(
#[tracing::instrument(skip(state))]
#[api_v2_operation]
async fn check(
state: web::Data<Arc<Limiter>>,
state: web::Data<RateLimitData>,
request: web::Json<CheckAndReportInfo>,
) -> Result<web::Json<()>, ErrorResponse> {
let CheckAndReportInfo {
Expand All @@ -101,7 +120,7 @@ async fn check(
delta,
} = request.into_inner();
let namespace = namespace.into();
let is_rate_limited_result = match state.get_ref().as_ref() {
let is_rate_limited_result = match state.get_ref().limiter() {
Limiter::Blocking(limiter) => limiter.is_rate_limited(&namespace, &values, delta),
Limiter::Async(limiter) => limiter.is_rate_limited(&namespace, &values, delta).await,
};
Expand All @@ -121,7 +140,7 @@ async fn check(
#[tracing::instrument(skip(data))]
#[api_v2_operation]
async fn report(
data: web::Data<Arc<Limiter>>,
data: web::Data<RateLimitData>,
request: web::Json<CheckAndReportInfo>,
) -> Result<web::Json<()>, ErrorResponse> {
let CheckAndReportInfo {
Expand All @@ -130,7 +149,7 @@ async fn report(
delta,
} = request.into_inner();
let namespace = namespace.into();
let update_counters_result = match data.get_ref().as_ref() {
let update_counters_result = match data.get_ref().limiter() {
Limiter::Blocking(limiter) => limiter.update_counters(&namespace, &values, delta),
Limiter::Async(limiter) => limiter.update_counters(&namespace, &values, delta).await,
};
Expand All @@ -144,7 +163,7 @@ async fn report(
#[tracing::instrument(skip(data))]
#[api_v2_operation]
async fn check_and_report(
data: web::Data<Arc<Limiter>>,
data: web::Data<RateLimitData>,
request: web::Json<CheckAndReportInfo>,
) -> Result<web::Json<()>, ErrorResponse> {
let CheckAndReportInfo {
Expand All @@ -153,7 +172,8 @@ async fn check_and_report(
delta,
} = request.into_inner();
let namespace = namespace.into();
let rate_limited_and_update_result = match data.get_ref().as_ref() {
let rate_limit_data = data.get_ref();
let rate_limited_and_update_result = match rate_limit_data.limiter() {
Limiter::Blocking(limiter) => {
limiter.check_rate_limited_and_update(&namespace, &values, delta, false)
}
Expand All @@ -167,20 +187,25 @@ async fn check_and_report(
match rate_limited_and_update_result {
Ok(is_rate_limited) => {
if is_rate_limited.limited {
PROMETHEUS_METRICS
rate_limit_data
.metrics()
.incr_limited_calls(&namespace, is_rate_limited.limit_name.as_deref());
Err(ErrorResponse::TooManyRequests)
} else {
PROMETHEUS_METRICS.incr_authorized_calls(&namespace);
rate_limit_data.metrics().incr_authorized_calls(&namespace);
Ok(Json(()))
}
}
Err(_) => Err(ErrorResponse::InternalServerError),
}
}

pub async fn run_http_server(address: &str, rate_limiter: Arc<Limiter>) -> std::io::Result<()> {
let data = web::Data::new(rate_limiter);
pub async fn run_http_server(
address: &str,
rate_limiter: Arc<Limiter>,
prometheus_metrics: Arc<PrometheusMetrics>,
) -> std::io::Result<()> {
let data = web::Data::new(RateLimitData::new(rate_limiter, prometheus_metrics));

// This uses the paperclip crate to generate an OpenAPI spec.
// Ref: https://paperclip.waffles.space/actix-plugin.html
Expand Down Expand Up @@ -233,7 +258,8 @@ mod tests {
async fn test_metrics() {
let rate_limiter: Arc<Limiter> =
Arc::new(Limiter::new(Configuration::default()).await.unwrap());
let data = web::Data::new(rate_limiter);
let prometheus_metrics: Arc<PrometheusMetrics> = Arc::new(PrometheusMetrics::default());
let data = web::Data::new(RateLimitData::new(rate_limiter, prometheus_metrics));
let app = test::init_service(
App::new()
.app_data(data.clone())
Expand All @@ -257,7 +283,8 @@ mod tests {

let limit = create_test_limit(&limiter, namespace, 10).await;
let rate_limiter: Arc<Limiter> = Arc::new(limiter);
let data = web::Data::new(rate_limiter);
let prometheus_metrics: Arc<PrometheusMetrics> = Arc::new(PrometheusMetrics::default());
let data = web::Data::new(RateLimitData::new(rate_limiter, prometheus_metrics));
let app = test::init_service(
App::new()
.app_data(data.clone())
Expand All @@ -283,7 +310,8 @@ mod tests {
let namespace = "test_namespace";
let _limit = create_test_limit(&limiter, namespace, 1).await;
let rate_limiter: Arc<Limiter> = Arc::new(limiter);
let data = web::Data::new(rate_limiter);
let prometheus_metrics: Arc<PrometheusMetrics> = Arc::new(PrometheusMetrics::default());
let data = web::Data::new(RateLimitData::new(rate_limiter, prometheus_metrics));
let app = test::init_service(
App::new()
.app_data(data.clone())
Expand Down Expand Up @@ -327,7 +355,8 @@ mod tests {
let _limit = create_test_limit(&limiter, namespace, 1).await;

let rate_limiter: Arc<Limiter> = Arc::new(limiter);
let data = web::Data::new(rate_limiter);
let prometheus_metrics: Arc<PrometheusMetrics> = Arc::new(PrometheusMetrics::default());
let data = web::Data::new(RateLimitData::new(rate_limiter, prometheus_metrics));
let app = test::init_service(
App::new()
.app_data(data.clone())
Expand Down
21 changes: 10 additions & 11 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::http_api::server::run_http_server;
use crate::metrics::MetricsLayer;
use clap::{value_parser, Arg, ArgAction, Command};
use const_format::formatcp;
use lazy_static::lazy_static;
use limitador::counter::Counter;
use limitador::errors::LimitadorError;
use limitador::limit::Limit;
Expand Down Expand Up @@ -76,10 +75,6 @@ pub enum LimitadorServerError {
Internal(LimitadorError),
}

lazy_static! {
pub static ref PROMETHEUS_METRICS: PrometheusMetrics = PrometheusMetrics::default();
}

pub enum Limiter {
Blocking(RateLimiter),
Async(AsyncRateLimiter),
Expand Down Expand Up @@ -262,7 +257,7 @@ fn find_first_negative_limit(limits: &[Limit]) -> Option<usize> {

#[actix_rt::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = {
let (config, prometheus_metrics) = {
let (config, version) = create_config();
println!("{LIMITADOR_HEADER} {version}");
let level = config.log_level.unwrap_or_else(|| {
Expand All @@ -276,9 +271,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::layer()
};

let limit_name_in_metrics = config.limit_name_in_labels;
let prometheus_metrics =
Arc::new(PrometheusMetrics::new_with_options(limit_name_in_metrics));
let metrics = prometheus_metrics.clone();

let metrics_layer = MetricsLayer::new().gather(
"should_rate_limit",
|timings| PROMETHEUS_METRICS.counter_access(Duration::from(timings)),
move |timings| metrics.counter_access(Duration::from(timings)),
vec!["datastore"],
);

Expand Down Expand Up @@ -310,11 +310,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.init();
};

PROMETHEUS_METRICS.set_use_limit_name_in_label(config.limit_name_in_labels);

info!("Version: {}", version);
info!("Using config: {:?}", config);
config
(config, prometheus_metrics)
};

let limit_file = config.limits_file.clone();
Expand Down Expand Up @@ -410,11 +408,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
envoy_rls_address.to_string(),
rate_limiter.clone(),
rate_limit_headers,
prometheus_metrics.clone(),
grpc_reflection_service,
));

info!("HTTP server starting on {}", http_api_address);
run_http_server(&http_api_address, rate_limiter.clone()).await?;
run_http_server(&http_api_address, rate_limiter.clone(), prometheus_metrics).await?;

Ok(())
}
Expand Down
Loading

0 comments on commit ae41cd9

Please sign in to comment.