Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[work in progress] Coroutines for Haxe #11554

Draft
wants to merge 47 commits into
base: development
Choose a base branch
from
Draft

Conversation

Simn
Copy link
Member

@Simn Simn commented Feb 6, 2024

Here's a different approach to handling the internal state for coroutines. This is pretty much #10128 but without the additional argument to TFun. The idea is this:

  • Instead of hijacking the typeloader upon encountering the "Coroutine" path, we let it load the abstract. The function type is carried in the type parameter.
  • We add com.basic.tcoro which works like tnull and creates Coroutine<Arguments -> Return>. This is used when typing @:coroutine functions.
  • Any place in the compiler that cares about coroutines calls follow_with_coro, which gives either a Coro(args, ret) or a NotCoro(t). This allows all code paths who are interested in coroutines (which in the entire compiler is only like 10) to check for them, but still deal with the non-coro behavior easily.

This gives us a pretty light diff here, which is always good. There's a chance that I'm missing something important because at the moment we only have the few tests in misc/coroutines.

@skial skial mentioned this pull request Feb 7, 2024
1 task
# Conflicts:
#	src/typing/typer.ml
@Simn Simn mentioned this pull request Feb 8, 2024
@Simn
Copy link
Member Author

Simn commented Feb 14, 2024

I'm making this the main coroutine PR with the following changes:

  • The logic from analyzerCoro.ml in Coroutine Experiments #10128 has been moved and slightly cleaned up to coroToTexpr.ml.
  • The new module coroFromTexpr.ml generates the data structure that ^ works with.
  • That transformation is still very simplistic and basically only works with cases that we're testing. The overall structure should allow us to improve on this easily though.
  • Coroutines are now processed right after a function has been typed. I see no reason to wait until later, we already know that we're dealing with a coroutine anyway.
  • As a consequence, we now run all the normal filters on the transformed expression structure. This will deal with all the problems in Coroutine Experiments #10128 related to capture variables and whatnot.
  • Doing that currently breaks the test in testTryCatchFail. This is related to [broken] Exceptions refactor #11574 and the problem is that the coroutine transformer eliminates the TTry, which causes the exception handler to not wrap the caught values. Running the expression wrapping immediately will fix this.
  • I tried testing this with C++ but ran into some problem with HX_STACKFRAME and '_hx_stateMachine': undeclared identifier. The related dump output looks clean, so this might be an entirely separate problem. Maybe @Aidan63 can give it a try!

Still a million things to figure out, but I'm slowly starting to understand how all this is supposed to work!

@Simn
Copy link
Member Author

Simn commented Feb 14, 2024

Some notes on the transformation:

If we have this:

@:coroutine
@:coroutine.debug
private function mapCalls<TArg, TRet>(args:Array<TArg>, f:Coroutine<TArg->TRet>):Array<TRet> {
	return [for (arg in args) f(arg)];
}

The transformer looks at this:

function(args:Array<mapCalls.TArg>, f:Coroutine<mapCalls.TArg -> mapCalls.TRet>) {
        return {
                var ` = [];
                {
                        var ` = 0;
                        while (` < args.length) {
                                var arg = args[`];
                                ++ `;
                                `.push(f(arg));
                        };
                };
                `;
        };
  }

And generates this:

function(args:Array<mapCalls.TArg>, f:Coroutine<mapCalls.TArg -> mapCalls.TRet>, _hx_continuation:(Array<mapCalls.TRet>, Dynamic) -> Void) {
        var ` = null;
        var ` = 0;
        var _hx_state = 0;
        var _hx_exceptionState = 10;
        var _hx_stateMachine = function(_hx_result:Dynamic, _hx_error:Dynamic) {
                if (_hx_error != null) _hx_state = _hx_exceptionState;
                do (try switch (_hx_state) {
                        case 0: {
                                _hx_state = 1;
                        };
                        case 1: {
                                `;
                                ` = [];
                                _hx_state = 3;
                        };
                        case 2: {
                                _hx_state = -1;
                                _hx_continuation(null, null);
                                return;
                        };
                        case 3: {
                                `;
                                ` = 0;
                                _hx_state = 5;
                        };
                        case 4: {
                                _hx_state = -1;
                                _hx_continuation(`, null);
                                return;
                        };
                        case 5: {
                                if (! (` < args.length)) _hx_state = 7 else _hx_state = 8;
                        };
                        case 6: {
                                _hx_state = 4;
                        };
                        case 7: {
                                _hx_state = 6;
                        };
                        case 8: {
                                var arg;
                                arg = args[`];
                                ++ `;
                                _hx_state = 9;
                                f(arg, _hx_stateMachine)(null, null);
                                return;
                        };
                        case 9: {
                                `.push(cast _hx_result);
                                _hx_state = 5;
                        };
                        case 10: throw _hx_error;
                        default: {
                                _hx_state = 10;
                                throw "Invalid coroutine state";
                        }
                } catch (e:Dynamic) if (_hx_state == 10) {
                        _hx_exceptionState = 10;
                        _hx_continuation(null, e);
                        return;
                } else {
                        _hx_state = _hx_exceptionState;
                        _hx_error = e;
                }) while(true);
        };
        return _hx_stateMachine;
  }

For one, it would be nice to eliminate the forwarding states 0, 6 and 7. Ideally in a way that would still give us the nice state numbering without any holes.

The other thing that looks suspicious to me is that we have do try instead of try do. The only path back into the loop is from the else case in the catch, but it looks to me like we can instead recurse upon our function again after setting _hx_error = e. The function entry checks for if (_hx_error != null) _hx_state = _hx_exceptionState; already, which will give us the exact same state.

Perhaps this could even be optimized for when there's only a single exception state (which I think is the vast majority of coros) because then we can just call the continuation, which we would otherwise do after catching the throw _hx_error in state 10.

Yes I know, premature optimization and all that, but I find this stuff easier to debug if the code looks cleaner!

@skial skial mentioned this pull request Feb 14, 2024
1 task
@Simn
Copy link
Member Author

Simn commented Feb 14, 2024

I got the basics working on the JVM target. This increasingly feels like I have no idea what I'm doing, but that may or may not be normal when implementing something like coroutines.

The generator tests don't work yet because I still don't know how to actually implement Coroutine.suspend properly.

For now, I'm dealing with the Coroutine special cases in genjvm directly. I don't know if we should map this away in a distinct pass instead - seems a bit annoying because the Coroutine type could "leak" to various places. Maybe simply dealing with this in the generators is acceptable.

@Simn
Copy link
Member Author

Simn commented Feb 14, 2024

Got the suspend thing working as well. This feels pretty hacky to do it like that, but I suppose it's not unexpected with these two different signatures going on.

@Aidan63 How did you do this for C++? I managed to include your Coroutine implementation but I can't seem to get the function right.

@Aidan63
Copy link
Contributor

Aidan63 commented Feb 14, 2024

Just gave this a try and it seems to work as it did before for my quick asys bodge, but without needing any of my additions. I added @:native('::hx::Coroutine::suspend') to the extern suspend function to have it call the function in the coroutine header, I just pushed the hxcpp changes to my hxcpp asys branch if you want to confirm things.

