From a2e135116c7d3778e4d3a91a244dbc00e6abac0c Mon Sep 17 00:00:00 2001 From: Meno Abels Date: Tue, 17 Dec 2024 07:04:13 +0100 Subject: [PATCH 01/22] feature: specs --- tests/fireproof/streaming-api.test.ts | 83 +++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 tests/fireproof/streaming-api.test.ts diff --git a/tests/fireproof/streaming-api.test.ts b/tests/fireproof/streaming-api.test.ts new file mode 100644 index 000000000..bbc85f359 --- /dev/null +++ b/tests/fireproof/streaming-api.test.ts @@ -0,0 +1,83 @@ +/*
+ * The streaming API to be added to query and allDocs in the Database class
+ *
+ * We should incorporate the changes of this PR into this:
+ * https://github.com/fireproof-storage/fireproof/pull/315
+ *
+ * We need new methods or a structured return value like this:
+ * Due to that we need to harmonize the return values of both methods to
+ * return the same structure. I think we should go with an approach like
+ * the Response object in the fetch API.
+ * interface QueryResponse {
+ * rows(): Promise;
+ * iterator(): AsyncIterableIterator;
+ * stream(): ReadableStream;
+ * subscribe(callback: (doc: DocWithId) => void): unsubscribe() => void;
+ * }
+ * it should only possible to call every method once.
+ * if you call it twice it should throw an error
+ * Keep in mind that the iterator and stream should be able to
+ * substitute the changes method. So we need the possibility to
+ * pass options to allDocs and or query to change the behavior:
+ * - SNAPSHOT (default) -> means it returns the current state of the database and
+ * closes the stream after that.
+ * - LIVE -> means it returns the current state of the database and keeps the stream open
+ * - FUTURE -> means it keeps the stream open for new records which meet the query arguments
+ * (this might be dropped and simulated with the startpoint option in LIVE mode)
+ *
+ * The rows method will only operate in SNAPSHOT mode.
+ * We should be able, in a future implementation, to pass a startpoint to LIVE.
+ *
+ * The first group of tests should verify that query and allDocs both return the same
+ * QueryResponse object and implement the rows method. The tests should check that rows
+ * returns an empty array if the database is empty and the correct number of documents
+ * otherwise. There should be a test to check for the double-call error.
+ *
+ * The second group of tests should verify that the iterator and stream methods work as expected in
+ * SNAPSHOT mode. Both should pass the same tests as the rows method. These tests should verify that
+ * we are able to stream documents from the database without leaking memory. We should test that
+ * close/unsubscribe of the stream works as expected.
+ *
+ * Future milestone:
+ * The third group of tests should verify that the iterator and stream methods work as expected in
+ * LIVE and FUTURE mode. In this mode we need to check that the stream receives new documents which
+ * are written to the database after the stream was created. We should think about the race condition
+ * of losing document events between the allDocs or query call and the creation of the stream.
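+ *
+ * A rough usage sketch of the QueryResponse shape proposed above. This is an
+ * illustration only: the `db` variable is assumed, the generics are omitted,
+ * and each "or:" line is an alternative way to consume the same response.
+ *
+ *   const resp = db.allDocs();                           // or: db.query("name")
+ *   const rows = await resp.rows();                      // SNAPSHOT only: one-shot array of docs
+ *   // or: for await (const doc of resp.iterator()) { ... }
+ *   // or: const reader = resp.stream().getReader();
+ *   // or: const unsubscribe = resp.subscribe((doc) => { ... }); ... unsubscribe();
+ *   await resp.rows();                                   // calling a method twice should throw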
+ * + */ + +import { fireproof, Ledger } from '@fireproof/core'; + +describe('query api', () => { + let lr: Ledger + beforeEach(async () => { + lr = fireproof("name") + await Promise.all(Array(10).fill(0).map((_, i) => { + lr.put({ id: `doc-${i}`, name: `doc-${i}` }) + })) + }) + afterEach(async () => { + await lr.destroy() + }) + for (const method of [ + { + name: "query", + fn: () => lr.query("name") + }, + { + name: "allDocs", + fn: () => lr.allDocs() + }]) { + describe(`${method.name} method`, () => { + it("double call error", async () => { + const q = await method.fn() + expect(() => q.rows()).not.toThrowError() + expect(async () => method.fn()).toThrowError() + }) + it("test rows method", () => { + method.fn().then(r => r.rows()).then((docs) => { + }) + }) + }) + } +}) From 4894a40a9e42b7809e8d46f92db436deb36203c6 Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Wed, 18 Dec 2024 18:11:42 +0100 Subject: [PATCH 02/22] feat: Add ReadableStream to the allDocs return value --- src/crdt-helpers.ts | 4 + src/crdt.ts | 114 ++++++++++++++++++----- src/ledger.ts | 128 ++++---------------------- src/types.ts | 13 ++- tests/fireproof/streaming-api.test.ts | 86 ++++++++++------- 5 files changed, 172 insertions(+), 173 deletions(-) diff --git a/src/crdt-helpers.ts b/src/crdt-helpers.ts index 5c66055cf..fba5edd5e 100644 --- a/src/crdt-helpers.ts +++ b/src/crdt-helpers.ts @@ -86,6 +86,10 @@ export async function applyBulkUpdateToCrdt( return { head: result.head } as CRDTMeta; } +export function docUpdateToDocWithId({ id, del, value }: DocUpdate): DocWithId { + return (del ? { _id: id, _deleted: true } : { _id: id, ...value }) as DocWithId; +} + // this whole thing can get pulled outside of the write queue async function writeDocContent( store: StoreRuntime, diff --git a/src/crdt.ts b/src/crdt.ts index d8843b5b3..8a9891053 100644 --- a/src/crdt.ts +++ b/src/crdt.ts @@ -9,20 +9,19 @@ import { toStoreRuntime, } from "./blockstore/index.js"; import { - clockChangesSince, applyBulkUpdateToCrdt, getValueFromCrdt, readFiles, - getAllEntries, clockVis, getBlock, doCompact, + docUpdateToDocWithId, + getAllEntries, } from "./crdt-helpers.js"; import type { DocUpdate, CRDTMeta, ClockHead, - ChangesOptions, DocValue, IndexKeyType, DocWithId, @@ -30,6 +29,9 @@ import type { Falsy, SuperThis, IndexTransactionMeta, + QueryResponse, + ListenerFn, + QueryStreamMarker, } from "./types.js"; import { index, type Index } from "./indexer.js"; import { CRDTClock } from "./crdt-clock.js"; @@ -55,6 +57,11 @@ export class CRDT { readonly logger: Logger; readonly sthis: SuperThis; + // Subscriptions + _listening = false; + readonly _listeners = new Set>(); + readonly _noupdate_listeners = new Set>(); + constructor(sthis: SuperThis, opts: LedgerOpts) { this.sthis = sthis; this.logger = ensureLogger(sthis, "CRDT"); @@ -151,13 +158,89 @@ export class CRDT { // if (snap) await this.clock.applyHead(crdtMeta.head, this.clock.head) - async allDocs(): Promise<{ result: DocUpdate[]; head: ClockHead }> { - await this.ready(); - const result: DocUpdate[] = []; - for await (const entry of getAllEntries(this.blockstore, this.clock.head, this.logger)) { - result.push(entry); - } - return { result, head: this.clock.head }; + /** + * Retrieve the current set of documents. 
+ */ + allDocs(opts: { waitFor?: Promise } = {}): QueryResponse { + const waitFor = opts.waitFor; + + const currentDocs = (since?: ClockHead) => { + void since; + + // TODO: + // const opts: ChangesOptions = {}; + + // TODO: + // if (since) { + // return clockChangesSince(this.blockstore, this.clock.head, since || [], opts, this.logger).then((a) => a.result); + // } + + const iterator = getAllEntries(this.blockstore, this.clock.head, this.logger); + return iterator; + }; + + const snapshot = async () => { + await waitFor; + await this.ready(); + // TODO: Map over async iterable + // return currentDocs().map(docUpdateToDocWithId) + + // NOTE: + // return Array.fromAsync(currentDocs()).then((a) => a.map(docUpdateToDocWithId)); + + const docs: DocWithId[] = []; + for await (const update of currentDocs()) { + docs.push(docUpdateToDocWithId(update)); + } + return docs; + }; + + const stream = (opts: { since?: ClockHead } = {}) => { + const clock = this.clock; + const ready = this.ready.bind(this); + + return new ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>({ + async start(controller) { + await waitFor; + await ready(); + + const it = currentDocs(opts.since); + + async function iterate(prevValue: DocUpdate) { + const { done, value } = await it.next(); + + controller.enqueue({ + doc: docUpdateToDocWithId(prevValue), + marker: { kind: "preexisting", done: done || false }, + }); + + if (!done) await iterate(value); + } + + const { value } = await it.next(); + if (value) await iterate(value); + + clock.onTick((updates: DocUpdate>[]) => { + updates.forEach((update) => { + controller.enqueue({ doc: docUpdateToDocWithId(update as DocUpdate), marker: { kind: "new" } }); + }); + }); + }, + + // NOTE: Ideally we unsubscribe from `onTick` here. + // cancel() {} + }); + }; + + return { + snapshot, + live(opts: { since?: ClockHead } = {}) { + return stream(opts); + }, + future() { + return stream(); + }, + }; } async vis(): Promise { @@ -181,17 +264,6 @@ export class CRDT { return result; } - async changes( - since: ClockHead = [], - opts: ChangesOptions = {}, - ): Promise<{ - result: DocUpdate[]; - head: ClockHead; - }> { - await this.ready(); - return await clockChangesSince(this.blockstore, this.clock.head, since, opts, this.logger); - } - async compact(): Promise { const blocks = this.blockstore as EncryptedBlockstore; return await blocks.compact(); diff --git a/src/ledger.ts b/src/ledger.ts index c64bd4a67..bbfe389fb 100644 --- a/src/ledger.ts +++ b/src/ledger.ts @@ -5,27 +5,21 @@ import { CRDT, HasCRDT } from "./crdt.js"; import { index } from "./indexer.js"; import { type DocUpdate, - type ClockHead, type ConfigOpts, type MapFn, type QueryOpts, - type ChangesOptions, type DocSet, type DocWithId, type IndexKeyType, - type ListenerFn, type DocResponse, type BulkResponse, - type ChangesResponse, type DocTypes, type IndexRows, type DocFragment, - type ChangesResponseRow, type CRDTMeta, - type AllDocsQueryOpts, - type AllDocsResponse, type SuperThis, PARAM, + QueryResponse, } from "./types.js"; import { DbMeta, SerdeGatewayInterceptor, StoreEnDeFile, StoreURIRuntime, StoreUrlsOpts } from "./blockstore/index.js"; import { ensureLogger, ensureSuperThis, NotFoundError, toSortedArray } from "./utils.js"; @@ -79,16 +73,8 @@ export interface Ledger
> extends HasC put(doc: DocSet): Promise; bulk(docs: DocSet[]): Promise; del(id: string): Promise; - changes(since?: ClockHead, opts?: ChangesOptions): Promise>; - allDocs(opts?: AllDocsQueryOpts): Promise>; - allDocuments(): Promise<{ - rows: { - key: string; - value: DocWithId; - }[]; - clock: ClockHead; - }>; - subscribe(listener: ListenerFn, updates?: boolean): () => void; + allDocs(): QueryResponse; + allDocuments(): QueryResponse; query( field: string | MapFn, @@ -145,7 +131,6 @@ export class LedgerShell
> implements get crdt(): CRDT
{ return this.ref.crdt; } - get name(): string { return this.ref.name; } @@ -173,24 +158,12 @@ export class LedgerShell
> implements del(id: string): Promise { return this.ref.del(id); } - changes(since?: ClockHead, opts?: ChangesOptions): Promise> { - return this.ref.changes(since, opts); - } - allDocs(opts?: AllDocsQueryOpts): Promise> { - return this.ref.allDocs(opts); + allDocs(): QueryResponse { + return this.ref.allDocs(); } - allDocuments(): Promise<{ - rows: { - key: string; - value: DocWithId; - }[]; - clock: ClockHead; - }> { + allDocuments(): QueryResponse { return this.ref.allDocuments(); } - subscribe(listener: ListenerFn, updates?: boolean): () => void { - return this.ref.subscribe(listener, updates); - } query( field: string | MapFn, opts?: QueryOpts, @@ -206,9 +179,6 @@ class LedgerImpl
> implements Ledger>(); - readonly _noupdate_listeners = new Set>(); readonly crdt: CRDT
; readonly _writeQueue: WriteQueue
; // readonly blockstore: BaseBlockstore; @@ -267,8 +237,13 @@ class LedgerImpl
> implements Ledger[]) => this.crdt.bulk(updates), this.opts.writeQueue); - this.crdt.clock.onTock(() => this._no_update_notify()); + // this.blockstore = this._crdt.blockstore; // for connector compatibility + this._writeQueue = writeQueue(async (updates: DocUpdate
[]) => { + return await this.crdt.bulk(updates); + }); //, Infinity) + // TODO: Rebase conflict: + // this._writeQueue = writeQueue(this.sthis, async (updates: DocUpdate
[]) => this.crdt.bulk(updates), this.opts.writeQueue); + // this.crdt.clock.onTock(() => this._no_update_notify()); } get name(): string { @@ -327,59 +302,13 @@ class LedgerImpl
> implements Ledger(since: ClockHead = [], opts: ChangesOptions = {}): Promise> { - await this.ready(); - this.logger.Debug().Any("since", since).Any("opts", opts).Msg("changes"); - const { result, head } = await this.crdt.changes(since, opts); - const rows: ChangesResponseRow[] = result.map(({ id: key, value, del, clock }) => ({ - key, - value: (del ? { _id: key, _deleted: true } : { _id: key, ...value }) as DocWithId, - clock, - })); - return { rows, clock: head, name: this.name }; - } - - async allDocs(opts: AllDocsQueryOpts = {}): Promise> { - await this.ready(); - void opts; + allDocs(): QueryResponse { this.logger.Debug().Msg("allDocs"); - const { result, head } = await this.crdt.allDocs(); - const rows = result.map(({ id: key, value, del }) => ({ - key, - value: (del ? { _id: key, _deleted: true } : { _id: key, ...value }) as DocWithId, - })); - return { rows, clock: head, name: this.name }; - } - - async allDocuments(): Promise<{ - rows: { - key: string; - value: DocWithId; - }[]; - clock: ClockHead; - }> { - return this.allDocs(); + return this.crdt.allDocs({ waitFor: this.ready() }); } - subscribe(listener: ListenerFn, updates?: boolean): () => void { - this.logger.Debug().Bool("updates", updates).Msg("subscribe"); - if (updates) { - if (!this._listening) { - this._listening = true; - this.crdt.clock.onTick((updates: DocUpdate>[]) => { - void this._notify(updates); - }); - } - this._listeners.add(listener as ListenerFn>); - return () => { - this._listeners.delete(listener as ListenerFn>); - }; - } else { - this._noupdate_listeners.add(listener as ListenerFn>); - return () => { - this._noupdate_listeners.delete(listener as ListenerFn>); - }; - } + allDocuments(): QueryResponse { + return this.allDocs(); } // todo if we add this onto dbs in fireproof.ts then we can make index.ts a separate package @@ -401,29 +330,6 @@ class LedgerImpl
> implements Ledger>[]) { - await this.ready(); - if (this._listeners.size) { - const docs: DocWithId>[] = updates.map(({ id, value }) => ({ ...value, _id: id })); - for (const listener of this._listeners) { - await (async () => await listener(docs as DocWithId
[]))().catch((e: Error) => { - this.logger.Error().Err(e).Msg("subscriber error"); - }); - } - } - } - - async _no_update_notify() { - await this.ready(); - if (this._noupdate_listeners.size) { - for (const listener of this._noupdate_listeners) { - await (async () => await listener([]))().catch((e: Error) => { - this.logger.Error().Err(e).Msg("subscriber error"); - }); - } - } - } } function defaultURI( diff --git a/src/types.ts b/src/types.ts index 207cafee8..f356eba9d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -270,13 +270,12 @@ export interface AllDocsQueryOpts extends QueryOpts { prefix?: string; } -export interface AllDocsResponse { - readonly rows: { - readonly key: string; - readonly value: DocWithId; - }[]; - readonly clock: ClockHead; - readonly name?: string; +export type QueryStreamMarker = { kind: "preexisting"; done: boolean } | { kind: "new" }; + +export interface QueryResponse { + snapshot(): Promise[]>; + live(opts: { since?: ClockHead }): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; + future(): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; } type EmitFn = (k: IndexKeyType, v?: DocFragment) => void; diff --git a/tests/fireproof/streaming-api.test.ts b/tests/fireproof/streaming-api.test.ts index bbc85f359..8f6e5f0dd 100644 --- a/tests/fireproof/streaming-api.test.ts +++ b/tests/fireproof/streaming-api.test.ts @@ -16,6 +16,7 @@ * } * it should only possible to call every method once. * if you call it twice it should throw an error + * ------- * Keep in mind that the iterator and stream should be able to * substitute the changes method. So we need the possibility to * pass options to allDocs and or query to change the behavior: @@ -46,38 +47,55 @@ * */ -import { fireproof, Ledger } from '@fireproof/core'; +import { fireproof, Ledger } from "@fireproof/core"; -describe('query api', () => { - let lr: Ledger - beforeEach(async () => { - lr = fireproof("name") - await Promise.all(Array(10).fill(0).map((_, i) => { - lr.put({ id: `doc-${i}`, name: `doc-${i}` }) - })) - }) - afterEach(async () => { - await lr.destroy() - }) - for (const method of [ - { - name: "query", - fn: () => lr.query("name") - }, - { - name: "allDocs", - fn: () => lr.allDocs() - }]) { - describe(`${method.name} method`, () => { - it("double call error", async () => { - const q = await method.fn() - expect(() => q.rows()).not.toThrowError() - expect(async () => method.fn()).toThrowError() - }) - it("test rows method", () => { - method.fn().then(r => r.rows()).then((docs) => { - }) - }) - }) - } -}) +interface DocType { + _id: string; + name: string; +} + +describe("query api", () => { + let lr: Ledger; + + const AMOUNT_OF_DOCS = 10; + + beforeEach(async () => { + lr = fireproof(Date.now().toString()); + + await Promise.all( + Array(AMOUNT_OF_DOCS) + .fill(0) + .map((_, i) => { + return lr.put({ _id: `doc-${i}`, name: `doc-${i}` }); + }), + ); + }); + + afterEach(async () => { + await lr.destroy(); + }); + + describe("allDocs", () => { + it("test `snapshot` method", async () => { + const docs = await lr.allDocs().snapshot(); + expect(docs.length).toBe(AMOUNT_OF_DOCS); + }); + it("test `future` method", async () => { + const stream = lr.allDocs().future(); + let docCount = 0; + + for await (const { doc, marker } of stream) { + void doc; + docCount++; + + if (marker.kind === "preexisting" && marker.done) { + await lr.put({ _id: `doc-${AMOUNT_OF_DOCS}`, name: `doc-${AMOUNT_OF_DOCS}` }); + } + + if (marker.kind === "new") break; + } + + expect(docCount).toBe(AMOUNT_OF_DOCS + 
1); + }); + }); +}); From a494aa3f9130cf54a33c836008845fe081ba7a50 Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Wed, 18 Dec 2024 18:23:29 +0100 Subject: [PATCH 03/22] fix: Live, not future --- src/crdt.ts | 32 ++++++++++++++------------- src/types.ts | 2 +- tests/fireproof/streaming-api.test.ts | 4 ++-- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/src/crdt.ts b/src/crdt.ts index 8a9891053..b492dd247 100644 --- a/src/crdt.ts +++ b/src/crdt.ts @@ -195,7 +195,7 @@ export class CRDT { return docs; }; - const stream = (opts: { since?: ClockHead } = {}) => { + const stream = (opts: { futureOnly: boolean; since?: ClockHead }) => { const clock = this.clock; const ready = this.ready.bind(this); @@ -204,21 +204,23 @@ export class CRDT { await waitFor; await ready(); - const it = currentDocs(opts.since); + if (opts.futureOnly === false) { + const it = currentDocs(opts.since); - async function iterate(prevValue: DocUpdate) { - const { done, value } = await it.next(); + async function iterate(prevValue: DocUpdate) { + const { done, value } = await it.next(); - controller.enqueue({ - doc: docUpdateToDocWithId(prevValue), - marker: { kind: "preexisting", done: done || false }, - }); + controller.enqueue({ + doc: docUpdateToDocWithId(prevValue), + marker: { kind: "preexisting", done: done || false }, + }); - if (!done) await iterate(value); - } + if (!done) await iterate(value); + } - const { value } = await it.next(); - if (value) await iterate(value); + const { value } = await it.next(); + if (value) await iterate(value); + } clock.onTick((updates: DocUpdate>[]) => { updates.forEach((update) => { @@ -234,11 +236,11 @@ export class CRDT { return { snapshot, - live(opts: { since?: ClockHead } = {}) { - return stream(opts); + live(opts?: { since?: ClockHead }) { + return stream({ futureOnly: false, since: opts?.since }); }, future() { - return stream(); + return stream({ futureOnly: true }); }, }; } diff --git a/src/types.ts b/src/types.ts index f356eba9d..2c529bb04 100644 --- a/src/types.ts +++ b/src/types.ts @@ -274,7 +274,7 @@ export type QueryStreamMarker = { kind: "preexisting"; done: boolean } | { kind: export interface QueryResponse { snapshot(): Promise[]>; - live(opts: { since?: ClockHead }): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; + live(opts?: { since?: ClockHead }): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; future(): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; } diff --git a/tests/fireproof/streaming-api.test.ts b/tests/fireproof/streaming-api.test.ts index 8f6e5f0dd..cd352c34f 100644 --- a/tests/fireproof/streaming-api.test.ts +++ b/tests/fireproof/streaming-api.test.ts @@ -80,8 +80,8 @@ describe("query api", () => { const docs = await lr.allDocs().snapshot(); expect(docs.length).toBe(AMOUNT_OF_DOCS); }); - it("test `future` method", async () => { - const stream = lr.allDocs().future(); + it("test `live` method", async () => { + const stream = lr.allDocs().live(); let docCount = 0; for await (const { doc, marker } of stream) { From c9aadd764fee0cd2f67c41b079aa933a1fa7e10a Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Wed, 18 Dec 2024 18:55:46 +0100 Subject: [PATCH 04/22] test: future --- tests/fireproof/streaming-api.test.ts | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/fireproof/streaming-api.test.ts b/tests/fireproof/streaming-api.test.ts index cd352c34f..a21d91016 100644 --- a/tests/fireproof/streaming-api.test.ts +++ b/tests/fireproof/streaming-api.test.ts @@ 
-97,5 +97,23 @@ describe("query api", () => { expect(docCount).toBe(AMOUNT_OF_DOCS + 1); }); + it("test `future` method", async () => { + const stream = lr.allDocs().future(); + let docCount = 0; + + // NOTE: Test could probably be written in a better way. + // We want to start listening before we add the documents. + lr.put({ _id: `doc-${AMOUNT_OF_DOCS + 0}`, name: `doc-${AMOUNT_OF_DOCS + 0}` }); + lr.put({ _id: `doc-${AMOUNT_OF_DOCS + 1}`, name: `doc-${AMOUNT_OF_DOCS + 1}` }); + + for await (const { doc, marker } of stream) { + void doc; + + if (marker.kind === "new") docCount++; + if (docCount === 2) break; + } + + expect(docCount).toBe(2); + }); }); }); From 0d318b5285c68b1d653d08d827b923a20bf4282e Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Tue, 14 Jan 2025 15:36:30 +0100 Subject: [PATCH 05/22] fix: Unsub when stream is cancelled --- src/crdt-clock.ts | 2 ++ src/crdt.ts | 9 ++++++--- src/ledger.ts | 12 +++++++----- tests/fireproof/streaming-api.test.ts | 16 +++++++++++++++- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/src/crdt-clock.ts b/src/crdt-clock.ts index 07b499c0d..954579f3a 100644 --- a/src/crdt-clock.ts +++ b/src/crdt-clock.ts @@ -72,10 +72,12 @@ export class CRDTClock { onTick(fn: (updates: DocUpdate[]) => void) { this.watchers.add(fn); + return () => this.watchers.delete(fn); } onTock(fn: () => void) { this.emptyWatchers.add(fn); + return () => this.emptyWatchers.delete(fn); } onZoom(fn: () => void) { diff --git a/src/crdt.ts b/src/crdt.ts index b492dd247..6591666d0 100644 --- a/src/crdt.ts +++ b/src/crdt.ts @@ -199,6 +199,8 @@ export class CRDT { const clock = this.clock; const ready = this.ready.bind(this); + let unsubscribe: undefined | (() => void); + return new ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>({ async start(controller) { await waitFor; @@ -222,15 +224,16 @@ export class CRDT { if (value) await iterate(value); } - clock.onTick((updates: DocUpdate>[]) => { + unsubscribe = clock.onTick((updates: DocUpdate>[]) => { updates.forEach((update) => { controller.enqueue({ doc: docUpdateToDocWithId(update as DocUpdate), marker: { kind: "new" } }); }); }); }, - // NOTE: Ideally we unsubscribe from `onTick` here. - // cancel() {} + cancel() { + unsubscribe?.(); + }, }); }; diff --git a/src/ledger.ts b/src/ledger.ts index bbfe389fb..b560c4730 100644 --- a/src/ledger.ts +++ b/src/ledger.ts @@ -238,11 +238,13 @@ class LedgerImpl
> implements Ledger[]) => { - return await this.crdt.bulk(updates); - }); //, Infinity) - // TODO: Rebase conflict: - // this._writeQueue = writeQueue(this.sthis, async (updates: DocUpdate
[]) => this.crdt.bulk(updates), this.opts.writeQueue); + this._writeQueue = writeQueue( + this.sthis, + async (updates: DocUpdate
[]) => { + return await this.crdt.bulk(updates); + }, + this.opts.writeQueue, + ); // this.crdt.clock.onTock(() => this._no_update_notify()); } diff --git a/tests/fireproof/streaming-api.test.ts b/tests/fireproof/streaming-api.test.ts index a21d91016..8e0974a61 100644 --- a/tests/fireproof/streaming-api.test.ts +++ b/tests/fireproof/streaming-api.test.ts @@ -54,7 +54,7 @@ interface DocType { name: string; } -describe("query api", () => { +describe("Streaming API", () => { let lr: Ledger; const AMOUNT_OF_DOCS = 10; @@ -75,11 +75,16 @@ describe("query api", () => { await lr.destroy(); }); + ////////////// + // ALL DOCS // + ////////////// + describe("allDocs", () => { it("test `snapshot` method", async () => { const docs = await lr.allDocs().snapshot(); expect(docs.length).toBe(AMOUNT_OF_DOCS); }); + it("test `live` method", async () => { const stream = lr.allDocs().live(); let docCount = 0; @@ -97,6 +102,7 @@ describe("query api", () => { expect(docCount).toBe(AMOUNT_OF_DOCS + 1); }); + it("test `future` method", async () => { const stream = lr.allDocs().future(); let docCount = 0; @@ -116,4 +122,12 @@ describe("query api", () => { expect(docCount).toBe(2); }); }); + + /////////// + // QUERY // + /////////// + + // describe("query", () => { + // // + // }); }); From 8e4481120c62f558ef69628609efff8449540e84 Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Tue, 14 Jan 2025 16:29:00 +0100 Subject: [PATCH 06/22] chore: Map async iterables for snapshots --- package.json | 1 + pnpm-lock.yaml | 9 +++++++++ src/crdt.ts | 18 ++++++------------ src/index.ts | 8 ++++++++ src/ledger.ts | 1 - 5 files changed, 24 insertions(+), 13 deletions(-) diff --git a/package.json b/package.json index 62802a198..03fa40c42 100644 --- a/package.json +++ b/package.json @@ -89,6 +89,7 @@ "charwise": "^3.0.1", "esbuild-plugin-replace": "^1.4.0", "idb": "^8.0.1", + "iterator-helpers-polyfill": "^3.0.1", "multiformats": "^13.3.1", "p-limit": "^6.2.0", "p-map": "^7.0.3", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7c9e30ee3..fe6425a1c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -26,6 +26,9 @@ importers: idb: specifier: ^8.0.1 version: 8.0.1 + iterator-helpers-polyfill: + specifier: ^3.0.1 + version: 3.0.1 multiformats: specifier: ^13.3.1 version: 13.3.1 @@ -2033,6 +2036,10 @@ packages: it-stream-types@2.0.2: resolution: {integrity: sha512-Rz/DEZ6Byn/r9+/SBCuJhpPATDF9D+dz5pbgSUyBsCDtza6wtNATrz/jz1gDyNanC3XdLboriHnOC925bZRBww==} + iterator-helpers-polyfill@3.0.1: + resolution: {integrity: sha512-9uSoKErC0+TG7uoXlv5k7rs196/l/VGr9hb9KbptpMhszsSksxJCwetp0p7FvgM3SwxlxgEkvokmeOi02PARlQ==} + engines: {chrome: '>=63', firefox: '>=57', node: '>=10.0.0', safari: '>=11'} + jackspeak@3.4.0: resolution: {integrity: sha512-JVYhQnN59LVPFCEcVa2C3CrEKYacvjRfqIQl+h8oi91aLYQVWRYbxjPcv1bUiUy/kLmQaANrYfNMCO3kuEDHfw==} engines: {node: '>=14'} @@ -4991,6 +4998,8 @@ snapshots: it-stream-types@2.0.2: {} + iterator-helpers-polyfill@3.0.1: {} + jackspeak@3.4.0: dependencies: '@isaacs/cliui': 8.0.2 diff --git a/src/crdt.ts b/src/crdt.ts index 6591666d0..7cd8b0a26 100644 --- a/src/crdt.ts +++ b/src/crdt.ts @@ -175,24 +175,18 @@ export class CRDT { // return clockChangesSince(this.blockstore, this.clock.head, since || [], opts, this.logger).then((a) => a.result); // } - const iterator = getAllEntries(this.blockstore, this.clock.head, this.logger); - return iterator; + return getAllEntries(this.blockstore, this.clock.head, this.logger); }; const snapshot = async () => { await waitFor; await this.ready(); - // TODO: Map over async 
iterable - // return currentDocs().map(docUpdateToDocWithId) - // NOTE: - // return Array.fromAsync(currentDocs()).then((a) => a.map(docUpdateToDocWithId)); - - const docs: DocWithId[] = []; - for await (const update of currentDocs()) { - docs.push(docUpdateToDocWithId(update)); - } - return docs; + return await Array.fromAsync( + currentDocs().map((doc: DocUpdate) => { + return docUpdateToDocWithId(doc); + }), + ); }; const stream = (opts: { futureOnly: boolean; since?: ClockHead }) => { diff --git a/src/index.ts b/src/index.ts index cdec6bb9b..f8bdb8e28 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,11 @@ +import { installIntoGlobal } from "iterator-helpers-polyfill"; + +// Polyfill for (async) iterator helpers (eg. map) +// https://github.com/tc39/proposal-iterator-helpers +// TODO: Not entirely sure yet we need this, may delete when finishing PR. +// See `allDocs().snapshot()` function +installIntoGlobal(); + export * from "./ledger.js"; export * from "./types.js"; diff --git a/src/ledger.ts b/src/ledger.ts index b560c4730..9d823d1e4 100644 --- a/src/ledger.ts +++ b/src/ledger.ts @@ -245,7 +245,6 @@ class LedgerImpl
> implements Ledger this._no_update_notify()); } get name(): string { From 88abf599666cd5129c23f49dbdc62d6b7be9cb0b Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Tue, 14 Jan 2025 19:58:51 +0100 Subject: [PATCH 07/22] chore: Make clockChangesSince an async generator --- src/crdt-helpers.ts | 35 ++++++++++----------------- src/crdt.ts | 35 ++++++++++++++++----------- src/types.ts | 4 ++- tests/fireproof/streaming-api.test.ts | 2 ++ 4 files changed, 39 insertions(+), 37 deletions(-) diff --git a/src/crdt-helpers.ts b/src/crdt-helpers.ts index fba5edd5e..104f06686 100644 --- a/src/crdt-helpers.ts +++ b/src/crdt-helpers.ts @@ -238,50 +238,42 @@ class DirtyEventFetcher extends EventFetcher { } } -export async function clockChangesSince( +export function clockChangesSince( blocks: BlockFetcher, head: ClockHead, since: ClockHead, opts: ChangesOptions, logger: Logger, -): Promise<{ result: DocUpdate[]; head: ClockHead }> { +): { result: AsyncGenerator>; head: ClockHead } { const eventsFetcher = ( opts.dirty ? new DirtyEventFetcher(logger, blocks) : new EventFetcher(blocks) ) as EventFetcher; const keys = new Set(); - const updates = await gatherUpdates( - blocks, - eventsFetcher, - head, - since, - [], - keys, - new Set(), - opts.limit || Infinity, - logger, - ); - return { result: updates.reverse(), head }; + const result = gatherUpdates(blocks, eventsFetcher, head, since, keys, new Set(), opts.limit || Infinity, logger); + return { result, head }; } -async function gatherUpdates( +async function* gatherUpdates( blocks: BlockFetcher, eventsFetcher: EventFetcher, head: ClockHead, since: ClockHead, - updates: DocUpdate[] = [], keys: Set, didLinks: Set, limit: number, logger: Logger, -): Promise[]> { - if (limit <= 0) return updates; +): AsyncGenerator> { + if (limit <= 0) return; + // if (Math.random() < 0.001) console.log('gatherUpdates', head.length, since.length, updates.length) const sHead = head.map((l) => l.toString()); + for (const link of since) { if (sHead.includes(link.toString())) { - return updates; + return; } } + for (const link of head) { if (didLinks.has(link.toString())) continue; didLinks.add(link.toString()); @@ -299,16 +291,15 @@ async function gatherUpdates( if (!keys.has(key)) { // todo option to see all updates const docValue = await getValueFromLink(blocks, value, logger); - updates.push({ id: key, value: docValue.doc, del: docValue.del, clock: link }); + yield { id: key, value: docValue.doc, del: docValue.del, clock: link }; limit--; keys.add(key); } } if (event.parents) { - updates = await gatherUpdates(blocks, eventsFetcher, event.parents, since, updates, keys, didLinks, limit, logger); + yield* gatherUpdates(blocks, eventsFetcher, event.parents, since, keys, didLinks, limit, logger); } } - return updates; } export async function* getAllEntries(blocks: BlockFetcher, head: ClockHead, logger: Logger) { diff --git a/src/crdt.ts b/src/crdt.ts index 7cd8b0a26..3d3b95c04 100644 --- a/src/crdt.ts +++ b/src/crdt.ts @@ -12,6 +12,7 @@ import { applyBulkUpdateToCrdt, getValueFromCrdt, readFiles, + clockChangesSince, clockVis, getBlock, doCompact, @@ -32,6 +33,7 @@ import type { QueryResponse, ListenerFn, QueryStreamMarker, + ChangesOptions, } from "./types.js"; import { index, type Index } from "./indexer.js"; import { CRDTClock } from "./crdt-clock.js"; @@ -165,15 +167,11 @@ export class CRDT { const waitFor = opts.waitFor; const currentDocs = (since?: ClockHead) => { - void since; - - // TODO: - // const opts: ChangesOptions = {}; - - // TODO: - // if (since) { - // return 
clockChangesSince(this.blockstore, this.clock.head, since || [], opts, this.logger).then((a) => a.result); - // } + if (since) { + const opts: ChangesOptions = {}; + const { result } = clockChangesSince(this.blockstore, this.clock.head, since, opts, this.logger); + return result; + } return getAllEntries(this.blockstore, this.clock.head, this.logger); }; @@ -182,11 +180,20 @@ export class CRDT { await waitFor; await this.ready(); - return await Array.fromAsync( - currentDocs().map((doc: DocUpdate) => { - return docUpdateToDocWithId(doc); - }), - ); + async function* currentDocsWithId() { + for await (const doc of currentDocs()) { + yield docUpdateToDocWithId(doc); + } + } + + return await Array.fromAsync(currentDocsWithId()); + + // TODO: Not sure if this is an improvement on the above generator. + // return await Array.fromAsync( + // currentDocs().map((doc: DocUpdate) => { + // return docUpdateToDocWithId(doc); + // }), + // ); }; const stream = (opts: { futureOnly: boolean; since?: ClockHead }) => { diff --git a/src/types.ts b/src/types.ts index 2c529bb04..be59140ab 100644 --- a/src/types.ts +++ b/src/types.ts @@ -270,12 +270,14 @@ export interface AllDocsQueryOpts extends QueryOpts { prefix?: string; } -export type QueryStreamMarker = { kind: "preexisting"; done: boolean } | { kind: "new" }; +export type QueryStreamMarker = { readonly kind: "preexisting"; readonly done: boolean } | { readonly kind: "new" }; export interface QueryResponse { snapshot(): Promise[]>; live(opts?: { since?: ClockHead }): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; future(): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; + + // TODO: clockHead } type EmitFn = (k: IndexKeyType, v?: DocFragment) => void; diff --git a/tests/fireproof/streaming-api.test.ts b/tests/fireproof/streaming-api.test.ts index 8e0974a61..b1981c7c8 100644 --- a/tests/fireproof/streaming-api.test.ts +++ b/tests/fireproof/streaming-api.test.ts @@ -103,6 +103,8 @@ describe("Streaming API", () => { expect(docCount).toBe(AMOUNT_OF_DOCS + 1); }); + it.todo("test `live` method with `since` parameter"); + it("test `future` method", async () => { const stream = lr.allDocs().future(); let docCount = 0; From 90ac50b68f316c300267c90770067324caad941e Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Wed, 15 Jan 2025 18:22:05 +0100 Subject: [PATCH 08/22] feat: Add ledger.clock prop, add snapshot(since) option and test since options --- package.json | 1 - pnpm-lock.yaml | 9 ------ src/crdt.ts | 22 ++++++-------- src/index.ts | 8 ----- src/ledger.ts | 9 ++++++ src/types.ts | 4 +-- tests/fireproof/streaming-api.test.ts | 44 ++++++++++++++++++++++++--- 7 files changed, 60 insertions(+), 37 deletions(-) diff --git a/package.json b/package.json index 03fa40c42..62802a198 100644 --- a/package.json +++ b/package.json @@ -89,7 +89,6 @@ "charwise": "^3.0.1", "esbuild-plugin-replace": "^1.4.0", "idb": "^8.0.1", - "iterator-helpers-polyfill": "^3.0.1", "multiformats": "^13.3.1", "p-limit": "^6.2.0", "p-map": "^7.0.3", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index fe6425a1c..7c9e30ee3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -26,9 +26,6 @@ importers: idb: specifier: ^8.0.1 version: 8.0.1 - iterator-helpers-polyfill: - specifier: ^3.0.1 - version: 3.0.1 multiformats: specifier: ^13.3.1 version: 13.3.1 @@ -2036,10 +2033,6 @@ packages: it-stream-types@2.0.2: resolution: {integrity: sha512-Rz/DEZ6Byn/r9+/SBCuJhpPATDF9D+dz5pbgSUyBsCDtza6wtNATrz/jz1gDyNanC3XdLboriHnOC925bZRBww==} - iterator-helpers-polyfill@3.0.1: - 
resolution: {integrity: sha512-9uSoKErC0+TG7uoXlv5k7rs196/l/VGr9hb9KbptpMhszsSksxJCwetp0p7FvgM3SwxlxgEkvokmeOi02PARlQ==} - engines: {chrome: '>=63', firefox: '>=57', node: '>=10.0.0', safari: '>=11'} - jackspeak@3.4.0: resolution: {integrity: sha512-JVYhQnN59LVPFCEcVa2C3CrEKYacvjRfqIQl+h8oi91aLYQVWRYbxjPcv1bUiUy/kLmQaANrYfNMCO3kuEDHfw==} engines: {node: '>=14'} @@ -4998,8 +4991,6 @@ snapshots: it-stream-types@2.0.2: {} - iterator-helpers-polyfill@3.0.1: {} - jackspeak@3.4.0: dependencies: '@isaacs/cliui': 8.0.2 diff --git a/src/crdt.ts b/src/crdt.ts index 3d3b95c04..ebbabf700 100644 --- a/src/crdt.ts +++ b/src/crdt.ts @@ -176,24 +176,19 @@ export class CRDT { return getAllEntries(this.blockstore, this.clock.head, this.logger); }; - const snapshot = async () => { - await waitFor; - await this.ready(); + const snapshot = (opts: { since?: ClockHead } = {}) => { + const ready = this.ready.bind(this); async function* currentDocsWithId() { - for await (const doc of currentDocs()) { + await waitFor; + await ready(); + + for await (const doc of currentDocs(opts.since)) { yield docUpdateToDocWithId(doc); } } - return await Array.fromAsync(currentDocsWithId()); - - // TODO: Not sure if this is an improvement on the above generator. - // return await Array.fromAsync( - // currentDocs().map((doc: DocUpdate) => { - // return docUpdateToDocWithId(doc); - // }), - // ); + return currentDocsWithId(); }; const stream = (opts: { futureOnly: boolean; since?: ClockHead }) => { @@ -201,6 +196,7 @@ export class CRDT { const ready = this.ready.bind(this); let unsubscribe: undefined | (() => void); + let isClosed = false; return new ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>({ async start(controller) { @@ -226,6 +222,7 @@ export class CRDT { } unsubscribe = clock.onTick((updates: DocUpdate>[]) => { + if (isClosed) return; updates.forEach((update) => { controller.enqueue({ doc: docUpdateToDocWithId(update as DocUpdate), marker: { kind: "new" } }); }); @@ -233,6 +230,7 @@ export class CRDT { }, cancel() { + isClosed = true; unsubscribe?.(); }, }); diff --git a/src/index.ts b/src/index.ts index f8bdb8e28..cdec6bb9b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,11 +1,3 @@ -import { installIntoGlobal } from "iterator-helpers-polyfill"; - -// Polyfill for (async) iterator helpers (eg. map) -// https://github.com/tc39/proposal-iterator-helpers -// TODO: Not entirely sure yet we need this, may delete when finishing PR. -// See `allDocs().snapshot()` function -installIntoGlobal(); - export * from "./ledger.js"; export * from "./types.js"; diff --git a/src/ledger.ts b/src/ledger.ts index 9d823d1e4..1bf970d44 100644 --- a/src/ledger.ts +++ b/src/ledger.ts @@ -20,6 +20,7 @@ import { type SuperThis, PARAM, QueryResponse, + ClockHead, } from "./types.js"; import { DbMeta, SerdeGatewayInterceptor, StoreEnDeFile, StoreURIRuntime, StoreUrlsOpts } from "./blockstore/index.js"; import { ensureLogger, ensureSuperThis, NotFoundError, toSortedArray } from "./utils.js"; @@ -61,6 +62,7 @@ export interface Ledger
> extends HasC readonly sthis: SuperThis; readonly id: string; + readonly clock: ClockHead; readonly name: string; onClosed(fn: () => void): void; @@ -131,6 +133,9 @@ export class LedgerShell
> implements get crdt(): CRDT
{ return this.ref.crdt; } + get clock(): ClockHead { + return this.ref.clock; + } get name(): string { return this.ref.name; } @@ -247,6 +252,10 @@ class LedgerImpl
> implements Ledger { export type QueryStreamMarker = { readonly kind: "preexisting"; readonly done: boolean } | { readonly kind: "new" }; export interface QueryResponse { - snapshot(): Promise[]>; + snapshot(opts?: { since?: ClockHead }): AsyncGenerator>; live(opts?: { since?: ClockHead }): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; future(): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; - - // TODO: clockHead } type EmitFn = (k: IndexKeyType, v?: DocFragment) => void; diff --git a/tests/fireproof/streaming-api.test.ts b/tests/fireproof/streaming-api.test.ts index b1981c7c8..d90ff208e 100644 --- a/tests/fireproof/streaming-api.test.ts +++ b/tests/fireproof/streaming-api.test.ts @@ -81,7 +81,7 @@ describe("Streaming API", () => { describe("allDocs", () => { it("test `snapshot` method", async () => { - const docs = await lr.allDocs().snapshot(); + const docs = await Array.fromAsync(lr.allDocs().snapshot()); expect(docs.length).toBe(AMOUNT_OF_DOCS); }); @@ -89,8 +89,7 @@ describe("Streaming API", () => { const stream = lr.allDocs().live(); let docCount = 0; - for await (const { doc, marker } of stream) { - void doc; + for await (const { marker } of stream) { docCount++; if (marker.kind === "preexisting" && marker.done) { @@ -103,7 +102,44 @@ describe("Streaming API", () => { expect(docCount).toBe(AMOUNT_OF_DOCS + 1); }); - it.todo("test `live` method with `since` parameter"); + it("test `snapshot` and `live` method with `since` parameter", async () => { + const amountOfNewDocs = 5; + const since = lr.clock; + + await Promise.all( + Array(amountOfNewDocs) + .fill(0) + .map((_, i) => { + return lr.put({ _id: `doc-since-${i}`, name: `doc-since-${i}` }); + }), + ); + + const stream = lr.allDocs().live({ since }); + let docCount = 0; + + for await (const { marker } of stream) { + docCount++; + if (marker.kind === "preexisting" && marker.done) break; + } + + expect(docCount).toBe(amountOfNewDocs); + + // Snapshot + // NOTE: This also tests the stream cancellation process. + const amountOfSnapshotDocs = 3; + const sincePt2 = lr.clock; + + await Promise.all( + Array(amountOfSnapshotDocs) + .fill(0) + .map((_, i) => { + return lr.put({ _id: `doc-snapshot-${i}`, name: `doc-snapshot-${i}` }); + }), + ); + + const docs = await Array.fromAsync(lr.allDocs().snapshot({ since: sincePt2 })); + expect(docs.length).toBe(amountOfSnapshotDocs); + }); it("test `future` method", async () => { const stream = lr.allDocs().future(); From 9ddcd0baeab3dc63026fcd416dcbc17dbe1287d8 Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Wed, 15 Jan 2025 19:26:50 +0100 Subject: [PATCH 09/22] fix: Current version of query (before refactor) --- src/crdt-helpers.ts | 5 ++--- src/crdt.ts | 26 ++++++++++++++------------ src/indexer.ts | 9 +++++---- src/types.ts | 7 +++++-- 4 files changed, 26 insertions(+), 21 deletions(-) diff --git a/src/crdt-helpers.ts b/src/crdt-helpers.ts index 104f06686..81c5e8d00 100644 --- a/src/crdt-helpers.ts +++ b/src/crdt-helpers.ts @@ -244,13 +244,12 @@ export function clockChangesSince( since: ClockHead, opts: ChangesOptions, logger: Logger, -): { result: AsyncGenerator>; head: ClockHead } { +): AsyncGenerator> { const eventsFetcher = ( opts.dirty ? 
new DirtyEventFetcher(logger, blocks) : new EventFetcher(blocks) ) as EventFetcher; const keys = new Set(); - const result = gatherUpdates(blocks, eventsFetcher, head, since, keys, new Set(), opts.limit || Infinity, logger); - return { result, head }; + return gatherUpdates(blocks, eventsFetcher, head, since, keys, new Set(), opts.limit || Infinity, logger); } async function* gatherUpdates( diff --git a/src/crdt.ts b/src/crdt.ts index ebbabf700..554e20417 100644 --- a/src/crdt.ts +++ b/src/crdt.ts @@ -166,24 +166,18 @@ export class CRDT { allDocs(opts: { waitFor?: Promise } = {}): QueryResponse { const waitFor = opts.waitFor; - const currentDocs = (since?: ClockHead) => { - if (since) { - const opts: ChangesOptions = {}; - const { result } = clockChangesSince(this.blockstore, this.clock.head, since, opts, this.logger); - return result; - } - - return getAllEntries(this.blockstore, this.clock.head, this.logger); + const currentDocs = (since?: ClockHead, sinceOptions?: ChangesOptions) => { + return since ? this.changes(since, sinceOptions) : this.all(); }; - const snapshot = (opts: { since?: ClockHead } = {}) => { + const snapshot = (opts: { since?: ClockHead; sinceOptions?: ChangesOptions } = {}) => { const ready = this.ready.bind(this); async function* currentDocsWithId() { await waitFor; await ready(); - for await (const doc of currentDocs(opts.since)) { + for await (const doc of currentDocs(opts.since, opts.sinceOptions)) { yield docUpdateToDocWithId(doc); } } @@ -191,7 +185,7 @@ export class CRDT { return currentDocsWithId(); }; - const stream = (opts: { futureOnly: boolean; since?: ClockHead }) => { + const stream = (opts: { futureOnly: boolean; since?: ClockHead; sinceOptions?: ChangesOptions }) => { const clock = this.clock; const ready = this.ready.bind(this); @@ -204,7 +198,7 @@ export class CRDT { await ready(); if (opts.futureOnly === false) { - const it = currentDocs(opts.since); + const it = currentDocs(opts.since, opts.sinceOptions); async function iterate(prevValue: DocUpdate) { const { done, value } = await it.next(); @@ -256,6 +250,14 @@ export class CRDT { return txt.join("\n"); } + all(): AsyncGenerator> { + return getAllEntries(this.blockstore, this.clock.head, this.logger); + } + + changes(since: ClockHead = [], opts: ChangesOptions = {}): AsyncGenerator> { + return clockChangesSince(this.blockstore, this.clock.head, since, opts, this.logger); + } + async getBlock(cidString: string): Promise { await this.ready(); return await getBlock(this.blockstore, cidString); diff --git a/src/indexer.ts b/src/indexer.ts index e44158f10..254730c93 100644 --- a/src/indexer.ts +++ b/src/indexer.ts @@ -246,12 +246,13 @@ export class Index[], head: ClockHead; + let result: DocUpdate[]; + const head = [...this.crdt.clock.head]; if (!this.indexHead || this.indexHead.length === 0) { - ({ result, head } = await this.crdt.allDocs()); - this.logger.Debug().Msg("enter crdt.allDocs"); + result = await Array.fromAsync(this.crdt.all()); + this.logger.Debug().Msg("enter crdt.all"); } else { - ({ result, head } = await this.crdt.changes(this.indexHead)); + result = await Array.fromAsync(this.crdt.changes(this.indexHead)); this.logger.Debug().Msg("enter crdt.changes"); } if (result.length === 0) { diff --git a/src/types.ts b/src/types.ts index 93b61f29c..1c5e13e3a 100644 --- a/src/types.ts +++ b/src/types.ts @@ -273,8 +273,11 @@ export interface AllDocsQueryOpts extends QueryOpts { export type QueryStreamMarker = { readonly kind: "preexisting"; readonly done: boolean } | { readonly kind: "new" }; 
export interface QueryResponse { - snapshot(opts?: { since?: ClockHead }): AsyncGenerator>; - live(opts?: { since?: ClockHead }): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; + snapshot(opts?: { since?: ClockHead; sinceOptions?: ChangesOptions }): AsyncGenerator>; + live(opts?: { + since?: ClockHead; + sinceOptions?: ChangesOptions; + }): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; future(): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; } From 49613269e1f358c8a34809a871f6f8d26cab3186 Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Wed, 22 Jan 2025 18:19:30 +0100 Subject: [PATCH 10/22] feat: Refactor query (wip) --- src/indexer-helpers.ts | 42 +++---- src/indexer.ts | 170 ++++++++++++++++++-------- src/ledger.ts | 18 +-- src/types.ts | 8 +- tests/fireproof/streaming-api.test.ts | 12 +- 5 files changed, 151 insertions(+), 99 deletions(-) diff --git a/src/indexer-helpers.ts b/src/indexer-helpers.ts index e1e26e021..37843fd01 100644 --- a/src/indexer-helpers.ts +++ b/src/indexer-helpers.ts @@ -20,7 +20,6 @@ import { DocFragment, IndexUpdate, QueryOpts, - IndexRow, DocWithId, IndexKeyType, IndexKey, @@ -160,36 +159,27 @@ export async function loadIndex; } -export async function applyQuery( +export async function* applyQuery( crdt: CRDT, resp: { result: ProllyIndexRow[] }, query: QueryOpts, -): Promise<{ - rows: IndexRow[]; -}> { - if (query.descending) { - resp.result = resp.result.reverse(); - } - if (query.limit) { - resp.result = resp.result.slice(0, query.limit); +): AsyncGenerator> { + async function* _apply() { + let result = [...resp.result]; + + if (query.descending) result = result.reverse(); + if (query.limit) result = result.slice(0, query.limit); + + for (const row of result) { + yield crdt.get(row.id).then((val) => { + return val ? ({ ...val.doc, _id: row.id } as DocWithId) : undefined; + }); + } } - if (query.includeDocs) { - resp.result = await Promise.all( - resp.result.map(async (row) => { - const val = await crdt.get(row.id); - const doc = val ? 
({ ...val.doc, _id: row.id } as DocWithId) : undefined; - return { ...row, doc }; - }), - ); + + for await (const q of _apply()) { + if (q) yield q; } - return { - rows: resp.result.map(({ key, ...row }) => { - return { - key: charwise.decode(key), - ...row, - }; - }), - }; } export function encodeRange(range: [IndexKeyType, IndexKeyType]): [string, string] { diff --git a/src/indexer.ts b/src/indexer.ts index 254730c93..77ea7e01e 100644 --- a/src/indexer.ts +++ b/src/indexer.ts @@ -8,12 +8,15 @@ import { type DocFragment, type IdxMetaMap, type IndexKeyType, - type IndexRows, type DocTypes, type IndexUpdateString, throwFalsy, IndexTransactionMeta, SuperThis, + QueryResponse, + ChangesOptions, + QueryStreamMarker, + DocWithId, } from "./types.js"; import { BaseBlockstore } from "./blockstore/index.js"; @@ -33,6 +36,7 @@ import { import { CRDT, HasCRDT } from "./crdt.js"; import { ensureLogger } from "./utils.js"; import { Logger } from "@adviser/cement"; +import { docUpdateToDocWithId } from "./crdt-helpers.js"; export function index, R extends DocFragment = T>( refDb: HasCRDT, @@ -175,57 +179,123 @@ export class Index = {}): Promise> { - this.logger.Debug().Msg("enter query"); - await this.ready(); - // this._resetIndex(); - this.logger.Debug().Msg("post ready query"); - await this._updateIndex(); - this.logger.Debug().Msg("post _updateIndex query"); - await this._hydrateIndex(); - this.logger.Debug().Msg("post _hydrateIndex query"); - if (!this.byKey.root) { - return await applyQuery(this.crdt, { result: [] }, opts); - } - if (this.includeDocsDefault && opts.includeDocs === undefined) opts.includeDocs = true; - if (opts.range) { - const eRange = encodeRange(opts.range); - return await applyQuery(this.crdt, await throwFalsy(this.byKey.root).range(eRange[0], eRange[1]), opts); - } - if (opts.key) { - const encodedKey = encodeKey(opts.key); - return await applyQuery(this.crdt, await throwFalsy(this.byKey.root).get(encodedKey), opts); - } - if (Array.isArray(opts.keys)) { - const results = await Promise.all( - opts.keys.map(async (key: DocFragment) => { - const encodedKey = encodeKey(key); - return (await applyQuery(this.crdt, await throwFalsy(this.byKey.root).get(encodedKey), opts)).rows; - }), + query(opts: QueryOpts = {}, { waitFor }: { waitFor?: Promise } = {}): QueryResponse { + const query = async (since?: ClockHead, sinceOptions?: ChangesOptions) => { + // TODO: + void since; + void sinceOptions; + + if (!this.byKey.root) { + return applyQuery(this.crdt, { result: [] }, opts); + } + if (opts.range) { + const eRange = encodeRange(opts.range); + return applyQuery(this.crdt, await throwFalsy(this.byKey.root).range(eRange[0], eRange[1]), opts); + } + if (opts.key) { + const encodedKey = encodeKey(opts.key); + return applyQuery(this.crdt, await throwFalsy(this.byKey.root).get(encodedKey), opts); + } + if (opts.prefix) { + if (!Array.isArray(opts.prefix)) opts.prefix = [opts.prefix]; + // prefix should be always an array + const start = [...opts.prefix, NaN]; + const end = [...opts.prefix, Infinity]; + const encodedR = encodeRange([start, end]); + return applyQuery(this.crdt, await this.byKey.root.range(...encodedR), opts); + } + const all = await this.byKey.root.getAllEntries(); // funky return type + return applyQuery( + this.crdt, + { + // @ts-expect-error getAllEntries returns a different type than range + result: all.result.map(({ key: [k, id], value }) => ({ + key: k, + id, + value, + })), + }, + opts, ); - return { rows: results.flat() }; - } - if (opts.prefix) { - if 
(!Array.isArray(opts.prefix)) opts.prefix = [opts.prefix]; - // prefix should be always an array - const start = [...opts.prefix, NaN]; - const end = [...opts.prefix, Infinity]; - const encodedR = encodeRange([start, end]); - return await applyQuery(this.crdt, await this.byKey.root.range(...encodedR), opts); - } - const all = await this.byKey.root.getAllEntries(); // funky return type - return await applyQuery( - this.crdt, - { - // @ts-expect-error getAllEntries returns a different type than range - result: all.result.map(({ key: [k, id], value }) => ({ - key: k, - id, - value, - })), + }; + + const snapshot = (opts: { since?: ClockHead; sinceOptions?: ChangesOptions } = {}) => { + const ready = this.ready.bind(this); + const updateIndex = this._updateIndex.bind(this); + const hydrateIndex = this._hydrateIndex.bind(this); + + async function* docsWithId() { + await waitFor; + await ready(); + await updateIndex(); + await hydrateIndex(); + + for await (const doc of await query(opts.since, opts.sinceOptions)) { + if (doc) yield doc; + } + } + + return docsWithId(); + }; + + const stream = (opts: { futureOnly: boolean; since?: ClockHead; sinceOptions?: ChangesOptions }) => { + const clock = this.crdt.clock; + const ready = this.ready.bind(this); + const updateIndex = this._updateIndex.bind(this); + const hydrateIndex = this._hydrateIndex.bind(this); + + let unsubscribe: undefined | (() => void); + let isClosed = false; + + return new ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>({ + async start(controller) { + await waitFor; + await ready(); + await updateIndex(); + await hydrateIndex(); + + if (opts.futureOnly === false) { + const it = await query(opts.since, opts.sinceOptions); + + async function iterate(prevValue: DocWithId) { + const { done, value } = await it.next(); + + controller.enqueue({ + doc: prevValue, + marker: { kind: "preexisting", done: done || false }, + }); + + if (!done) await iterate(value); + } + + const { value } = await it.next(); + if (value) await iterate(value); + } + + unsubscribe = clock.onTick((updates: DocUpdate>[]) => { + if (isClosed) return; + updates.forEach((update) => { + controller.enqueue({ doc: docUpdateToDocWithId(update as DocUpdate), marker: { kind: "new" } }); + }); + }); + }, + + cancel() { + isClosed = true; + unsubscribe?.(); + }, + }); + }; + + return { + snapshot, + live(opts?: { since?: ClockHead }) { + return stream({ futureOnly: false, since: opts?.since }); + }, + future() { + return stream({ futureOnly: true }); }, - opts, - ); + }; } _resetIndex() { diff --git a/src/ledger.ts b/src/ledger.ts index 1bf970d44..f537667e2 100644 --- a/src/ledger.ts +++ b/src/ledger.ts @@ -14,7 +14,6 @@ import { type DocResponse, type BulkResponse, type DocTypes, - type IndexRows, type DocFragment, type CRDTMeta, type SuperThis, @@ -78,10 +77,7 @@ export interface Ledger
> extends HasC allDocs(): QueryResponse; allDocuments(): QueryResponse; - query( - field: string | MapFn, - opts?: QueryOpts, - ): Promise>; + query(field: string | MapFn, opts?: QueryOpts): QueryResponse; compact(): Promise; } @@ -169,10 +165,7 @@ export class LedgerShell
> implements allDocuments(): QueryResponse { return this.ref.allDocuments(); } - query( - field: string | MapFn, - opts?: QueryOpts, - ): Promise> { + query(field: string | MapFn, opts?: QueryOpts): QueryResponse { return this.ref.query(field, opts); } compact(): Promise { @@ -322,18 +315,17 @@ class LedgerImpl
> implements Ledger( + query( field: string | MapFn, opts: QueryOpts = {}, - ): Promise> { - await this.ready(); + ): QueryResponse { this.logger.Debug().Any("field", field).Any("opts", opts).Msg("query"); const _crdt = this.crdt as unknown as CRDT; const idx = typeof field === "string" ? index({ crdt: _crdt }, field) : index({ crdt: _crdt }, makeName(field.toString()), field); - return await idx.query(opts); + return idx.query(opts, { waitFor: this.ready() }); } async compact() { diff --git a/src/types.ts b/src/types.ts index 1c5e13e3a..6db11ac53 100644 --- a/src/types.ts +++ b/src/types.ts @@ -227,6 +227,7 @@ export interface IndexRow { readonly rows: IndexRow[]; } + export interface CRDTMeta { readonly head: ClockHead; } @@ -257,19 +258,12 @@ export interface IdxMetaMap { export interface QueryOpts { readonly descending?: boolean; readonly limit?: number; - includeDocs?: boolean; readonly range?: [IndexKeyType, IndexKeyType]; readonly key?: DocFragment; readonly keys?: DocFragment[]; prefix?: IndexKeyType; } -export interface AllDocsQueryOpts extends QueryOpts { - readonly key?: string; - readonly keys?: string[]; - prefix?: string; -} - export type QueryStreamMarker = { readonly kind: "preexisting"; readonly done: boolean } | { readonly kind: "new" }; export interface QueryResponse { diff --git a/tests/fireproof/streaming-api.test.ts b/tests/fireproof/streaming-api.test.ts index d90ff208e..459377291 100644 --- a/tests/fireproof/streaming-api.test.ts +++ b/tests/fireproof/streaming-api.test.ts @@ -165,7 +165,13 @@ describe("Streaming API", () => { // QUERY // /////////// - // describe("query", () => { - // // - // }); + describe("query", () => { + // ALL + describe("all", () => { + it("test `snapshot` method", async () => { + const docs = await Array.fromAsync(lr.query("name").snapshot()); + expect(docs.length).toBe(AMOUNT_OF_DOCS); + }); + }); + }); }); From 98ad4a323318e15704c9c8931d5f8cfb08967f42 Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Mon, 27 Jan 2025 19:03:51 +0100 Subject: [PATCH 11/22] test: Query methods --- src/indexer.ts | 10 +- tests/fireproof/streaming-api.test.ts | 216 ++++++++++++++++++-------- 2 files changed, 154 insertions(+), 72 deletions(-) diff --git a/src/indexer.ts b/src/indexer.ts index 77ea7e01e..3f6944484 100644 --- a/src/indexer.ts +++ b/src/indexer.ts @@ -251,10 +251,11 @@ export class Index) { @@ -274,7 +275,10 @@ export class Index>[]) => { if (isClosed) return; - updates.forEach((update) => { + updates.forEach(async (update) => { + await updateIndex(); + await hydrateIndex(); + controller.enqueue({ doc: docUpdateToDocWithId(update as DocUpdate), marker: { kind: "new" } }); }); }); diff --git a/tests/fireproof/streaming-api.test.ts b/tests/fireproof/streaming-api.test.ts index 459377291..31fef8075 100644 --- a/tests/fireproof/streaming-api.test.ts +++ b/tests/fireproof/streaming-api.test.ts @@ -47,7 +47,7 @@ * */ -import { fireproof, Ledger } from "@fireproof/core"; +import { ClockHead, DocBase, DocWithId, fireproof, Ledger, QueryStreamMarker } from "@fireproof/core"; interface DocType { _id: string; @@ -75,89 +75,119 @@ describe("Streaming API", () => { await lr.destroy(); }); - ////////////// - // ALL DOCS // - ////////////// + //////// + // 🛠️ // + //////// - describe("allDocs", () => { - it("test `snapshot` method", async () => { - const docs = await Array.fromAsync(lr.allDocs().snapshot()); - expect(docs.length).toBe(AMOUNT_OF_DOCS); - }); + type Snapshot = AsyncGenerator>; + type Stream = ReadableStream<{ doc: DocWithId; marker: 
QueryStreamMarker }>; - it("test `live` method", async () => { - const stream = lr.allDocs().live(); - let docCount = 0; + async function testSnapshot(snapshot: Snapshot, amountOfDocs: number) { + const docs = await Array.fromAsync(snapshot); + expect(docs.length).toBe(amountOfDocs); + } - for await (const { marker } of stream) { - docCount++; + async function testLive(stream: Stream, amountOfDocs: number) { + let docCount = 0; - if (marker.kind === "preexisting" && marker.done) { - await lr.put({ _id: `doc-${AMOUNT_OF_DOCS}`, name: `doc-${AMOUNT_OF_DOCS}` }); - } + for await (const { marker } of stream) { + docCount++; - if (marker.kind === "new") break; + if (marker.kind === "preexisting" && marker.done) { + await lr.put({ _id: `doc-${amountOfDocs}`, name: `doc-${amountOfDocs}` }); } - expect(docCount).toBe(AMOUNT_OF_DOCS + 1); - }); + if (marker.kind === "new") break; + } - it("test `snapshot` and `live` method with `since` parameter", async () => { - const amountOfNewDocs = 5; - const since = lr.clock; - - await Promise.all( - Array(amountOfNewDocs) - .fill(0) - .map((_, i) => { - return lr.put({ _id: `doc-since-${i}`, name: `doc-since-${i}` }); - }), - ); - - const stream = lr.allDocs().live({ since }); - let docCount = 0; - - for await (const { marker } of stream) { - docCount++; - if (marker.kind === "preexisting" && marker.done) break; - } + expect(docCount).toBe(amountOfDocs + 1); + } - expect(docCount).toBe(amountOfNewDocs); + async function testSince({ + snapshotCreator, + streamCreator, + }: { + snapshotCreator: (since: ClockHead) => Snapshot; + streamCreator: (since: ClockHead) => Stream; + }) { + const amountOfNewDocs = Math.floor(Math.random() * (10 - 1) + 1); + const since = lr.clock; - // Snapshot - // NOTE: This also tests the stream cancellation process. - const amountOfSnapshotDocs = 3; - const sincePt2 = lr.clock; + await Promise.all( + Array(amountOfNewDocs) + .fill(0) + .map((_, i) => { + return lr.put({ _id: `doc-since-${i}`, name: `doc-since-${i}` }); + }), + ); - await Promise.all( - Array(amountOfSnapshotDocs) - .fill(0) - .map((_, i) => { - return lr.put({ _id: `doc-snapshot-${i}`, name: `doc-snapshot-${i}` }); - }), - ); + const stream = streamCreator(since); + let docCount = 0; - const docs = await Array.fromAsync(lr.allDocs().snapshot({ since: sincePt2 })); - expect(docs.length).toBe(amountOfSnapshotDocs); - }); + for await (const { marker } of stream) { + docCount++; + if (marker.kind === "preexisting" && marker.done) break; + } - it("test `future` method", async () => { - const stream = lr.allDocs().future(); - let docCount = 0; + expect(docCount).toBe(amountOfNewDocs); - // NOTE: Test could probably be written in a better way. - // We want to start listening before we add the documents. - lr.put({ _id: `doc-${AMOUNT_OF_DOCS + 0}`, name: `doc-${AMOUNT_OF_DOCS + 0}` }); - lr.put({ _id: `doc-${AMOUNT_OF_DOCS + 1}`, name: `doc-${AMOUNT_OF_DOCS + 1}` }); + // Snapshot + // NOTE: This also tests the stream cancellation process. 
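// Editorial aside (not part of the patch): a minimal consumer-side sketch of the
// `since` semantics this helper exercises. It assumes the QueryResponse shape from
// this PR, where `live()` yields `{ doc, marker }` pairs; the function name
// `drainSince` and the ledger name are illustrative only.
import { fireproof, type ClockHead } from "@fireproof/core";

async function drainSince(since: ClockHead): Promise<string[]> {
  const ledger = fireproof("sketch-ledger");
  const seen: string[] = [];

  // Catch up on documents written after `since`; `preexisting` markers carry a
  // `done` flag, and once it flips to true only `new` markers can follow.
  for await (const { doc, marker } of ledger.allDocs().live({ since })) {
    seen.push(doc._id);
    if (marker.kind === "preexisting" && marker.done) break;
  }

  return seen;
}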
+ const amountOfSnapshotDocs = 3; + const sincePt2 = lr.clock; - for await (const { doc, marker } of stream) { - void doc; + await Promise.all( + Array(amountOfSnapshotDocs) + .fill(0) + .map((_, i) => { + return lr.put({ _id: `doc-snapshot-${i}`, name: `doc-snapshot-${i}` }); + }), + ); - if (marker.kind === "new") docCount++; - if (docCount === 2) break; - } + const docs = await Array.fromAsync(snapshotCreator(sincePt2)); + expect(docs.length).toBe(amountOfSnapshotDocs); + } + + async function testFuture(stream: Stream, amountOfDocs: number) { + let docCount = 0; + + await lr.put({ _id: `doc-${amountOfDocs + 0}`, name: `doc-${amountOfDocs + 0}` }); + await lr.put({ _id: `doc-${amountOfDocs + 1}`, name: `doc-${amountOfDocs + 1}` }); + await lr.put({ _id: `doc-${amountOfDocs + 2}`, name: `doc-${amountOfDocs + 2}` }); + + for await (const { marker } of stream) { + if (marker.kind === "new") docCount++; + if (docCount === 3) break; + } - expect(docCount).toBe(2); + expect(docCount).toBe(3); + } + + ////////////// + // ALL DOCS // + ////////////// + + describe("allDocs", () => { + it("test `snapshot` method", async () => { + const snapshot = lr.allDocs().snapshot(); + await testSnapshot(snapshot, AMOUNT_OF_DOCS); + }); + + it("test `live` method", async () => { + const stream = lr.allDocs().live(); + await testLive(stream, AMOUNT_OF_DOCS); + }); + + it("test `snapshot` and `live` method with `since` parameter", async () => { + await testSince({ + snapshotCreator: (since) => lr.allDocs().snapshot({ since }), + streamCreator: (since) => lr.allDocs().live({ since }), + }); + }); + + it("test `future` method", async () => { + const stream = lr.allDocs().future(); + await testFuture(stream, AMOUNT_OF_DOCS); }); }); @@ -169,8 +199,56 @@ describe("Streaming API", () => { // ALL describe("all", () => { it("test `snapshot` method", async () => { - const docs = await Array.fromAsync(lr.query("name").snapshot()); - expect(docs.length).toBe(AMOUNT_OF_DOCS); + const snapshot = lr.query("name").snapshot(); + await testSnapshot(snapshot, AMOUNT_OF_DOCS); + }); + + it("test `live` method", async () => { + const stream = lr.query("name").live(); + await testLive(stream, AMOUNT_OF_DOCS); + }); + + // TODO: + // it("test `snapshot` and `live` method with `since` parameter", async () => { + // await testSince({ + // snapshotCreator: (since) => lr.query("name").snapshot({ since }), + // streamCreator: (since) => lr.query("name").live({ since }), + // }); + // }); + + it("test `future` method", async () => { + const stream = lr.query("name").future(); + await testFuture(stream, AMOUNT_OF_DOCS); + }); + }); + + // ADDITIONAL + describe("additional items", () => { + const AMOUNT_OF_ADDITIONAL_DOCS = 5; + + beforeEach(async () => { + await Promise.all( + Array(AMOUNT_OF_ADDITIONAL_DOCS) + .fill(0) + .map((_, i) => { + return lr.put({ _id: `doc-add-${i}`, additional: `doc-add-${i}` }); + }), + ); + }); + + it("test `snapshot` method", async () => { + const snapshot = lr.query("additional").snapshot(); + await testSnapshot(snapshot, AMOUNT_OF_ADDITIONAL_DOCS); + }); + + it("test `live` method", async () => { + const stream = lr.query("additional").live(); + await testLive(stream, AMOUNT_OF_ADDITIONAL_DOCS); + }); + + it("test `future` method", async () => { + const stream = lr.query("additional").future(); + await testFuture(stream, AMOUNT_OF_ADDITIONAL_DOCS); }); }); }); From e9138e3172b0e3d083f2853f721ddab14f2dc76c Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Tue, 28 Jan 2025 17:45:52 +0100 Subject: [PATCH 
12/22] feat: Query future --- src/crdt-clock.ts | 6 +++-- src/crdt-helpers.ts | 39 +++++++++++++++++++-------- src/crdt.ts | 4 +-- src/indexer-helpers.ts | 22 +++++++++++++-- src/indexer.ts | 38 ++++++++++++++------------ tests/fireproof/streaming-api.test.ts | 26 +++++++++++------- 6 files changed, 91 insertions(+), 44 deletions(-) diff --git a/src/crdt-clock.ts b/src/crdt-clock.ts index 954579f3a..f242904ef 100644 --- a/src/crdt-clock.ts +++ b/src/crdt-clock.ts @@ -58,8 +58,10 @@ export class CRDTClock { async processUpdates(updatesAcc: DocUpdate[], all: boolean, prevHead: ClockHead) { let internalUpdates = updatesAcc; if (this.watchers.size && !all) { - const changes = await clockChangesSince(throwFalsy(this.blockstore), this.head, prevHead, {}, this.logger); - internalUpdates = changes.result; + const changes = await Array.fromAsync( + clockChangesSince(throwFalsy(this.blockstore), this.head, prevHead, {}, this.logger), + ); + internalUpdates = changes; } this.zoomers.forEach((fn) => fn()); this.notifyWatchers(internalUpdates || []); diff --git a/src/crdt-helpers.ts b/src/crdt-helpers.ts index 81c5e8d00..ccd6be579 100644 --- a/src/crdt-helpers.ts +++ b/src/crdt-helpers.ts @@ -3,7 +3,7 @@ import { parse } from "multiformats/link"; import { sha256 as hasher } from "multiformats/hashes/sha2"; import * as codec from "@fireproof/vendor/@ipld/dag-cbor"; import { put, get, entries, root } from "@fireproof/vendor/@web3-storage/pail/crdt"; -import { EventBlockView, EventLink, Operation, PutOperation } from "@fireproof/vendor/@web3-storage/pail/crdt/api"; +import { EventBlockView, EventLink, Operation, PutOperation, UnknownLink } from "@fireproof/vendor/@web3-storage/pail/crdt/api"; import { EventFetcher, vis } from "@fireproof/vendor/@web3-storage/pail/clock"; import * as Batch from "@fireproof/vendor/@web3-storage/pail/crdt/batch"; import { @@ -29,6 +29,7 @@ import { type DocWithId, type DocTypes, throwFalsy, + ClockLink, } from "./types.js"; import { Result } from "@fireproof/vendor/@web3-storage/pail/crdt/api"; import { Logger } from "@adviser/cement"; @@ -238,22 +239,36 @@ class DirtyEventFetcher extends EventFetcher { } } -export function clockChangesSince( +export async function* clockUpdatesSince( blocks: BlockFetcher, head: ClockHead, since: ClockHead, opts: ChangesOptions, logger: Logger, + allowedKeys?: Set, ): AsyncGenerator> { + for await (const { id, clock, docLink } of clockChangesSince(blocks, head, since, opts, logger, allowedKeys)) { + const docValue = await getValueFromLink(blocks, docLink, logger); + yield { id, value: docValue.doc, del: docValue.del, clock }; + } +} + +export function clockChangesSince( + blocks: BlockFetcher, + head: ClockHead, + since: ClockHead, + opts: ChangesOptions, + logger: Logger, + allowedKeys?: Set, +): AsyncGenerator<{ id: string; docLink: UnknownLink; clock: ClockLink }> { const eventsFetcher = ( opts.dirty ? 
new DirtyEventFetcher(logger, blocks) : new EventFetcher(blocks) ) as EventFetcher; const keys = new Set(); - return gatherUpdates(blocks, eventsFetcher, head, since, keys, new Set(), opts.limit || Infinity, logger); + return gatherUpdates(eventsFetcher, head, since, keys, new Set(), opts.limit || Infinity, logger, allowedKeys); } -async function* gatherUpdates( - blocks: BlockFetcher, +async function* gatherUpdates( eventsFetcher: EventFetcher, head: ClockHead, since: ClockHead, @@ -261,7 +276,8 @@ async function* gatherUpdates( didLinks: Set, limit: number, logger: Logger, -): AsyncGenerator> { + allowedKeys?: Set, +): AsyncGenerator<{ id: string; docLink: UnknownLink; clock: ClockLink }> { if (limit <= 0) return; // if (Math.random() < 0.001) console.log('gatherUpdates', head.length, since.length, updates.length) @@ -287,16 +303,15 @@ async function* gatherUpdates( } for (let i = ops.length - 1; i >= 0; i--) { const { key, value } = ops[i]; - if (!keys.has(key)) { + if (!keys.has(key) && (allowedKeys === undefined || allowedKeys.has(key))) { // todo option to see all updates - const docValue = await getValueFromLink(blocks, value, logger); - yield { id: key, value: docValue.doc, del: docValue.del, clock: link }; + yield { id: key, docLink: value, clock: link }; limit--; keys.add(key); } } if (event.parents) { - yield* gatherUpdates(blocks, eventsFetcher, event.parents, since, keys, didLinks, limit, logger); + yield* gatherUpdates(eventsFetcher, event.parents, since, keys, didLinks, limit, logger); } } } @@ -375,7 +390,9 @@ export async function doCompact(blockLog: CompactFetcher, head: ClockHead, logge timeEnd("compact root blocks"); time("compact changes"); - await clockChangesSince(blockLog, head, [], {}, logger); + for await (const x of clockChangesSince(blockLog, head, [], {}, logger)) { + void x; + } timeEnd("compact changes"); isCompacting = false; diff --git a/src/crdt.ts b/src/crdt.ts index 554e20417..89b9ab4d8 100644 --- a/src/crdt.ts +++ b/src/crdt.ts @@ -12,12 +12,12 @@ import { applyBulkUpdateToCrdt, getValueFromCrdt, readFiles, - clockChangesSince, clockVis, getBlock, doCompact, docUpdateToDocWithId, getAllEntries, + clockUpdatesSince, } from "./crdt-helpers.js"; import type { DocUpdate, @@ -255,7 +255,7 @@ export class CRDT { } changes(since: ClockHead = [], opts: ChangesOptions = {}): AsyncGenerator> { - return clockChangesSince(this.blockstore, this.clock.head, since, opts, this.logger); + return clockUpdatesSince(this.blockstore, this.clock.head, since, opts, this.logger); } async getBlock(cidString: string): Promise { diff --git a/src/indexer-helpers.ts b/src/indexer-helpers.ts index 37843fd01..ed6447c1e 100644 --- a/src/indexer-helpers.ts +++ b/src/indexer-helpers.ts @@ -26,10 +26,13 @@ import { DocTypes, DocObject, IndexUpdateString, + ClockHead, + ChangesOptions, } from "./types.js"; import { CarTransaction, BlockFetcher, AnyLink, AnyBlock } from "./blockstore/index.js"; import { CRDT } from "./crdt.js"; import { Logger } from "@adviser/cement"; +import { clockChangesSince } from "./crdt-helpers.js"; export class IndexTree { cid?: AnyLink; @@ -160,13 +163,28 @@ export async function loadIndex( - crdt: CRDT, + { crdt, logger }: { crdt: CRDT; logger: Logger }, resp: { result: ProllyIndexRow[] }, - query: QueryOpts, + query: QueryOpts & { since?: ClockHead; sinceOptions?: ChangesOptions }, ): AsyncGenerator> { async function* _apply() { let result = [...resp.result]; + if (query.since) { + const gen = clockChangesSince(crdt.blockstore, crdt.clock.head, query.since, 
query.sinceOptions || {}, logger); + const ids = await Array.fromAsync(gen) + .then((arr) => arr.map((a) => a.id)) + .then((arr) => new Set(arr)); + result = result.reduce((acc: ProllyIndexRow[], row) => { + if (ids.has(row.id)) { + ids.delete(row.id); + return [...acc, row]; + } + + return acc; + }, []); + } + if (query.descending) result = result.reverse(); if (query.limit) result = result.slice(0, query.limit); diff --git a/src/indexer.ts b/src/indexer.ts index 3f6944484..357f94889 100644 --- a/src/indexer.ts +++ b/src/indexer.ts @@ -181,32 +181,36 @@ export class Index = {}, { waitFor }: { waitFor?: Promise } = {}): QueryResponse { const query = async (since?: ClockHead, sinceOptions?: ChangesOptions) => { - // TODO: - void since; - void sinceOptions; + const deps = { crdt: this.crdt, logger: this.logger }; + const qry = { ...opts, since, sinceOptions }; if (!this.byKey.root) { - return applyQuery(this.crdt, { result: [] }, opts); + return applyQuery(deps, { result: [] }, qry); } - if (opts.range) { - const eRange = encodeRange(opts.range); - return applyQuery(this.crdt, await throwFalsy(this.byKey.root).range(eRange[0], eRange[1]), opts); + + if (qry.range) { + const eRange = encodeRange(qry.range); + return applyQuery(deps, await throwFalsy(this.byKey.root).range(eRange[0], eRange[1]), qry); } - if (opts.key) { - const encodedKey = encodeKey(opts.key); - return applyQuery(this.crdt, await throwFalsy(this.byKey.root).get(encodedKey), opts); + + if (qry.key) { + const encodedKey = encodeKey(qry.key); + return applyQuery(deps, await throwFalsy(this.byKey.root).get(encodedKey), qry); } - if (opts.prefix) { - if (!Array.isArray(opts.prefix)) opts.prefix = [opts.prefix]; + + if (qry.prefix) { + if (!Array.isArray(qry.prefix)) qry.prefix = [qry.prefix]; // prefix should be always an array - const start = [...opts.prefix, NaN]; - const end = [...opts.prefix, Infinity]; + const start = [...qry.prefix, NaN]; + const end = [...qry.prefix, Infinity]; const encodedR = encodeRange([start, end]); - return applyQuery(this.crdt, await this.byKey.root.range(...encodedR), opts); + return applyQuery(deps, await this.byKey.root.range(...encodedR), qry); } + const all = await this.byKey.root.getAllEntries(); // funky return type + return applyQuery( - this.crdt, + deps, { // @ts-expect-error getAllEntries returns a different type than range result: all.result.map(({ key: [k, id], value }) => ({ @@ -215,7 +219,7 @@ export class Index { Array(amountOfNewDocs) .fill(0) .map((_, i) => { - return lr.put({ _id: `doc-since-${i}`, name: `doc-since-${i}` }); + return lr.put({ _id: `doc-since-${i}`, since: `doc-since-${i}` }); }), ); @@ -133,14 +133,14 @@ describe("Streaming API", () => { // Snapshot // NOTE: This also tests the stream cancellation process. 
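// Editorial aside (not part of the patch): a sketch of the index-level `since`
// filtering this patch adds, assuming the same snapshot()/clock API used by these
// tests. The ledger name, doc ids, and the `tag` field are illustrative only.
import { fireproof } from "@fireproof/core";

async function tagsWrittenAfterCheckpoint(): Promise<string[]> {
  const ledger = fireproof("sketch-ledger");

  await ledger.put({ _id: "tag-0", tag: "tag-0" });
  const checkpoint = ledger.clock; // clock head before the writes we care about
  await ledger.put({ _id: "tag-1", tag: "tag-1" });

  // Only index entries whose documents changed after `checkpoint` are returned.
  const docs = await Array.fromAsync(ledger.query("tag").snapshot({ since: checkpoint }));
  return docs.map((doc) => doc._id);
}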
- const amountOfSnapshotDocs = 3; + const amountOfSnapshotDocs = Math.floor(Math.random() * (10 - 1) + 1); const sincePt2 = lr.clock; await Promise.all( Array(amountOfSnapshotDocs) .fill(0) .map((_, i) => { - return lr.put({ _id: `doc-snapshot-${i}`, name: `doc-snapshot-${i}` }); + return lr.put({ _id: `doc-snapshot-${i}`, since: `doc-snapshot-${i}` }); }), ); @@ -208,13 +208,12 @@ describe("Streaming API", () => { await testLive(stream, AMOUNT_OF_DOCS); }); - // TODO: - // it("test `snapshot` and `live` method with `since` parameter", async () => { - // await testSince({ - // snapshotCreator: (since) => lr.query("name").snapshot({ since }), - // streamCreator: (since) => lr.query("name").live({ since }), - // }); - // }); + it("test `snapshot` and `live` method with `since` parameter", async () => { + await testSince({ + snapshotCreator: (since) => lr.query("since").snapshot({ since }), + streamCreator: (since) => lr.query("since").live({ since }), + }); + }); it("test `future` method", async () => { const stream = lr.query("name").future(); @@ -246,6 +245,13 @@ describe("Streaming API", () => { await testLive(stream, AMOUNT_OF_ADDITIONAL_DOCS); }); + it("test `snapshot` and `live` method with `since` parameter", async () => { + await testSince({ + snapshotCreator: (since) => lr.query("since").snapshot({ since }), + streamCreator: (since) => lr.query("since").live({ since }), + }); + }); + it("test `future` method", async () => { const stream = lr.query("additional").future(); await testFuture(stream, AMOUNT_OF_ADDITIONAL_DOCS); From adaf0ccf7bb39d71d94b54362d830e00ae722a6d Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Wed, 29 Jan 2025 17:51:19 +0100 Subject: [PATCH 13/22] feat: subscribe method --- build-docs.sh | 1 - src/crdt.ts | 18 ++++-- src/indexer.ts | 22 +++++--- src/types.ts | 2 + tests/fireproof/streaming-api.test.ts | 79 +++++++++------------------ 5 files changed, 57 insertions(+), 65 deletions(-) diff --git a/build-docs.sh b/build-docs.sh index d01d91a35..ba3ba2794 100644 --- a/build-docs.sh +++ b/build-docs.sh @@ -25,4 +25,3 @@ then else git status fi - diff --git a/src/crdt.ts b/src/crdt.ts index 89b9ab4d8..5c83f8c7e 100644 --- a/src/crdt.ts +++ b/src/crdt.ts @@ -185,8 +185,17 @@ export class CRDT { return currentDocsWithId(); }; + const subscribe = (callback: (doc: DocWithId) => void) => { + const unsubscribe = this.clock.onTick((updates: DocUpdate>[]) => { + updates.forEach((update) => { + callback(docUpdateToDocWithId(update as DocUpdate)); + }); + }); + + return unsubscribe; + }; + const stream = (opts: { futureOnly: boolean; since?: ClockHead; sinceOptions?: ChangesOptions }) => { - const clock = this.clock; const ready = this.ready.bind(this); let unsubscribe: undefined | (() => void); @@ -215,11 +224,9 @@ export class CRDT { if (value) await iterate(value); } - unsubscribe = clock.onTick((updates: DocUpdate>[]) => { + unsubscribe = subscribe((doc) => { if (isClosed) return; - updates.forEach((update) => { - controller.enqueue({ doc: docUpdateToDocWithId(update as DocUpdate), marker: { kind: "new" } }); - }); + controller.enqueue({ doc, marker: { kind: "new" } }); }); }, @@ -238,6 +245,7 @@ export class CRDT { future() { return stream({ futureOnly: true }); }, + subscribe, }; } diff --git a/src/indexer.ts b/src/indexer.ts index 357f94889..e2f39613c 100644 --- a/src/indexer.ts +++ b/src/indexer.ts @@ -242,8 +242,17 @@ export class Index) => void) => { + const unsubscribe = this.crdt.clock.onTick((updates: DocUpdate>[]) => { + updates.forEach((update) => { + 
callback(docUpdateToDocWithId(update as DocUpdate)); + }); + }); + + return unsubscribe; + }; + const stream = (opts: { futureOnly: boolean; since?: ClockHead; sinceOptions?: ChangesOptions }) => { - const clock = this.crdt.clock; const ready = this.ready.bind(this); const updateIndex = this._updateIndex.bind(this); const hydrateIndex = this._hydrateIndex.bind(this); @@ -277,14 +286,12 @@ export class Index>[]) => { + unsubscribe = subscribe(async (doc) => { if (isClosed) return; - updates.forEach(async (update) => { - await updateIndex(); - await hydrateIndex(); + await updateIndex(); + await hydrateIndex(); - controller.enqueue({ doc: docUpdateToDocWithId(update as DocUpdate), marker: { kind: "new" } }); - }); + controller.enqueue({ doc, marker: { kind: "new" } }); }); }, @@ -297,6 +304,7 @@ export class Index { sinceOptions?: ChangesOptions; }): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; future(): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; + /** Convenience function to consume a future stream. */ + subscribe(callback: (doc: DocWithId) => void): () => void; } type EmitFn = (k: IndexKeyType, v?: DocFragment) => void; diff --git a/tests/fireproof/streaming-api.test.ts b/tests/fireproof/streaming-api.test.ts index 5ef105ae7..8a70cfbd4 100644 --- a/tests/fireproof/streaming-api.test.ts +++ b/tests/fireproof/streaming-api.test.ts @@ -1,53 +1,4 @@ -/* - * The streaming API to come to query and allDocs in Database class - * - * we should incorporate the changes of this PR into this: - * https://github.com/fireproof-storage/fireproof/pull/315 - * - * We need new Methods or a structure return value like this: - * Due to that we need to harmonize the return values of both methods to - * return the same structure. I think we should go with an approach like - * the Response object in the fetch API. - * interface QueryResponse { - * rows(): Promise; - * iterator(): AsyncIterableIterator; - * stream(): ReadableStream; - * subscribe(callback: (doc: DocWithId) => void): unsubscribe() => void; - * } - * it should only possible to call every method once. - * if you call it twice it should throw an error - * ------- - * Keep in mind that the iterator and stream should be able to - * substitute the changes method. So we need the possibility to - * pass options to allDocs and or query to change the behavior: - * - SNAPSHOT default -> means it returns the current state of the database and - * closes the stream after that. - * - LIVE -> means it returns the current state of the database and keeps the stream open - * - FUTURE -> means it keeps the stream open for new records which meet the query arguments - * (this might be dropped and simulated with the startpoint option in LIVE mode) - * - * the rows method will only behave in SNAPSHOT mode. - * We should be able to extend in future implemenation pass a startpoint to LIVE. - * - * The first group of tests should verify that query and allDocs both return the same - * QueryResponse object and implement rows method. The test should check if rows - * returns empty arrays if the database is empty and if it returns the correct # documents - * in the database. There should be a test to check for the double call error. - * - * The second group of tests should verify that the iterator and stream method works as expected in - * SNAPSHOT mode. Both should pass the same tests as the rows method. These tests should verify that - * we are able to stream documents from the database without loosing memory. 
We should test if - * close/unsubscribe of the stream works as expected. - * - * Future milestone: - * The third group of tests should verify that the iterator and stream method works as expected in - * LIVE and FUTURE mode. In this mode we need to check if the stream receives new documents which - * are written to the database after the stream was created. We should think about the raise condition - * of loosing document events between the allDocs and query call and the creation of the stream. - * - */ - -import { ClockHead, DocBase, DocWithId, fireproof, Ledger, QueryStreamMarker } from "@fireproof/core"; +import { ClockHead, DocBase, DocWithId, fireproof, Ledger, QueryResponse, QueryStreamMarker } from "@fireproof/core"; interface DocType { _id: string; @@ -163,6 +114,18 @@ describe("Streaming API", () => { expect(docCount).toBe(3); } + async function testSubscribe(queryResponse: QueryResponse) { + const doc = await new Promise((resolve) => { + queryResponse.subscribe(resolve); + lr.put({ _id: `doc-extra`, name: `doc-extra` }); + }); + + expect(doc).toBeTruthy(); + expect(doc).toHaveProperty("_id"); + expect(doc).toHaveProperty("name"); + expect((doc as DocType).name).toBe("doc-extra"); + } + ////////////// // ALL DOCS // ////////////// @@ -174,7 +137,7 @@ describe("Streaming API", () => { }); it("test `live` method", async () => { - const stream = lr.allDocs().live(); + const stream = lr.allDocs().live(); await testLive(stream, AMOUNT_OF_DOCS); }); @@ -186,9 +149,13 @@ describe("Streaming API", () => { }); it("test `future` method", async () => { - const stream = lr.allDocs().future(); + const stream = lr.allDocs().future(); await testFuture(stream, AMOUNT_OF_DOCS); }); + + it("test `subscribe` method", async () => { + await testSubscribe(lr.allDocs()); + }); }); /////////// @@ -219,6 +186,10 @@ describe("Streaming API", () => { const stream = lr.query("name").future(); await testFuture(stream, AMOUNT_OF_DOCS); }); + + it("test `subscribe` method", async () => { + await testSubscribe(lr.query("name")); + }); }); // ADDITIONAL @@ -256,6 +227,10 @@ describe("Streaming API", () => { const stream = lr.query("additional").future(); await testFuture(stream, AMOUNT_OF_ADDITIONAL_DOCS); }); + + it("test `subscribe` method", async () => { + await testSubscribe(lr.query("name")); + }); }); }); }); From 68aed900dbb03fe958949c59af163fb514a5b04c Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Thu, 30 Jan 2025 16:56:24 +0100 Subject: [PATCH 14/22] feat: Remove sinceOptions key and merge it into parent options obj --- src/crdt.ts | 12 ++++++------ src/indexer.ts | 8 ++++---- src/types.ts | 7 ++----- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/src/crdt.ts b/src/crdt.ts index 5c83f8c7e..23ae2ef9c 100644 --- a/src/crdt.ts +++ b/src/crdt.ts @@ -170,14 +170,14 @@ export class CRDT { return since ? 
this.changes(since, sinceOptions) : this.all(); }; - const snapshot = (opts: { since?: ClockHead; sinceOptions?: ChangesOptions } = {}) => { + const snapshot = (opts: { since?: ClockHead } & ChangesOptions = {}) => { const ready = this.ready.bind(this); async function* currentDocsWithId() { await waitFor; await ready(); - for await (const doc of currentDocs(opts.since, opts.sinceOptions)) { + for await (const doc of currentDocs(opts.since, opts)) { yield docUpdateToDocWithId(doc); } } @@ -195,7 +195,7 @@ export class CRDT { return unsubscribe; }; - const stream = (opts: { futureOnly: boolean; since?: ClockHead; sinceOptions?: ChangesOptions }) => { + const stream = (opts: { futureOnly: boolean; since?: ClockHead } & ChangesOptions) => { const ready = this.ready.bind(this); let unsubscribe: undefined | (() => void); @@ -207,7 +207,7 @@ export class CRDT { await ready(); if (opts.futureOnly === false) { - const it = currentDocs(opts.since, opts.sinceOptions); + const it = currentDocs(opts.since, opts); async function iterate(prevValue: DocUpdate) { const { done, value } = await it.next(); @@ -239,8 +239,8 @@ export class CRDT { return { snapshot, - live(opts?: { since?: ClockHead }) { - return stream({ futureOnly: false, since: opts?.since }); + live(opts?: { since?: ClockHead } & ChangesOptions) { + return stream({ ...opts, futureOnly: false }); }, future() { return stream({ futureOnly: true }); diff --git a/src/indexer.ts b/src/indexer.ts index e2f39613c..acc2ff418 100644 --- a/src/indexer.ts +++ b/src/indexer.ts @@ -223,7 +223,7 @@ export class Index { + const snapshot = (opts: { since?: ClockHead } & ChangesOptions = {}) => { const ready = this.ready.bind(this); const updateIndex = this._updateIndex.bind(this); const hydrateIndex = this._hydrateIndex.bind(this); @@ -234,7 +234,7 @@ export class Index { + const stream = (opts: { futureOnly: boolean; since?: ClockHead } & ChangesOptions) => { const ready = this.ready.bind(this); const updateIndex = this._updateIndex.bind(this); const hydrateIndex = this._hydrateIndex.bind(this); @@ -269,7 +269,7 @@ export class Index) { const { done, value } = await it.next(); diff --git a/src/types.ts b/src/types.ts index 21714adaf..cf3e083f4 100644 --- a/src/types.ts +++ b/src/types.ts @@ -267,11 +267,8 @@ export interface QueryOpts { export type QueryStreamMarker = { readonly kind: "preexisting"; readonly done: boolean } | { readonly kind: "new" }; export interface QueryResponse { - snapshot(opts?: { since?: ClockHead; sinceOptions?: ChangesOptions }): AsyncGenerator>; - live(opts?: { - since?: ClockHead; - sinceOptions?: ChangesOptions; - }): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; + snapshot(opts?: { since?: ClockHead } & ChangesOptions): AsyncGenerator>; + live(opts?: { since?: ClockHead } & ChangesOptions): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; future(): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; /** Convenience function to consume a future stream. 
*/ subscribe(callback: (doc: DocWithId) => void): () => void; From 634f593b7b2c20eff63f3598752c450d35aa6104 Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Mon, 3 Feb 2025 19:23:14 +0100 Subject: [PATCH 15/22] test: If the default reader of a stream has been closed automatically when using for await --- tests/fireproof/streaming-api.test.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/fireproof/streaming-api.test.ts b/tests/fireproof/streaming-api.test.ts index 8a70cfbd4..06433fe7f 100644 --- a/tests/fireproof/streaming-api.test.ts +++ b/tests/fireproof/streaming-api.test.ts @@ -52,6 +52,10 @@ describe("Streaming API", () => { } expect(docCount).toBe(amountOfDocs + 1); + + // Test that the stream has been closed automatically by `for await` + const r = stream.getReader(); + expect(r.closed).resolves.toBe(undefined); } async function testSince({ @@ -82,6 +86,10 @@ describe("Streaming API", () => { expect(docCount).toBe(amountOfNewDocs); + // Test that the stream has been closed automatically by `for await` + const r = stream.getReader(); + expect(r.closed).resolves.toBe(undefined); + // Snapshot // NOTE: This also tests the stream cancellation process. const amountOfSnapshotDocs = Math.floor(Math.random() * (10 - 1) + 1); From 032906dbc9ed86891c701b3bbe9985ed832409bc Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Tue, 4 Feb 2025 17:39:31 +0100 Subject: [PATCH 16/22] refactor: Inner functions out of allDocs & query --- src/crdt.ts | 133 +++++++++++++++-------------- src/indexer.ts | 228 ++++++++++++++++++++++++++----------------------- 2 files changed, 191 insertions(+), 170 deletions(-) diff --git a/src/crdt.ts b/src/crdt.ts index 23ae2ef9c..b31ee9503 100644 --- a/src/crdt.ts +++ b/src/crdt.ts @@ -163,90 +163,99 @@ export class CRDT { /** * Retrieve the current set of documents. */ - allDocs(opts: { waitFor?: Promise } = {}): QueryResponse { - const waitFor = opts.waitFor; + allDocs({ waitFor }: { waitFor?: Promise } = {}): QueryResponse { + const stream = this.#stream.bind(this); - const currentDocs = (since?: ClockHead, sinceOptions?: ChangesOptions) => { - return since ? this.changes(since, sinceOptions) : this.all(); + return { + snapshot: (sinceOpts) => this.#snapshot(sinceOpts, { waitFor }), + subscribe: (callback) => this.#subscribe(callback), + live(opts?: { since?: ClockHead } & ChangesOptions) { + return stream({ ...opts, futureOnly: false }, { waitFor }); + }, + future() { + return stream({ futureOnly: true }, { waitFor }); + }, }; + } - const snapshot = (opts: { since?: ClockHead } & ChangesOptions = {}) => { - const ready = this.ready.bind(this); + #currentDocs(since?: ClockHead, sinceOptions?: ChangesOptions) { + return since ? 
this.changes(since, sinceOptions) : this.all(); + } - async function* currentDocsWithId() { - await waitFor; - await ready(); + #snapshot( + opts: { since?: ClockHead } & ChangesOptions = {}, + { waitFor }: { waitFor?: Promise } = {}, + ): AsyncGenerator> { + const currentDocs = this.#currentDocs.bind(this); + const ready = this.ready.bind(this); - for await (const doc of currentDocs(opts.since, opts)) { - yield docUpdateToDocWithId(doc); - } + async function* currentDocsWithId() { + await waitFor; + await ready(); + + for await (const doc of currentDocs(opts.since, opts)) { + yield docUpdateToDocWithId(doc); } + } - return currentDocsWithId(); - }; + return currentDocsWithId(); + } - const subscribe = (callback: (doc: DocWithId) => void) => { - const unsubscribe = this.clock.onTick((updates: DocUpdate>[]) => { - updates.forEach((update) => { - callback(docUpdateToDocWithId(update as DocUpdate)); - }); + #subscribe(callback: (doc: DocWithId) => void) { + const unsubscribe = this.clock.onTick((updates: DocUpdate>[]) => { + updates.forEach((update) => { + callback(docUpdateToDocWithId(update as DocUpdate)); }); + }); - return unsubscribe; - }; - - const stream = (opts: { futureOnly: boolean; since?: ClockHead } & ChangesOptions) => { - const ready = this.ready.bind(this); + return unsubscribe; + } - let unsubscribe: undefined | (() => void); - let isClosed = false; + #stream( + opts: { futureOnly: boolean; since?: ClockHead } & ChangesOptions, + { waitFor }: { waitFor?: Promise } = {}, + ) { + const currentDocs = this.#currentDocs.bind(this); + const ready = this.ready.bind(this); + const subscribe = this.#subscribe.bind(this); - return new ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>({ - async start(controller) { - await waitFor; - await ready(); + let unsubscribe: undefined | (() => void); + let isClosed = false; - if (opts.futureOnly === false) { - const it = currentDocs(opts.since, opts); + return new ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>({ + async start(controller) { + await waitFor; + await ready(); - async function iterate(prevValue: DocUpdate) { - const { done, value } = await it.next(); + if (opts.futureOnly === false) { + const it = currentDocs(opts.since, opts); - controller.enqueue({ - doc: docUpdateToDocWithId(prevValue), - marker: { kind: "preexisting", done: done || false }, - }); + async function iterate(prevValue: DocUpdate) { + const { done, value } = await it.next(); - if (!done) await iterate(value); - } + controller.enqueue({ + doc: docUpdateToDocWithId(prevValue), + marker: { kind: "preexisting", done: done || false }, + }); - const { value } = await it.next(); - if (value) await iterate(value); + if (!done) await iterate(value); } - unsubscribe = subscribe((doc) => { - if (isClosed) return; - controller.enqueue({ doc, marker: { kind: "new" } }); - }); - }, - - cancel() { - isClosed = true; - unsubscribe?.(); - }, - }); - }; + const { value } = await it.next(); + if (value) await iterate(value); + } - return { - snapshot, - live(opts?: { since?: ClockHead } & ChangesOptions) { - return stream({ ...opts, futureOnly: false }); + unsubscribe = subscribe((doc) => { + if (isClosed) return; + controller.enqueue({ doc, marker: { kind: "new" } }); + }); }, - future() { - return stream({ futureOnly: true }); + + cancel() { + isClosed = true; + unsubscribe?.(); }, - subscribe, - }; + }); } async vis(): Promise { diff --git a/src/indexer.ts b/src/indexer.ts index acc2ff418..50b97a6d2 100644 --- a/src/indexer.ts +++ b/src/indexer.ts @@ 
-179,139 +179,151 @@ export class Index = {}, { waitFor }: { waitFor?: Promise } = {}): QueryResponse { - const query = async (since?: ClockHead, sinceOptions?: ChangesOptions) => { - const deps = { crdt: this.crdt, logger: this.logger }; - const qry = { ...opts, since, sinceOptions }; + query(qryOpts: QueryOpts = {}, { waitFor }: { waitFor?: Promise } = {}): QueryResponse { + const stream = this.#stream.bind(this); - if (!this.byKey.root) { - return applyQuery(deps, { result: [] }, qry); - } - - if (qry.range) { - const eRange = encodeRange(qry.range); - return applyQuery(deps, await throwFalsy(this.byKey.root).range(eRange[0], eRange[1]), qry); - } + return { + snapshot: (sinceOpts) => this.#snapshot(qryOpts, sinceOpts, { waitFor }), + subscribe: (callback) => this.#subscribe(callback), + live(opts?: { since?: ClockHead }) { + return stream(qryOpts, { futureOnly: false, since: opts?.since }, { waitFor }); + }, + future() { + return stream(qryOpts, { futureOnly: true }, { waitFor }); + }, + }; + } - if (qry.key) { - const encodedKey = encodeKey(qry.key); - return applyQuery(deps, await throwFalsy(this.byKey.root).get(encodedKey), qry); - } + async #query(queryOptions: QueryOpts = {}, sinceOptions: { since?: ClockHead } & ChangesOptions = {}) { + const deps = { crdt: this.crdt, logger: this.logger }; + const qry = { ...queryOptions, since: sinceOptions.since, sinceOptions }; - if (qry.prefix) { - if (!Array.isArray(qry.prefix)) qry.prefix = [qry.prefix]; - // prefix should be always an array - const start = [...qry.prefix, NaN]; - const end = [...qry.prefix, Infinity]; - const encodedR = encodeRange([start, end]); - return applyQuery(deps, await this.byKey.root.range(...encodedR), qry); - } + if (!this.byKey.root) { + return applyQuery(deps, { result: [] }, qry); + } - const all = await this.byKey.root.getAllEntries(); // funky return type - - return applyQuery( - deps, - { - // @ts-expect-error getAllEntries returns a different type than range - result: all.result.map(({ key: [k, id], value }) => ({ - key: k, - id, - value, - })), - }, - qry, - ); - }; + if (qry.range) { + const eRange = encodeRange(qry.range); + return applyQuery(deps, await throwFalsy(this.byKey.root).range(eRange[0], eRange[1]), qry); + } - const snapshot = (opts: { since?: ClockHead } & ChangesOptions = {}) => { - const ready = this.ready.bind(this); - const updateIndex = this._updateIndex.bind(this); - const hydrateIndex = this._hydrateIndex.bind(this); + if (qry.key) { + const encodedKey = encodeKey(qry.key); + return applyQuery(deps, await throwFalsy(this.byKey.root).get(encodedKey), qry); + } - async function* docsWithId() { - await waitFor; - await ready(); - await updateIndex(); - await hydrateIndex(); + if (qry.prefix) { + if (!Array.isArray(qry.prefix)) qry.prefix = [qry.prefix]; + // prefix should be always an array + const start = [...qry.prefix, NaN]; + const end = [...qry.prefix, Infinity]; + const encodedR = encodeRange([start, end]); + return applyQuery(deps, await this.byKey.root.range(...encodedR), qry); + } - for await (const doc of await query(opts.since, opts)) { - if (doc) yield doc; - } - } + const all = await this.byKey.root.getAllEntries(); // funky return type + + return applyQuery( + deps, + { + // @ts-expect-error getAllEntries returns a different type than range + result: all.result.map(({ key: [k, id], value }) => ({ + key: k, + id, + value, + })), + }, + qry, + ); + } - return docsWithId(); + #snapshot( + qryOpts: QueryOpts = {}, + sinceOpts: { since?: ClockHead } & ChangesOptions = {}, + 
{ waitFor }: { waitFor?: Promise } = {}, + ) { + const generator = async () => { + await waitFor; + await this.ready(); + await this._updateIndex(); + await this._hydrateIndex(); + + return await this.#query(qryOpts, sinceOpts); }; - const subscribe = (callback: (doc: DocWithId) => void) => { - const unsubscribe = this.crdt.clock.onTick((updates: DocUpdate>[]) => { - updates.forEach((update) => { - callback(docUpdateToDocWithId(update as DocUpdate)); - }); - }); - - return unsubscribe; - }; + async function* docsWithId() { + for await (const doc of await generator()) { + if (doc) yield doc; + } + } - const stream = (opts: { futureOnly: boolean; since?: ClockHead } & ChangesOptions) => { - const ready = this.ready.bind(this); - const updateIndex = this._updateIndex.bind(this); - const hydrateIndex = this._hydrateIndex.bind(this); + return docsWithId(); + } - let unsubscribe: undefined | (() => void); - let isClosed = false; + #subscribe(callback: (doc: DocWithId) => void) { + const unsubscribe = this.crdt.clock.onTick((updates: DocUpdate>[]) => { + updates.forEach((update) => { + callback(docUpdateToDocWithId(update as DocUpdate)); + }); + }); - return new ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>({ - async start(controller) { - await waitFor; - await ready(); + return unsubscribe; + } - if (opts.futureOnly === false) { - await updateIndex(); - await hydrateIndex(); + #stream( + qryOpts: QueryOpts = {}, + sinceOpts: { futureOnly: boolean; since?: ClockHead } & ChangesOptions, + { waitFor }: { waitFor?: Promise } = {}, + ) { + const hydrateIndex = this._hydrateIndex.bind(this); + const query = this.#query.bind(this); + const ready = this.ready.bind(this); + const subscribe = this.#subscribe.bind(this); + const updateIndex = this._updateIndex.bind(this); + + let unsubscribe: undefined | (() => void); + let isClosed = false; + + return new ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>({ + async start(controller) { + await waitFor; + await ready(); - const it = await query(opts.since, opts); + if (sinceOpts.futureOnly === false) { + await updateIndex(); + await hydrateIndex(); - async function iterate(prevValue: DocWithId) { - const { done, value } = await it.next(); + const it = await query(qryOpts, sinceOpts); - controller.enqueue({ - doc: prevValue, - marker: { kind: "preexisting", done: done || false }, - }); + async function iterate(prevValue: DocWithId) { + const { done, value } = await it.next(); - if (!done) await iterate(value); - } + controller.enqueue({ + doc: prevValue, + marker: { kind: "preexisting", done: done || false }, + }); - const { value } = await it.next(); - if (value) await iterate(value); + if (!done) await iterate(value); } - unsubscribe = subscribe(async (doc) => { - if (isClosed) return; - await updateIndex(); - await hydrateIndex(); - - controller.enqueue({ doc, marker: { kind: "new" } }); - }); - }, + const { value } = await it.next(); + if (value) await iterate(value); + } - cancel() { - isClosed = true; - unsubscribe?.(); - }, - }); - }; + unsubscribe = subscribe(async (doc) => { + if (isClosed) return; + await updateIndex(); + await hydrateIndex(); - return { - snapshot, - subscribe, - live(opts?: { since?: ClockHead }) { - return stream({ futureOnly: false, since: opts?.since }); + controller.enqueue({ doc, marker: { kind: "new" } }); + }); }, - future() { - return stream({ futureOnly: true }); + + cancel() { + isClosed = true; + unsubscribe?.(); }, - }; + }); } _resetIndex() { From e991e75b7d02126674366eab15f6b2c234b52f5b 
Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Wed, 5 Feb 2025 14:59:55 +0100 Subject: [PATCH 17/22] feat: Add convenience toArray method to QueryResponse --- src/crdt.ts | 2 ++ src/indexer.ts | 2 ++ src/types.ts | 2 ++ tests/fireproof/streaming-api.test.ts | 17 +++++++++++++++++ 4 files changed, 23 insertions(+) diff --git a/src/crdt.ts b/src/crdt.ts index b31ee9503..6be2bb014 100644 --- a/src/crdt.ts +++ b/src/crdt.ts @@ -169,6 +169,8 @@ export class CRDT { return { snapshot: (sinceOpts) => this.#snapshot(sinceOpts, { waitFor }), subscribe: (callback) => this.#subscribe(callback), + toArray: (sinceOpts) => Array.fromAsync(this.#snapshot(sinceOpts, { waitFor })), + live(opts?: { since?: ClockHead } & ChangesOptions) { return stream({ ...opts, futureOnly: false }, { waitFor }); }, diff --git a/src/indexer.ts b/src/indexer.ts index 50b97a6d2..bccd358de 100644 --- a/src/indexer.ts +++ b/src/indexer.ts @@ -185,6 +185,8 @@ export class Index this.#snapshot(qryOpts, sinceOpts, { waitFor }), subscribe: (callback) => this.#subscribe(callback), + toArray: (sinceOpts) => Array.fromAsync(this.#snapshot(qryOpts, sinceOpts, { waitFor })), + live(opts?: { since?: ClockHead }) { return stream(qryOpts, { futureOnly: false, since: opts?.since }, { waitFor }); }, diff --git a/src/types.ts b/src/types.ts index cf3e083f4..b0bdef91c 100644 --- a/src/types.ts +++ b/src/types.ts @@ -272,6 +272,8 @@ export interface QueryResponse { future(): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; /** Convenience function to consume a future stream. */ subscribe(callback: (doc: DocWithId) => void): () => void; + /** Convenience function to get a full snapshot. */ + toArray(opts?: { since?: ClockHead } & ChangesOptions): Promise[]>; } type EmitFn = (k: IndexKeyType, v?: DocFragment) => void; diff --git a/tests/fireproof/streaming-api.test.ts b/tests/fireproof/streaming-api.test.ts index 06433fe7f..b849a4cd1 100644 --- a/tests/fireproof/streaming-api.test.ts +++ b/tests/fireproof/streaming-api.test.ts @@ -134,6 +134,11 @@ describe("Streaming API", () => { expect((doc as DocType).name).toBe("doc-extra"); } + async function testToArray(queryResponse: QueryResponse, amountOfDocs: number) { + const arr = await queryResponse.toArray(); + expect(arr.length).toBe(amountOfDocs); + } + ////////////// // ALL DOCS // ////////////// @@ -164,6 +169,10 @@ describe("Streaming API", () => { it("test `subscribe` method", async () => { await testSubscribe(lr.allDocs()); }); + + it("test `toArray` method", async () => { + await testToArray(lr.allDocs(), AMOUNT_OF_DOCS); + }); }); /////////// @@ -198,6 +207,10 @@ describe("Streaming API", () => { it("test `subscribe` method", async () => { await testSubscribe(lr.query("name")); }); + + it("test `toArray` method", async () => { + await testToArray(lr.query("name"), AMOUNT_OF_DOCS); + }); }); // ADDITIONAL @@ -239,6 +252,10 @@ describe("Streaming API", () => { it("test `subscribe` method", async () => { await testSubscribe(lr.query("name")); }); + + it("test `toArray` method", async () => { + await testToArray(lr.query("additional"), AMOUNT_OF_ADDITIONAL_DOCS); + }); }); }); }); From e15266af031b73ba4141e297c90ffbe8f0ac7018 Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Wed, 5 Feb 2025 15:28:05 +0100 Subject: [PATCH 18/22] refactor: Use util fn instead of Array.fromAsync --- src/crdt.ts | 4 ++-- src/indexer.ts | 4 ++-- src/utils.ts | 13 +++++++++++++ 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/crdt.ts b/src/crdt.ts index 6be2bb014..c04370c96 
100644 --- a/src/crdt.ts +++ b/src/crdt.ts @@ -38,7 +38,7 @@ import type { import { index, type Index } from "./indexer.js"; import { CRDTClock } from "./crdt-clock.js"; // import { blockstoreFactory } from "./blockstore/transaction.js"; -import { ensureLogger } from "./utils.js"; +import { arrayFromAsyncIterable, ensureLogger } from "./utils.js"; import { LedgerOpts } from "./ledger.js"; export interface HasCRDT { @@ -169,7 +169,7 @@ export class CRDT { return { snapshot: (sinceOpts) => this.#snapshot(sinceOpts, { waitFor }), subscribe: (callback) => this.#subscribe(callback), - toArray: (sinceOpts) => Array.fromAsync(this.#snapshot(sinceOpts, { waitFor })), + toArray: (sinceOpts) => arrayFromAsyncIterable(this.#snapshot(sinceOpts, { waitFor })), live(opts?: { since?: ClockHead } & ChangesOptions) { return stream({ ...opts, futureOnly: false }, { waitFor }); diff --git a/src/indexer.ts b/src/indexer.ts index bccd358de..8889d829b 100644 --- a/src/indexer.ts +++ b/src/indexer.ts @@ -34,7 +34,7 @@ import { CompareKey, } from "./indexer-helpers.js"; import { CRDT, HasCRDT } from "./crdt.js"; -import { ensureLogger } from "./utils.js"; +import { arrayFromAsyncIterable, ensureLogger } from "./utils.js"; import { Logger } from "@adviser/cement"; import { docUpdateToDocWithId } from "./crdt-helpers.js"; @@ -185,7 +185,7 @@ export class Index this.#snapshot(qryOpts, sinceOpts, { waitFor }), subscribe: (callback) => this.#subscribe(callback), - toArray: (sinceOpts) => Array.fromAsync(this.#snapshot(qryOpts, sinceOpts, { waitFor })), + toArray: (sinceOpts) => arrayFromAsyncIterable(this.#snapshot(qryOpts, sinceOpts, { waitFor })), live(opts?: { since?: ClockHead }) { return stream(qryOpts, { futureOnly: false, since: opts?.since }, { waitFor }); diff --git a/src/utils.ts b/src/utils.ts index 203360cd9..f2ff12707 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -354,6 +354,19 @@ export function isNotFoundError(e: Error | Result | unknown): e is NotF return false; } +/** + * Array.fromAsync "polyfill" + */ +export async function arrayFromAsyncIterable(it: AsyncIterable) { + const arr = []; + + for await (const a of it) { + arr.push(a); + } + + return arr; +} + export function UInt8ArrayEqual(a: Uint8Array, b: Uint8Array): boolean { if (a.length !== b.length) { return false; From fdd9ee9d09711f3833b6f581b68578c3566bf7de Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Tue, 11 Feb 2025 19:47:35 +0100 Subject: [PATCH 19/22] refactor: .query() refactor to include prolly tree's key + value & excludeDocs option --- src/crdt-clock.ts | 4 +- src/indexer-helpers.ts | 41 ++++--- src/indexer.ts | 103 +++++++++++++----- src/ledger.ts | 30 +++--- src/types.ts | 42 ++++++-- tests/fireproof/streaming-api.test.ts | 148 +++++++++++++++----------- 6 files changed, 238 insertions(+), 130 deletions(-) diff --git a/src/crdt-clock.ts b/src/crdt-clock.ts index f242904ef..d1af485fc 100644 --- a/src/crdt-clock.ts +++ b/src/crdt-clock.ts @@ -58,9 +58,7 @@ export class CRDTClock { async processUpdates(updatesAcc: DocUpdate[], all: boolean, prevHead: ClockHead) { let internalUpdates = updatesAcc; if (this.watchers.size && !all) { - const changes = await Array.fromAsync( - clockChangesSince(throwFalsy(this.blockstore), this.head, prevHead, {}, this.logger), - ); + const changes = await Array.fromAsync(clockChangesSince(throwFalsy(this.blockstore), this.head, prevHead, {}, this.logger)); internalUpdates = changes; } this.zoomers.forEach((fn) => fn()); diff --git a/src/indexer-helpers.ts b/src/indexer-helpers.ts index 
ed6447c1e..74bd11207 100644 --- a/src/indexer-helpers.ts +++ b/src/indexer-helpers.ts @@ -20,14 +20,13 @@ import { DocFragment, IndexUpdate, QueryOpts, - DocWithId, IndexKeyType, IndexKey, DocTypes, - DocObject, IndexUpdateString, ClockHead, ChangesOptions, + IndexRow, } from "./types.js"; import { CarTransaction, BlockFetcher, AnyLink, AnyBlock } from "./blockstore/index.js"; import { CRDT } from "./crdt.js"; @@ -64,9 +63,9 @@ export const byKeyOpts: StaticProllyOptions = { cache, chunker: bf(3 export const byIdOpts: StaticProllyOptions = { cache, chunker: bf(30), codec, hasher, compare: simpleCompare }; -export interface IndexDoc { +export interface IndexDoc { readonly key: IndexKey; - readonly value: DocFragment; + readonly value: R; } export interface IndexDocString { @@ -74,26 +73,26 @@ export interface IndexDocString { readonly value: DocFragment; } -export function indexEntriesForChanges( +export function indexEntriesForChanges( changes: DocUpdate[], - mapFn: MapFn, -): IndexDoc[] { - const indexEntries: IndexDoc[] = []; + mapFn: MapFn, +): IndexDoc[] { + const indexEntries: IndexDoc[] = []; changes.forEach(({ id: key, value, del }) => { if (del || !value) return; let mapCalled = false; - const mapReturn = mapFn({ ...(value as DocWithId), _id: key }, (k: IndexKeyType, v?: DocFragment) => { + const mapReturn = mapFn({ ...value, _id: key }, (k: IndexKeyType, v?: R) => { mapCalled = true; if (typeof k === "undefined") return; indexEntries.push({ key: [charwise.encode(k) as K, key], - value: v || null, + value: (v || null) as R, }); }); if (!mapCalled && mapReturn) { indexEntries.push({ key: [charwise.encode(mapReturn) as K, key], - value: null, + value: null as R, }); } }); @@ -162,11 +161,11 @@ export async function loadIndex; } -export async function* applyQuery( +export async function* applyQuery( { crdt, logger }: { crdt: CRDT; logger: Logger }, resp: { result: ProllyIndexRow[] }, query: QueryOpts & { since?: ClockHead; sinceOptions?: ChangesOptions }, -): AsyncGenerator> { +): AsyncGenerator> { async function* _apply() { let result = [...resp.result]; @@ -188,10 +187,18 @@ export async function* applyQuery { - return val ? 
({ ...val.doc, _id: row.id } as DocWithId) : undefined; - }); + if (query.excludeDocs) { + for (const res of result) { + yield res; + } + } else { + for (const res of result) { + yield crdt.get(res.id).then((val) => { + if (!val) return undefined; + const row: IndexRow = { ...res, doc: val.doc }; + return row; + }); + } } } diff --git a/src/indexer.ts b/src/indexer.ts index 8889d829b..1ccd7a168 100644 --- a/src/indexer.ts +++ b/src/indexer.ts @@ -15,8 +15,10 @@ import { SuperThis, QueryResponse, ChangesOptions, - QueryStreamMarker, - DocWithId, + IndexRow, + InquiryResponse, + DocumentRow, + Row, } from "./types.js"; import { BaseBlockstore } from "./blockstore/index.js"; @@ -41,16 +43,16 @@ import { docUpdateToDocWithId } from "./crdt-helpers.js"; export function index, R extends DocFragment = T>( refDb: HasCRDT, name: string, - mapFn?: MapFn, + mapFn?: MapFn, meta?: IdxMeta, ): Index { if (mapFn && meta) throw refDb.crdt.logger.Error().Msg("cannot provide both mapFn and meta").AsError(); if (mapFn && mapFn.constructor.name !== "Function") throw refDb.crdt.logger.Error().Msg("mapFn must be a function").AsError(); if (refDb.crdt.indexers.has(name)) { - const idx = refDb.crdt.indexers.get(name) as unknown as Index; + const idx = refDb.crdt.indexers.get(name) as unknown as Index; idx.applyMapFn(name, mapFn, meta); } else { - const idx = new Index(refDb.crdt.sthis, refDb.crdt, name, mapFn, meta); + const idx = new Index(refDb.crdt.sthis, refDb.crdt, name, mapFn, meta); refDb.crdt.indexers.set(name, idx as unknown as Index, NonNullable>); } return refDb.crdt.indexers.get(name) as unknown as Index; @@ -65,7 +67,7 @@ export class Index; readonly name: string; - mapFn?: MapFn; + mapFn?: MapFn; mapFnString = ""; byKey: IndexTree = new IndexTree(); byId: IndexTree = new IndexTree(); @@ -92,7 +94,7 @@ export class Index | CRDT>, name: string, mapFn?: MapFn, meta?: IdxMeta) { + constructor(sthis: SuperThis, crdt: CRDT | CRDT>, name: string, mapFn?: MapFn, meta?: IdxMeta) { this.logger = ensureLogger(sthis, "Index"); this.blockstore = crdt.indexBlockstore; this.crdt = crdt as CRDT; @@ -112,7 +114,7 @@ export class Index, meta?: IdxMeta) { + applyMapFn(name: string, mapFn?: MapFn, meta?: IdxMeta) { if (mapFn && meta) throw this.logger.Error().Msg("cannot provide both mapFn and meta").AsError(); if (this.name && this.name !== name) throw this.logger.Error().Msg("cannot change name").AsError(); // this.name = name; @@ -153,7 +155,7 @@ export class Index (doc as unknown as Record)[name] ?? undefined) as MapFn; + mapFn = ((doc) => (doc as unknown as Record)[name] ?? 
undefined) as MapFn; } if (this.mapFnString) { // we already loaded from a header @@ -179,12 +181,13 @@ export class Index = {}, { waitFor }: { waitFor?: Promise } = {}): QueryResponse { + query(qryOpts: QueryOpts & { excludeDocs: true }, intlOpts: { waitFor?: Promise }): InquiryResponse; + query(qryOpts: QueryOpts = {}, { waitFor }: { waitFor?: Promise } = {}): QueryResponse { const stream = this.#stream.bind(this); return { snapshot: (sinceOpts) => this.#snapshot(qryOpts, sinceOpts, { waitFor }), - subscribe: (callback) => this.#subscribe(callback), + subscribe: (callback) => this.#subscribe(qryOpts, callback), toArray: (sinceOpts) => arrayFromAsyncIterable(this.#snapshot(qryOpts, sinceOpts, { waitFor })), live(opts?: { since?: ClockHead }) { @@ -239,11 +242,21 @@ export class Index & { excludeDocs: true }, + sinceOpts: ({ since?: ClockHead } & ChangesOptions) | undefined, + waitOpts: { waitFor?: Promise }, + ): AsyncGenerator>; + #snapshot( + qryOpts: QueryOpts, + sinceOpts: ({ since?: ClockHead } & ChangesOptions) | undefined, + waitOpts: { waitFor?: Promise }, + ): AsyncGenerator>; #snapshot( qryOpts: QueryOpts = {}, sinceOpts: { since?: ClockHead } & ChangesOptions = {}, { waitFor }: { waitFor?: Promise } = {}, - ) { + ): AsyncGenerator | DocumentRow> { const generator = async () => { await waitFor; await this.ready(); @@ -253,19 +266,54 @@ export class Index | DocumentRow> { + for await (const row of await generator()) { + if (!row) continue; + + if (qryOpts.excludeDocs && !row.doc) { + const a: Row = row; + yield a; + } else if (!qryOpts.excludeDocs && row.doc) { + const b = row as DocumentRow; + yield b; + } } } - return docsWithId(); + return rows(); } - #subscribe(callback: (doc: DocWithId) => void) { - const unsubscribe = this.crdt.clock.onTick((updates: DocUpdate>[]) => { - updates.forEach((update) => { - callback(docUpdateToDocWithId(update as DocUpdate)); + #subscribe(qryOpts: { excludeDocs: true }, callback: (row: Row) => void): () => void; + #subscribe( + qryOpts: { excludeDocs: false } | { excludeDocs?: boolean }, + callback: (row: DocumentRow) => void, + ): () => void; + #subscribe( + { excludeDocs }: { excludeDocs: true } | { excludeDocs: false } | { excludeDocs?: boolean } = {}, + callback: ((row: Row) => void) | ((row: DocumentRow) => void), + ): () => void { + // NOTE: Despite using onTick or onTock, it always loads the document (update). 
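// Editorial aside (not part of the patch): what a subscriber observes after this
// change, assuming the DocumentRow shape introduced in this patch (index key and
// value delivered alongside the document). The ledger name and the `year` index
// are illustrative only.
import { fireproof } from "@fireproof/core";

function watchYearIndex(): () => void {
  const ledger = fireproof("sketch-ledger");

  // The callback now receives the mapped row rather than the bare document, so the
  // emitted key/value pair is available without re-running the map function.
  const unsubscribe = ledger.query("year").subscribe((row) => {
    console.log(row.key, row.value, row.doc._id);
  });

  return unsubscribe;
}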
+ const unsubscribe = this.crdt.clock.onTick(async (updates: DocUpdate[]) => { + await this._updateIndex(); + await this._hydrateIndex(); + + const mapFn = this.mapFn?.bind(this); + if (!mapFn) throw this.logger.Error().Msg("No map function defined").AsError(); + + updates.forEach(async (update) => { + const indexEntries = indexEntriesForChanges([update], mapFn); + const indexEntry = indexEntries[0]; + if (!indexEntry) return; + + if (excludeDocs === true) { + // NOTE: Don't know why the type overloading is not doing its thing here + // (callback as (row: Row) => void)(row); + } else if (!excludeDocs) { + const doc = docUpdateToDocWithId(update); + const docRow: DocumentRow = { ...indexEntry, id: update.id, doc }; + + callback(docRow); + } }); }); @@ -286,7 +334,7 @@ export class Index void); let isClosed = false; - return new ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>({ + return new ReadableStream({ async start(controller) { await waitFor; await ready(); @@ -297,11 +345,11 @@ export class Index) { + async function iterate(prevValue: IndexRow) { const { done, value } = await it.next(); controller.enqueue({ - doc: prevValue, + row: prevValue, marker: { kind: "preexisting", done: done || false }, }); @@ -312,12 +360,9 @@ export class Index { + unsubscribe = subscribe(qryOpts, async (row) => { if (isClosed) return; - await updateIndex(); - await hydrateIndex(); - - controller.enqueue({ doc, marker: { kind: "new" } }); + controller.enqueue({ row, marker: { kind: "new" } }); }); }, @@ -367,7 +412,7 @@ export class Index ({ key, del: true })); removeIdIndexEntries = oldChangeEntries.map((key) => ({ key: key[1], del: true })); } - const indexEntries = indexEntriesForChanges(result, this.mapFn); // use a getter to translate from string + const indexEntries = indexEntriesForChanges(result, this.mapFn); // use a getter to translate from string const byIdIndexEntries: IndexDocString[] = indexEntries.map(({ key }) => ({ key: key[1], value: key, diff --git a/src/ledger.ts b/src/ledger.ts index f537667e2..ba1627ccb 100644 --- a/src/ledger.ts +++ b/src/ledger.ts @@ -74,10 +74,13 @@ export interface Ledger
> extends HasC put(doc: DocSet): Promise; bulk(docs: DocSet[]): Promise; del(id: string): Promise; - allDocs(): QueryResponse; - allDocuments(): QueryResponse; + allDocs(): QueryResponse; + allDocuments(): QueryResponse; - query(field: string | MapFn, opts?: QueryOpts): QueryResponse; + query( + field: string | MapFn, + opts?: QueryOpts, + ): QueryResponse; compact(): Promise; } @@ -159,13 +162,16 @@ export class LedgerShell
> implements del(id: string): Promise { return this.ref.del(id); } - allDocs(): QueryResponse { + allDocs(): QueryResponse { return this.ref.allDocs(); } - allDocuments(): QueryResponse { + allDocuments(): QueryResponse { return this.ref.allDocuments(); } - query(field: string | MapFn, opts?: QueryOpts): QueryResponse { + query( + field: string | MapFn, + opts?: QueryOpts, + ): QueryResponse { return this.ref.query(field, opts); } compact(): Promise { @@ -305,27 +311,27 @@ class LedgerImpl
> implements Ledger(): QueryResponse { + allDocs(): QueryResponse { this.logger.Debug().Msg("allDocs"); - return this.crdt.allDocs({ waitFor: this.ready() }); + return this.crdt.allDocs({ waitFor: this.ready() }); } - allDocuments(): QueryResponse { - return this.allDocs(); + allDocuments(): QueryResponse { + return this.allDocs(); } // todo if we add this onto dbs in fireproof.ts then we can make index.ts a separate package query( field: string | MapFn, opts: QueryOpts = {}, - ): QueryResponse { + ): QueryResponse { this.logger.Debug().Any("field", field).Any("opts", opts).Msg("query"); const _crdt = this.crdt as unknown as CRDT; const idx = typeof field === "string" ? index({ crdt: _crdt }, field) : index({ crdt: _crdt }, makeName(field.toString()), field); - return idx.query(opts, { waitFor: this.ready() }); + return idx.query(opts, { waitFor: this.ready() }); } async compact() { diff --git a/src/types.ts b/src/types.ts index b0bdef91c..8e1ba2bdc 100644 --- a/src/types.ts +++ b/src/types.ts @@ -217,9 +217,19 @@ export interface IndexUpdateString { // export type IndexRow = // T extends DocLiteral ? IndexRowLiteral : IndexRowObject +export interface Row { + readonly id: string; + readonly key: IndexKey; + readonly value: R; +} + +export interface DocumentRow extends Row { + readonly doc: DocWithId; +} + export interface IndexRow { readonly id: string; - readonly key: K; // IndexKey; + readonly key: IndexKey; readonly value: R; readonly doc?: DocWithId; } @@ -257,6 +267,7 @@ export interface IdxMetaMap { // eslint-disable-next-line @typescript-eslint/no-unused-vars export interface QueryOpts { readonly descending?: boolean; + readonly excludeDocs?: boolean; readonly limit?: number; readonly range?: [IndexKeyType, IndexKeyType]; readonly key?: DocFragment; @@ -266,18 +277,31 @@ export interface QueryOpts { export type QueryStreamMarker = { readonly kind: "preexisting"; readonly done: boolean } | { readonly kind: "new" }; -export interface QueryResponse { - snapshot(opts?: { since?: ClockHead } & ChangesOptions): AsyncGenerator>; - live(opts?: { since?: ClockHead } & ChangesOptions): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; - future(): ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; +export interface InquiryResponse { + snapshot(opts?: { since?: ClockHead } & ChangesOptions): AsyncGenerator>; + live(opts?: { since?: ClockHead } & ChangesOptions): ReadableStream<{ row: Row; marker: QueryStreamMarker }>; + future(): ReadableStream<{ row: Row; marker: QueryStreamMarker }>; + /** Convenience function to consume a future stream. */ + subscribe(callback: (row: Row) => void): () => void; + /** Convenience function to get a full snapshot. */ + toArray(opts?: { since?: ClockHead } & ChangesOptions): Promise[]>; +} + +/** + * Same as `InquiryResponse` but with the document attached. + */ +export interface QueryResponse { + snapshot(opts?: { since?: ClockHead } & ChangesOptions): AsyncGenerator>; + live(opts?: { since?: ClockHead } & ChangesOptions): ReadableStream<{ row: DocumentRow; marker: QueryStreamMarker }>; + future(): ReadableStream<{ row: DocumentRow; marker: QueryStreamMarker }>; /** Convenience function to consume a future stream. */ - subscribe(callback: (doc: DocWithId) => void): () => void; + subscribe(callback: (row: DocumentRow) => void): () => void; /** Convenience function to get a full snapshot. 
*/ - toArray(opts?: { since?: ClockHead } & ChangesOptions): Promise[]>; + toArray(opts?: { since?: ClockHead } & ChangesOptions): Promise[]>; } -type EmitFn = (k: IndexKeyType, v?: DocFragment) => void; -export type MapFn = (doc: DocWithId, emit: EmitFn) => DocFragment | unknown; +type EmitFn = (k: IndexKeyType, v?: R) => void; +export type MapFn = (doc: DocWithId, emit: EmitFn) => R | unknown; export interface ChangesOptions { readonly dirty?: boolean; diff --git a/tests/fireproof/streaming-api.test.ts b/tests/fireproof/streaming-api.test.ts index b849a4cd1..152571e75 100644 --- a/tests/fireproof/streaming-api.test.ts +++ b/tests/fireproof/streaming-api.test.ts @@ -1,4 +1,14 @@ -import { ClockHead, DocBase, DocWithId, fireproof, Ledger, QueryResponse, QueryStreamMarker } from "@fireproof/core"; +import { + ClockHead, + DocFragment, + DocTypes, + DocumentRow, + fireproof, + IndexKeyType, + Ledger, + QueryResponse, + QueryStreamMarker, +} from "@fireproof/core"; interface DocType { _id: string; @@ -13,13 +23,12 @@ describe("Streaming API", () => { beforeEach(async () => { lr = fireproof(Date.now().toString()); - await Promise.all( - Array(AMOUNT_OF_DOCS) - .fill(0) - .map((_, i) => { - return lr.put({ _id: `doc-${i}`, name: `doc-${i}` }); - }), - ); + await Array(AMOUNT_OF_DOCS) + .fill(0) + .reduce(async (acc, _, i) => { + await acc; + await lr.put({ _id: `doc-${i}`, name: `doc-${i}` }); + }, Promise.resolve()); }); afterEach(async () => { @@ -30,22 +39,32 @@ describe("Streaming API", () => { // 🛠️ // //////// - type Snapshot = AsyncGenerator>; - type Stream = ReadableStream<{ doc: DocWithId; marker: QueryStreamMarker }>; + type Snapshot = AsyncGenerator>; + type Stream = ReadableStream<{ + row: DocumentRow; + marker: QueryStreamMarker; + }>; - async function testSnapshot(snapshot: Snapshot, amountOfDocs: number) { + async function testSnapshot( + snapshot: Snapshot, + amountOfDocs: number, + ) { const docs = await Array.fromAsync(snapshot); expect(docs.length).toBe(amountOfDocs); } - async function testLive(stream: Stream, amountOfDocs: number) { + async function testLive( + stream: Stream, + amountOfDocs: number, + newProps: { prefix: string; key: string }, + ) { let docCount = 0; for await (const { marker } of stream) { docCount++; if (marker.kind === "preexisting" && marker.done) { - await lr.put({ _id: `doc-${amountOfDocs}`, name: `doc-${amountOfDocs}` }); + await lr.put({ _id: `${newProps.prefix}${amountOfDocs}`, [newProps.key]: `${newProps.prefix}${amountOfDocs}` }); } if (marker.kind === "new") break; @@ -55,26 +74,25 @@ describe("Streaming API", () => { // Test that the stream has been closed automatically by `for await` const r = stream.getReader(); - expect(r.closed).resolves.toBe(undefined); + await expect(r.closed).resolves.toBe(undefined); } - async function testSince({ + async function testSince({ snapshotCreator, streamCreator, }: { - snapshotCreator: (since: ClockHead) => Snapshot; - streamCreator: (since: ClockHead) => Stream; + snapshotCreator: (since: ClockHead) => Snapshot; + streamCreator: (since: ClockHead) => Stream; }) { const amountOfNewDocs = Math.floor(Math.random() * (10 - 1) + 1); const since = lr.clock; - await Promise.all( - Array(amountOfNewDocs) - .fill(0) - .map((_, i) => { - return lr.put({ _id: `doc-since-${i}`, since: `doc-since-${i}` }); - }), - ); + await Array(amountOfNewDocs) + .fill(0) + .reduce(async (acc, _, i) => { + await acc; + await lr.put({ _id: `doc-since-${i}`, since: `doc-since-${i}` }); + }, Promise.resolve()); const stream = 
streamCreator(since); let docCount = 0; @@ -88,31 +106,35 @@ describe("Streaming API", () => { // Test that the stream has been closed automatically by `for await` const r = stream.getReader(); - expect(r.closed).resolves.toBe(undefined); + await expect(r.closed).resolves.toBe(undefined); // Snapshot // NOTE: This also tests the stream cancellation process. - const amountOfSnapshotDocs = Math.floor(Math.random() * (10 - 1) + 1); + // NOTE: Concurrency limit disallows for using `Promise.all` with x items + const amountOfSnapshotDocs = Math.floor(Math.random() * (10 - 4) + 4); const sincePt2 = lr.clock; - await Promise.all( - Array(amountOfSnapshotDocs) - .fill(0) - .map((_, i) => { - return lr.put({ _id: `doc-snapshot-${i}`, since: `doc-snapshot-${i}` }); - }), - ); + await Array(amountOfSnapshotDocs) + .fill(0) + .reduce(async (acc, _, i) => { + await acc; + await lr.put({ _id: `doc-snapshot-${i}`, since: `doc-snapshot-${i}` }); + }, Promise.resolve()); const docs = await Array.fromAsync(snapshotCreator(sincePt2)); expect(docs.length).toBe(amountOfSnapshotDocs); } - async function testFuture(stream: Stream, amountOfDocs: number) { + async function testFuture( + stream: Stream, + amountOfDocs: number, + newProps: { prefix: string; key: string }, + ) { let docCount = 0; - await lr.put({ _id: `doc-${amountOfDocs + 0}`, name: `doc-${amountOfDocs + 0}` }); - await lr.put({ _id: `doc-${amountOfDocs + 1}`, name: `doc-${amountOfDocs + 1}` }); - await lr.put({ _id: `doc-${amountOfDocs + 2}`, name: `doc-${amountOfDocs + 2}` }); + await lr.put({ _id: `${newProps.prefix}${amountOfDocs + 0}`, [newProps.key]: `${newProps.prefix}${amountOfDocs + 0}` }); + await lr.put({ _id: `${newProps.prefix}${amountOfDocs + 1}`, [newProps.key]: `${newProps.prefix}${amountOfDocs + 1}` }); + await lr.put({ _id: `${newProps.prefix}${amountOfDocs + 2}`, [newProps.key]: `${newProps.prefix}${amountOfDocs + 2}` }); for await (const { marker } of stream) { if (marker.kind === "new") docCount++; @@ -122,19 +144,26 @@ describe("Streaming API", () => { expect(docCount).toBe(3); } - async function testSubscribe(queryResponse: QueryResponse) { - const doc = await new Promise((resolve) => { + async function testSubscribe( + queryResponse: QueryResponse, + ) { + const row = await new Promise((resolve) => { queryResponse.subscribe(resolve); lr.put({ _id: `doc-extra`, name: `doc-extra` }); }); - expect(doc).toBeTruthy(); - expect(doc).toHaveProperty("_id"); - expect(doc).toHaveProperty("name"); - expect((doc as DocType).name).toBe("doc-extra"); + expect(row).toBeTruthy(); + expect(row).toHaveProperty("id"); + expect(row).toHaveProperty("doc"); + expect((row as DocumentRow).doc).toHaveProperty("name"); + // TODO: + // expect((row as DocumentRow)?.doc).toBe("doc-extra"); } - async function testToArray(queryResponse: QueryResponse, amountOfDocs: number) { + async function testToArray( + queryResponse: QueryResponse, + amountOfDocs: number, + ) { const arr = await queryResponse.toArray(); expect(arr.length).toBe(amountOfDocs); } @@ -143,7 +172,7 @@ describe("Streaming API", () => { // ALL DOCS // ////////////// - describe("allDocs", () => { + describe.skip("allDocs", () => { it("test `snapshot` method", async () => { const snapshot = lr.allDocs().snapshot(); await testSnapshot(snapshot, AMOUNT_OF_DOCS); @@ -151,7 +180,7 @@ describe("Streaming API", () => { it("test `live` method", async () => { const stream = lr.allDocs().live(); - await testLive(stream, AMOUNT_OF_DOCS); + await testLive(stream, AMOUNT_OF_DOCS, { prefix: "doc-", key: 
"name" }); }); it("test `snapshot` and `live` method with `since` parameter", async () => { @@ -163,15 +192,15 @@ describe("Streaming API", () => { it("test `future` method", async () => { const stream = lr.allDocs().future(); - await testFuture(stream, AMOUNT_OF_DOCS); + await testFuture(stream, AMOUNT_OF_DOCS, { prefix: "doc-", key: "name" }); }); it("test `subscribe` method", async () => { - await testSubscribe(lr.allDocs()); + await testSubscribe(lr.allDocs()); }); it("test `toArray` method", async () => { - await testToArray(lr.allDocs(), AMOUNT_OF_DOCS); + await testToArray(lr.allDocs(), AMOUNT_OF_DOCS); }); }); @@ -189,7 +218,7 @@ describe("Streaming API", () => { it("test `live` method", async () => { const stream = lr.query("name").live(); - await testLive(stream, AMOUNT_OF_DOCS); + await testLive(stream, AMOUNT_OF_DOCS, { prefix: "doc-", key: "name" }); }); it("test `snapshot` and `live` method with `since` parameter", async () => { @@ -201,7 +230,7 @@ describe("Streaming API", () => { it("test `future` method", async () => { const stream = lr.query("name").future(); - await testFuture(stream, AMOUNT_OF_DOCS); + await testFuture(stream, AMOUNT_OF_DOCS, { prefix: "doc-", key: "name" }); }); it("test `subscribe` method", async () => { @@ -218,13 +247,12 @@ describe("Streaming API", () => { const AMOUNT_OF_ADDITIONAL_DOCS = 5; beforeEach(async () => { - await Promise.all( - Array(AMOUNT_OF_ADDITIONAL_DOCS) - .fill(0) - .map((_, i) => { - return lr.put({ _id: `doc-add-${i}`, additional: `doc-add-${i}` }); - }), - ); + await Array(AMOUNT_OF_ADDITIONAL_DOCS) + .fill(0) + .reduce(async (acc, _, i) => { + await acc; + await lr.put({ _id: `doc-add-${i}`, additional: `doc-add-${i}` }); + }, Promise.resolve()); }); it("test `snapshot` method", async () => { @@ -234,7 +262,7 @@ describe("Streaming API", () => { it("test `live` method", async () => { const stream = lr.query("additional").live(); - await testLive(stream, AMOUNT_OF_ADDITIONAL_DOCS); + await testLive(stream, AMOUNT_OF_ADDITIONAL_DOCS, { prefix: "doc-add-future-", key: "additional" }); }); it("test `snapshot` and `live` method with `since` parameter", async () => { @@ -246,7 +274,7 @@ describe("Streaming API", () => { it("test `future` method", async () => { const stream = lr.query("additional").future(); - await testFuture(stream, AMOUNT_OF_ADDITIONAL_DOCS); + await testFuture(stream, AMOUNT_OF_ADDITIONAL_DOCS, { prefix: "doc-add-", key: "additional" }); }); it("test `subscribe` method", async () => { From 909a2680f489021bf9509009b04171ed575f07f3 Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Wed, 12 Feb 2025 14:36:47 +0100 Subject: [PATCH 20/22] fix: Query types in ledger --- src/indexer.ts | 1 + src/ledger.ts | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/indexer.ts b/src/indexer.ts index 1ccd7a168..ff8676104 100644 --- a/src/indexer.ts +++ b/src/indexer.ts @@ -182,6 +182,7 @@ export class Index & { excludeDocs: true }, intlOpts: { waitFor?: Promise }): InquiryResponse; + query(qryOpts: QueryOpts, { waitFor }: { waitFor?: Promise }): QueryResponse; query(qryOpts: QueryOpts = {}, { waitFor }: { waitFor?: Promise } = {}): QueryResponse { const stream = this.#stream.bind(this); diff --git a/src/ledger.ts b/src/ledger.ts index ba1627ccb..2f0a7a69c 100644 --- a/src/ledger.ts +++ b/src/ledger.ts @@ -78,7 +78,7 @@ export interface Ledger
> extends HasC allDocuments(): QueryResponse; query( - field: string | MapFn, + field: string | MapFn, opts?: QueryOpts, ): QueryResponse; compact(): Promise; @@ -169,7 +169,7 @@ export class LedgerShell
> implements return this.ref.allDocuments(); } query( - field: string | MapFn, + field: string | MapFn, opts?: QueryOpts, ): QueryResponse { return this.ref.query(field, opts); @@ -322,7 +322,7 @@ class LedgerImpl
> implements Ledger( - field: string | MapFn, + field: string | MapFn, opts: QueryOpts = {}, ): QueryResponse { this.logger.Debug().Any("field", field).Any("opts", opts).Msg("query"); @@ -331,7 +331,7 @@ class LedgerImpl
> implements Ledger({ crdt: _crdt }, field) : index({ crdt: _crdt }, makeName(field.toString()), field); - return idx.query(opts, { waitFor: this.ready() }); + return idx.query(opts, { waitFor: this.ready() }); } async compact() { From 0c16a909780675223354a81b893fa71403670ce7 Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Wed, 12 Feb 2025 14:52:26 +0100 Subject: [PATCH 21/22] fix: More inquiry types + a test --- src/ledger.ts | 32 +++++++++++++++++++++++++-- tests/fireproof/streaming-api.test.ts | 15 +++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/src/ledger.ts b/src/ledger.ts index 2f0a7a69c..f5a77a40f 100644 --- a/src/ledger.ts +++ b/src/ledger.ts @@ -20,6 +20,7 @@ import { PARAM, QueryResponse, ClockHead, + InquiryResponse, } from "./types.js"; import { DbMeta, SerdeGatewayInterceptor, StoreEnDeFile, StoreURIRuntime, StoreUrlsOpts } from "./blockstore/index.js"; import { ensureLogger, ensureSuperThis, NotFoundError, toSortedArray } from "./utils.js"; @@ -77,10 +78,19 @@ export interface Ledger
> extends HasC allDocs(): QueryResponse; allDocuments(): QueryResponse; + query( + field: string | MapFn, + opts: QueryOpts & { excludeDocs: true }, + ): InquiryResponse; query( field: string | MapFn, opts?: QueryOpts, ): QueryResponse; + query( + field: string | MapFn, + opts?: QueryOpts, + ): InquiryResponse | QueryResponse; + compact(): Promise; } @@ -168,12 +178,22 @@ export class LedgerShell
> implements allDocuments(): QueryResponse { return this.ref.allDocuments(); } + + query( + field: string | MapFn, + opts: QueryOpts & { excludeDocs: true }, + ): InquiryResponse; + query( + field: string | MapFn, + opts?: QueryOpts, + ): QueryResponse; query( field: string | MapFn, opts?: QueryOpts, - ): QueryResponse { + ): InquiryResponse | QueryResponse { return this.ref.query(field, opts); } + compact(): Promise { return this.ref.compact(); } @@ -321,10 +341,18 @@ class LedgerImpl
> implements Ledger( + field: string | MapFn, + opts: QueryOpts & { excludeDocs: true }, + ): InquiryResponse; + query( + field: string | MapFn, + opts?: QueryOpts, + ): QueryResponse; query( field: string | MapFn, opts: QueryOpts = {}, - ): QueryResponse { + ): InquiryResponse | QueryResponse { this.logger.Debug().Any("field", field).Any("opts", opts).Msg("query"); const _crdt = this.crdt as unknown as CRDT; const idx = diff --git a/tests/fireproof/streaming-api.test.ts b/tests/fireproof/streaming-api.test.ts index 152571e75..a6f4e531b 100644 --- a/tests/fireproof/streaming-api.test.ts +++ b/tests/fireproof/streaming-api.test.ts @@ -285,5 +285,20 @@ describe("Streaming API", () => { await testToArray(lr.query("additional"), AMOUNT_OF_ADDITIONAL_DOCS); }); }); + + // EXCLUDE DOCS + describe("excludeDocs", () => { + it("inquiry", async () => { + const inquiry = lr.query("name", { + excludeDocs: true, + }); + + const arr = await inquiry.toArray(); + const doc = arr[0]; + + expect(doc).toBeTruthy(); + expect(doc).not.toHaveProperty("doc"); + }); + }); }); }); From 0e85632e9bd0d7d73764f889c370a0d94acf02cd Mon Sep 17 00:00:00 2001 From: Steven Vandevelde Date: Wed, 12 Feb 2025 15:49:40 +0100 Subject: [PATCH 22/22] refactor: Combine .query and .allDocs into .select() --- src/ledger.ts | 71 ++++++++++++++------------- tests/fireproof/streaming-api.test.ts | 44 ++++++++--------- 2 files changed, 59 insertions(+), 56 deletions(-) diff --git a/src/ledger.ts b/src/ledger.ts index f5a77a40f..fe036443a 100644 --- a/src/ledger.ts +++ b/src/ledger.ts @@ -75,20 +75,22 @@ export interface Ledger
> extends HasC put(doc: DocSet): Promise; bulk(docs: DocSet[]): Promise; del(id: string): Promise; - allDocs(): QueryResponse; - allDocuments(): QueryResponse; - query( + select( + opts: QueryOpts & { excludeDocs: true }, + ): InquiryResponse; + select(opts?: QueryOpts): QueryResponse; + select( field: string | MapFn, opts: QueryOpts & { excludeDocs: true }, ): InquiryResponse; - query( + select( field: string | MapFn, opts?: QueryOpts, ): QueryResponse; - query( - field: string | MapFn, - opts?: QueryOpts, + select( + a?: string | MapFn | QueryOpts, + b?: QueryOpts, ): InquiryResponse | QueryResponse; compact(): Promise; @@ -172,26 +174,26 @@ export class LedgerShell
> implements del(id: string): Promise { return this.ref.del(id); } - allDocs(): QueryResponse { - return this.ref.allDocs(); - } - allDocuments(): QueryResponse { - return this.ref.allDocuments(); - } - query( + select( + opts: QueryOpts & { excludeDocs: true }, + ): InquiryResponse; + select(opts?: QueryOpts): QueryResponse; + select( field: string | MapFn, opts: QueryOpts & { excludeDocs: true }, ): InquiryResponse; - query( + select( field: string | MapFn, opts?: QueryOpts, ): QueryResponse; - query( - field: string | MapFn, - opts?: QueryOpts, + select( + a?: string | MapFn | QueryOpts, + b?: QueryOpts, ): InquiryResponse | QueryResponse { - return this.ref.query(field, opts); + const field = typeof a === "string" || typeof a === "function" ? a : undefined; + const opts = b ? b : typeof a === "object" ? a : {}; + return field ? this.ref.select(field, opts) : this.ref.select(opts); } compact(): Promise { @@ -331,29 +333,30 @@ class LedgerImpl
> implements Ledger(): QueryResponse { - this.logger.Debug().Msg("allDocs"); - return this.crdt.allDocs({ waitFor: this.ready() }); - } - - allDocuments(): QueryResponse { - return this.allDocs(); - } - // todo if we add this onto dbs in fireproof.ts then we can make index.ts a separate package - query( + select( + opts: QueryOpts & { excludeDocs: true }, + ): InquiryResponse; + select(opts?: QueryOpts): QueryResponse; + select( field: string | MapFn, opts: QueryOpts & { excludeDocs: true }, ): InquiryResponse; - query( + select( field: string | MapFn, opts?: QueryOpts, ): QueryResponse; - query( - field: string | MapFn, - opts: QueryOpts = {}, + select( + a?: string | MapFn | QueryOpts, + b?: QueryOpts, ): InquiryResponse | QueryResponse { - this.logger.Debug().Any("field", field).Any("opts", opts).Msg("query"); + const field = typeof a === "string" || typeof a === "function" ? a : undefined; + const opts = b ? b : typeof a === "object" ? a : {}; + + if (!field) { + return this.crdt.allDocs({ waitFor: this.ready() }); + } + const _crdt = this.crdt as unknown as CRDT; const idx = typeof field === "string" diff --git a/tests/fireproof/streaming-api.test.ts b/tests/fireproof/streaming-api.test.ts index a6f4e531b..e4834928b 100644 --- a/tests/fireproof/streaming-api.test.ts +++ b/tests/fireproof/streaming-api.test.ts @@ -174,33 +174,33 @@ describe("Streaming API", () => { describe.skip("allDocs", () => { it("test `snapshot` method", async () => { - const snapshot = lr.allDocs().snapshot(); + const snapshot = lr.select().snapshot(); await testSnapshot(snapshot, AMOUNT_OF_DOCS); }); it("test `live` method", async () => { - const stream = lr.allDocs().live(); + const stream = lr.select().live(); await testLive(stream, AMOUNT_OF_DOCS, { prefix: "doc-", key: "name" }); }); it("test `snapshot` and `live` method with `since` parameter", async () => { await testSince({ - snapshotCreator: (since) => lr.allDocs().snapshot({ since }), - streamCreator: (since) => lr.allDocs().live({ since }), + snapshotCreator: (since) => lr.select().snapshot({ since }), + streamCreator: (since) => lr.select().live({ since }), }); }); it("test `future` method", async () => { - const stream = lr.allDocs().future(); + const stream = lr.select().future(); await testFuture(stream, AMOUNT_OF_DOCS, { prefix: "doc-", key: "name" }); }); it("test `subscribe` method", async () => { - await testSubscribe(lr.allDocs()); + await testSubscribe(lr.select()); }); it("test `toArray` method", async () => { - await testToArray(lr.allDocs(), AMOUNT_OF_DOCS); + await testToArray(lr.select(), AMOUNT_OF_DOCS); }); }); @@ -212,33 +212,33 @@ describe("Streaming API", () => { // ALL describe("all", () => { it("test `snapshot` method", async () => { - const snapshot = lr.query("name").snapshot(); + const snapshot = lr.select("name").snapshot(); await testSnapshot(snapshot, AMOUNT_OF_DOCS); }); it("test `live` method", async () => { - const stream = lr.query("name").live(); + const stream = lr.select("name").live(); await testLive(stream, AMOUNT_OF_DOCS, { prefix: "doc-", key: "name" }); }); it("test `snapshot` and `live` method with `since` parameter", async () => { await testSince({ - snapshotCreator: (since) => lr.query("since").snapshot({ since }), - streamCreator: (since) => lr.query("since").live({ since }), + snapshotCreator: (since) => lr.select("since").snapshot({ since }), + streamCreator: (since) => lr.select("since").live({ since }), }); }); it("test `future` method", async () => { - const stream = lr.query("name").future(); + const 
stream = lr.select("name").future(); await testFuture(stream, AMOUNT_OF_DOCS, { prefix: "doc-", key: "name" }); }); it("test `subscribe` method", async () => { - await testSubscribe(lr.query("name")); + await testSubscribe(lr.select("name")); }); it("test `toArray` method", async () => { - await testToArray(lr.query("name"), AMOUNT_OF_DOCS); + await testToArray(lr.select("name"), AMOUNT_OF_DOCS); }); }); @@ -256,40 +256,40 @@ describe("Streaming API", () => { }); it("test `snapshot` method", async () => { - const snapshot = lr.query("additional").snapshot(); + const snapshot = lr.select("additional").snapshot(); await testSnapshot(snapshot, AMOUNT_OF_ADDITIONAL_DOCS); }); it("test `live` method", async () => { - const stream = lr.query("additional").live(); + const stream = lr.select("additional").live(); await testLive(stream, AMOUNT_OF_ADDITIONAL_DOCS, { prefix: "doc-add-future-", key: "additional" }); }); it("test `snapshot` and `live` method with `since` parameter", async () => { await testSince({ - snapshotCreator: (since) => lr.query("since").snapshot({ since }), - streamCreator: (since) => lr.query("since").live({ since }), + snapshotCreator: (since) => lr.select("since").snapshot({ since }), + streamCreator: (since) => lr.select("since").live({ since }), }); }); it("test `future` method", async () => { - const stream = lr.query("additional").future(); + const stream = lr.select("additional").future(); await testFuture(stream, AMOUNT_OF_ADDITIONAL_DOCS, { prefix: "doc-add-", key: "additional" }); }); it("test `subscribe` method", async () => { - await testSubscribe(lr.query("name")); + await testSubscribe(lr.select("name")); }); it("test `toArray` method", async () => { - await testToArray(lr.query("additional"), AMOUNT_OF_ADDITIONAL_DOCS); + await testToArray(lr.select("additional"), AMOUNT_OF_ADDITIONAL_DOCS); }); }); // EXCLUDE DOCS describe("excludeDocs", () => { it("inquiry", async () => { - const inquiry = lr.query("name", { + const inquiry = lr.select("name", { excludeDocs: true, });
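
For orientation, a minimal consumption sketch of the consolidated `.select()` surface introduced above, assuming the `QueryResponse` / `InquiryResponse` shapes from src/types.ts; the ledger name and the documents are illustrative only:

    import { fireproof } from "@fireproof/core";

    const db = fireproof("select-sketch");
    await db.put({ _id: "doc-0", name: "doc-0" });

    // SNAPSHOT-style read: all matching rows with the document attached.
    const rows = await db.select("name").toArray();
    console.log(rows[0].id, rows[0].doc);

    // Index-only read: `excludeDocs` yields rows without a `doc` property.
    const keys = await db.select("name", { excludeDocs: true }).toArray();
    console.log(keys[0].key, keys[0].value);

    // FUTURE-style read: `subscribe` fires for rows written after the call
    // and returns an unsubscribe function.
    const unsubscribe = db.select("name").subscribe((row) => console.log(row.id));
    await db.put({ _id: "doc-1", name: "doc-1" });
    unsubscribe();

Calling `select()` with no field keeps the former allDocs behaviour, which is the design choice behind folding `.query` and `.allDocs` into a single `.select()` in the last patch.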