Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds core.sync.event.EventAwaiter #64

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions druntime/src/core/internal/gc/impl/conservative/gc.d
Original file line number Diff line number Diff line change
Expand Up @@ -3269,8 +3269,7 @@ Lmark:
Gcx.instance.scanThreadData = null;
Gcx.instance.busyThreads = 0;

memset(&Gcx.instance.evStart, 0, Gcx.instance.evStart.sizeof);
memset(&Gcx.instance.evDone, 0, Gcx.instance.evDone.sizeof);
memset(&Gcx.instance.ev, 0, Gcx.instance.ev.sizeof);
}
}
}
Expand All @@ -3282,7 +3281,7 @@ Lmark:

import core.atomic;
import core.cpuid;
import core.sync.event;
import core.sync.event : EventAwaiter;

private: // disable invariants for background threads

Expand All @@ -3293,8 +3292,12 @@ Lmark:
uint numScanThreads;
ScanThreadData* scanThreadData;

Event evStart;
Event evDone;
static struct EvStartDone
{
EventAwaiter start;
EventAwaiter done;
}
EvStartDone ev;

shared uint busyThreads;
shared uint stoppedThreads;
Expand Down Expand Up @@ -3336,7 +3339,7 @@ Lmark:

busyThreads.atomicOp!"+="(1); // main thread is busy

evStart.setIfInitialized();
ev.start.set();

debug(PARALLEL_PRINTF) printf("mark %lld roots\n", cast(ulong)(ptop - pbot));

Expand Down Expand Up @@ -3396,8 +3399,8 @@ Lmark:
if (!scanThreadData)
onOutOfMemoryError();

evStart.initialize(false, false);
evDone.initialize(false, false);
ev.start = EventAwaiter(false, false);
ev.done = EventAwaiter(false, false);

version (Posix)
{
Expand Down Expand Up @@ -3438,8 +3441,8 @@ Lmark:
stopGC = true;
while (atomicLoad(stoppedThreads) < startedThreads && !allThreadsDead)
{
evStart.setIfInitialized();
evDone.wait(dur!"msecs"(1));
ev.start.set();
ev.done.wait(dur!"msecs"(1));
}

for (int idx = 0; idx < numScanThreads; idx++)
Expand All @@ -3451,8 +3454,8 @@ Lmark:
}
}

evDone.terminate();
evStart.terminate();
destroy(ev.start);
destroy(ev.done);

