From 1f6fb205b9dde22bd2073589408f713a0d0b1de9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=A9=E7=80=9A=E5=AE=87?= <2805890662@qq.com> Date: Wed, 12 Jun 2024 10:21:20 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=F0=9F=8E=B8=20impl=20muti=5Fpacket=20h?= =?UTF-8?q?ttp&improve=20pressure?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/cicd.yml | 6 + Cargo.lock | 283 ++++++++++++++----------------- Cargo.toml | 15 +- src/main.rs | 20 ++- src/transport_layer/mod.rs | 159 ++++++++++++----- src/transport_layer/odd.rs | 6 +- src/transport_layer/processor.rs | 277 ++++++++++++++++++++++-------- src/transport_layer/transport.rs | 39 ++++- 8 files changed, 517 insertions(+), 288 deletions(-) diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 03f011e..5433622 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -40,4 +40,10 @@ jobs: with: command: clippy args: --tests -- -D warnings + - name: Test + if: success() + uses: actions-rs/cargo@v1 + with: + command: test + diff --git a/Cargo.lock b/Cargo.lock index 7bf9e8d..23c101b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,9 +4,9 @@ version = 3 [[package]] name = "addr2line" -version = "0.21.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678" dependencies = [ "gimli", ] @@ -37,9 +37,9 @@ checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" [[package]] name = "backtrace" -version = "0.3.71" +version = "0.3.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d" +checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a" dependencies = [ "addr2line", "cc", @@ -52,15 +52,15 @@ dependencies = [ [[package]] name = "bitflags" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" +checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" [[package]] name = "bytemuck" -version = "1.16.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78834c15cb5d5efe3452d58b1e8ba890dd62d21907f867f383358198e56ebca5" +checksum = "6fd4c6dcc3b0aea2f5c0b4b82c2b15fe39ddbc76041a310848f4706edf76bb31" [[package]] name = "byteorder" @@ -70,15 +70,18 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.0" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" [[package]] name = "cc" -version = "1.0.98" +version = "1.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41c270e7540d725e65ac7f1b212ac8ce349719624d7bcff99f8e2e488e8cf03f" +checksum = "72db2f7947ecee9b03b510377e8bb9077afa27176fdbff55c51027e976fdcc48" +dependencies = [ + "shlex", +] [[package]] name = "cfg-if" @@ -200,9 +203,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.28.1" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" [[package]] name = "h2" @@ -248,9 +251,9 @@ dependencies = [ [[package]] name = "http-body" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", "http", @@ -258,12 +261,12 @@ dependencies = [ [[package]] name = "http-body-util" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" dependencies = [ "bytes", - "futures-core", + "futures-util", "http", "http-body", "pin-project-lite", @@ -271,9 +274,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.8.0" +version = "1.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" +checksum = "0fcc0b4a115bf80b728eb8ea024ad5bd707b615bfed49e0665b6e0f86fd082d9" [[package]] name = "httpdate" @@ -283,9 +286,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.3.1" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" dependencies = [ "bytes", "futures-channel", @@ -304,9 +307,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.4" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d8d52be92d09acc2e01dddb7fde3ad983fc6489c7db4837e605bc3fca4cb63e" +checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" dependencies = [ "bytes", "futures-channel", @@ -324,9 +327,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.6" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = "93ead53efc7ea8ed3cfb0c79fc8023fbb782a5432b52830b6518941cebe6505c" dependencies = [ "equivalent", "hashbrown", @@ -340,15 +343,15 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" [[package]] name = "lazy_static" -version = "1.4.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.155" +version = "0.2.156" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "a5f43f184355eefb8d17fc948dbecf6c13be3c141f20d834ae842193a448c72a" [[package]] name = "lock_api" @@ -362,35 +365,35 @@ dependencies = [ [[package]] name = "log" -version = "0.4.21" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "memchr" -version = "2.7.2" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "miniz_oxide" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87dfd01fe195c66b572b37921ad8803d010623c0aca821bea2302239d155cdae" +checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" dependencies = [ "adler", ] [[package]] name = "mio" -version = "0.8.11" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ + "hermit-abi", "libc", - "log", "wasi", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -415,9 +418,9 @@ dependencies = [ [[package]] name = "object" -version = "0.32.2" +version = "0.36.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9" dependencies = [ "memchr", ] @@ -454,7 +457,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-targets 0.52.5", + "windows-targets", ] [[package]] @@ -491,15 +494,18 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "ppv-lite86" -version = "0.2.17" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] [[package]] name = "proc-macro2" -version = "1.0.83" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b33eb56c327dec362a9e55b3ad14f9d2f0904fb5a5b03b513ab5465399e9f43" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" dependencies = [ "unicode-ident", ] @@ -545,18 +551,18 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.1" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e" +checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" dependencies = [ "bitflags", ] [[package]] name = "roaring" -version = "0.10.4" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b26f4c25a604fcb3a1bcd96dd6ba37c93840de95de8198d94c0d571a74a804d1" +checksum = "8f4b84ba6e838ceb47b41de5194a60244fac43d9fe03b71dbe8c5a201081d6d1" dependencies = [ "bytemuck", "byteorder", @@ -583,6 +589,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -614,14 +626,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] name = "syn" -version = "2.0.66" +version = "2.0.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c42f3f41a2de00b01c0aaad383c5a45241efc8b2d1eda5661812fda5f3cdcff5" +checksum = "1fceb41e3d546d0bd83421d3409b1460cc7444cd389341a4c880fe7a042cb3d7" dependencies = [ "proc-macro2", "quote", @@ -640,28 +652,27 @@ dependencies = [ [[package]] name = "tokio" -version = "1.37.0" +version = "1.39.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" +checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" dependencies = [ "backtrace", "bytes", "libc", "mio", - "num_cpus", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", @@ -698,15 +709,15 @@ dependencies = [ [[package]] name = "tower-layer" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" @@ -832,144 +843,78 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" -[[package]] -name = "windows-sys" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" -dependencies = [ - "windows-targets 0.48.5", -] - [[package]] name = "windows-sys" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.5", -] - -[[package]] -name = "windows-targets" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" -dependencies = [ - "windows_aarch64_gnullvm 0.48.5", - "windows_aarch64_msvc 0.48.5", - "windows_i686_gnu 0.48.5", - "windows_i686_msvc 0.48.5", - "windows_x86_64_gnu 0.48.5", - "windows_x86_64_gnullvm 0.48.5", - "windows_x86_64_msvc 0.48.5", + "windows-targets", ] [[package]] name = "windows-targets" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm 0.52.5", - "windows_aarch64_msvc 0.52.5", - "windows_i686_gnu 0.52.5", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", "windows_i686_gnullvm", - "windows_i686_msvc 0.52.5", - "windows_x86_64_gnu 0.52.5", - "windows_x86_64_gnullvm 0.52.5", - "windows_x86_64_msvc 0.52.5", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", ] [[package]] name = "windows_aarch64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" - -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" [[package]] name = "windows_aarch64_msvc" -version = "0.48.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" - -[[package]] -name = "windows_aarch64_msvc" -version = "0.52.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" - -[[package]] -name = "windows_i686_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" [[package]] name = "windows_i686_gnu" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" [[package]] name = "windows_i686_gnullvm" -version = "0.52.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" - -[[package]] -name = "windows_i686_msvc" -version = "0.48.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" [[package]] name = "windows_i686_msvc" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" [[package]] name = "windows_x86_64_gnu" -version = "0.48.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.52.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" - -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" [[package]] name = "windows_x86_64_msvc" -version = "0.48.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.52.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "wrk" @@ -978,12 +923,13 @@ dependencies = [ "anyhow", "bytes", "futures", + "futures-util", "http-body-util", "hyper", "hyper-util", "log", - "mio", "num_cpus", + "pin-project", "rand", "roaring", "tokio", @@ -991,3 +937,24 @@ dependencies = [ "tracing-subscriber", "trait-variant", ] + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml index 270a5c2..d55b5ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,18 +6,19 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -anyhow="1.0.77" -tokio={version = "1.35.1",features = ["full"]} -mio = { version = "0.8.10",features = ["default"] } -log = "0.4.20" +anyhow="1.0.86" +tokio={version = "1.39.3",features = ["full"]} +log = "0.4.22" tracing = { version = "0.1.40",features = ["log"] } tracing-subscriber = "0.3" futures = "0.3.30" -roaring = "0.10.2" -hyper = { version = "1.1.0",features = ["client","http1","server"] } +roaring = "0.10.6" +hyper = { version = "1.4.1",features = ["client","http1","server"] } http-body-util = "0.1" hyper-util = { version = "0.1", features = ["full"] } -bytes = "1.5.0" +bytes = "1.7.1" rand="0.8.5" num_cpus = "1.16.0" trait-variant = "0.1.2" +futures-util = "0.3.30" +pin-project = "1.1.5" diff --git a/src/main.rs b/src/main.rs index 2541623..6dc2b81 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,20 +1,19 @@ use bytes::Bytes; use http_body_util::Empty; -use std::io::stdout; use std::sync::Arc; use std::thread; - +use std::time::Duration; use wrk::transport_layer::processor::Http1Handle; -use wrk::transport_layer::transport::TcpSteamMaker; +use wrk::transport_layer::transport::{TcpSteamMaker, TimeOutStop}; use wrk::transport_layer::Pressure; fn main() { // env::set_var("RUST_BACKTRACE", "1"); // - // tracing_subscriber::fmt() - // .with_line_number(true) - // .with_max_level(tracing::Level::DEBUG) - // .init(); + tracing_subscriber::fmt() + .with_line_number(true) + .with_max_level(tracing::Level::INFO) + .init(); // info!("test out"); let rt = Arc::new( tokio::runtime::Builder::new_multi_thread() @@ -23,16 +22,19 @@ fn main() { .unwrap(), ); + //let a = Arc::new(AtomicBool::new(false)); + let a = TimeOutStop::new(Duration::from_secs(5)); let cpus = num_cpus::get(); let mut ht = None; - for _ in 0..cpus - 1 { + for _ in 0..cpus - 2 { let n_rt = rt.clone(); + let a_c = a.clone(); ht = Some(thread::spawn(|| { let transport_conn = TcpSteamMaker::new("127.0.0.1:8080"); let processor = Http1Handle::new("http://127.0.0.1:8080", Empty::::new()).unwrap(); let pressure = Pressure::new(n_rt, transport_conn, processor, None); - pressure.run(Box::new(stdout())); + pressure.run(a_c); })); } let _ = ht.unwrap().join(); diff --git a/src/transport_layer/mod.rs b/src/transport_layer/mod.rs index 3565768..38c6cd1 100644 --- a/src/transport_layer/mod.rs +++ b/src/transport_layer/mod.rs @@ -5,11 +5,12 @@ use std::cell::RefCell; use std::collections::{BTreeSet, HashMap}; use std::fmt::Display; use std::future::Future; -use std::io::Write; use std::marker::PhantomData; use std::rc::Rc; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; +use std::time; +use std::time::{Duration, SystemTime}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; use tokio::runtime::Runtime; @@ -29,14 +30,40 @@ pub trait TransportConn { } #[trait_variant::make] -pub trait Processor { +pub trait Processor { async fn turn(&self, conn: Rc>) -> Result; } +// TODO:扩展,调节run过程中的状态,如sleep一段时间,关闭部分连接等。 +pub trait Stop { + fn stop(&mut self, status: &HashMap>>) -> bool + where + T: Instruct; +} + +pub trait Instruct: Display + Default { + // 0-1的归一化数值,指示压力状况 + fn instruct(&self) -> f64 { + 0f64 + } +} + +// per conn? +#[derive(Default)] +pub struct Statistics { + // pull间隔 + pull_interval: Vec, + // 每次turn完成花费的时间 + // 理论上这个的len就是turn的次数 + turn_interval: Vec, + // Processor 内输出的信息 + turn_collect: Vec, +} + pub struct Pressure where C: Conn, - S: Display, + S: Instruct, T: TransportConn, P: Processor, { @@ -51,7 +78,7 @@ where impl Pressure where C: Conn, - S: Display, + S: Instruct, T: TransportConn, P: Processor, { @@ -65,18 +92,34 @@ where status: Default::default(), } } - - pub fn run(self, _collect: Box) { + pub fn run(self, mut stop: Stp) -> Vec> + where + Stp: Stop, + { let mut connections = HashMap::new(); let mut pinned_futures = HashMap::new(); + let mut pinned_futures_start_time = HashMap::new(); + let mut pinned_poll_interval = HashMap::new(); let mut wake_run = HashMap::new(); + let mut conn_sta = HashMap::new(); let bitmap = Arc::new(Mutex::new(RoaringBitmap::new())); let mut un_use_seq = (0..=self.max).collect::>(); let mut all_conn = 0; + let begin = time::SystemTime::now(); loop { - info!("all conn{all_conn}, now conn{:}", connections.len()); - //sleep(Duration::from_secs(2)); - //connections.len() + if stop.stop(&conn_sta) { + let mut all_connected: usize = 0; + let mut all_turn = 0; + let status: Vec> = conn_sta.into_values().flatten().collect(); + for sta in &status { + all_connected += 1; + all_turn += sta.turn_interval.len(); + } + let cost = time::SystemTime::now().duration_since(begin).unwrap(); + info!(allConn=%all_connected,allReq=%all_turn,cost=?cost); + return status; + } + debug!("all conn{all_conn}, now conn{:}", connections.len()); if connections.len() < self.max as usize { self.rt.block_on(async { match self.transport.new_conn().await { @@ -89,6 +132,10 @@ where seq, futures::task::waker(Arc::new(Wk::new(bitmap.clone(), seq))), ); + conn_sta + .entry(seq) + .or_insert(Vec::new()) + .push(Statistics::default()); all_conn += 1; } Err(_) => { @@ -98,9 +145,10 @@ where }); } let mut conn_loop_iter = 0; + let mut pre = vec![]; loop { conn_loop_iter += 1; - debug!("run loop iter exec num{conn_loop_iter}"); + debug!(num = %conn_loop_iter,"run loop"); let iter: Vec<_> = { let mut map = bitmap.lock().unwrap(); let iter = map.clone().into_iter(); @@ -111,15 +159,18 @@ where // DEBUG iter.collect() }; - if conn_loop_iter == 1 { - info!( - "get all wake seq len {:?} fist {:?}, last {:?}", - iter.len(), - iter.first(), - iter.last() - ); - } for seq in iter { + let status = conn_sta.get_mut(&seq).unwrap().last_mut().unwrap(); + let loop_start = time::SystemTime::now(); + let interval_entry = pinned_poll_interval.entry(seq); + interval_entry + .and_modify(|last: &mut SystemTime| { + let result = loop_start.duration_since(*last).unwrap(); + status.pull_interval.push(result); + *last = loop_start; + }) + .or_insert(loop_start); + let waker = wake_run.get(&seq).unwrap(); let mut cx = Context::from_waker(waker); let poll_result = { @@ -129,26 +180,38 @@ where pinned } None => { - debug!("new pinned {seq}"); + debug!("new pinned"); let conn = connections.get(&seq).unwrap().clone(); let pinned_owner = Box::pin(self.processor.turn(conn)); + pinned_futures_start_time.insert(seq, loop_start); pinned_futures.insert(seq, pinned_owner); pinned_futures.get_mut(&seq).unwrap() } }; - debug!("start poll seq {seq}"); + debug!(seq=%seq,"start poll"); pinned.as_mut().poll(&mut cx) }; debug!("end poll"); - //debug!("get result {:?}",&poll_result); + let mut collect = |ins| { + let start = *pinned_futures_start_time.get(&seq).unwrap(); + let turn = loop_start.duration_since(start).unwrap(); + status.turn_interval.push(turn); + if let Some(i) = ins { + status.turn_collect.push(i); + } + }; match poll_result { - Poll::Ready(Ok(_)) => { - debug!("finish turn {seq}"); + Poll::Ready(Ok(ins)) => { + let pressure = ins.instruct(); + pre.push(pressure); + collect(Some(ins)); + debug!(seq=%seq,"finish turn"); pinned_futures.remove(&seq); waker.wake_by_ref(); } Poll::Ready(Err(e)) => { - debug!("read or write err {e}"); + collect(None); + debug!(err=%e,"read or write err"); pinned_futures.remove(&seq); wake_run.remove(&seq); connections.remove(&seq); @@ -162,7 +225,7 @@ where break; } } - info!("one turn of loop num {conn_loop_iter}"); + debug!(num=%conn_loop_iter,"one turn of loop"); } } } @@ -174,42 +237,45 @@ mod test { use crate::transport_layer::Pressure; use bytes::Bytes; use http_body_util::Empty; - use log::info; use crate::transport_layer::processor::test::tcp_echo_process_listener; - use std::env; - use std::io::stdout; + use std::net::SocketAddr; + use std::sync::atomic::AtomicBool; + use std::sync::atomic::Ordering::Relaxed; use std::sync::Arc; + use std::time::Duration; use tokio::net::TcpListener; + use tokio::time::sleep; #[test] fn pressure_echo() { - env::set_var("RUST_BACKTRACE", "1"); - - tracing_subscriber::fmt() - .with_line_number(true) - .with_max_level(tracing::Level::INFO) - .init(); - info!("test out"); + // tracing_subscriber::fmt() + // .with_line_number(true) + // .with_max_level(tracing::Level::INFO) + // .init(); let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap(); rt.spawn(tcp_echo_process_listener()); - + let a1 = Arc::new(AtomicBool::new(false)); + let a2 = a1.clone(); + rt.spawn(async move { + sleep(Duration::from_secs(3)).await; + a1.store(true, Relaxed) + }); let transport_conn = TcpSteamMaker::new("127.0.0.1:8080"); - let processor = Echo::new(); + let processor = Echo::default(); let pressure = Pressure::new(Arc::new(rt), transport_conn, processor, None); - pressure.run(Box::new(stdout())) + pressure.run(a2); } #[test] fn http_test() { - tracing_subscriber::fmt() - .with_max_level(tracing::Level::DEBUG) - .init(); - info!("test out"); + // tracing_subscriber::fmt() + // .with_max_level(tracing::Level::INFO) + // .init(); let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() @@ -218,10 +284,15 @@ mod test { let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = rt.block_on(async { TcpListener::bind(addr).await.unwrap() }); rt.spawn(http_server(listener)); - + let a1 = Arc::new(AtomicBool::new(false)); + let a2 = a1.clone(); + rt.spawn(async move { + sleep(Duration::from_secs(3)).await; + a1.store(true, Relaxed) + }); let transport_conn = TcpSteamMaker::new("127.0.0.1:3000"); let processor = Http1Handle::new("http://127.0.0.1:3000", Empty::::new()).unwrap(); let pressure = Pressure::new(Arc::new(rt), transport_conn, processor, None); - pressure.run(Box::new(stdout())) + pressure.run(a2); } } diff --git a/src/transport_layer/odd.rs b/src/transport_layer/odd.rs index 94142c3..37236cb 100644 --- a/src/transport_layer/odd.rs +++ b/src/transport_layer/odd.rs @@ -1,3 +1,4 @@ +use crate::transport_layer::Instruct; use futures::task::ArcWake; use roaring::RoaringBitmap; use std::cell::RefCell; @@ -11,7 +12,6 @@ use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; -use tracing::debug; pub struct WarpConn(pub Rc>); @@ -75,11 +75,11 @@ impl Wk { impl ArcWake for Wk { fn wake_by_ref(arc_self: &Arc) { - debug!("been call wake {}", arc_self.index); arc_self.bitmap.lock().unwrap().insert(arc_self.index); } } +#[derive(Default)] pub struct Empty; impl Display for Empty { @@ -87,3 +87,5 @@ impl Display for Empty { write!(f, "EMPTY") } } + +impl Instruct for Empty {} diff --git a/src/transport_layer/processor.rs b/src/transport_layer/processor.rs index f21e332..5274447 100644 --- a/src/transport_layer/processor.rs +++ b/src/transport_layer/processor.rs @@ -1,33 +1,31 @@ -use super::odd::{Empty, WarpConn}; -use crate::transport_layer::Processor; +use super::odd::{Empty, WarpConn, Wk}; +use crate::transport_layer::{Instruct, Processor}; use anyhow::anyhow; -use hyper::body::Body; +use http_body_util::{combinators::Collect, BodyExt}; +use hyper::body::{Body, Incoming}; use hyper::client::conn::http1 as client_http1; -use hyper::{Request, Uri}; +use hyper::client::conn::http1::Connection; +use hyper::{Request, Response, Uri}; use hyper_util::rt::TokioIo; +use pin_project::pin_project; +use roaring::RoaringBitmap; use std::cell::RefCell; +use std::collections::HashSet; use std::error::Error; use std::fmt::{Debug, Display, Formatter}; +use std::future::Future; +use std::pin::{pin, Pin}; use std::rc::Rc; +use std::sync::{Arc, Mutex}; +use std::task::Poll::{Pending, Ready}; +use std::task::{ready, Context, Poll, Wake, Waker}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; -use tokio::select; use tracing::debug; +#[derive(Default)] pub struct Echo {} -impl Default for Echo { - fn default() -> Self { - Self::new() - } -} - -impl Echo { - pub fn new() -> Self { - Self {} - } -} - impl Processor for Echo { #[allow(clippy::await_holding_refcell_ref)] async fn turn(&self, conn: Rc>) -> anyhow::Result { @@ -80,7 +78,8 @@ impl Display for H1Detail { } } -// TODO: 扩展processor,让Pressure负责conn的pull +impl Instruct for H1Detail {} + impl Processor for Http1Handle where T: Clone + Body + 'static, @@ -89,39 +88,155 @@ where { async fn turn(&self, conn: Rc>) -> anyhow::Result { let tokio_io = TokioIo::new(WarpConn(conn)); - let (mut request_sender, connection) = client_http1::handshake(tokio_io).await.unwrap(); + let (mut request_sender, connection) = client_http1::handshake(tokio_io).await?; let req = self.request.clone(); + let status = H1Call::new(connection, Box::pin(request_sender.send_request(req))).await?; + Ok(H1Detail { http_code: status }) + } +} - select! { - // 理论来说connection不会await成功,只有Shutdown和upgrade会返回 - conn_err=connection=>{ - conn_err?; - Err(anyhow!("shutdown or upgrade")) +struct WKWithMark { + mark: Waker, + wk: Waker, +} + +impl WKWithMark { + fn new(mark: Waker, wk: Waker) -> Self { + Self { mark, wk } + } +} + +impl Wake for WKWithMark { + fn wake(self: Arc) { + self.mark.wake_by_ref(); + self.wk.wake_by_ref(); + } +} + +#[pin_project] +struct H1Call +where + T: hyper::rt::Read + hyper::rt::Write, + B: Body + 'static, +{ + status: Option>>, + #[pin] + conn: Connection, + request: Pin>>>>, + incoming: Option>>>, + result: u16, + // cx: [Option; 2], +} + +impl H1Call +where + T: hyper::rt::Read + hyper::rt::Write, + B: Body + 'static, +{ + fn new( + conn: Connection, + request: Pin>>>>, + ) -> Self { + Self { + status: None, + conn, + request, + incoming: None, + result: 0, + // cx: [None; 2], + } + } +} + +impl Future for H1Call +where + T: hyper::rt::Read + hyper::rt::Write + Unpin, + B: Clone + Body + 'static, + ::Data: Send, + ::Error: Error + Send + Sync, +{ + type Output = anyhow::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let is_first = { + if this.status.is_none() { + this.status + .replace(Arc::new(Mutex::new(RoaringBitmap::new()))); + true + } else { + false + } + }; + let waker = this.status.as_mut().unwrap(); + let notify: HashSet = { + let mut map = waker.lock().unwrap(); + let iter = map.clone().into_iter(); + let min = map.min().unwrap_or_default(); + let max = map.max().unwrap_or_default(); + map.remove_range(min..=max); + iter.collect() + }; + + let cx_mark0 = futures::task::waker(Arc::new(Wk::new(waker.clone(), 0))); + let cx_mark1 = futures::task::waker(Arc::new(Wk::new(waker.clone(), 1))); + let cx_wk0 = Arc::new(WKWithMark::new(cx_mark0, cx.waker().clone())).into(); + let cx_wk1 = Arc::new(WKWithMark::new(cx_mark1, cx.waker().clone())).into(); + // 尽管入参引用,但是wake trait变成waker的时候调用了forget,这里函数结束drop cx_wk没问题,但是如果用闭包封装函数,会导致编译器不高兴过不了编译 + let mut wk0 = Context::from_waker(&cx_wk0); + let mut wk1 = Context::from_waker(&cx_wk1); + + if is_first || notify.contains(&0) { + //this.cx[0].replace(cx_wk0); + let conn_result = this.conn.poll(&mut wk0); + if conn_result.is_ready() { + return Ready(Err(anyhow!("connection return {:?}", conn_result))); } - response=request_sender.send_request(req)=>{ - // TODO: body 是income类型,会不会污染后续从conn读 - Ok(H1Detail{http_code:response?.status().into()}) - } + } + if !is_first && !notify.contains(&1) { + return Pending; + } + //this.cx[0].replace(cx_wk1); + let future = if this.incoming.is_none() { + let req_result = ready!(this.request.as_mut().poll(&mut wk1)); + if req_result.is_err() { + return Ready(Err(anyhow!("request err {:?}", req_result))); + } + let resp = req_result.unwrap(); + let (header, body) = resp.into_parts(); + *this.result = header.status.into(); + this.incoming.replace(Box::pin(body.collect())); + this.incoming.as_mut().unwrap().as_mut() + } else { + this.incoming.as_mut().unwrap().as_mut() + }; + let result = ready!(future.poll(&mut wk1)); + if result.is_err() { + Ready(Err(anyhow!("get body result err {:?}", result))) + } else { + Ready(Ok(*this.result)) } } } #[cfg(test)] pub mod test { - - use crate::transport_layer::processor::{Http1Handle}; + use crate::transport_layer::odd::WarpConn; + use crate::transport_layer::processor::{H1Call, Http1Handle}; use crate::transport_layer::Processor; - use http_body_util::{BodyExt, Empty, Full}; + use anyhow::Error; + use http_body_util::{Empty, Full}; use hyper::body::Bytes; use hyper::service::service_fn; - use hyper::{client::conn::http1 as client_http1, server::conn::http1 as server_http1}; + use hyper::{ + client::conn::http1 as client_http1, server::conn::http1 as server_http1, StatusCode, + }; use hyper::{Request, Response}; use hyper_util::rt::TokioIo; use std::cell::RefCell; use std::convert::Infallible; + use std::net::SocketAddr; use std::rc::Rc; - - use crate::transport_layer::odd::WarpConn; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::spawn; @@ -129,52 +244,54 @@ pub mod test { #[tokio::test] async fn http_conn() { + // tracing_subscriber::fmt() + // .with_max_level(tracing::Level::INFO) + // .init(); + let addr = SocketAddr::from(([127, 0, 0, 1], 8080)); + let listener = TcpListener::bind(addr).await.unwrap(); + spawn(muti_pack_server(listener)); let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap(); - //let (mut request_sender, connection) = client_http1::handshake(TokioIo::new(stream)).await.unwrap(); - // spawn(async move { - // if let Err(e) = connection.await { - // eprintln!("Error in connection: {}", e); - // } - // }); let http_handle = Http1Handle::new("http://127.0.0.1:8080", Empty::::new()).unwrap(); - let _ = http_handle - .turn(Rc::new(RefCell::new(stream))) - .await - .unwrap(); - - //let response = request_sender.send_request(request).await.unwrap(); - // assert_eq!(response.status(), StatusCode::OK); - - // let request = Request::builder() - // .header("Host", "example.com") - // .method("GET") - // .body(Empty::::new()).unwrap(); - - // let response = request_sender.send_request(request).await.unwrap(); - // assert_eq!(response.status(), StatusCode::OK); + let value = RefCell::new(stream); + let value = Rc::new(value); + for _ in 0..10 { + let _ = http_handle.turn(value.clone()).await.unwrap(); + } } - #[tokio::test] + #[allow(dead_code)] + //#[tokio::test(flavor = "multi_thread")] async fn tokio() { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::DEBUG) + .init(); + let addr = SocketAddr::from(([127, 0, 0, 1], 8080)); + let listener = TcpListener::bind(addr).await.unwrap(); + spawn(muti_pack_server(listener)); + let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap(); let c = Rc::new(RefCell::new(stream)); let wc = WarpConn(c); let (mut request_sender, connection) = client_http1::handshake(TokioIo::new(wc)).await.unwrap(); - spawn(async move { - if let Err(e) = connection.await { - eprintln!("Error in connection: {}", e); - } - }); + // spawn(async move { + // if let Err(e) = connection.await { + // eprintln!("Error in connection: {}", e); + // } + // }); let request = Request::builder() .uri("http://127.0.0.1:8080") .header("Host", "127.0.0.1") .body(Empty::::new()) .unwrap(); - let response = request_sender.send_request(request).await.unwrap(); - - dbg!(response.into_body().frame().await); - //assert_eq!(response.status(), StatusCode::OK); + let fu = request_sender.send_request(request); + let call = H1Call::new(connection, Box::pin(fu)); + let result = call.await; + // let response = request_sender.send_request(request).await.unwrap(); + // + // let b = response.into_body().collect().await.unwrap().to_bytes(); + // dbg!(b.len()); + assert_eq!(result.unwrap(), StatusCode::OK.as_u16()); } pub async fn tcp_echo_process_listener() { @@ -191,12 +308,14 @@ pub mod test { turn += 1; debug!("conn {num}, server start"); let mut buf = vec![0; 1024]; - let read_num = conn.read(&mut buf).await.unwrap(); + let read_num = conn.read(&mut buf).await?; let str = String::from_utf8_lossy(&buf[0..read_num]); debug!("read_num {read_num},read_conn ,{:}", &str); - let write_num = conn.write(&buf[0..read_num]).await.unwrap(); + let write_num = conn.write(&buf[0..read_num]).await?; debug!("conn_num {num},conn_turn {turn},write_num {write_num}"); } + #[allow(unreachable_code)] + Ok::<(), Error>(()) }); } } @@ -220,4 +339,28 @@ pub mod test { async fn hello(_: Request) -> Result>, Infallible> { Ok(Response::new(Full::new(Bytes::from("Hello, World!")))) } + + pub async fn muti_pack_server(lis: TcpListener) { + loop { + let (stream, _) = lis.accept().await.unwrap(); + let io = TokioIo::new(stream); + + tokio::task::spawn(async move { + if let Err(err) = server_http1::Builder::new() + .serve_connection( + io, + service_fn(|_: Request| async { + let big_pack: Vec = vec![1; 16384]; + Ok::>, Infallible>( + Response::new(Full::::from(big_pack)), + ) + }), + ) + .await + { + println!("Error serving connection: {:?}", err); + } + }); + } + } } diff --git a/src/transport_layer/transport.rs b/src/transport_layer/transport.rs index 295ec47..edc295a 100644 --- a/src/transport_layer/transport.rs +++ b/src/transport_layer/transport.rs @@ -1,5 +1,11 @@ -use crate::transport_layer::TransportConn; +use super::{Instruct, Statistics, Stop, TransportConn}; +use std::collections::HashMap; use std::net::{SocketAddr, ToSocketAddrs}; +use std::ops::Add; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; use tokio::net::TcpStream; pub struct TcpSteamMaker { @@ -19,3 +25,34 @@ impl TransportConn for TcpSteamMaker { Ok(TcpStream::connect(self.addr).await?) } } + +impl Stop for Arc { + fn stop(&mut self, _status: &HashMap>>) -> bool + where + T: Instruct, + { + self.load(Relaxed) + } +} + +#[derive(Debug, Clone)] +pub struct TimeOutStop { + pub timeout: SystemTime, +} + +impl Stop for TimeOutStop { + fn stop(&mut self, _status: &HashMap>>) -> bool + where + T: Instruct, + { + SystemTime::now() > self.timeout + } +} + +impl TimeOutStop { + pub fn new(duration: Duration) -> Self { + Self { + timeout: SystemTime::now().add(duration), + } + } +}