Skip to content

Commit

Permalink
feat: 🎸 my wrk
Browse files Browse the repository at this point in the history
  • Loading branch information
haohanyuzmx committed May 30, 2024
1 parent 89b4c6b commit 78244b6
Show file tree
Hide file tree
Showing 8 changed files with 1,143 additions and 71 deletions.
618 changes: 551 additions & 67 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,12 @@ mio = { version = "0.8.10",features = ["default"] }
log = "0.4.20"
tracing = { version = "0.1.40",features = ["log"] }
tracing-subscriber = "0.3"

futures = "0.3.30"
roaring = "0.10.2"
hyper = { version = "1.1.0",features = ["client","http1","server"] }
http-body-util = "0.1"
hyper-util = { version = "0.1", features = ["full"] }
bytes = "1.5.0"
rand="0.8.5"
num_cpus = "1.16.0"
trait-variant = "0.1.2"
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# wrk压测--rust版

## 背景

wrk 每次使用都需要自己指定连接数,是一个比较麻烦的调优方式,没啥迹象可循

## 实现

主要抽象有两个,一个是负责执行异步任务的运行时;一个是负责压测的程序。

主要思想是自己实现一部分异步运行时;在该运行时管理的连接中,如果有事件就直接去处理事件,如果没有时间;就可以根据某些指标创建新的连接。

其中,这个指标可能是基于压测任务反馈(背压),也能是基于用户给定的一些知识。组合这些内容可以使用如卡尔曼滤波,或者pid调参。

(当然上面的内容还是画饼,目前只有一直创建新的连接
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
mod transport_layer;
pub mod transport_layer;
38 changes: 37 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,39 @@
use bytes::Bytes;
use http_body_util::Empty;
use std::sync::{Arc};
use std::thread;

use wrk::transport_layer::processor::{HttpHandle};
use wrk::transport_layer::transport::TcpSteamMaker;
use wrk::transport_layer::Pressure;

fn main() {
println!("Hello, world!");
// env::set_var("RUST_BACKTRACE", "1");
//
// tracing_subscriber::fmt()
// .with_line_number(true)
// .with_max_level(tracing::Level::DEBUG)
// .init();
// info!("test out");
let rt = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
);

let cpus = num_cpus::get();
let mut ht = None;
for i in 0..cpus - 1 {
let n_rt = rt.clone();
ht = Some(thread::spawn(|| {
let transport_conn = TcpSteamMaker::new("127.0.0.1:8080");
let processor =
HttpHandle::new("http://127.0.0.1:8080", Empty::<Bytes>::new()).unwrap();
let pressure = Pressure::new(n_rt, transport_conn, processor, None);
pressure.run();
}));
}
ht.unwrap().join();

}
249 changes: 248 additions & 1 deletion src/transport_layer/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,248 @@
mod transport;
use anyhow::Result;
use futures::task::ArcWake;
use roaring::RoaringBitmap;
use std::cell::RefCell;
use std::collections::{BTreeSet, HashMap};
use std::future::Future;
use std::marker::PhantomData;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use tracing::{debug, info};

pub mod processor;
pub mod transport;

pub trait Conn: AsyncRead + AsyncWrite + Send + Unpin {}

impl Conn for TcpStream {}

#[trait_variant::make(Send)]
pub trait TransportConn<T: Conn> {
async fn new_conn(&self) -> Result<T>;
}

#[trait_variant::make]
pub trait Processor<T: Conn> {
async fn turn(&self, conn: Arc<RefCell<T>>) -> Result<()>;
}

pub struct Pressure<T, P, C>
where
C: Conn,
T: TransportConn<C>,
P: Processor<C>,
{
rt: Arc<Runtime>,
transport: T,
processor: P,
max: u32,
stop: AtomicBool,
phantom: PhantomData<C>,
}

struct Wk {
bitmap: Arc<Mutex<RoaringBitmap>>,
index: u32,
}

impl Wk {
fn new(bitmap: Arc<Mutex<RoaringBitmap>>, index: u32) -> Self {
Self { bitmap, index }
}
}

impl ArcWake for Wk {
fn wake_by_ref(arc_self: &Arc<Self>) {
debug!("been call wake {}", arc_self.index);
arc_self.bitmap.lock().unwrap().insert(arc_self.index);
}
}

