Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler handlers #696

Merged
merged 6 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions deno-runtime/handlers/scheduler-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { Defined, JsonRpcError } from 'jsonrpc-lite';
import type { App } from '@rocket.chat/apps-engine/definition/App.ts';
import type { IProcessor } from '@rocket.chat/apps-engine/definition/scheduler/IProcessor.ts';

import { AppObjectRegistry } from '../AppObjectRegistry.ts';
import { AppAccessorsInstance } from '../lib/accessors/mod.ts';

export default async function handleScheduler(method: string, params: unknown): Promise<Defined | JsonRpcError> {
if (!Array.isArray(params)) {
return JsonRpcError.invalidParams({ message: 'Invalid params' });
}

const [context] = params as [Record<string, unknown>];

const app = AppObjectRegistry.get<App>('app');

if (!app) {
return JsonRpcError.internalError({ message: 'App not found' });
}

// AppSchedulerManager will append the appId to the processor name to avoid conflicts
const processor = AppObjectRegistry.get<IProcessor>(`scheduler:${method}`);

if (!processor) {
return JsonRpcError.methodNotFound({
message: `Could not find processor for method ${method}`,
});
}

app.getLogger().debug(`Job processor ${processor.id} is being executed...`);

try {
await processor
.processor(
context,
AppAccessorsInstance.getReader(),
AppAccessorsInstance.getModifier(),
AppAccessorsInstance.getHttp(),
AppAccessorsInstance.getPersistence(),
);

app.getLogger().debug(`Job processor ${processor.id} was successfully executed`);

return null;
} catch (e) {
app.getLogger().error(e);
app.getLogger().error(`Job processor ${processor.id} was unsuccessful`);

return JsonRpcError.internalError({ message: e.message });
}
}
46 changes: 46 additions & 0 deletions deno-runtime/handlers/tests/scheduler-handler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { assertEquals } from 'https://deno.land/[email protected]/assert/mod.ts';
import { afterAll, beforeEach, describe, it } from 'https://deno.land/[email protected]/testing/bdd.ts';

import { AppObjectRegistry } from '../../AppObjectRegistry.ts';
import { AppAccessors } from '../../lib/accessors/mod.ts';
import handleScheduler from '../scheduler-handler.ts';

describe('handlers > scheduler', () => {
const mockAppAccessors = new AppAccessors(() =>
Promise.resolve({
id: 'mockId',
result: {},
jsonrpc: '2.0',
serialize: () => '',
}),
);

const mockApp = {
getID: () => 'mockApp',
getLogger: () => ({
debug: () => {},
error: () => {},
}),
};

beforeEach(() => {
AppObjectRegistry.clear();
AppObjectRegistry.set('app', mockApp);
mockAppAccessors.getConfigurationExtend().scheduler.registerProcessors([
{
id: 'mockId',
processor: () => Promise.resolve('it works!'),
},
]);
});

afterAll(() => {
AppObjectRegistry.clear();
});

it('correctly executes a request to a processor', async () => {
const result = await handleScheduler('mockId', [{}]);

assertEquals(result, null);
});
});
5 changes: 4 additions & 1 deletion deno-runtime/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import slashcommandHandler from './handlers/slashcommand-handler.ts';
import videoConferenceHandler from './handlers/videoconference-handler.ts';
import apiHandler from './handlers/api-handler.ts'
import handleApp from './handlers/app/handler.ts';
import handleScheduler from "./handlers/scheduler-handler.ts";

AppObjectRegistry.set('MESSAGE_SEPARATOR', Deno.args.at(-1));

Expand All @@ -27,14 +28,16 @@ type Handlers = {
'api': typeof apiHandler,
'slashcommand': typeof slashcommandHandler
'videoconference': typeof videoConferenceHandler
'scheduler': typeof handleScheduler,
}

async function requestRouter({ type, payload }: Messenger.JsonRpcRequest): Promise<void> {
const methodHandlers: Handlers = {
'app': handleApp,
'api': apiHandler,
'slashcommand': slashcommandHandler,
'videoconference': videoConferenceHandler
'videoconference': videoConferenceHandler,
'scheduler': handleScheduler,
}

// We're not handling notifications at the moment
Expand Down
29 changes: 4 additions & 25 deletions src/server/managers/AppSchedulerManager.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import { AppStatus } from '../../definition/AppStatus';
import { AppMethod } from '../../definition/metadata';
import type { IJobContext, IOnetimeSchedule, IProcessor, IRecurringSchedule } from '../../definition/scheduler';
import type { AppManager } from '../AppManager';
import type { IInternalSchedulerBridge } from '../bridges/IInternalSchedulerBridge';
import type { SchedulerBridge } from '../bridges/SchedulerBridge';
import type { AppAccessorManager } from '.';
import { AppConsole } from '../logging';

function createProcessorId(jobId: string, appId: string): string {
return jobId.includes(`_${appId}`) ? jobId : `${jobId}_${appId}`;
Expand All @@ -14,13 +11,10 @@ function createProcessorId(jobId: string, appId: string): string {
export class AppSchedulerManager {
private readonly bridge: SchedulerBridge;

private readonly accessors: AppAccessorManager;

private registeredProcessors: Map<string, { [processorId: string]: IProcessor }>;

constructor(private readonly manager: AppManager) {
this.bridge = this.manager.getBridges().getSchedulerBridge();
this.accessors = this.manager.getAccessorManager();
this.registeredProcessors = new Map();
}

Expand Down Expand Up @@ -63,29 +57,14 @@ export class AppSchedulerManager {
return;
}

const logger = app.setupLogger(AppMethod._JOB_PROCESSOR);
logger.debug(`Job processor ${processor.id} is being executed...`);

try {
const codeToRun = `module.exports = processor.processor.apply(null, args)`;
await app.getRuntime().runInSandbox(codeToRun, {
processor,
args: [
jobContext,
this.accessors.getReader(appId),
this.accessors.getModifier(appId),
this.accessors.getHttp(appId),
this.accessors.getPersistence(appId),
],
await app.getDenoRuntime().sendRequest({
method: `scheduler:${processor.id}`,
params: [jobContext],
});
logger.debug(`Job processor ${processor.id} was sucessfully executed`);
} catch (e) {
logger.error(e);
logger.debug(`Job processor ${processor.id} was unsuccessful`);

console.error(e);
throw e;
} finally {
await this.manager.getLogStorage().storeEntries(AppConsole.toStorageEntry(appId, logger));
}
};
}
Expand Down
Loading