Skip to content

Commit

Permalink
fix aborting, update readme
Browse files Browse the repository at this point in the history
  • Loading branch information
loreanvictor committed Nov 9, 2023
1 parent dd04e32 commit 089d758
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 20 deletions.
28 changes: 14 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,13 @@
<img src="misc/dark.svg#gh-dark-mode-only" height="96px"/>
<img src="misc/light.svg#gh-light-mode-only" height="96px"/>

_Handle Change with Ease_ <sup>in JavaScript</sup>
_Reactive Expressions for JavaScript_

```bash
npm i quel
```

<br>

**Change** happens when a button is clicked, when value of an input is changed as the user types, when the response to a request arrives, when data is pushed via some socket, when time passes, etc. [**quel**](.) helps you encapsulate sources of change, and combine, process and react to the resulting changing values via simple expressions and observations.
<!--
Most applications written in JavaScript require some degree of [reactive programming](https://en.wikipedia.org/wiki/Reactive_programming). This is either achieved via domain-specific frameworks (such as [React](https://reactjs.org)) or general-purpose libraries like [RxJS](https://rxjs.dev), which are centered around a [functional reactive programming](https://en.wikipedia.org/wiki/Functional_reactive_programming) paradigm.
[**quel**](.) is a general-purpose library for reactive programming with an imperative style, resulting in code more in line with most other JavaScript code, and easier to read, write, understand and maintain.
-->
**quel** is a tiny library for reactive programming in JavaScript. You can use it to write applications that react to user interactions, events, timers, web sockets, etc. using plain JavaScript expressions and functions.

```js
import { from, observe } from 'quel'
Expand All @@ -50,7 +43,8 @@ observe($ => div$.textContent = `${$(chars)} chars, ${$(words)} words`)
<br>
A more involved example:
**quel** focuses on simplicity and composability. Even for more complex use cases (such as higher-order reactive sources, bouncing events, etc.)
it relies on native JavaScript features such async functions and combination, instead of operators, hooks, or other custom abstractions.
```js
//
Expand Down Expand Up @@ -125,12 +119,12 @@ import { from, observe } from 'https://esm.sh/quel'
# Usage
Working with [**quel**](.) involves four stages:
Working with [**quel**](.) involves four steps:
1. Encapsulate (or create) [sources of change](#sources),
2. Process and combine the these changing values using [functions & expressions](#expressions),
3. [Observe](#observation) these changing values and react to them
(or [iterate](#iteration) over them),
4. [Clean up](#cleanup) the sources, releasing resources (e.g. stop listening to user events, close an open socket, etc.).
4. [Clean up](#cleanup) the sources, releasing resources (e.g. stop a timer, remove an event listener, cloe a socket, etc.).
## Sources
Expand Down Expand Up @@ -181,7 +175,7 @@ await src.stops()
## Expressions
Combine two sources:
Combine two sources using simple _expression_ functions:
```js
const sum = $ => $(a) + $(b)
```
Expand All @@ -191,7 +185,7 @@ import { SKIP } from 'quel'

const odd = $ => $(a) % 2 === 0 ? SKIP : $(a)
```
Do async operations:
Expressions can be async:
```js
const response = async $ => {
await sleep(200)
Expand Down Expand Up @@ -546,3 +540,9 @@ npm run bench:mem
```
<br><br>
<div align="center">
<img src="chameleon.png" width="256px" />
</div>
<br><br>
Binary file added chameleon.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "quel",
"version": "0.3.5",
"version": "0.3.6",
"description": "Expression-based reactive library for hot listenables",
"main": "dist/commonjs/index.js",
"module": "dist/es/index.js",
Expand Down
75 changes: 74 additions & 1 deletion src/observe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ import { Listener, SourceLike, isSourceLike, Observable, ExprFn, SKIP, STOP, Exp
import { Source } from './source'


/**
* turns an object, that might be an expression function or a source, into a source.
* will attach the created source on the expression function and reuse it on subsequent calls.
*
* @param {Observable<T>} fn the object to normalize
* @returns {SourceLike<T>}
*/
function normalize<T>(fn: Observable<T>): SourceLike<T> {
if (typeof fn === 'function') {
(fn as any).__observed__ ??= observe(fn)
Expand All @@ -13,11 +20,55 @@ function normalize<T>(fn: Observable<T>): SourceLike<T> {
}


/**
* Represents an observation of an expression. An expression is a function that can track
* some other sources and its return value depends on the values of those sources. This tracking
* needs to be done explicitly via the _track function_ passed to the expression.
*
* Whenever a tracked source emits a value, the expression function is re-run, and its new value
* is emitted. For each re-run of the expression function, the latest value emitted by each source
* is used. An initial dry-run is performed upon construction to track necessary sources.
*
* @example
* ```ts
* const a = makeASource<number>()
* const b = makeAnotherSource<number>()
*
* const expr = $ => $(a) + $(b)
* const obs = new Observation(expr)
* ```
*/
export class Observation<T> extends Source<T> {
/**
* A mapping of all tracked sources. For receiving the values of tracked sources,
* a handler is registered with them. this handler is stored in this map for cleanup.
*/
tracked: Map<SourceLike<any>, Listener<any>> = new Map()

/**
* A candidate tracked source for cleanup. If a tracked source initiates a rerun
* by emitting, it is marked as a clean candidate. If the source is not re-tracked (i.e. used)
* in the next run, it will be cleaned up.
*/
cleanCandidate: SourceLike<any> | undefined

/**
* A token to keep track of the current run. If the expression is re-run
* before a previous run is finished (which happens in case of async expressions),
* then the value of the out-of-sync run is discarded.
*/
syncToken = 0

/**
* The last sync token. If this is different from the current sync token,
* then the last started execution has not finished yet.
*/
lastSyncToken = 0

/**
* @param {ExprFn<T>} fn the expression to observe
* @param {Listener<void>} abort a listener to call when async execution is aborted
*/
constructor(
readonly fn: ExprFn<T>,
readonly abort?: Listener<void>,
Expand All @@ -27,9 +78,14 @@ export class Observation<T> extends Source<T> {
this.tracked.clear()
})

// do a dry run on init, to track all sources
this.run()
}

/**
* cleans the clean candidate if present.
* @returns true if observation was already clean (no clean candidate), false otherwise.
*/
protected clean() {
if (this.cleanCandidate) {
const handler = this.tracked.get(this.cleanCandidate)!
Expand All @@ -43,16 +99,29 @@ export class Observation<T> extends Source<T> {
}
}

/**
* creates a new sync token to distinguish async executions that should be aborted.
* will call the abort listener if some execution is aborted.
* @returns a new sync token.
*/
protected nextToken() {
if (this.syncToken > 0) {
this.abort && this.abort()
// check if there is an unfinished run that needs to be aborted
if (this.lastSyncToken !== this.syncToken) {
this.abort && this.abort()
}
// if this is a higher-order observation, the last emitted source
// should be stopped.
isSourceLike(this.last) && this.last.stop()
}

/* istanbul ignore next */
return ++this.syncToken > 10e12 ? this.syncToken = 1 : this.syncToken
}

/**
*
*/
protected run(src?: SourceLike<any>) {
this.cleanCandidate = src
const syncToken = this.nextToken()
Expand All @@ -73,6 +142,10 @@ export class Observation<T> extends Source<T> {
}

protected override emit(res: ExprResultSync<T>) {
// emission means last run is finished,
// so sync tokens should be synced.
this.lastSyncToken = this.syncToken

if (this.clean() && res !== SKIP && res !== STOP) {
super.emit(res)
} else if (res === STOP) {
Expand Down
26 changes: 22 additions & 4 deletions src/test/observe.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,35 @@ describe(observe, () => {
expect(cb).toHaveBeenCalledTimes(3)
})

test('accepts an abort listener.', () => {
test('accepts an abort listener.', async () => {
const abort = jest.fn()

const delay = ms => new Promise(resolve => setTimeout(resolve, ms))

const a = new Subject<number>()
observe($ => $(a)! * 2, abort)
observe(async $ => {
await delay(1)

return $(a)! * 2
}, abort)

expect(abort).not.toHaveBeenCalled()
a.set(1)
expect(abort).toHaveBeenCalledTimes(1)
await delay(2)
a.set(2)
expect(abort).toHaveBeenCalledTimes(2)
// sequential runs, no abort.
expect(abort).toHaveBeenCalledTimes(0)
await delay(2)
// make sure still no abort after the last run finishes
expect(abort).toHaveBeenCalledTimes(0)

a.set(3)
a.set(4)
// parallel runs, first one is aborted
expect(abort).toHaveBeenCalledTimes(1)
await delay(2)
// still the same abort
expect(abort).toHaveBeenCalledTimes(1)
})

test('can be stopped.', () => {
Expand Down

0 comments on commit 089d758

Please sign in to comment.