Skip to content

Commit

Permalink
feat: logs websocket API
Browse files Browse the repository at this point in the history
  • Loading branch information
XOR-op committed Apr 4, 2023
1 parent 53f72e1 commit 3a283bf
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 38 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion boltconn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ tokio = { version = "1.25.0", features = ["rt", "rt-multi-thread", "net", "sync"
tokio-rustls = { version = "0.23.4", features = ["dangerous_configuration"] }
tokio-tungstenite = "0.18.0"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "std", "fmt"] }
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "std", "fmt", "json"] }
trust-dns-proto = "0.22.0"
trust-dns-resolver = { version = "0.22.0", features = ['dns-over-rustls', 'dns-over-https-rustls', 'dns-over-https', 'dns-over-tls'] }
webpki-roots = "0.22.5"
Expand Down
20 changes: 17 additions & 3 deletions boltconn/src/external/api_server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::config::LinkedState;
use crate::dispatch::{Dispatching, GeneralProxy};
use crate::external::{StreamLoggerHandle, StreamLoggerRecv};
use crate::network::configure::TunConfigure;
use crate::platform::process::ProcessInfo;
use crate::proxy::{AgentCenter, DumpedRequest, DumpedResponse, HttpCapturer, SessionManager};
Expand Down Expand Up @@ -33,6 +34,7 @@ pub struct ApiServer {
tun_configure: Arc<Mutex<TunConfigure>>,
reload_sender: Arc<tokio::sync::mpsc::Sender<()>>,
state: Arc<Mutex<LinkedState>>,
stream_logger: StreamLoggerHandle,
}

