Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify #24

Draft
wants to merge 26 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Speed up synchronous streams.
  • Loading branch information
back2dos committed Feb 11, 2021
commit a1f5db5859c7f29da0d0d7fca7f5727bbad12bc5
83 changes: 45 additions & 38 deletions src/tink/streams/Stream.hx
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ abstract Stream<Item, Quality>(StreamObject<Item, Quality>) from StreamObject<It
return new StreamPair(this, b);

@:from static public function ofIterator<T, Quality>(t:Iterator<T>):Stream<T, Quality>
return AsyncLinkStream.ofIterator(t);
return SyncLinkStream.ofIterator(t);

@:from static public function promise<T>(p:Promise<Stream<T, Error>>):Stream<T, Error>
return new PromiseStream(p);
Expand Down Expand Up @@ -144,9 +144,6 @@ interface StreamObject<Item, Quality> {
function forEach<Result>(f:Consumer<Item, Result>):Future<IterationResult<Item, Result, Quality>>;
}

typedef AsyncLink<Item, Quality> = Future<AsyncLinkKind<Item, Quality>>;
typedef AsyncLinkKind<Item, Quality> = LinkKind<Item, Quality, AsyncLink<Item, Quality>>

private enum LinkKind<Item, Quality, Tail> {
Fin(error:Null<Quality>);
Cons(head:Item, tail:Tail);
Expand Down Expand Up @@ -294,7 +291,10 @@ private class Helper {
}
}

class AsyncLinkStream<Item, Quality> implements StreamObject<Item, Quality> {
private typedef AsyncLink<Item, Quality> = Future<AsyncLinkKind<Item, Quality>>;
private typedef AsyncLinkKind<Item, Quality> = LinkKind<Item, Quality, AsyncLink<Item, Quality>>

private class AsyncLinkStream<Item, Quality> implements StreamObject<Item, Quality> {
final link:AsyncLink<Item, Quality>;

public function new(link)
Expand Down Expand Up @@ -342,49 +342,56 @@ class AsyncLinkStream<Item, Quality> implements StreamObject<Item, Quality> {
loop(link);
return wait;
});

static function iteratorLink<Item, Quality>(i:Iterator<Item>):AsyncLink<Item, Quality>
return Future.lazy(() -> if (i.hasNext()) Cons(i.next(), iteratorLink(i)) else Fin(null));

static public function ofIterator<Item, Quality>(i:Iterator<Item>):Stream<Item, Quality>
return new AsyncLinkStream(iteratorLink(i));
}

// typedef SyncLink<Item, Quality> = LinkKind<Item, Quality, Lazy<SyncLink<Item, Quality>>>;
private typedef SyncLink<Item, Quality> = Lazy<LinkKind<Item, Quality, SyncLink<Item, Quality>>>;

// class SyncLinkStream<Item, Quality> implements StreamObject<Item, Quality> {
// final link:SyncLink<Item, Quality>;
private class SyncLinkStream<Item, Quality> implements StreamObject<Item, Quality> {
final link:SyncLink<Item, Quality>;

// public function new(link)
// this.link = link;
public function new(link)
this.link = link;

// public function forEach<Result>(f:Consumer<Item, Result>)
// return new Future<IterationResult<Item, Result, Quality>>(trigger -> {
// final wait = new CallbackLinkRef();
// var running = true;
public function forEach<Result>(f:Consumer<Item, Result>)
return new Future<IterationResult<Item, Result, Quality>>(trigger -> {
final wait = new CallbackLinkRef();
var streaming = true;

// function yield(v) {
// running = false;
// trigger(v);
// }
function yield(v) {
streaming = false;
trigger(v);
}

function loop(cur:SyncLink<Item, Quality>)
while (streaming)
switch cur.get() {
case Fin(error):
yield(switch error {
case null: Done;
case e: cast Failed(Stream.empty(), cast e);
});
case Cons(item, tail):
wait.link = Helper.trySync(f(item), (val, sync) -> switch val {
case Some(v):
yield(Stopped(new SyncLinkStream(tail), v));
case None:
if (sync) cur = tail;
else loop(tail);
});
if (wait.link == null) continue;
}

// function process(cur:SyncLink<Item, Quality>)
// while (running)
// switch cur {
// case Fin(error):
// yield(switch error {
// case null: Done;
// case e: Stopped(Stream.empty(), Failure(e));
// });
// case Cons(head, tail):
loop(link);

// }
return wait;
});

// process(link);
static function iteratorLink<Item, Quality>(i:Iterator<Item>):SyncLink<Item, Quality>
return () -> if (i.hasNext()) Cons(i.next(), iteratorLink(i)) else Fin(null);

// return wait;
// });
// }
static public function ofIterator<Item, Quality>(i:Iterator<Item>):Stream<Item, Quality>
return new SyncLinkStream(iteratorLink(i));
}

class SignalStream<Item, Quality> extends AsyncLinkStream<Item, Quality> {
public function new(signal:Signal<Yield<Item, Quality>>)
Expand Down