From 3a283bf28b9e75e52c57a9889e6df254fbc4d8dd Mon Sep 17 00:00:00 2001 From: XOR-op <17672363+XOR-op@users.noreply.github.com> Date: Tue, 4 Apr 2023 12:36:11 +0800 Subject: [PATCH] feat: logs websocket API --- Cargo.lock | 13 ++++ boltconn/Cargo.toml | 2 +- boltconn/src/external/api_server.rs | 20 +++++- boltconn/src/external/logger.rs | 103 ++++++++++++++++++++++++++++ boltconn/src/external/mod.rs | 2 + boltconn/src/main.rs | 39 ++--------- 6 files changed, 141 insertions(+), 38 deletions(-) create mode 100644 boltconn/src/external/logger.rs diff --git a/Cargo.lock b/Cargo.lock index 898f377..d0a0b90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2997,6 +2997,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.16" @@ -3007,12 +3017,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] diff --git a/boltconn/Cargo.toml b/boltconn/Cargo.toml index c0d9b9e..1af9572 100644 --- a/boltconn/Cargo.toml +++ b/boltconn/Cargo.toml @@ -51,7 +51,7 @@ tokio = { version = "1.25.0", features = ["rt", "rt-multi-thread", "net", "sync" tokio-rustls = { version = "0.23.4", features = ["dangerous_configuration"] } tokio-tungstenite = "0.18.0" tracing = "0.1.37" -tracing-subscriber = { version = "0.3.16", features = ["env-filter", "std", "fmt"] } +tracing-subscriber = { version = "0.3.16", features = ["env-filter", "std", "fmt", "json"] } trust-dns-proto = "0.22.0" trust-dns-resolver = { version = "0.22.0", features = ['dns-over-rustls', 'dns-over-https-rustls', 'dns-over-https', 'dns-over-tls'] } webpki-roots = "0.22.5" diff --git a/boltconn/src/external/api_server.rs b/boltconn/src/external/api_server.rs index 8fcca90..3a8b2c3 100644 --- a/boltconn/src/external/api_server.rs +++ b/boltconn/src/external/api_server.rs @@ -1,5 +1,6 @@ use crate::config::LinkedState; use crate::dispatch::{Dispatching, GeneralProxy}; +use crate::external::{StreamLoggerHandle, StreamLoggerRecv}; use crate::network::configure::TunConfigure; use crate::platform::process::ProcessInfo; use crate::proxy::{AgentCenter, DumpedRequest, DumpedResponse, HttpCapturer, SessionManager}; @@ -33,6 +34,7 @@ pub struct ApiServer { tun_configure: Arc>, reload_sender: Arc>, state: Arc>, + stream_logger: StreamLoggerHandle, } impl ApiServer { @@ -46,6 +48,7 @@ impl ApiServer { global_setting: Arc>, reload_sender: tokio::sync::mpsc::Sender<()>, state: LinkedState, + stream_logger: StreamLoggerHandle, ) -> Self { Self { secret, @@ -56,6 +59,7 @@ impl ApiServer { dispatching, reload_sender: Arc::new(reload_sender), state: Arc::new(Mutex::new(state)), + stream_logger, } } @@ -63,8 +67,8 @@ impl ApiServer { let secret = Arc::new(self.secret.clone()); let wrapper = move |r| Self::auth(secret.clone(), r); let app = Router::new() - .route("/logs", get(Self::get_logs)) .route("/ws/traffic", get(Self::ws_get_traffic)) + .route("/ws/logs", get(Self::ws_get_logs)) .route( "/tun", get(Self::get_tun_configure).put(Self::set_tun_configure), @@ -134,8 +138,18 @@ impl ApiServer { })) } - async fn get_logs(State(_server): State) -> Json { - Json(serde_json::Value::Null) + async fn ws_get_logs(State(server): State, ws: WebSocketUpgrade) -> impl IntoResponse { + let recv = server.stream_logger.subscribe(); + ws.on_upgrade(move |socket| Self::ws_get_logs_inner(recv, socket)) + } + + async fn ws_get_logs_inner(mut recv: StreamLoggerRecv, mut socket: WebSocket) { + while let Ok(log) = recv.recv().await { + println!("Log is {:?}", log.as_bytes()); + if socket.send(Message::Text(log)).await.is_err() { + return; + } + } } async fn get_traffic(State(server): State) -> Json { diff --git a/boltconn/src/external/logger.rs b/boltconn/src/external/logger.rs new file mode 100644 index 0000000..133786e --- /dev/null +++ b/boltconn/src/external/logger.rs @@ -0,0 +1,103 @@ +use chrono::Timelike; +use tokio::sync::broadcast; +use tokio::sync::broadcast::error::RecvError; +use tracing_subscriber::fmt::{format::Writer, time::FormatTime, MakeWriter}; +use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; + +#[derive(Clone)] +pub struct StreamLoggerHandle { + sender: broadcast::Sender, +} + +struct StreamLogger { + sender: broadcast::Sender, +} + +impl StreamLoggerHandle { + pub fn new() -> Self { + let (sender, _) = broadcast::channel(15); + Self { sender } + } + + pub fn subscribe(&self) -> StreamLoggerRecv { + StreamLoggerRecv { + receiver: self.sender.subscribe(), + } + } +} + +impl std::io::Write for StreamLogger { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + if let Ok(s) = std::str::from_utf8(buf) { + let _ = self.sender.send(s.to_string()); + } + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +pub struct StreamLoggerRecv { + receiver: broadcast::Receiver, +} + +impl StreamLoggerRecv { + pub async fn recv(&mut self) -> Result { + loop { + match self.receiver.recv().await { + Err(RecvError::Lagged(_)) => continue, + r => return r, + } + } + } +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)] +pub struct SystemTime; + +impl FormatTime for SystemTime { + fn format_time(&self, w: &mut Writer<'_>) -> core::fmt::Result { + let time = chrono::prelude::Local::now(); + write!( + w, + "{:02}:{:02}:{:02}.{:03}", + time.hour() % 24, + time.minute(), + time.second(), + time.timestamp_subsec_millis() + ) + } +} + +struct LoggerMaker { + logger: broadcast::Sender, +} +impl<'a> MakeWriter<'a> for LoggerMaker { + type Writer = StreamLogger; + + fn make_writer(&'a self) -> Self::Writer { + StreamLogger { + sender: self.logger.clone(), + } + } +} + +pub fn init_tracing(logger: &StreamLoggerHandle) { + let stdout_layer = fmt::layer() + .compact() + .with_writer(std::io::stdout) + .with_timer(SystemTime::default()); + let stream_layer = fmt::layer() + .json() + .with_writer(LoggerMaker { + logger: logger.sender.clone(), + }) + .with_timer(SystemTime::default()); + tracing_subscriber::registry() + .with(stdout_layer) + .with(stream_layer) + .with(EnvFilter::new("boltconn=trace")) + .init(); +} diff --git a/boltconn/src/external/mod.rs b/boltconn/src/external/mod.rs index 16fd6b0..f22ea37 100644 --- a/boltconn/src/external/mod.rs +++ b/boltconn/src/external/mod.rs @@ -1,3 +1,5 @@ mod api_server; +mod logger; pub use api_server::*; +pub use logger::*; diff --git a/boltconn/src/main.rs b/boltconn/src/main.rs index fefe2a3..2b53117 100644 --- a/boltconn/src/main.rs +++ b/boltconn/src/main.rs @@ -5,11 +5,10 @@ extern crate core; use crate::config::{LinkedState, ProxySchema, RawRootCfg, RawState, RuleSchema}; use crate::dispatch::{Dispatching, DispatchingBuilder}; use crate::eavesdrop::{EavesdropModifier, HeaderModManager, UrlModManager}; -use crate::external::ApiServer; +use crate::external::{ApiServer, StreamLoggerHandle}; use crate::network::configure::TunConfigure; use crate::network::dns::{extract_address, new_bootstrap_resolver, parse_dns_config}; use crate::proxy::{AgentCenter, HttpCapturer, HttpInbound, Socks5Inbound, TunUdpInbound}; -use chrono::Timelike; use ipnet::Ipv4Net; use is_root::is_root; use network::tun_device::TunDevice; @@ -31,9 +30,6 @@ use std::{fs, io}; use structopt::StructOpt; use tokio::select; use tracing::{event, Level}; -use tracing_subscriber::fmt::format::Writer; -use tracing_subscriber::fmt::time::FormatTime; -use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; mod adapter; mod common; @@ -46,23 +42,6 @@ mod platform; mod proxy; mod transport; -#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)] -pub struct SystemTime; - -impl FormatTime for SystemTime { - fn format_time(&self, w: &mut Writer<'_>) -> core::fmt::Result { - let time = chrono::prelude::Local::now(); - write!( - w, - "{:02}:{:02}:{:02}.{:03}", - (time.hour() + 8) % 24, - time.minute(), - time.second(), - time.timestamp_subsec_millis() - ) - } -} - #[derive(Debug, StructOpt)] #[structopt(name = "boltconn", about = "BoltConn core binary")] struct Args { @@ -148,17 +127,6 @@ fn mapping_rewrite(list: &[String]) -> anyhow::Result<(Vec, Vec) Ok((url_list, header_list)) } -fn init_tracing() { - let formatting_layer = fmt::layer() - .compact() - .with_writer(std::io::stdout) - .with_timer(SystemTime::default()); - tracing_subscriber::registry() - .with(formatting_layer) - .with(EnvFilter::new("boltconn=trace")) - .init(); -} - fn main() -> ExitCode { if !is_root() { eprintln!("BoltConn must be run with root privilege."); @@ -170,7 +138,9 @@ fn main() -> ExitCode { // tokio and tracing let rt = tokio::runtime::Runtime::new().expect("Tokio failed to initialize"); - init_tracing(); + + let stream_logger = StreamLoggerHandle::new(); + external::init_tracing(&stream_logger); // interface let (_, real_iface_name) = get_default_route().expect("failed to get default route"); @@ -288,6 +258,7 @@ fn main() -> ExitCode { state_path: state_path(&config_path), state, }, + stream_logger, ); let dispatcher = {