diff --git a/Cargo.lock b/Cargo.lock index 2373169..898f377 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -172,6 +172,7 @@ checksum = "2fb79c228270dcf2426e74864cabc94babb5dbab01a4314e702d2f16540e1591" dependencies = [ "async-trait", "axum-core", + "base64 0.21.0", "bitflags", "bytes", "futures-util", @@ -189,8 +190,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-http", "tower-layer", diff --git a/boltconn/Cargo.toml b/boltconn/Cargo.toml index 9b6e97a..c0d9b9e 100644 --- a/boltconn/Cargo.toml +++ b/boltconn/Cargo.toml @@ -10,7 +10,7 @@ aho-corasick = "0.7.20" anyhow = "1.0.66" arrayref = "0.3.6" async-trait = "0.1.58" -axum = "0.6.1" +axum = { version = "0.6.1", features = ["ws"] } base64 = "0.21.0" boltapi = { path = "../boltapi" } boringtun = "0.5.2" diff --git a/boltconn/src/external/api_server.rs b/boltconn/src/external/api_server.rs index 0fccec7..27e624a 100644 --- a/boltconn/src/external/api_server.rs +++ b/boltconn/src/external/api_server.rs @@ -3,8 +3,10 @@ use crate::dispatch::{Dispatching, GeneralProxy}; use crate::network::configure::TunConfigure; use crate::platform::process::ProcessInfo; use crate::proxy::{AgentCenter, DumpedRequest, DumpedResponse, HttpCapturer, SessionManager}; -use axum::extract::{Path, Query, State}; +use axum::extract::ws::{Message, WebSocket}; +use axum::extract::{ws::WebSocketUpgrade, Path, Query, State}; use axum::middleware::map_request; +use axum::response::IntoResponse; use axum::routing::{delete, get, post}; use axum::{Json, Router}; use boltapi::{ @@ -61,6 +63,7 @@ impl ApiServer { let wrapper = move |r| Self::auth(secret.clone(), r); let app = Router::new() .route("/logs", get(Self::get_logs)) + .route("/ws/traffic", get(Self::ws_get_traffic)) .route( "/tun", get(Self::get_tun_configure).put(Self::set_tun_configure), @@ -143,6 +146,25 @@ impl ApiServer { })) } + async fn ws_get_traffic(State(server): State, ws: WebSocketUpgrade) -> impl IntoResponse { + ws.on_upgrade(move |socket| Self::ws_get_traffic_inner(server, socket)) + } + + async fn ws_get_traffic_inner(server: Self, mut socket: WebSocket) { + loop { + // send traffic with 1 second interval + let data = json!(TrafficResp { + upload: server.stat_center.get_upload().load(Ordering::Relaxed), + download: server.stat_center.get_download().load(Ordering::Relaxed), + }) + .to_string(); + if socket.send(Message::Text(data)).await.is_err() { + return; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + async fn get_all_conn(State(server): State) -> Json { let list = server.stat_center.get_copy().await; let mut result = Vec::new();