Skip to content

Commit

Permalink
fix: decoder error preventing succesfull app subprocess restart (#34858)
Browse files Browse the repository at this point in the history
  • Loading branch information
d-gubert committed Jan 3, 2025
1 parent c28e139 commit 43cf121
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 46 deletions.
22 changes: 22 additions & 0 deletions .changeset/cool-planes-protect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
'@rocket.chat/omnichannel-transcript': patch
'@rocket.chat/authorization-service': patch
'@rocket.chat/stream-hub-service': patch
'@rocket.chat/presence-service': patch
'@rocket.chat/fuselage-ui-kit': patch
'@rocket.chat/account-service': patch
'@rocket.chat/mock-providers': patch
'@rocket.chat/ui-theming': patch
'@rocket.chat/uikit-playground': patch
'@rocket.chat/ddp-streamer': patch
'@rocket.chat/queue-worker': patch
'@rocket.chat/apps-engine': patch
'@rocket.chat/ui-composer': patch
'@rocket.chat/ui-contexts': patch
'@rocket.chat/ui-client': patch
'@rocket.chat/models': patch
'@rocket.chat/sha256': patch
'@rocket.chat/meteor': patch
---

Fixes an issue that prevented the apps-engine from reestablishing communications with subprocesses in some cases
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import * as jsonrpc from 'jsonrpc-lite';
import { LivenessManager } from './LivenessManager';
import { ProcessMessenger } from './ProcessMessenger';
import { bundleLegacyApp } from './bundler';
import { decoder } from './codec';
import { newDecoder } from './codec';
import { AppStatus, AppStatusUtils } from '../../../definition/AppStatus';
import type { AppMethod } from '../../../definition/metadata';
import type { AppManager } from '../../AppManager';
Expand Down Expand Up @@ -383,6 +383,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
console.error(`Failed to startup Deno subprocess for app ${this.getAppId()}`, err);
});
this.once('ready', this.onReady.bind(this));

this.parseStdout(this.deno.stdout);
}

Expand Down Expand Up @@ -604,47 +605,60 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
}

