diff --git a/include/tz/core/job.hpp b/include/tz/core/job.hpp index b80d444c00..fe189ce2b3 100644 --- a/include/tz/core/job.hpp +++ b/include/tz/core/job.hpp @@ -16,7 +16,6 @@ namespace tz job_handle job_execute(job_function fn); job_handle job_execute_on(job_function fn, job_worker worker); void job_wait(job_handle job); - void job_wait_all(); bool job_complete(job_handle job); std::size_t job_count(); std::size_t job_worker_count(); diff --git a/src/tz/core/job.cpp b/src/tz/core/job.cpp index ecffa7d198..502a27dc1c 100644 --- a/src/tz/core/job.cpp +++ b/src/tz/core/job.cpp @@ -36,40 +36,50 @@ namespace tz std::mutex wake_mutex; std::condition_variable wake_condition; + job_handle impl_execute_job(job_data job); + void impl_wait(); + // state end, api begin job_handle job_execute(job_function fn) { - (void)fn; - return tz::nullhand; + return impl_execute_job + ({ + .fn = fn, + .job_id = lifetime_count.load(), + .affinity = std::nullopt + }); } job_handle job_execute_on(job_function fn, job_worker worker) { - (void)fn; - (void)worker; - return tz::nullhand; + return impl_execute_job + ({ + .fn = fn, + .job_id = lifetime_count.load(), + .affinity = worker + }); } void job_wait(job_handle job) { - (void)job; - } - - void job_wait_all() - { - + while(!job_complete(job)) + { + impl_wait(); + } } bool job_complete(job_handle job) { - (void)job; - return false; + std::unique_lock lock(waiting_job_id_mutex); + bool job_isnt_waiting = std::find(waiting_job_ids.begin(), waiting_job_ids.end(), job.peek()) == waiting_job_ids.end(); + bool no_workers_doing_it = std::all_of(workers.begin(), workers.end(), [jid = job.peek()](const worker_data& worker){return worker.current_job != jid;}); + return job_isnt_waiting && no_workers_doing_it; } std::size_t job_count() { - return 0; + return jobs.size_approx(); } std::size_t job_worker_count() @@ -115,12 +125,36 @@ namespace tz std::unique_lock lock(waiting_job_id_mutex); auto iter = std::find(waiting_job_ids.begin(), waiting_job_ids.end(), job.job_id); tz_assert(iter != waiting_job_ids.end(), "job system thread took a job, but its id is not on the list of waiting jobs???"); + waiting_job_ids.erase(iter); } job.fn(); me.current_job = job_id_null; } } + job_handle impl_execute_job(job_data job) + { + if(job.affinity.has_value()) + { + auto val = job.affinity.value(); + auto& q = workers[val].affine_jobs; + q.enqueue(job); + wake_condition.notify_all(); + } + else + { + { + std::unique_lock lock(waiting_job_id_mutex); + waiting_job_ids.push_back(lifetime_count); + } + jobs.enqueue(job); + wake_condition.notify_one(); + lifetime_count++; + } + return static_cast(job.job_id); + } + + namespace detail { void job_system_initialise()