From 8ed4cb67b92817aad42e0a9d50cf796e28e74c5c Mon Sep 17 00:00:00 2001
From: harrand
Date: Wed, 1 Nov 2023 00:13:59 +0000
Subject: [PATCH] [job] added new job system backend. not sure whether it's an
 upgrade quite yet? blocking should no longer randomly take 16ms thanks to
 thread sleeps. however, there is a massive amount of lock contention over the
 done_list within the implementation, as all the workers constantly fight over
 it when they finish a job at the same time.

---
 CMakeLists.txt                                |   2 +
 .../job/impl/concurrentqueue_blocking/job.cpp | 244 ++++++++++++++++++
 .../job/impl/concurrentqueue_blocking/job.hpp |  64 +++++
 src/tz/core/job/job.hpp                       |   3 +-
 src/tz/ren/animation2.cpp                     |   3 +-
 test/core/job_test.cpp                        |   4 +-
 6 files changed, 315 insertions(+), 5 deletions(-)
 create mode 100644 src/tz/core/job/impl/concurrentqueue_blocking/job.cpp
 create mode 100644 src/tz/core/job/impl/concurrentqueue_blocking/job.hpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 255aa1d4df..1f413b68cf 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -48,6 +48,8 @@ add_library(topaz STATIC
 	src/tz/core/job/job.cpp
 	src/tz/core/job/job.hpp
 	src/tz/core/job/api/job.hpp
+	src/tz/core/job/impl/concurrentqueue_blocking/job.cpp
+	src/tz/core/job/impl/concurrentqueue_blocking/job.hpp
 	src/tz/core/job/impl/threadpool_lfq/job.cpp
 	src/tz/core/job/impl/threadpool_lfq/job.hpp
 	src/tz/core/memory/allocators/adapter.hpp
diff --git a/src/tz/core/job/impl/concurrentqueue_blocking/job.cpp b/src/tz/core/job/impl/concurrentqueue_blocking/job.cpp
new file mode 100644
index 0000000000..e4c4187084
--- /dev/null
+++ b/src/tz/core/job/impl/concurrentqueue_blocking/job.cpp
@@ -0,0 +1,244 @@
+#include "tz/core/job/impl/concurrentqueue_blocking/job.hpp"
+#include "tz/core/debug.hpp"
+#include "tz/core/profile.hpp"
+#include <algorithm>
+#include <chrono>
+
+#include <cstdint>
+#include <limits>
+
+namespace tz::impl
+{
+	job_system_blockingconcurrentqueue::job_system_blockingconcurrentqueue()
+	{
+		TZ_PROFZONE("job_system - initialise", 0xFFAA0000);
+		this->done_job_ids.reserve(1024);
+		for(std::size_t i = 0; i < std::thread::hardware_concurrency(); i++)
+		{
+			auto& worker = this->thread_pool.emplace_back();
+			worker.thread = std::thread([this, i](){this->worker_thread_entrypoint(i);});
+			worker.local_tid = i;
+		}
+	}
+
+//--------------------------------------------------------------------------------------------------
+
+	job_system_blockingconcurrentqueue::~job_system_blockingconcurrentqueue()
+	{
+		TZ_PROFZONE("job_system - terminate", 0xFFAA0000);
+		this->close_requested.store(true);
+		for(worker_t& worker : this->thread_pool)
+		{
+			worker.thread.join();
+		}
+		tz::assert(!this->any_work_remaining());
+	}
+
+//--------------------------------------------------------------------------------------------------
+
+	job_handle job_system_blockingconcurrentqueue::execute(job_t job, execution_info einfo)
+	{
+		TZ_PROFZONE("job_system - execute", 0xFFAA0000);
+		// hand out the job id with a single atomic RMW - two concurrent executes must never share an id.
+		job_info_t jinfo
+		{
+			.func = job,
+			.job_id = this->lifetime_jobs_created.fetch_add(1),
+			.maybe_affinity = einfo.maybe_worker_affinity
+		};
+
+		job_handle ret
+		{
+			.handle = static_cast<tz::hanval>(jinfo.job_id), .owned = false
+		};
+		this->jobs_created_this_frame.fetch_add(1);
+		if(einfo.maybe_worker_affinity.has_value())
+		{
+			// add to list of affine jobs instead.
+			auto val = einfo.maybe_worker_affinity.value();
+			tz::assert(this->thread_pool[val].local_tid == val);
+			this->thread_pool[val].affine_jobs.enqueue(jinfo);
+			// affine jobs go to a single worker's private queue, so the handle is owned by the caller.
+			ret.owned = true;
+		}
+		else
+		{
+			this->global_job_queue.enqueue(jinfo);
+		}
+		//std::osyncstream(std::cout) << "added new job " << jinfo.job_id << "\n";
+		return ret;
+	}
+
+//--------------------------------------------------------------------------------------------------
+
+	void job_system_blockingconcurrentqueue::block(job_handle j) const
+	{
+		TZ_PROFZONE("job_system - block", 0xFFAA0000);
+		using namespace std::chrono_literals;
+		// while we dont think the job is done.
+		//std::osyncstream(std::cout) << "blocking on job " << static_cast<std::size_t>(static_cast<tz::hanval>(j.handle)) << "\n";
+		while(!this->complete(j))
+		{
+			// wait (up to 1ms) for a job-done signal. notify races with the complete() check, so the timeout bounds any missed wakeup.
+			std::unique_lock lock(this->wake_me_on_a_job_done_mutex);
+			this->wake_me_on_a_job_done.wait_for(lock, 1ms);
+		}
+		//std::osyncstream(std::cout) << "block on job " << static_cast<std::size_t>(static_cast<tz::hanval>(j.handle)) << " complete!\n";
+	}
+
+//--------------------------------------------------------------------------------------------------
+
+	bool job_system_blockingconcurrentqueue::complete(job_handle j) const
+	{
+		TZ_PROFZONE("job_system - is complete", 0xFFAA0000);
+		// convert handle value into size_t (its actually the job id)
+		auto hanval = static_cast<std::size_t>(static_cast<tz::hanval>(j.handle));
+		// lock the done job list mutex.
+		std::unique_lock lock(this->done_job_list_mutex);
+		// it's done if the job id is on that list.
+		return std::find(this->done_job_ids.begin(), this->done_job_ids.end(), hanval) != this->done_job_ids.end();
+	}
+
+//--------------------------------------------------------------------------------------------------
+
+	bool job_system_blockingconcurrentqueue::any_work_remaining() const
+	{
+		TZ_PROFZONE("job_system - any work remaining", 0xFFAA0000);
+		if(this->lifetime_jobs_created.load() == 0)
+		{
+			return false;
+		}
+		std::unique_lock lock(this->done_job_list_mutex);
+		// job ids are handed out contiguously from zero and each finished id is
+		// appended to done_job_ids exactly once, so comparing sizes is equivalent
+		// to (and far cheaper than) std::find-ing every id ever created, which
+		// was quadratic in the lifetime job count.
+		bool all_complete = this->done_job_ids.size() >= this->lifetime_jobs_created.load();
+		// worker_t stores the currently running job id.
+		// so we could just check if all of them have no running job.
+		// but that doesnt mean work isnt pending.
+		// also global_job_queue.size_approx() == 0 is not necessarily accurate.
+		// if we do both it might be kinda reasonable? not accurate still tho.
+		bool any_workers_busy = false;
+		for(const worker_t& worker : this->thread_pool)
+		{
+			if(worker.get_running_job().has_value())
+			{
+				any_workers_busy = true;
+			}
+		}
+		auto approx_job_count = this->global_job_queue.size_approx();
+		return approx_job_count > 0 || any_workers_busy || !all_complete;
+	}
+
+//--------------------------------------------------------------------------------------------------
+
+	void job_system_blockingconcurrentqueue::block_all() const
+	{
+		TZ_PROFZONE("job_system - block all", 0xFFAA0000);
+		using namespace std::chrono_literals;
+		//std::osyncstream(std::cout) << "waiting for all remaining jobs to complete...\n";
+		// VERY similar to block, but for any_work_remaining instead of complete(j)
+		while(this->any_work_remaining())
+		{
+			// wait (up to 1ms) for a job-done signal. the timeout bounds missed wakeups.
+			std::unique_lock lock(this->wake_me_on_a_job_done_mutex);
+			this->wake_me_on_a_job_done.wait_for(lock, 1ms);
+		}
+		//std::osyncstream(std::cout) << "no jobs still running!...\n";
+	}
+
+//--------------------------------------------------------------------------------------------------
+
+	void job_system_blockingconcurrentqueue::new_frame()
+	{
+		this->jobs_created_this_frame.store(0);
+	}
+
+//--------------------------------------------------------------------------------------------------
+
+	std::size_t job_system_blockingconcurrentqueue::size() const
+	{
+		return this->global_job_queue.size_approx();
+	}
+
+//--------------------------------------------------------------------------------------------------
+
+	std::size_t job_system_blockingconcurrentqueue::worker_count() const
+	{
+		return this->thread_pool.size();
+	}
+
+//--------------------------------------------------------------------------------------------------
+
+	std::vector<worker_id_t> job_system_blockingconcurrentqueue::get_worker_ids() const
+	{
+		std::vector<worker_id_t> ret;
+		ret.reserve(this->worker_count());
+		for(const auto& worker : this->thread_pool)
+		{
+			ret.push_back(worker.local_tid);
+		}
+		return ret;
+	}
+
+//--------------------------------------------------------------------------------------------------
+
+	unsigned int job_system_blockingconcurrentqueue::jobs_started_this_frame() const
+	{
+		return this->jobs_created_this_frame.load();
+	}
+
+//--------------------------------------------------------------------------------------------------
+
+	std::optional<std::size_t> job_system_blockingconcurrentqueue::worker_t::get_running_job() const
+	{
+		std::size_t val = this->currently_running_job_id.load();
+		if(val == std::numeric_limits<std::size_t>::max())
+		{
+			return std::nullopt;
+		}
+		return val;
+	}
+
+//--------------------------------------------------------------------------------------------------
+
+	void job_system_blockingconcurrentqueue::worker_thread_entrypoint(std::size_t local_tid)
+	{
+		std::string thread_name = "Topaz Job Thread " + std::to_string(local_tid);
+		TZ_THREAD(thread_name.c_str());
+		worker_t& worker = this->thread_pool[local_tid];
+		constexpr std::int64_t queue_wait_timer_micros = 1000; // 1 millisecond
+		while(!this->close_requested.load())
+		{
+			job_info_t job;
+
+			// lets try to retrieve an affine job, if thats empty then get a job from the global queue.
+			// this `if statement` could not happen if we hit the timeout without getting a job.
+			// in which case we simply loop round and try again.
+			if(worker.affine_jobs.try_dequeue(job) || this->global_job_queue.wait_dequeue_timed(job, queue_wait_timer_micros))
+			{
+				TZ_PROFZONE("job worker - do collected job", 0xFFAA0000);
+				// we have a job to do
+				worker.currently_running_job_id = job.job_id;
+				//std::osyncstream(std::cout) << "[" << worker.local_tid << "] - running new job " << job.job_id << "\n";
+				// do it
+				{
+					TZ_PROFZONE("job worker - run job function", 0xFFAA0000);
+					job.func();
+				}
+
+				TZ_PROFZONE("job worker - mark job completed", 0xFFAA0000);
+				// put it in the done list.
+				std::unique_lock lock(this->done_job_list_mutex);
+				this->done_job_ids.push_back(worker.currently_running_job_id);
+				// wakey wakey on people waiting on a job to be done.
+				//std::osyncstream(std::cout) << "[" << worker.local_tid << "] - finished job " << job.job_id << "\n";
+				this->wake_me_on_a_job_done.notify_all();
+				// and now we have no job again. loop back round.
+				worker.currently_running_job_id = std::numeric_limits<std::size_t>::max();
+			}
+		}
+	}
+
+}
diff --git a/src/tz/core/job/impl/concurrentqueue_blocking/job.hpp b/src/tz/core/job/impl/concurrentqueue_blocking/job.hpp
new file mode 100644
index 0000000000..37c1d463d4
--- /dev/null
+++ b/src/tz/core/job/impl/concurrentqueue_blocking/job.hpp
@@ -0,0 +1,64 @@
+#ifndef TZ_JOB_IMPL_CONCURRENTQUEUE_BLOCKING_JOB_HPP
+#define TZ_JOB_IMPL_CONCURRENTQUEUE_BLOCKING_JOB_HPP
+#include "tz/core/job/api/job.hpp"
+#include "blockingconcurrentqueue.h"
+#include <atomic>
+#include <condition_variable>
+#include <deque>
+#include <limits>
+#include <mutex>
+#include <thread>
+
+namespace tz::impl
+{
+	class job_system_blockingconcurrentqueue : public i_job_system
+	{
+	public:
+		job_system_blockingconcurrentqueue();
+		~job_system_blockingconcurrentqueue();
+
+		virtual job_handle execute(job_t job, execution_info einfo = {}) override;
+		virtual void block(job_handle j) const override;
+		virtual bool complete(job_handle j) const override;
+		virtual bool any_work_remaining() const override;
+		virtual void block_all() const override;
+		void new_frame(); // resets the per-frame job counter. call once at the start of each frame.
+		virtual std::size_t size() const override;
+		virtual std::size_t worker_count() const override;
+		virtual std::vector<worker_id_t> get_worker_ids() const override;
+		unsigned int jobs_started_this_frame() const;
+	private:
+		struct job_info_t
+		{
+			job_t func;
+			std::size_t job_id;
+			std::optional<worker_id_t> maybe_affinity = std::nullopt;
+		};
+
+		struct worker_t
+		{
+			std::thread thread;
+			std::size_t local_tid;
+			std::atomic<std::size_t> currently_running_job_id = std::numeric_limits<std::size_t>::max();
+			moodycamel::ConcurrentQueue<job_info_t> affine_jobs;
+			std::vector<std::size_t> completed_job_cache;
+			std::optional<std::size_t> get_running_job() const;
+			void clear_job_cache(job_system_blockingconcurrentqueue& js);
+		};
+		friend struct worker_t;
+
+		void worker_thread_entrypoint(std::size_t local_tid);
+
+		std::deque<worker_t> thread_pool;
+		moodycamel::BlockingConcurrentQueue<job_info_t> global_job_queue;
+		mutable std::mutex done_job_list_mutex;
+		std::vector<std::size_t> done_job_ids = {};
+		mutable std::mutex wake_me_on_a_job_done_mutex;
+		mutable std::condition_variable wake_me_on_a_job_done;
+		std::atomic<std::size_t> lifetime_jobs_created = 0u;
+		std::atomic<unsigned int> jobs_created_this_frame = 0u;
+		std::atomic<bool> close_requested = false;
+	};
+}
+
+#endif // TZ_JOB_IMPL_CONCURRENTQUEUE_BLOCKING_JOB_HPP
diff --git a/src/tz/core/job/job.hpp b/src/tz/core/job/job.hpp
index ef766cbc56..4b004c0c91 100644
--- a/src/tz/core/job/job.hpp
+++ b/src/tz/core/job/job.hpp
@@ -3,6 +3,7 @@
 
 // TODO: Configurable
 #include "tz/core/job/impl/threadpool_lfq/job.hpp"
+#include "tz/core/job/impl/concurrentqueue_blocking/job.hpp"
 #undef assert
 
 namespace tz
@@ -11,7 +12,7 @@ namespace tz
 	 * @ingroup tz_core_job
 	 * Underlying job system. See @ref tz::job_system_type for API.
 	 */
-	using job_system_t = tz::impl::job_system_threadpool_lfq;
+	using job_system_t = tz::impl::job_system_blockingconcurrentqueue;
 
 	namespace detail
 	{
diff --git a/src/tz/ren/animation2.cpp b/src/tz/ren/animation2.cpp
index 8d35e98fff..6dea1d90c5 100644
--- a/src/tz/ren/animation2.cpp
+++ b/src/tz/ren/animation2.cpp
@@ -343,13 +343,12 @@ namespace tz::ren
 		TZ_PROFZONE("animation_renderer2 - animation advance", 0xFFE54550);
 		std::size_t job_count = std::thread::hardware_concurrency();
-		this->animation_advance_jobs.resize(job_count);
 		std::size_t objects_per_job = this->animated_objects.size() / job_count;
 		std::size_t remainder_objects = this->animated_objects.size() % job_count;
 		tz::assert((objects_per_job * job_count) + remainder_objects == this->animated_objects.size());
 
-		std::vector<tz::job_handle> jobs(job_count);
 		if(objects_per_job > 0)
 		{
+			this->animation_advance_jobs.resize(job_count);
 			for(std::size_t i = 0; i < job_count; i++)
 			{
 				this->animation_advance_jobs[i] = tz::job_system().execute([this, delta, offset = i * objects_per_job, object_count = objects_per_job]
diff --git a/test/core/job_test.cpp b/test/core/job_test.cpp
index 1dc19a823d..c969e68d2b 100644
--- a/test/core/job_test.cpp
+++ b/test/core/job_test.cpp
@@ -6,8 +6,8 @@
 #include <chrono>
 #include <thread>
 
-#define TESTFUNC_BEGIN(n) void n(){tz::assert(!tz::job_system().any_work_remaining());
-#define TESTFUNC_END ;tz::job_system().block_all(); tz::assert(!tz::job_system().any_work_remaining());}
+#define TESTFUNC_BEGIN(n) void n(){tz::assert(!tz::job_system().any_work_remaining(), "Old jobs still running when entering new test function");
+#define TESTFUNC_END ;tz::job_system().block_all(); tz::assert(!tz::job_system().any_work_remaining(), "Jobs still running after blocking at the end of test function");}
 
 TESTFUNC_BEGIN(basic_job)
 	TZ_PROFZONE("basic job", 0xFF00AA00);