Skip to content

Commit

Permalink
chore: add option to group model update operations together
Browse files Browse the repository at this point in the history
  • Loading branch information
pierre-lehnen-rc committed Oct 22, 2024
1 parent 86bff91 commit 7d858d2
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 26 deletions.
4 changes: 4 additions & 0 deletions apps/meteor/server/models/dummy/BaseDummy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ export class BaseDummy<
return this.updateOne(filter, update, options);
}

async updateOneById(id: T['_id'], update: UpdateFilter<T> | Partial<T>): Promise<void> {
await this.updateOne({ _id: id } as Filter<T>, update);
}

async updateOne(_filter: Filter<T>, _update: UpdateFilter<T> | Partial<T>, _options?: UpdateOptions): Promise<UpdateResult> {
return {
acknowledged: true,
Expand Down
88 changes: 87 additions & 1 deletion apps/meteor/server/models/raw/BaseRaw.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ type ModelOptions = {
_updatedAtIndexOptions?: Omit<IndexDescription, 'key'>;
};

type MappedHoardedUpdatesResult<T extends { _id: string }> = [T['_id'], UpdateResult | HoardedOperationsError];

class HoardedOperationsError extends Error {
constructor(public errorDetails: any) {
super('hoarded-operations-failed');
}
}

export abstract class BaseRaw<
T extends { _id: string },
C extends DefaultFields<T> = undefined,
Expand All @@ -64,6 +72,8 @@ export abstract class BaseRaw<
*/
private collectionName: string;

private operationHoarders = new Map<T['_id'], Updater<T>>();

/**
* @param db MongoDB instance
* @param name Name of the model without any prefix. Used by trash records to set the `__collection__` field.
Expand Down Expand Up @@ -239,7 +249,83 @@ export abstract class BaseRaw<
return this[operation](filter, update, options);
}

updateOne(filter: Filter<T>, update: UpdateFilter<T> | Partial<T>, options?: UpdateOptions): Promise<UpdateResult> {
async hoardOperations(id: T['_id']): Promise<void> {
if (this.operationHoarders.has(id)) {
throw new Error('Model operations are already being hoarded by an updater');
}

this.operationHoarders.set(id, this.getUpdater());
}

getHoardedOperationsById(id: T['_id']): Updater<T> | undefined {
return this.operationHoarders.get(id);
}

async performHoardedOperationsById(id: T['_id']): Promise<UpdateResult> {
const updater = this.operationHoarders.get(id);

if (!updater) {
throw new Error(`Model operations are not being hoarded for id ${id}`);
}

this.operationHoarders.delete(id);
const update = updater.getUpdateFilter();
return this.updateOne({ _id: id } as Filter<T>, update);
}

/**
* Execute all (or a list of) hoarded update operations
**/
async performHoardedOperations(ids?: T['_id'][]): Promise<MappedHoardedUpdatesResult<T>[]> {
if (ids) {
return Promise.all(
ids.map(
async (id) => [id, await this.performHoardedOperationsById(id).catch((cause) => new HoardedOperationsError(cause))] as const,
),
);
}

const allUpdaters = this.operationHoarders.entries();
this.operationHoarders.clear();

// All updaters will be executed independently - even if one fails, the others won't b
return Promise.all(
[...allUpdaters].map(async ([_id, updater]: [T['_id'], Updater<T>]) => {
const update = updater.getUpdateFilter();
return [
_id,
await this.updateOne({ _id } as Filter<T>, update).catch((cause) => new HoardedOperationsError(cause)),
] satisfies MappedHoardedUpdatesResult<T>;
}),
);
}

/**
* If there is an Updater assigned to this specific id, save the operation to it, otherwise return false
**/
hoardOperation(id: T['_id'], operation: UpdateFilter<T>): Updater<T> | false {
const updater = this.operationHoarders.get(id);
if (!updater) {
return false;
}

updater.addUpdateFilter(operation);
return updater;
}

/**
* Perform an update operation using an id as filter; If the model has any `updater` hoarding operations, add the operation to it instead
**/
async updateOneById(id: string, update: UpdateFilter<T> | Partial<T>): Promise<void> {
const updater = this.hoardOperation(id, update);
if (updater) {
return;
}

await this.updateOne({ _id: id } as Filter<T>, update);
}

async updateOne(filter: Filter<T>, update: UpdateFilter<T> | Partial<T>, options?: UpdateOptions): Promise<UpdateResult> {
this.setUpdatedAt(update);
if (options) {
return this.col.updateOne(filter, update, options);
Expand Down
19 changes: 1 addition & 18 deletions apps/meteor/server/models/raw/VideoConference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,7 @@ import type {
} from '@rocket.chat/core-typings';
import { VideoConferenceStatus } from '@rocket.chat/core-typings';
import type { FindPaginated, InsertionModel, IVideoConferenceModel } from '@rocket.chat/model-typings';
import type {
FindCursor,
UpdateOptions,
UpdateFilter,
UpdateResult,
IndexDescription,
Collection,
Db,
CountDocumentsOptions,
} from 'mongodb';
import type { FindCursor, UpdateFilter, IndexDescription, Collection, Db, CountDocumentsOptions } from 'mongodb';

import { BaseRaw } from './BaseRaw';

Expand Down Expand Up @@ -136,14 +127,6 @@ export class VideoConferenceRaw extends BaseRaw<VideoConference> implements IVid
return (await this.insertOne(call)).insertedId;
}

public updateOneById(
_id: string,
update: UpdateFilter<VideoConference> | Partial<VideoConference>,
options?: UpdateOptions,
): Promise<UpdateResult> {
return this.updateOne({ _id }, update, options);
}

public async setEndedById(callId: string, endedBy?: { _id: string; name: string; username: string }, endedAt?: Date): Promise<void> {
await this.updateOneById(callId, {
$set: {
Expand Down
2 changes: 2 additions & 0 deletions packages/model-typings/src/models/IBaseModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ export interface IBaseModel<
options?: UpdateOptions & { multi?: true },
): Promise<UpdateResult | Document>;

updateOneById(id: string, update: UpdateFilter<T> | Partial<T>): Promise<void>;

updateOne(filter: Filter<T>, update: UpdateFilter<T> | Partial<T>, options?: UpdateOptions): Promise<UpdateResult>;

updateMany(filter: Filter<T>, update: UpdateFilter<T> | Partial<T>, options?: UpdateOptions): Promise<Document | UpdateResult>;
Expand Down
8 changes: 1 addition & 7 deletions packages/model-typings/src/models/IVideoConferenceModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import type {
VideoConference,
VideoConferenceStatus,
} from '@rocket.chat/core-typings';
import type { FindCursor, UpdateOptions, UpdateFilter, UpdateResult, FindOptions } from 'mongodb';
import type { FindCursor, FindOptions } from 'mongodb';

import type { FindPaginated, IBaseModel } from './IBaseModel';

Expand Down Expand Up @@ -36,12 +36,6 @@ export interface IVideoConferenceModel extends IBaseModel<VideoConference> {
...callDetails
}: Required<Pick<ILivechatVideoConference, 'rid' | 'createdBy' | 'providerName'>>): Promise<string>;

updateOneById(
_id: string,
update: UpdateFilter<VideoConference> | Partial<VideoConference>,
options?: UpdateOptions,
): Promise<UpdateResult>;

setDataById(callId: string, data: Partial<Omit<VideoConference, '_id'>>): Promise<void>;

setEndedById(callId: string, endedBy?: { _id: string; name: string; username: string }, endedAt?: Date): Promise<void>;
Expand Down

0 comments on commit 7d858d2

Please sign in to comment.