From d90af81d00c52e4ca68098d9baf49810830c27a7 Mon Sep 17 00:00:00 2001 From: aumetra Date: Wed, 27 Sep 2023 21:19:43 +0200 Subject: [PATCH] Add crate for LocalSet execution --- Cargo.lock | 108 ++++++++++++++++----------------- Cargo.toml | 1 + lib/tokio-run-local/Cargo.toml | 9 +++ lib/tokio-run-local/src/lib.rs | 70 +++++++++++++++++++++ 4 files changed, 132 insertions(+), 56 deletions(-) create mode 100644 lib/tokio-run-local/Cargo.toml create mode 100644 lib/tokio-run-local/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 3f326aeb4..47004a7d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -410,9 +410,9 @@ dependencies = [ [[package]] name = "async-task" -version = "4.4.1" +version = "4.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9441c6b2fe128a7c2bf680a44c34d0df31ce09e5b7e401fcca3faa483dbc921" +checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" [[package]] name = "async-trait" @@ -459,9 +459,9 @@ checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" [[package]] name = "atomic-waker" -version = "1.1.2" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" [[package]] name = "autocfg" @@ -504,7 +504,7 @@ checksum = "70a66ac8ef5fa9cf01c2d999f39d16812e90ec1467bd382cbbb74ba23ea86201" dependencies = [ "aws-smithy-async", "aws-smithy-types", - "fastrand 2.0.1", + "fastrand 2.0.0", "tokio", "tracing", "zeroize", @@ -544,7 +544,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", - "fastrand 2.0.1", + "fastrand 2.0.0", "http", "percent-encoding", "tracing", @@ -648,7 +648,7 @@ dependencies = [ "aws-smithy-http-tower", "aws-smithy-types", "bytes", - "fastrand 2.0.1", + "fastrand 2.0.0", "http", "http-body", "hyper", @@ -732,7 +732,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "bytes", - "fastrand 2.0.1", + "fastrand 2.0.0", "http", "http-body", "once_cell", @@ -1090,18 +1090,17 @@ dependencies = [ [[package]] name = "blocking" -version = "1.4.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94c4ef1f913d78636d78d538eec1f18de81e481f44b1be0a81060090530846e1" +checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65" dependencies = [ "async-channel", "async-lock", "async-task", - "fastrand 2.0.1", - "futures-io", + "atomic-waker", + "fastrand 1.9.0", "futures-lite", - "piper", - "tracing", + "log", ] [[package]] @@ -1405,9 +1404,9 @@ dependencies = [ [[package]] name = "concurrent-queue" -version = "2.3.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400" +checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" dependencies = [ "crossbeam-utils", ] @@ -1811,9 +1810,9 @@ dependencies = [ [[package]] name = "deadpool-runtime" -version = "0.1.3" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49" +checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1" dependencies = [ "tokio", ] @@ -1911,9 +1910,9 @@ dependencies = [ [[package]] name = "diesel_derives" -version = "2.1.2" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef8337737574f55a468005a83499da720f20c65586241ffea339db9ecdfd2b44" +checksum = "e054665eaf6d97d1e7125512bb2d35d07c73ac86cc6920174cb42d1ab697a554" dependencies = [ "diesel_table_macro_syntax", "proc-macro2", @@ -2258,9 +2257,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.0.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" +checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" [[package]] name = "ff" @@ -3898,17 +3897,16 @@ dependencies = [ [[package]] name = "matchit" -version = "0.7.3" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef" [[package]] name = "md-5" -version = "0.10.6" +version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" dependencies = [ - "cfg-if", "digest", ] @@ -4571,9 +4569,9 @@ dependencies = [ [[package]] name = "parking" -version = "2.1.1" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e52c774a4c39359c1d1c52e43f73dd91a75a614652c825408eec30c95a9b2067" +checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" [[package]] name = "parking_lot" @@ -4642,9 +4640,9 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" [[package]] name = "pest" -version = "2.7.4" +version = "2.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c022f1e7b65d6a24c0dbbd5fb344c66881bc01f3e5ae74a1c8100f2f985d98a4" +checksum = "d7a4d085fd991ac8d5b05a147b437791b4260b76326baf0fc60cf7c9c27ecd33" dependencies = [ "memchr", "thiserror", @@ -4789,17 +4787,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "piper" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" -dependencies = [ - "atomic-waker", - "fastrand 2.0.1", - "futures-io", -] - [[package]] name = "pkcs1" version = "0.7.5" @@ -5475,7 +5462,7 @@ checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" dependencies = [ "log", "ring", - "rustls-webpki 0.101.6", + "rustls-webpki 0.101.5", "sct", ] @@ -5512,9 +5499,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.6" +version = "0.101.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe" +checksum = "45a27e3b59326c16e23d30aeb7a36a24cc0d29e71d68ff611cdfb4a01d013bed" dependencies = [ "ring", "untrusted", @@ -5656,9 +5643,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.19" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad977052201c6de01a8ef2aa3378c4bd23217a056337d1d6da40468d267a4fb0" +checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" dependencies = [ "serde", ] @@ -5833,9 +5820,9 @@ dependencies = [ [[package]] name = "sha1" -version = "0.10.6" +version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" dependencies = [ "cfg-if", "cpufeatures", @@ -6328,7 +6315,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" dependencies = [ "cfg-if", - "fastrand 2.0.1", + "fastrand 2.0.0", "redox_syscall 0.3.5", "rustix", "windows-sys 0.48.0", @@ -6516,6 +6503,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-run-local" +version = "0.0.1-pre.3" +dependencies = [ + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -6540,9 +6536,9 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.20.1" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +checksum = "2b2dbec703c26b00d74844519606ef15d09a7d6857860f84ad223dec002ddea2" dependencies = [ "futures-util", "log", @@ -6836,9 +6832,9 @@ checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" [[package]] name = "tungstenite" -version = "0.20.1" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649" dependencies = [ "byteorder", "bytes", @@ -7077,9 +7073,9 @@ checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" [[package]] name = "waker-fn" -version = "1.1.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" [[package]] name = "walkdir" diff --git a/Cargo.toml b/Cargo.toml index b0f50eed0..180ceb871 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ members = [ "lib/just-retry", "lib/post-process", "lib/speedy-uuid", + "lib/tokio-run-local", ] resolver = "2" diff --git a/lib/tokio-run-local/Cargo.toml b/lib/tokio-run-local/Cargo.toml new file mode 100644 index 000000000..9ed592a8d --- /dev/null +++ b/lib/tokio-run-local/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "tokio-run-local" +edition.workspace = true +version.workspace = true + +[dependencies] +thiserror = "1.0.49" +tokio = { version = "1.32.0", features = ["rt", "sync"] } +tracing = "0.1.37" diff --git a/lib/tokio-run-local/src/lib.rs b/lib/tokio-run-local/src/lib.rs new file mode 100644 index 000000000..a9e43a306 --- /dev/null +++ b/lib/tokio-run-local/src/lib.rs @@ -0,0 +1,70 @@ +use std::{any::Any, future::Future, pin::Pin, sync::OnceLock, thread}; +use thiserror::Error; +use tokio::{ + sync::{mpsc, oneshot}, + task::LocalSet, +}; + +type LocalBoxFuture<'a, O> = Pin + 'a>>; +type FutureProducer = Box LocalBoxFuture<'static, Box> + Send>; +type Task = (FutureProducer, oneshot::Sender>); + +const CHANNEL_CAPACITY: usize = 500; +static GLOBAL_SINGLE_THREADED_RUNTIME: OnceLock> = OnceLock::new(); + +#[derive(Debug, Error)] +pub enum Error { + #[error(transparent)] + Oneshot(#[from] oneshot::error::RecvError), + + #[error("Spawn error")] + SpawnError, +} + +pub async fn run(func: F) -> Result, Error> +where + F: FnOnce() -> Fut + Send + 'static, + Fut: Future, + ::Output: Send + 'static, +{ + let runtime_handle = GLOBAL_SINGLE_THREADED_RUNTIME.get_or_init(|| { + let (sender, mut receiver) = mpsc::channel::(CHANNEL_CAPACITY); + + thread::spawn(move || { + let local_set = LocalSet::new(); + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + local_set.block_on(&runtime, async move { + while let Some((producer, sender)) = receiver.recv().await { + tokio::task::spawn_local(async move { + if sender.send(producer().await).is_err() { + tracing::debug!("failed to send value from single threaded executor"); + } + }); + } + }); + }); + + sender + }); + + let (sender, receiver) = oneshot::channel(); + let closure = Box::new(|| { + Box::pin(async move { + let output = func().await; + Box::new(output) as Box + }) as _ + }); + + runtime_handle + .send((closure, sender)) + .await + .map_err(|_| Error::SpawnError)?; + + let result = receiver.await?; + + Ok(result.downcast().unwrap()) +}