Skip to content

Commit

Permalink
[job] implemented jobs API. multithreading awayyy. only done some ver…
Browse files Browse the repository at this point in the history
…y very basic testing, but this is mostly copied from Topaz 4.2 which i was reasonably confident was decently battle-tested
  • Loading branch information
harrand committed Oct 15, 2024
1 parent 691d12e commit 20585fd
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 15 deletions.
1 change: 0 additions & 1 deletion include/tz/core/job.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ namespace tz
job_handle job_execute(job_function fn);
job_handle job_execute_on(job_function fn, job_worker worker);
void job_wait(job_handle job);
void job_wait_all();
bool job_complete(job_handle job);
std::size_t job_count();
std::size_t job_worker_count();
Expand Down
62 changes: 48 additions & 14 deletions src/tz/core/job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,40 +36,50 @@ namespace tz
std::mutex wake_mutex;
std::condition_variable wake_condition;

job_handle impl_execute_job(job_data job);
void impl_wait();

// state end, api begin

job_handle job_execute(job_function fn)
{
(void)fn;
return tz::nullhand;
return impl_execute_job
({
.fn = fn,
.job_id = lifetime_count.load(),
.affinity = std::nullopt
});
}

job_handle job_execute_on(job_function fn, job_worker worker)
{
(void)fn;
(void)worker;
return tz::nullhand;
return impl_execute_job
({
.fn = fn,
.job_id = lifetime_count.load(),
.affinity = worker
});
}

void job_wait(job_handle job)
{
(void)job;
}

void job_wait_all()
{

while(!job_complete(job))
{
impl_wait();
}
}

bool job_complete(job_handle job)
{
(void)job;
return false;
std::unique_lock<std::mutex> lock(waiting_job_id_mutex);
bool job_isnt_waiting = std::find(waiting_job_ids.begin(), waiting_job_ids.end(), job.peek()) == waiting_job_ids.end();
bool no_workers_doing_it = std::all_of(workers.begin(), workers.end(), [jid = job.peek()](const worker_data& worker){return worker.current_job != jid;});
return job_isnt_waiting && no_workers_doing_it;
}

std::size_t job_count()
{
return 0;
return jobs.size_approx();
}

std::size_t job_worker_count()
Expand Down Expand Up @@ -115,12 +125,36 @@ namespace tz
std::unique_lock<std::mutex> lock(waiting_job_id_mutex);
auto iter = std::find(waiting_job_ids.begin(), waiting_job_ids.end(), job.job_id);
tz_assert(iter != waiting_job_ids.end(), "job system thread took a job, but its id is not on the list of waiting jobs???");
waiting_job_ids.erase(iter);
}
job.fn();
me.current_job = job_id_null;
}
}

job_handle impl_execute_job(job_data job)
{
if(job.affinity.has_value())
{
auto val = job.affinity.value();
auto& q = workers[val].affine_jobs;
q.enqueue(job);
wake_condition.notify_all();
}
else
{
{
std::unique_lock<std::mutex> lock(waiting_job_id_mutex);
waiting_job_ids.push_back(lifetime_count);
}
jobs.enqueue(job);
wake_condition.notify_one();
lifetime_count++;
}
return static_cast<tz::hanval>(job.job_id);
}


namespace detail
{
void job_system_initialise()
Expand Down

0 comments on commit 20585fd

Please sign in to comment.