Skip to content

Commit

Permalink
Await functions
Browse files Browse the repository at this point in the history
  • Loading branch information
lpil committed Oct 31, 2024
1 parent 5fb5bae commit 5aef7de
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 11 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## Unreleased

- The `try_await_forever` function in the `gleam/otp/task` module has been deprecated.
- The `gleam/otp/task` module gains the `pid`, `await2`, `await3`, and `await4`
functions.
- Tasks are no longer monitored.

## v0.12.1 - 2024-09-30

- The minimum required Gleam version in `gleam.toml` has been increased to match
Expand Down
185 changes: 176 additions & 9 deletions src/gleam/otp/task.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@
//// spawning a process that sends a message to the caller once the given
//// computation is performed.
////
//// There are two important things to consider when using `async`:
//// There are some important things to consider when using tasks:
////
//// 1. If you are using async tasks, you must await a reply as they are always
//// sent.
////
//// 2. async tasks link the caller and the spawned process. This means that,
//// 2. Tasks link the caller and the spawned process. This means that,
//// if the caller crashes, the task will crash too and vice-versa. This is
//// on purpose: if the process meant to receive the result no longer
//// exists, there is no purpose in completing the computation.
////
//// 3. A task's callback function must complete by returning or panicking.
//// It must not `exit` with the reason "normal".
////
//// This module is inspired by Elixir's [Task module][1].
////
//// [1]: https://hexdocs.pm/elixir/master/Task.html
Expand All @@ -31,7 +34,7 @@
import gleam/dynamic.{type Dynamic}
import gleam/erlang/process.{type Pid, type Selector, type Subject}
import gleam/function
import gleam/option.{type Option, Some}
import gleam/option.{type Option, None, Some}

pub opaque type Task(value) {
Task(owner: Pid, pid: Pid, subject: Subject(value))
Expand Down Expand Up @@ -91,7 +94,6 @@ pub fn try_await(task: Task(value), timeout: Int) -> Result(value, AwaitError) {
}
}

// TODO: test
/// Wait for the value computed by a task.
///
/// If the a value is not received before the timeout has elapsed or if the
Expand All @@ -102,6 +104,12 @@ pub fn await(task: Task(value), timeout: Int) -> value {
value
}

/// Get the `Pid` for a task.
///
pub fn pid(task: Task(value)) -> Pid {
task.pid
}

@deprecated("Use await_forever")
pub fn try_await_forever(task: Task(value)) -> Result(value, AwaitError) {
assert_owner(task)
Expand Down Expand Up @@ -155,15 +163,13 @@ pub fn try_await2(
|> process.selecting(task1.subject, M2FromSubject1)
|> process.selecting(task2.subject, M2FromSubject2)
|> process.selecting(timeout_subject, function.identity)
|> try_await2_loop(option.None, option.None, task1, task2, timer)
|> try_await2_loop(None, None, timer)
}

fn try_await2_loop(
selector: Selector(Message2(t1, t2)),
t1: Option(Result(t1, AwaitError)),
t2: Option(Result(t2, AwaitError)),
task1: Task(t1),
task2: Task(t2),
timeout: process.Timer,
) -> #(Result(t1, AwaitError), Result(t2, AwaitError)) {
case t1, t2 {
Expand All @@ -174,11 +180,11 @@ fn try_await2_loop(
// The task process has sent back a value
M2FromSubject1(x) -> {
let t1 = Some(Ok(x))
try_await2_loop(selector, t1, t2, task1, task2, timeout)
try_await2_loop(selector, t1, t2, timeout)
}
M2FromSubject2(x) -> {
let t2 = Some(Ok(x))
try_await2_loop(selector, t1, t2, task1, task2, timeout)
try_await2_loop(selector, t1, t2, timeout)
}

M2Timeout -> {
Expand All @@ -191,3 +197,164 @@ fn try_await2_loop(
}
}
}

type Message3(t1, t2, t3) {
M3FromSubject1(t1)
M3FromSubject2(t2)
M3FromSubject3(t3)
M3Timeout
}

/// Wait for the values computed by multiple tasks.
///
/// For each task, if the a value is not received before the timeout has
/// elapsed then an error is returned.
///
pub fn try_await3(
task1: Task(t1),
task2: Task(t2),
task3: Task(t3),
timeout: Int,
) -> #(Result(t1, AwaitError), Result(t2, AwaitError), Result(t3, AwaitError)) {
assert_owner(task1)
assert_owner(task2)
assert_owner(task3)

let timeout_subject = process.new_subject()
let timer = process.send_after(timeout_subject, timeout, M3Timeout)

process.new_selector()
|> process.selecting(task1.subject, M3FromSubject1)
|> process.selecting(task2.subject, M3FromSubject2)
|> process.selecting(task3.subject, M3FromSubject3)
|> process.selecting(timeout_subject, function.identity)
|> try_await3_loop(None, None, None, timer)
}

fn try_await3_loop(
selector: Selector(Message3(t1, t2, t3)),
t1: Option(Result(t1, AwaitError)),
t2: Option(Result(t2, AwaitError)),
t3: Option(Result(t3, AwaitError)),
timeout: process.Timer,
) -> #(Result(t1, AwaitError), Result(t2, AwaitError), Result(t3, AwaitError)) {
case t1, t2, t3 {
Some(t1), Some(t2), Some(t3) -> #(t1, t2, t3)

_, _, _ -> {
case process.select_forever(selector) {
// The task process has sent back a value
M3FromSubject1(x) -> {
let t1 = Some(Ok(x))
try_await3_loop(selector, t1, t2, t3, timeout)
}
M3FromSubject2(x) -> {
let t2 = Some(Ok(x))
try_await3_loop(selector, t1, t2, t3, timeout)
}
M3FromSubject3(x) -> {
let t3 = Some(Ok(x))
try_await3_loop(selector, t1, t2, t3, timeout)
}

M3Timeout -> {
#(
option.unwrap(t1, Error(Timeout)),
option.unwrap(t2, Error(Timeout)),
option.unwrap(t3, Error(Timeout)),
)
}
}
}
}
}

