-
-
Notifications
You must be signed in to change notification settings - Fork 658
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[work in progress] Coroutines for Haxe #11554
base: development
Are you sure you want to change the base?
Conversation
# Conflicts: # src/typing/typer.ml
I'm making this the main coroutine PR with the following changes:
Still a million things to figure out, but I'm slowly starting to understand how all this is supposed to work! |
Some notes on the transformation: If we have this: @:coroutine
@:coroutine.debug
private function mapCalls<TArg, TRet>(args:Array<TArg>, f:Coroutine<TArg->TRet>):Array<TRet> {
return [for (arg in args) f(arg)];
} The transformer looks at this: function(args:Array<mapCalls.TArg>, f:Coroutine<mapCalls.TArg -> mapCalls.TRet>) {
return {
var ` = [];
{
var ` = 0;
while (` < args.length) {
var arg = args[`];
++ `;
`.push(f(arg));
};
};
`;
};
} And generates this: function(args:Array<mapCalls.TArg>, f:Coroutine<mapCalls.TArg -> mapCalls.TRet>, _hx_continuation:(Array<mapCalls.TRet>, Dynamic) -> Void) {
var ` = null;
var ` = 0;
var _hx_state = 0;
var _hx_exceptionState = 10;
var _hx_stateMachine = function(_hx_result:Dynamic, _hx_error:Dynamic) {
if (_hx_error != null) _hx_state = _hx_exceptionState;
do (try switch (_hx_state) {
case 0: {
_hx_state = 1;
};
case 1: {
`;
` = [];
_hx_state = 3;
};
case 2: {
_hx_state = -1;
_hx_continuation(null, null);
return;
};
case 3: {
`;
` = 0;
_hx_state = 5;
};
case 4: {
_hx_state = -1;
_hx_continuation(`, null);
return;
};
case 5: {
if (! (` < args.length)) _hx_state = 7 else _hx_state = 8;
};
case 6: {
_hx_state = 4;
};
case 7: {
_hx_state = 6;
};
case 8: {
var arg;
arg = args[`];
++ `;
_hx_state = 9;
f(arg, _hx_stateMachine)(null, null);
return;
};
case 9: {
`.push(cast _hx_result);
_hx_state = 5;
};
case 10: throw _hx_error;
default: {
_hx_state = 10;
throw "Invalid coroutine state";
}
} catch (e:Dynamic) if (_hx_state == 10) {
_hx_exceptionState = 10;
_hx_continuation(null, e);
return;
} else {
_hx_state = _hx_exceptionState;
_hx_error = e;
}) while(true);
};
return _hx_stateMachine;
} For one, it would be nice to eliminate the forwarding states 0, 6 and 7. Ideally in a way that would still give us the nice state numbering without any holes. The other thing that looks suspicious to me is that we have Perhaps this could even be optimized for when there's only a single exception state (which I think is the vast majority of coros) because then we can just call the continuation, which we would otherwise do after catching the Yes I know, premature optimization and all that, but I find this stuff easier to debug if the code looks cleaner! |
I got the basics working on the JVM target. This increasingly feels like I have no idea what I'm doing, but that may or may not be normal when implementing something like coroutines. The generator tests don't work yet because I still don't know how to actually implement For now, I'm dealing with the Coroutine special cases in genjvm directly. I don't know if we should map this away in a distinct pass instead - seems a bit annoying because the Coroutine type could "leak" to various places. Maybe simply dealing with this in the generators is acceptable. |
Got the suspend thing working as well. This feels pretty hacky to do it like that, but I suppose it's not unexpected with these two different signatures going on. @Aidan63 How did you do this for C++? I managed to include your Coroutine implementation but I can't seem to get the function right. |
Just gave this a try and it seems to work as it did before for my quick asys bodge, but without needing any of my additions. I added I agree that I'm still seeing C++ errors with non static coroutines, stuff about I'll have to run the coroutine tests instead of just my quick asys bodge to see what else currently breaks. |
Hmm, that's what I tried as well but it generates this which the C++ compiler doesn't appreciate: ::Dynamic Coroutine_Impl__obj::_hx_f0xd5f8b494( ::Dynamic f, ::Dynamic cont) {
return ::::hx::Coroutine::suspend(f,cont);
} |
That quad colon is suspicious, if I just tried compiling Looking at that header file it does not have a foward declaration of the exception type for some reason. |
I can reproduce the Exception problem as well. Don't know yet what that is about, #11574 doesn't help with that either. Another problem afterwards is this: @:coroutine function error() {
throw "nope";
}
function main() {
error.start((result, error) -> {
trace(error);
});
} This fails with |
I had a very similar idea, except that I was thinking we could generalize switch (_hx_error) {
case Result:
// Continue normally
case Error:
// set _hx_state to uncaught error state, treat _hx_result as error value
case Suspended:
// Call _hx_continuation (I think?) and return
} The main advantage is that we don't need to wrap any values, the combination of The one thing I absolutely cannot figure out is how we avoid leaking this kind of stuff to the user code, i.e. what the entry point |
If calling function main() {
open.start((result, error) -> {
trace(result, error);
});
trace("Moving on");
} to function main() {
final _hx_result = [ null ];
final _hx_error = null;
final marker = open("C:\\Users\\AidanLee", (result, error) -> {
_hx_result[0] = result;
_hx_error = error;
})(null, null);
if (marker == Suspended) {
while (_hx_result[0] == null || _hx_error == null) {
Thread.current().events.progress();
}
}
trace(_hx_result[0], _hx_error);
trace("Moving on");
} But this could cause previously queued up thread events to be ran prematurely, so not actually blocking. From a very quick google it seems like kotlin deals with this by having the concept of a coroutine context and |
Thank you for also looking into this! It's quite reassuring because you tend to come to very similar conclusions as me. It looks like we've under-designed this whole suspension aspect quite a bit... |
Not wanting to add more questions to the pile, but if more thought needs to be put into suspension it might also be worth considering continuation and threads in a solution. Looking at the following contrived example. @:coroutine function threaded() {
return Coroutine.suspend(cont -> {
Thread.create(() -> cont(null, null));
});
}
@:coroutine function main() {
trace("before");
threaded();
trace("after");
} What thread should "after" be traced from? I don't think its unexpected to assume it would be the same as "before", but since Kotlin seems to work around this by allowing you to specify an "interceptor" with a coroutine context which will automatically shuffle continuation calls onto a thread of you're choice. Dotnet allows you to associate a "SynchronisationContext" with a given thread which decides where a given awaited task will be resumed. This is all pretty meaningless for js, but will probably be important for most other targets which have threading capabilities. |
hi there is the possibility of serialize the state of a coroutine? |
Alright guys, I'm trying to fight off a defeatist attitude at the moment because the problems indeed seem to just pile on. However, everything being raised here is very much relevant, so we will have to deal with that. @francescoagati I'm aware of the serialization question, but I would like to handle this "later". For now, it is more important to get the basics working, and once we have a robust set of tests we can start redesigning some parts. @Aidan63 Yeah, uh... that seems like a problem for sure... I don't even know if our threading API has the capabilities to manage this properly. The more I look at it, the more I think that we have to make the coroutine implementation interact with the haxe.MainLoop architecture, which I don't particularly like... I've started working on that control state approach that I mentioned before on this branch: https://github.com/HaxeFoundation/haxe/tree/coroutines_2025 Internally, instead of function TestJsPromise_await(p,_hx_continuation) {
var _hx_state = 1;
var _hx_stateMachine = function(_hx_result,_hx_control) {
switch(_hx_control) {
case 0:
break;
case 1:
_hx_state = 3;
break;
}
while(true) try { My idea is to add The coroutine entrypoints (i.e. function TestJsPromise_promise(c) {
return new Promise(function(resolve,reject) {
(c(function(_hx_result,_hx_control) {
var _hx_continuation = function(result,error) {
if(error != null) {
reject(error);
} else {
resolve(result);
}
};
switch(_hx_control) {
case 0:
_hx_continuation(_hx_result,null);
break;
case 1:
_hx_continuation(null,_hx_result);
break;
}
}))(null,0);
});
} As you can see, we carry the error value in This is also the place where the suspend I'm designing all this on the fly so please comment if I'm doing something silly. |
I was thinking about how you'd make custom "executors" or event loops and how to make a potential coroutine aware asys api work with those. @:coro function readFile():haxe.io.Bytes {
var eventLoop = Thread.current().events;
eventLoop.promise();
return suspend(c -> someThreadPool.run(() -> {
var bytes = readFileSync();
eventLoop.runPromised(() -> c.resume(bytes);
});
} But then you wouldn't be able to use these apis in environments that lack a standard haxe event loop. To solve this there would have to be a way to add a custom suspend/resume mechanism. @:coro function readFile(): haxe.io.Bytes {
return suspend(c -> someThreadPool.run(() -> c.resume(readFileSync())));
}
|
That |
@Apprentice-Alchemist also posted this on Discord: https://try.haxe.org/#20d320bD So (correct me if I'm wrong) we not only pass a continuation around, but also an executor, and instead of doing The |
Yes. @:coroutine function suspend<T>(fn:(resumer:T->Void)->Void):T;
// would desugar to
function suspend<T>(exec:Executor, fn:(resumer:T->Void)->Void, cont: T->Void) {
var resumer = exec.createResumer(cont);
fn(resumer);
} Instead of managing the continuation directly, the callback in suspend gets a "resumer" that tells the executor to schedule invocation of the continuation appropriately.
Depends on what It seems that As for who decides which executor is actually created and used, in the end that should be up to the user. Unrelated to the executor stuff, I'm a bit confused by the generator implementation in the test suite. So I feel like there would practically need to be two generator variants: // synchronous generator, can be invoked by any function
@:generator function myGenerator():Int;
// desugars to
function myGenerator():Cont<Void, Int>;
// which returns a coroutine that can invoked multiple times, always returns directly to the parent
// a full asynchronous generator, can only be invoked from a coroutine
@:asyncGenerator function myGenerator():Int;
// would desugar to something like
function myGenerator():Cont<Cont<Int, Void>, Void>;
// which returns a coroutine that can be invoked multiple times and itself takes a continuation And only the full asynchronous generator could be implemented by a coroutine that takes a yield continuation. One more thing: I find the mixture of "coroutine" and "continuation" confusing because while coroutines and continuations are similar, they are also different. |
Regarding the generator question, I can't say much more than "it was like this when I found it". I'm just working with the design that was handed to me, which doesn't feel great because I can't answer questions like this, but at least this gives us a chance at making some progress. The continuation thing looks like a mere nomenclature problem here. It might indeed be a bit confusing to name it that, and I'm happy to clean up the overall naming. What name would you expect for these callbacks?
Yes I'm concerned about this as well. Our event loop handling feels a bit too "all or nothing" at the moment, but I don't have much of a vision what to do here. I think this will have to be cleaned up for coroutines anyway, so there's a good chance we get this kind of improvement as a fallout. |
Kotlin's The |
I've had another look at this and after re-learning most of it I think I understand things a bit more. For the suspend implementation I think, as the name implies, it always suspends. More specifically it always returns the With that in mind I think a simple haxe thread event loop implementation would look like this. function suspend(cont:(c:Dynamic->Dynamic->Void)->Void, _hx_continuation:Dynamic->Dynamic->Void):CoroutineResult {
Thread.current().events.run(() -> {
cont(_hx_continuation);
});
return Suspended;
} This is with the I've also been playing around with creating some state machines by hand to test out the internal https://gist.github.com/Aidan63/6eebf0c4f2649c4c55c85e560a592a23 This is what the output of the below haxe coroutine might look like. class CoroDirectory {
@:coroutine public static function read_dir() {
final dir = open();
final files = dir.read();
trace(files);
}
@:coroutine public static function open() {
return Coroutine.suspend(cont -> {
cont(new GeneratedDirectory(), null);
});
}
@:coroutine public function read() {
return Coroutine.suspend(cont -> {
cont([ "foo.txt", "bar.txt" ], null);
});
}
} What feels off about this is the fact that some of the functions want I've also spent more time inspecting decompiled kotlin bytecode and reading some of the kotlin coroutine implementation docs and think I've got a good understanding of how they've implented it now. As a though experiment I did the same manually generated state machines in the kotline style, but haxe. I'm not suggesting we bin the current approach and go with this but it does server as a useful comparison of how it solves some of the issues which have been identified and what we might need to do. Kotlin generates class which implements a interface IContinuation<T> {
function resumeWith(_hx_result:T, _hx_error:Exception):Void;
} That public static function open(completion:IContinuation<Any>) : CoroutineResult<ClassCoroDirectory> {
final continuation = if (completion is HxCoro_open) (cast completion : HxCoro_open) else new HxCoro_open(completion);
while (true) {
try {
switch continuation._hx_state {
case 1: {
continuation._hx_state = 2;
switch suspend(cont -> cont.resumeWith(new ClassCoroDirectory(), null), continuation) {
case Suspended:
return Suspended;
case Success(v):
continuation._hx_result = v;
case Error(exn):
throw exn;
}
}
case 2: {
continuation._hx_state = -1;
continuation.completion.resumeWith(continuation._hx_result, continuation._hx_error);
return Success(continuation._hx_result);
}
case 3: {
continuation._hx_state = -1;
throw continuation._hx_error;
}
default: {
throw new Exception("Invalid coroutine state");
}
}
} catch (_g:Exception) {
continuation._hx_state = 3;
continuation.completion.resumeWith(null, _g);
return Error(_g);
}
}
} and private class HxCoro_open implements IContinuation<Any> {
public final completion : IContinuation<Any>;
public var _hx_state : Int;
public var _hx_error : Exception;
public var _hx_result : Any;
public function new(completion) {
this.completion = completion;
_hx_state = 1;
}
public function resumeWith(_hx_result:Any, _hx_error:Exception) {
this._hx_result = _hx_result;
this._hx_error = _hx_error;
ClassCoroDirectory.open(this);
}
} Things to note, the I've put together a second gist with that same full transformed example. https://gist.github.com/Aidan63/d4ebb6dcf46e96c3f63fa419f3f26c9e Both of these manual gists can be ran through any version haxe since it doesn't need any of the coroutine stuff from this branch. It's easy to see how this method solves the threading issue previously mentioned. The continuation implementations in kotlin have a field of the I started to update Hopefully that all makes sense and is somewhat helpful and I'm not just me being slow on the uptake, repeating things people have already figured out. |
Do you mean for the asys api? Because it seems to be doing something similar to libuv. |
I got about halfway through generating a switch statement before remembering that manually building typed expressions is a right pain and that I'm a lazy sod, so I gave up. I then started to convert the state machine transformation into a macro to make it easier to experiment with and also got half way through before giving up, mainly because I had a feeling I'd already seen one, and going to the original nadako repo there was one! So I spent some time updating that to work and back porting some changes from the current compiler version and experimenting with that. I've seemingly got suspension working with the marker system previously discussed. I then went on to experiment with a class based approach copied from kotlin's code generation, added scheduling to that to solve the threading issues, and got some initial cancellation working. Plus, most importantly, I finally understand how I've forked nadakos repo and updated master to still have the function based approach but to be more like the current compiler one. The state machine functions work on a result and error argument and @:suspend static function delay(ms:Int):Void {
return Coroutine.suspend(cont -> {
haxe.Timer.delay(() -> cont(null, null), ms);
});
}
@:suspend static function getNumber():Int {
return Coroutine.suspend(cont -> {
cont(++nextNumber, null);
});
}
@:suspend static function someAsync():Int {
trace("hi");
while (getNumber() < 10) {
write('wait for it...');
delay(1000);
write(Std.string(getNumber()));
}
// throw 'bye';
return 15;
} SuspensionI've updated the state machine to return the This is pretty much what was previously discussed but actually implemented in the macros instead of manually bodging dumps to see if the idea is sound. This is all in master of my fork, I haven't implemented the Class StyleIn the e.g. this basic function @:suspend static function getNumber():Int {
return Coroutine.suspend(cont -> {
cont.resume(++nextNumber, null);
});
} gets transformed into the following static function getNumber(_hx_completion:IContinuation<Any>):CoroutineResult<Any> {
final _hx_continuation = if (_hx_completion is HxCoro_getNumber) (cast _hx_completion : HxCoro_getNumber) else new HxCoro_getNumber(_hx_completion);
// usual state machine.
} and class HxCoro_getNumber implements IContinuation<Any> {
public var _hx_completion(default,never):IContinuation<Any>;
public var _hx_state:Int;
public var _hx_result:Any;
public var _hx_error:haxe.Exception;
public function new(completion:IContinuation<Any>) {
_hx_completion = completion;
_hx_state = 0;
_hx_result = null;
_hx_error = null;
}
public function resume(result:Any, error:haxe.Exception) {
_hx_result = result;
_hx_error = error;
Main.getNumber(this);
}
} Any variables in a suspending function which are used between states would need to be hoisted into these generated classes and any access to them re-mapped to operate on The blocking start function then has to change slightly, I create a custom private class WaitingCompletion implements IContinuation<Any> {
var running : Bool;
var result : Any;
var error : Exception;
public function new() {
running = true;
result = null;
error = null;
}
public function resume(result:Any, error:Exception) {
running = false;
this.result = result;
this.error = error;
}
public function wait():Any {
while (running) {
Thread.current().events.progress();
}
if (error != null) {
throw error;
} else {
return result;
}
}
} I've then implemented final result = {
final blocker = new WaitingCompletion();
switch getNumber(blocker) {
case Suspended:
blocker.wait();
case Result(v):
v;
case Error(exn):
throw exn;
}
} SchedulingNext up I wanted to tackle the threading issues previously mentioned, and with the above class based approach it was shockingly simple. I created a new interface IScheduler {
function schedule(func:()->Void):Void;
} The public function resume(result:Any, error:haxe.Exception) {
_hx_result = result;
_hx_error = error;
_hx_context.scheduler.schedule(() -> {
Main.getNumber(this);
});
} These generated classes will set their context to the context of the completion instance they're provided in the constructor, so this way the context (and therefor scheduler) is inherited to all suspending functions. I updated the private class EventLoopScheduler implements IScheduler {
final loop:EventLoop;
public function new(loop) {
this.loop = loop;
}
public function schedule(func:() -> Void) {
loop.run(func);
}
} This then completely solves the threading issue I mentioned a few comments back. E.g. if you have the following two suspend functions. @:suspend static function spawnThread():Void {
return Coroutine.suspend(cont -> {
Thread.create(() -> {
trace('Hello from thread ${ Thread.current() }');
cont.resume(null, null);
});
});
}
@:suspend static function schedulerTesting():Int {
trace('coro from thread ${ Thread.current() }');
spawnThread();
trace('coro from thread ${ Thread.current() }');
return 0;
} The output you see from
Even though a suspending function resumes from a different thread the implemention of resume will schedule continuation back onto the original thread. As part of this I also updated the final pool = new FixedThreadPool(4);
final scheduler = new ThreadPoolScheduler(pool);
Coroutine.start(schedulerTesting, scheduler); Kotlin does things slightly differently, they have an interceptor type which also runs in the CancellationI saw there was a closed issue in the nadako repo which said a cancellation mechanism was not planned, that seems pretty mad, surely there needs to be some way to trigger a cancellation of a coroutine and mechanisms in place to hook into the underlying callbacks, promises, whatever. The @:suspend static function cooperativeCancellation():Int {
trace('starting work');
while (Coroutine.isCancellationRequested() == false) {
accumulated = getNumber();
}
return accumulated;
} I updated the final blocker = new WaitingCompletion(new EventLoopScheduler(Thread.current().events));
final result = switch cooperativeCancellation(blocker) {
case Suspended:
Timer.delay(blocker.cancel, 2000);
blocker.wait();
case Success(v):
v;
case Error(exn):
throw exn;
}
trace(result); The above schedules cancellation in 2 seconds time and after that you will get the accumulated number as a result. Another way of responding to cancellation is with a callback, a naive @:suspend static function delay(ms:Int):Void {
return Coroutine.suspend(cont -> {
haxe.Timer.delay(() -> cont.resume(null, null), ms);
});
} The problem with this is that if a cancellation occurs while this coroutine is suspended we won't get a chance to cooperate until atleast we resume. To better support this the @:suspend static function delay(ms:Int):Void {
return Coroutine.suspend(cont -> {
var handle : EventHandler = null;
final events = Thread.current().events;
handle = events.repeat(() -> {
events.cancel(handle);
cont.resume(null, null);
}, ms);
cont._hx_context.token.register(() -> {
events.cancel(handle);
cont.resume(null, new CancellationException('delay has been cancelled'));
});
});
} We use the @:suspend static function cooperativeCancellation():Int {
trace('starting long delay...');
delay(10000);
trace('delay over!');
return 0;
} Since the token is part of the coroutine context its automatically inherited by called suspend functions so we don't need to do anything to forward tokens or establish a link of some sorts, its done automatically. My cancellation token implementation was very quickly thrown to gether and is not robust (no threading protection), and I need a way to unregister callbacks so if cancellation doesn't occur during the delay we can stop being notified later on when we're no longer interested. But it shows that the mechanisms work which is the main I wanted to establish. The Secrets of Coroutine.suspendIt finally makes sense! public static function suspend(func:(IContinuation<Any>)->Void, cont:IContinuation<Any>):CoroutineResult<Any> {
final safe = new SafeContinuation(cont);
func(safe);
return safe.get();
} which is about as clear as mud. After much reading, digging though kotlin's implementation and inspecing decompiled bytecode I think I've figured it out. Kotlin's public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
val safe = SafeContinuation(c.intercepted())
block(safe)
safe.getOrThrow()
}
} with class SafeContinuation<T> implements IContinuation<T> {
final _hx_completion:IContinuation<Any>;
final lock:Mutex;
var state:Null<CoroutineResult<T>>;
public final _hx_context:CoroutineContext;
public function new(completion) {
_hx_completion = completion;
_hx_context = _hx_completion._hx_context;
lock = new Mutex();
state = null;
}
public function resume(result:T, error:Exception) {
_hx_context.scheduler.schedule(() -> {
lock.acquire();
switch state {
case null:
switch error {
case null:
state = Success(result);
case exn:
state = Error(exn);
}
lock.release();
case _:
lock.release();
_hx_completion.resume(result, error);
}
});
}
public function get():CoroutineResult<T> {
lock.acquire();
var result = switch state {
case Success(v):
Success(v);
case Error(exn):
Error(exn);
case _:
state = Suspended;
}
lock.release();
return result;
}
} Both Hopefully most of that makes sense and is useful, I've been trying to figured out what underlying mechanisms might be needed to provide higher level functionality as opposed to what that functionality would look like to the end user since there are still many fundamental questions to be answered. The more I progressed down the class based approach the more convienced I've become that it seems to be a much better way of generating coroutines. Adding the basic scheduling and cancellation was really easy with the context inheritance, but haven't really given much thought to how you might fit that into the function based approach. The kt_style branch of my fork is where all the class based stuff is for anyone who wants to have a dig around. This doesn't have any target specific code (since we now know what Theres one more thing I want to look into which is what kotlin calls "structured concurrency", where a coroutine can spawn new coroutines as children and any errors are propagated up, parent coroutines do not complete until all children complete, and cancelling a parent cancels all children. |
Small update. I've remove the So @:suspend function getNumber():Int {} now becomes function getNumber(_hx_continuation:IContinuation<Any>):Any {} I've also added final task = Coroutine.launch(cancellationTesting);
Timer.delay(task.cancel, 2000);
trace(task.await()); I've also added |
Thank you so much for picking this up! I just read through everything, and I'll definitely have to read it a few more times before I can make useful comments, but here's my initial thoughts:
|
I agree that fields on interfaces look a bit odd, initially I was initially playing around with abstract class as that would also allow built in convenience functions for resuming with just a result or error. But I'm mostly using the c++ target and after finding that abstract class bug when functions contain default values I decided not to risk things further and stuck to interfaces for now. Possible continuation alternative. abstract class Continuation<T> {
public final _hx_context : CoroutineContext;
function new(context) {
_hx_context = context;
}
public abstract function resume(result:T, error:Exception);
public final function resumeWithResult(result:T) {
resume(result, null);
}
public final function resumeWithError(error:Exception) {
resume(null, error);
}
} I'm also very suspicious of the
Which isn't very useful. I'm hoping this is because the original nadako repo is quite old and had zero error support, I just quickly bodged in a try catch around the state machine and have result and errors returned (e.g. there is no ETry transform). It may be wishful thinking but my hope was the current compiler state machine transform is more complete so it wouldn't be a problem as I've made basically no changes to the core state machine generation with my experiments, just how that state machine is contained and invoked. Another thing I meant to bring up was making sure these changes make sense for the js target. While the original prototypes were seemingly tested exclusively on the js target leading to stuff like threading concerns being forgotten about, I'm somewhat wary the pendulum has swung the other way. Obviously the classes which use the sys api's would need non sys equivilents, but the sum of my personal and professional experience with all things web development is a grand total of zero, so I have no idea if the abstractions I've built in my branch make sense / are workable for the js target! |
In digging around the kotlin standard library I found that the abstract class all generated coroutine classes extend also implements a I had a go at adding something similar to the classes I generate and whenever a suspension occurs generate code which will set the stack frame variable of the current continuation object. Whenever an exception is about to be thrown I currently walk up the "coroutine stack" to build a // Walk the coroutine stack
final frames = [ _hx_continuation._hx_stackFrame ];
var frame = _hx_continuation.callerFrame();
while (frame != null) {
frames.push(frame._hx_stackFrame);
frame = frame.callerFrame();
}
trace((frames:haxe.CallStack).toString()); Generated classes now inherit a abstract class FunctionCoroutine implements IContinuation<Any> implements IStackFrame {
public final _hx_completion : IContinuation<Any>;
public final _hx_context : CoroutineContext;
public var _hx_stackFrame : StackItem;
function new(_hx_completion, _hx_context, _hx_stackFrame) {
this._hx_completion = _hx_completion;
this._hx_context = _hx_context;
this._hx_stackFrame = _hx_stackFrame;
}
public function callerFrame():Null<IStackFrame> {
return if (_hx_completion is IStackFrame) {
(cast _hx_completion : IStackFrame);
} else {
null;
}
}
} coro_stack.webmNot sure how we then integrate this with exceptions yet, but it seems like if we want readable stack traces we need to do some manual tracking. Dotnet async stack traces have also historically just been traces of the internal mechanisms, but when they switched to the dotnet core stuff they also started doing the tracking needed through their Don't know how I missed it, but kotlins coroutine context can be extended by user defined keys and these are all copied across automatically as well. That explains how the kotlin standard library coroutine api is very minimal and stuff like cancellation, scopes, jobs, etc are all shifted off to the kotlinx.coroutines external library. The only context element they defined in the standard library is the their interceptor stuff for threading, which seems reasonable as thats too fundamental to be left out. This does make not including cancellation out of the box a bit less mad providing similar context functionality is provided. Also while digging around in kotlin repos I came across this file tucked away in the folder of the compiler which has the coroutine transformation sources, don't think it's been posted yet in this thread or any of the preceeding ones. It covers in a good amount of detal the transformation, optimisations, threading, starting coroutines, safe continuation, etc, etc. It is a behemoth of a document though and not exact a page turner, so bring a strong drink. |
I've changed my mind again on the cancellation stuff, if asys is to have some form of cancellation mechanism then it would probably make sense for that to be included in the context by default. But thats something that can be sorted out later. I made a change to the state machine to resolve an oddity with both the function based approach and the class based one in my test branch. The state machine called the continuation as well as returning a value, error, or suspension marker which could lead to some odd situations. case 1: {
_hx_state = -1;
_hx_continuation.resume("Hello, World!", null);
return "Hello, World!";
}; To solve this the state machine no longer calls the continuation on result, error, or suspension, it just returns the suspension marker, result, or throws the error. The try catch was also removed from the state machine. It's then the callers responsibility (usually function resume(result:Any, error:Exception) {
_hx_result = result;
_hx_error = error;
try {
final result = Main.getNumber(this);
if (result is coro.Primitive) {
return;
}
_hx_completion.resume(result);
} catch (exn) {
_hx_completion.resume(null, exn);
}
} With this change each invocation to a suspend function will run until a suspension point or a result or error is produced, instead of then immediately calling is completion. I don't think @:suspend public static function suspend(func:(IContinuation<Any>)->Void):Any {
final cont = coro.CoroutineIntrinsics.currentContinuation();
final safe = new SafeContinuation(cont);
func(safe);
return safe.getOrThrow();
} I applied the most basic single state machine elimination as well which means that function ultimately becomes the following. public static function suspend(func:coro.IContinuation<Any> -> Void, _hx_completion:coro.IContinuation<Any>) {
var safe = new coro._Coroutine.SafeContinuation(_hx_completion);
func(safe);
return safe.getOrThrow();
} I'm not too sure on the I also think the other bit of compiler magic we have, E.g. if we have the following haxe, @:suspend function bar(i:Int, s:String):Void {
trace('$i, $s');
}
function foo() {
final c = bar.start(7, 'test', new MyCustomContinuation());
} the function foo() {
final c = new HxCoro_bar(7, 'test', new MyContinuation());
} This allows the user to set a final continuation instance and setup the initial context which would be propagated down. To start the coroutine all that needs to be done is call What I have not yet figured out though is how this should work with coroutines passed as arguments. Assumably we'd want to allow stuff like the following to work to allow building higer level functionality on top. function foo(bar:Coroutine<(Int,String)->Void>) {
final c = bar.start(7, 'test', new MyCustomContinuation());
} But also to be able to directly call @:suspend function foo(bar:Coroutine<(Int,String)->Void>) {
bar(7, 'test');
} which makes things a bit tricky. I've got things working on the js target and have added some examples using nodejs timers, stdout, etc. I don't know how we're supposed to implement a blocking coroutine on node though as from a very brief google there's no way to manually pump the event loop. This is also partly why I think the current blocking |
I've implemented passing coroutines as arguments, but its one of those things where it feels like there must be a better way. Firstly I created a series of abstract classes extending abstract class Coroutine0<TReturn> extends Coroutine<Void->TReturn> {
public abstract function create(completion:IContinuation<Any>):IContinuation<Any>;
public abstract function start(completion:IContinuation<Any>):Any;
}
abstract class Coroutine1<TArg0, TReturn> extends Coroutine<TArg0->TReturn> {
public abstract function create(arg0:TArg0, completion:IContinuation<Any>):IContinuation<Any>;
public abstract function start(arg0:TArg0, completion:IContinuation<Any>):Any;
}
// etc, etc... Each suspending function then has another class defined which extends one of these abstracts and implements the class HxCoro_delayFactory extends Coroutine1<Int, Void> {
public static final instance:Coroutine1<Int, Void> = new HxCoro_delayFactory();
private function new() { }
public function create(arg0:Int, completion:IContinuation<Any>):IContinuation<Any> {
return new HxCoro_delay(arg0, completion);
}
public function start(arg0:Int, completion:IContinuation<Any>):Any {
return Main.delay(arg0, completion);
}
} and then things start to get a bit odd... All function arguments of type @:coroutine function foo(v:Int):String {
return 'You entered $v';
}
function bar(c:Coroutine<Int->String>) {
//
}
function main() {
bar(foo);
} turns into this... @:coroutine function foo(v:Int):String {
return 'You entered $v';
}
function bar(c:Coroutine1<Int, String>) {
//
}
function main() {
bar(HxCoro_fooFactory.instance);
} This all works but the potentially ever growing abstract coroutine classes and replacing types with more specific ones feels like a bodge, there must be some fancier type system thing we can do here, even if we have to change the That's about all I can think of to experiment with for now, all of the stuff covered in my last few comments is in my fork of nadakos repo in the |
My suggestions is to move the state machine into the generated Eg something. @:suspend function delay(ms:Int):Void;
function foo(coro:Coroutine<Int->Void>) {
coro.create(0, completion);
coro.start(0, completion);
}
// turns into
function delay(ms:Int, _completion:...): IContinuation<Any>;
function foo(coro:Int->IContinuation<Any>) {
coro(0, completion);
} No idea if that's feasible, I don't fully understand how it's all supposed to work. |
That would work and would be very similar to the current function approach (just in resume instead of a function). The downside to that is that when a suspend function call returns you don't know if it has ran to completion, error, or has been suspended and needs to be resumed at some point. Which is useful to know in many situations. Having looked at the problem again I think I've made it far more complex than it needs to be. My thinking behind the extra factory classes was that to create a coroutine which is initially suspended needed special support, but thats not the case. You can easily implement it with an extra continuation. function test_defered() {
final cont = new BlockingContinuation(new EventLoopScheduler(Thread.current().events));
final defer = new Defer(getNumber, cont);
Assert.equals(0, nextNumber);
defer.resume(null, null);
Assert.equals(1, cont.wait());
} with class Defer implements IContinuation<Any> {
final lambda : IContinuation<Any>->Any;
final continuation : IContinuation<Any>;
public final _hx_context:CoroutineContext;
public function new(lambda, continuation) {
this.lambda = lambda;
this.continuation = continuation;
this._hx_context = continuation._hx_context;
}
public function resume(_:Any, _:Exception) {
try {
final result = lambda(continuation);
if (result is Primitive) {
return;
}
continuation.resume(result, null);
} catch (exn) {
continuation.resume(null, exn);
}
}
} Calling resume on the defer continuation will start execution of the held coroutine function, so its initially suspended until manually started. So I think my previous comment can be disregarded as a load of rubbish. |
But the function call would merely "construct" a coroutine and not start it yet. |
Here's a different approach to handling the internal state for coroutines. This is pretty much #10128 but without the additional argument to
TFun
. The idea is this:"Coroutine"
path, we let it load the abstract. The function type is carried in the type parameter.com.basic.tcoro
which works liketnull
and createsCoroutine<Arguments -> Return>
. This is used when typing@:coroutine
functions.follow_with_coro
, which gives either aCoro(args, ret)
or aNotCoro(t)
. This allows all code paths who are interested in coroutines (which in the entire compiler is only like 10) to check for them, but still deal with the non-coro behavior easily.This gives us a pretty light diff here, which is always good. There's a chance that I'm missing something important because at the moment we only have the few tests in misc/coroutines.