Skip to content

Commit

Permalink
feat: take drop
Browse files Browse the repository at this point in the history
  • Loading branch information
dineug committed Jan 30, 2023
1 parent c17afd2 commit d13bcc4
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 21 deletions.
Binary file modified .yarn/install-state.gz
Binary file not shown.
20 changes: 18 additions & 2 deletions packages/go/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ go(function* () {
#### low-level operator

```js
const flush = channel => new Promise(resolve => channel.flush(resolve));
const flush = channel =>
new Promise((resolve, reject) => channel.flush(resolve, reject));
```

### fork
Expand Down Expand Up @@ -396,7 +397,22 @@ go(function* () {
#### low-level operator

```js
const take = channel => new Promise(resolve => channel.take(resolve));
const take = channel =>
go(function* () {
let drop = () => false;

const promise = new Promise((resolve, reject) => {
drop = channel.take(resolve, reject);
});

promise.cancel = () => {
drop();
return promise;
};

const value = yield promise;
return value;
});
```

### takeEvery
Expand Down
2 changes: 1 addition & 1 deletion packages/go/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@dineug/go",
"version": "0.1.3",
"version": "0.1.4",
"description": "Promise Extension Library",
"main": "dist/go.js",
"module": "dist/go.esm.js",
Expand Down
10 changes: 10 additions & 0 deletions packages/go/src/buffers/limitBuffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ export class LimitBuffer<T = any> implements ChannelBuffer<T> {
return queue;
}

drop(predicate: (value: T) => boolean) {
const index = this.#queue.findIndex(predicate);
if (index === -1) return false;

const queue = this.#queue.slice();
queue.splice(index, 1);
this.#queue = queue;
return true;
}

#drop() {
if (this.#config.limit === -1) return;

Expand Down
1 change: 1 addition & 0 deletions packages/go/src/buffers/type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export type ChannelBuffer<T = any> = {
put(value: T): void;
take(): T | undefined;
flush(): Array<T>;
drop(predicate: (value: T) => boolean): boolean;
};
39 changes: 30 additions & 9 deletions packages/go/src/channel.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import { type ChannelBuffer, buffers } from '@/buffers';

export type TakeCallback<T = any> = (value: T) => any;
export type CloseCallback = (error: typeof CLOSED) => void;
type CallbackTuple<T> = [TakeCallback<T>, CloseCallback | undefined];

export const CLOSED = Symbol.for('https://github.com/dineug/go.git#closed');
export const isClosed = (value: any): value is typeof CLOSED =>
value === CLOSED;

export class Channel<T = any> {
#buffer: ChannelBuffer<T>;
#callbackBuffer: ChannelBuffer<TakeCallback<T>> = buffers.limitBuffer();
#callbackBuffer: ChannelBuffer<CallbackTuple<T>> = buffers.limitBuffer();
#closed = false;

get closed() {
Expand All @@ -22,31 +28,46 @@ export class Channel<T = any> {
this.#notify();
}

take(callback: TakeCallback<T>) {
if (this.#closed) return;
take(callback: TakeCallback<T>, close?: (error: typeof CLOSED) => void) {
if (this.#closed) {
close?.(CLOSED);
return () => false;
}

this.#callbackBuffer.put(callback);
this.#callbackBuffer.put([callback, close]);
this.#notify();

return () => this.#callbackBuffer.drop(([cb]) => cb === callback);
}

flush(callback: (values: Array<T>) => void) {
if (this.#closed) return;
flush(
callback: (values: Array<T>) => void,
close?: (error: typeof CLOSED) => void
) {
if (this.#closed) {
close?.(CLOSED);
return;
}

callback(this.#buffer.flush());
}

close() {
this.#closed = true;

this.#callbackBuffer
.flush()
.forEach(([__callback, close]) => close?.(CLOSED));
}

#notify() {
if (this.#callbackBuffer.isEmpty() || this.#buffer.isEmpty()) {
return;
}

const callback = this.#callbackBuffer.take();
const value = this.#buffer.take();
callback?.(value as T);
const [callback] = this.#callbackBuffer.take() as CallbackTuple<T>;
const value = this.#buffer.take() as T;
callback?.(value);
}
}

Expand Down
10 changes: 3 additions & 7 deletions packages/go/src/go.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
isFunction,
isGenerator,
isIterator,
isOperator,
isPromiseLike,
} from '@/is-type';
import {
Expand Down Expand Up @@ -75,12 +74,9 @@ export function go<F extends AnyCallback>(

while (!canceled && !result.done) {
try {
if (isOperator(result.value)) {
const next = toNext(result.value);
process = isArray(next) ? next : [next];
value = await (isArray(next) ? Promise.all(next) : next);
}

const next = toNext(result.value);
process = isArray(next) ? next : [next];
value = await (isArray(next) ? Promise.all(next) : next);
result = await co.next(value);
} catch (error) {
if (isKill(error)) {
Expand Down
2 changes: 1 addition & 1 deletion packages/go/src/operators/flush.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Channel } from '@/channel';

export const flush = <T = any>(channel: Channel<T>) =>
new Promise<Array<T>>(resolve => channel.flush(resolve));
new Promise<Array<T>>((resolve, reject) => channel.flush(resolve, reject));
17 changes: 16 additions & 1 deletion packages/go/src/operators/take.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
import { Channel } from '@/channel';
import { type PromiseWithCancel, go } from '@/go';

export const take = <T = any>(channel: Channel<T>) =>
new Promise<T>(resolve => channel.take(resolve));
go(function* () {
let drop = () => false;

const promise = new Promise<T>((resolve, reject) => {
drop = channel.take(resolve, reject);
}) as PromiseWithCancel<T>;

promise.cancel = () => {
drop();
return promise;
};

const value: T = yield promise;
return value;
});

0 comments on commit d13bcc4

Please sign in to comment.