type Message4(t1, t2, t3, t4) {
M4FromSubject1(t1)
M4FromSubject2(t2)
M4FromSubject3(t3)
M4FromSubject4(t4)
M4Timeout
}

/// Wait for the values computed by multiple tasks.
///
/// For each task, if the a value is not received before the timeout has
/// elapsed then an error is returned.
///
pub fn try_await4(
task1: Task(t1),
task2: Task(t2),
task3: Task(t3),
task4: Task(t4),
timeout: Int,
) -> #(
Result(t1, AwaitError),
Result(t2, AwaitError),
Result(t3, AwaitError),
Result(t4, AwaitError),
) {
assert_owner(task1)
assert_owner(task2)
assert_owner(task3)

let timeout_subject = process.new_subject()
let timer = process.send_after(timeout_subject, timeout, M4Timeout)

process.new_selector()
|> process.selecting(task1.subject, M4FromSubject1)
|> process.selecting(task2.subject, M4FromSubject2)
|> process.selecting(task3.subject, M4FromSubject3)
|> process.selecting(task4.subject, M4FromSubject4)
|> process.selecting(timeout_subject, function.identity)
|> try_await4_loop(None, None, None, None, timer)
}

fn try_await4_loop(
selector: Selector(Message4(t1, t2, t3, t4)),
t1: Option(Result(t1, AwaitError)),
t2: Option(Result(t2, AwaitError)),
t3: Option(Result(t3, AwaitError)),
t4: Option(Result(t4, AwaitError)),
timeout: process.Timer,
) -> #(
Result(t1, AwaitError),
Result(t2, AwaitError),
Result(t3, AwaitError),
Result(t4, AwaitError),
) {
case t1, t2, t3, t4 {
Some(t1), Some(t2), Some(t3), Some(t4) -> #(t1, t2, t3, t4)

_, _, _, _ -> {
case process.select_forever(selector) {
// The task process has sent back a value
M4FromSubject1(x) -> {
let t1 = Some(Ok(x))
try_await4_loop(selector, t1, t2, t3, t4, timeout)
}
M4FromSubject2(x) -> {
let t2 = Some(Ok(x))
try_await4_loop(selector, t1, t2, t3, t4, timeout)
}
M4FromSubject3(x) -> {
let t3 = Some(Ok(x))
try_await4_loop(selector, t1, t2, t3, t4, timeout)
}
M4FromSubject4(x) -> {
let t4 = Some(Ok(x))
try_await4_loop(selector, t1, t2, t3, t4, timeout)
}

M4Timeout -> {
#(
option.unwrap(t1, Error(Timeout)),
option.unwrap(t2, Error(Timeout)),
option.unwrap(t3, Error(Timeout)),
option.unwrap(t4, Error(Timeout)),
)
}
}
}
}
}
65 changes: 63 additions & 2 deletions test/gleam/otp/task_test.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,25 @@ pub fn try_await2_timeout_test() {
|> should.equal(#(Ok(1), Error(Timeout)))
}

pub fn try_await3_test() {
// Start with an empty mailbox
flush()

let work = fn(x) {
fn() {
sleep(5)
x
}
}

let task1 = task.async(work(1))
let task2 = task.async(work(2))
let task3 = task.async(work(3))

task.try_await3(task1, task2, task3, 8)
|> should.equal(#(Ok(1), Ok(2), Ok(3)))
}

pub fn try_await3_timeout_test() {
// Start with an empty mailbox
flush()
Expand All @@ -152,7 +171,49 @@ pub fn try_await3_timeout_test() {
// 1 will not finish in time
let task1 = task.async(work(1, 100))
let task2 = task.async(work(2, 1))
let task3 = task.async(work(3, 1))

task.try_await3(task1, task2, task3, 20)
|> should.equal(#(Error(Timeout), Ok(2), Ok(3)))
}

pub fn try_await4_test() {
// Start with an empty mailbox
flush()

let work = fn(x) {
fn() {
sleep(5)
x
}
}

let task1 = task.async(work(1))
let task2 = task.async(work(2))
let task3 = task.async(work(3))
let task4 = task.async(work(4))

task.try_await4(task1, task2, task3, task4, 8)
|> should.equal(#(Ok(1), Ok(2), Ok(3), Ok(4)))
}

pub fn try_await4_timeout_test() {
// Start with an empty mailbox
flush()

let work = fn(x, y) {
fn() {
sleep(y)
x
}
}

// 1 will not finish in time
let task1 = task.async(work(1, 100))
let task2 = task.async(work(2, 1))
let task3 = task.async(work(3, 1))
let task4 = task.async(work(4, 1))

task.try_await2(task1, task2, 20)
|> should.equal(#(Error(Timeout), Ok(2)))
task.try_await4(task1, task2, task3, task4, 20)
|> should.equal(#(Error(Timeout), Ok(2), Ok(3), Ok(4)))
}

0 comments on commit 5aef7de

Please sign in to comment.