async operations are not really async #142

Open
jondot opened this issue May 1, 2024 · 5 comments
Comments

@jondot

jondot commented May 1, 2024

Hi, I've been working on a langchain variant in Rust myself, and I am reading this project to compare notes.
I can offer a tip, something I've bumped into in the past: when running CPU-bound operations (loading text from PDFs, splitting text, embedding, and more), you need to offload them to one of two places:

  1. Tokio's built-in blocking pool (e.g. via `tokio::task::spawn_blocking`)
  2. Your own dedicated pool for specific resources (a pool for embedders, a pool for splitters, etc.), offloading work to that dedicated pool (still using Tokio, of course)

For one-time loading during the app's boot process or a similar workflow, I recommend (1).
For multi-request processing, such as initializing a full-blown chain and then putting it behind a web service that serves multiple inference requests with the same chain, I recommend (2).

It is a non-trivial refactor, as you might discover that some pieces of code are not `Send` or `Sync` and you'll have to drill down to make them so.
However, there's no getting around this kind of refactor: currently, CPU-bound operations run directly on Tokio's worker threads and block them, starving every other task scheduled there, which makes using Tokio pointless.

@prabirshrestha
Collaborator

I have thought about this but am not sure there is a good way to solve it. For example, if I am already running these in a worker that is in a thread pool or on a new thread, I wouldn't want the library to create yet another thread. This should be controlled by the user.

I wonder if it instead makes sense to have some sort of `Parallelism` option, similar to how the jwalk crate does it: https://docs.rs/jwalk/latest/jwalk/enum.Parallelism.html

    use std::sync::Arc;
    use std::time::Duration;

    use rayon::ThreadPool;

    pub enum Parallelism {
        Serial,
        RayonDefaultPool {
            busy_timeout: Duration,
        },
        RayonExistingPool {
            pool: Arc<ThreadPool>,
            busy_timeout: Option<Duration>,
        },
        RayonNewPool(usize),
    }

We could pick the best one by default so users don't have to worry much; they can always use `Serial` and hand the work to their own scheduler. Another option is to use a custom scheduler, such as https://caolan.github.io/tamawiki/tokio/executor/trait.Executor.html.

@jondot
Author

jondot commented May 1, 2024

Hmm, interesting. Not sure.
What I think is certain is that a blocking, CPU-bound operation running on an async runtime kills the runtime.

@prabirshrestha
Collaborator

I think at a minimum we need to convert `Read` to `AsyncRead`, i.e. use async I/O instead of sync I/O. Processing that is heavy and can't be solved by moving to async I/O would still need the executors.

Note that some libraries, such as csv, don't support async I/O, so we would need an alternative library: BurntSushi/rust-csv#171

@jondot
Author

jondot commented May 2, 2024

Yeah, that is kind of my point: you don't have to use the async variant of a library if it doesn't exist; you can use Tokio to "async defer" the work to a pool.

Here's how I do it in my library: I take a costly CPU-bound operation and make it compatible with Tokio by moving it into a rayon pool, backed by Tokio sync primitives:

    async fn embed(&self, inputs: &[&str]) -> Result<Vec<Vec<f32>>> {
        let (send, recv) = tokio::sync::oneshot::channel();

        rayon::scope(|s| {
            s.spawn(|_| {
                let _ = send.send(self.sync_embed(inputs));
            });
        });

        recv.await?
    }

Here, I leave it to the user to implement their own `sync_embed` function per provider, so that they don't have to deal with the inner workings of async. The `Embedding` trait implements the boilerplate of taking something that's synchronous and blocking and turning it into an async, Tokio-friendly operation.

@mqudsi

mqudsi commented May 19, 2024

@jondot doesn’t rayon::scope() block until all spawned tasks have completed?
