Streaming allDocs/query API #564

Draft
wants to merge 22 commits into main
Commits (22)
a2e1351  feature: specs (mabels, Dec 17, 2024)
4894a40  feat: Add ReadableStream to the allDocs return value (icidasset, Dec 18, 2024)
a494aa3  fix: Live, not future (icidasset, Dec 18, 2024)
c9aadd7  test: future (icidasset, Dec 18, 2024)
0d318b5  fix: Unsub when stream is cancelled (icidasset, Jan 14, 2025)
8e44811  chore: Map async iterables for snapshots (icidasset, Jan 14, 2025)
88abf59  chore: Make clockChangesSince an async generator (icidasset, Jan 14, 2025)
90ac50b  feat: Add ledger.clock prop, add snapshot(since) option and test sinc… (icidasset, Jan 15, 2025)
9ddcd0b  fix: Current version of query (before refactor) (icidasset, Jan 15, 2025)
4961326  feat: Refactor query (wip) (icidasset, Jan 22, 2025)
98ad4a3  test: Query methods (icidasset, Jan 27, 2025)
e9138e3  feat: Query future (icidasset, Jan 28, 2025)
adaf0cc  feat: subscribe method (icidasset, Jan 29, 2025)
68aed90  feat: Remove sinceOptions key and merge it into parent options obj (icidasset, Jan 30, 2025)
634f593  test: If the default reader of a stream has been closed automatically… (icidasset, Feb 3, 2025)
032906d  refactor: Inner functions out of allDocs & query (icidasset, Feb 4, 2025)
e991e75  feat: Add convenience toArray method to QueryResponse (icidasset, Feb 5, 2025)
e15266a  refactor: Use util fn instead of Array.fromAsync (icidasset, Feb 5, 2025)
fdd9ee9  refactor: .query() refactor to include prolly tree's key + value & ex… (icidasset, Feb 11, 2025)
909a268  fix: Query types in ledger (icidasset, Feb 12, 2025)
0c16a90  fix: More inquiry types + a test (icidasset, Feb 12, 2025)
0e85632  refactor: Combine .query and .allDocs into .select() (icidasset, Feb 12, 2025)
1 change: 0 additions & 1 deletion build-docs.sh
@@ -25,4 +25,3 @@ then
else
git status
fi

6 changes: 4 additions & 2 deletions src/crdt-clock.ts
@@ -58,8 +58,8 @@ export class CRDTClock<T extends DocTypes> {
async processUpdates(updatesAcc: DocUpdate<T>[], all: boolean, prevHead: ClockHead) {
let internalUpdates = updatesAcc;
if (this.watchers.size && !all) {
const changes = await clockChangesSince<T>(throwFalsy(this.blockstore), this.head, prevHead, {}, this.logger);
internalUpdates = changes.result;
const changes = await Array.fromAsync(clockChangesSince(throwFalsy(this.blockstore), this.head, prevHead, {}, this.logger));
internalUpdates = changes;
}
this.zoomers.forEach((fn) => fn());
this.notifyWatchers(internalUpdates || []);
@@ -72,10 +72,12 @@ export class CRDTClock<T extends DocTypes> {

onTick(fn: (updates: DocUpdate<T>[]) => void) {
this.watchers.add(fn);
return () => this.watchers.delete(fn);
}

onTock(fn: () => void) {
this.emptyWatchers.add(fn);
return () => this.emptyWatchers.delete(fn);
}

onZoom(fn: () => void) {
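The crdt-clock.ts change makes onTick and onTock return cleanup closures, which the new subscription plumbing in crdt.ts relies on. A minimal calling sketch, assuming an already constructed CRDTClock instance named clock (the instance and document type are illustrative, not part of the diff):

// Sketch only: `clock` is assumed to be an existing CRDTClock instance.
const unsubscribe = clock.onTick((updates) => {
  // Each tick delivers the batch of DocUpdate objects behind the new head.
  console.log(`clock ticked with ${updates.length} update(s)`);
});

// Later, stop listening by invoking the returned closure.
unsubscribe();
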
65 changes: 38 additions & 27 deletions src/crdt-helpers.ts
@@ -3,7 +3,7 @@ import { parse } from "multiformats/link";
import { sha256 as hasher } from "multiformats/hashes/sha2";
import * as codec from "@fireproof/vendor/@ipld/dag-cbor";
import { put, get, entries, root } from "@fireproof/vendor/@web3-storage/pail/crdt";
import { EventBlockView, EventLink, Operation, PutOperation } from "@fireproof/vendor/@web3-storage/pail/crdt/api";
import { EventBlockView, EventLink, Operation, PutOperation, UnknownLink } from "@fireproof/vendor/@web3-storage/pail/crdt/api";
import { EventFetcher, vis } from "@fireproof/vendor/@web3-storage/pail/clock";
import * as Batch from "@fireproof/vendor/@web3-storage/pail/crdt/batch";
import {
@@ -29,6 +29,7 @@ import {
type DocWithId,
type DocTypes,
throwFalsy,
ClockLink,
} from "./types.js";
import { Result } from "@fireproof/vendor/@web3-storage/pail/crdt/api";
import { Logger } from "@adviser/cement";
@@ -86,6 +87,10 @@ export async function applyBulkUpdateToCrdt<T extends DocTypes>(
return { head: result.head } as CRDTMeta;
}

export function docUpdateToDocWithId<T extends DocTypes>({ id, del, value }: DocUpdate<T>): DocWithId<T> {
return (del ? { _id: id, _deleted: true } : { _id: id, ...value }) as DocWithId<T>;
}

// this whole thing can get pulled outside of the write queue
async function writeDocContent<T extends DocTypes>(
store: StoreRuntime,
@@ -234,50 +239,56 @@ class DirtyEventFetcher<T> extends EventFetcher<T> {
}
}

export async function clockChangesSince<T extends DocTypes>(
export async function* clockUpdatesSince<T extends DocTypes>(
blocks: BlockFetcher,
head: ClockHead,
since: ClockHead,
opts: ChangesOptions,
logger: Logger,
): Promise<{ result: DocUpdate<T>[]; head: ClockHead }> {
allowedKeys?: Set<string>,
): AsyncGenerator<DocUpdate<T>> {
for await (const { id, clock, docLink } of clockChangesSince(blocks, head, since, opts, logger, allowedKeys)) {
const docValue = await getValueFromLink<T>(blocks, docLink, logger);
yield { id, value: docValue.doc, del: docValue.del, clock };
}
}

export function clockChangesSince(
blocks: BlockFetcher,
head: ClockHead,
since: ClockHead,
opts: ChangesOptions,
logger: Logger,
allowedKeys?: Set<string>,
): AsyncGenerator<{ id: string; docLink: UnknownLink; clock: ClockLink }> {
const eventsFetcher = (
opts.dirty ? new DirtyEventFetcher<Operation>(logger, blocks) : new EventFetcher<Operation>(blocks)
) as EventFetcher<Operation>;
const keys = new Set<string>();
const updates = await gatherUpdates<T>(
blocks,
eventsFetcher,
head,
since,
[],
keys,
new Set<string>(),
opts.limit || Infinity,
logger,
);
return { result: updates.reverse(), head };
return gatherUpdates(eventsFetcher, head, since, keys, new Set<string>(), opts.limit || Infinity, logger, allowedKeys);
}

async function gatherUpdates<T extends DocTypes>(
blocks: BlockFetcher,
async function* gatherUpdates(
eventsFetcher: EventFetcher<Operation>,
head: ClockHead,
since: ClockHead,
updates: DocUpdate<T>[] = [],
keys: Set<string>,
didLinks: Set<string>,
limit: number,
logger: Logger,
): Promise<DocUpdate<T>[]> {
if (limit <= 0) return updates;
allowedKeys?: Set<string>,
): AsyncGenerator<{ id: string; docLink: UnknownLink; clock: ClockLink }> {
if (limit <= 0) return;

// if (Math.random() < 0.001) console.log('gatherUpdates', head.length, since.length, updates.length)
const sHead = head.map((l) => l.toString());

for (const link of since) {
if (sHead.includes(link.toString())) {
return updates;
return;
}
}

for (const link of head) {
if (didLinks.has(link.toString())) continue;
didLinks.add(link.toString());
@@ -292,19 +303,17 @@ async function gatherUpdates<T extends DocTypes>(
}
for (let i = ops.length - 1; i >= 0; i--) {
const { key, value } = ops[i];
if (!keys.has(key)) {
if (!keys.has(key) && (allowedKeys === undefined || allowedKeys.has(key))) {
// todo option to see all updates
const docValue = await getValueFromLink<T>(blocks, value, logger);
updates.push({ id: key, value: docValue.doc, del: docValue.del, clock: link });
yield { id: key, docLink: value, clock: link };
limit--;
keys.add(key);
}
}
if (event.parents) {
updates = await gatherUpdates(blocks, eventsFetcher, event.parents, since, updates, keys, didLinks, limit, logger);
yield* gatherUpdates(eventsFetcher, event.parents, since, keys, didLinks, limit, logger);
}
}
return updates;
}

export async function* getAllEntries<T extends DocTypes>(blocks: BlockFetcher, head: ClockHead, logger: Logger) {
@@ -381,7 +390,9 @@ export async function doCompact(blockLog: CompactFetcher, head: ClockHead, logge
timeEnd("compact root blocks");

time("compact changes");
await clockChangesSince(blockLog, head, [], {}, logger);
for await (const x of clockChangesSince(blockLog, head, [], {}, logger)) {
void x;
}
timeEnd("compact changes");

isCompacting = false;
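With clockChangesSince reworked into an async generator and clockUpdatesSince layered on top of it, callers now pull changes lazily instead of awaiting a { result, head } object. A minimal consumption sketch; the blocks, head, since, and logger values and the MyDoc shape are assumed for illustration:

// Sketch only: `blocks`, `head`, `since` and `logger` are assumed to exist in scope.
type MyDoc = { text: string };

let count = 0;
for await (const update of clockUpdatesSince<MyDoc>(blocks, head, since, {}, logger)) {
  console.log(update.id, update.del ? "(deleted)" : update.value);
  if (++count >= 10) break; // the generator makes early termination cheap
}

// Or materialize everything at once, mirroring the old { result } shape:
const allUpdates = await Array.fromAsync(clockUpdatesSince<MyDoc>(blocks, head, since, {}, logger));
console.log(allUpdates.length);
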
139 changes: 118 additions & 21 deletions src/crdt.ts
@@ -9,32 +9,36 @@ import {
toStoreRuntime,
} from "./blockstore/index.js";
import {
clockChangesSince,
applyBulkUpdateToCrdt,
getValueFromCrdt,
readFiles,
getAllEntries,
clockVis,
getBlock,
doCompact,
docUpdateToDocWithId,
getAllEntries,
clockUpdatesSince,
} from "./crdt-helpers.js";
import type {
DocUpdate,
CRDTMeta,
ClockHead,
ChangesOptions,
DocValue,
IndexKeyType,
DocWithId,
DocTypes,
Falsy,
SuperThis,
IndexTransactionMeta,
QueryResponse,
ListenerFn,
QueryStreamMarker,
ChangesOptions,
} from "./types.js";
import { index, type Index } from "./indexer.js";
import { CRDTClock } from "./crdt-clock.js";
// import { blockstoreFactory } from "./blockstore/transaction.js";
import { ensureLogger } from "./utils.js";
import { arrayFromAsyncIterable, ensureLogger } from "./utils.js";
import { LedgerOpts } from "./ledger.js";

export interface HasCRDT<T extends DocTypes> {
@@ -55,6 +59,11 @@ export class CRDT<T extends DocTypes> {
readonly logger: Logger;
readonly sthis: SuperThis;

// Subscriptions
_listening = false;
readonly _listeners = new Set<ListenerFn<T>>();
readonly _noupdate_listeners = new Set<ListenerFn<T>>();

constructor(sthis: SuperThis, opts: LedgerOpts) {
this.sthis = sthis;
this.logger = ensureLogger(sthis, "CRDT");
@@ -151,13 +160,104 @@ export class CRDT<T extends DocTypes> {

// if (snap) await this.clock.applyHead(crdtMeta.head, this.clock.head)

async allDocs(): Promise<{ result: DocUpdate<T>[]; head: ClockHead }> {
await this.ready();
const result: DocUpdate<T>[] = [];
for await (const entry of getAllEntries<T>(this.blockstore, this.clock.head, this.logger)) {
result.push(entry);
/**
* Retrieve the current set of documents.
*/
allDocs<T extends DocTypes>({ waitFor }: { waitFor?: Promise<unknown> } = {}): QueryResponse<T> {
const stream = this.#stream.bind(this);

return {
snapshot: (sinceOpts) => this.#snapshot<T>(sinceOpts, { waitFor }),
subscribe: (callback) => this.#subscribe<T>(callback),
toArray: (sinceOpts) => arrayFromAsyncIterable(this.#snapshot<T>(sinceOpts, { waitFor })),

live(opts?: { since?: ClockHead } & ChangesOptions) {
return stream<T>({ ...opts, futureOnly: false }, { waitFor });
},
future() {
return stream<T>({ futureOnly: true }, { waitFor });
},
};
}

#currentDocs<T extends DocTypes>(since?: ClockHead, sinceOptions?: ChangesOptions) {
return since ? this.changes<T>(since, sinceOptions) : this.all<T>();
}

#snapshot<T extends DocTypes>(
opts: { since?: ClockHead } & ChangesOptions = {},
{ waitFor }: { waitFor?: Promise<unknown> } = {},
): AsyncGenerator<DocWithId<T>> {
const currentDocs = this.#currentDocs.bind(this);
const ready = this.ready.bind(this);

async function* currentDocsWithId() {
await waitFor;
await ready();

for await (const doc of currentDocs<T>(opts.since, opts)) {
yield docUpdateToDocWithId(doc);
}
}
return { result, head: this.clock.head };

return currentDocsWithId();
}

#subscribe<T extends DocTypes>(callback: (doc: DocWithId<T>) => void) {
const unsubscribe = this.clock.onTick((updates: DocUpdate<NonNullable<unknown>>[]) => {
updates.forEach((update) => {
callback(docUpdateToDocWithId(update as DocUpdate<T>));
});
});

return unsubscribe;
}

#stream<T extends DocTypes>(
opts: { futureOnly: boolean; since?: ClockHead } & ChangesOptions,
{ waitFor }: { waitFor?: Promise<unknown> } = {},
) {
const currentDocs = this.#currentDocs.bind(this);
const ready = this.ready.bind(this);
const subscribe = this.#subscribe.bind(this);

let unsubscribe: undefined | (() => void);
let isClosed = false;

return new ReadableStream<{ doc: DocWithId<T>; marker: QueryStreamMarker }>({
async start(controller) {
await waitFor;
await ready();

if (opts.futureOnly === false) {
const it = currentDocs<T>(opts.since, opts);

async function iterate(prevValue: DocUpdate<T>) {
const { done, value } = await it.next();

controller.enqueue({
doc: docUpdateToDocWithId(prevValue),
marker: { kind: "preexisting", done: done || false },
});

if (!done) await iterate(value);
}

const { value } = await it.next();
if (value) await iterate(value);
}

unsubscribe = subscribe<T>((doc) => {
if (isClosed) return;
controller.enqueue({ doc, marker: { kind: "new" } });
});
},

cancel() {
isClosed = true;
unsubscribe?.();
},
});
}

async vis(): Promise<string> {
@@ -169,6 +269,14 @@ export class CRDT<T extends DocTypes> {
return txt.join("\n");
}

all<T extends DocTypes>(): AsyncGenerator<DocUpdate<T>> {
return getAllEntries<T>(this.blockstore, this.clock.head, this.logger);
}

changes<T extends DocTypes>(since: ClockHead = [], opts: ChangesOptions = {}): AsyncGenerator<DocUpdate<T>> {
return clockUpdatesSince<T>(this.blockstore, this.clock.head, since, opts, this.logger);
}

async getBlock(cidString: string): Promise<Block> {
await this.ready();
return await getBlock(this.blockstore, cidString);
@@ -181,17 +289,6 @@ export class CRDT<T extends DocTypes> {
return result;
}

async changes(
since: ClockHead = [],
opts: ChangesOptions = {},
): Promise<{
result: DocUpdate<T>[];
head: ClockHead;
}> {
await this.ready();
return await clockChangesSince<T>(this.blockstore, this.clock.head, since, opts, this.logger);
}

async compact(): Promise<void> {
const blocks = this.blockstore as EncryptedBlockstore;
return await blocks.compact();
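Taken together, the crdt.ts changes turn allDocs from a one-shot { result, head } promise into a QueryResponse with pull-based entry points. A hedged usage sketch, assuming a constructed CRDT instance named crdt and the QueryResponse shape shown above (toArray, snapshot, subscribe, live, future); the document type is illustrative, not part of the diff:

// Sketch only: `crdt` is assumed to be an instance of the CRDT class from this PR.
const response = crdt.allDocs<{ text: string }>();

// One-shot materialization, closest to the old allDocs result array.
const docs = await response.toArray();
console.log(docs.length);

// Lazy snapshot: pull documents one at a time from the async generator.
for await (const doc of response.snapshot()) {
  console.log(doc._id);
}

// Live stream: pre-existing docs arrive with a "preexisting" marker, later
// writes with a "new" marker; cancelling the stream unsubscribes from the clock.
const reader = response.live().getReader();
const first = await reader.read();
if (!first.done) console.log(first.value.marker.kind, first.value.doc._id);
await reader.cancel();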