impl ApiServer {
Expand All @@ -46,6 +48,7 @@ impl ApiServer {
global_setting: Arc<Mutex<TunConfigure>>,
reload_sender: tokio::sync::mpsc::Sender<()>,
state: LinkedState,
stream_logger: StreamLoggerHandle,
) -> Self {
Self {
secret,
Expand All @@ -56,15 +59,16 @@ impl ApiServer {
dispatching,
reload_sender: Arc::new(reload_sender),
state: Arc::new(Mutex::new(state)),
stream_logger,
}
}

pub async fn run(self, port: u16) {
let secret = Arc::new(self.secret.clone());
let wrapper = move |r| Self::auth(secret.clone(), r);
let app = Router::new()
.route("/logs", get(Self::get_logs))
.route("/ws/traffic", get(Self::ws_get_traffic))
.route("/ws/logs", get(Self::ws_get_logs))
.route(
"/tun",
get(Self::get_tun_configure).put(Self::set_tun_configure),
Expand Down Expand Up @@ -134,8 +138,18 @@ impl ApiServer {
}))
}

async fn get_logs(State(_server): State<Self>) -> Json<serde_json::Value> {
Json(serde_json::Value::Null)
async fn ws_get_logs(State(server): State<Self>, ws: WebSocketUpgrade) -> impl IntoResponse {
let recv = server.stream_logger.subscribe();
ws.on_upgrade(move |socket| Self::ws_get_logs_inner(recv, socket))
}

async fn ws_get_logs_inner(mut recv: StreamLoggerRecv, mut socket: WebSocket) {
while let Ok(log) = recv.recv().await {
println!("Log is {:?}", log.as_bytes());
if socket.send(Message::Text(log)).await.is_err() {
return;
}
}
}

async fn get_traffic(State(server): State<Self>) -> Json<serde_json::Value> {
Expand Down
103 changes: 103 additions & 0 deletions boltconn/src/external/logger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use chrono::Timelike;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tracing_subscriber::fmt::{format::Writer, time::FormatTime, MakeWriter};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

#[derive(Clone)]
pub struct StreamLoggerHandle {
sender: broadcast::Sender<String>,
}

struct StreamLogger {
sender: broadcast::Sender<String>,
}

impl StreamLoggerHandle {
pub fn new() -> Self {
let (sender, _) = broadcast::channel(15);
Self { sender }
}

pub fn subscribe(&self) -> StreamLoggerRecv {
StreamLoggerRecv {
receiver: self.sender.subscribe(),
}
}
}

impl std::io::Write for StreamLogger {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
if let Ok(s) = std::str::from_utf8(buf) {
let _ = self.sender.send(s.to_string());
}
Ok(buf.len())
}

fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}

pub struct StreamLoggerRecv {
receiver: broadcast::Receiver<String>,
}

impl StreamLoggerRecv {
pub async fn recv(&mut self) -> Result<String, RecvError> {
loop {
match self.receiver.recv().await {
Err(RecvError::Lagged(_)) => continue,
r => return r,
}
}
}
}

#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
pub struct SystemTime;

impl FormatTime for SystemTime {
fn format_time(&self, w: &mut Writer<'_>) -> core::fmt::Result {
let time = chrono::prelude::Local::now();
write!(
w,
"{:02}:{:02}:{:02}.{:03}",
time.hour() % 24,
time.minute(),
time.second(),
time.timestamp_subsec_millis()
)
}
}

struct LoggerMaker {
logger: broadcast::Sender<String>,
}
impl<'a> MakeWriter<'a> for LoggerMaker {
type Writer = StreamLogger;

fn make_writer(&'a self) -> Self::Writer {
StreamLogger {
sender: self.logger.clone(),
}
}
}

pub fn init_tracing(logger: &StreamLoggerHandle) {
let stdout_layer = fmt::layer()
.compact()
.with_writer(std::io::stdout)
.with_timer(SystemTime::default());
let stream_layer = fmt::layer()
.json()
.with_writer(LoggerMaker {
logger: logger.sender.clone(),
})
.with_timer(SystemTime::default());
tracing_subscriber::registry()
.with(stdout_layer)
.with(stream_layer)
.with(EnvFilter::new("boltconn=trace"))
.init();
}
2 changes: 2 additions & 0 deletions boltconn/src/external/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod api_server;
mod logger;

pub use api_server::*;
pub use logger::*;
39 changes: 5 additions & 34 deletions boltconn/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ extern crate core;
use crate::config::{LinkedState, ProxySchema, RawRootCfg, RawState, RuleSchema};
use crate::dispatch::{Dispatching, DispatchingBuilder};
use crate::eavesdrop::{EavesdropModifier, HeaderModManager, UrlModManager};
use crate::external::ApiServer;
use crate::external::{ApiServer, StreamLoggerHandle};
use crate::network::configure::TunConfigure;
use crate::network::dns::{extract_address, new_bootstrap_resolver, parse_dns_config};
use crate::proxy::{AgentCenter, HttpCapturer, HttpInbound, Socks5Inbound, TunUdpInbound};
use chrono::Timelike;
use ipnet::Ipv4Net;
use is_root::is_root;
use network::tun_device::TunDevice;
Expand All @@ -31,9 +30,6 @@ use std::{fs, io};
use structopt::StructOpt;
use tokio::select;
use tracing::{event, Level};
use tracing_subscriber::fmt::format::Writer;
use tracing_subscriber::fmt::time::FormatTime;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

mod adapter;
mod common;
Expand All @@ -46,23 +42,6 @@ mod platform;
mod proxy;
mod transport;

#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
pub struct SystemTime;

impl FormatTime for SystemTime {
fn format_time(&self, w: &mut Writer<'_>) -> core::fmt::Result {
let time = chrono::prelude::Local::now();
write!(
w,
"{:02}:{:02}:{:02}.{:03}",
(time.hour() + 8) % 24,
time.minute(),
time.second(),
time.timestamp_subsec_millis()
)
}
}

#[derive(Debug, StructOpt)]
#[structopt(name = "boltconn", about = "BoltConn core binary")]
struct Args {
Expand Down Expand Up @@ -148,17 +127,6 @@ fn mapping_rewrite(list: &[String]) -> anyhow::Result<(Vec<String>, Vec<String>)
Ok((url_list, header_list))
}

fn init_tracing() {
let formatting_layer = fmt::layer()
.compact()
.with_writer(std::io::stdout)
.with_timer(SystemTime::default());
tracing_subscriber::registry()
.with(formatting_layer)
.with(EnvFilter::new("boltconn=trace"))
.init();
}

fn main() -> ExitCode {
if !is_root() {
eprintln!("BoltConn must be run with root privilege.");
Expand All @@ -170,7 +138,9 @@ fn main() -> ExitCode {

// tokio and tracing
let rt = tokio::runtime::Runtime::new().expect("Tokio failed to initialize");
init_tracing();

let stream_logger = StreamLoggerHandle::new();
external::init_tracing(&stream_logger);

// interface
let (_, real_iface_name) = get_default_route().expect("failed to get default route");
Expand Down Expand Up @@ -288,6 +258,7 @@ fn main() -> ExitCode {
state_path: state_path(&config_path),
state,
},
stream_logger,
);

let dispatcher = {
Expand Down

0 comments on commit 3a283bf

Please sign in to comment.