From 1f53bc8962c5186c2be16953eeae2b9187c11877 Mon Sep 17 00:00:00 2001 From: "Marc J. Schmidt" Date: Wed, 22 May 2024 00:23:22 +0200 Subject: [PATCH] feat(broker): new BrokerKeyValue and broker documentation This completes the Broker API and adds the first version of documentation. --- packages/broker/index.ts | 1 + .../broker/src/adapters/deepkit-adapter.ts | 24 ++-- packages/broker/src/broker-key-value.ts | 122 +++++++++++++++++ packages/broker/src/broker.ts | 9 +- packages/broker/src/kernel.ts | 60 ++++++--- packages/broker/src/model.ts | 1 + packages/broker/tests/broker.spec.ts | 35 +++++ packages/framework/src/module.config.ts | 2 +- packages/framework/src/module.ts | 3 +- .../src/app/pages/documentation.component.ts | 3 +- website/src/pages/documentation/broker.md | 127 +++++++++++++++++- .../documentation/broker/atomic-locks.md | 77 +++++++++++ .../src/pages/documentation/broker/cache.md | 93 +++++++++++++ .../pages/documentation/broker/key-value.md | 93 +++++++++++++ .../pages/documentation/broker/message-bus.md | 74 ++++++++++ .../documentation/broker/message-queue.md | 116 ++++++++++++++++ 16 files changed, 798 insertions(+), 42 deletions(-) create mode 100644 packages/broker/src/broker-key-value.ts create mode 100644 website/src/pages/documentation/broker/atomic-locks.md create mode 100644 website/src/pages/documentation/broker/cache.md create mode 100644 website/src/pages/documentation/broker/key-value.md create mode 100644 website/src/pages/documentation/broker/message-bus.md create mode 100644 website/src/pages/documentation/broker/message-queue.md diff --git a/packages/broker/index.ts b/packages/broker/index.ts index 5bde757f3..a9498918d 100644 --- a/packages/broker/index.ts +++ b/packages/broker/index.ts @@ -12,5 +12,6 @@ export * from './src/kernel.js'; export * from './src/model.js'; export * from './src/broker.js'; export * from './src/broker-cache.js'; +export * from './src/broker-key-value.js'; export * from './src/adapters/deepkit-adapter.js'; export * from './src/adapters/memory-adapter.js'; diff --git a/packages/broker/src/adapters/deepkit-adapter.ts b/packages/broker/src/adapters/deepkit-adapter.ts index 9354d5ec9..8fd529b70 100644 --- a/packages/broker/src/adapters/deepkit-adapter.ts +++ b/packages/broker/src/adapters/deepkit-adapter.ts @@ -3,7 +3,7 @@ import { BrokerAdapterQueueProduceOptionsResolved, BrokerQueueMessage, BrokerTimeOptionsResolved, - Release + Release, } from '../broker.js'; import { getTypeJitContainer, ReflectionKind, Type, TypePropertySignature } from '@deepkit/type'; import { @@ -29,7 +29,7 @@ import { brokerSet, brokerSetCache, BrokerType, - QueueMessageProcessing + QueueMessageProcessing, } from '../model.js'; import { ClientTransportAdapter, @@ -37,18 +37,13 @@ import { RpcBaseClient, RpcMessage, RpcMessageRouteType, - RpcWebSocketClientAdapter + RpcWebSocketClientAdapter, } from '@deepkit/rpc'; -import { - deserializeBSON, - deserializeBSONWithoutOptimiser, - getBSONDeserializer, - getBSONSerializer, - serializeBSON -} from '@deepkit/bson'; +import { deserializeBSON, getBSONDeserializer, getBSONSerializer, serializeBSON } from '@deepkit/bson'; import { arrayRemoveItem } from '@deepkit/core'; import { BrokerCacheItemOptionsResolved } from '../broker-cache.js'; import { fastHash } from '../utils.js'; +import { BrokerKeyValueOptionsResolved } from '../broker-key-value.js'; interface TypeSerialize { encode(v: any): Uint8Array; @@ -225,10 +220,10 @@ export class BrokerDeepkitAdapter implements BrokerAdapter { return first.v && first.ttl !== undefined ? { value: serializer.decode(first.v, 0), ttl: first.ttl } : undefined; } - async set(key: string, value: any, type: Type): Promise { + async set(key: string, value: any, options: BrokerKeyValueOptionsResolved, type: Type): Promise { const serializer = getSerializer(type); const v = serializer.encode(value); - await this.pool.getConnection('key/' + key).sendMessage(BrokerType.Set, { n: key, v }).ackThenClose(); + await this.pool.getConnection('key/' + key).sendMessage(BrokerType.Set, { n: key, v, ttl: options.ttl }).ackThenClose(); } async get(key: string, type: Type): Promise { @@ -240,6 +235,11 @@ export class BrokerDeepkitAdapter implements BrokerAdapter { } } + async remove(key: string): Promise { + await this.pool.getConnection('key/' + key) + .sendMessage(BrokerType.Delete, { n: key }).ackThenClose(); + } + async increment(key: string, value: any): Promise { const response = await this.pool.getConnection('increment/' + key) .sendMessage(BrokerType.Increment, { n: key, v: value }) diff --git a/packages/broker/src/broker-key-value.ts b/packages/broker/src/broker-key-value.ts new file mode 100644 index 000000000..de88509c3 --- /dev/null +++ b/packages/broker/src/broker-key-value.ts @@ -0,0 +1,122 @@ +import { parseTime } from './utils.js'; +import { ReceiveType, resolveReceiveType, Type } from '@deepkit/type'; +import { BrokerAdapterBase } from './broker.js'; +import { ConsoleLogger, LoggerInterface } from '@deepkit/logger'; + +export interface BrokerKeyValueOptions { + /** + * Relative time to live in milliseconds. 0 means no ttl. + * + * Value is either milliseconds or a string like '2 minutes', '8s', '24hours'. + */ + ttl: number | string; +} + +export interface BrokerKeyValueOptionsResolved extends BrokerKeyValueOptions { + ttl: number; +} + +export function parseBrokerKeyValueOptions(options: Partial): BrokerKeyValueOptionsResolved { + return { + ttl: parseTime(options.ttl) ?? 0, + }; +} + +export interface BrokerAdapterKeyValue extends BrokerAdapterBase { + get(key: string, type: Type): Promise; + + set(key: string, value: any, options: BrokerKeyValueOptionsResolved, type: Type): Promise; + + remove(key: string): Promise; + + increment(key: string, value: any): Promise; +} + +export class BrokerKeyValueItem { + constructor( + private key: string, + private type: Type, + private adapter: BrokerAdapterKeyValue, + private options: BrokerKeyValueOptionsResolved, + ) { + + } + + /** + * @see BrokerKeyValue.get + */ + async get(): Promise { + return this.adapter.get(this.key, this.type); + } + + /** + * @see BrokerKeyValue.set + */ + async set(value: T): Promise { + await this.adapter.set(this.key, value, this.options, this.type); + } + + /** + * @see BrokerKeyValue.increment + */ + async increment(value: number): Promise { + return this.adapter.increment(this.key, value); + } + + async remove(): Promise { + return this.adapter.remove(this.key); + } +} + +export class BrokerKeyValue { + private config: BrokerKeyValueOptionsResolved; + + constructor( + private adapter: BrokerAdapterKeyValue, + config: Partial = {}, + private logger: LoggerInterface = new ConsoleLogger(), + ) { + this.config = parseBrokerKeyValueOptions(config); + } + + /** + * Returns a new BrokerKeyValueItem for the given key. + */ + item(key: string, options?: Partial, type?: ReceiveType): BrokerKeyValueItem { + return new BrokerKeyValueItem( + key, resolveReceiveType(type), this.adapter, + parseBrokerKeyValueOptions(Object.assign({}, this.config, options)), + ); + } + + /** + * Returns the value for the given key. + */ + async get(key: string, type?: ReceiveType): Promise { + return this.adapter.get(key, resolveReceiveType(type)); + } + + /** + * Sets the value for the given key. + */ + async set(key: string, value: T, options?: Partial, type?: ReceiveType): Promise { + return this.adapter.set(key, value, + options ? parseBrokerKeyValueOptions(Object.assign({}, this.config, options)) : this.config, + resolveReceiveType(type), + ); + } + + async remove(key: string): Promise { + return this.adapter.remove(key); + } + + /** + * Increments the value for the given key by the given value. + * Note that this is not compatible to get/set, as it only works with numbers. + * Since this an atomic increment, there is no way to get the current value via `get` and then increment it, + * but you have to use `increment(0)` to get the current value. + */ + async increment(key: string, value: number): Promise { + return this.adapter.increment(key, value); + } +} diff --git a/packages/broker/src/broker.ts b/packages/broker/src/broker.ts index a0be6a2ba..bb39620aa 100644 --- a/packages/broker/src/broker.ts +++ b/packages/broker/src/broker.ts @@ -3,6 +3,7 @@ import { EventToken } from '@deepkit/event'; import { parseTime } from './utils.js'; import { BrokerAdapterCache } from './broker-cache.js'; import { QueueMessageProcessing } from './model.js'; +import { BrokerAdapterKeyValue } from './broker-key-value.js'; export interface BrokerTimeOptions { /** @@ -140,14 +141,6 @@ export interface BrokerAdapterQueue extends BrokerAdapterBase { produce(name: string, message: any, type: Type, options?: BrokerAdapterQueueProduceOptionsResolved): Promise; } -export interface BrokerAdapterKeyValue extends BrokerAdapterBase { - get(key: string, type: Type): Promise; - - set(key: string, value: any, type: Type): Promise; - - increment(key: string, value: any): Promise; -} - export const onBrokerLock = new EventToken('broker.lock'); export class BrokerQueueMessage { diff --git a/packages/broker/src/kernel.ts b/packages/broker/src/kernel.ts index 8523491c2..6c45f36d6 100644 --- a/packages/broker/src/kernel.ts +++ b/packages/broker/src/kernel.ts @@ -40,6 +40,7 @@ import { brokerResponseGetCacheMeta, brokerResponseIncrement, brokerResponseIsLock, + brokerSet, brokerSetCache, BrokerType, QueueMessage, @@ -92,7 +93,10 @@ export class BrokerConnection extends RpcKernelBaseConnection { for (const connection of this.connections.connections) { if (connection === this) continue; - promises.push(connection.sendMessage(BrokerType.EntityFields, { name, fields }).ackThenClose()); + promises.push(connection.sendMessage(BrokerType.EntityFields, { + name, + fields, + }).ackThenClose()); } await Promise.all(promises); @@ -108,7 +112,7 @@ export class BrokerConnection extends RpcKernelBaseConnection { } response.reply( BrokerType.EntityFields, - { name: body.name, fields: this.state.getEntityFields(body.name) } + { name: body.name, fields: this.state.getEntityFields(body.name) }, ); break; } @@ -120,14 +124,17 @@ export class BrokerConnection extends RpcKernelBaseConnection { } response.reply( BrokerType.EntityFields, - { name: body.name, fields: this.state.getEntityFields(body.name) } + { name: body.name, fields: this.state.getEntityFields(body.name) }, ); break; } case BrokerType.AllEntityFields: { const composite = response.composite(BrokerType.AllEntityFields); for (const name of this.state.entityFields.keys()) { - composite.add(BrokerType.EntityFields, { name, fields: this.state.getEntityFields(name) }); + composite.add(BrokerType.EntityFields, { + name, + fields: this.state.getEntityFields(name), + }); } composite.send(); break; @@ -190,7 +197,11 @@ export class BrokerConnection extends RpcKernelBaseConnection { } case BrokerType.QueueMessageHandled: { const body = message.parseBody(); - this.state.queueMessageHandled(body.c, this, body.id, { error: body.error, success: body.success, delay: body.delay }); + this.state.queueMessageHandled(body.c, this, body.id, { + error: body.error, + success: body.success, + delay: body.delay, + }); response.ack(); break; } @@ -221,8 +232,8 @@ export class BrokerConnection extends RpcKernelBaseConnection { break; } case BrokerType.Set: { - const body = message.parseBody(); - this.state.setKey(body.n, body.v); + const body = message.parseBody(); + this.state.setKey(body.n, body.v, body.ttl); response.ack(); break; } @@ -246,7 +257,7 @@ export class BrokerConnection extends RpcKernelBaseConnection { const message = createRpcMessage( 0, BrokerType.ResponseInvalidationCache, { key: body.n, ttl: entry.ttl }, - RpcMessageRouteType.server + RpcMessageRouteType.server, ); for (const connection of this.state.invalidationCacheMessageConnections) { @@ -271,7 +282,7 @@ export class BrokerConnection extends RpcKernelBaseConnection { case BrokerType.GetCacheMeta: { const body = message.parseBody(); const v = this.state.getCache(body.n); - response.reply(BrokerType.ResponseGetCacheMeta, v ? {ttl: v.ttl} : { missing: true }); + response.reply(BrokerType.ResponseGetCacheMeta, v ? { ttl: v.ttl } : { missing: true }); break; } case BrokerType.Get: { @@ -415,7 +426,7 @@ export class BrokerState { if (!subscriptions) return; const message = createRpcMessage( 0, BrokerType.ResponseSubscribeMessage, - { c: channel, v: v }, RpcMessageRouteType.server + { c: channel, v: v }, RpcMessageRouteType.server, ); for (const connection of subscriptions) { @@ -447,7 +458,16 @@ export class BrokerState { public queuePublish(body: BrokerQueuePublish) { const queue = this.getQueue(body.c); - const m: QueueMessage = { id: queue.currentId++, process: body.process, hash: body.hash, state: QueueMessageState.pending, tries: 0, v: body.v, delay: body.delay || 0, priority: body.priority }; + const m: QueueMessage = { + id: queue.currentId++, + process: body.process, + hash: body.hash, + state: QueueMessageState.pending, + tries: 0, + v: body.v, + delay: body.delay || 0, + priority: body.priority, + }; if (body.process === QueueMessageProcessing.exactlyOnce) { if (!body.deduplicationInterval) { @@ -473,7 +493,7 @@ export class BrokerState { m.lastError = undefined; consumer.con.writer.write(createRpcMessage( 0, BrokerType.QueueResponseHandleMessage, - { c: body.c, v: body.v, id: m.id }, RpcMessageRouteType.server + { c: body.c, v: body.v, id: m.id }, RpcMessageRouteType.server, )); } @@ -483,7 +503,11 @@ export class BrokerState { /** * When a queue message has been sent to a consumer and the consumer answers. */ - public queueMessageHandled(queueName: string, connection: BrokerConnection, id: number, answer: { error?: string, success: boolean, delay?: number }) { + public queueMessageHandled(queueName: string, connection: BrokerConnection, id: number, answer: { + error?: string, + success: boolean, + delay?: number + }) { const queue = this.queues.get(queueName); if (!queue) return; const consumer = queue.consumers.find(v => v.con === connection); @@ -518,13 +542,18 @@ export class BrokerState { public increment(id: string, v?: number): number { const buffer = this.keyStore.get(id); const float64 = buffer ? new Float64Array(buffer.buffer, buffer.byteOffset) : new Float64Array(1); - float64[0] += v || 1; + float64[0] += v || 0; if (!buffer) this.keyStore.set(id, new Uint8Array(float64.buffer)); return float64[0]; } - public setKey(id: string, data: Uint8Array) { + public setKey(id: string, data: Uint8Array, ttl: number) { this.keyStore.set(id, data); + if (ttl > 0) { + setTimeout(() => { + this.keyStore.delete(id); + }, ttl); + } } public getKey(id: string): Uint8Array | undefined { @@ -534,7 +563,6 @@ export class BrokerState { public deleteKey(id: string) { this.keyStore.delete(id); } - } export class BrokerKernel extends RpcKernel { diff --git a/packages/broker/src/model.ts b/packages/broker/src/model.ts index fffbec54e..5c32c51cf 100644 --- a/packages/broker/src/model.ts +++ b/packages/broker/src/model.ts @@ -74,6 +74,7 @@ export interface brokerResponseIncrement { export interface brokerSet { n: string, v: Uint8Array, + ttl: number, } export interface brokerInvalidateCache { diff --git a/packages/broker/tests/broker.spec.ts b/packages/broker/tests/broker.spec.ts index 4163dece3..51ea5e5d5 100644 --- a/packages/broker/tests/broker.spec.ts +++ b/packages/broker/tests/broker.spec.ts @@ -4,6 +4,7 @@ import { BrokerMemoryAdapter } from '../src/adapters/memory-adapter.js'; import { sleep } from '@deepkit/core'; import { BrokerCache } from '../src/broker-cache.js'; import { QueueMessageProcessing } from '../src/model.js'; +import { BrokerKeyValue } from '../src/broker-key-value.js'; jest.setTimeout(10000); @@ -20,6 +21,40 @@ afterEach(() => { type User = { id: number, username: string, created: Date }; +test('key-value 1', async () => { + const keyValue = new BrokerKeyValue(await adapterFactory()); + + const item = keyValue.item('key1'); + expect(await item.get()).toBe(undefined); + await item.set(23); + expect(await item.get()).toBe(23); + + expect(await keyValue.get('key1')).toBe(23); + await keyValue.set('key1', 24); + expect(await item.get()).toBe(24); +}); + +test('key-value 2', async () => { + const keyValue = new BrokerKeyValue(await adapterFactory()); + + const item = keyValue.item('key1'); + expect(await item.increment(2)).toBe(2); + expect(await item.increment(2)).toBe(4); + expect(await item.increment(-2)).toBe(2); +}); + +test('key-value 3', async () => { + const keyValue = new BrokerKeyValue(await adapterFactory()); + + const item = keyValue.item('key1'); + expect(await item.increment(0)).toBe(0); + expect(await item.increment(5)).toBe(5); + expect(await item.increment(-2)).toBe(3); + + await item.remove(); + expect(await item.increment(0)).toBe(0); +}); + test('cache2', async () => { const cache = new BrokerCache(await adapterFactory()); diff --git a/packages/framework/src/module.config.ts b/packages/framework/src/module.config.ts index 4f02df822..3e6cff25b 100644 --- a/packages/framework/src/module.config.ts +++ b/packages/framework/src/module.config.ts @@ -13,7 +13,7 @@ const isWindows = 'undefined' !== typeof process ? process.platform === 'win32' export class BrokerConfig { /** - * @description If startOnBootstrap is true, the broker server stats at this address. Unix socket path or host:port combination + * @description If startOnBootstrap is true, the broker server starts at this address. Unix socket path or host:port combination */ listen: string = isWindows ? 'localhost:8811' : 'var/broker.sock'; diff --git a/packages/framework/src/module.ts b/packages/framework/src/module.ts index 0e0441e25..2644c0cc3 100644 --- a/packages/framework/src/module.ts +++ b/packages/framework/src/module.ts @@ -58,7 +58,7 @@ import { MediaController } from './debug/media.controller.js'; import { DebugHttpController } from './debug/debug-http.controller.js'; import { BrokerServer } from './broker/broker.js'; import { BrokerListener } from './broker/listener.js'; -import { BrokerBus, BrokerCache, BrokerDeepkitAdapter, BrokerLock, BrokerQueue } from '@deepkit/broker'; +import { BrokerBus, BrokerCache, BrokerDeepkitAdapter, BrokerKeyValue, BrokerLock, BrokerQueue } from '@deepkit/broker'; import { getBrokerServers } from './broker.js'; export class FrameworkModule extends createModule({ @@ -99,6 +99,7 @@ export class FrameworkModule extends createModule({ { provide: BrokerLock, useFactory: (adapter: BrokerDeepkitAdapter) => new BrokerLock(adapter) }, { provide: BrokerQueue, useFactory: (adapter: BrokerDeepkitAdapter) => new BrokerQueue(adapter) }, { provide: BrokerBus, useFactory: (adapter: BrokerDeepkitAdapter) => new BrokerBus(adapter) }, + { provide: BrokerKeyValue, useFactory: (adapter: BrokerDeepkitAdapter) => new BrokerKeyValue(adapter) }, //move to HttpModule? { provide: SessionHandler, scope: 'http' }, diff --git a/website/src/app/pages/documentation.component.ts b/website/src/app/pages/documentation.component.ts index 2cbeb200d..0cd1e54c2 100644 --- a/website/src/app/pages/documentation.component.ts +++ b/website/src/app/pages/documentation.component.ts @@ -108,11 +108,12 @@ import { PlatformHelper } from '@app/app/utils';
diff --git a/website/src/pages/documentation/broker.md b/website/src/pages/documentation/broker.md index 4a2cd7ab4..4821d39fe 100644 --- a/website/src/pages/documentation/broker.md +++ b/website/src/pages/documentation/broker.md @@ -14,15 +14,136 @@ Deepkit Broker is installed and activated per default when [Deepkit Framework](. npm install @deepkit/broker ``` +## Broker Classes + +Deepkit Broker provides four main broker classes: + +- **BrokerCache** - L2 cache abstraction with cache invalidation +- **BrokerBus** - Message bus (Pub/Sub) +- **BrokerQueue** - Queue system +- **BrokerLock** - Distributed lock +- **BrokerKeyValue** - Key/Value store + +These classes are designed to take a broker adapter to communicate with a broker server in a type-safe way. + +To give a high level overview of how the classes can be used outside the framework, here is an example: + +```typescript +import { BrokerBus, BrokerAdapterBus } from '@deepkit/broker'; + +class MyAdapter implements BrokerAdapterBus { + disconnect(): Promise { + // implement: disconnect from broker server + } + async publish(name: string, message: any, type: Type): Promise { + // implement: send message to broker server, name is 'my-channel-name', message is { foo: 'bar' } + } + async subscribe(name: string, callback: (message: any) => void, type: Type): Promise { + // implemenet: subscribe to broker server, name is 'my-channel-name' + } +} + +const adapter = new MyAdapter; +const bus = new BrokerBus(adapter); + +interface MyMessage { + foo: string; +} + +const channel = bus.channel('my-channel-name'); + +await channel.subscribe((message) => { + console.log('received message', message); +}); + +await channel.publish({ foo: 'bar' }); +``` + +In this example we write our own adapter to communicate with a broker server. The adapter is then passed to the BrokerBus class, which is used to create a channel. The channel can then be used to subscribe to messages and publish messages. + +Thanks to runtime types and the call `channel('my-channel-name');` everything is type-safe and the message can be validated and serialized automatically in the adapter directly. +The default implementation `BrokerDeepkitAdapter` handles this automatically for you (and uses BSON for serialization). + +Note that each broker class has its own adapter interface, so you can implement only the methods you need. The `BrokerDeepkitAdapter` implements all of these interfaces and can be used in all broker classes. + +To use these classes in a Deepkit application with dependency injection, you can use the `FrameworkModule` which provides a default adapter for the broker server and registers all broker classes as providers. See next chapter Usage for more information. + +Here is an example on how to manually set up an application with BrokerBus. + +```typescript +type MyBusChannel = BrokerBusChannel; + +const app = new App({ + providers: [ + MyAdapter, + provide((adapter: MyAdapter) => new BrokerBus(adapter)), + provide((bus: BrokerBus) => bus.channel('my-channel-name')), + ] +}); + +await app.get().subscribe((message) => { + console.log('received message', message); +}); + +await app.get().publish({ foo: 'bar' }); + +app.get(MyAdapter).disconnect(); +``` + ## Usage +The FrameworkModule provides a default broker adapter for the configured broker server based on the given configuration. +It also registers providers for all the broker classes, so you can inject them into your services directly. + ```typescript +// in an extra file, e.g. broker-channels.ts +type MyBusChannel = BrokerBusChannel; const app = new App({ - + providers: [ + Service, + provide((bus: BrokerBus) => bus.channel('my-channel-name')), + ], + imports: [new FrameworkModule({ + broker: { + // If startOnBootstrap is true, the broker server starts at this address. Unix socket path or host:port combination + listen: 'localhost:8811', // or 'var/broker.sock'; + // If a different broker server should be used, this is its address. Unix socket path or host:port combination. + host: 'localhost:8811', //or 'var/broker.sock'; + // Automatically starts a single broker in the main process. Disable it if you have a custom broker node. + startOnBootstrap: true, + }, + })], }); -app.command('create') +void app.run(); +``` + +You can then inject the broker derived classes (or the broker class directly) into your services: -app.run() +```typescript +import { MyBusChannel } from './broker-channels.ts'; + +class Service { + constructor(private bus: MyBusChannel) { + } + + async addUser() { + await this.bus.publish({ foo: 'bar' }); + } +} ``` + +It's always a good idea to create a derived type for your channels (like above with `MyBusChannel`), so you can easily inject them into your services. +Otherwise, you'd have to inject `BrokerBus` and call `channel('my-channel-name')` every time you need it, which is error-prone and not DRY. + +Almost all broker classes have this kind of derivation, so you can easily define them in one place and use them everywhere. See the appropriate broker class documentation for more information. + +## Custom Adapter + +If you need more connections or a custom adapter, you can create your own adapter by implementing one or more of the following interfaces from `@deepkit/broker`: + +```typescript +export type BrokerAdapter = BrokerAdapterCache & BrokerAdapterBus & BrokerAdapterLock & BrokerAdapterQueue & BrokerAdapterKeyValue; +``` + diff --git a/website/src/pages/documentation/broker/atomic-locks.md b/website/src/pages/documentation/broker/atomic-locks.md new file mode 100644 index 000000000..b21a754c4 --- /dev/null +++ b/website/src/pages/documentation/broker/atomic-locks.md @@ -0,0 +1,77 @@ +# Broker Atomic Locks + +Deepkit Broker Locks is a simple way to create atomic locks across multiple processes or machines. + +It's a simple way to ensure that only one process can execute a certain code block at a time. + +## Usage + +```typescript +import { BrokerLock } from '@deepkit/broker'; + +const lock = new BrokerLock(adapter); + +// lock is alive for 60 seconds. +// acquisition timeout is 10 seconds. +const myLock = lock.item('my-lock', { ttl: '60s', timeout: '10s' }); + +async function criticalSection() { + // Holds the lock until the function is done. + // cleans up the lock automatically, even if the function throws an error. + await using hold = await lock1.hold(); +} +``` + +The lock supports [Explicit Resource Management](https://github.com/tc39/proposal-explicit-resource-management), which means you don't have to use a try-catch block to ensure the lock is released properly. + +To manually acquire and release a lock, you can use the `acquire` and `release` methods. + +```typescript +// this blocks until the lock is acquired. +// or throws if timeout is reached +await myLock.acquire(); + +try { + // do something critical +} finally { + await myLock.release(); +} +``` + +## App Usage + +A full example of how to use the BrokerLock in your application. +The class is automatically available in the dependency injection container if you import the `FrameworkModule`. +See the Getting started page for more information. + +```typescript +import { BrokerLock, BrokerLockItem } from '@deepkit/broker'; +import { FrameworkModule } from '@deepkit/framework'; + +// move this type to a shared file +type MyCriticalLock = BrokerLockItem; + +class Service { + constructor(private criticalLock: MyCriticalLock) { + } + + async doSomethingCritical() { + await using hold = await this.criticalLock.hold(); + + // do something critical, + // lock is released automatically + } +} + +const app = new App({ + providers: [ + Service, + provide((lock: BrokerLock) => lock.item('my-critical-lock', { ttl: '60s', timeout: '10s' })), + ], + imports: [ + new FrameworkModule(), + ], +}); +``` + + diff --git a/website/src/pages/documentation/broker/cache.md b/website/src/pages/documentation/broker/cache.md new file mode 100644 index 000000000..9c1ca0fbe --- /dev/null +++ b/website/src/pages/documentation/broker/cache.md @@ -0,0 +1,93 @@ +# Broker Cache + +Deepkit Broker Cache class is a multi-level (2 levels) cache that keeps a volatile local cache in memory and only fetches data from the broker server +if the data is not in the cache, stale, or was invalidated. This allows to use cache for very high performance and low latency data fetching. + +The cache is designed to be type-safe and automatically serializes and deserializes data (using BSON). It also supports cache invalidation and cache clearing. +The implementation makes sure that per process the cache is rebuilt only once, even if multiple requests are trying to access the same cache item at the same time. + +The data is not persisted on the server, but only kept in memory. If the server restarts, all data is lost. + +## Usage + +Make sure to read the Getting started page to learn how to properly set up your application so BrokerCache is available in the dependency injection container. + +The cache abstraction in Deepkit Broker is very different to a simple key/value store. It works by defining a cache name and a builder function that is called automatically when the cache is empty or stale. This builder function is responsible to build the data that is then stored in the cache. + +```typescript +import { BrokerCache, BrokerCacheItem } from '@deepkit/broker'; + +const cache = new BrokerCache(adapter); + +const cacheItem = cache.item('my-cache', async () => { + // this is the builder function that is called when + // the cache is empty or stale + return 'hello world'; +}); + + +// check if cache is stale or empty +await cacheItem.exists(); + +// get the data from the cache or fetch it from the broker server +// if the cache is empty or stale, the builder function is called +// and result returned and send to the broker server. +const topUsers = await cacheItem.get(); + +// invalidate the cache so next get() call will +// call the builder function again. +// Clears the local cache and server cache. +await cacheItem.invalidate(); + +// manually set data in the cache +await cacheItem.set(xy); +``` + +## App Usage + +A full example of how to use the BrokerCache in your application. +The class is automatically available in the dependency injection container if you import the `FrameworkModule`. +See the Getting started page for more information. + +```typescript +import { BrokerCache, BrokerCacheItem } from '@deepkit/broker'; +import { FrameworkModule } from '@deepkit/framework'; + +// it's a good to have these types defined in a common file so you can reuse them +// and inject them into your services +type MyCacheItem = BrokerCacheItem; + +function createMyCache(cache: BrokerCache, database: Database) { + return cache.item('top-users', async () => { + // this is the builder function that is called when + // the cache is empty or stale + return await database.query(User) + .limit(10).orderBy('score').find(); + }); +} + +class Service { + constructor(private cacheItem: MyCacheItem) { + } + + async getTopUsers() { + return await this.cacheItem.get(); + } +} + +const app = new App({ + providers: [ + Service, + Database, + provide(createMyCache), + ], + imports: [ + new FrameworkModule(), + ], +}); + +const cacheItem = app.get(); + +// get the data from the cache or fetch it from the broker server +const topUsers = await cacheItem.get(); +``` diff --git a/website/src/pages/documentation/broker/key-value.md b/website/src/pages/documentation/broker/key-value.md new file mode 100644 index 000000000..e3e6860c1 --- /dev/null +++ b/website/src/pages/documentation/broker/key-value.md @@ -0,0 +1,93 @@ +# Broker Key-Value + +Deepkit Broker Key-Value class is a simple key/value store abstraction that works with the broker server. It's a simple way to store and retrieve data from the broker server. + +There is no local caching implemented. All `get` calls are real network requests to the broker server, every time. To avoid this, use the Broker Cache abstraction. + +The data is not persisted on the server, but only kept in memory. If the server restarts, all data is lost. + +## Usage + +```typescript +import { BrokerKeyValue } from '@deepkit/broker'; + +const keyValue = new BrokerKeyValue(adapter, { + ttl: '60s', // time to live for each key. 0 means no ttl (default). +}); + +const item = keyValue.item('key1'); + +await item.set(123); +console.log(await item.get()); //123 + +await item.remove(); +``` + +The data is automatically serialized and deserialized using BSON based on the given type. + +The methods `set` and `get` can also be called directly on the `BrokerKeyValue` instance, +but has the disadvantage that you have to pass the key and type every time. + +```typescript +await keyValue.set('key1', 123); +console.log(await keyValue.get('key1')); //123 +``` + +## Increment + +The `increment` method allows you to atomically increment the value of a key by a given amount. + +Note that this creates its own storage entry on the server and is not compatible with `set` or `get`. + +```typescript + +const activeUsers = keyValue.item('activeUsers'); + +// Increment by 1 atomically +await activeUsers.increment(1); + +await activeUsers.increment(-1); + +// The only way to get the current value is to call increment with 0 +const current = await activeUsers.increment(0); + +// removes the entry +await activeUsers.remove(); +``` + +## App Usage + +A full example of how to use the BrokerKeyValue in your application. +The class is automatically available in the dependency injection container if you import the `FrameworkModule`. +See the Getting started page for more information. + +```typescript +import { BrokerKeyValue, BrokerKeyValueItem } from '@deepkit/broker'; +import { FrameworkModule } from '@deepkit/framework'; + +// move this type to a shared file +type MyKeyValueItem = BrokerKeyValueItem; + +class Service { + constructor(private keyValueItem: MyKeyValueItem) { + } + + async getTopUsers(): Promise { + // Might be undefined. You have to handle this case. + // Use Broker Cache if you want to avoid this. + return await this.keyValueItem.get(); + } +} + +const app = new App({ + providers: [ + Service, + provide((keyValue: BrokerKeyValue) => keyValue.item('top-users')), + ], + imports: [ + new FrameworkModule(), + ], +}); +``` + + diff --git a/website/src/pages/documentation/broker/message-bus.md b/website/src/pages/documentation/broker/message-bus.md new file mode 100644 index 000000000..e6531dd1c --- /dev/null +++ b/website/src/pages/documentation/broker/message-bus.md @@ -0,0 +1,74 @@ +# Broker Bus + +Deepkit Message Bus is a message bus system (Pub/Sub, distributed event system) that allows you to send messages or events between different parts of your application. + +It can be used in microservices, monoliths, or any other kind of application. Perfectly suited for event-driven architectures. + +It is different to the Deepkit Event system, which is used for in-process events. The Broker Bus is used for events that need to be sent to other processes or servers. Broker Bus is also perfectly suited when you want to communicate between several works that were automatically started by FrameworkModule, e.g. `new FrameworkModule({workers: 4})`. + +The system is designed to be type-safe and automatically serializes and deserializes messages (using BSON). + +## Usage + +```typescript +import { BrokerBus } from '@deepkit/broker'; + +const bus = new BrokerBus(adapter); + +// move this type to a shared file +type Events = { type: 'user-created', id: number } | { type: 'user-deleted', id: number }; + +const channel = bus.channel('my-events-channel'); + +await channel.subscribe((event) => { + if (event.type === 'user-created') { + console.log('User created', event.id); + } else if (event.type === 'user-deleted') { + console.log('User deleted', event.id); + } +}); + +await channel.publish({ type: 'user-created', id: 1 }); +``` + +By defining a name and a type for the channel, you can ensure that only messages of the correct type are sent and received. +The data is automatically serialized and deserialized (using BSON). + +## App Usage + +A full example of how to use the BrokerBus in your application. +The class is automatically available in the dependency injection container if you import the `FrameworkModule`. +See the Getting started page for more information. + +```typescript +import { BrokerBus, BrokerBusChannel } from '@deepkit/broker'; +import { FrameworkModule } from '@deepkit/framework'; + +// it's a good to have these types defined in a common file so you can reuse them +// and inject them into your services +type MyChannelMessage = { + id: number; + name: string; +}; +// move this type to a shared file +type MyChannel = BrokerBusChannel; + +class Service { + constructor(private channel: MyChannel) { + } + + async update() { + await this.channel.publish({ id: 1, name: 'Peter' }); + } +} + +const app = new App({ + providers: [ + Service, + provide((bus: BrokerBus) => bus.channel('my-channel')), + ], + imports: [ + new FrameworkModule(), + ], +}); +``` diff --git a/website/src/pages/documentation/broker/message-queue.md b/website/src/pages/documentation/broker/message-queue.md new file mode 100644 index 000000000..e3bcf6165 --- /dev/null +++ b/website/src/pages/documentation/broker/message-queue.md @@ -0,0 +1,116 @@ +# Broker Queue + +Deepkit Message Queue is a message queue system that allows you to send messages to the queue server and have workers process them. + +The system is designed to be type-safe and automatically serializes and deserializes messages (using BSON). + +The data is persisted on the server, so even if the server crashes, the data is not lost. + +## Usage + +```typescript +import { BrokerQueue, BrokerQueueChannel } from '@deepkit/broker'; + +const queue = new BrokerQueue(adapter); + +type User = { id: number, username: string }; + +const registrationChannel = queue.channel('user/registered', { + process: QueueMessageProcessing.exactlyOnce, + deduplicationInterval: '1s', +}); + +// a worker consumes the messages. +// this is usually done in a separate process. +await registrationChannel.consume(async (user) => { + console.log('User registered', user); + // if the worker crashes here, the message is not lost. + // it will be automatically redelivered to another worker. + // if this callback returns without an error, the message is + // marked as processed and eventually removed. +}); + +// the application sending the message +await registrationChannel.produce({ id: 1, username: 'Peter' }); +``` + +## App Usage + +A full example of how to use the BrokerQueue in your application. +The class is automatically available in the dependency injection container if you import the `FrameworkModule`. +See the Getting started page for more information. + +To utilize the queue systems the most, it is recommended to start several workers that consume the messages. +You would write a separate `App` that is different to the main application that might have http routes etc. + +You share common services via a shared app module. Channel definitions are shared via the rest of the application in a common file. + +```typescript +// file: channels.ts + +export type RegistrationChannel = BrokerQueueChannel; +export const registrationChannelProvider = provide((queue: BrokerQueue) => queue.channel('user/registered', { + process: QueueMessageProcessing.exactlyOnce, + deduplicationInterval: '1s', +})); +``` + +```typescript +// file: worker.ts +import { RegistrationChannel, registrationChannelProvider } from './channels'; + +async function consumerCommand( + channel: RegistrationChannel, + database: Database) { + + await channel.consume(async (user) => { + // do something with the user, + // maybe store information, send emails, etc. + }); + + // the connection to the broker keeps the process alive. +} + +const app = new App({ + providers: [ + Database, + registrationChannelProvider, + ], + imports: [ + new FrameworkModule({}), + ], +}); + +app.command('consumer', consumerCommand); + +// start the worker command above directly +void app.run('consumer'); +``` + +And in your application you produce messages like this: + +```typescript +// file: app.ts +import { RegistrationChannel, registrationChannelProvider } from './channels'; + +class Service { + constructor(private channel: RegistrationChannel) { + } + + async registerUser(user: User) { + await this.channel.produce(user); + } +} + +const app = new App({ + providers: [ + Service, + registrationChannelProvider, + ], + imports: [ + new FrameworkModule({}), + ], +}); + +void app.run(); +```