Skip to content

Commit

Permalink
feat(nbstore): adapt op pattern (#8808)
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Nov 22, 2024
1 parent 4125038 commit 64656d1
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 1 deletion.
3 changes: 2 additions & 1 deletion packages/common/nbstore/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"private": true,
"sideEffects": false,
"exports": {
".": "./src/index.ts"
".": "./src/index.ts",
"./op": "./src/op/index.ts"
},
"dependencies": {
"@toeverything/infra": "workspace:*",
Expand Down
19 changes: 19 additions & 0 deletions packages/common/nbstore/src/impls/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import type { Storage } from '../storage';

type StorageConstructor = new (...args: any[]) => Storage;

export const storages: StorageConstructor[] = [];

// in next pr
// eslint-disable-next-line sonarjs/no-empty-collection
const AvailableStorageImplementations = storages.reduce(
(acc, curr) => {
acc[curr.name] = curr;
return acc;
},
{} as Record<string, StorageConstructor>
);

export const getAvailableStorageImplementations = (name: string) => {
return AvailableStorageImplementations[name];
};
134 changes: 134 additions & 0 deletions packages/common/nbstore/src/op/consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import type { OpConsumer } from '@toeverything/infra/op';
import { Observable } from 'rxjs';

import { getAvailableStorageImplementations } from '../impls';
import {
BlobStorage,
DocStorage,
HistoricalDocStorage,
SpaceStorage,
type Storage,
type StorageOptions,
SyncStorage,
} from '../storage';
import type { SpaceStorageOps } from './ops';

export class SpaceStorageConsumer extends SpaceStorage {
constructor(private readonly consumer: OpConsumer<SpaceStorageOps>) {
super([]);
this.registerConnectionHandlers();
this.listen();
}

listen() {
this.consumer.listen();
}

add(name: string, options: StorageOptions) {
const Storage = getAvailableStorageImplementations(name);
const storage = new Storage(options);
this.storages.set(storage.storageType, storage);
this.registerStorageHandlers(storage);
}

override async destroy() {
await super.destroy();
this.consumer.destroy();
}

private registerConnectionHandlers() {
this.consumer.register('addStorage', ({ name, opts }) => {
this.add(name, opts);
});
this.consumer.register('connect', this.connect.bind(this));
this.consumer.register('disconnect', this.disconnect.bind(this));
this.consumer.register('connection', () => {
return new Observable(subscriber => {
subscriber.add(
this.on('connection', payload => {
subscriber.next(payload);
})
);
});
});
this.consumer.register('destroy', this.destroy.bind(this));
}

private registerStorageHandlers(storage: Storage) {
if (storage instanceof DocStorage) {
this.registerDocHandlers(storage);
} else if (storage instanceof BlobStorage) {
this.registerBlobHandlers(storage);
} else if (storage instanceof SyncStorage) {
this.registerSyncHandlers(storage);
}
}

private registerDocHandlers(storage: DocStorage) {
this.consumer.register('getDoc', storage.getDoc.bind(storage));
this.consumer.register('getDocDiff', ({ docId, state }) => {
return storage.getDocDiff(docId, state);
});
this.consumer.register(
'pushDocUpdate',
storage.pushDocUpdate.bind(storage)
);
this.consumer.register(
'getDocTimestamps',
storage.getDocTimestamps.bind(storage)
);
this.consumer.register('deleteDoc', storage.deleteDoc.bind(storage));
this.consumer.register('subscribeDocUpdate', () => {
return new Observable(subscriber => {
subscriber.add(
storage.subscribeDocUpdate(update => {
subscriber.next(update);
})
);
});
});

if (storage instanceof HistoricalDocStorage) {
this.consumer.register('listHistory', ({ docId, filter }) => {
return storage.listHistories(docId, filter);
});
this.consumer.register('getHistory', ({ docId, timestamp }) => {
return storage.getHistory(docId, timestamp);
});
this.consumer.register('deleteHistory', ({ docId, timestamp }) => {
return storage.deleteHistory(docId, timestamp);
});
this.consumer.register('rollbackDoc', ({ docId, timestamp }) => {
return storage.rollbackDoc(docId, timestamp);
});
}
}

private registerBlobHandlers(storage: BlobStorage) {
this.consumer.register('getBlob', storage.get.bind(storage));
this.consumer.register('setBlob', storage.set.bind(storage));
this.consumer.register('deleteBlob', ({ key, permanently }) => {
return storage.delete(key, permanently);
});
this.consumer.register('listBlobs', storage.list.bind(storage));
this.consumer.register('releaseBlobs', storage.release.bind(storage));
}

private registerSyncHandlers(storage: SyncStorage) {
this.consumer.register(
'getPeerClocks',
storage.getPeerClocks.bind(storage)
);
this.consumer.register('setPeerClock', ({ peer, ...clock }) => {
return storage.setPeerClock(peer, clock);
});
this.consumer.register(
'getPeerPushedClocks',
storage.getPeerPushedClocks.bind(storage)
);
this.consumer.register('setPeerPushedClock', ({ peer, ...clock }) => {
return storage.setPeerPushedClock(peer, clock);
});
this.consumer.register('clearClocks', storage.clearClocks.bind(storage));
}
}
51 changes: 51 additions & 0 deletions packages/common/nbstore/src/op/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { OpClient } from '@toeverything/infra/op';

import type { Storage } from '../storage';
import type { SpaceStorageOps } from './ops';

export class SpaceStorageClient extends OpClient<SpaceStorageOps> {
/**
* Adding a storage implementation to the backend.
*
* NOTE:
* Because the storage beckend might be put behind a worker, we cant pass the instance but only
* the constructor name and its options to let the backend construct the instance.
*/
async addStorage<T extends new (...args: any) => Storage>(
Impl: T,
...opts: ConstructorParameters<T>
) {
await this.call('addStorage', { name: Impl.name, opts: opts[0] });
}

async connect() {
await this.call('connect');
}

async disconnect() {
await this.call('disconnect');
}

override async destroy() {
await this.call('destroy');
super.destroy();
}

connection$() {
return this.ob$('connection');
}
}

export class SpaceStorageWorkerClient extends SpaceStorageClient {
private readonly worker: Worker;
constructor() {
const worker = new Worker(new URL('./worker.ts', import.meta.url));
super(worker);
this.worker = worker;
}

override async destroy() {
await super.destroy();
this.worker.terminate();
}
}
58 changes: 58 additions & 0 deletions packages/common/nbstore/src/op/ops.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { type OpSchema } from '@toeverything/infra/op';

import type { ConnectionStatus } from '../connection';
import type {
BlobRecord,
DocClock,
DocClocks,
DocDiff,
DocRecord,
DocUpdate,
HistoryFilter,
ListedBlobRecord,
ListedHistory,
StorageOptions,
StorageType,
} from '../storage';

export interface SpaceStorageOps extends OpSchema {
// init
addStorage: [{ name: string; opts: StorageOptions }, void];

// connection
connect: [void, void];
disconnect: [void, void];
connection: [
void,
{ storage: StorageType; status: ConnectionStatus; error?: Error },
];
destroy: [void, void];

// doc
getDoc: [string, DocRecord | null];
getDocDiff: [{ docId: string; state?: Uint8Array }, DocDiff | null];
pushDocUpdate: [DocUpdate, DocClock];
getDocTimestamps: [Date, DocClocks];
deleteDoc: [string, void];
subscribeDocUpdate: [void, DocRecord];

// history
listHistory: [{ docId: string; filter?: HistoryFilter }, ListedHistory[]];
getHistory: [DocClock, DocRecord | null];
deleteHistory: [DocClock, void];
rollbackDoc: [DocClock & { editor?: string }, void];

// blob
getBlob: [string, BlobRecord | null];
setBlob: [BlobRecord, void];
deleteBlob: [{ key: string; permanently: boolean }, void];
releaseBlobs: [void, void];
listBlobs: [void, ListedBlobRecord[]];

// sync
getPeerClocks: [string, DocClocks];
setPeerClock: [{ peer: string } & DocClock, void];
getPeerPushedClocks: [string, DocClocks];
setPeerPushedClock: [{ peer: string } & DocClock, void];
clearClocks: [void, void];
}
11 changes: 11 additions & 0 deletions packages/common/nbstore/src/op/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { OpConsumer } from '@toeverything/infra/op';

import { SpaceStorageConsumer } from './consumer';
import type { SpaceStorageOps } from './ops';

const consumer = new SpaceStorageConsumer(
// @ts-expect-error safe
new OpConsumer<SpaceStorageOps>(self)
);

consumer.listen();

0 comments on commit 64656d1

Please sign in to comment.