impl<T, P, C> Pressure<T, P, C>
where
C: Conn,
T: TransportConn<C>,
P: Processor<C>,
{
pub fn new(rt: Arc<Runtime>, transport: T, processor: P, max: Option<u32>) -> Self {
Self {
rt,
transport,
processor,
max: max.unwrap_or(65535),
stop: AtomicBool::new(false),
phantom: Default::default(),
}
}
pub fn run(self) {
let mut connections = HashMap::new();
let mut pinned_futures = HashMap::new();
let mut wake_run = HashMap::new();
let bitmap = Arc::new(Mutex::new(RoaringBitmap::new()));
let mut un_use_seq = (0..=self.max).collect::<BTreeSet<u32>>();
let mut all_conn = 0;
loop {
info!("all conn{all_conn}, now conn{:}", connections.len());
//sleep(Duration::from_secs(2));
//connections.len()
if connections.len() < self.max as usize {
self.rt.block_on(async {
match self.transport.new_conn().await {
Ok(conn) => {
let seq = *un_use_seq.iter().next().unwrap();
un_use_seq.take(&seq);
bitmap.lock().unwrap().insert(seq);
connections.insert(seq, Arc::new(RefCell::new(conn)));
wake_run.insert(
seq,
futures::task::waker(Arc::new(Wk::new(bitmap.clone(), seq))),
);
all_conn += 1;
}
Err(_) => {
//TODO
}
};
});
}
let mut conn_loop_iter = 0;
loop {
conn_loop_iter += 1;
debug!("run loop iter exec num{conn_loop_iter}");
let iter: Vec<_> = {
let mut map = bitmap.lock().unwrap();
let iter = map.clone().into_iter();
let min = map.min().unwrap_or_default();
let max = map.max().unwrap_or_default();
map.remove_range(min..=max);

// DEBUG
iter.collect()
};
if conn_loop_iter == 1 {
info!(
"get all wake seq len {:?} fist {:?}, last {:?}",
iter.len(),
iter.first(),
iter.last()
);
}
for seq in iter {
let waker = wake_run.get(&seq).unwrap();
let mut cx = Context::from_waker(waker);
let poll_result = {
let pinned = match pinned_futures.get_mut(&seq) {
Some(pinned) => {
debug!("use pinned future");
pinned
}
None => {
debug!("new pinned {seq}");
// let conn = unsafe{
// let conn=connections.get(&seq).unwrap();
// #[allow(invalid_reference_casting)]
// &mut *(conn as *const C as *mut C)
// };
let conn = connections.get(&seq).unwrap().clone();
let pinned_owner = Box::pin(self.processor.turn(conn));
pinned_futures.insert(seq, pinned_owner);
pinned_futures.get_mut(&seq).unwrap()
}
};
debug!("start poll seq {seq}");
pinned.as_mut().poll(&mut cx)
};
debug!("end poll");
//debug!("get result {:?}",&poll_result);
match poll_result {
Poll::Ready(Ok(_)) => {
debug!("finish turn {seq}");
pinned_futures.remove(&seq);
waker.wake_by_ref();
}
Poll::Ready(Err(e)) => {
debug!("read or write err {e}");
pinned_futures.remove(&seq);
wake_run.remove(&seq);
connections.remove(&seq);
un_use_seq.insert(seq);
}
Poll::Pending => {}
}
}
let empty = { bitmap.lock().unwrap().is_empty() };
if empty {
break;
}
}
info!("one turn of loop num {conn_loop_iter}");
}
}
}

#[cfg(test)]
mod test {
use crate::transport_layer::processor::{test::http_server, Echo, HttpHandle};
use crate::transport_layer::transport::TcpSteamMaker;
use crate::transport_layer::Pressure;
use bytes::Bytes;
use http_body_util::Empty;
use log::info;

use crate::transport_layer::processor::test::tcp_echo_process_listener;
use std::env;
use std::fs::File;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::spawn;
use tracing::debug;

#[test]
fn pressure_echo() {
env::set_var("RUST_BACKTRACE", "1");

tracing_subscriber::fmt()
.with_line_number(true)
.with_max_level(tracing::Level::INFO)
.init();
info!("test out");
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
rt.spawn(tcp_echo_process_listener());

let transport_conn = TcpSteamMaker::new("127.0.0.1:8080");
let processor = Echo::new();
let pressure = Pressure::new(Arc::new(rt), transport_conn, processor, None);
pressure.run()
}

#[test]
fn http_test() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
info!("test out");
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = rt.block_on(async { TcpListener::bind(addr).await.unwrap() });
rt.spawn(http_server(listener));

let transport_conn = TcpSteamMaker::new("127.0.0.1:3000");
let processor = HttpHandle::new("http://127.0.0.1:3000", Empty::<Bytes>::new()).unwrap();
let pressure = Pressure::new(Arc::new(rt), transport_conn, processor, None);
pressure.run()
}
}
Loading

0 comments on commit 78244b6

Please sign in to comment.