Skip to content

Commit

Permalink
aa
Browse files Browse the repository at this point in the history
  • Loading branch information
arloor committed Sep 27, 2024
1 parent fb6f225 commit d65f612
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 46 deletions.
58 changes: 37 additions & 21 deletions io_x/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::{fmt::Debug, pin::Pin, task::Context, task::Poll};

use pin_project_lite::pin_project;
use prometheus_client::metrics::{counter::Counter, family::Family};
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use std::io;
use std::time::Duration;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;

use futures_util::Future;
use tokio::time::{sleep, Instant, Sleep};
Expand Down Expand Up @@ -118,7 +118,6 @@ where
}
}


pin_project! {
/// enhance inner tcp stream with prometheus counter
#[derive(Debug)]
Expand All @@ -139,7 +138,7 @@ impl<T> TimeoutIO<T>
where
T: AsyncWrite + AsyncRead,
{
pub fn new(inner: T, timeout:Duration) -> Self {
pub fn new(inner: T, timeout: Duration) -> Self {
Self {
inner,
timeout,
Expand All @@ -149,7 +148,10 @@ where
/// set timeout
pub fn _set_timeout_pinned(mut self: Pin<&mut Self>, timeout: Duration) {
*self.as_mut().project().timeout = timeout;
self.project().idle_future.as_mut().reset(Instant::now() + timeout);
self.project()
.idle_future
.as_mut()
.reset(Instant::now() + timeout);
}
}

Expand All @@ -166,12 +168,15 @@ where
let idle_feature = pro.idle_future;
let timeout: &mut Duration = pro.timeout;
let read_poll = pro.inner.poll_read(cx, buf);
if read_poll.is_ready(){
if read_poll.is_ready() {
// 读到内容或者读到EOF等等,重置计时
idle_feature.reset(Instant::now() + *timeout);
}else if idle_feature.poll(cx).is_ready(){
} else if idle_feature.poll(cx).is_ready() {
// 没有读到内容,且已经timeout,则返回错误
return Poll::Ready(Err(io::Error::new(io::ErrorKind::TimedOut,format!("read idle for {:?}",timeout))));
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::TimedOut,
format!("read idle for {:?}", timeout),
)));
}
read_poll
}
Expand All @@ -190,10 +195,13 @@ where
let idle_feature = pro.idle_future;
let timeout: &mut Duration = pro.timeout;
let write_poll = pro.inner.poll_write(cx, buf);
if write_poll.is_ready(){
if write_poll.is_ready() {
idle_feature.reset(Instant::now() + *timeout);
}else if idle_feature.poll(cx).is_ready(){
return Poll::Ready(Err(io::Error::new(io::ErrorKind::TimedOut,format!("write idle for {:?}",timeout))));
} else if idle_feature.poll(cx).is_ready() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::TimedOut,
format!("write idle for {:?}", timeout),
)));
}
write_poll
}
Expand All @@ -203,10 +211,13 @@ where
let idle_feature = pro.idle_future;
let timeout: &mut Duration = pro.timeout;
let write_poll = pro.inner.poll_flush(cx);
if write_poll.is_ready(){
if write_poll.is_ready() {
idle_feature.reset(Instant::now() + *timeout);
}else if idle_feature.poll(cx).is_ready(){
return Poll::Ready(Err(io::Error::new(io::ErrorKind::TimedOut,format!("write idle for {:?}",timeout))));
} else if idle_feature.poll(cx).is_ready() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::TimedOut,
format!("write idle for {:?}", timeout),
)));
}
write_poll
}
Expand All @@ -219,10 +230,13 @@ where
let idle_feature = pro.idle_future;
let timeout: &mut Duration = pro.timeout;
let write_poll = pro.inner.poll_shutdown(cx);
if write_poll.is_ready(){
if write_poll.is_ready() {
idle_feature.reset(Instant::now() + *timeout);
}else if idle_feature.poll(cx).is_ready(){
return Poll::Ready(Err(io::Error::new(io::ErrorKind::TimedOut,format!("write idle for {:?}",timeout))));
} else if idle_feature.poll(cx).is_ready() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::TimedOut,
format!("write idle for {:?}", timeout),
)));
}
write_poll
}
Expand All @@ -240,12 +254,14 @@ where
let idle_feature = pro.idle_future;
let timeout: &mut Duration = pro.timeout;
let write_poll = pro.inner.poll_write_vectored(cx, bufs);
if write_poll.is_ready(){
if write_poll.is_ready() {
idle_feature.reset(Instant::now() + *timeout);
}else if idle_feature.poll(cx).is_ready(){
return Poll::Ready(Err(io::Error::new(io::ErrorKind::TimedOut,format!("write idle for {:?}",timeout))));
} else if idle_feature.poll(cx).is_ready() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::TimedOut,
format!("write idle for {:?}", timeout),
)));
}
write_poll
}
}

2 changes: 1 addition & 1 deletion prom_label/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct LabelImpl<R>(R)
where
R: Clone + Debug + Hash + PartialEq + Eq + EncodeLabelSet + 'static;