private async parseStdout(stream: Readable): Promise<void> {
for await (const message of decoder.decodeStream(stream)) {
this.debug('Received message from subprocess %o', message);
try {
// Process PONG resonse first as it is not JSON RPC
if (message === COMMAND_PONG) {
this.emit('pong');
continue;
}

const JSONRPCMessage = jsonrpc.parseObject(message);

if (Array.isArray(JSONRPCMessage)) {
throw new Error('Invalid message format');
}

if (JSONRPCMessage.type === 'request' || JSONRPCMessage.type === 'notification') {
this.handleIncomingMessage(JSONRPCMessage).catch((reason) =>
console.error(`[${this.getAppId()}] Error executing handler`, reason, message),
);
continue;
}

if (JSONRPCMessage.type === 'success' || JSONRPCMessage.type === 'error') {
this.handleResultMessage(JSONRPCMessage).catch((reason) => console.error(`[${this.getAppId()}] Error executing handler`, reason, message));
continue;
}

console.error('Unrecognized message type', JSONRPCMessage);
} catch (e) {
// SyntaxError is thrown when the message is not a valid JSON
if (e instanceof SyntaxError) {
console.error(`[${this.getAppId()}] Failed to parse message`);
continue;
try {
for await (const message of newDecoder().decodeStream(stream)) {
this.debug('Received message from subprocess %o', message);
try {
// Process PONG resonse first as it is not JSON RPC
if (message === COMMAND_PONG) {
this.emit('pong');
continue;
}

const JSONRPCMessage = jsonrpc.parseObject(message);

if (Array.isArray(JSONRPCMessage)) {
throw new Error('Invalid message format');
}

if (JSONRPCMessage.type === 'request' || JSONRPCMessage.type === 'notification') {
this.handleIncomingMessage(JSONRPCMessage).catch((reason) =>
console.error(`[${this.getAppId()}] Error executing handler`, reason, message),
);
continue;
}

if (JSONRPCMessage.type === 'success' || JSONRPCMessage.type === 'error') {
this.handleResultMessage(JSONRPCMessage).catch((reason) =>
console.error(`[${this.getAppId()}] Error executing handler`, reason, message),
);
continue;
}

console.error('Unrecognized message type', JSONRPCMessage);
} catch (e) {
// SyntaxError is thrown when the message is not a valid JSON
if (e instanceof SyntaxError) {
console.error(`[${this.getAppId()}] Failed to parse message`);
continue;
}

console.error(`[${this.getAppId()}] Error executing handler`, e, message);
}

console.error(`[${this.getAppId()}] Error executing handler`, e, message);
}
} catch (e) {
console.error(`[${this.getAppId()}]`, e);
this.emit('error', new Error('DECODE_ERROR'));
}
}

private async parseError(chunk: Buffer): Promise<void> {
console.error('Subprocess stderr', chunk.toString());
try {
const data = JSON.parse(chunk.toString());

this.debug('Metrics received from subprocess (via stderr): %o', data);
} catch (e) {
console.error('Subprocess stderr', chunk.toString());
}
}
}
12 changes: 9 additions & 3 deletions packages/apps-engine/src/server/runtime/deno/LivenessManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ export class LivenessManager {
this.pingAbortController = new EventEmitter();

this.options = Object.assign({}, defaultOptions, options);

this.controller.on('ready', () => this.ping());
this.controller.on('error', async (reason) => {
if (reason instanceof Error && reason.message.startsWith('DECODE_ERROR')) {
await this.restartProcess('Decode error', 'controller');
}
})
}

public getRuntimeData() {
Expand All @@ -84,7 +91,6 @@ export class LivenessManager {

this.pingTimeoutConsecutiveCount = 0;

this.controller.once('ready', () => this.ping());
this.subprocess.once('exit', this.handleExit.bind(this));
}

Expand Down Expand Up @@ -182,7 +188,7 @@ export class LivenessManager {
this.restartProcess(reason);
}

private async restartProcess(reason: string) {
private async restartProcess(reason: string, source = 'liveness-manager') {
if (this.restartCount >= this.options.maxRestarts) {
this.debug('Limit of restarts reached (%d). Aborting restart...', this.options.maxRestarts);
this.controller.stopApp();
Expand All @@ -193,8 +199,8 @@ export class LivenessManager {
this.restartCount++;
this.restartLog.push({
reason,
source,
restartedAt: new Date(),
source: 'liveness-manager',
pid: this.subprocess.pid,
});

Expand Down
12 changes: 9 additions & 3 deletions packages/apps-engine/src/server/runtime/deno/ProcessMessenger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ import { ChildProcess } from 'child_process';

import type { JsonRpc } from 'jsonrpc-lite';

import { encoder } from './codec';
import { type Encoder, newEncoder } from './codec';

export class ProcessMessenger {
private deno: ChildProcess;
private deno: ChildProcess | undefined;

private encoder: Encoder | undefined;

private _sendStrategy: (message: JsonRpc) => void;

Expand All @@ -25,13 +27,17 @@ export class ProcessMessenger {

public clearReceiver() {
delete this.deno;
delete this.encoder;

this.switchStrategy();
}

private switchStrategy() {
if (this.deno instanceof ChildProcess) {
this._sendStrategy = this.strategySend.bind(this);

// Get a clean encoder
this.encoder = newEncoder();
} else {
this._sendStrategy = this.strategyError.bind(this);
}
Expand All @@ -43,6 +49,6 @@ export class ProcessMessenger {

private strategySend(message: JsonRpc) {
this.debug('Sending message to subprocess %o', message);
this.deno.stdin.write(encoder.encode(message));
this.deno.stdin.write(this.encoder.encode(message));
}
}
22 changes: 19 additions & 3 deletions packages/apps-engine/src/server/runtime/deno/codec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Decoder, Encoder, ExtensionCodec } from '@msgpack/msgpack';
import { Decoder as _Decoder, Encoder as _Encoder, ExtensionCodec } from '@msgpack/msgpack';

const extensionCodec = new ExtensionCodec();

Expand All @@ -10,6 +10,7 @@ extensionCodec.register({
return new Uint8Array([0]);
}
},

decode: (_data: Uint8Array) => undefined,
});

Expand All @@ -21,9 +22,24 @@ extensionCodec.register({
return new Uint8Array(object.buffer, object.byteOffset, object.byteLength);
}
},

// msgpack will reuse the Uint8Array instance, so WE NEED to copy it instead of simply creating a view
decode: (data: Uint8Array) => Buffer.from(data),
});

export const encoder = new Encoder({ extensionCodec });
export const decoder = new Decoder({ extensionCodec });
/**
* The Encoder and Decoder classes perform "stateful" operations, i.e. they read from a
* stream, store the data locally and decode it from its buffer.
*
* In practice, this affects the decoder when there is decode error. After an error, the decoder
* keeps the malformed data in its buffer, and even if we try to decode from another source (e.g. different stream)
* it will fail again as there's still data in the buffer.
*
* For that reason, we can't have a singleton instance of Encoder and Decoder, but rather one
* instance for each time we create a new subprocess
*/
export const newEncoder = () => new _Encoder({ extensionCodec });
export const newDecoder = () => new _Decoder({ extensionCodec });

export type Encoder = _Encoder;
export type Decoder = _Decoder;

0 comments on commit 43cf121

Please sign in to comment.