From 745f789dcfa0e91e47a2f975367d2557a43c6ad0 Mon Sep 17 00:00:00 2001 From: Eduardo Marcos Date: Wed, 10 May 2023 13:45:36 +0200 Subject: [PATCH 1/2] support params --- examples/basic/src/parallelizer-code.js | 2 +- src/child-process.js | 12 ++++++------ src/index.js | 18 +++++++++++------- src/worker-thread.js | 10 +++++----- test/benchmark.js | 5 +++-- 5 files changed, 26 insertions(+), 21 deletions(-) diff --git a/examples/basic/src/parallelizer-code.js b/examples/basic/src/parallelizer-code.js index e71d065..fe316a2 100644 --- a/examples/basic/src/parallelizer-code.js +++ b/examples/basic/src/parallelizer-code.js @@ -2,7 +2,7 @@ const fs = require('fs'); const crypto = require('crypto'); -const batchProcessor = async ({ batch }) => { +const batchProcessor = async ({ batch, params }) => { let sum = 0; for (let id = 0; id <= (batch.length * 1000000); id++) { diff --git a/src/child-process.js b/src/child-process.js index de86dd7..5f56575 100644 --- a/src/child-process.js +++ b/src/child-process.js @@ -43,7 +43,7 @@ class ChildProcess { } } - async runBatch(batch) { + async runBatch(batch, params = null) { if (this.childProcesses.length === 0) { throw new Error('No child processes created. Please run "createChildProcesses" method before "runBatch"') } @@ -55,7 +55,7 @@ class ChildProcess { const batches = findSubsets(batch, batchCount); // Process the batches using the child processes. - return await this._processBatchesInForks(batches); + return await this._processBatchesInForks(batches, params); } removeChildProcesses() { @@ -63,7 +63,7 @@ class ChildProcess { this.childProcesses = []; this._removeChildFile(); } - + removeChildThreads() { this.removeChildProcesses(); } @@ -72,7 +72,7 @@ class ChildProcess { this.childProcesses.forEach(child => { child.removeAllListeners('exit'); child.removeAllListeners('message') }); } - async _processBatchesInForks(batches) { + async _processBatchesInForks(batches, params = null) { const batchesCount = batches.length; const childResponses = { responses: [], @@ -157,7 +157,7 @@ class ChildProcess { }); // Send message to child. - this.childProcesses[id].send({ id, batch: batches[id] }); + this.childProcesses[id].send({ id, batch: batches[id], params }); } }) @@ -221,7 +221,7 @@ const mainLogger = ({ message, params = {}, logType = 'log' }) => { // Listening to parent's messages. process.on("message", async (message) => { try { - const reponse = await processBatch({ batch: message.batch, mainLogger }); + const reponse = await processBatch({ batch: message.batch, params: message.params, mainLogger }); process.send({ type: "MESSAGE", status: "SUCCESS", reponse }); } catch (e) { diff --git a/src/index.js b/src/index.js index ac0a39c..3d5e580 100644 --- a/src/index.js +++ b/src/index.js @@ -39,18 +39,22 @@ class Parallelizer { } - async run(paramsList) { + async run(data, params = null) { if (Object.keys(this.childThreads).length == 1) { - return this.childThreads[SINGLE_CHILD_THREAD_ID].runBatch(paramsList); + return this.childThreads[SINGLE_CHILD_THREAD_ID].runBatch(data, params); } - if (!isArray(paramsList)) { - paramsList.id = SINGLE_CHILD_THREAD_ID; - paramsList = [paramsList]; + if (!isArray(data)) { + data.id = SINGLE_CHILD_THREAD_ID; + data.params = data.params || params; + data = [data]; } - return await Promise.all(paramsList.map(({ id, batch }) => { - return this.childThreads[id].runBatch(batch) + return await Promise.all(data.map(item => { + const batch = item.batch + const itemParams = item.params || params + + return this.childThreads[item.id].runBatch(batch, params = itemParams) })); } diff --git a/src/worker-thread.js b/src/worker-thread.js index 7a3dcaf..cfae064 100644 --- a/src/worker-thread.js +++ b/src/worker-thread.js @@ -26,7 +26,7 @@ class WorkerThreads { this.threadsCount = (typeof this.parallelization === 'number') ? this.parallelization : this._getThreadsCount(); } - runBatch = async (batch) => { + runBatch = async (batch, params = null) => { // Get the amount of messages per batch. const batchCount = (batch.length < this.threadsCount) ? 1 : batch.length / this.threadsCount; @@ -34,7 +34,7 @@ class WorkerThreads { const batches = findSubsets(batch, batchCount); // Process the batches using the threads. - return await this._processBatchesInThreads(batches); + return await this._processBatchesInThreads(batches, params); } removeWorkerThreads() { @@ -44,7 +44,7 @@ class WorkerThreads { this._removeThreadFile(); } - _processBatchesInThreads = async (batches) => { + _processBatchesInThreads = async (batches, params = null) => { const batchesCount = batches.length; const threadResponses = { responses: [], @@ -55,7 +55,7 @@ class WorkerThreads { await new Promise((resolve, reject) => { for (let id = 0; id < batchesCount; id++) { - const worker = new Worker(this.tmpPath, { workerData: { id, batch: batches[id] } }); + const worker = new Worker(this.tmpPath, { workerData: { id, batch: batches[id], params } }); worker.on('error', (error) => { logger({ message: `Thread #${id} error message: ${error.message}`, @@ -141,7 +141,7 @@ const { workerData, parentPort } = require("worker_threads"); (async () => { try { - const reponse = await processBatch({ batch: workerData.batch }); + const reponse = await processBatch({ batch: workerData.batch, params: workerData.params }); parentPort.postMessage({ reponse, status: "SUCCESS" }); } catch (err) { parentPort.postMessage({ status: "FAILED", errorMessage: err.toString() }); diff --git a/test/benchmark.js b/test/benchmark.js index fd12896..5017eb0 100644 --- a/test/benchmark.js +++ b/test/benchmark.js @@ -37,11 +37,12 @@ suite })) // add listeners .on('cycle', function (event) { - childParallelizer.removeChildThreads(); - threadParallelizer.removeChildThreads(); console.log(String(event.target)); }) .on('complete', function () { + childParallelizer.removeChildThreads(); + threadParallelizer.removeChildThreads(); + console.log('\nResult: '); console.log('Fastest is ' + this.filter('fastest').map('name')); console.log('Slowest is ' + this.filter('slowest').map('name')); From a4f64ad922c7a1062d2e295f9229ced8f41e68b1 Mon Sep 17 00:00:00 2001 From: Eduardo Marcos Date: Fri, 12 May 2023 12:25:56 +0200 Subject: [PATCH 2/2] Version 3.1.0 - Adding params --- README.md | 12 ++++--- examples/basic/src/parallelizer-code.js | 47 +++++++++++++++++++++++-- package.json | 2 +- test/benchmark.js | 10 +++--- 4 files changed, 57 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 277bd30..168520f 100644 --- a/README.md +++ b/README.md @@ -37,12 +37,12 @@ When you create an instance of the Child Process Parallelizer outside of the Lam #### CPU & I/O operations (Parallelization per CPU = 1) ```bash $ node test/benchmark.js -Child Parallelizer x 15.08 ops/sec -Thread Parallelizer x 31.90 ops/sec +Child Parallelizer x 18.08 ops/sec +Thread Parallelizer x 15.90 ops/sec Without Parallelizer x 2.79 ops/sec Result: -Fastest is Thread Parallelizer +Fastest is Child Parallelizer Slowest is Without Parallelizer ``` @@ -100,10 +100,11 @@ npm i node-parallelizer --save - `parallelizationPerCPU` (Number) (Default value: 1): If the `parallelization` is set to `false`, this parameter defines the amount of processes/threads per CPU. - `debug` (Boolean) (Default value: false): Enables the internal logs for debuggin purposes. #### Main methods -`run(batch)` +`run(batch, params = null)` **Parameters** - `batch` (Array): The records you want to process in parallel. +- `params` (Object) (Default value: false): Parameters that will be passed to each child/thread process. **Returns** (Array): The processes/threads' responses. #### Using the Node Parallizer in AWS Lambda. @@ -173,11 +174,12 @@ module.exports = { batchProcessor } - `parallelizationPerCPU` (Number) (Default value: 1): If the `parallelization` is set to `false`, this parameter defines the amount of processes/threads per CPU. - `debug` (Boolean) (Default value: false): Enables the internal logs for debuggin purposes. #### Main methods -`run([{ id: "only-cpu", batch: batchOne },{ id: "only-io", batch: batchTwo }])` +`run([{ id: "only-cpu", batch: batchOne, params = {var: 1} },{ id: "only-io", batch: batchTwo }])` **Parameters** - `id` (String): The unique identifier for your Child/Thread internal instance. - `batch` (Array): The records you want to process in parallel. +- `params` (Object) (Default value: false): Parameters that will be passed to each child/thread process. **Returns** (Array): A list with the processes/threads' responses. #### Using the Node Parallizer in AWS Lambda. diff --git a/examples/basic/src/parallelizer-code.js b/examples/basic/src/parallelizer-code.js index fe316a2..6f23391 100644 --- a/examples/basic/src/parallelizer-code.js +++ b/examples/basic/src/parallelizer-code.js @@ -30,7 +30,48 @@ const batchProcessor = async ({ batch, params }) => { return { sum, totalFiles }; } -const batchProcessorOnlyCPU = async ({ batch }) => { +const batchProcessor2 = async ({ batch, params }) => { + let sum = 0; + + for (let id = 0; id <= (batch.length * 1000000); id++) { + sum += id; + } + + // const response = await axios.get('https://httpbin.org/get?key=123'); + + let totalFiles = 0; + const promises = []; + for (let id = 0; id <= (batch.length); id++) { + promises.push(createAndDeleteFile()); + totalFiles++; + } + + await Promise.all(promises); + + return { sum, totalFiles }; +} + +const createAndDeleteFile = () => { + return new Promise((resolve, reject) => { + try { + const uniqueId = crypto.randomBytes(16).toString('hex'); + const file = `/tmp/example-file-${uniqueId}.txt`; + + fs.writeFileSync(file, '***Random Code***'); + if (fs.existsSync(file)) { + fs.unlinkSync(file); + } + // console.log(file); + resolve(true); + } catch (err) { + console.log(err.message); + resolve(true); + } + }) +} + + +const batchProcessorOnlyCPU = async ({ batch, params }) => { return new Promise((resolve, reject) => { let sum = 0; @@ -42,7 +83,7 @@ const batchProcessorOnlyCPU = async ({ batch }) => { }) } -const batchProcessorOnlyIO = async ({ batch }) => { +const batchProcessorOnlyIO = async ({ batch, params }) => { let totalFiles = 0; for (let id = 1; id <= (batch.length); id++) { @@ -64,4 +105,4 @@ const batchProcessorOnlyIO = async ({ batch }) => { } -module.exports = { batchProcessor, batchProcessorOnlyCPU, batchProcessorOnlyIO } \ No newline at end of file +module.exports = { batchProcessor, batchProcessor2, batchProcessorOnlyCPU, batchProcessorOnlyIO } \ No newline at end of file diff --git a/package.json b/package.json index 73b9292..30613cf 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "node-parallelizer", - "version": "3.0.2", + "version": "3.1.0", "description": "A NodeJS package for running code in parallel. Initially created to provide multiprocessing in an AWS Lambda function, but it can be used in any NodeJS environment.", "main": "src/index.js", "scripts": { diff --git a/test/benchmark.js b/test/benchmark.js index 5017eb0..79635ac 100644 --- a/test/benchmark.js +++ b/test/benchmark.js @@ -1,14 +1,14 @@ const Benchmark = require('benchmark'); const { Parallelizer, PARALLELIZER_CHILD, PARALLELIZER_THREADS } = require('../src/index'); -const { batchProcessor } = require('../examples/basic/src/parallelizer-code'); +const { batchProcessor, batchProcessor2 } = require('../examples/basic/src/parallelizer-code'); const path = require('path'); const relativePath = '../examples/basic/src/parallelizer-code'; const absolutePath = path.resolve(__dirname, relativePath); -const childParallelizer = new Parallelizer({ type: PARALLELIZER_CHILD, parallelizationPerCPU: 1, filePath: absolutePath, processBatchFunctionName: 'batchProcessor' }); -const threadParallelizer = new Parallelizer({ type: PARALLELIZER_THREADS, parallelizationPerCPU: 1, filePath: absolutePath, processBatchFunctionName: 'batchProcessor' }); +const childParallelizer = new Parallelizer({ type: PARALLELIZER_CHILD, parallelizationPerCPU: 3, filePath: absolutePath, processBatchFunctionName: 'batchProcessor2' }); +const threadParallelizer = new Parallelizer({ type: PARALLELIZER_THREADS, parallelizationPerCPU: 3, filePath: absolutePath, processBatchFunctionName: 'batchProcessor2' }); const batch = [...Array(100).keys()]; @@ -33,7 +33,7 @@ suite await threadParallelizer.run(batch); })) .add('Without Parallelizer', p(async () => { - await batchProcessor({ batch }); + await batchProcessor2({ batch }); })) // add listeners .on('cycle', function (event) { @@ -42,7 +42,7 @@ suite .on('complete', function () { childParallelizer.removeChildThreads(); threadParallelizer.removeChildThreads(); - + console.log('\nResult: '); console.log('Fastest is ' + this.filter('fastest').map('name')); console.log('Slowest is ' + this.filter('slowest').map('name'));