Skip to content

Commit

Permalink
feat: respect Task.map/bind (sync := true) after waiting (#6976)
Browse files Browse the repository at this point in the history
This PR extends the behavior of the `sync` flag for `Task.map/bind` etc.
to encompass synchronous execution even when they first have to wait on
completion of the first task, drastically lowering the overhead of such
tasks. Thus the flag is now equivalent to e.g. .NET's
`TaskContinuationOptions.ExecuteSynchronously`.
  • Loading branch information
Kha authored Feb 7, 2025
1 parent af385d7 commit ac97080
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 15 deletions.
8 changes: 6 additions & 2 deletions src/Init/Core.lean
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,9 @@ set_option linter.unusedVariables.funArgs false in
be available and then calls `f` on the result.
`prio`, if provided, is the priority of the task.
If `sync` is set to true, `f` is executed on the current thread if `x` has already finished.
If `sync` is set to true, `f` is executed on the current thread if `x` has already finished and
otherwise on the thread that `x` finished on. `prio` is ignored in this case. This should only be
done when executing `f` is cheap and non-blocking.
-/
@[noinline, extern "lean_task_map"]
protected def map (f : α → β) (x : Task α) (prio := Priority.default) (sync := false) : Task β :=
Expand All @@ -607,7 +609,9 @@ for the value of `x` to be available and then calls `f` on the result,
resulting in a new task which is then run for a result.
`prio`, if provided, is the priority of the task.
If `sync` is set to true, `f` is executed on the current thread if `x` has already finished.
If `sync` is set to true, `f` is executed on the current thread if `x` has already finished and
otherwise on the thread that `x` finished on. `prio` is ignored in this case. This should only be
done when executing `f` is cheap and non-blocking.
-/
@[noinline, extern "lean_task_bind"]
protected def bind (x : Task α) (f : α → Task β) (prio := Priority.default) (sync := false) :
Expand Down
30 changes: 17 additions & 13 deletions src/runtime/object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ extern "C" LEAN_EXPORT __attribute__((weak)) void free_sized(void *ptr, size_t)

// see `Task.Priority.max`
#define LEAN_MAX_PRIO 8
#define LEAN_SYNC_PRIO std::numeric_limits<unsigned>::max()

namespace lean {

Expand Down Expand Up @@ -760,7 +761,7 @@ class task_manager {
lock.lock();
} else if (v != nullptr) {
lean_assert(t->m_imp->m_closure == nullptr);
resolve_core(t, v);
resolve_core(lock, t, v);
} else {
// `bind` task has not finished yet, re-add as dependency of nested task
// NOTE: closure MUST be extracted before unlocking the mutex as otherwise
Expand All @@ -773,27 +774,30 @@ class task_manager {
}
}

void resolve_core(lean_task_object * t, object * v) {
handle_finished(t);
void resolve_core(unique_lock<mutex> & lock, lean_task_object * t, object * v) {
mark_mt(v);
t->m_value = v;
/* After the task has been finished and we propagated
dependencies, we can release `m_imp` and keep just the value */
free_task_imp(t->m_imp);
lean_task_imp * imp = t->m_imp;
t->m_imp = nullptr;
handle_finished(lock, t, imp);
/* After the task has been finished and we propagated
dependencies, we can release `imp` and keep just the value */
free_task_imp(imp);
m_task_finished_cv.notify_all();
}

void handle_finished(lean_task_object * t) {
lean_task_object * it = t->m_imp->m_head_dep;
t->m_imp->m_head_dep = nullptr;
void handle_finished(unique_lock<mutex> & lock, lean_task_object * t, lean_task_imp * imp) {
lean_task_object * it = imp->m_head_dep;
imp->m_head_dep = nullptr;
while (it) {
if (t->m_imp->m_canceled)
if (imp->m_canceled)
it->m_imp->m_canceled = true;
lean_task_object * next_it = it->m_imp->m_next_dep;
it->m_imp->m_next_dep = nullptr;
if (it->m_imp->m_deleted) {
free_task(it);
} else if (it->m_imp->m_prio == LEAN_SYNC_PRIO) {
run_task(lock, it);
} else {
enqueue_core(it);
}
Expand Down Expand Up @@ -844,7 +848,7 @@ class task_manager {
dec(v);
return;
}
resolve_core(t, v);
resolve_core(lock, t, v);
}

void add_dep(lean_task_object * t1, lean_task_object * t2) {
Expand Down Expand Up @@ -1031,7 +1035,7 @@ extern "C" LEAN_EXPORT obj_res lean_task_map_core(obj_arg f, obj_arg t, unsigned
if (!g_task_manager || (sync && lean_to_task(t)->m_value)) {
return lean_task_pure(apply_1(f, lean_task_get_own(t)));
} else {
lean_task_object * new_task = alloc_task(mk_closure_3_2(task_map_fn, f, t), prio, keep_alive);
lean_task_object * new_task = alloc_task(mk_closure_3_2(task_map_fn, f, t), sync ? LEAN_SYNC_PRIO : prio, keep_alive);
g_task_manager->add_dep(lean_to_task(t), new_task);
return (lean_object*)new_task;
}
Expand Down Expand Up @@ -1074,7 +1078,7 @@ extern "C" LEAN_EXPORT obj_res lean_task_bind_core(obj_arg x, obj_arg f, unsigne
if (!g_task_manager || (sync && lean_to_task(x)->m_value)) {
return apply_1(f, lean_task_get_own(x));
} else {
lean_task_object * new_task = alloc_task(mk_closure_3_2(task_bind_fn1, x, f), prio, keep_alive);
lean_task_object * new_task = alloc_task(mk_closure_3_2(task_bind_fn1, x, f), sync ? LEAN_SYNC_PRIO : prio, keep_alive);
g_task_manager->add_dep(lean_to_task(x), new_task);
return (lean_object*)new_task;
}
Expand Down

0 comments on commit ac97080

Please sign in to comment.