Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: decoder error preventing succesfull app subprocess restart #34858

Merged
merged 4 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .changeset/cool-planes-protect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
'@rocket.chat/omnichannel-transcript': patch
'@rocket.chat/authorization-service': patch
'@rocket.chat/stream-hub-service': patch
'@rocket.chat/presence-service': patch
'@rocket.chat/fuselage-ui-kit': patch
'@rocket.chat/account-service': patch
'@rocket.chat/mock-providers': patch
'@rocket.chat/ui-theming': patch
'@rocket.chat/uikit-playground': patch
'@rocket.chat/ddp-streamer': patch
'@rocket.chat/queue-worker': patch
'@rocket.chat/apps-engine': patch
'@rocket.chat/ui-composer': patch
'@rocket.chat/ui-contexts': patch
'@rocket.chat/ui-client': patch
'@rocket.chat/models': patch
'@rocket.chat/sha256': patch
'@rocket.chat/meteor': patch
---

Fixes an issue that prevented the apps-engine from reestablishing communications with subprocesses in some cases
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import * as jsonrpc from 'jsonrpc-lite';
import { LivenessManager } from './LivenessManager';
import { ProcessMessenger } from './ProcessMessenger';
import { bundleLegacyApp } from './bundler';
import { decoder } from './codec';
import { newDecoder } from './codec';
import { AppStatus, AppStatusUtils } from '../../../definition/AppStatus';
import type { AppMethod } from '../../../definition/metadata';
import type { AppManager } from '../../AppManager';
Expand Down Expand Up @@ -389,6 +389,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
console.error(`Failed to startup Deno subprocess for app ${this.getAppId()}`, err);
});
this.once('ready', this.onReady.bind(this));

this.parseStdout(this.deno.stdout);
}

Expand Down Expand Up @@ -610,51 +611,58 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
}

