From ec3760e3f572c9c5c7a55169d4d6d40904fc0bd3 Mon Sep 17 00:00:00 2001 From: Subhankar Pal Date: Sun, 4 Oct 2020 20:36:14 -0400 Subject: [PATCH] Implement sleep-wake primitive --- emu/inc/util.h | 9 ++- emu/src/util.cpp | 83 +++++++++++++++++++--------- example/app/src/serial_factorial.cpp | 4 +- example/app/src/workq_mutex.cpp | 26 ++++----- example/model/params.h | 1 + example/sim/inc/util.h | 4 +- 6 files changed, 81 insertions(+), 46 deletions(-) diff --git a/emu/inc/util.h b/emu/inc/util.h index 83a90fa..748bf7d 100644 --- a/emu/inc/util.h +++ b/emu/inc/util.h @@ -60,9 +60,10 @@ class q_intfc_t { }; void __register_core_id(unsigned); +void __init(unsigned long num_pe, unsigned long wq_depth); +void __teardown(void); + // work queue interface -void __init_queues(long unsigned int depth); -void __teardown_queues(void); void __push(unsigned int, uint64_t data); void __push_mmap(unsigned int, uint64_t data); uint64_t __pop(unsigned int); @@ -77,6 +78,10 @@ void __mutex_init(pthread_mutex_t *lock); void __mutex_lock(pthread_mutex_t *lock); void __mutex_unlock(pthread_mutex_t *lock); +// sleep-wake primitives +void __sleep(); +void __wake(unsigned pe_id); + // emulation lib treats these as no-ops void __reset_stats(); void __dump_stats(); diff --git a/emu/src/util.cpp b/emu/src/util.cpp index 63c620d..28df0f5 100644 --- a/emu/src/util.cpp +++ b/emu/src/util.cpp @@ -35,14 +35,27 @@ std::mutex m; std::map tid_to_core_id_map; // STL map of PE/core ID to pointer to queue intfc object std::map, q_intfc_t *> queues; +// Vector of pointers to per-PE mutex and cv objects +std::vector cvs; +std::vector mutexes; +// Vector of sleep_cntr to implement sleep/wake +std::vector sleep_cntr; // record own user-specified PE/core ID void __register_core_id(unsigned core_id) { m.lock(); tid_to_core_id_map.insert(std::make_pair(std::this_thread::get_id(), core_id)); m.unlock(); } -// work queue interface -void __init_queues(long unsigned int depth) { +void __init(unsigned long num_pe, unsigned long wq_depth) { + // create one cv and mutex object per PE + for (unsigned i = 0; i < num_pe; ++i) { + sleep_cntr.push_back(0); + cvs.push_back(new std::condition_variable); + mutexes.push_back(new std::mutex); + } + + // init queues + unsigned depth = wq_depth; // queues.at(std::make_pair(source, sink))(new q_intfc_t(depth)); // begin generated code for init_queues queues.insert(std::make_pair(std::make_pair(0, 1), new q_intfc_t(depth))); @@ -55,26 +68,28 @@ void __init_queues(long unsigned int depth) { queues.insert(std::make_pair(std::make_pair(0, 8), new q_intfc_t(depth))); // end generated code for init_queues } -void __teardown_queues(void) { - for (auto &e : queues) { - delete e.second; - } +void __teardown(void) { + // teardown per PE cvs and mutexes + for (auto &e : cvs) { delete e; } + cvs.clear(); + for (auto &e : mutexes) { delete e; } + mutexes.clear(); + + // teardown queues + for (auto &e : queues) { delete e.second; } queues.clear(); + + // teardown tid_to_core_id_map + tid_to_core_id_map.clear(); } +// work queue interface void __push(unsigned sink, uint64_t data) { __push_mmap(sink, data); } void __push_mmap(unsigned sink, uint64_t data) { unsigned source; q_intfc_t *queue; - try { - source = tid_to_core_id_map.at(std::this_thread::get_id()); - } catch (const std::out_of_range &e) { - printf("%s(): exception occured \"%s\", aborting. Perhaps your threads didn't call " - "register_core_id()?\n", - __FUNCTION__, e.what()); - exit(1); - } + source = get_id(); try { queue = queues.at(std::make_pair(source, sink)); } catch (const std::out_of_range &e) { @@ -104,14 +119,7 @@ uint64_t __pop(unsigned source) { uint64_t __pop_mmap(unsigned source) { unsigned sink; q_intfc_t *queue; - try { - sink = tid_to_core_id_map.at(std::this_thread::get_id()); - } catch (const std::out_of_range &e) { - printf("%s(): exception occured \"%s\", aborting. Perhaps your threads didn't call " - "register_core_id()?\n", - __FUNCTION__, e.what()); - exit(1); - } + sink = get_id(); try { queue = queues.at(std::make_pair(source, sink)); } catch (const std::out_of_range &e) { @@ -149,6 +157,27 @@ void __mutex_init(pthread_mutex_t *lock) { pthread_mutex_init(lock, NULL); } void __mutex_lock(pthread_mutex_t *lock) { pthread_mutex_lock(lock); } void __mutex_unlock(pthread_mutex_t *lock) { pthread_mutex_unlock(lock); } +// sleep-wake +void __sleep() { + // get self PE-ID + unsigned pe_id = get_id(); + + // increment sleep_cntr for self and wait on CV + // if it is <= 0 it means that someone has already called wake on it + std::mutex *m = mutexes.at(pe_id); + std::unique_lock lock(*m); + ++sleep_cntr.at(pe_id); + while(sleep_cntr.at(pe_id) > 0) { + cvs.at(pe_id)->wait(lock); + } +} +void __wake(unsigned pe_id) { + // set sleep_cntr to false for target PE and notify + std::unique_lock lock(*mutexes.at(pe_id)); + --sleep_cntr.at(pe_id); + cvs.at(pe_id)->notify_one(); +} + // m5 op wrapper primitives void __reset_stats() {} void __dump_stats() {} @@ -161,12 +190,12 @@ int get_id() { int id; try { - id = tid_to_core_id_map.at(std::this_thread::get_id()); + id = tid_to_core_id_map.at(std::this_thread::get_id()); } catch (const std::out_of_range &e) { - printf("%s(): exception occured \"%s\", aborting. Perhaps your threads didn't call " - "register_core_id()?\n", - __FUNCTION__, e.what()); - exit(1); + printf("%s(): exception occured \"%s\", aborting. Perhaps your threads didn't call " + "register_core_id()?\n", + __FUNCTION__, e.what()); + exit(1); } m.unlock(); diff --git a/example/app/src/serial_factorial.cpp b/example/app/src/serial_factorial.cpp index 1de61ca..47817c3 100644 --- a/example/app/src/serial_factorial.cpp +++ b/example/app/src/serial_factorial.cpp @@ -47,7 +47,7 @@ int main() { printf("Mgr starting up.\n"); - __init_queues(WQ_DEPTH); + __init(NUM_PE, WQ_DEPTH); __register_core_id(0); #if defined(MANUAL_TRACING) || defined(AUTO_TRACING) __open_trace_log(0); @@ -91,7 +91,7 @@ int main() { #ifdef EMULATION munmap(dspm, SPM_SIZE_BYTES); #endif // EMULATION - __teardown_queues(); + __teardown(); #if defined(MANUAL_TRACING) || defined(AUTO_TRACING) __close_trace_log(0); #endif // MANUAL_TRACING || AUTO_TRACING diff --git a/example/app/src/workq_mutex.cpp b/example/app/src/workq_mutex.cpp index c57c1a5..5512b5a 100644 --- a/example/app/src/workq_mutex.cpp +++ b/example/app/src/workq_mutex.cpp @@ -89,16 +89,18 @@ void *func(void *args) { #endif // AUTO_TRACING || MANUAL_TRACING - // in EMU mode: use pthread barrier located in DSPM - // in TRE-SIM mode: use SLEEP-WAKE: wake up wrkr 2, that wakes up wrkr 3, and so on + // use SLEEP-WAKE: wake up wrkr 2, that wakes up wrkr 3, and so on pthread_barrier_t *gbar = my_args->gbar; #ifdef MANUAL_TRACING _C_wrkr__sleep(); #endif // MANUAL_TRACING - __barrier_wait(gbar); -#ifdef MANUAL_TRACING - if (tid != NUM_WORKER) _C_wrkr__wake(tid + 1); -#endif // MANUAL_TRACING + __sleep(); + if (tid != NUM_WORKER) { + #ifdef MANUAL_TRACING + _C_wrkr__wake(tid + 1); + #endif // MANUAL_TRACING + __wake(tid + 1); + } // retrieve variables from work queue #ifdef MANUAL_TRACING @@ -153,7 +155,7 @@ void *func(void *args) { int main(int argc, const char *argv[]) { printf("== Running Test with %u Workers ==\n", NUM_WORKER); - __init_queues(WQ_DEPTH); + __init(NUM_PE, WQ_DEPTH); __register_core_id(0); #if defined(AUTO_TRACING) || defined(MANUAL_TRACING) __open_trace_log(0); @@ -211,16 +213,14 @@ int main(int argc, const char *argv[]) { if (pthread_create(threads + tid, NULL, func, &t_args[tid]) != 0) { break; } - // printf("Spawned %u threads.\n", n_worker_threads); } + // printf("Spawned %u threads.\n", NUM_WORKER); - // in EMU mode: synchronize using barrier in DSPM - // in TRE-SIM mode: synchronize using SLEEP-WAKE - // after synchronization, push through the work queues + // synchronize using SLEEP-WAKE; after synchronization, push through the work queues #ifdef MANUAL_TRACING _C_mgr__wake(1); // wake up wrkr 1, that wakes up wrkr 2, and so on #endif // MANUAL_TRACING - __barrier_wait(global_bar); + __wake(1); for (int tid = 0; tid < NUM_WORKER; ++tid) { // communicate nsteps, pointer to the shared variable, and pointer to the mutex object @@ -294,7 +294,7 @@ int main(int argc, const char *argv[]) { #endif // EMULATION delete[] threads; delete[] t_args; - __teardown_queues(); + __teardown(); #if defined(AUTO_TRACING) || defined(MANUAL_TRACING) __close_trace_log(0); diff --git a/example/model/params.h b/example/model/params.h index 51b69ca..0df36e9 100644 --- a/example/model/params.h +++ b/example/model/params.h @@ -32,6 +32,7 @@ * @author Subhankar Pal */ #define NUM_WORKER 8 +#define NUM_PE (NUM_WORKER + 1) #define WQ_DEPTH 4 #define MAX_OUTSTANDING_REQS 1 #define CLOCK_SPEED_GHZ 1 diff --git a/example/sim/inc/util.h b/example/sim/inc/util.h index 7dc5422..823746e 100644 --- a/example/sim/inc/util.h +++ b/example/sim/inc/util.h @@ -26,13 +26,13 @@ extern "C" #endif // __cplusplus __INLINE void -__init_queues(unsigned depth) { } +__init(unsigned long num_pe, unsigned long wq_depth) { } extern #ifdef __cplusplus "C" #endif // __cplusplus __INLINE void -__teardown_queues(void) { } +__teardown(void) { } /** * @brief Pushes an item to the back of the work queue indexed by PE ID. * @param pe_id ID of the PE that this work queue is connected to.