Skip to content

Commit

Permalink
Convert worker pool to typescript, use existing methods for worker logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Vafilor committed Dec 20, 2023
1 parent 161f1c3 commit 4bb27dd
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 114 deletions.
13 changes: 8 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import { AppFile, HeicFileResponse, ReaddirOptions } from './types/filesystem';
import Store from './configuration/store';
import * as fswin from "fswin";
import { fileExists } from './server/filesystem';
import WorkerPool from './workers/worker_pool.js';
import WorkerPool from './workers/worker-pool';
import os from 'node:os';
import decode from 'heic-decode';
import { TaskAction } from './workers/types';

// This allows TypeScript to pick up the magic constants that's auto-generated by Forge's Webpack
Expand Down Expand Up @@ -100,7 +99,12 @@ async function filesystemGetImageIconPath(event: IpcMainInvokeEvent, path: strin
outputPath: cachedFilePath,
width,
height
}, (err: any, result: string) => {
}, (err) => {
if (err) {
console.error(err);
return;
}

resolve(cachedFilePath);
});
});
Expand All @@ -122,7 +126,7 @@ if (require('electron-squirrel-startup')) {
app.quit();
}

// Allow showing filesystem images in app.
// Allow showing filesystem images and videos in app.
protocol.registerSchemesAsPrivileged([
{ scheme: 'app', privileges: { bypassCSP: true, stream: true } }
]);
Expand All @@ -149,7 +153,6 @@ const createWindow = (): void => {
// Some APIs can only be used after this event occurs.
app.whenReady().then(async () => {
const userDataPath = app.getPath('userData');
console.log(userDataPath);
const store = new Store(resolve(userDataPath, "configuration.json"));

const imageCache = resolve(userDataPath, "image_cache");
Expand Down
119 changes: 119 additions & 0 deletions src/workers/worker-pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import { AsyncResource } from 'node:async_hooks';
import { EventEmitter } from 'node:events';
import { resolve } from 'node:path';
import { Worker } from 'node:worker_threads';
import { WorkerTask } from './types';

export type TaskInfoCallback = (err: Error | null, result: unknown) => void;

class WorkerPoolTaskInfo extends AsyncResource {
constructor(private callback: TaskInfoCallback) {
super('WorkerPoolTaskInfo');
}

done(err: Error | null, result: unknown) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}

interface WorkerPoolTask {
task: WorkerTask;
callback: TaskInfoCallback;
}

export default class WorkerPool extends EventEmitter {
static readonly kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

private workers: Worker[] = [];
private freeWorkers: Worker[] = [];
private tasks: WorkerPoolTask[] = [];
private workerTasks = new Map<Worker, WorkerPoolTaskInfo>();

constructor(numThreads: number) {
super();

for (let i = 0; i < numThreads; i++) {
this.addNewWorker();
}

// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(WorkerPool.kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const nextTask = this.tasks.shift();
if (nextTask) {
this.runTask(nextTask.task, nextTask.callback);
}
}
});
}

addNewWorker() {
// TODO is there a nicer way to get this dependency instead of knowing it eventually becomes .js?
const worker = new Worker(resolve(__dirname, 'worker.js'));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.

const taskInfo = this.workerTasks.get(worker);
if (!taskInfo) {
this.emit('error', new Error("Task info not found for worker"));
return;
}

taskInfo.done(null, result);
this.workerTasks.delete(worker);

this.freeWorkers.push(worker);
this.emit(WorkerPool.kWorkerFreedEvent);
});

worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.

const taskInfo = this.workerTasks.get(worker);
if (taskInfo) {
taskInfo.done(err, null);
} else {
this.emit('error', err);
}

this.workerTasks.delete(worker);

// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});

this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(WorkerPool.kWorkerFreedEvent);
}

runTask(task: WorkerTask, callback: TaskInfoCallback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback });
return;
}

const worker = this.freeWorkers.pop();
if (worker === undefined) {
this.emit('error', "Unable to get free worker despite length !== 0");
return;
}

this.workerTasks.set(worker, new WorkerPoolTaskInfo(callback));
worker.postMessage(task);
}

close() {
for (const worker of this.workers) {
worker.terminate();
}
}
}
24 changes: 3 additions & 21 deletions src/workers/worker.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,9 @@
import { Worker, isMainThread, parentPort, workerData } from "node:worker_threads";
import { CreateIconTask, TaskAction, WorkerTask } from "./types";

import { extname } from "node:path";
import sharp from "sharp";
import { parentPort } from "node:worker_threads";
import { TaskAction, WorkerTask } from "./types";

import { readFile } from "node:fs/promises";
// TODO types
import decode from 'heic-decode';
import { Task } from "electron";

async function createIcon(inputPath: string, outputPath: string, width: number, height: number): Promise<void> {
const ext = extname(inputPath).toLowerCase();

if (ext === ".heic") {
const inputBuffer = await readFile(inputPath);
const { data, width: imageWidth, height: imageHeight } = await decode({ buffer: inputBuffer });

// TODO 4 is a guess - need to validate
await sharp(data, { raw: { width: imageWidth, height: imageHeight, channels: 4 } }).resize(width, height).jpeg().toFile(outputPath);
} else {
await sharp(inputPath).resize(width, height).jpeg().toFile(outputPath);
}
}
import { createIcon } from "app/server/image";

function main() {
if (parentPort === null) {
Expand Down
86 changes: 0 additions & 86 deletions src/workers/worker_pool.js

This file was deleted.

12 changes: 10 additions & 2 deletions webpack.main.config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import type { Configuration } from 'webpack';
import { resolve } from "node:path";


import { rules } from './webpack.rules';
import { plugins } from './webpack.plugins';
Expand All @@ -10,8 +12,8 @@ export const mainConfig: Configuration = {
*/
entry: {
index: './src/index.ts',
"worker_pool": { import: './src/workers/worker_pool.js', filename: "[name].js" },
worker: { import: './src/workers/worker.ts', filename: "[name].js" }
"worker-pool": './src/workers/worker_pool.ts',
worker: './src/workers/worker.ts'
},
// Put your normal webpack config below here
module: {
Expand All @@ -20,5 +22,11 @@ export const mainConfig: Configuration = {
plugins,
resolve: {
extensions: ['.js', '.ts', '.jsx', '.tsx', '.css', '.json'],
alias: {
app: resolve(__dirname, "src/")
}
},
output: {
filename: '[name].js'
}
};
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1534,6 +1534,11 @@
resolved "https://registry.npmjs.org/@types/heic-convert/-/heic-convert-1.2.3.tgz"
integrity sha512-5LJ2fGuVk/gnOLihoT56xJwrXxfnNepGvrHwlW5ZtT3HS4jO1AqBaAHCxXUpnY9UaD3zYcyxXMRM2fNN1AFF/Q==

"@types/heic-decode@^1.1.2":
version "1.1.2"
resolved "https://registry.npmjs.org/@types/heic-decode/-/heic-decode-1.1.2.tgz"
integrity sha512-E1Dw+gpIuiHJcTpFBHm+V/6eYsPhtWook1Z6MIG0H1ONWBp9Z6l3FKydUxFw9GL9JqrWB1qa/uQHdbn/KUeXYw==

"@types/html-minifier-terser@^6.0.0":
version "6.1.0"
resolved "https://registry.npmjs.org/@types/html-minifier-terser/-/html-minifier-terser-6.1.0.tgz"
Expand Down

0 comments on commit 4bb27dd

Please sign in to comment.