impl<R: Clone + Debug + Hash + PartialEq + Eq + EncodeLabelSet + 'static> LabelImpl<R> {
impl<R: Clone + Debug + Hash + PartialEq + Eq + EncodeLabelSet + 'static> LabelImpl<R> {
pub fn new(s: R) -> Self {
Self(s)
}
Expand Down
5 changes: 1 addition & 4 deletions rust_http_proxy/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,7 @@ fn log_config(config: &Config) {
for ele in reverse_proxy_config.1 {
info!(
" {:<70} -> {}{}**",
format!(
"*://{}:*{}**",
reverse_proxy_config.0, ele.location
),
format!("*://{}:*{}**", reverse_proxy_config.0, ele.location),
ele.upstream.scheme_and_authority,
ele.upstream.replacement
);
Expand Down
16 changes: 8 additions & 8 deletions rust_http_proxy/src/net_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub(crate) const IGNORED_INTERFACES: [&str; 7] =

pub struct NetMonitor {
buffer: Arc<RwLock<VecDeque<TimeValue>>>,
#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
cgroup_transmit_counter: cgroup_traffic::CgroupTransmitCounter,
}
const TOTAL_SECONDS: u64 = 900;
Expand All @@ -35,17 +35,17 @@ impl NetMonitor {
pub fn new() -> Result<NetMonitor, crate::DynError> {
Ok(NetMonitor {
buffer: Arc::new(RwLock::new(VecDeque::<TimeValue>::new())),
#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
cgroup_transmit_counter: cgroup_traffic::init_self_cgroup_skb_monitor()?,
})
}

#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
pub(crate) fn get_cgroup_egress(&self) -> u64 {
self.cgroup_transmit_counter.get_egress()
}

#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
pub(crate) fn get_cgroup_ingress(&self) -> u64 {
self.cgroup_transmit_counter.get_ingress()
}
Expand Down Expand Up @@ -88,21 +88,21 @@ impl NetMonitor {
}
}

#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
use socket_filter::TransmitCounter;
#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
static SOCKET_FILTER: std::sync::LazyLock<Arc<TransmitCounter>> = std::sync::LazyLock::new(|| {
Arc::new(TransmitCounter::new(
&IGNORED_INTERFACES,
Box::leak(Box::new(std::mem::MaybeUninit::uninit())),
))
});

#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
pub fn get_egress() -> u64 {
SOCKET_FILTER.get_egress()
}
#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
pub fn get_ingress() -> u64 {
SOCKET_FILTER.get_ingress()
}
Expand Down
22 changes: 11 additions & 11 deletions rust_http_proxy/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use crate::{
};
use {io_x::CounterIO, io_x::TimeoutIO, prom_label::LabelImpl};

#[cfg(target_os = "linux")]
use crate::net_monitor::NetMonitor;
use http::{
header::{HOST, LOCATION},
Uri,
Expand Down Expand Up @@ -45,8 +47,6 @@ use prometheus_client::{
};
use rand::Rng;
use tokio::{net::TcpStream, pin};
#[cfg(target_os = "linux")]
use crate::net_monitor::NetMonitor;

pub struct ProxyHandler {
pub(crate) config: Config,
Expand All @@ -62,9 +62,9 @@ pub struct ProxyHandler {
pub(crate) struct Metrics {
pub(crate) http_req_counter: Family<LabelImpl<ReqLabels>, Counter>,
pub(crate) proxy_traffic: Family<LabelImpl<AccessLabel>, Counter>,
#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
pub(crate) net_bytes: Family<LabelImpl<NetDirectionLabel>, Counter>,
#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
pub(crate) cgroup_bytes: Family<LabelImpl<NetDirectionLabel>, Counter>,
}
const DEFAULT_HOST: &str = "default_host";
Expand Down Expand Up @@ -410,7 +410,7 @@ impl ProxyHandler {
}
}

#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
pub(crate) fn snapshot_metrics(&self) {
{
self.metrics
Expand Down Expand Up @@ -719,17 +719,17 @@ fn register_metrics(registry: &mut Registry) -> Metrics {
);
let proxy_traffic = Family::<LabelImpl<AccessLabel>, Counter>::default();
registry.register("proxy_traffic", "num proxy_traffic", proxy_traffic.clone());
#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
let net_bytes = Family::<LabelImpl<NetDirectionLabel>, Counter>::default();
#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
registry.register(
"net_bytes",
"num hosts net traffic in bytes",
net_bytes.clone(),
);
#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
let cgroup_bytes = Family::<LabelImpl<NetDirectionLabel>, Counter>::default();
#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
registry.register(
"cgroup_bytes",
"num this cgroup's net traffic in bytes",
Expand All @@ -742,9 +742,9 @@ fn register_metrics(registry: &mut Registry) -> Metrics {
Metrics {
http_req_counter,
proxy_traffic,
#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
net_bytes,
#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
cgroup_bytes,
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust_http_proxy/src/web_func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub async fn serve_http_request(
) {
return Ok(build_authenticate_resp(false));
}
#[cfg(feature = "bpf")]
#[cfg(all(target_os = "linux", feature = "bpf"))]
proxy_handler.snapshot_metrics();
serve_metrics(&proxy_handler.prom_registry, can_gzip).await
}
Expand Down

0 comments on commit d65f612

Please sign in to comment.