private async parseStdout(stream: Readable): Promise<void> {
for await (const message of decoder.decodeStream(stream)) {
this.debug('Received message from subprocess %o', message);
try {
// Process PONG resonse first as it is not JSON RPC
if (message === COMMAND_PONG) {
this.emit('pong');
continue;
}

const JSONRPCMessage = jsonrpc.parseObject(message);

if (Array.isArray(JSONRPCMessage)) {
throw new Error('Invalid message format');
}

if (JSONRPCMessage.type === 'request' || JSONRPCMessage.type === 'notification') {
this.handleIncomingMessage(JSONRPCMessage).catch((reason) =>
console.error(`[${this.getAppId()}] Error executing handler`, reason, message),
);
continue;
}

if (JSONRPCMessage.type === 'success' || JSONRPCMessage.type === 'error') {
this.handleResultMessage(JSONRPCMessage).catch((reason) => console.error(`[${this.getAppId()}] Error executing handler`, reason, message));
continue;
}

console.error('Unrecognized message type', JSONRPCMessage);
} catch (e) {
// SyntaxError is thrown when the message is not a valid JSON
if (e instanceof SyntaxError) {
console.error(`[${this.getAppId()}] Failed to parse message`);
continue;
try {
for await (const message of newDecoder().decodeStream(stream)) {
this.debug('Received message from subprocess %o', message);
try {
// Process PONG resonse first as it is not JSON RPC
if (message === COMMAND_PONG) {
this.emit('pong');
continue;
}

const JSONRPCMessage = jsonrpc.parseObject(message);

if (Array.isArray(JSONRPCMessage)) {
throw new Error('Invalid message format');
}

if (JSONRPCMessage.type === 'request' || JSONRPCMessage.type === 'notification') {
this.handleIncomingMessage(JSONRPCMessage).catch((reason) =>
console.error(`[${this.getAppId()}] Error executing handler`, reason, message),
);
continue;
}

if (JSONRPCMessage.type === 'success' || JSONRPCMessage.type === 'error') {
this.handleResultMessage(JSONRPCMessage).catch((reason) =>
console.error(`[${this.getAppId()}] Error executing handler`, reason, message),
);
continue;
}

console.error('Unrecognized message type', JSONRPCMessage);
} catch (e) {
// SyntaxError is thrown when the message is not a valid JSON
if (e instanceof SyntaxError) {
console.error(`[${this.getAppId()}] Failed to parse message`);
continue;
}

console.error(`[${this.getAppId()}] Error executing handler`, e, message);
}

console.error(`[${this.getAppId()}] Error executing handler`, e, message);
}
} catch (e) {
console.error(`[${this.getAppId()}]`, e);
this.emit('error', new Error('DECODE_ERROR'));
d-gubert marked this conversation as resolved.
Show resolved Hide resolved
}
}

private async parseError(chunk: Buffer): Promise<void> {
try {
const data = JSON.parse(chunk.toString());

this.debug('Metrics received from subprocess: %o', data);
this.debug('Metrics received from subprocess (via stderr): %o', data);
} catch (e) {
console.error('Subprocess stderr', chunk.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ export class LivenessManager {
this.pingAbortController = new EventEmitter();

this.options = Object.assign({}, defaultOptions, options);

this.controller.on('ready', () => this.ping());
this.controller.on('error', async (reason) => {
if (reason instanceof Error && reason.message.startsWith('DECODE_ERROR')) {
await this.restartProcess('Decode error', 'controller');
}
})
}

public getRuntimeData() {
Expand All @@ -84,7 +91,6 @@ export class LivenessManager {

this.pingTimeoutConsecutiveCount = 0;

this.controller.once('ready', () => this.ping());
this.subprocess.once('exit', this.handleExit.bind(this));
this.subprocess.once('error', this.handleError.bind(this));
}
Expand Down Expand Up @@ -188,7 +194,7 @@ export class LivenessManager {
this.restartProcess(reason);
}

private async restartProcess(reason: string) {
private async restartProcess(reason: string, source = 'liveness-manager') {
if (this.restartCount >= this.options.maxRestarts) {
this.debug('Limit of restarts reached (%d). Aborting restart...', this.options.maxRestarts);
this.controller.stopApp();
Expand All @@ -197,8 +203,8 @@ export class LivenessManager {

this.restartLog.push({
reason,
source,
restartedAt: new Date(),
source: 'liveness-manager',
pid: this.subprocess.pid,
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ import type { ChildProcess } from 'child_process';

import type { JsonRpc } from 'jsonrpc-lite';

import { encoder } from './codec';
import { Encoder, newEncoder } from './codec';

export class ProcessMessenger {
private deno: ChildProcess | undefined;
private encoder: Encoder | undefined;

private _sendStrategy: (message: JsonRpc) => void;

Expand All @@ -25,13 +26,17 @@ export class ProcessMessenger {

public clearReceiver() {
delete this.deno;
delete this.encoder;

this.switchStrategy();
}

private switchStrategy() {
if (this.deno?.stdin?.writable) {
this._sendStrategy = this.strategySend.bind(this);

// Get a clean encoder
this.encoder = newEncoder();
} else {
this._sendStrategy = this.strategyError.bind(this);
}
Expand All @@ -43,6 +48,6 @@ export class ProcessMessenger {

private strategySend(message: JsonRpc) {
this.debug('Sending message to subprocess %o', message);
this.deno.stdin.write(encoder.encode(message));
this.deno.stdin.write(this.encoder.encode(message));
}
}
22 changes: 19 additions & 3 deletions packages/apps-engine/src/server/runtime/deno/codec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Decoder, Encoder, ExtensionCodec } from '@msgpack/msgpack';
import { Decoder as _Decoder, Encoder as _Encoder, ExtensionCodec } from '@msgpack/msgpack';

const extensionCodec = new ExtensionCodec();

Expand All @@ -10,6 +10,7 @@ extensionCodec.register({
return new Uint8Array([0]);
}
},

decode: (_data: Uint8Array) => undefined,
});

Expand All @@ -21,9 +22,24 @@ extensionCodec.register({
return new Uint8Array(object.buffer, object.byteOffset, object.byteLength);
}
},

// msgpack will reuse the Uint8Array instance, so WE NEED to copy it instead of simply creating a view
decode: (data: Uint8Array) => Buffer.from(data),
});

export const encoder = new Encoder({ extensionCodec });
export const decoder = new Decoder({ extensionCodec });
/**
* The Encoder and Decoder classes perform "stateful" operations, i.e. they read from a
* stream, store the data locally and decode it from its buffer.
*
* In practice, this affects the decoder when there is decode error. After an error, the decoder
* keeps the malformed data in its buffer, and even if we try to decode from another source (e.g. different stream)
* it will fail again as there's still data in the buffer.
*
* For that reason, we can't have a singleton instance of Encoder and Decoder, but rather one
* instance for each time we create a new subprocess
*/
export const newEncoder = () => new _Encoder({ extensionCodec });
export const newDecoder = () => new _Decoder({ extensionCodec });
d-gubert marked this conversation as resolved.
Show resolved Hide resolved

export type Encoder = _Encoder;
export type Decoder = _Decoder;
Loading