Skip to content

Commit

Permalink
Implement sleep-wake primitive
Browse files Browse the repository at this point in the history
  • Loading branch information
subhankarpal committed Oct 5, 2020
1 parent 27bebb2 commit ec3760e
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 46 deletions.
9 changes: 7 additions & 2 deletions emu/inc/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ class q_intfc_t {
};
void __register_core_id(unsigned);

void __init(unsigned long num_pe, unsigned long wq_depth);
void __teardown(void);

// work queue interface
void __init_queues(long unsigned int depth);
void __teardown_queues(void);
void __push(unsigned int, uint64_t data);
void __push_mmap(unsigned int, uint64_t data);
uint64_t __pop(unsigned int);
Expand All @@ -77,6 +78,10 @@ void __mutex_init(pthread_mutex_t *lock);
void __mutex_lock(pthread_mutex_t *lock);
void __mutex_unlock(pthread_mutex_t *lock);

// sleep-wake primitives
void __sleep();
void __wake(unsigned pe_id);

// emulation lib treats these as no-ops
void __reset_stats();
void __dump_stats();
Expand Down
83 changes: 56 additions & 27 deletions emu/src/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,27 @@ std::mutex m;
std::map<std::thread::id, unsigned> tid_to_core_id_map;
// STL map of PE/core ID to pointer to queue intfc object
std::map<std::pair<unsigned, unsigned>, q_intfc_t *> queues;
// Vector of pointers to per-PE mutex and cv objects
std::vector<std::condition_variable *> cvs;
std::vector<std::mutex *> mutexes;
// Vector of sleep_cntr to implement sleep/wake
std::vector<int> sleep_cntr;
// record own user-specified PE/core ID
void __register_core_id(unsigned core_id) {
m.lock();
tid_to_core_id_map.insert(std::make_pair(std::this_thread::get_id(), core_id));
m.unlock();
}
// work queue interface
void __init_queues(long unsigned int depth) {
void __init(unsigned long num_pe, unsigned long wq_depth) {
// create one cv and mutex object per PE
for (unsigned i = 0; i < num_pe; ++i) {
sleep_cntr.push_back(0);
cvs.push_back(new std::condition_variable);
mutexes.push_back(new std::mutex);
}

// init queues
unsigned depth = wq_depth;
// queues.at(std::make_pair(source, sink))(new q_intfc_t(depth));
// begin generated code for init_queues
queues.insert(std::make_pair(std::make_pair(0, 1), new q_intfc_t(depth)));
Expand All @@ -55,26 +68,28 @@ void __init_queues(long unsigned int depth) {
queues.insert(std::make_pair(std::make_pair(0, 8), new q_intfc_t(depth)));
// end generated code for init_queues
}
void __teardown_queues(void) {
for (auto &e : queues) {
delete e.second;
}
void __teardown(void) {
// teardown per PE cvs and mutexes
for (auto &e : cvs) { delete e; }
cvs.clear();
for (auto &e : mutexes) { delete e; }
mutexes.clear();

// teardown queues
for (auto &e : queues) { delete e.second; }
queues.clear();

// teardown tid_to_core_id_map
tid_to_core_id_map.clear();
}
// work queue interface
void __push(unsigned sink, uint64_t data) {
__push_mmap(sink, data);
}
void __push_mmap(unsigned sink, uint64_t data) {
unsigned source;
q_intfc_t *queue;
try {
source = tid_to_core_id_map.at(std::this_thread::get_id());
} catch (const std::out_of_range &e) {
printf("%s(): exception occured \"%s\", aborting. Perhaps your threads didn't call "
"register_core_id()?\n",
__FUNCTION__, e.what());
exit(1);
}
source = get_id();
try {
queue = queues.at(std::make_pair(source, sink));
} catch (const std::out_of_range &e) {
Expand Down Expand Up @@ -104,14 +119,7 @@ uint64_t __pop(unsigned source) {
uint64_t __pop_mmap(unsigned source) {
unsigned sink;
q_intfc_t *queue;
try {
sink = tid_to_core_id_map.at(std::this_thread::get_id());
} catch (const std::out_of_range &e) {
printf("%s(): exception occured \"%s\", aborting. Perhaps your threads didn't call "
"register_core_id()?\n",
__FUNCTION__, e.what());
exit(1);
}
sink = get_id();
try {
queue = queues.at(std::make_pair(source, sink));
} catch (const std::out_of_range &e) {
Expand Down Expand Up @@ -149,6 +157,27 @@ void __mutex_init(pthread_mutex_t *lock) { pthread_mutex_init(lock, NULL); }
void __mutex_lock(pthread_mutex_t *lock) { pthread_mutex_lock(lock); }
void __mutex_unlock(pthread_mutex_t *lock) { pthread_mutex_unlock(lock); }

// sleep-wake
void __sleep() {
// get self PE-ID
unsigned pe_id = get_id();

// increment sleep_cntr for self and wait on CV
// if it is <= 0 it means that someone has already called wake on it
std::mutex *m = mutexes.at(pe_id);
std::unique_lock<std::mutex> lock(*m);
++sleep_cntr.at(pe_id);
while(sleep_cntr.at(pe_id) > 0) {
cvs.at(pe_id)->wait(lock);
}
}
void __wake(unsigned pe_id) {
// set sleep_cntr to false for target PE and notify
std::unique_lock<std::mutex> lock(*mutexes.at(pe_id));
--sleep_cntr.at(pe_id);
cvs.at(pe_id)->notify_one();
}

// m5 op wrapper primitives
void __reset_stats() {}
void __dump_stats() {}
Expand All @@ -161,12 +190,12 @@ int get_id() {
int id;

try {
id = tid_to_core_id_map.at(std::this_thread::get_id());
id = tid_to_core_id_map.at(std::this_thread::get_id());
} catch (const std::out_of_range &e) {
printf("%s(): exception occured \"%s\", aborting. Perhaps your threads didn't call "
"register_core_id()?\n",
__FUNCTION__, e.what());
exit(1);
printf("%s(): exception occured \"%s\", aborting. Perhaps your threads didn't call "
"register_core_id()?\n",
__FUNCTION__, e.what());
exit(1);
}

m.unlock();
Expand Down
4 changes: 2 additions & 2 deletions example/app/src/serial_factorial.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@

int main() {
printf("Mgr starting up.\n");
__init_queues(WQ_DEPTH);
__init(NUM_PE, WQ_DEPTH);
__register_core_id(0);
#if defined(MANUAL_TRACING) || defined(AUTO_TRACING)
__open_trace_log(0);
Expand Down Expand Up @@ -91,7 +91,7 @@ int main() {
#ifdef EMULATION
munmap(dspm, SPM_SIZE_BYTES);
#endif // EMULATION
__teardown_queues();
__teardown();
#if defined(MANUAL_TRACING) || defined(AUTO_TRACING)
__close_trace_log(0);
#endif // MANUAL_TRACING || AUTO_TRACING
Expand Down
26 changes: 13 additions & 13 deletions example/app/src/workq_mutex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,18 @@ void *func(void *args) {
#endif // AUTO_TRACING || MANUAL_TRACING


// in EMU mode: use pthread barrier located in DSPM
// in TRE-SIM mode: use SLEEP-WAKE: wake up wrkr 2, that wakes up wrkr 3, and so on
// use SLEEP-WAKE: wake up wrkr 2, that wakes up wrkr 3, and so on
pthread_barrier_t *gbar = my_args->gbar;
#ifdef MANUAL_TRACING
_C_wrkr__sleep();
#endif // MANUAL_TRACING
__barrier_wait(gbar);
#ifdef MANUAL_TRACING
if (tid != NUM_WORKER) _C_wrkr__wake(tid + 1);
#endif // MANUAL_TRACING
__sleep();
if (tid != NUM_WORKER) {
#ifdef MANUAL_TRACING
_C_wrkr__wake(tid + 1);
#endif // MANUAL_TRACING
__wake(tid + 1);
}

// retrieve variables from work queue
#ifdef MANUAL_TRACING
Expand Down Expand Up @@ -153,7 +155,7 @@ void *func(void *args) {

int main(int argc, const char *argv[]) {
printf("== Running Test with %u Workers ==\n", NUM_WORKER);
__init_queues(WQ_DEPTH);
__init(NUM_PE, WQ_DEPTH);
__register_core_id(0);
#if defined(AUTO_TRACING) || defined(MANUAL_TRACING)
__open_trace_log(0);
Expand Down Expand Up @@ -211,16 +213,14 @@ int main(int argc, const char *argv[]) {
if (pthread_create(threads + tid, NULL, func, &t_args[tid]) != 0) {
break;
}
// printf("Spawned %u threads.\n", n_worker_threads);
}
// printf("Spawned %u threads.\n", NUM_WORKER);

// in EMU mode: synchronize using barrier in DSPM
// in TRE-SIM mode: synchronize using SLEEP-WAKE
// after synchronization, push through the work queues
// synchronize using SLEEP-WAKE; after synchronization, push through the work queues
#ifdef MANUAL_TRACING
_C_mgr__wake(1); // wake up wrkr 1, that wakes up wrkr 2, and so on
#endif // MANUAL_TRACING
__barrier_wait(global_bar);
__wake(1);

for (int tid = 0; tid < NUM_WORKER; ++tid) {
// communicate nsteps, pointer to the shared variable, and pointer to the mutex object
Expand Down Expand Up @@ -294,7 +294,7 @@ int main(int argc, const char *argv[]) {
#endif // EMULATION
delete[] threads;
delete[] t_args;
__teardown_queues();
__teardown();

#if defined(AUTO_TRACING) || defined(MANUAL_TRACING)
__close_trace_log(0);
Expand Down
1 change: 1 addition & 0 deletions example/model/params.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
* @author Subhankar Pal
*/
#define NUM_WORKER 8
#define NUM_PE (NUM_WORKER + 1)
#define WQ_DEPTH 4
#define MAX_OUTSTANDING_REQS 1
#define CLOCK_SPEED_GHZ 1
Expand Down
4 changes: 2 additions & 2 deletions example/sim/inc/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ extern
"C"
#endif // __cplusplus
__INLINE void
__init_queues(unsigned depth) { }
__init(unsigned long num_pe, unsigned long wq_depth) { }
extern
#ifdef __cplusplus
"C"
#endif // __cplusplus
__INLINE void
__teardown_queues(void) { }
__teardown(void) { }
/**
* @brief Pushes an item to the back of the work queue indexed by PE ID.
* @param pe_id ID of the PE that this work queue is connected to.
Expand Down

0 comments on commit ec3760e

Please sign in to comment.