Skip to content

Commit

Permalink
Refactor net_monitor.rs to use cgroup_skb_monitor for cgroup transmit…
Browse files Browse the repository at this point in the history
… counter initialization
  • Loading branch information
arloor committed Sep 26, 2024
1 parent 62d2941 commit 2c2d698
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 84 deletions.
72 changes: 57 additions & 15 deletions rust_http_proxy/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ use tokio::{net::TcpStream, pin};

pub struct ProxyHandler {
pub(crate) config: Config,
prom_registry: Registry,
metrics: Metrics,
net_monitor: NetMonitor,
pub(crate) prom_registry: Registry,
pub(crate) metrics: Metrics,
pub(crate) net_monitor: NetMonitor,
http1_client: HttpClient<Incoming>,
reverse_client: legacy::Client<hyper_rustls::HttpsConnector<HttpConnector>, Incoming>,
redirect_bachpaths: Vec<RedirectBackpaths>,
Expand All @@ -60,6 +60,7 @@ pub struct ProxyHandler {
pub(crate) struct Metrics {
pub(crate) http_req_counter: Family<LabelImpl<ReqLabels>, Counter>,
pub(crate) proxy_traffic: Family<LabelImpl<AccessLabel>, Counter>,
#[cfg(feature = "bpf")]
pub(crate) net_bytes: Family<LabelImpl<NetDirectionLabel>, Counter>,
#[cfg(feature = "bpf")]
pub(crate) cgroup_bytes: Family<LabelImpl<NetDirectionLabel>, Counter>,
Expand Down Expand Up @@ -350,17 +351,9 @@ impl ProxyHandler {
"reject http GET/POST when ask_for_auth and basic_auth not empty",
));
}
web_func::serve_http_request(
req,
client_socket_addr,
&self.config,
path,
&self.net_monitor,
&self.metrics,
&self.prom_registry,
)
.await
.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))
web_func::serve_http_request(self, req, client_socket_addr, path)
.await
.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))
}

async fn reverse_proxy(
Expand Down Expand Up @@ -411,6 +404,53 @@ impl ProxyHandler {
}
}
}

#[cfg(feature = "bpf")]
pub(crate) fn snapshot_metrics(&self) {
{
self.metrics
.net_bytes
.get_or_create(&LabelImpl::new(NetDirectionLabel {
direction: "egress",
}))
.inner()
.store(
crate::net_monitor::get_egress(),
std::sync::atomic::Ordering::Relaxed,
);
self.metrics
.net_bytes
.get_or_create(&LabelImpl::new(NetDirectionLabel {
direction: "ingress",
}))
.inner()
.store(
crate::net_monitor::get_ingress(),
std::sync::atomic::Ordering::Relaxed,
);

self.metrics
.cgroup_bytes
.get_or_create(&LabelImpl::new(NetDirectionLabel {
direction: "egress",
}))
.inner()
.store(
self.net_monitor.get_cgroup_egress(),
std::sync::atomic::Ordering::Relaxed,
);
self.metrics
.cgroup_bytes
.get_or_create(&LabelImpl::new(NetDirectionLabel {
direction: "ingress",
}))
.inner()
.store(
self.net_monitor.get_cgroup_ingress(),
std::sync::atomic::Ordering::Relaxed,
);
}
}
}

