Skip to content

Commit

Permalink
feat: 🎸 add ci lint and extend action
Browse files Browse the repository at this point in the history
  • Loading branch information
haohanyuzmx committed Jun 10, 2024
1 parent 78244b6 commit 1384f94
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 139 deletions.
43 changes: 43 additions & 0 deletions .github/workflows/cicd.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
name: CICD


env:
RUST_SRV: "1.78.0"

on:
pull_request:
branches:
- "**"
push:
branches: [ main ]

jobs:
style:
name: Style
runs-on: ubuntu-latest
# runs-on: ${{ matrix.job.os }}
# strategy:
# fail-fast: false
# matrix:
# job: [ { os: ubuntu-latest }, { os: macos-latest }, { os: windows-latest } ]
steps:
- uses: actions/checkout@v1
- name: Install `rust` toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ env.RUST_SRV }}
override: true
profile: minimal # minimal component installation (ie, no documentation)
components: rustfmt, clippy
- name: "`fmt` testing"
uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
- name: "`clippy` testing"
if: success() || failure() # run regardless of prior step ("`fmt` testing") success/failure
uses: actions-rs/cargo@v1
with:
command: clippy
args: --tests -- -D warnings

7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
fmtCheck:
cargo fmt --all -- --check

clippyCheck:
cargo --tests -- -D warnings

precheck: fmtCheck clippyCheck
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ wrk 每次使用都需要自己指定连接数,是一个比较麻烦的调优

主要抽象有两个,一个是负责执行异步任务的运行时;一个是负责压测的程序。

主要思想是自己实现一部分异步运行时;在该运行时管理的连接中,如果有事件就直接去处理事件,如果没有时间;就可以根据某些指标创建新的连接。
主要思想是自己实现一部分异步运行时;在该运行时管理的连接中,如果有事件就直接去处理事件,如果没有事件;就可以根据某些指标创建新的连接。

其中,这个指标可能是基于压测任务反馈(背压),也能是基于用户给定的一些知识。组合这些内容可以使用如卡尔曼滤波,或者pid调参。

Expand Down
14 changes: 7 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use bytes::Bytes;
use http_body_util::Empty;
use std::sync::{Arc};
use std::io::stdout;
use std::sync::Arc;
use std::thread;

use wrk::transport_layer::processor::{HttpHandle};
use wrk::transport_layer::processor::Http1Handle;
use wrk::transport_layer::transport::TcpSteamMaker;
use wrk::transport_layer::Pressure;

