Skip to content

Commit

Permalink
Follow OpenTelemetry Semantic Conventions by renaming and addition of…
Browse files Browse the repository at this point in the history
… new field (#91)

* hide contest submission and private problem submission in global list.

* Follow OpenTelemetry Semantic Conventions
  • Loading branch information
Eason0729 authored Sep 8, 2024
1 parent c5bd22a commit 88370ca
Show file tree
Hide file tree
Showing 17 changed files with 45 additions and 88 deletions.
2 changes: 2 additions & 0 deletions backend/migration/src/m20231207_000001_create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ enum Submit {
Status,
Accept,
Score,
Public
}
#[derive(Iden)]
enum Testcase {
Expand Down Expand Up @@ -441,6 +442,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Submit::Lang).text().not_null())
.col(ColumnDef::new(Submit::Code).not_null().binary())
.col(ColumnDef::new(Submit::Memory).big_integer().null())
.col(ColumnDef::new(Submit::Public).boolean().not_null())
.col(
ColumnDef::new(Submit::PassCase)
.integer()
Expand Down
1 change: 1 addition & 0 deletions backend/src/controller/judger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ impl Judger {
lang: ActiveValue::Set(req.lang.clone().to_string()),
code: ActiveValue::Set(req.code.clone()),
memory: ActiveValue::Set(Some(req.memory_limit)),
public: ActiveValue::Set(problem.public),
..Default::default()
}
.save(db.as_ref())
Expand Down
17 changes: 5 additions & 12 deletions backend/src/controller/judger/route/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
use crossbeam_queue::SegQueue;
use dashmap::{DashMap, DashSet};
use tonic::{service::Interceptor, *};
use tracing::{debug_span, instrument, span, Instrument, Level};
use tracing::{debug_span, instrument, Instrument};
use uuid::Uuid;

use crate::config::{self, Judger as JudgerConfig};
Expand Down Expand Up @@ -126,30 +126,23 @@ impl Drop for ConnGuard {
/// keep discovering new Upstream from config(IE: docker swarm, static address)
///
/// occupy future, should generally be spawn in a green thread
#[instrument(skip(router), level = "info")]
async fn discover<I: Routable + Send>(
config: JudgerConfig,
router: Weak<Router>,
) -> Result<(), Error> {
let mut instance = I::new(config.clone())?;
let span = span!(Level::INFO, "service_discover", config_name = config.name);
loop {
match instance
.discover()
.instrument(debug_span!(parent:span.clone(), "try advance"))
.in_current_span()
.await
{
match instance.discover().in_current_span().await {
RouteStatus::NewConnection(detail) => {
let _span =
span!(parent:span.clone(),Level::DEBUG,"upstream_connect",uri=detail.uri);
let router = match router.upgrade() {
Some(x) => x,
None => break,
};
let uri = detail.uri.clone();
let (upstream, langs) = Upstream::new(detail).in_current_span().await?;
let _ = debug_span!("connected", uri = uri).entered();
for (uuid, lang) in langs.into_iter() {
let _ = tracing::span!(parent:&_span,Level::DEBUG,"lang_insert",uuid=?&uuid)
.entered();
router.langs.insert(lang);
loop {
match router.routing_table.get(&uuid) {
Expand Down
14 changes: 9 additions & 5 deletions backend/src/controller/rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,13 @@ impl RateLimitController {
/// - [`TrafficType::Blacklist`]: dedicated rate limit (because verify token take time)
///
/// We identify [`TrafficType::Blacklist`] by ip blacklist,
/// whose entries is added when user fail to login or sent invaild token
#[instrument(skip_all, level = "debug", name = "rate_limit")]
/// whose entries is added when user fail to log in or sent invalid token
#[instrument(
skip_all,
level = "debug",
name = "check_traffic_type",
fields(traffic_type)
)]
pub async fn check<'a, T, F, Fut>(
&self,
req: &'a tonic::Request<T>,
Expand All @@ -211,14 +216,13 @@ impl RateLimitController {
{
TrafficType::Login(x) => Bucket::Login((self.user_limiter.clone(), x)),
TrafficType::Guest => Bucket::Guest((self.ip_limiter.clone(), addr)),
TrafficType::Blacklist(err) => {
tracing::warn!(msg = err.to_string(), "ip_blacklist");
TrafficType::Blacklist(_) => {
self.ip_blacklist.insert(addr, ());
Bucket::Blacklist(self.blacklist_limiter.clone())
}
};

debug!(traffic_type = res.get_name());
Span::current().record("traffic_type", &res.get_name());

Ok(res)
}
Expand Down
9 changes: 6 additions & 3 deletions backend/src/controller/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use sea_orm::*;
use spin::Mutex;
use std::{ops::Deref, sync::Arc};
use tokio::time;
use tracing::{instrument, Instrument};
use tracing::{info, instrument, Instrument};

use crate::report_internal;

Expand Down Expand Up @@ -115,6 +115,8 @@ impl TokenController {
.in_current_span()
.await?;

info!(monotonic_counter.token.count = 1);

Ok((
base64::Engine::encode(&base64::engine::general_purpose::STANDARD_NO_PAD, rand),
expiry,
Expand Down Expand Up @@ -144,9 +146,12 @@ impl TokenController {
let token = match cache_result {
Some(token) => {
tracing::trace!(user_id = token.user_id, "cache_hit");
info!(monotonic_counter.token.cache_hit.count = 1);
token
}
None => {
info!(monotonic_counter.token.cache_missed.count = 1);

let token: CachedToken = (token::Entity::find()
.filter(token::Column::Rand.eq(rand.to_vec()))
.one(self.db.deref())
Expand All @@ -155,8 +160,6 @@ impl TokenController {
.ok_or(Error::NonExist)?)
.into();

tracing::trace!(user_id = token.user_id, "cache_missed");

self.cache.insert(rand, token.clone());

token
Expand Down
2 changes: 1 addition & 1 deletion backend/src/endpoint/announcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl Announcement for ArcServer {

let id = *model.id.as_ref();

info!(count.announcement = 1, id = id);
info!(count.announcement.count = 1, id = id);

Ok(id.into())
})
Expand Down
14 changes: 3 additions & 11 deletions backend/src/endpoint/chat.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use tracing_futures::Instrument;

use grpc::backend::chat_server::*;

Expand Down Expand Up @@ -30,10 +31,7 @@ impl Chat for ArcServer {
err(level = "debug", Display)
)]
async fn create(&self, req: Request<CreateChatRequest>) -> Result<Response<Id>, Status> {
let (auth, req) = self
.parse_request_n(req, NonZeroU32!(5))
.in_current_span()
.await?;
let (auth, req) = self.rate_limit(req).in_current_span().await?;
let (user_id, _) = auth.assume_login()?;

req.bound_check()?;
Expand Down Expand Up @@ -98,13 +96,7 @@ impl Chat for ArcServer {
&self,
req: Request<ListChatRequest>,
) -> Result<Response<ListChatResponse>, Status> {
let (auth, req) = self
.parse_request_fn(req, |req| {
(req.size + req.offset.saturating_abs() as u64 / 5 + 2)
.try_into()
.unwrap_or(u32::MAX)
})
.await?;
let (auth, req) = self.rate_limit(req).in_current_span().await?;

req.bound_check()?;

Expand Down
2 changes: 1 addition & 1 deletion backend/src/endpoint/contest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl Contest for ArcServer {

let id = *model.id.as_ref();

info!(count.contest = 1, id = id);
info!(count.contest.count = 1, id = id);

Ok(id.into())
})
Expand Down
2 changes: 1 addition & 1 deletion backend/src/endpoint/education.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl Education for ArcServer {

let id = *model.id.as_ref();

info!(count.education = 1, id = id);
info!(count.education.count = 1, id = id);

Ok(id.into())
})
Expand Down
1 change: 0 additions & 1 deletion backend/src/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ mod testcase;
mod token;
mod user;

use crate::NonZeroU32;
use grpc::backend::{Id, Order, *};
use sea_orm::{Value, *};
use std::ops::Deref;
Expand Down
4 changes: 2 additions & 2 deletions backend/src/endpoint/problem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl Problem for ArcServer {
.await
.map_err(|_| Error::Retry)?;

info!(count.problem = 1, id = id);
info!(count.problem.count = 1, id = id);

Ok(id.into())
})
Expand Down Expand Up @@ -217,7 +217,7 @@ impl Problem for ArcServer {
if result.rows_affected == 0 {
return Err(Error::NotInDB);
}
info!(count.problem = -1, id = req.id);
info!(count.problem.count = -1, id = req.id);
Ok(())
})
.await
Expand Down
9 changes: 2 additions & 7 deletions backend/src/endpoint/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,7 @@ impl Submit for ArcServer {
err(level = "debug", Display)
)]
async fn follow(&self, req: Request<Id>) -> Result<Response<Self::FollowStream>, Status> {
let (_, req) = self
.parse_request_n(req, NonZeroU32!(5))
.in_current_span()
.await?;
let (_, req) = self.rate_limit(req).in_current_span().await?;

Ok(Response::new(self.judger.follow(req.id).unwrap_or_else(
|| {
Expand Down Expand Up @@ -273,9 +270,7 @@ impl Submit for ArcServer {

#[instrument(skip_all, level = "debug")]
async fn list_lang(&self, req: Request<()>) -> Result<Response<Languages>, Status> {
self.parse_request_n(req, NonZeroU32!(5))
.in_current_span()
.await?;
self.rate_limit(req).in_current_span().await?;

let list: Vec<_> = self
.judger
Expand Down
2 changes: 1 addition & 1 deletion backend/src/endpoint/testcase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl Testcase for ArcServer {

let id = *model.id.as_ref();

info!(count.testcase = 1, id = id);
info!(count.testcase.count = 1, id = id);

Ok(id.into())
})
Expand Down
4 changes: 2 additions & 2 deletions backend/src/endpoint/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl Token for ArcServer {
err(level = "debug", Display)
)]
async fn refresh(&self, req: Request<RefreshRequest>) -> Result<Response<TokenInfo>, Status> {
let (_, bucket) = self.parse_auth(&req).in_current_span().await?;
let (_, bucket) = self.authenticate_user(&req).in_current_span().await?;
let (meta, _, req) = req.into_parts();
bucket.cost(NonZeroU32::new(req.get_cost()).unwrap())?;

Expand Down Expand Up @@ -151,7 +151,7 @@ impl Token for ArcServer {
err(level = "debug", Display)
)]
async fn logout(&self, req: Request<()>) -> Result<Response<()>, Status> {
let (auth, bucket) = self.parse_auth(&req).in_current_span().await?;
let (auth, bucket) = self.authenticate_user(&req).in_current_span().await?;
auth.assume_login()?;
bucket.cost(NonZeroU32::new(10).unwrap())?;

Expand Down
1 change: 0 additions & 1 deletion backend/src/entity/contest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ impl WithAuthTrait for Paginator {}

impl Paginator {
pub fn new_text(text: String, start_from_end: bool) -> Self {
// FIXME: check dup text
Self::Text(TextPaginator::new(text, start_from_end))
}
pub fn new_sort(sort: Sort, start_from_end: bool) -> Self {
Expand Down
4 changes: 3 additions & 1 deletion backend/src/entity/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct Model {
pub status: Option<u32>,
pub accept: bool,
pub score: u32,
pub public: bool,
}

#[derive(DerivePartialModel, FromQueryResult)]
Expand All @@ -50,6 +51,7 @@ pub struct PartialModel {
pub status: Option<u32>,
pub accept: bool,
pub score: u32,
pub public: bool,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down Expand Up @@ -89,7 +91,7 @@ impl ActiveModelBehavior for ActiveModel {}
impl Filter for Entity {
#[instrument(skip_all, level = "debug")]
fn read_filter<S: QueryFilter + Send>(query: S, _: &Auth) -> Result<S, Error> {
Ok(query)
Ok(query.filter(Column::Public.eq(true)))
}

#[instrument(skip_all, level = "debug")]
Expand Down
45 changes: 6 additions & 39 deletions backend/src/util/rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl Server {
/// It's useful for endpoints that require resolving identity
/// before rate limiting, such as logout
#[instrument(skip_all, level = "info")]
pub async fn parse_auth<T>(
pub async fn authenticate_user<T>(
&self,
req: &tonic::Request<T>,
) -> Result<(Auth, Bucket), tonic::Status> {
Expand All @@ -40,7 +40,6 @@ impl Server {
}
}
} else {
tracing::debug!("token_missing");
TrafficType::Guest
}
})
Expand All @@ -49,53 +48,21 @@ impl Server {
tracing::info!(auth = %auth);
Ok((auth, bucket))
}
/// parse request for payload and immediately rate
/// limiting base on a const cost
#[inline]
#[instrument(skip_all, level = "info", name = "parse")]
pub async fn parse_request_n<T>(
&self,
req: tonic::Request<T>,
permit: NonZeroU32,
) -> Result<(Auth, T), tonic::Status> {
let (auth, bucket) = self.parse_auth(&req).await?;

bucket.cost(permit)?;

Ok((auth, req.into_inner()))
}
/// parse request for payload and immediately rate
/// limiting base on a dynamic cost(calculated by a function)
#[inline]
pub async fn parse_request_fn<T, F>(
&self,
req: tonic::Request<T>,
f: F,
) -> Result<(Auth, T), tonic::Status>
where
F: FnOnce(&T) -> u32,
{
let (auth, bucket) = self.parse_auth(&req).await?;
let req = req.into_inner();

if let Some(cost) = NonZeroU32::new(f(&req)) {
bucket.cost(cost)?;
}

Ok((auth, req))
}
#[instrument(skip_all, level = "info")]
#[instrument(skip_all, level = "info", fields(cost))]
pub async fn rate_limit<T: RateLimit>(
&self,
req: tonic::Request<T>,
) -> Result<(Auth, T), tonic::Status> {
let (auth, bucket) = self.parse_auth(&req).in_current_span().await?;
let (auth, bucket) = self.authenticate_user(&req).in_current_span().await?;
bucket.cost(NonZeroU32::new(3).unwrap())?;
let req = req.into_inner();
tracing::debug!(bucket = %bucket);

if let Some(cost) = NonZeroU32::new(req.get_cost()) {
Span::current().record("cost", cost.saturating_add(3));
bucket.cost(cost)?;
} else {
Span::current().record("cost", 3);
}

Ok((auth, req))
Expand Down

0 comments on commit 88370ca

Please sign in to comment.