Skip to content

Commit

Permalink
feat: Query future
Browse files Browse the repository at this point in the history
  • Loading branch information
icidasset committed Jan 28, 2025
1 parent b556888 commit 1708c2c
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 44 deletions.
6 changes: 4 additions & 2 deletions src/crdt-clock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ export class CRDTClock<T extends DocTypes> {
async processUpdates(updatesAcc: DocUpdate<T>[], all: boolean, prevHead: ClockHead) {
let internalUpdates = updatesAcc;
if (this.watchers.size && !all) {
const changes = await clockChangesSince<T>(throwFalsy(this.blockstore), this.head, prevHead, {}, this.logger);
internalUpdates = changes.result;
const changes = await Array.fromAsync(
clockChangesSince<T>(throwFalsy(this.blockstore), this.head, prevHead, {}, this.logger),
);
internalUpdates = changes;
}
this.zoomers.forEach((fn) => fn());
this.notifyWatchers(internalUpdates || []);
Expand Down
39 changes: 28 additions & 11 deletions src/crdt-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { parse } from "multiformats/link";
import { sha256 as hasher } from "multiformats/hashes/sha2";
import * as codec from "@fireproof/vendor/@ipld/dag-cbor";
import { put, get, entries, root } from "@fireproof/vendor/@web3-storage/pail/crdt";
import { EventBlockView, EventLink, Operation, PutOperation } from "@fireproof/vendor/@web3-storage/pail/crdt/api";
import { EventBlockView, EventLink, Operation, PutOperation, UnknownLink } from "@fireproof/vendor/@web3-storage/pail/crdt/api";
import { EventFetcher, vis } from "@fireproof/vendor/@web3-storage/pail/clock";
import * as Batch from "@fireproof/vendor/@web3-storage/pail/crdt/batch";
import {
Expand All @@ -29,6 +29,7 @@ import {
type DocWithId,
type DocTypes,
throwFalsy,
ClockLink,
} from "./types.js";
import { Result } from "@fireproof/vendor/@web3-storage/pail/crdt/api";
import { Logger } from "@adviser/cement";
Expand Down Expand Up @@ -238,30 +239,45 @@ class DirtyEventFetcher<T> extends EventFetcher<T> {
}
}

export function clockChangesSince<T extends DocTypes>(
export async function* clockUpdatesSince<T extends DocTypes>(
blocks: BlockFetcher,
head: ClockHead,
since: ClockHead,
opts: ChangesOptions,
logger: Logger,
allowedKeys?: Set<string>,
): AsyncGenerator<DocUpdate<T>> {
for await (const { id, clock, docLink } of clockChangesSince(blocks, head, since, opts, logger, allowedKeys)) {
const docValue = await getValueFromLink<T>(blocks, docLink, logger);
yield { id, value: docValue.doc, del: docValue.del, clock };
}
}

export function clockChangesSince(
blocks: BlockFetcher,
head: ClockHead,
since: ClockHead,
opts: ChangesOptions,
logger: Logger,
allowedKeys?: Set<string>,
): AsyncGenerator<{ id: string; docLink: UnknownLink; clock: ClockLink }> {
const eventsFetcher = (
opts.dirty ? new DirtyEventFetcher<Operation>(logger, blocks) : new EventFetcher<Operation>(blocks)
) as EventFetcher<Operation>;
const keys = new Set<string>();
return gatherUpdates<T>(blocks, eventsFetcher, head, since, keys, new Set<string>(), opts.limit || Infinity, logger);
return gatherUpdates(eventsFetcher, head, since, keys, new Set<string>(), opts.limit || Infinity, logger, allowedKeys);
}

async function* gatherUpdates<T extends DocTypes>(
blocks: BlockFetcher,
async function* gatherUpdates(
eventsFetcher: EventFetcher<Operation>,
head: ClockHead,
since: ClockHead,
keys: Set<string>,
didLinks: Set<string>,
limit: number,
logger: Logger,
): AsyncGenerator<DocUpdate<T>> {
allowedKeys?: Set<string>,
): AsyncGenerator<{ id: string; docLink: UnknownLink; clock: ClockLink }> {
if (limit <= 0) return;

// if (Math.random() < 0.001) console.log('gatherUpdates', head.length, since.length, updates.length)
Expand All @@ -287,16 +303,15 @@ async function* gatherUpdates<T extends DocTypes>(
}
for (let i = ops.length - 1; i >= 0; i--) {
const { key, value } = ops[i];
if (!keys.has(key)) {
if (!keys.has(key) && (allowedKeys === undefined || allowedKeys.has(key))) {
// todo option to see all updates
const docValue = await getValueFromLink<T>(blocks, value, logger);
yield { id: key, value: docValue.doc, del: docValue.del, clock: link };
yield { id: key, docLink: value, clock: link };
limit--;
keys.add(key);
}
}
if (event.parents) {
yield* gatherUpdates(blocks, eventsFetcher, event.parents, since, keys, didLinks, limit, logger);
yield* gatherUpdates(eventsFetcher, event.parents, since, keys, didLinks, limit, logger);
}
}
}
Expand Down Expand Up @@ -375,7 +390,9 @@ export async function doCompact(blockLog: CompactFetcher, head: ClockHead, logge
timeEnd("compact root blocks");

time("compact changes");
await clockChangesSince(blockLog, head, [], {}, logger);
for await (const x of clockChangesSince(blockLog, head, [], {}, logger)) {
void x;
}
timeEnd("compact changes");

isCompacting = false;
Expand Down
4 changes: 2 additions & 2 deletions src/crdt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import {
applyBulkUpdateToCrdt,
getValueFromCrdt,
readFiles,
clockChangesSince,
clockVis,
getBlock,
doCompact,
docUpdateToDocWithId,
getAllEntries,
clockUpdatesSince,
} from "./crdt-helpers.js";
import type {
DocUpdate,
Expand Down Expand Up @@ -255,7 +255,7 @@ export class CRDT<T extends DocTypes> {
}

changes<T extends DocTypes>(since: ClockHead = [], opts: ChangesOptions = {}): AsyncGenerator<DocUpdate<T>> {
return clockChangesSince<T>(this.blockstore, this.clock.head, since, opts, this.logger);
return clockUpdatesSince<T>(this.blockstore, this.clock.head, since, opts, this.logger);
}

async getBlock(cidString: string): Promise<Block> {
Expand Down
22 changes: 20 additions & 2 deletions src/indexer-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ import {
DocTypes,
DocObject,
IndexUpdateString,
ClockHead,
ChangesOptions,
} from "./types.js";
import { CarTransaction, BlockFetcher, AnyLink, AnyBlock } from "./blockstore/index.js";
import { CRDT } from "./crdt.js";
import { Logger } from "@adviser/cement";
import { clockChangesSince } from "./crdt-helpers.js";

export class IndexTree<K extends IndexKeyType, R extends DocFragment> {
cid?: AnyLink;
Expand Down Expand Up @@ -160,13 +163,28 @@ export async function loadIndex<K extends IndexKeyType, T extends DocFragment, C
}

export async function* applyQuery<K extends IndexKeyType, T extends DocObject, R extends DocFragment>(
crdt: CRDT<T>,
{ crdt, logger }: { crdt: CRDT<T>; logger: Logger },
resp: { result: ProllyIndexRow<K, R>[] },
query: QueryOpts<K>,
query: QueryOpts<K> & { since?: ClockHead; sinceOptions?: ChangesOptions },
): AsyncGenerator<DocWithId<T>> {
async function* _apply() {
let result = [...resp.result];

if (query.since) {
const gen = clockChangesSince(crdt.blockstore, crdt.clock.head, query.since, query.sinceOptions || {}, logger);
const ids = await Array.fromAsync(gen)
.then((arr) => arr.map((a) => a.id))
.then((arr) => new Set(arr));
result = result.reduce((acc: ProllyIndexRow<K, R>[], row) => {
if (ids.has(row.id)) {
ids.delete(row.id);
return [...acc, row];
}

return acc;
}, []);
}

if (query.descending) result = result.reverse();
if (query.limit) result = result.slice(0, query.limit);

Expand Down
38 changes: 21 additions & 17 deletions src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,32 +181,36 @@ export class Index<K extends IndexKeyType, T extends DocTypes, R extends DocFrag

query(opts: QueryOpts<K> = {}, { waitFor }: { waitFor?: Promise<unknown> } = {}): QueryResponse<T> {
const query = async (since?: ClockHead, sinceOptions?: ChangesOptions) => {
// TODO:
void since;
void sinceOptions;
const deps = { crdt: this.crdt, logger: this.logger };
const qry = { ...opts, since, sinceOptions };

if (!this.byKey.root) {
return applyQuery<K, T, R>(this.crdt, { result: [] }, opts);
return applyQuery<K, T, R>(deps, { result: [] }, qry);
}
if (opts.range) {
const eRange = encodeRange(opts.range);
return applyQuery<K, T, R>(this.crdt, await throwFalsy(this.byKey.root).range(eRange[0], eRange[1]), opts);

if (qry.range) {
const eRange = encodeRange(qry.range);
return applyQuery<K, T, R>(deps, await throwFalsy(this.byKey.root).range(eRange[0], eRange[1]), qry);
}
if (opts.key) {
const encodedKey = encodeKey(opts.key);
return applyQuery<K, T, R>(this.crdt, await throwFalsy(this.byKey.root).get(encodedKey), opts);

if (qry.key) {
const encodedKey = encodeKey(qry.key);
return applyQuery<K, T, R>(deps, await throwFalsy(this.byKey.root).get(encodedKey), qry);
}
if (opts.prefix) {
if (!Array.isArray(opts.prefix)) opts.prefix = [opts.prefix];

if (qry.prefix) {
if (!Array.isArray(qry.prefix)) qry.prefix = [qry.prefix];
// prefix should be always an array
const start = [...opts.prefix, NaN];
const end = [...opts.prefix, Infinity];
const start = [...qry.prefix, NaN];
const end = [...qry.prefix, Infinity];
const encodedR = encodeRange([start, end]);
return applyQuery<K, T, R>(this.crdt, await this.byKey.root.range(...encodedR), opts);
return applyQuery<K, T, R>(deps, await this.byKey.root.range(...encodedR), qry);
}

const all = await this.byKey.root.getAllEntries(); // funky return type

return applyQuery<K, T, R>(
this.crdt,
deps,
{
// @ts-expect-error getAllEntries returns a different type than range
result: all.result.map(({ key: [k, id], value }) => ({
Expand All @@ -215,7 +219,7 @@ export class Index<K extends IndexKeyType, T extends DocTypes, R extends DocFrag
value,
})),
},
opts,
qry,
);
};

Expand Down
26 changes: 16 additions & 10 deletions tests/fireproof/streaming-api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ describe("Streaming API", () => {
Array(amountOfNewDocs)
.fill(0)
.map((_, i) => {
return lr.put({ _id: `doc-since-${i}`, name: `doc-since-${i}` });
return lr.put({ _id: `doc-since-${i}`, since: `doc-since-${i}` });
}),
);

Expand All @@ -133,14 +133,14 @@ describe("Streaming API", () => {

// Snapshot
// NOTE: This also tests the stream cancellation process.
const amountOfSnapshotDocs = 3;
const amountOfSnapshotDocs = Math.floor(Math.random() * (10 - 1) + 1);
const sincePt2 = lr.clock;

await Promise.all(
Array(amountOfSnapshotDocs)
.fill(0)
.map((_, i) => {
return lr.put({ _id: `doc-snapshot-${i}`, name: `doc-snapshot-${i}` });
return lr.put({ _id: `doc-snapshot-${i}`, since: `doc-snapshot-${i}` });
}),
);

Expand Down Expand Up @@ -208,13 +208,12 @@ describe("Streaming API", () => {
await testLive(stream, AMOUNT_OF_DOCS);
});

// TODO:
// it("test `snapshot` and `live` method with `since` parameter", async () => {
// await testSince({
// snapshotCreator: (since) => lr.query("name").snapshot({ since }),
// streamCreator: (since) => lr.query("name").live({ since }),
// });
// });
it("test `snapshot` and `live` method with `since` parameter", async () => {
await testSince({
snapshotCreator: (since) => lr.query("since").snapshot({ since }),
streamCreator: (since) => lr.query("since").live({ since }),
});
});

it("test `future` method", async () => {
const stream = lr.query("name").future();
Expand Down Expand Up @@ -246,6 +245,13 @@ describe("Streaming API", () => {
await testLive(stream, AMOUNT_OF_ADDITIONAL_DOCS);
});

it("test `snapshot` and `live` method with `since` parameter", async () => {
await testSince({
snapshotCreator: (since) => lr.query("since").snapshot({ since }),
streamCreator: (since) => lr.query("since").live({ since }),
});
});

it("test `future` method", async () => {
const stream = lr.query("additional").future();
await testFuture(stream, AMOUNT_OF_ADDITIONAL_DOCS);
Expand Down

0 comments on commit 1708c2c

Please sign in to comment.