diff --git a/apps/meteor/server/models/dummy/BaseDummy.ts b/apps/meteor/server/models/dummy/BaseDummy.ts index 049295c1a28a..883799fcea4f 100644 --- a/apps/meteor/server/models/dummy/BaseDummy.ts +++ b/apps/meteor/server/models/dummy/BaseDummy.ts @@ -111,6 +111,10 @@ export class BaseDummy< return this.updateOne(filter, update, options); } + async updateOneById(id: T['_id'], update: UpdateFilter | Partial): Promise { + await this.updateOne({ _id: id } as Filter, update); + } + async updateOne(_filter: Filter, _update: UpdateFilter | Partial, _options?: UpdateOptions): Promise { return { acknowledged: true, diff --git a/apps/meteor/server/models/raw/BaseRaw.ts b/apps/meteor/server/models/raw/BaseRaw.ts index 2376d84f8980..ef56418c3c8f 100644 --- a/apps/meteor/server/models/raw/BaseRaw.ts +++ b/apps/meteor/server/models/raw/BaseRaw.ts @@ -47,6 +47,14 @@ type ModelOptions = { _updatedAtIndexOptions?: Omit; }; +type MappedHoardedUpdatesResult = [T['_id'], UpdateResult | HoardedOperationsError]; + +class HoardedOperationsError extends Error { + constructor(public errorDetails: any) { + super('hoarded-operations-failed'); + } +} + export abstract class BaseRaw< T extends { _id: string }, C extends DefaultFields = undefined, @@ -64,6 +72,8 @@ export abstract class BaseRaw< */ private collectionName: string; + private operationHoarders = new Map>(); + /** * @param db MongoDB instance * @param name Name of the model without any prefix. Used by trash records to set the `__collection__` field. @@ -239,7 +249,83 @@ export abstract class BaseRaw< return this[operation](filter, update, options); } - updateOne(filter: Filter, update: UpdateFilter | Partial, options?: UpdateOptions): Promise { + async hoardOperations(id: T['_id']): Promise { + if (this.operationHoarders.has(id)) { + throw new Error('Model operations are already being hoarded by an updater'); + } + + this.operationHoarders.set(id, this.getUpdater()); + } + + getHoardedOperationsById(id: T['_id']): Updater | undefined { + return this.operationHoarders.get(id); + } + + async performHoardedOperationsById(id: T['_id']): Promise { + const updater = this.operationHoarders.get(id); + + if (!updater) { + throw new Error(`Model operations are not being hoarded for id ${id}`); + } + + this.operationHoarders.delete(id); + const update = updater.getUpdateFilter(); + return this.updateOne({ _id: id } as Filter, update); + } + + /** + * Execute all (or a list of) hoarded update operations + **/ + async performHoardedOperations(ids?: T['_id'][]): Promise[]> { + if (ids) { + return Promise.all( + ids.map( + async (id) => [id, await this.performHoardedOperationsById(id).catch((cause) => new HoardedOperationsError(cause))] as const, + ), + ); + } + + const allUpdaters = this.operationHoarders.entries(); + this.operationHoarders.clear(); + + // All updaters will be executed independently - even if one fails, the others won't b + return Promise.all( + [...allUpdaters].map(async ([_id, updater]: [T['_id'], Updater]) => { + const update = updater.getUpdateFilter(); + return [ + _id, + await this.updateOne({ _id } as Filter, update).catch((cause) => new HoardedOperationsError(cause)), + ] satisfies MappedHoardedUpdatesResult; + }), + ); + } + + /** + * If there is an Updater assigned to this specific id, save the operation to it, otherwise return false + **/ + hoardOperation(id: T['_id'], operation: UpdateFilter): Updater | false { + const updater = this.operationHoarders.get(id); + if (!updater) { + return false; + } + + updater.addUpdateFilter(operation); + return updater; + } + + /** + * Perform an update operation using an id as filter; If the model has any `updater` hoarding operations, add the operation to it instead + **/ + async updateOneById(id: string, update: UpdateFilter | Partial): Promise { + const updater = this.hoardOperation(id, update); + if (updater) { + return; + } + + await this.updateOne({ _id: id } as Filter, update); + } + + async updateOne(filter: Filter, update: UpdateFilter | Partial, options?: UpdateOptions): Promise { this.setUpdatedAt(update); if (options) { return this.col.updateOne(filter, update, options); diff --git a/apps/meteor/server/models/raw/VideoConference.ts b/apps/meteor/server/models/raw/VideoConference.ts index 4c324d938e2a..070ef5a9b4d2 100644 --- a/apps/meteor/server/models/raw/VideoConference.ts +++ b/apps/meteor/server/models/raw/VideoConference.ts @@ -8,16 +8,7 @@ import type { } from '@rocket.chat/core-typings'; import { VideoConferenceStatus } from '@rocket.chat/core-typings'; import type { FindPaginated, InsertionModel, IVideoConferenceModel } from '@rocket.chat/model-typings'; -import type { - FindCursor, - UpdateOptions, - UpdateFilter, - UpdateResult, - IndexDescription, - Collection, - Db, - CountDocumentsOptions, -} from 'mongodb'; +import type { FindCursor, UpdateFilter, IndexDescription, Collection, Db, CountDocumentsOptions } from 'mongodb'; import { BaseRaw } from './BaseRaw'; @@ -136,14 +127,6 @@ export class VideoConferenceRaw extends BaseRaw implements IVid return (await this.insertOne(call)).insertedId; } - public updateOneById( - _id: string, - update: UpdateFilter | Partial, - options?: UpdateOptions, - ): Promise { - return this.updateOne({ _id }, update, options); - } - public async setEndedById(callId: string, endedBy?: { _id: string; name: string; username: string }, endedAt?: Date): Promise { await this.updateOneById(callId, { $set: { diff --git a/packages/model-typings/src/models/IBaseModel.ts b/packages/model-typings/src/models/IBaseModel.ts index 626f91385a04..0f21b2759db5 100644 --- a/packages/model-typings/src/models/IBaseModel.ts +++ b/packages/model-typings/src/models/IBaseModel.ts @@ -81,6 +81,8 @@ export interface IBaseModel< options?: UpdateOptions & { multi?: true }, ): Promise; + updateOneById(id: string, update: UpdateFilter | Partial): Promise; + updateOne(filter: Filter, update: UpdateFilter | Partial, options?: UpdateOptions): Promise; updateMany(filter: Filter, update: UpdateFilter | Partial, options?: UpdateOptions): Promise; diff --git a/packages/model-typings/src/models/IVideoConferenceModel.ts b/packages/model-typings/src/models/IVideoConferenceModel.ts index 8ef775fb6082..5b4025e33461 100644 --- a/packages/model-typings/src/models/IVideoConferenceModel.ts +++ b/packages/model-typings/src/models/IVideoConferenceModel.ts @@ -6,7 +6,7 @@ import type { VideoConference, VideoConferenceStatus, } from '@rocket.chat/core-typings'; -import type { FindCursor, UpdateOptions, UpdateFilter, UpdateResult, FindOptions } from 'mongodb'; +import type { FindCursor, FindOptions } from 'mongodb'; import type { FindPaginated, IBaseModel } from './IBaseModel'; @@ -36,12 +36,6 @@ export interface IVideoConferenceModel extends IBaseModel { ...callDetails }: Required>): Promise; - updateOneById( - _id: string, - update: UpdateFilter | Partial, - options?: UpdateOptions, - ): Promise; - setDataById(callId: string, data: Partial>): Promise; setEndedById(callId: string, endedBy?: { _id: string; name: string; username: string }, endedAt?: Date): Promise;