I agree that suspend is very confusing, everytime I think I understand it I look at it again and feel like I'm back at square one. It's very odd that it takes two arguments and then does...nothing? with them. Is there eventually going to be some advance functionality to it or is that purely to make it fit the "result and error" function signature.

I'm still seeing C++ errors with non static coroutines, stuff about __this again

image

I'll have to run the coroutine tests instead of just my quick asys bodge to see what else currently breaks.

@Simn
Copy link
Member Author

Simn commented Feb 14, 2024

@:native('::hx::Coroutine::suspend')

Hmm, that's what I tried as well but it generates this which the C++ compiler doesn't appreciate:

 ::Dynamic Coroutine_Impl__obj::_hx_f0xd5f8b494( ::Dynamic f, ::Dynamic cont) {
	return ::::hx::Coroutine::suspend(f,cont);
}

@Aidan63
Copy link
Contributor

Aidan63 commented Feb 14, 2024

That quad colon is suspicious, if @:native is working I wouldn't expect it to also prefix :: (don't know off the top of my head in what cases gencpp prefix ::), I also don't see any mention of the suspend function in the generated Coroutine_Impl__obj on my end.

I just tried compiling misc/coroutine for cpp and get another weird error, haxe::Exception has not been declared.

image

Looking at that header file it does not have a foward declaration of the exception type for some reason.

@Simn
Copy link
Member Author

Simn commented Feb 14, 2024

I can reproduce the Exception problem as well. Don't know yet what that is about, #11574 doesn't help with that either.

Another problem afterwards is this:

@:coroutine function error() {
	throw "nope";
}

function main() {
	error.start((result, error) -> {
		trace(error);
	});
}

