Skip to content

Commit

Permalink
Merge pull request #4185 from balena-io/reverse-control-flow
Browse files Browse the repository at this point in the history
Patch: switch from node-ipc to ws
  • Loading branch information
flowzone-app[bot] authored Apr 23, 2024
2 parents 5ad8d5a + ccc31bb commit 0a243ca
Show file tree
Hide file tree
Showing 19 changed files with 999 additions and 662 deletions.
4 changes: 2 additions & 2 deletions forge.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ const config: ForgeConfig = {
darwinDarkModeSupport: true,
protocols: [{ name: 'etcher', schemes: ['etcher'] }],
extraResource: [
'lib/shared/catalina-sudo/sudo-askpass.osascript-zh.js',
'lib/shared/catalina-sudo/sudo-askpass.osascript-en.js',
'lib/shared/sudo/sudo-askpass.osascript-zh.js',
'lib/shared/sudo/sudo-askpass.osascript-en.js',
],
osxSign: {
optionsForFile: () => ({
Expand Down
6 changes: 3 additions & 3 deletions lib/gui/app/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import * as flashState from './models/flash-state';
import * as settings from './models/settings';
import { Actions, observe, store } from './models/store';
import * as analytics from './modules/analytics';
import { startApiAndSpawnChild } from './modules/api';
import { spawnChildAndConnect } from './modules/api';
import * as exceptionReporter from './modules/exception-reporter';
import * as osDialog from './os/dialog';
import * as windowProgress from './os/window-progress';
Expand Down Expand Up @@ -139,11 +139,11 @@ function setDrives(drives: Dictionary<DrivelistDrive>) {
export let requestMetadata: any;

// start the api and spawn the child process
startApiAndSpawnChild({
spawnChildAndConnect({
withPrivileges: false,
}).then(({ emit, registerHandler }) => {
// start scanning
emit('scan');
emit('scan', {});

// make the sourceMetada awaitable to be used on source selection
requestMetadata = async (params: any): Promise<SourceMetadata> => {
Expand Down
286 changes: 177 additions & 109 deletions lib/gui/app/modules/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,16 @@
* - centralise the api for both the writer and the scanner instead of having two instances running
*/

import * as ipc from 'node-ipc';
import { spawn } from 'child_process';
import WebSocket from 'ws'; // (no types for wrapper, this is expected)
import { spawn, exec } from 'child_process';
import * as os from 'os';
import * as path from 'path';
import * as packageJSON from '../../../../package.json';
import * as permissions from '../../../shared/permissions';
import * as errors from '../../../shared/errors';

const THREADS_PER_CPU = 16;

// NOTE: Ensure this isn't disabled, as it will cause
// the stdout maxBuffer size to be exceeded when flashing
ipc.config.silent = true;
const connectionRetryDelay = 1000;
const connectionRetryAttempts = 10;

async function writerArgv(): Promise<string[]> {
let entryPoint = await window.etcher.getEtcherUtilPath();
Expand All @@ -45,139 +42,210 @@ async function writerArgv(): Promise<string[]> {
}
}

function writerEnv(
IPC_CLIENT_ID: string,
IPC_SERVER_ID: string,
IPC_SOCKET_ROOT: string,
async function spawnChild(
withPrivileges: boolean,
etcherServerId: string,
etcherServerAddress: string,
etcherServerPort: string,
) {
return {
IPC_SERVER_ID,
IPC_CLIENT_ID,
IPC_SOCKET_ROOT,
const argv = await writerArgv();
const env: any = {
ETCHER_SERVER_ADDRESS: etcherServerAddress,
ETCHER_SERVER_ID: etcherServerId,
ETCHER_SERVER_PORT: etcherServerPort,
UV_THREADPOOL_SIZE: (os.cpus().length * THREADS_PER_CPU).toString(),
// This environment variable prevents the AppImages
// desktop integration script from presenting the
// "installation" dialog
SKIP: '1',
...(process.platform === 'win32' ? {} : process.env),
};
}

async function spawnChild({
withPrivileges,
IPC_CLIENT_ID,
IPC_SERVER_ID,
IPC_SOCKET_ROOT,
}: {
withPrivileges: boolean;
IPC_CLIENT_ID: string;
IPC_SERVER_ID: string;
IPC_SOCKET_ROOT: string;
}) {
const argv = await writerArgv();
const env = writerEnv(IPC_CLIENT_ID, IPC_SERVER_ID, IPC_SOCKET_ROOT);
if (withPrivileges) {
return await permissions.elevateCommand(argv, {
console.log('... with privileges ...');
return permissions.elevateCommand(argv, {
applicationName: packageJSON.displayName,
environment: env,
env,
});
} else {
const process = await spawn(argv[0], argv.slice(1), {
if (process.platform === 'win32') {
// we need to ensure we reset the env as a previous elevation process might have kept them in a wrong state
const envCommand = [];
for (const key in env) {
if (Object.prototype.hasOwnProperty.call(env, key)) {
envCommand.push(`set ${key}=${env[key]}`);
}
}
await exec(envCommand.join(' && '));
}
const spawned = await spawn(argv[0], argv.slice(1), {
env,
});
return { cancelled: false, process };
return { cancelled: false, spawned };
}
}

function terminateServer(server: any) {
// Turns out we need to destroy all sockets for
// the server to actually close. Otherwise, it
// just stops receiving any further connections,
// but remains open if there are active ones.
// @ts-ignore (no Server.sockets in @types/node-ipc)
for (const socket of server.sockets) {
socket.destroy();
}
server.stop();
}
type ChildApi = {
emit: (type: string, payload: any) => void;
registerHandler: (event: string, handler: any) => void;
failed: boolean;
};

async function connectToChildProcess(
etcherServerAddress: string,
etcherServerPort: string,
etcherServerId: string,
): Promise<ChildApi | { failed: boolean }> {
return new Promise((resolve, reject) => {
// TODO: default to IPC connections https://github.com/websockets/ws/blob/master/doc/ws.md#ipc-connections
// TOOD: use the path as cheap authentication
console.log(etcherServerId);

// TODO: replace the custom ipc events by one generic "message" for all communication with the backend
function startApiAndSpawnChild({
withPrivileges,
}: {
withPrivileges: boolean;
}): Promise<any> {
// There might be multiple Etcher instances running at
// the same time, also we might spawn multiple child and api so we must ensure each IPC
// server/client has a different name.
const IPC_SERVER_ID = `etcher-server-${process.pid}-${Date.now()}-${
withPrivileges ? 'privileged' : 'unprivileged'
}`;
const IPC_CLIENT_ID = `etcher-client-${process.pid}-${Date.now()}-${
withPrivileges ? 'privileged' : 'unprivileged'
}`;

const IPC_SOCKET_ROOT = path.join(
process.env.XDG_RUNTIME_DIR || os.tmpdir(),
path.sep,
);
const url = `ws://${etcherServerAddress}:${etcherServerPort}`;

ipc.config.id = IPC_SERVER_ID;
ipc.config.socketRoot = IPC_SOCKET_ROOT;
const ws = new WebSocket(url);

return new Promise((resolve, reject) => {
ipc.serve();
let heartbeat: any;

// log is special message which brings back the logs from the child process and prints them to the console
ipc.server.on('log', (message: string) => {
console.log(message);
});
const startHeartbeat = (emit: any) => {
console.log('start heartbeat');
heartbeat = setInterval(() => {
emit('heartbeat', {});
}, 1000);
};

// api to register more handlers with callbacks
const registerHandler = (event: string, handler: any) => {
ipc.server.on(event, handler);
const stopHeartbeat = () => {
console.log('stop heartbeat');
clearInterval(heartbeat);
};

// once api is ready (means child process is connected) we pass the emit and terminate function to the caller
ipc.server.on('ready', (_: any, socket) => {
const emit = (channel: string, data: any) => {
ipc.server.emit(socket, channel, data);
ws.on('error', (error: any) => {
if (error.code === 'ECONNREFUSED') {
resolve({
failed: true,
});
} else {
stopHeartbeat();
reject({
failed: true,
});
}
});

ws.on('open', () => {
const emit = (type: string, payload: any) => {
ws.send(JSON.stringify({ type, payload }));
};
resolve({
emit,
terminateServer: () => terminateServer(ipc.server),
registerHandler,

emit('ready', {});

// parse and route messages
const messagesHandler: any = {
log: (message: any) => {
console.log(`CHILD LOG: ${message}`);
},

error: (error: any) => {
const errorObject = errors.fromJSON(error);
console.error('CHILD ERROR', errorObject);
stopHeartbeat();
},

// once api is ready (means child process is connected) we pass the emit function to the caller
ready: () => {
console.log('CHILD READY');

startHeartbeat(emit);

resolve({
failed: false,
emit,
registerHandler,
});
},
};

ws.on('message', (jsonData: any) => {
const data = JSON.parse(jsonData);
const message = messagesHandler[data.type];
if (message) {
message(data.payload);
} else {
throw new Error(`Unknown message type: ${data.type}`);
}
});
});

// on api error we terminate
ipc.server.on('error', (error: any) => {
terminateServer(ipc.server);
const errorObject = errors.fromJSON(error);
reject(errorObject);
// api to register more handlers with callbacks
const registerHandler = (event: string, handler: any) => {
messagesHandler[event] = handler;
};
});
});
}

// when the api is started we spawn the child process
ipc.server.on('start', async () => {
try {
const results = await spawnChild({
withPrivileges,
IPC_CLIENT_ID,
IPC_SERVER_ID,
IPC_SOCKET_ROOT,
});
// this will happen if the child is spawned withPrivileges and privileges has been rejected
if (results.cancelled) {
reject();
}
} catch (error) {
reject(error);
async function spawnChildAndConnect({
withPrivileges,
}: {
withPrivileges: boolean;
}): Promise<ChildApi> {
const etcherServerAddress = process.env.ETCHER_SERVER_ADDRESS ?? '127.0.0.1'; // localhost
const etcherServerPort =
process.env.ETCHER_SERVER_PORT ?? withPrivileges ? '3435' : '3434';
const etcherServerId =
process.env.ETCHER_SERVER_ID ??
`etcher-${Math.random().toString(36).substring(7)}`;

console.log(
`Spawning ${
withPrivileges ? 'priviledged' : 'unpriviledged'
} sidecar on port ${etcherServerPort}`,
);

// spawn the child process, which will act as the ws server
// ETCHER_NO_SPAWN_UTIL can be set to launch a GUI only version of etcher, in that case you'll probably want to set other ENV to match your setup
if (!process.env.ETCHER_NO_SPAWN_UTIL) {
try {
const result = await spawnChild(
withPrivileges,
etcherServerId,
etcherServerAddress,
etcherServerPort,
);
if (result.cancelled) {
throw new Error('Spwaning the child process was cancelled');
}
});
} catch (error) {
console.error('Error spawning child process', error);
throw new Error('Error spawning the child process');
}
}

// start the server
ipc.server.start();
});
// try to connect to the ws server, retrying if necessary, until the connection is established
try {
let retry = 0;
while (retry < connectionRetryAttempts) {
const { emit, registerHandler, failed } = await connectToChildProcess(
etcherServerAddress,
etcherServerPort,
etcherServerId,
);
if (failed) {
retry++;
console.log(
`Retrying to connect to child process in ${connectionRetryDelay}... ${retry} / ${connectionRetryAttempts}`,
);
await new Promise((resolve) =>
setTimeout(resolve, connectionRetryDelay),
);
continue;
}
return { failed, emit, registerHandler };
}
throw new Error('Connection to etcher-util timed out');
} catch (error) {
console.error('Error connecting to child process', error);
throw new Error('Connection to etcher-util failed');
}
}

export { startApiAndSpawnChild };
export { spawnChildAndConnect };
Loading

0 comments on commit 0a243ca

Please sign in to comment.