Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement TryParallel operator #221

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 54 additions & 27 deletions rig-core/src/pipeline/try_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,33 +345,35 @@
}
}

// TODO: Implement TryParallel
// pub struct TryParallel<Op1, Op2> {
// op1: Op1,
// op2: Op2,
// }

// impl<Op1, Op2> TryParallel<Op1, Op2> {
// pub fn new(op1: Op1, op2: Op2) -> Self {
// Self { op1, op2 }
// }
// }

// impl<Op1, Op2> TryOp for TryParallel<Op1, Op2>
// where
// Op1: TryOp,
// Op2: TryOp<Input = Op1::Input, Output = Op1::Output, Error = Op1::Error>,
// {
// type Input = Op1::Input;
// type Output = (Op1::Output, Op2::Output);
// type Error = Op1::Error;

// #[inline]
// async fn try_call(&self, input: Self::Input) -> Result<Self::Output, Self::Error> {
// let (output1, output2) = tokio::join!(self.op1.try_call(input.clone()), self.op2.try_call(input));
// Ok((output1?, output2?))
// }
// }
// Implement TryParallel
pub struct TryParallel<Op1, Op2> {
op1: Op1,
op2: Op2,
}

impl<Op1, Op2> TryParallel<Op1, Op2> {
pub fn new(op1: Op1, op2: Op2) -> Self {
Self { op1, op2 }
}
}

impl<Op1, Op2> op::Op for TryParallel<Op1, Op2>

Check failure on line 360 in rig-core/src/pipeline/try_op.rs

View workflow job for this annotation

GitHub Actions / stable / check rig-core wasm target

not all trait items implemented, missing: `call`

Check failure on line 360 in rig-core/src/pipeline/try_op.rs

View workflow job for this annotation

GitHub Actions / stable / test

not all trait items implemented, missing: `call`

Check failure on line 360 in rig-core/src/pipeline/try_op.rs

View workflow job for this annotation

GitHub Actions / stable / doc

not all trait items implemented, missing: `call`
where
Op1: TryOp,
Op2: TryOp<Input = Op1::Input, Error = Op1::Error>,

Check failure on line 363 in rig-core/src/pipeline/try_op.rs

View workflow job for this annotation

GitHub Actions / stable / clippy

not all trait items implemented, missing: `call`

error[E0046]: not all trait items implemented, missing: `call` --> rig-core/src/pipeline/try_op.rs:360:1 | 360 | / impl<Op1, Op2> op::Op for TryParallel<Op1, Op2> 361 | | where 362 | | Op1: TryOp, 363 | | Op2: TryOp<Input = Op1::Input, Error = Op1::Error>, | |_______________________________________________________^ missing `call` in implementation | ::: rig-core/src/pipeline/op.rs:14:5 | 14 | fn call(&self, input: Self::Input) -> impl Future<Output = Self::Output> + Send; | -------------------------------------------------------------------------------- `call` from trait
{
type Input = Op1::Input;
type Output = Result<(Op1::Output, Op2::Output), Op1::Error>;

#[inline]

Check warning on line 368 in rig-core/src/pipeline/try_op.rs

View workflow job for this annotation

GitHub Actions / stable / fmt

Diff in /home/runner/work/rig/rig/rig-core/src/pipeline/try_op.rs
async fn try_call(&self, input: Self::Input) -> Result<(Op1::Output, Op2::Output), Op1::Error> {

Check failure on line 369 in rig-core/src/pipeline/try_op.rs

View workflow job for this annotation

GitHub Actions / stable / check rig-core wasm target

method `try_call` is not a member of trait `op::Op`

Check failure on line 369 in rig-core/src/pipeline/try_op.rs

View workflow job for this annotation

GitHub Actions / stable / test

method `try_call` is not a member of trait `op::Op`

Check failure on line 369 in rig-core/src/pipeline/try_op.rs

View workflow job for this annotation

GitHub Actions / stable / doc

method `try_call` is not a member of trait `op::Op`
use futures::try_join;
try_join!(
self.op1.try_call(input.clone()),

Check failure on line 372 in rig-core/src/pipeline/try_op.rs

View workflow job for this annotation

GitHub Actions / stable / check rig-core wasm target

no method named `clone` found for associated type `<Op1 as TryOp>::Input` in the current scope

Check failure on line 372 in rig-core/src/pipeline/try_op.rs

View workflow job for this annotation

GitHub Actions / stable / test

no method named `clone` found for associated type `<Op1 as try_op::TryOp>::Input` in the current scope

Check failure on line 372 in rig-core/src/pipeline/try_op.rs

View workflow job for this annotation

GitHub Actions / stable / test

no method named `clone` found for associated type `<Op1 as TryOp>::Input` in the current scope

Check failure on line 372 in rig-core/src/pipeline/try_op.rs

View workflow job for this annotation

GitHub Actions / stable / clippy

no method named `clone` found for associated type `<Op1 as pipeline::try_op::TryOp>::Input` in the current scope

error[E0599]: no method named `clone` found for associated type `<Op1 as pipeline::try_op::TryOp>::Input` in the current scope --> rig-core/src/pipeline/try_op.rs:372:37 | 372 | self.op1.try_call(input.clone()), | ^^^^^ method not found in `<Op1 as TryOp>::Input`

Check failure on line 372 in rig-core/src/pipeline/try_op.rs

View workflow job for this annotation

GitHub Actions / stable / doc

no method named `clone` found for associated type `<Op1 as TryOp>::Input` in the current scope
self.op2.try_call(input)
)
}

Check failure on line 375 in rig-core/src/pipeline/try_op.rs

View workflow job for this annotation

GitHub Actions / stable / clippy

method `try_call` is not a member of trait `op::Op`

error[E0407]: method `try_call` is not a member of trait `op::Op` --> rig-core/src/pipeline/try_op.rs:369:5 | 369 | / async fn try_call(&self, input: Self::Input) -> Result<(Op1::Output, Op2::Output), Op1::Error> { 370 | | use futures::try_join; 371 | | try_join!( 372 | | self.op1.try_call(input.clone()), 373 | | self.op2.try_call(input) 374 | | ) 375 | | } | |_____^ not a member of trait `op::Op`
}

#[cfg(test)]
mod tests {
Expand Down Expand Up @@ -472,4 +474,29 @@
let result = pipeline.try_call(1).await.unwrap();
assert_eq!(result, 15);
}

#[tokio::test]
async fn test_try_parallel() {
let op1 = map(|x: i32| {
if x % 2 == 0 {
Ok(x + 1)
} else {
Err("x is odd")
}
});
let op2 = map(|x: i32| {
if x % 2 == 0 {
Ok(x * 2)
} else {
Err("x is odd")
}
});
let pipeline = TryParallel::new(op1, op2);

let result = pipeline.try_call(2).await;
assert_eq!(result, Ok((3, 4)));

let result = pipeline.try_call(1).await;
assert_eq!(result, Err("x is odd"));
}
}
Loading