cstdlib.free(scanThreadData);
// scanThreadData = null; // keep non-null to not start again after shutdown
Expand All @@ -3465,9 +3468,9 @@ Lmark:
{
while (!stopGC)
{
evStart.wait();
ev.start.wait();
pullFromScanStack();
evDone.setIfInitialized();
ev.done.set();
}
stoppedThreads.atomicOp!"+="(1);
}
Expand Down Expand Up @@ -3496,7 +3499,7 @@ Lmark:
{
if (toscan.empty)
{
evDone.wait(dur!"msecs"(1));
ev.done.wait(dur!"msecs"(1));
continue;
}

Expand Down
140 changes: 133 additions & 7 deletions druntime/src/core/sync/event.d
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,122 @@ module core.sync.event;

import core.time;

import rt.sys.config;
struct EventAwaiter
{
nothrow @nogc:
/**
* Creates an event object.
*
* Params:
* manualReset = the state of the event is not reset automatically after resuming waiting clients
* initialState = initial state of the signal
*/
this(bool manualReset, bool initialState)
{
osEvent = OsEvent(manualReset, initialState);
}

// copying not allowed, can produce resource leaks
@disable this(this);

ref EventAwaiter opAssign(return scope EventAwaiter s) @live
{
this.osEvent = s.osEvent;

return this;
}

/// Set the event to "signaled", so that waiting clients are resumed
void set()
{
osEvent.set();
}

/// Reset the event manually
void reset()
{
osEvent.reset();
}

/**
* Wait for the event to be signaled without timeout.
*
* Returns:
* `true` if the event is in signaled state, `false` if the event is uninitialized or another error occured
*/
bool wait()
{
return osEvent.wait();
}

/**
* Wait for the event to be signaled with timeout.
*
* Params:
* tmout = the maximum time to wait
* Returns:
* `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or
* the event is uninitialized or another error occured
*/
bool wait(Duration tmout)
{
return osEvent.wait(tmout);
}

private:
import rt.sys.config;

mixin("import " ~ osEventImport ~ ";");
OsEvent osEvent;
}

// Test single-thread (non-shared) use.
@nogc nothrow unittest
{
// auto-reset, initial state false
EventAwaiter ev1 = EventAwaiter(false, false);
assert(!ev1.wait(1.dur!"msecs"));
ev1.set();
assert(ev1.wait());
assert(!ev1.wait(1.dur!"msecs"));

// manual-reset, initial state true
EventAwaiter ev2 = EventAwaiter(true, true);
assert(ev2.wait());
assert(ev2.wait());
ev2.reset();
assert(!ev2.wait(1.dur!"msecs"));
}

unittest
{
import core.thread, core.atomic;

scope event = new EventAwaiter(true, false);
int numThreads = 10;
shared int numRunning = 0;

void testFn()
{
event.wait(8.dur!"seconds"); // timeout below limit for druntime test_runner
numRunning.atomicOp!"+="(1);
}

auto group = new ThreadGroup;

for (int i = 0; i < numThreads; ++i)
group.create(&testFn);

auto start = MonoTime.currTime;
assert(numRunning == 0);

event.set();
group.joinAll();

assert(numRunning == numThreads);

assert(MonoTime.currTime - start < 5.dur!"seconds");
}

/**
* represents an event. Clients of an event are suspended while waiting
Expand Down Expand Up @@ -51,6 +166,7 @@ struct ProcessFile
}
---
*/
deprecated("Please use core.sync.event.EventAwaiter instead")
struct Event
{
nothrow @nogc:
Expand All @@ -73,9 +189,10 @@ nothrow @nogc:
* manualReset = the state of the event is not reset automatically after resuming waiting clients
* initialState = initial state of the signal
*/
void initialize(bool manualReset, bool initialState)
void initialize(bool manualReset, bool initialState) @live
{
osEvent.create(manualReset, initialState);
osEvent = EventAwaiter(manualReset, initialState);
m_initalized = true;
}

// copying not allowed, can produce resource leaks
Expand All @@ -85,6 +202,7 @@ nothrow @nogc:
~this()
{
terminate();
m_initalized = false;
}

/**
Expand All @@ -104,13 +222,15 @@ nothrow @nogc:
/// Set the event to "signaled", so that waiting clients are resumed
void setIfInitialized()
{
osEvent.setIfInitialized();
if(m_initalized)
osEvent.set();
}

/// Reset the event manually
void reset()
{
osEvent.reset();
if(m_initalized)
osEvent.reset();
}

/**
Expand All @@ -121,6 +241,9 @@ nothrow @nogc:
*/
bool wait()
{
if (!m_initalized)
return false;

return osEvent.wait();
}

Expand All @@ -135,12 +258,15 @@ nothrow @nogc:
*/
bool wait(Duration tmout)
{
if (!m_initalized)
return false;

return osEvent.wait(tmout);
}

private:
mixin("import " ~ osEventImport ~ ";");
OsEvent osEvent;
EventAwaiter osEvent;
bool m_initalized;
}

// Test single-thread (non-shared) use.
Expand Down
45 changes: 14 additions & 31 deletions druntime/src/rt/sys/posix/osevent.d
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ import core.internal.abort : abort;

struct OsEvent
{
void create(bool manualReset, bool initialState) nothrow @trusted @nogc
this(bool manualReset, bool initialState) nothrow @trusted @nogc
{
if (m_initalized)
return;
pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) == 0 ||
abort("Error: pthread_mutex_init failed.");
static if ( is( typeof( pthread_condattr_setclock ) ) )
Expand All @@ -37,40 +35,29 @@ struct OsEvent

m_state = initialState;
m_manualReset = manualReset;
m_initalized = true;
}

void destroy() nothrow @trusted @nogc
~this() nothrow @trusted @nogc
{
if (m_initalized)
{
pthread_mutex_destroy(&m_mutex) == 0 ||
abort("Error: pthread_mutex_destroy failed.");
pthread_cond_destroy(&m_cond) == 0 ||
abort("Error: pthread_cond_destroy failed.");
m_initalized = false;
}
pthread_mutex_destroy(&m_mutex) == 0 ||
abort("Error: pthread_mutex_destroy failed.");
pthread_cond_destroy(&m_cond) == 0 ||
abort("Error: pthread_cond_destroy failed.");
}

void setIfInitialized() nothrow @trusted @nogc
void set() nothrow @trusted @nogc
{
if (m_initalized)
{
pthread_mutex_lock(&m_mutex);
m_state = true;
pthread_cond_broadcast(&m_cond);
pthread_mutex_unlock(&m_mutex);
}
pthread_mutex_lock(&m_mutex);
m_state = true;
pthread_cond_broadcast(&m_cond);
pthread_mutex_unlock(&m_mutex);
}

void reset() nothrow @trusted @nogc
{
if (m_initalized)
{
pthread_mutex_lock(&m_mutex);
m_state = false;
pthread_mutex_unlock(&m_mutex);
}
pthread_mutex_lock(&m_mutex);
m_state = false;
pthread_mutex_unlock(&m_mutex);
}

bool wait() nothrow @trusted @nogc
Expand All @@ -80,9 +67,6 @@ struct OsEvent

bool wait(Duration tmout) nothrow @trusted @nogc
{
if (!m_initalized)
return false;

pthread_mutex_lock(&m_mutex);

int result = 0;
Expand Down Expand Up @@ -114,7 +98,6 @@ private:

pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
bool m_initalized;
bool m_state;
bool m_manualReset;
}
Loading
Loading