diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index ec8669b295..cc83ba8b5e 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -89,9 +89,9 @@ class ThreadPool extends WorkOutput /** __Access this only from the main thread.__ - The sum of all active single-threaded pools' `workPriority` values. + The sum of `workPriority` values from all pools with an ongoing + single-threaded job. **/ - @:allow(lime.system.JobList) private static var __totalWorkPriority:Float = 0; /** @@ -181,22 +181,19 @@ class ThreadPool extends WorkOutput private var __doWork:WorkFunctionWorkOutput->Void>; - private var __activeJobs:JobList; - #if lime_threads - /** - The set of threads actively running a job. - **/ - private var __activeThreads:Map; - /** A list of idle threads. Not to be confused with `idleThreads`, a public variable equal to `__idleThreads.length`. **/ - private var __idleThreads:Array; + private var __idleThreads:Array = []; + + private var __multiThreadedJobs:JobArray = []; + private var __multiThreadedQueue:JobArray = []; #end - private var __jobQueue:JobList = new JobList(); + private var __singleThreadedJob(default, set):JobData; + private var __singleThreadedQueue:JobArray = []; /** __Call this only from the main thread.__ @@ -206,26 +203,20 @@ class ThreadPool extends WorkOutput immediately; only after enough calls to `run()`. Only applies in multi-threaded mode. @param maxThreads The maximum number of threads that will run at once. - @param mode Defaults to `MULTI_THREADED` on most targets, but - `SINGLE_THREADED` in HTML5. In HTML5, `MULTI_THREADED` mode uses web - workers, which impose additional restrictions. + @param mode The mode jobs will run in by default. Defaults to + `SINGLE_THREADED` in HTML5 for backwards compatibility. **/ public function new(minThreads:Int = 0, maxThreads:Int = 1, mode:ThreadMode = null) { - super(mode); + if (!isMainThread()) + { + throw "Call new ThreadPool() only from the main thread."; + } - __activeJobs = new JobList(this); + super(mode); this.minThreads = minThreads; this.maxThreads = maxThreads; - - #if lime_threads - if (this.mode == MULTI_THREADED) - { - __activeThreads = new Map(); - __idleThreads = []; - } - #end } /** @@ -243,13 +234,13 @@ class ThreadPool extends WorkOutput Application.current.onUpdate.remove(__update); + #if lime_threads // Cancel active jobs, leaving `minThreads` idle threads. - for (job in __activeJobs) + for (job in __multiThreadedJobs) { - #if lime_threads if (mode == MULTI_THREADED) { - var thread:Thread = __activeThreads[job.id]; + var thread:Thread = job.thread; if (idleThreads < minThreads) { thread.sendMessage({event: CANCEL}); @@ -260,7 +251,6 @@ class ThreadPool extends WorkOutput thread.sendMessage({event: EXIT}); } } - #end if (error != null) { @@ -274,9 +264,8 @@ class ThreadPool extends WorkOutput activeJob = null; } } - __activeJobs.clear(); + __multiThreadedJobs.clear(); - #if lime_threads // Exit idle threads if there are more than the minimum. while (idleThreads > minThreads) { @@ -284,16 +273,34 @@ class ThreadPool extends WorkOutput } #end - // Clear the job queue. + if (__singleThreadedJob != null && error != null) + { + activeJob = __singleThreadedJob; + onError.dispatch(error); + activeJob = null; + } + __singleThreadedJob = null; + + // Clear the job queues. if (error != null) { - for (job in __jobQueue) + for (job in __singleThreadedQueue) + { + activeJob = job; + onError.dispatch(error); + } + #if lime_threads + for (job in __multiThreadedQueue) { activeJob = job; onError.dispatch(error); } + #end } - __jobQueue.clear(); + __singleThreadedQueue.clear(); + #if lime_threads + __multiThreadedQueue.clear(); + #end __jobComplete.value = false; activeJob = null; @@ -305,17 +312,32 @@ class ThreadPool extends WorkOutput **/ public function cancelJob(jobID:Int):Bool { + if (__singleThreadedJob != null && __singleThreadedJob.id == jobID) + { + __singleThreadedJob = null; + return true; + } + else if (__singleThreadedQueue.removeJob(jobID) != null) + { + return true; + } + #if lime_threads - var thread:Thread = __activeThreads[jobID]; - if (thread != null) + var job:JobData = __multiThreadedJobs.removeJob(jobID); + if (job != null) { - thread.sendMessage({event: CANCEL}); - __activeThreads.remove(jobID); - __idleThreads.push(thread); + if (job.thread != null) + { + job.thread.sendMessage({event: CANCEL}); + __idleThreads.push(job.thread); + } + return true; } - #end - return __activeJobs.remove(__activeJobs.get(jobID)) || __jobQueue.remove(__jobQueue.get(jobID)); + return __multiThreadedQueue.removeJob(jobID) != null; + #else + return false; + #end } /** @@ -334,9 +356,11 @@ class ThreadPool extends WorkOutput only access its arguments, and return often. @param state An object to pass to `doWork`, ideally a mutable object so that `doWork` can save its progress. + @param mode Which mode to run the job in. If omitted, the pool's default + mode will be used. @return The job's unique ID. **/ - public function run(doWork:WorkFunctionWorkOutput->Void> = null, state:State = null):Int + public function run(doWork:WorkFunctionWorkOutput->Void> = null, state:State = null, ?mode:ThreadMode = null):Int { if (!isMainThread()) { @@ -361,13 +385,24 @@ class ThreadPool extends WorkOutput } var job:JobData = new JobData(doWork, state); - __jobQueue.push(job); + #if lime_threads + if (mode == MULTI_THREADED || mode == null && this.mode == MULTI_THREADED) + { + __multiThreadedQueue.push(job); + } + else + #end + { + __singleThreadedQueue.push(job); + } if (!Application.current.onUpdate.has(__update)) { Application.current.onUpdate.add(__update); } + __startJobs(); + return job.id; } @@ -410,7 +445,7 @@ class ThreadPool extends WorkOutput return; } - if (event.event != WORK || event.job == null) + if (event.event != WORK || event.doWork == null || event.jobID == null) { // Go idle. event = null; @@ -418,7 +453,7 @@ class ThreadPool extends WorkOutput } // Get to work. - output.activeJob = event.job; + output.activeJob = new JobData(event.doWork, event.state, event.jobID); var interruption:Dynamic = null; try @@ -426,7 +461,7 @@ class ThreadPool extends WorkOutput while (!output.__jobComplete.value && (interruption = Thread.readMessage(false)) == null) { output.workIterations.value++; - event.job.doWork.dispatch(event.job.state, output); + event.doWork.dispatch(event.state, output); } } catch (e:#if (haxe_ver >= 4.1) haxe.Exception #else Dynamic #end) @@ -469,50 +504,66 @@ class ThreadPool extends WorkOutput } /** - Schedules (in multi-threaded mode) or runs (in single-threaded mode) the - job queue, then processes incoming events. + Processes the job queues, starting any jobs that can be started. **/ - private function __update(deltaTime:Int):Void + private function __startJobs():Void { if (!isMainThread()) { return; } - // Process the queue. - while (__jobQueue.length > 0 && activeJobs < maxThreads) + if (__singleThreadedJob == null && __singleThreadedQueue.length > 0) { - var job:JobData = __jobQueue.pop(); - - job.startTime = timestamp(); - __activeJobs.push(job); + __singleThreadedJob = __singleThreadedQueue.shift(); + __singleThreadedJob.startTime = timestamp(); + } - #if lime_threads - if (mode == MULTI_THREADED) + #if lime_threads + for (job in __multiThreadedQueue) + { + if (__multiThreadedJobs.length >= maxThreads) { - #if html5 - job.doWork.makePortable(); - #end - - var thread:Thread = __idleThreads.length == 0 ? createThread(__executeThread) : __idleThreads.pop(); - __activeThreads[job.id] = thread; - thread.sendMessage({event: WORK, job: job}); + break; } + + #if html5 + job.doWork.makePortable(); #end + + job.thread = __idleThreads.length == 0 ? createThread(__executeThread) : __idleThreads.pop(); + job.thread.sendMessage({event: WORK, jobID: job.id, doWork: job.doWork, state: job.state}); + job.startTime = timestamp(); + + __multiThreadedJobs.push(job); + __multiThreadedQueue.remove(job); } + #end + } - // Run the next single-threaded job, if any. - if (mode == SINGLE_THREADED && __activeJobs.hasNext()) + /** + Processes the job queues, then processes incoming events. + **/ + private function __update(deltaTime:Int):Void + { + if (!isMainThread()) { - activeJob = __activeJobs.next(); + return; + } + + __startJobs(); + + // Run the single-threaded job. + if (__singleThreadedJob != null) + { + activeJob = __singleThreadedJob; var state:State = activeJob.state; __jobComplete.value = false; workIterations.value = 0; - // `workLoad / frameRate` is the total time that pools may use per - // frame. `workPriority / __totalWorkPriority` is this pool's - // fraction of that total. + // `workLoad / frameRate` is the total time that pools may use per frame. + // `workPriority / __totalWorkPriority` is this pool's fraction of that total. var maxTimeElapsed:Float = workPriority * workLoad / (__totalWorkPriority * Application.current.window.frameRate); var startTime:Float = timestamp(); @@ -540,24 +591,32 @@ class ThreadPool extends WorkOutput var threadEvent:ThreadEvent; while ((threadEvent = __jobOutput.pop(false)) != null) { - if (threadEvent.jobID != null) + var activeJobMode:ThreadMode = SINGLE_THREADED; + if (__singleThreadedJob != null && threadEvent.jobID == __singleThreadedJob.id) { - activeJob = __activeJobs.get(threadEvent.jobID); + activeJob = __singleThreadedJob; } else { - activeJob = threadEvent.job; + #if lime_threads + activeJob = __multiThreadedJobs.getJob(threadEvent.jobID); + activeJobMode = MULTI_THREADED; + #else + continue; + #end } - if (activeJob == null || !__activeJobs.exists(activeJob)) + if (activeJob == null) { continue; } - if (mode == MULTI_THREADED) + #if lime_threads + if (activeJobMode == MULTI_THREADED) { activeJob.duration = timestamp() - activeJob.startTime; } + #end switch (threadEvent.event) { @@ -577,24 +636,25 @@ class ThreadPool extends WorkOutput onError.dispatch(threadEvent.message); } - __activeJobs.remove(activeJob); - #if lime_threads - if (mode == MULTI_THREADED) + if (activeJobMode == MULTI_THREADED) { - var thread:Thread = __activeThreads[activeJob.id]; - __activeThreads.remove(activeJob.id); + __multiThreadedJobs.remove(activeJob); - if (currentThreads > maxThreads || __jobQueue.length == 0 && currentThreads > minThreads) + if (currentThreads > maxThreads || currentThreads - __multiThreadedQueue.length > minThreads) { - thread.sendMessage({event: EXIT}); + activeJob.thread.sendMessage({event: EXIT}); } else { - __idleThreads.push(thread); + __idleThreads.push(activeJob.thread); } } + else #end + { + __singleThreadedJob = null; + } default: } @@ -602,7 +662,7 @@ class ThreadPool extends WorkOutput activeJob = null; } - if (activeJobs == 0 && __jobQueue.length == 0) + if (0 == activeJobs + __singleThreadedQueue.length #if lime_threads + __multiThreadedQueue.length #end) { Application.current.onUpdate.remove(__update); } @@ -624,7 +684,8 @@ class ThreadPool extends WorkOutput private inline function get_activeJobs():Int { - return __activeJobs.length; + return #if lime_threads __multiThreadedJobs.length + #end + (__singleThreadedJob != null ? 1 : 0); } private inline function get_idleThreads():Int @@ -642,9 +703,22 @@ class ThreadPool extends WorkOutput return this; } + private inline function set___singleThreadedJob(value:JobData):JobData + { + if (value != null && __singleThreadedJob == null) + { + __totalWorkPriority += workPriority; + } + else if (value == null && __singleThreadedJob != null) + { + __totalWorkPriority -= workPriority; + } + return __singleThreadedJob = value; + } + private function set_workPriority(value:Float):Float { - if (mode == SINGLE_THREADED && activeJobs > 0) + if (__singleThreadedJob != null) { __totalWorkPriority += value - workPriority; } @@ -705,104 +779,21 @@ private abstract PseudoEvent(ThreadPool) from ThreadPool } } -class JobList +@:forward +private abstract JobArray(Array) from Array { - /** - * Whether `pool.workPriority` is being added to - * `ThreadPool.__totalWorkPriority`. Set this to true when `length > 0` and - * false when `length == 0`. The setter will ensure it is only added once. - */ - @:allow(lime.system.ThreadPool) - private var __addingWorkPriority(default, set):Bool; - - private var __index:Int = 0; - - private var __jobs:Array = []; - - public var length(get, never):Int; - - public var pool(default, null):ThreadPool; - - public inline function new(?pool:ThreadPool) - { - this.pool = pool; - @:bypassAccessor __addingWorkPriority = false; - } - public inline function clear():Void { #if haxe4 - __jobs.resize(0); + this.resize(0); #else - __jobs = []; + this.splice(0, this.length); #end - __addingWorkPriority = false; - } - - public inline function exists(job:JobData):Bool - { - return get(job.id) != null; - } - - public inline function hasNext():Bool - { - return __jobs.length > 0; - } - - /** - Iterates in an endless loop, starting over upon reaching the end. - **/ - public inline function next():JobData - { - __index++; - if (__index >= length) - { - __index = 0; - } - - return __jobs[__index]; - } - - public inline function pop():JobData - { - var job:JobData = __jobs.pop(); - __addingWorkPriority = length > 0; - return job; - } - - public function remove(job:JobData):Bool - { - if (__jobs.remove(job)) - { - __addingWorkPriority = length > 0; - return true; - } - else if (removeByID(job.id)) - { - return true; - } - else - { - return false; - } } - public inline function removeByID(id:Int):Bool + public function getJob(id:Int):JobData { - if (__jobs.remove(get(id))) - { - __addingWorkPriority = length > 0; - return true; - } - else - { - return false; - } - } - - public function get(id:Int):JobData - { - for (job in __jobs) + for (job in this) { if (job.id == id) { @@ -811,36 +802,18 @@ class JobList } return null; } - public inline function push(job:JobData):Void - { - __jobs.push(job); - __addingWorkPriority = true; - } - // Getters & Setters - - private inline function set___addingWorkPriority(value:Bool):Bool + public function removeJob(id:Int):JobData { - if (pool != null && __addingWorkPriority != value && ThreadPool.isMainThread()) + for (i in 0...this.length) { - if (value) - { - ThreadPool.__totalWorkPriority += pool.workPriority; - } - else + var job:JobData = this[i]; + if (job.id == id) { - ThreadPool.__totalWorkPriority -= pool.workPriority; + this.splice(i, 1); + return job; } - return __addingWorkPriority = value; - } - else - { - return __addingWorkPriority; } - } - - private inline function get_length():Int - { - return __jobs.length; + return null; } } diff --git a/src/lime/system/WorkOutput.hx b/src/lime/system/WorkOutput.hx index c2673703bd..49b7c8be1a 100644 --- a/src/lime/system/WorkOutput.hx +++ b/src/lime/system/WorkOutput.hx @@ -49,17 +49,10 @@ class WorkOutput public var workIterations(default, null):Tls = new Tls(); /** - Whether background threads are being/will be used. If threads aren't - available on this target, `mode` will always be `SINGLE_THREADED`. + The mode jobs will run in by default. If threads aren't available, jobs + will always run in `SINGLE_THREADED` mode. **/ - public var mode(get, never):ThreadMode; - - #if lime_threads - /** - __Set this only via the constructor.__ - **/ - private var __mode:ThreadMode; - #end + public var mode:ThreadMode; /** Messages sent by active jobs, received by the main thread. @@ -87,7 +80,7 @@ class WorkOutput __jobComplete.value = false; #if lime_threads - __mode = mode != null ? mode : #if html5 SINGLE_THREADED #else MULTI_THREADED #end; + this.mode = mode != null ? mode : #if html5 SINGLE_THREADED #else MULTI_THREADED #end; #end } @@ -185,15 +178,6 @@ class WorkOutput // Getters & Setters - private inline function get_mode():ThreadMode - { - #if lime_threads - return __mode; - #else - return SINGLE_THREADED; - #end - } - private inline function get_activeJob():JobData { return __activeJob.value; @@ -331,10 +315,15 @@ class JobData @:allow(lime.system.WorkOutput) private var startTime:Float = 0; + #if lime_threads @:allow(lime.system.WorkOutput) - private inline function new(doWork:WorkFunctionWorkOutput->Void>, state:State) + private var thread:Thread; + #end + + @:allow(lime.system.WorkOutput) + private inline function new(doWork:WorkFunctionWorkOutput->Void>, state:State, ?id:Int) { - id = nextID++; + this.id = id != null ? id : nextID++; this.doWork = doWork; this.state = state; } @@ -358,8 +347,11 @@ typedef ThreadEvent = { var event:ThreadEventType; @:optional var message:Dynamic; - @:optional var job:JobData; @:optional var jobID:Int; + + // Only for "WORK" events + @:optional var doWork:WorkFunctionWorkOutput->Void>; + @:optional var state:State; } class JSAsync