diff --git a/deno-runtime/handlers/scheduler-handler.ts b/deno-runtime/handlers/scheduler-handler.ts new file mode 100644 index 000000000..e4efddfe6 --- /dev/null +++ b/deno-runtime/handlers/scheduler-handler.ts @@ -0,0 +1,51 @@ +import { Defined, JsonRpcError } from 'jsonrpc-lite'; +import type { App } from '@rocket.chat/apps-engine/definition/App.ts'; +import type { IProcessor } from '@rocket.chat/apps-engine/definition/scheduler/IProcessor.ts'; + +import { AppObjectRegistry } from '../AppObjectRegistry.ts'; +import { AppAccessorsInstance } from '../lib/accessors/mod.ts'; + +export default async function handleScheduler(method: string, params: unknown): Promise { + if (!Array.isArray(params)) { + return JsonRpcError.invalidParams({ message: 'Invalid params' }); + } + + const [context] = params as [Record]; + + const app = AppObjectRegistry.get('app'); + + if (!app) { + return JsonRpcError.internalError({ message: 'App not found' }); + } + + // AppSchedulerManager will append the appId to the processor name to avoid conflicts + const processor = AppObjectRegistry.get(`scheduler:${method}`); + + if (!processor) { + return JsonRpcError.methodNotFound({ + message: `Could not find processor for method ${method}`, + }); + } + + app.getLogger().debug(`Job processor ${processor.id} is being executed...`); + + try { + await processor + .processor( + context, + AppAccessorsInstance.getReader(), + AppAccessorsInstance.getModifier(), + AppAccessorsInstance.getHttp(), + AppAccessorsInstance.getPersistence(), + ); + + app.getLogger().debug(`Job processor ${processor.id} was successfully executed`); + + return null; + } catch (e) { + app.getLogger().error(e); + app.getLogger().error(`Job processor ${processor.id} was unsuccessful`); + + return JsonRpcError.internalError({ message: e.message }); + } +} diff --git a/deno-runtime/handlers/tests/scheduler-handler.test.ts b/deno-runtime/handlers/tests/scheduler-handler.test.ts new file mode 100644 index 000000000..525b3bc2a --- /dev/null +++ b/deno-runtime/handlers/tests/scheduler-handler.test.ts @@ -0,0 +1,46 @@ +import { assertEquals } from 'https://deno.land/std@0.203.0/assert/mod.ts'; +import { afterAll, beforeEach, describe, it } from 'https://deno.land/std@0.203.0/testing/bdd.ts'; + +import { AppObjectRegistry } from '../../AppObjectRegistry.ts'; +import { AppAccessors } from '../../lib/accessors/mod.ts'; +import handleScheduler from '../scheduler-handler.ts'; + +describe('handlers > scheduler', () => { + const mockAppAccessors = new AppAccessors(() => + Promise.resolve({ + id: 'mockId', + result: {}, + jsonrpc: '2.0', + serialize: () => '', + }), + ); + + const mockApp = { + getID: () => 'mockApp', + getLogger: () => ({ + debug: () => {}, + error: () => {}, + }), + }; + + beforeEach(() => { + AppObjectRegistry.clear(); + AppObjectRegistry.set('app', mockApp); + mockAppAccessors.getConfigurationExtend().scheduler.registerProcessors([ + { + id: 'mockId', + processor: () => Promise.resolve('it works!'), + }, + ]); + }); + + afterAll(() => { + AppObjectRegistry.clear(); + }); + + it('correctly executes a request to a processor', async () => { + const result = await handleScheduler('mockId', [{}]); + + assertEquals(result, null); + }); +}); diff --git a/deno-runtime/main.ts b/deno-runtime/main.ts index 0b11e3bb8..8ed8eca27 100644 --- a/deno-runtime/main.ts +++ b/deno-runtime/main.ts @@ -19,6 +19,7 @@ import slashcommandHandler from './handlers/slashcommand-handler.ts'; import videoConferenceHandler from './handlers/videoconference-handler.ts'; import apiHandler from './handlers/api-handler.ts' import handleApp from './handlers/app/handler.ts'; +import handleScheduler from "./handlers/scheduler-handler.ts"; AppObjectRegistry.set('MESSAGE_SEPARATOR', Deno.args.at(-1)); @@ -27,6 +28,7 @@ type Handlers = { 'api': typeof apiHandler, 'slashcommand': typeof slashcommandHandler 'videoconference': typeof videoConferenceHandler + 'scheduler': typeof handleScheduler, } async function requestRouter({ type, payload }: Messenger.JsonRpcRequest): Promise { @@ -34,7 +36,8 @@ async function requestRouter({ type, payload }: Messenger.JsonRpcRequest): Promi 'app': handleApp, 'api': apiHandler, 'slashcommand': slashcommandHandler, - 'videoconference': videoConferenceHandler + 'videoconference': videoConferenceHandler, + 'scheduler': handleScheduler, } // We're not handling notifications at the moment diff --git a/src/server/managers/AppSchedulerManager.ts b/src/server/managers/AppSchedulerManager.ts index 619c604c5..5cbd60bcf 100644 --- a/src/server/managers/AppSchedulerManager.ts +++ b/src/server/managers/AppSchedulerManager.ts @@ -1,11 +1,8 @@ import { AppStatus } from '../../definition/AppStatus'; -import { AppMethod } from '../../definition/metadata'; import type { IJobContext, IOnetimeSchedule, IProcessor, IRecurringSchedule } from '../../definition/scheduler'; import type { AppManager } from '../AppManager'; import type { IInternalSchedulerBridge } from '../bridges/IInternalSchedulerBridge'; import type { SchedulerBridge } from '../bridges/SchedulerBridge'; -import type { AppAccessorManager } from '.'; -import { AppConsole } from '../logging'; function createProcessorId(jobId: string, appId: string): string { return jobId.includes(`_${appId}`) ? jobId : `${jobId}_${appId}`; @@ -14,13 +11,10 @@ function createProcessorId(jobId: string, appId: string): string { export class AppSchedulerManager { private readonly bridge: SchedulerBridge; - private readonly accessors: AppAccessorManager; - private registeredProcessors: Map; constructor(private readonly manager: AppManager) { this.bridge = this.manager.getBridges().getSchedulerBridge(); - this.accessors = this.manager.getAccessorManager(); this.registeredProcessors = new Map(); } @@ -63,29 +57,14 @@ export class AppSchedulerManager { return; } - const logger = app.setupLogger(AppMethod._JOB_PROCESSOR); - logger.debug(`Job processor ${processor.id} is being executed...`); - try { - const codeToRun = `module.exports = processor.processor.apply(null, args)`; - await app.getRuntime().runInSandbox(codeToRun, { - processor, - args: [ - jobContext, - this.accessors.getReader(appId), - this.accessors.getModifier(appId), - this.accessors.getHttp(appId), - this.accessors.getPersistence(appId), - ], + await app.getDenoRuntime().sendRequest({ + method: `scheduler:${processor.id}`, + params: [jobContext], }); - logger.debug(`Job processor ${processor.id} was sucessfully executed`); } catch (e) { - logger.error(e); - logger.debug(`Job processor ${processor.id} was unsuccessful`); - + console.error(e); throw e; - } finally { - await this.manager.getLogStorage().storeEntries(AppConsole.toStorageEntry(appId, logger)); } }; }