Expand All @@ -24,16 +25,15 @@ fn main() {

let cpus = num_cpus::get();
let mut ht = None;
for i in 0..cpus - 1 {
for _ in 0..cpus - 1 {
let n_rt = rt.clone();
ht = Some(thread::spawn(|| {
let transport_conn = TcpSteamMaker::new("127.0.0.1:8080");
let processor =
HttpHandle::new("http://127.0.0.1:8080", Empty::<Bytes>::new()).unwrap();
Http1Handle::new("http://127.0.0.1:8080", Empty::<Bytes>::new()).unwrap();
let pressure = Pressure::new(n_rt, transport_conn, processor, None);
pressure.run();
pressure.run(Box::new(stdout()));
}));
}
ht.unwrap().join();

let _ = ht.unwrap().join();
}
71 changes: 25 additions & 46 deletions src/transport_layer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
use anyhow::Result;
use futures::task::ArcWake;
use odd::Wk;
use roaring::RoaringBitmap;
use std::cell::RefCell;
use std::collections::{BTreeSet, HashMap};
use std::fmt::Display;
use std::future::Future;
use std::io::Write;
use std::marker::PhantomData;
use std::sync::atomic::AtomicBool;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use tracing::{debug, info};

pub mod odd;
pub mod processor;
pub mod transport;

Expand All @@ -26,59 +29,44 @@ pub trait TransportConn<T: Conn> {
}

#[trait_variant::make]
pub trait Processor<T: Conn> {
async fn turn(&self, conn: Arc<RefCell<T>>) -> Result<()>;
pub trait Processor<C: Conn, S: Display> {
async fn turn(&self, conn: Rc<RefCell<C>>) -> Result<S>;
}

pub struct Pressure<T, P, C>
pub struct Pressure<T, P, C, S>
where
C: Conn,
S: Display,
T: TransportConn<C>,
P: Processor<C>,
P: Processor<C, S>,
{
rt: Arc<Runtime>,
transport: T,
processor: P,
max: u32,
stop: AtomicBool,
phantom: PhantomData<C>,
conn: PhantomData<C>,
status: PhantomData<S>,
}

struct Wk {
bitmap: Arc<Mutex<RoaringBitmap>>,
index: u32,
}

impl Wk {
fn new(bitmap: Arc<Mutex<RoaringBitmap>>, index: u32) -> Self {
Self { bitmap, index }
}
}

impl ArcWake for Wk {
fn wake_by_ref(arc_self: &Arc<Self>) {
debug!("been call wake {}", arc_self.index);
arc_self.bitmap.lock().unwrap().insert(arc_self.index);
}
}

impl<T, P, C> Pressure<T, P, C>
impl<T, P, C, S> Pressure<T, P, C, S>
where
C: Conn,
S: Display,
T: TransportConn<C>,
P: Processor<C>,
P: Processor<C, S>,
{
pub fn new(rt: Arc<Runtime>, transport: T, processor: P, max: Option<u32>) -> Self {
Self {
rt,
transport,
processor,
max: max.unwrap_or(65535),
stop: AtomicBool::new(false),
phantom: Default::default(),
conn: Default::default(),
status: Default::default(),
}
}
pub fn run(self) {

pub fn run(self, _collect: Box<dyn Write>) {
let mut connections = HashMap::new();
let mut pinned_futures = HashMap::new();
let mut wake_run = HashMap::new();
Expand All @@ -96,7 +84,7 @@ where
let seq = *un_use_seq.iter().next().unwrap();
un_use_seq.take(&seq);
bitmap.lock().unwrap().insert(seq);
connections.insert(seq, Arc::new(RefCell::new(conn)));
connections.insert(seq, Rc::new(RefCell::new(conn)));
wake_run.insert(
seq,
futures::task::waker(Arc::new(Wk::new(bitmap.clone(), seq))),
Expand Down Expand Up @@ -142,11 +130,6 @@ where
}
None => {
debug!("new pinned {seq}");
// let conn = unsafe{
// let conn=connections.get(&seq).unwrap();
// #[allow(invalid_reference_casting)]
// &mut *(conn as *const C as *mut C)
// };
let conn = connections.get(&seq).unwrap().clone();
let pinned_owner = Box::pin(self.processor.turn(conn));
pinned_futures.insert(seq, pinned_owner);
Expand Down Expand Up @@ -186,7 +169,7 @@ where

#[cfg(test)]
mod test {
use crate::transport_layer::processor::{test::http_server, Echo, HttpHandle};
use crate::transport_layer::processor::{test::http_server, Echo, Http1Handle};
use crate::transport_layer::transport::TcpSteamMaker;
use crate::transport_layer::Pressure;
use bytes::Bytes;
Expand All @@ -195,14 +178,10 @@ mod test {

use crate::transport_layer::processor::test::tcp_echo_process_listener;
use std::env;
use std::fs::File;
use std::io::stdout;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::spawn;
use tracing::debug;

#[test]
fn pressure_echo() {
Expand All @@ -222,7 +201,7 @@ mod test {
let transport_conn = TcpSteamMaker::new("127.0.0.1:8080");
let processor = Echo::new();
let pressure = Pressure::new(Arc::new(rt), transport_conn, processor, None);
pressure.run()
pressure.run(Box::new(stdout()))
}

#[test]
Expand All @@ -241,8 +220,8 @@ mod test {
rt.spawn(http_server(listener));

let transport_conn = TcpSteamMaker::new("127.0.0.1:3000");
let processor = HttpHandle::new("http://127.0.0.1:3000", Empty::<Bytes>::new()).unwrap();
let processor = Http1Handle::new("http://127.0.0.1:3000", Empty::<Bytes>::new()).unwrap();
let pressure = Pressure::new(Arc::new(rt), transport_conn, processor, None);
pressure.run()
pressure.run(Box::new(stdout()))
}
}
89 changes: 89 additions & 0 deletions src/transport_layer/odd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use futures::task::ArcWake;
use roaring::RoaringBitmap;
use std::cell::RefCell;
use std::fmt::{Display, Formatter};
use std::io;
use std::io::IoSlice;
use std::ops::DerefMut;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
use tracing::debug;

pub struct WarpConn(pub Rc<RefCell<TcpStream>>);

unsafe impl Send for WarpConn {}

macro_rules! pinc {
($self:ident) => {
Pin::new($self.get_mut().0.borrow_mut().deref_mut())
};
}
impl AsyncRead for WarpConn {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
pinc!(self).poll_read(cx, buf)
}
}

impl AsyncWrite for WarpConn {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
pinc!(self).poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
pinc!(self).poll_flush(cx)
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
pinc!(self).poll_shutdown(cx)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
pinc!(self).poll_write_vectored(cx, bufs)
}

fn is_write_vectored(&self) -> bool {
self.0.borrow().is_write_vectored()
}
}

pub struct Wk {
bitmap: Arc<Mutex<RoaringBitmap>>,
index: u32,
}

impl Wk {
pub fn new(bitmap: Arc<Mutex<RoaringBitmap>>, index: u32) -> Self {
Self { bitmap, index }
}
}

impl ArcWake for Wk {
fn wake_by_ref(arc_self: &Arc<Self>) {
debug!("been call wake {}", arc_self.index);
arc_self.bitmap.lock().unwrap().insert(arc_self.index);
}
}

pub struct Empty;

impl Display for Empty {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "EMPTY")
}
}
Loading

0 comments on commit 1384f94

Please sign in to comment.