This fails with 'result': illegal use of type 'void' from the generated void _hx_run(void result, ::Dynamic error){. I suppose it should infer Dynamic instead of Void in such cases, or maybe haxe.Unit once #11563 is done.

@Simn
Copy link
Member Author

Simn commented Feb 18, 2024

enum SuspensionResult {
Suspended;
Result(v:T);
Error(e:Dynamic);
}

I had a very similar idea, except that I was thinking we could generalize _hx_error to carry this kind of "control state" and then hold the value in _hx_result even for the error state. Something along the lines of this in the place we're currently checking if (_hx_error != null) at the coro start:

		switch (_hx_error) {
			case Result:
				// Continue normally
			case Error:
				// set _hx_state to uncaught error state, treat _hx_result as error value
			case Suspended:
				// Call _hx_continuation (I think?) and return
		}

The main advantage is that we don't need to wrap any values, the combination of _hx_result as the value carrier and _hx_error (with a different name) as the control flag would give us all the data we need.

The one thing I absolutely cannot figure out is how we avoid leaking this kind of stuff to the user code, i.e. what the entry point start call would look like. It can't just call its continuation because then it could receive the Suspended state, no matter how we decide to represent that.

@Aidan63
Copy link
Contributor

Aidan63 commented Feb 18, 2024

If calling start on a continuation is suppose to be a blocking call similar to kotlins runBlocking, would it need to manually pump the threads event loop inside that call if the coroutine returns suspended? I guess you could have some sort of captured marker value which is set when that coroutine completes and you pump the event loop in between checking that marker to see if its completed or not.

function main() {
	open.start((result, error) -> {
		trace(result, error);
	});

	trace("Moving on");
}

to

function main() {
	final _hx_result = [ null ];
	final _hx_error = null;

	final marker = open("C:\\Users\\AidanLee", (result, error) -> {
		_hx_result[0] = result;
		_hx_error = error;
	})(null, null);

	if (marker == Suspended) {
		while (_hx_result[0] == null || _hx_error == null) {
			Thread.current().events.progress();
		}
	}

	trace(_hx_result[0], _hx_error);
	trace("Moving on");
}

But this could cause previously queued up thread events to be ran prematurely, so not actually blocking. From a very quick google it seems like kotlin deals with this by having the concept of a coroutine context and runBlocking creates an event loop context which lives exclusively within the scope of that blocking coroutine.

@Simn
Copy link
Member Author

Simn commented Feb 18, 2024

Thank you for also looking into this! It's quite reassuring because you tend to come to very similar conclusions as me.

It looks like we've under-designed this whole suspension aspect quite a bit...

@Aidan63
Copy link
Contributor

Aidan63 commented Feb 20, 2024

Not wanting to add more questions to the pile, but if more thought needs to be put into suspension it might also be worth considering continuation and threads in a solution. Looking at the following contrived example.

@:coroutine function threaded() {
    return Coroutine.suspend(cont -> {
        Thread.create(() -> cont(null, null));
    });
}

@:coroutine function main() {
    trace("before");

    threaded();

    trace("after");
}

What thread should "after" be traced from? I don't think its unexpected to assume it would be the same as "before", but since threaded fires up a new thread and continues on that it wouldn't be the same thread.
This is a big issue for stuff like UI frameworks which are pretty much always single threaded and debugging issues arising from this could be a pain as its not obvious the thread you're executing on has changed and issues might not occur until some time later in the execution.

Kotlin seems to work around this by allowing you to specify an "interceptor" with a coroutine context which will automatically shuffle continuation calls onto a thread of you're choice. Dotnet allows you to associate a "SynchronisationContext" with a given thread which decides where a given awaited task will be resumed.

This is all pretty meaningless for js, but will probably be important for most other targets which have threading capabilities.

@francescoagati
Copy link

hi there is the possibility of serialize the state of a coroutine?

@Simn
Copy link
Member Author

Simn commented Feb 26, 2024

Alright guys, I'm trying to fight off a defeatist attitude at the moment because the problems indeed seem to just pile on. However, everything being raised here is very much relevant, so we will have to deal with that.

@francescoagati I'm aware of the serialization question, but I would like to handle this "later". For now, it is more important to get the basics working, and once we have a robust set of tests we can start redesigning some parts.

@Aidan63 Yeah, uh... that seems like a problem for sure... I don't even know if our threading API has the capabilities to manage this properly. The more I look at it, the more I think that we have to make the coroutine implementation interact with the haxe.MainLoop architecture, which I don't particularly like...


I've started working on that control state approach that I mentioned before on this branch: https://github.com/HaxeFoundation/haxe/tree/coroutines_2025

Internally, instead of (result, error) we now pass around (result, control), and the start of coroutines then look like this:

function TestJsPromise_await(p,_hx_continuation) {
	var _hx_state = 1;
	var _hx_stateMachine = function(_hx_result,_hx_control) {
		switch(_hx_control) {
		case 0:
			break;
		case 1:
			_hx_state = 3;
			break;
		}
		while(true) try {

My idea is to add case 2: return; as the suspend case, which I think is what we want here. I'm unsure if it should call _hx_continuation in that case as well.

The coroutine entrypoints (i.e. start and create) also come with a control switch:

function TestJsPromise_promise(c) {
	return new Promise(function(resolve,reject) {
		(c(function(_hx_result,_hx_control) {
			var _hx_continuation = function(result,error) {
				if(error != null) {
					reject(error);
				} else {
					resolve(result);
				}
			};
			switch(_hx_control) {
			case 0:
				_hx_continuation(_hx_result,null);
				break;
			case 1:
				_hx_continuation(null,_hx_result);
				break;
			}
		}))(null,0);
	});
}

As you can see, we carry the error value in _hx_result as well and whether or not it's an error depends on the control state. This means that we cannot have a result and an error at the same time, which I think makes sense anyway?

This is also the place where the suspend case 2 would do something along the lines of that Thread.current().events.progress() Aidan posted.

I'm designing all this on the fly so please comment if I'm doing something silly.

@Apprentice-Alchemist
Copy link
Contributor

I was thinking about how you'd make custom "executors" or event loops and how to make a potential coroutine aware asys api work with those.
The straightforward way to implement coroutine based FS apis would look like this:

@:coro function readFile():haxe.io.Bytes {
  var eventLoop = Thread.current().events;
  eventLoop.promise();
  return suspend(c -> someThreadPool.run(() -> {
    var bytes = readFileSync();
    eventLoop.runPromised(() -> c.resume(bytes);
  });
}

But then you wouldn't be able to use these apis in environments that lack a standard haxe event loop.
At the moment that includes eg Lime projects, and hooking up the standard haxe event loop is not necessarily straightforward (game engines can just make it tick every frame, but for eg haxeui's hxwidgets backend it's more complicated)

To solve this there would have to be a way to add a custom suspend/resume mechanism.
The readFile function could then look like this

@:coro function readFile(): haxe.io.Bytes {
  return suspend(c -> someThreadPool.run(() -> c.resume(readFileSync())));
}

c.resume wouldn't directly invoke the coroutine anymore but instead use a custom mechanism to schedule it appropriately.
The standard library could then provide a default executor that uses the standard haxe event loop, but this could then easily be replaced without breaking coroutine aware asys apis.

@skial skial mentioned this pull request Feb 26, 2024
1 task
@Aidan63
Copy link
Contributor

Aidan63 commented Feb 27, 2024

That resume style function seems to be what kotlin does under the hood from what I recall of a talk with a jetbrains bloke I watched the other week explaining some of the implementation details. That abstraction then also makes it easy to shuffle the continuation call else where to resolve the threading issue I previously mentioned, which is where kotlin has its overridable "interceptors" to keep execution on the UI thread for example.

@Simn
Copy link
Member Author

Simn commented Feb 28, 2024

@Apprentice-Alchemist also posted this on Discord: https://try.haxe.org/#20d320bD

So (correct me if I'm wrong) we not only pass a continuation around, but also an executor, and instead of doing Coroutine.suspend we call exec.createResumer.

The runBlocking part isn't clear to me yet, especially how our coro.start and coro.create functions would map to it, and how arguments are handled, and who decides which executor is actually created.

@Apprentice-Alchemist
Copy link
Contributor

Apprentice-Alchemist commented Feb 28, 2024

So (correct me if I'm wrong) we not only pass a continuation around, but also an executor, and instead of doing Coroutine.suspend we call exec.createResumer.

Yes.

@:coroutine function suspend<T>(fn:(resumer:T->Void)->Void):T;
// would desugar to
function suspend<T>(exec:Executor, fn:(resumer:T->Void)->Void, cont: T->Void) {
  var resumer = exec.createResumer(cont);
  fn(resumer);
}

Instead of managing the continuation directly, the callback in suspend gets a "resumer" that tells the executor to schedule invocation of the continuation appropriately.
A thought: instead of a magic suspend function, might it be an idea to allow "manually" implementing a coroutine (and then we could still have a suspend function, but it wouldn't be magic)? The compiler would just need to somehow know "oh, this is actually a coroutine and I need to pass it a continuation when invoking it from within another coroutine".

The runBlocking part isn't clear to me yet, especially how our coro.start and coro.create functions would map to it, and how arguments are handled, and who decides which executor is actually created.

Depends on what coro.start and coro.create are supposed to do.

It seems that coro.start simply starts execution of the coroutine, so that would be equivalent to either directly invoking the coroutine or scheduling it on the executor for execution at eg the next event loop tick.
coro.create on the other hand returns a continuation that still needs to be manually scheduled on an executor I guess?

As for who decides which executor is actually created and used, in the end that should be up to the user.
Side-note: I feel like, independently of coroutines, there should be a way to nicely replace the Haxe event loop and run a custom one. This would be particularly useful for situations where there is already a third-party event loop and where the standard haxe event loop can't be easily integrated (for example, integrating the haxe event loop into haxeui-hxwidgets without incurring high cpu usage has proven complicated).


Unrelated to the executor stuff, I'm a bit confused by the generator implementation in the test suite.
Isn't modeling a generator as Iterator<T> going to break the moment the generating coroutine tries to invoke another coroutine?

So I feel like there would practically need to be two generator variants:

// synchronous generator, can be invoked by any function
@:generator function myGenerator():Int;
// desugars to
function myGenerator():Cont<Void, Int>;
// which returns a coroutine that can invoked multiple times, always returns directly to the parent

// a full asynchronous generator, can only be invoked from a coroutine
@:asyncGenerator function myGenerator():Int;
// would desugar to something like
function myGenerator():Cont<Cont<Int, Void>, Void>;
// which returns a coroutine that can be invoked multiple times and itself takes a continuation

And only the full asynchronous generator could be implemented by a coroutine that takes a yield continuation.


One more thing: I find the mixture of "coroutine" and "continuation" confusing because while coroutines and continuations are similar, they are also different.
What I find quite weird is that with the current implementation a "continuation" can do completely different things when invoked multiple times.
For a coroutine different results when resumed multiple times is expected behavior, but I would expect a continuation to always "refer" to the same code path, no matter how many times it's invoked.

@Simn
Copy link
Member Author

Simn commented Feb 28, 2024

Regarding the generator question, I can't say much more than "it was like this when I found it". I'm just working with the design that was handed to me, which doesn't feel great because I can't answer questions like this, but at least this gives us a chance at making some progress.

The continuation thing looks like a mere nomenclature problem here. It might indeed be a bit confusing to name it that, and I'm happy to clean up the overall naming. What name would you expect for these callbacks?

Side-note: I feel like, independently of coroutines, there should be a way to nicely replace the Haxe event loop and run a custom one. This would be particularly useful for situations where there is already a third-party event loop and where the standard haxe event loop can't be easily integrated (for example, integrating the haxe event loop into haxeui-hxwidgets without incurring high cpu usage has proven complicated).

Yes I'm concerned about this as well. Our event loop handling feels a bit too "all or nothing" at the moment, but I don't have much of a vision what to do here. I think this will have to be cleaned up for coroutines anyway, so there's a good chance we get this kind of improvement as a fallout.

@Aidan63
Copy link
Contributor

Aidan63 commented Mar 2, 2024

Kotlin's runBlocking, launch, async, etc, all have optional arguments which allow you to specify stuff like its interceptors. If those special coroutine launching functions are used inside an existing coroutine and an interceptor is not specified it will inherit the current contexts ones, instead of going with the default. This allows you to provide a custom interceptor once when you initially launch your coroutine and anything internally will inherit it.

The runBlocking in that try haxe haxe still won't work with the more tricky cases surrounding the thread event loop. If a coroutine continued by using Thread.current().events that runBlocking will hang forever as the threads even loop never gets a chance to run. Similar thing will happen with asys calls, the libuv event loop needs to be ran at some point.

@Aidan63
Copy link
Contributor

Aidan63 commented Apr 11, 2024

I've had another look at this and after re-learning most of it I think I understand things a bit more.

For the suspend implementation I think, as the name implies, it always suspends. More specifically it always returns the Suspended intrinsic from kotlin. The kotlin coroutine implementation doc mentions it should never grow the stack so it makes sense that it always returns suspended and bounces things through the event loop / executor / something.

With that in mind I think a simple haxe thread event loop implementation would look like this.

function suspend(cont:(c:Dynamic->Dynamic->Void)->Void, _hx_continuation:Dynamic->Dynamic->Void):CoroutineResult {
    Thread.current().events.run(() -> {
        cont(_hx_continuation);
    });

    return Suspended;
}

This is with the @:suspend part "expanded" and the Suspended is from the enum mentioned a few posts back. Bouncing things through the event loop won't grow the stack, even if the user supplied function immediately calls the continuation function.

I've also been playing around with creating some state machines by hand to test out the internal _hx_stateMachine functions returning that SuspensionResult enum. I've got a gist with a complete example of what it might look like if the suspend function were as above and the state machines return a suspension result.

https://gist.github.com/Aidan63/6eebf0c4f2649c4c55c85e560a592a23

This is what the output of the below haxe coroutine might look like.

class CoroDirectory {
	@:coroutine public static function read_dir() {
		final dir   = open();
		final files = dir.read();

		trace(files);
	}

	@:coroutine public static function open() {
		return Coroutine.suspend(cont -> {
			cont(new GeneratedDirectory(), null);
		});
	}

	@:coroutine public function read() {
		return Coroutine.suspend(cont -> {
			cont([ "foo.txt", "bar.txt" ], null);
		});
	}
}

What feels off about this is the fact that some of the functions want Dynamic->Dynamic->Void but some want Dynamic->Dynamic->CoroutineResult but I just pass them in anyway and let the result get erased. But it does all work, if a suspension point has actually suspended we unwind all the way down where we then pump the event loop until we have result. If a suspension point immediately returns a value we assign it and let it loop again so we don't grow the stack when transitioning to the next state.

I've also spent more time inspecting decompiled kotlin bytecode and reading some of the kotlin coroutine implementation docs and think I've got a good understanding of how they've implented it now. As a though experiment I did the same manually generated state machines in the kotline style, but haxe. I'm not suggesting we bin the current approach and go with this but it does server as a useful comparison of how it solves some of the issues which have been identified and what we might need to do.

Kotlin generates class which implements a IContinuation<T> interface for storing state about a given coroutine (result, error, state id, captured variables, etc) and a resumeWith which continues execution of the given coroutine that continuation holds state for.

interface IContinuation<T> {
    function resumeWith(_hx_result:T, _hx_error:Exception):Void;
}

That resumeWith function does not contain the state machine, that state machine lives in the original function. The open suspend function from the above sample would get transformed into the following.

public static function open(completion:IContinuation<Any>) : CoroutineResult<ClassCoroDirectory> {
    final continuation = if (completion is HxCoro_open) (cast completion : HxCoro_open) else new HxCoro_open(completion);

    while (true) {
        try {
            switch continuation._hx_state {
                case 1: {
                    continuation._hx_state = 2;
                    switch suspend(cont -> cont.resumeWith(new ClassCoroDirectory(), null), continuation) {
                        case Suspended:
                            return Suspended;
                        case Success(v):
                            continuation._hx_result = v;
                        case Error(exn):
                            throw exn;
                    }
                }
                case 2: {
                    continuation._hx_state = -1;
                    continuation.completion.resumeWith(continuation._hx_result, continuation._hx_error);

                    return Success(continuation._hx_result);
                }
                case 3: {
                    continuation._hx_state = -1;
                    throw continuation._hx_error;
                }
                default: {
                    throw new Exception("Invalid coroutine state");
                }
            }
        } catch (_g:Exception) {
            continuation._hx_state = 3;

            continuation.completion.resumeWith(null, _g);

            return Error(_g);
        }
    }
}

and HxCoro_open is the class implementing the continuation interface for this state machine.

private class HxCoro_open implements IContinuation<Any> {
    public final completion : IContinuation<Any>;

    public var _hx_state : Int;

    public var _hx_error : Exception;

    public var _hx_result : Any;

    public function new(completion) {
        this.completion = completion;

        _hx_state = 1;
    }

    public function resumeWith(_hx_result:Any, _hx_error:Exception) {
        this._hx_result = _hx_result;
        this._hx_error  = _hx_error;

        ClassCoroDirectory.open(this);
    }
}

Things to note, the continuation argument of the open function is either the continuation which will be invoked once the coroutine has completed, or a continuation with the suspended state of the that coroutine. This is what that type check on the first line is for. As resumeWith returns nothing the transformed function returns the suspended state enum, this way it doesn't have the return type erasure. The resumeWith function of the continuation implementation just calls the original function again pass in itself.

I've put together a second gist with that same full transformed example.

https://gist.github.com/Aidan63/d4ebb6dcf46e96c3f63fa419f3f26c9e

Both of these manual gists can be ran through any version haxe since it doesn't need any of the coroutine stuff from this branch.

It's easy to see how this method solves the threading issue previously mentioned. The continuation implementations in kotlin have a field of the ContinuationInterceptor interface type which all resumeWith calls pass through to allow the user to shuttle calls onto where ever they want.
I don't know what serialisation mechanisms kotlin provides for its coroutines, but seeing as the state is stored in classes (captured variables are hoisted into fields) serialisation of state is just providing a means to serialise the class.

I started to update coroToExpr.ml based on that first gist to have the functions return that suspension enum but my eyes started to glaze overy looking into what I had to do to generate a switch texpr and enum type, so instead I'm bodging it to return some special values just to test if the principle is sound without needing to manually create the state machines.

Hopefully that all makes sense and is somewhat helpful and I'm not just me being slow on the uptake, repeating things people have already figured out.

@skial skial mentioned this pull request Apr 16, 2024
1 task
@9Morello
Copy link

@Aidan63 Do you think something like libxev could be simpler to integrate with Haxe? It does no allocations at runtime. I think Bun uses some code from it.

@Aidan63
Copy link
Contributor

Aidan63 commented Apr 27, 2024

Do you mean for the asys api? Because it seems to be doing something similar to libuv.

@Aidan63
Copy link
Contributor

Aidan63 commented May 17, 2024

I got about halfway through generating a switch statement before remembering that manually building typed expressions is a right pain and that I'm a lazy sod, so I gave up. I then started to convert the state machine transformation into a macro to make it easier to experiment with and also got half way through before giving up, mainly because I had a feeling I'd already seen one, and going to the original nadako repo there was one!

So I spent some time updating that to work and back porting some changes from the current compiler version and experimenting with that. I've seemingly got suspension working with the marker system previously discussed. I then went on to experiment with a class based approach copied from kotlin's code generation, added scheduling to that to solve the threading issues, and got some initial cancellation working. Plus, most importantly, I finally understand how Coroutine.suspend is supposed to work and have a generic implementation!


I've forked nadakos repo and updated master to still have the function based approach but to be more like the current compiler one. The state machine functions work on a result and error argument and Coroutine.suspend is used instead of the await things.

@:suspend static function delay(ms:Int):Void {
	return Coroutine.suspend(cont -> {
		haxe.Timer.delay(() -> cont(null, null), ms);
	});
}

@:suspend static function getNumber():Int {
	return Coroutine.suspend(cont -> {
		cont(++nextNumber, null);
	});
}

@:suspend static function someAsync():Int {
	trace("hi");

	while (getNumber() < 10) {
		write('wait for it...');

		delay(1000);

		write(Std.string(getNumber()));
	}
	// throw 'bye';
	return 15;
}

Suspension

I've updated the state machine to return the CoroutineResult enum previously discussed (I haven't applied the "combining success and error into one" trick yet, just to make things more explicit while I wrap my head around it).
Whenever a suspending function returns Suspended that is bubbled up and the blocking is implemented by pumping the event loop until a result or error is produced.

This is pretty much what was previously discussed but actually implemented in the macros instead of manually bodging dumps to see if the idea is sound. This is all in master of my fork, I haven't implemented the start function but main contains what it would have to do with pumping the event loop.

Class Style

In the kt_style branch I've changed the generation to be based on what the kotlin compiler currently produces. I.e. each suspending function has a class generated which implements IContinuation<T> and the original function contains the state machine.

e.g. this basic function

@:suspend static function getNumber():Int {
	return Coroutine.suspend(cont -> {
		cont.resume(++nextNumber, null);
	});
}

gets transformed into the following

static function getNumber(_hx_completion:IContinuation<Any>):CoroutineResult<Any> {
	final _hx_continuation = if (_hx_completion is HxCoro_getNumber) (cast _hx_completion : HxCoro_getNumber) else new HxCoro_getNumber(_hx_completion);

	// usual state machine.
}

and HxCoro_getNumber (an implementation of IContinuation<T>) is defined.

class HxCoro_getNumber implements IContinuation<Any> {
	public var _hx_completion(default,never):IContinuation<Any>;

	public var _hx_state:Int;

	public var _hx_result:Any;

	public var _hx_error:haxe.Exception;

	public function new(completion:IContinuation<Any>) {
		_hx_completion = completion;
		_hx_state      = 0;
		_hx_result     = null;
		_hx_error      = null;
	}

	public function resume(result:Any, error:haxe.Exception) {
		_hx_result = result;
		_hx_error  = error;
		
		Main.getNumber(this);
	}
}

Any variables in a suspending function which are used between states would need to be hoisted into these generated classes and any access to them re-mapped to operate on _hx_continuation defined at the top of the function (I have not implemented this yet).

The blocking start function then has to change slightly, I create a custom IContinuation<T> class which will block until a value or exception is produced and pass an instance of that when initially starting the coroutine as the final completion object.

private class WaitingCompletion implements IContinuation<Any> {
	var running : Bool;
	var result : Any;
	var error : Exception;

	public function new() {
		running = true;
		result  = null;
		error   = null;
	}

	public function resume(result:Any, error:Exception) {
		running = false;

		this.result = result;
		this.error  = error;
	}

	public function wait():Any {
		while (running) {
			Thread.current().events.progress();
		}

		if (error != null) {
			throw error;
		} else {
			return result;
		}
	}
}

I've then implemented Coroutine.start as a macro function which for the following final result = Coroutine.start(getNumber) produces

final result = {
	final blocker = new WaitingCompletion();

	switch getNumber(blocker) {
		case Suspended:
			blocker.wait();
		case Result(v):
			v;
		case Error(exn):
			throw exn;
	}
}

Scheduling

Next up I wanted to tackle the threading issues previously mentioned, and with the above class based approach it was shockingly simple. I created a new CoroutineContext class and added that as a field to the IContinuation<T> interface. This context contains a scheduler field of type IScheduler which is a very basic interface.

interface IScheduler {
    function schedule(func:()->Void):Void;
}

The resume functions of the generated classes will use this scheduler to ensure the continuation is performed on the correct thread. E.g. the resume of the HxCoro_getNumber class becomes.

public function resume(result:Any, error:haxe.Exception) {
	_hx_result = result;
	_hx_error  = error;
	
	_hx_context.scheduler.schedule(() -> {
		Main.getNumber(this);
	});
}

These generated classes will set their context to the context of the completion instance they're provided in the constructor, so this way the context (and therefor scheduler) is inherited to all suspending functions. I updated the WaitingCompletion class to assign an initial context with a EventLoopScheduler which simply captures the initial threads event loop and schedulers all functions onto it.

private class EventLoopScheduler implements IScheduler {
	final loop:EventLoop;

	public function new(loop) {
		this.loop = loop;
	}

	public function schedule(func:() -> Void) {
		loop.run(func);
	}
}

This then completely solves the threading issue I mentioned a few comments back. E.g. if you have the following two suspend functions.

@:suspend static function spawnThread():Void {
	return Coroutine.suspend(cont -> {
		Thread.create(() -> {
			trace('Hello from thread ${ Thread.current() }');

			cont.resume(null, null);
		});
	});
}

@:suspend static function schedulerTesting():Int {
	trace('coro from thread ${ Thread.current() }');

	spawnThread();

	trace('coro from thread ${ Thread.current() }');

	return 0;
}

The output you see from schedulerTesting with the event loop scheduler is something along the lines of

coro from thread 0
Hello from thread 1
coro from thread 0

Even though a suspending function resumes from a different thread the implemention of resume will schedule continuation back onto the original thread. As part of this I also updated the Coroutine.start macro to allow an optional second argument to specify you're own scheduler.

final pool      = new FixedThreadPool(4);
final scheduler = new ThreadPoolScheduler(pool);

Coroutine.start(schedulerTesting, scheduler);

Kotlin does things slightly differently, they have an interceptor type which also runs in the resume function, but their implementation creates an entirely new IContinuation<T> where as I've opted for a simpler scheduler type. Not sure what the rational for their version was, but the ground work is in place here atleast if there are some short comings of the simpler scheduler.

Cancellation

I saw there was a closed issue in the nadako repo which said a cancellation mechanism was not planned, that seems pretty mad, surely there needs to be some way to trigger a cancellation of a coroutine and mechanisms in place to hook into the underlying callbacks, promises, whatever.
So I had a go at it. Like kotlin coroutines and dotnet tasks, cancellation is cooperative and I pretty much made a 1:1 copy of dotnet CancellationTokenSource and CancellationToken system as the underlying mechanism for it.

The CoroutineContext has a token field which is of the CancellationToken type and added a Coroutine.isCancellationRequested macro function to check if cancellation of the coroutine has been requested. This function can be used to poll the cancellation request.

@:suspend static function cooperativeCancellation():Int {
	trace('starting work');

	while (Coroutine.isCancellationRequested() == false) {
		accumulated = getNumber();
	}

	return accumulated;
}

I updated the WaitingCompletion to have the token source and added a cancel function to cancel the token. I haven't create a fancier version of Coroutine.start yet which returns a task object of some sorts so to test the cancellation you need to manually do it for now. e.g.

final blocker = new WaitingCompletion(new EventLoopScheduler(Thread.current().events));
final result  = switch cooperativeCancellation(blocker) {
	case Suspended:
		Timer.delay(blocker.cancel, 2000);

		blocker.wait();
	case Success(v):
		v;
	case Error(exn):
		throw exn;
}

trace(result);

The above schedules cancellation in 2 seconds time and after that you will get the accumulated number as a result. Another way of responding to cancellation is with a callback, a naive delay suspending function might look something like this.

@:suspend static function delay(ms:Int):Void {
	return Coroutine.suspend(cont -> {
		haxe.Timer.delay(() -> cont.resume(null, null), ms);
	});
}

The problem with this is that if a cancellation occurs while this coroutine is suspended we won't get a chance to cooperate until atleast we resume. To better support this the CancellationToken allows you to register a callback for when cancellation is received, using this we can create a delay which immediately responds.

@:suspend static function delay(ms:Int):Void {
	return Coroutine.suspend(cont -> {
		var handle : EventHandler = null;

		final events = Thread.current().events;

		handle = events.repeat(() -> {
			events.cancel(handle);

			cont.resume(null, null);
		}, ms);

		cont._hx_context.token.register(() -> {
			events.cancel(handle);

			cont.resume(null, new CancellationException('delay has been cancelled'));
		});
	});
}

We use the repeat API to get a handle to the scheduled function which we can then cancel in a token callback and instead resume with a CancellationException error. If we then change the cooperativeCancellation function to contain a long delay and still schedule a cancellation after 2 seconds the wait call in main will throw a CancellationException exception after two seconds instead of waiting 10 seconds for the delay.

@:suspend static function cooperativeCancellation():Int {
	trace('starting long delay...');

	delay(10000);

	trace('delay over!');

	return 0;
}

Since the token is part of the coroutine context its automatically inherited by called suspend functions so we don't need to do anything to forward tokens or establish a link of some sorts, its done automatically.

My cancellation token implementation was very quickly thrown to gether and is not robust (no threading protection), and I need a way to unregister callbacks so if cancellation doesn't occur during the delay we can stop being notified later on when we're no longer interested. But it shows that the mechanisms work which is the main I wanted to establish.

The Secrets of Coroutine.suspend

It finally makes sense! suspend should be implemented as the following.

public static function suspend(func:(IContinuation<Any>)->Void, cont:IContinuation<Any>):CoroutineResult<Any> {
    final safe = new SafeContinuation(cont);

    func(safe);

	return safe.get();
}

which is about as clear as mud. After much reading, digging though kotlin's implementation and inspecing decompiled bytecode I think I've figured it out. Kotlin's suspendCoroutine is implemented as the following.

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
    contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
    return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
        val safe = SafeContinuation(c.intercepted())
        block(safe)
        safe.getOrThrow()
    }
}

with suspendCoroutineUninterceptedOrReturn being an "intrinsic" function (implemented in the compiler) which allows you to get at the current continuation. This SafeContinuation implements IContinuation<T> and takes the current continuation as its completion, the safe continuation instance is then passed into the function where at some point resume will be called. What's happening here is a race between getOrThrow and the scheduler of the continuation. Below is the haxe version of SafeContinuation.

class SafeContinuation<T> implements IContinuation<T> {
    final _hx_completion:IContinuation<Any>;
    
    final lock:Mutex;

    var state:Null<CoroutineResult<T>>;

	public final _hx_context:CoroutineContext;

    public function new(completion) {
        _hx_completion = completion;
        _hx_context    = _hx_completion._hx_context;
        lock           = new Mutex();
        state          = null;
    }

    public function resume(result:T, error:Exception) {
        _hx_context.scheduler.schedule(() -> {
            lock.acquire();

            switch state {
                case null:
                    switch error {
                        case null:
                            state = Success(result);
                        case exn:
                            state = Error(exn);
                    }
                    lock.release();
                case _:
                    lock.release();
    
                    _hx_completion.resume(result, error);
            } 
        });
    }

    public function get():CoroutineResult<T> {
        lock.acquire();

        var result = switch state {
            case Success(v):
                Success(v);
            case Error(exn):
                Error(exn);
            case _:
                state = Suspended;
        }

        lock.release();

        return result;
    }
}

Both get and the function passed into the scheduler are racing to be the first to set the state and the other then responds differently based on if they won or not. The get function result is returned from Coroutine.suspend as the status of the coroutine.


Hopefully most of that makes sense and is useful, I've been trying to figured out what underlying mechanisms might be needed to provide higher level functionality as opposed to what that functionality would look like to the end user since there are still many fundamental questions to be answered. The more I progressed down the class based approach the more convienced I've become that it seems to be a much better way of generating coroutines. Adding the basic scheduling and cancellation was really easy with the context inheritance, but haven't really given much thought to how you might fit that into the function based approach.

The kt_style branch of my fork is where all the class based stuff is for anyone who wants to have a dig around. This doesn't have any target specific code (since we now know what Coroutine.suspend is supposed to do) and I've been testing on both the cpp and jvm target where everything works fine. So looking at that branch, the examples in Main.hx, class dumps, as well as stepping through with a target specific debugger might help anyone figure out the finer points of whats going on easier than I've tried to explain here.

Theres one more thing I want to look into which is what kotlin calls "structured concurrency", where a coroutine can spawn new coroutines as children and any errors are propagated up, parent coroutines do not complete until all children complete, and cancelling a parent cancels all children.

@Aidan63
Copy link
Contributor

Aidan63 commented May 17, 2024

Small update.

I've remove the CoroutineResult enum in favour of either returning an instance of the Coroutine.Primitive class to indicate suspension or the actual result. Errors are simply thrown and caught by the calling suspending function and handled that way.

So

@:suspend function getNumber():Int {}

now becomes

function getNumber(_hx_continuation:IContinuation<Any>):Any {}

I've also added Coroutine.launch which gives you a task object you can await or cancel, so lazily starting a coroutine.

final task = Coroutine.launch(cancellationTesting);

Timer.delay(task.cancel, 2000);

trace(task.await());

I've also added startWith and launchWith variants for providing schedulers. Start use to be implemented in a macro but now both start and launch are just bog standard haxe.

@Simn
Copy link
Member Author

Simn commented May 18, 2024

Thank you so much for picking this up! I just read through everything, and I'll definitely have to read it a few more times before I can make useful comments, but here's my initial thoughts:

  • I'm fine with the class-based approach. Last I looked at it, it indeed seems to be the better solution to this problem overall. It should also solve the serialization issue because we can serialize entire class states.
  • Just in general, I'm happy to port a working implementation to the compiler in the end.
  • Something irks me about final _hx_context:CoroutineContext; on the IContinuation interface. It might just be that I don't particularly like interface data fields in general, but even in SafeContinuation it already looks slightly awkward with the field only being there to satisfy the interface.
  • Your CoroutineResult comment makes it sound too simple... My latest conclusion regarding exceptions was that we have to be very careful here because the execution call stack gets messed up, so anything that mentions "simply thrown" alerts me. I'll reserve judgement until I actually look at it properly though!

@Aidan63
Copy link
Contributor

Aidan63 commented May 18, 2024

I agree that fields on interfaces look a bit odd, initially I was initially playing around with abstract class as that would also allow built in convenience functions for resuming with just a result or error. But I'm mostly using the c++ target and after finding that abstract class bug when functions contain default values I decided not to risk things further and stuck to interfaces for now.

Possible continuation alternative.

abstract class Continuation<T> {
	public final _hx_context : CoroutineContext;

	function new(context) {
		_hx_context = context;
	}

	public abstract function resume(result:T, error:Exception);

	public final function resumeWithResult(result:T) {
		resume(result, null);
	}

	public final function resumeWithError(error:Exception) {
		resume(null, error);
	}
}

I'm also very suspicious of the CoroutineResult removal I made last night, especially as it didn't seem to break anything with the small tests I have in my fork. I think kotlin does something similar but I'm not entirely sure (they definitely have an Any return which can be a a special suspend primitive value). What I have noticed is that with both the latest change and the previous CoroutineResult way stack traces aren't correct. E.g. the stack trace from the exception thrown from the someAsync function is

Called from hxcpp::__hxcpp_main
Called from Main::main src/Main.hx line 128
Called from Coroutine::start src/Coroutine.hx line 98
Called from Coroutine::startWith src/Coroutine.hx line 102
Called from Task::await src/Coroutine.hx line 132
Called from _Coroutine.BlockingContinuation::wait src/Coroutine.hx line 246
Error : bye

Which isn't very useful. I'm hoping this is because the original nadako repo is quite old and had zero error support, I just quickly bodged in a try catch around the state machine and have result and errors returned (e.g. there is no ETry transform). It may be wishful thinking but my hope was the current compiler state machine transform is more complete so it wouldn't be a problem as I've made basically no changes to the core state machine generation with my experiments, just how that state machine is contained and invoked.
But the error handling in this probably does deserve closer scrutiny to see if that is all correct.

Another thing I meant to bring up was making sure these changes make sense for the js target. While the original prototypes were seemingly tested exclusively on the js target leading to stuff like threading concerns being forgotten about, I'm somewhat wary the pendulum has swung the other way. Obviously the classes which use the sys api's would need non sys equivilents, but the sum of my personal and professional experience with all things web development is a grand total of zero, so I have no idea if the abstractions I've built in my branch make sense / are workable for the js target!

@Aidan63
Copy link
Contributor

Aidan63 commented May 21, 2024

In digging around the kotlin standard library I found that the abstract class all generated coroutine classes extend also implements a CoroutineStackFrame interface which was added to allow sane stack traces. Unfortunately there's very little info on this outside of an issue tracker description and just sifting through the code.

I had a go at adding something similar to the classes I generate and whenever a suspension occurs generate code which will set the stack frame variable of the current continuation object. Whenever an exception is about to be thrown I currently walk up the "coroutine stack" to build a haxe.Callstack and print it out for a comparison.

// Walk the coroutine stack
final frames = [ _hx_continuation._hx_stackFrame ];
var frame = _hx_continuation.callerFrame();
while (frame != null) {
    frames.push(frame._hx_stackFrame);
    frame = frame.callerFrame();
}

trace((frames:haxe.CallStack).toString());

Generated classes now inherit a FunctionCoroutine abstract class.

abstract class FunctionCoroutine implements IContinuation<Any> implements IStackFrame {
    public final _hx_completion : IContinuation<Any>;

    public final _hx_context : CoroutineContext;

    public var _hx_stackFrame : StackItem;

    function new(_hx_completion, _hx_context, _hx_stackFrame) {
        this._hx_completion = _hx_completion;
        this._hx_context    = _hx_context;
        this._hx_stackFrame = _hx_stackFrame;
    }

    public function callerFrame():Null<IStackFrame> {
        return if (_hx_completion is IStackFrame) {
            (cast _hx_completion : IStackFrame);
        } else {
            null;
        }
    }
}
coro_stack.webm

Not sure how we then integrate this with exceptions yet, but it seems like if we want readable stack traces we need to do some manual tracking. Dotnet async stack traces have also historically just been traces of the internal mechanisms, but when they switched to the dotnet core stuff they also started doing the tracking needed through their AsyncCausalityTracer API to provide sequential looking stack traces.


Don't know how I missed it, but kotlins coroutine context can be extended by user defined keys and these are all copied across automatically as well. That explains how the kotlin standard library coroutine api is very minimal and stuff like cancellation, scopes, jobs, etc are all shifted off to the kotlinx.coroutines external library. The only context element they defined in the standard library is the their interceptor stuff for threading, which seems reasonable as thats too fundamental to be left out. This does make not including cancellation out of the box a bit less mad providing similar context functionality is provided.

Also while digging around in kotlin repos I came across this file tucked away in the folder of the compiler which has the coroutine transformation sources, don't think it's been posted yet in this thread or any of the preceeding ones.

https://github.com/JetBrains/kotlin/blob/master/compiler/backend/src/org/jetbrains/kotlin/codegen/coroutines/coroutines-codegen.md

It covers in a good amount of detal the transformation, optimisations, threading, starting coroutines, safe continuation, etc, etc. It is a behemoth of a document though and not exact a page turner, so bring a strong drink.

@Aidan63
Copy link
Contributor

Aidan63 commented May 26, 2024

I've changed my mind again on the cancellation stuff, if asys is to have some form of cancellation mechanism then it would probably make sense for that to be included in the context by default. But thats something that can be sorted out later.


I made a change to the state machine to resolve an oddity with both the function based approach and the class based one in my test branch. The state machine called the continuation as well as returning a value, error, or suspension marker which could lead to some odd situations.

case 1: {
	_hx_state = -1;
	_hx_continuation.resume("Hello, World!", null);
	return "Hello, World!";
};

To solve this the state machine no longer calls the continuation on result, error, or suspension, it just returns the suspension marker, result, or throws the error. The try catch was also removed from the state machine. It's then the callers responsibility (usually resume of a continuation) to decide what to do next.
The generated continuation implementations then have their resume modified to include the try catch and call the continuation if a result was returned.

function resume(result:Any, error:Exception) {
	_hx_result = result;
	_hx_error  = error;

	try {
		final result = Main.getNumber(this);
		if (result is coro.Primitive) {
			return;
		}

		_hx_completion.resume(result);
	} catch (exn) {
		_hx_completion.resume(null, exn);
	}
}

With this change each invocation to a suspend function will run until a suspension point or a result or error is produced, instead of then immediately calling is completion.


I don't think Coroutine.suspend itself needs to be implemented as some compiler magic, we only need a way to access the current continuation. I added a new CoroutineIntrinsics class (because you can't have macro functions on a class with build meta) and made a currentContinuation function which just returns an expression to the current continuation variable. The suspend function can then be implemented as a normal coroutine function.

@:suspend public static function suspend(func:(IContinuation<Any>)->Void):Any {
    final cont = coro.CoroutineIntrinsics.currentContinuation();
    final safe = new SafeContinuation(cont);

    func(safe);

	return safe.getOrThrow();
}

I applied the most basic single state machine elimination as well which means that function ultimately becomes the following.

public static function suspend(func:coro.IContinuation<Any> -> Void, _hx_completion:coro.IContinuation<Any>) {
	var safe = new coro._Coroutine.SafeContinuation(_hx_completion);
	func(safe);
	return safe.getOrThrow();
}

I'm not too sure on the Any-ness of it though, probably makes more sense to be generic? I tried that but started to get compile errors. Guessing thats just a limit of the this macro version, so might be worth re-visiting with a compiler backed implemention.


I also think the other bit of compiler magic we have, start, should be changed to create and all it does is create a continuation of that function which has not yet been invoked. It's then fairly simply to build blocking, task, promise, whatever else you want on top of that, rather than the current start which tries to do all of that for you.

E.g. if we have the following haxe,

@:suspend function bar(i:Int, s:String):Void {
	trace('$i, $s');
}

function foo() {
	final c = bar.start(7, 'test', new MyCustomContinuation());
}

the start call in foo would get transformed into.

function foo() {
	final c = new HxCoro_bar(7, 'test', new MyContinuation());
}

This allows the user to set a final continuation instance and setup the initial context which would be propagated down. To start the coroutine all that needs to be done is call resume.

What I have not yet figured out though is how this should work with coroutines passed as arguments. Assumably we'd want to allow stuff like the following to work to allow building higer level functionality on top.

function foo(bar:Coroutine<(Int,String)->Void>) {
	final c = bar.start(7, 'test', new MyCustomContinuation());
}

But also to be able to directly call bar if foo is a coroutine function.

@:suspend function foo(bar:Coroutine<(Int,String)->Void>) {
	bar(7, 'test');
}

which makes things a bit tricky.


I've got things working on the js target and have added some examples using nodejs timers, stdout, etc. I don't know how we're supposed to implement a blocking coroutine on node though as from a very brief google there's no way to manually pump the event loop. This is also partly why I think the current blocking start should be changed to create, blocking only makes sense on targets where we own the event loop and for js a promise completion or something similar is probably preferable which could easily be built on top of create.

@skial skial mentioned this pull request May 28, 2024
1 task
@Aidan63
Copy link
Contributor

Aidan63 commented May 28, 2024

I've implemented passing coroutines as arguments, but its one of those things where it feels like there must be a better way.

Firstly I created a series of abstract classes extending Coroutine with increasing type parameters counts. These abstracts have a create function which will return a new, suspended, coroutine. As well as start which will create a new continuation and run it to its result or first suspension and return that. Both of these allow you to specify a completion continuation.

abstract class Coroutine0<TReturn> extends Coroutine<Void->TReturn> {
	public abstract function create(completion:IContinuation<Any>):IContinuation<Any>;

	public abstract function start(completion:IContinuation<Any>):Any;
}

abstract class Coroutine1<TArg0, TReturn> extends Coroutine<TArg0->TReturn> {
	public abstract function create(arg0:TArg0, completion:IContinuation<Any>):IContinuation<Any>;

	public abstract function start(arg0:TArg0, completion:IContinuation<Any>):Any;
}

// etc, etc...

Each suspending function then has another class defined which extends one of these abstracts and implements the create and start functions.

class HxCoro_delayFactory extends Coroutine1<Int, Void> {
	public static final instance:Coroutine1<Int, Void> = new HxCoro_delayFactory();

	private function new() { }

	public function create(arg0:Int, completion:IContinuation<Any>):IContinuation<Any> {
		return new HxCoro_delay(arg0, completion);
	}

	public function start(arg0:Int, completion:IContinuation<Any>):Any {
		return Main.delay(arg0, completion);
	}
}

and then things start to get a bit odd... All function arguments of type coro.Coroutine<T> are replaced with one of those abstract classes based on the generic argument. E.g. coro.Coroutine<()->Void> would be replaced with coro.Coroutine0<Void> and coro.Coroutine<Int->String> with coro.Coroutine1<Int, String> to allow these factory classes to be passed in and have the create and start functions usable. To accompany this any suspend functions being passed as arguments get replaced with the static factory class instance.
So in total, this...

@:coroutine function foo(v:Int):String {
	return 'You entered $v';
}

function bar(c:Coroutine<Int->String>) {
	//
}

function main() {
	bar(foo);
}

turns into this...

@:coroutine function foo(v:Int):String {
	return 'You entered $v';
}

function bar(c:Coroutine1<Int, String>) {
	//
}

function main() {
	bar(HxCoro_fooFactory.instance);
}

This all works but the potentially ever growing abstract coroutine classes and replacing types with more specific ones feels like a bodge, there must be some fancier type system thing we can do here, even if we have to change the Coroutine type a bit. All suggestions welcome!


That's about all I can think of to experiment with for now, all of the stuff covered in my last few comments is in my fork of nadakos repo in the kt_style branch. There are some tests in there to show case usage which run on cpp, jvm, and node. I assume it works fine on other targets since there's no target specific stuff, but I haven't tried.

@Apprentice-Alchemist
Copy link
Contributor

My suggestions is to move the state machine into the generated resume methods, turn the suspendable functions into function foo(...) return new HxCoro_foo(...);, turn Coroutine<...->T> into (...) -> IContinuation<T> and when the compiler encounters coro.start/coro.create directly insert the right code.

Eg something.

@:suspend function delay(ms:Int):Void;

function foo(coro:Coroutine<Int->Void>) {
  coro.create(0, completion);
  coro.start(0, completion);
}

// turns into
function delay(ms:Int, _completion:...): IContinuation<Any>;

function foo(coro:Int->IContinuation<Any>) {
  coro(0, completion);
}

No idea if that's feasible, I don't fully understand how it's all supposed to work.
I feel like the whole continuation approach is elegant in theory but becomes quite complex in practice.

@Aidan63
Copy link
Contributor

Aidan63 commented May 30, 2024

That would work and would be very similar to the current function approach (just in resume instead of a function). The downside to that is that when a suspend function call returns you don't know if it has ran to completion, error, or has been suspended and needs to be resumed at some point. Which is useful to know in many situations.

Having looked at the problem again I think I've made it far more complex than it needs to be. My thinking behind the extra factory classes was that to create a coroutine which is initially suspended needed special support, but thats not the case. You can easily implement it with an extra continuation.

function test_defered() {
	final cont  = new BlockingContinuation(new EventLoopScheduler(Thread.current().events));
	final defer = new Defer(getNumber, cont);

	Assert.equals(0, nextNumber);

	defer.resume(null, null);

	Assert.equals(1, cont.wait());
}

with Defer being.

class Defer implements IContinuation<Any> {
	final lambda : IContinuation<Any>->Any;

	final continuation : IContinuation<Any>;

	public final _hx_context:CoroutineContext;

	public function new(lambda, continuation) {
		this.lambda       = lambda;
		this.continuation = continuation;
		this._hx_context  = continuation._hx_context;
	}

	public function resume(_:Any, _:Exception) {
		try {
			final result = lambda(continuation);
			if (result is Primitive) {
				return;
			}

			continuation.resume(result, null);
		} catch (exn) {
			continuation.resume(null, exn);
		}
	}
}

Calling resume on the defer continuation will start execution of the held coroutine function, so its initially suspended until manually started. So I think my previous comment can be disregarded as a load of rubbish.

@Apprentice-Alchemist
Copy link
Contributor

That would work and would be very similar to the current function approach (just in resume instead of a function). The downside to that is that when a suspend function call returns you don't know if it has ran to completion, error, or has been suspended and needs to be resumed at some point. Which is useful to know in many situations.

But the function call would merely "construct" a coroutine and not start it yet.
To start it you'd need to call resume once which does let you know if it's returned, errored or suspended itself.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants