Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: app subprocess restart on error and better reporting #34106

Merged
merged 18 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/giant-nails-trade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rocket.chat/apps-engine': patch
---

Adds simple app subprocess metrics report
5 changes: 5 additions & 0 deletions .changeset/honest-kings-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rocket.chat/apps-engine': patch
---

Attempts to restart an app subprocess if the spawn command fails
5 changes: 5 additions & 0 deletions .changeset/quiet-radios-fry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rocket.chat/apps-engine': patch
---

Fixes an issue while collecting the error message from a failed restart attempt of an app subprocess
5 changes: 5 additions & 0 deletions .changeset/young-dots-cheat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rocket.chat/apps-engine': patch
---

Prevents app:getStatus requests from timing out in some cases
56 changes: 28 additions & 28 deletions packages/apps-engine/deno-runtime/handlers/app/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,41 @@ import handleOnUpdate from './handleOnUpdate.ts';
export default async function handleApp(method: string, params: unknown): Promise<Defined | JsonRpcError> {
const [, appMethod] = method.split(':');

// We don't want the getStatus method to generate logs, so we handle it separately
if (appMethod === 'getStatus') {
return handleGetStatus();
}
try {
// We don't want the getStatus method to generate logs, so we handle it separately
if (appMethod === 'getStatus') {
return await handleGetStatus();
}

// `app` will be undefined if the method here is "app:construct"
const app = AppObjectRegistry.get<App>('app');
// `app` will be undefined if the method here is "app:construct"
const app = AppObjectRegistry.get<App>('app');

app?.getLogger().debug(`'${appMethod}' is being called...`);
app?.getLogger().debug(`'${appMethod}' is being called...`);

if (uikitInteractions.includes(appMethod)) {
return handleUIKitInteraction(appMethod, params).then((result) => {
if (result instanceof JsonRpcError) {
app?.getLogger().debug(`'${appMethod}' was unsuccessful.`, result.message);
} else {
app?.getLogger().debug(`'${appMethod}' was successfully called! The result is:`, result);
}
if (uikitInteractions.includes(appMethod)) {
return handleUIKitInteraction(appMethod, params).then((result) => {
if (result instanceof JsonRpcError) {
app?.getLogger().debug(`'${appMethod}' was unsuccessful.`, result.message);
} else {
app?.getLogger().debug(`'${appMethod}' was successfully called! The result is:`, result);
}

return result;
});
}
return result;
});
}

if (appMethod.startsWith('check') || appMethod.startsWith('execute')) {
return handleListener(appMethod, params).then((result) => {
if (result instanceof JsonRpcError) {
app?.getLogger().debug(`'${appMethod}' was unsuccessful.`, result.message);
} else {
app?.getLogger().debug(`'${appMethod}' was successfully called! The result is:`, result);
}
if (appMethod.startsWith('check') || appMethod.startsWith('execute')) {
return handleListener(appMethod, params).then((result) => {
if (result instanceof JsonRpcError) {
app?.getLogger().debug(`'${appMethod}' was unsuccessful.`, result.message);
} else {
app?.getLogger().debug(`'${appMethod}' was successfully called! The result is:`, result);
}

return result;
});
}
return result;
});
}

try {
let result: Defined | JsonRpcError;

switch (appMethod) {
Expand Down
4 changes: 4 additions & 0 deletions packages/apps-engine/deno-runtime/lib/messenger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ export const Queue = new (class Queue {
this.queue.push(encoder.encode(message));
this.processQueue();
}

public getCurrentSize() {
return this.queue.length;
}
});

export const Transport = new (class Transporter {
Expand Down
20 changes: 20 additions & 0 deletions packages/apps-engine/deno-runtime/lib/metricsCollector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { writeAll } from "https://deno.land/[email protected]/io/write_all.ts";
import { Queue } from "./messenger.ts";

export function collectMetrics() {
return {
queueSize: Queue.getCurrentSize(),
}
};

const encoder = new TextEncoder();

export async function sendMetrics() {
const metrics = collectMetrics();

await writeAll(Deno.stderr, encoder.encode(JSON.stringify(metrics)));
}

export function startMetricsReport() {
setInterval(sendMetrics, 5000);
}
3 changes: 3 additions & 0 deletions packages/apps-engine/deno-runtime/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import apiHandler from './handlers/api-handler.ts';
import handleApp from './handlers/app/handler.ts';
import handleScheduler from './handlers/scheduler-handler.ts';
import registerErrorListeners from './error-handlers.ts';
import { startMetricsReport } from "./lib/metricsCollector.ts";

type Handlers = {
app: typeof handleApp;
Expand Down Expand Up @@ -130,3 +131,5 @@ async function main() {
registerErrorListeners();

main();

startMetricsReport();
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter {

logger.info('Successfully restarted app subprocess');
} catch (e) {
logger.error("Failed to restart app's subprocess", { error: e });
logger.error("Failed to restart app's subprocess", { error: e.message || e });
} finally {
await this.logStorage.storeEntries(AppConsole.toStorageEntry(this.getAppId(), logger));
}
Expand All @@ -322,18 +322,24 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
}

private waitUntilReady(): Promise<void> {
if (this.state === 'ready') {
return;
}

return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => reject(new Error(`[${this.getAppId()}] Timeout: app process not ready`)), this.options.timeout);
let timeoutId: NodeJS.Timeout;

if (this.state === 'ready') {
const handler = () => {
clearTimeout(timeoutId);
return resolve();
}
resolve();
};

this.once('ready', () => {
clearTimeout(timeoutId);
return resolve();
});
timeoutId = setTimeout(() => {
this.off('ready', handler);
reject(new Error(`[${this.getAppId()}] Timeout: app process not ready`));
}, this.options.timeout);

this.once('ready', handler);
});
}

Expand Down Expand Up @@ -637,6 +643,12 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
}

private async parseError(chunk: Buffer): Promise<void> {
console.error('Subprocess stderr', chunk.toString());
try {
const data = JSON.parse(chunk.toString());

this.debug('Metrics received from subprocess: %o', data);
} catch (e) {
console.error('Subprocess stderr', chunk.toString());
}
}
}
15 changes: 11 additions & 4 deletions packages/apps-engine/src/server/runtime/deno/LivenessManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export class LivenessManager {

this.controller.once('ready', () => this.ping());
this.subprocess.once('exit', this.handleExit.bind(this));
this.subprocess.once('error', this.handleError.bind(this));
}

/**
Expand Down Expand Up @@ -155,6 +156,11 @@ export class LivenessManager {
this.messenger.send(COMMAND_PING);
}

private handleError(err: Error) {
this.debug('App has failed to start.`', err);
this.restartProcess(err.message);
}

private handleExit(exitCode: number, signal: string) {
this.pingAbortController.emit('abort');

Expand All @@ -178,22 +184,23 @@ export class LivenessManager {
this.restartProcess(reason);
}

private restartProcess(reason: string) {
private async restartProcess(reason: string) {
if (this.restartCount >= this.options.maxRestarts) {
this.debug('Limit of restarts reached (%d). Aborting restart...', this.options.maxRestarts);
this.controller.stopApp();
return;
}

this.pingTimeoutConsecutiveCount = 0;
this.restartCount++;
this.restartLog.push({
reason,
restartedAt: new Date(),
source: 'liveness-manager',
pid: this.subprocess.pid,
});

this.controller.restartApp();
await this.controller.restartApp();

this.pingTimeoutConsecutiveCount = 0;
this.restartCount++;
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { ChildProcess } from 'child_process';
import type { ChildProcess } from 'child_process';

import type { JsonRpc } from 'jsonrpc-lite';

import { encoder } from './codec';

export class ProcessMessenger {
private deno: ChildProcess;
private deno: ChildProcess | undefined;

private _sendStrategy: (message: JsonRpc) => void;

Expand All @@ -30,7 +30,7 @@ export class ProcessMessenger {
}

private switchStrategy() {
if (this.deno instanceof ChildProcess) {
if (this.deno?.stdin?.writable) {
this._sendStrategy = this.strategySend.bind(this);
} else {
this._sendStrategy = this.strategyError.bind(this);
Expand Down
Loading