From 4bb27dda16f5ea2be3f18f01448b3bc654c0fa76 Mon Sep 17 00:00:00 2001 From: Andrey Melnikov Date: Wed, 20 Dec 2023 13:29:56 -0800 Subject: [PATCH] Convert worker pool to typescript, use existing methods for worker logic --- src/index.ts | 13 ++-- src/workers/worker-pool.ts | 119 +++++++++++++++++++++++++++++++++++++ src/workers/worker.ts | 24 +------- src/workers/worker_pool.js | 86 --------------------------- webpack.main.config.ts | 12 +++- yarn.lock | 5 ++ 6 files changed, 145 insertions(+), 114 deletions(-) create mode 100644 src/workers/worker-pool.ts delete mode 100644 src/workers/worker_pool.js diff --git a/src/index.ts b/src/index.ts index 881701f..6a7bbdd 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,9 +5,8 @@ import { AppFile, HeicFileResponse, ReaddirOptions } from './types/filesystem'; import Store from './configuration/store'; import * as fswin from "fswin"; import { fileExists } from './server/filesystem'; -import WorkerPool from './workers/worker_pool.js'; +import WorkerPool from './workers/worker-pool'; import os from 'node:os'; -import decode from 'heic-decode'; import { TaskAction } from './workers/types'; // This allows TypeScript to pick up the magic constants that's auto-generated by Forge's Webpack @@ -100,7 +99,12 @@ async function filesystemGetImageIconPath(event: IpcMainInvokeEvent, path: strin outputPath: cachedFilePath, width, height - }, (err: any, result: string) => { + }, (err) => { + if (err) { + console.error(err); + return; + } + resolve(cachedFilePath); }); }); @@ -122,7 +126,7 @@ if (require('electron-squirrel-startup')) { app.quit(); } -// Allow showing filesystem images in app. +// Allow showing filesystem images and videos in app. protocol.registerSchemesAsPrivileged([ { scheme: 'app', privileges: { bypassCSP: true, stream: true } } ]); @@ -149,7 +153,6 @@ const createWindow = (): void => { // Some APIs can only be used after this event occurs. app.whenReady().then(async () => { const userDataPath = app.getPath('userData'); - console.log(userDataPath); const store = new Store(resolve(userDataPath, "configuration.json")); const imageCache = resolve(userDataPath, "image_cache"); diff --git a/src/workers/worker-pool.ts b/src/workers/worker-pool.ts new file mode 100644 index 0000000..6924ac6 --- /dev/null +++ b/src/workers/worker-pool.ts @@ -0,0 +1,119 @@ +import { AsyncResource } from 'node:async_hooks'; +import { EventEmitter } from 'node:events'; +import { resolve } from 'node:path'; +import { Worker } from 'node:worker_threads'; +import { WorkerTask } from './types'; + +export type TaskInfoCallback = (err: Error | null, result: unknown) => void; + +class WorkerPoolTaskInfo extends AsyncResource { + constructor(private callback: TaskInfoCallback) { + super('WorkerPoolTaskInfo'); + } + + done(err: Error | null, result: unknown) { + this.runInAsyncScope(this.callback, null, err, result); + this.emitDestroy(); // `TaskInfo`s are used only once. + } +} + +interface WorkerPoolTask { + task: WorkerTask; + callback: TaskInfoCallback; +} + +export default class WorkerPool extends EventEmitter { + static readonly kWorkerFreedEvent = Symbol('kWorkerFreedEvent'); + + private workers: Worker[] = []; + private freeWorkers: Worker[] = []; + private tasks: WorkerPoolTask[] = []; + private workerTasks = new Map(); + + constructor(numThreads: number) { + super(); + + for (let i = 0; i < numThreads; i++) { + this.addNewWorker(); + } + + // Any time the kWorkerFreedEvent is emitted, dispatch + // the next task pending in the queue, if any. + this.on(WorkerPool.kWorkerFreedEvent, () => { + if (this.tasks.length > 0) { + const nextTask = this.tasks.shift(); + if (nextTask) { + this.runTask(nextTask.task, nextTask.callback); + } + } + }); + } + + addNewWorker() { + // TODO is there a nicer way to get this dependency instead of knowing it eventually becomes .js? + const worker = new Worker(resolve(__dirname, 'worker.js')); + worker.on('message', (result) => { + // In case of success: Call the callback that was passed to `runTask`, + // remove the `TaskInfo` associated with the Worker, and mark it as free + // again. + + const taskInfo = this.workerTasks.get(worker); + if (!taskInfo) { + this.emit('error', new Error("Task info not found for worker")); + return; + } + + taskInfo.done(null, result); + this.workerTasks.delete(worker); + + this.freeWorkers.push(worker); + this.emit(WorkerPool.kWorkerFreedEvent); + }); + + worker.on('error', (err) => { + // In case of an uncaught exception: Call the callback that was passed to + // `runTask` with the error. + + const taskInfo = this.workerTasks.get(worker); + if (taskInfo) { + taskInfo.done(err, null); + } else { + this.emit('error', err); + } + + this.workerTasks.delete(worker); + + // Remove the worker from the list and start a new Worker to replace the + // current one. + this.workers.splice(this.workers.indexOf(worker), 1); + this.addNewWorker(); + }); + + this.workers.push(worker); + this.freeWorkers.push(worker); + this.emit(WorkerPool.kWorkerFreedEvent); + } + + runTask(task: WorkerTask, callback: TaskInfoCallback) { + if (this.freeWorkers.length === 0) { + // No free threads, wait until a worker thread becomes free. + this.tasks.push({ task, callback }); + return; + } + + const worker = this.freeWorkers.pop(); + if (worker === undefined) { + this.emit('error', "Unable to get free worker despite length !== 0"); + return; + } + + this.workerTasks.set(worker, new WorkerPoolTaskInfo(callback)); + worker.postMessage(task); + } + + close() { + for (const worker of this.workers) { + worker.terminate(); + } + } +} \ No newline at end of file diff --git a/src/workers/worker.ts b/src/workers/worker.ts index 001dec2..2569ab2 100644 --- a/src/workers/worker.ts +++ b/src/workers/worker.ts @@ -1,27 +1,9 @@ -import { Worker, isMainThread, parentPort, workerData } from "node:worker_threads"; -import { CreateIconTask, TaskAction, WorkerTask } from "./types"; - -import { extname } from "node:path"; -import sharp from "sharp"; +import { parentPort } from "node:worker_threads"; +import { TaskAction, WorkerTask } from "./types"; import { readFile } from "node:fs/promises"; -// TODO types import decode from 'heic-decode'; -import { Task } from "electron"; - -async function createIcon(inputPath: string, outputPath: string, width: number, height: number): Promise { - const ext = extname(inputPath).toLowerCase(); - - if (ext === ".heic") { - const inputBuffer = await readFile(inputPath); - const { data, width: imageWidth, height: imageHeight } = await decode({ buffer: inputBuffer }); - - // TODO 4 is a guess - need to validate - await sharp(data, { raw: { width: imageWidth, height: imageHeight, channels: 4 } }).resize(width, height).jpeg().toFile(outputPath); - } else { - await sharp(inputPath).resize(width, height).jpeg().toFile(outputPath); - } -} +import { createIcon } from "app/server/image"; function main() { if (parentPort === null) { diff --git a/src/workers/worker_pool.js b/src/workers/worker_pool.js deleted file mode 100644 index 78954e7..0000000 --- a/src/workers/worker_pool.js +++ /dev/null @@ -1,86 +0,0 @@ -import { AsyncResource } from 'node:async_hooks'; -import { EventEmitter } from 'node:events'; -import { resolve } from 'node:path'; -import { Worker } from 'node:worker_threads'; - -const kTaskInfo = Symbol('kTaskInfo'); -const kWorkerFreedEvent = Symbol('kWorkerFreedEvent'); - -// TODO Typescript -class WorkerPoolTaskInfo extends AsyncResource { - constructor(callback) { - super('WorkerPoolTaskInfo'); - this.callback = callback; - } - - done(err, result) { - this.runInAsyncScope(this.callback, null, err, result); - this.emitDestroy(); // `TaskInfo`s are used only once. - } -} - -export default class WorkerPool extends EventEmitter { - constructor(numThreads) { - super(); - this.numThreads = numThreads; - this.workers = []; - this.freeWorkers = []; - this.tasks = []; - - for (let i = 0; i < numThreads; i++) - this.addNewWorker(); - - // Any time the kWorkerFreedEvent is emitted, dispatch - // the next task pending in the queue, if any. - this.on(kWorkerFreedEvent, () => { - if (this.tasks.length > 0) { - const { task, callback } = this.tasks.shift(); - this.runTask(task, callback); - } - }); - } - - addNewWorker() { - const worker = new Worker(new URL(resolve(__dirname, 'worker.js'), import.meta.url)); - worker.on('message', (result) => { - // In case of success: Call the callback that was passed to `runTask`, - // remove the `TaskInfo` associated with the Worker, and mark it as free - // again. - worker[kTaskInfo].done(null, result); - worker[kTaskInfo] = null; - this.freeWorkers.push(worker); - this.emit(kWorkerFreedEvent); - }); - worker.on('error', (err) => { - // In case of an uncaught exception: Call the callback that was passed to - // `runTask` with the error. - if (worker[kTaskInfo]) - worker[kTaskInfo].done(err, null); - else - this.emit('error', err); - // Remove the worker from the list and start a new Worker to replace the - // current one. - this.workers.splice(this.workers.indexOf(worker), 1); - this.addNewWorker(); - }); - this.workers.push(worker); - this.freeWorkers.push(worker); - this.emit(kWorkerFreedEvent); - } - - runTask(task, callback) { - if (this.freeWorkers.length === 0) { - // No free threads, wait until a worker thread becomes free. - this.tasks.push({ task, callback }); - return; - } - - const worker = this.freeWorkers.pop(); - worker[kTaskInfo] = new WorkerPoolTaskInfo(callback); - worker.postMessage(task); - } - - close() { - for (const worker of this.workers) worker.terminate(); - } -} \ No newline at end of file diff --git a/webpack.main.config.ts b/webpack.main.config.ts index bc6cbed..53c8926 100644 --- a/webpack.main.config.ts +++ b/webpack.main.config.ts @@ -1,4 +1,6 @@ import type { Configuration } from 'webpack'; +import { resolve } from "node:path"; + import { rules } from './webpack.rules'; import { plugins } from './webpack.plugins'; @@ -10,8 +12,8 @@ export const mainConfig: Configuration = { */ entry: { index: './src/index.ts', - "worker_pool": { import: './src/workers/worker_pool.js', filename: "[name].js" }, - worker: { import: './src/workers/worker.ts', filename: "[name].js" } + "worker-pool": './src/workers/worker_pool.ts', + worker: './src/workers/worker.ts' }, // Put your normal webpack config below here module: { @@ -20,5 +22,11 @@ export const mainConfig: Configuration = { plugins, resolve: { extensions: ['.js', '.ts', '.jsx', '.tsx', '.css', '.json'], + alias: { + app: resolve(__dirname, "src/") + } }, + output: { + filename: '[name].js' + } }; diff --git a/yarn.lock b/yarn.lock index a6c5586..299959a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1534,6 +1534,11 @@ resolved "https://registry.npmjs.org/@types/heic-convert/-/heic-convert-1.2.3.tgz" integrity sha512-5LJ2fGuVk/gnOLihoT56xJwrXxfnNepGvrHwlW5ZtT3HS4jO1AqBaAHCxXUpnY9UaD3zYcyxXMRM2fNN1AFF/Q== +"@types/heic-decode@^1.1.2": + version "1.1.2" + resolved "https://registry.npmjs.org/@types/heic-decode/-/heic-decode-1.1.2.tgz" + integrity sha512-E1Dw+gpIuiHJcTpFBHm+V/6eYsPhtWook1Z6MIG0H1ONWBp9Z6l3FKydUxFw9GL9JqrWB1qa/uQHdbn/KUeXYw== + "@types/html-minifier-terser@^6.0.0": version "6.1.0" resolved "https://registry.npmjs.org/@types/html-minifier-terser/-/html-minifier-terser-6.1.0.tgz"