fn mod_http1_proxy_req(req: &mut Request<Incoming>) -> io::Result<()> {
Expand Down Expand Up @@ -674,9 +714,10 @@ fn register_metrics(registry: &mut Registry) -> Metrics {
);
let proxy_traffic = Family::<LabelImpl<AccessLabel>, Counter>::default();
registry.register("proxy_traffic", "num proxy_traffic", proxy_traffic.clone());
#[cfg(feature = "bpf")]
let net_bytes = Family::<LabelImpl<NetDirectionLabel>, Counter>::default();
#[cfg(feature = "bpf")]
registry.register("net_bytes", "num net_bytes", net_bytes.clone());

#[cfg(feature = "bpf")]
let cgroup_bytes = Family::<LabelImpl<NetDirectionLabel>, Counter>::default();
#[cfg(feature = "bpf")]
Expand All @@ -688,6 +729,7 @@ fn register_metrics(registry: &mut Registry) -> Metrics {
Metrics {
http_req_counter,
proxy_traffic,
#[cfg(feature = "bpf")]
net_bytes,
#[cfg(feature = "bpf")]
cgroup_bytes,
Expand Down
83 changes: 14 additions & 69 deletions rust_http_proxy/src/web_func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ use crate::proxy::build_authenticate_resp;
use crate::proxy::check_auth;
use crate::proxy::empty_body;
use crate::proxy::full_body;
use crate::proxy::Metrics;
use crate::proxy::NetDirectionLabel;
use crate::proxy::ProxyHandler;
use crate::proxy::ReqLabels;
use crate::Config;
use http::response::Builder;
use prom_label::LabelImpl;

Expand Down Expand Up @@ -42,18 +40,14 @@ const SERVER_NAME: &str = "arloor's creation";

static GZIP: &str = "gzip";

#[allow(clippy::too_many_arguments)]
pub async fn serve_http_request(
proxy_handler: &ProxyHandler,
req: &Request<impl Body>,
client_socket_addr: SocketAddr,
proxy_config: &Config,
path: &str,
_net_monitor: &NetMonitor,
metrics: &Metrics,
prom_registry: &Registry,
) -> Result<Response<BoxBody<Bytes, io::Error>>, Error> {
let web_content_path = &proxy_config.web_content_path;
let refer = &proxy_config.referer;
let web_content_path = &proxy_handler.config.web_content_path;
let refer = &proxy_handler.config.referer;
let referer_header = req
.headers()
.get(REFERER)
Expand All @@ -73,7 +67,7 @@ pub async fn serve_http_request(
return not_found();
}
}
let _hostname = &proxy_config.hostname;
let _hostname = &proxy_handler.config.hostname;
let _hostname = req
.uri()
.authority()
Expand All @@ -88,28 +82,22 @@ pub async fn serve_http_request(
#[cfg(target_os = "linux")]
(_, "/nt") => _count_stream(),
#[cfg(target_os = "linux")]
(_, "/speed") => _speed(_net_monitor, _hostname, can_gzip).await,
(_, "/speed") => _speed(&proxy_handler.net_monitor, _hostname, can_gzip).await,
#[cfg(target_os = "linux")]
(_, "/net") => _speed(_net_monitor, _hostname, can_gzip).await,
(_, "/net") => _speed(&proxy_handler.net_monitor, _hostname, can_gzip).await,
(_, "/metrics") => {
let (_, authed) = check_auth(
&proxy_config.basic_auth,
&proxy_handler.config.basic_auth,
req,
&client_socket_addr,
hyper::header::AUTHORIZATION,
);
if !authed {
return Ok(build_authenticate_resp(false));
}
serve_metrics(
prom_registry,
_net_monitor,
&metrics.net_bytes,
#[cfg(feature = "bpf")]
&metrics.cgroup_bytes,
can_gzip,
)
.await
#[cfg(feature = "bpf")]
proxy_handler.snapshot_metrics();
serve_metrics(&proxy_handler.prom_registry, can_gzip).await
}
(&Method::GET, path) => {
let is_outer_view_html = (path.ends_with('/') || path.ends_with(".html"))
Expand All @@ -136,7 +124,7 @@ pub async fn serve_http_request(
&r,
is_outer_view_html,
is_shell,
&metrics.http_req_counter,
&proxy_handler.metrics.http_req_counter,
referer_header,
path,
);
Expand Down Expand Up @@ -191,54 +179,11 @@ fn extract_search_engine_from_referer(referer: &str) -> Result<String, regex::Er
}

async fn serve_metrics(
registry: &Registry,
_net_monitor: &NetMonitor,
_net_bytes: &Family<LabelImpl<NetDirectionLabel>, Counter>,
#[cfg(feature = "bpf")] _cgroup_bytes: &Family<LabelImpl<NetDirectionLabel>, Counter>,
prom_registry: &Registry,
can_gzip: bool,
) -> Result<Response<BoxBody<Bytes, io::Error>>, Error> {
#[cfg(feature = "bpf")]
{
_net_bytes
.get_or_create(&LabelImpl::new(NetDirectionLabel {
direction: "egress",
}))
.inner()
.store(
crate::net_monitor::get_egress(),
std::sync::atomic::Ordering::Relaxed,
);
_net_bytes
.get_or_create(&LabelImpl::new(NetDirectionLabel {
direction: "ingress",
}))
.inner()
.store(
crate::net_monitor::get_ingress(),
std::sync::atomic::Ordering::Relaxed,
);

_cgroup_bytes
.get_or_create(&LabelImpl::new(NetDirectionLabel {
direction: "egress",
}))
.inner()
.store(
_net_monitor.get_cgroup_egress(),
std::sync::atomic::Ordering::Relaxed,
);
_cgroup_bytes
.get_or_create(&LabelImpl::new(NetDirectionLabel {
direction: "ingress",
}))
.inner()
.store(
_net_monitor.get_cgroup_ingress(),
std::sync::atomic::Ordering::Relaxed,
);
}
let mut buffer = String::new();
if let Err(e) = encode(&mut buffer, registry) {
if let Err(e) = encode(&mut buffer, prom_registry) {
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header(http::header::SERVER, SERVER_NAME)
Expand Down

0 comments on commit 2c2d698

Please sign in to comment.