Skip to content

Commit

Permalink
Merge pull request #16 from Edujugon/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
Edujugon authored May 12, 2023
2 parents b4cf665 + a4f64ad commit f123c01
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 34 deletions.
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ When you create an instance of the Child Process Parallelizer outside of the Lam
#### CPU & I/O operations (Parallelization per CPU = 1)
```bash
$ node test/benchmark.js
Child Parallelizer x 15.08 ops/sec
Thread Parallelizer x 31.90 ops/sec
Child Parallelizer x 18.08 ops/sec
Thread Parallelizer x 15.90 ops/sec
Without Parallelizer x 2.79 ops/sec

Result:
Fastest is Thread Parallelizer
Fastest is Child Parallelizer
Slowest is Without Parallelizer
```

Expand Down Expand Up @@ -100,10 +100,11 @@ npm i node-parallelizer --save
- `parallelizationPerCPU` (Number) (Default value: 1): If the `parallelization` is set to `false`, this parameter defines the amount of processes/threads per CPU.
- `debug` (Boolean) (Default value: false): Enables the internal logs for debuggin purposes.
#### Main methods
`run(batch)`
`run(batch, params = null)`

**Parameters**
- `batch` (Array): The records you want to process in parallel.
- `params` (Object) (Default value: false): Parameters that will be passed to each child/thread process.

**Returns** (Array): The processes/threads' responses.
#### Using the Node Parallizer in AWS Lambda.
Expand Down Expand Up @@ -173,11 +174,12 @@ module.exports = { batchProcessor }
- `parallelizationPerCPU` (Number) (Default value: 1): If the `parallelization` is set to `false`, this parameter defines the amount of processes/threads per CPU.
- `debug` (Boolean) (Default value: false): Enables the internal logs for debuggin purposes.
#### Main methods
`run([{ id: "only-cpu", batch: batchOne },{ id: "only-io", batch: batchTwo }])`
`run([{ id: "only-cpu", batch: batchOne, params = {var: 1} },{ id: "only-io", batch: batchTwo }])`

**Parameters**
- `id` (String): The unique identifier for your Child/Thread internal instance.
- `batch` (Array): The records you want to process in parallel.
- `params` (Object) (Default value: false): Parameters that will be passed to each child/thread process.

**Returns** (Array): A list with the processes/threads' responses.
#### Using the Node Parallizer in AWS Lambda.
Expand Down
49 changes: 45 additions & 4 deletions examples/basic/src/parallelizer-code.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
const fs = require('fs');
const crypto = require('crypto');

const batchProcessor = async ({ batch }) => {
const batchProcessor = async ({ batch, params }) => {
let sum = 0;

for (let id = 0; id <= (batch.length * 1000000); id++) {
Expand Down Expand Up @@ -30,7 +30,48 @@ const batchProcessor = async ({ batch }) => {
return { sum, totalFiles };
}

const batchProcessorOnlyCPU = async ({ batch }) => {
const batchProcessor2 = async ({ batch, params }) => {
let sum = 0;

for (let id = 0; id <= (batch.length * 1000000); id++) {
sum += id;
}

// const response = await axios.get('https://httpbin.org/get?key=123');

let totalFiles = 0;
const promises = [];
for (let id = 0; id <= (batch.length); id++) {
promises.push(createAndDeleteFile());
totalFiles++;
}

await Promise.all(promises);

return { sum, totalFiles };
}

const createAndDeleteFile = () => {
return new Promise((resolve, reject) => {
try {
const uniqueId = crypto.randomBytes(16).toString('hex');
const file = `/tmp/example-file-${uniqueId}.txt`;

fs.writeFileSync(file, '***Random Code***');
if (fs.existsSync(file)) {
fs.unlinkSync(file);
}
// console.log(file);
resolve(true);
} catch (err) {
console.log(err.message);
resolve(true);
}
})
}


const batchProcessorOnlyCPU = async ({ batch, params }) => {
return new Promise((resolve, reject) => {
let sum = 0;

Expand All @@ -42,7 +83,7 @@ const batchProcessorOnlyCPU = async ({ batch }) => {
})
}

const batchProcessorOnlyIO = async ({ batch }) => {
const batchProcessorOnlyIO = async ({ batch, params }) => {

let totalFiles = 0;
for (let id = 1; id <= (batch.length); id++) {
Expand All @@ -64,4 +105,4 @@ const batchProcessorOnlyIO = async ({ batch }) => {
}


module.exports = { batchProcessor, batchProcessorOnlyCPU, batchProcessorOnlyIO }
module.exports = { batchProcessor, batchProcessor2, batchProcessorOnlyCPU, batchProcessorOnlyIO }
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "node-parallelizer",
"version": "3.0.2",
"version": "3.1.0",
"description": "A NodeJS package for running code in parallel. Initially created to provide multiprocessing in an AWS Lambda function, but it can be used in any NodeJS environment.",
"main": "src/index.js",
"scripts": {
Expand Down
12 changes: 6 additions & 6 deletions src/child-process.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ChildProcess {
}
}

async runBatch(batch) {
async runBatch(batch, params = null) {
if (this.childProcesses.length === 0) {
throw new Error('No child processes created. Please run "createChildProcesses" method before "runBatch"')
}
Expand All @@ -55,15 +55,15 @@ class ChildProcess {
const batches = findSubsets(batch, batchCount);

// Process the batches using the child processes.
return await this._processBatchesInForks(batches);
return await this._processBatchesInForks(batches, params);
}

removeChildProcesses() {
this.childProcesses.forEach(process => process.disconnect());
this.childProcesses = [];
this._removeChildFile();
}

removeChildThreads() {
this.removeChildProcesses();
}
Expand All @@ -72,7 +72,7 @@ class ChildProcess {
this.childProcesses.forEach(child => { child.removeAllListeners('exit'); child.removeAllListeners('message') });
}

async _processBatchesInForks(batches) {
async _processBatchesInForks(batches, params = null) {
const batchesCount = batches.length;
const childResponses = {
responses: [],
Expand Down Expand Up @@ -157,7 +157,7 @@ class ChildProcess {
});

// Send message to child.
this.childProcesses[id].send({ id, batch: batches[id] });
this.childProcesses[id].send({ id, batch: batches[id], params });
}
})

Expand Down Expand Up @@ -221,7 +221,7 @@ const mainLogger = ({ message, params = {}, logType = 'log' }) => {
// Listening to parent's messages.
process.on("message", async (message) => {
try {
const reponse = await processBatch({ batch: message.batch, mainLogger });
const reponse = await processBatch({ batch: message.batch, params: message.params, mainLogger });
process.send({ type: "MESSAGE", status: "SUCCESS", reponse });
} catch (e) {
Expand Down
18 changes: 11 additions & 7 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,22 @@ class Parallelizer {
}


async run(paramsList) {
async run(data, params = null) {
if (Object.keys(this.childThreads).length == 1) {
return this.childThreads[SINGLE_CHILD_THREAD_ID].runBatch(paramsList);
return this.childThreads[SINGLE_CHILD_THREAD_ID].runBatch(data, params);
}

if (!isArray(paramsList)) {
paramsList.id = SINGLE_CHILD_THREAD_ID;
paramsList = [paramsList];
if (!isArray(data)) {
data.id = SINGLE_CHILD_THREAD_ID;
data.params = data.params || params;
data = [data];
}

return await Promise.all(paramsList.map(({ id, batch }) => {
return this.childThreads[id].runBatch(batch)
return await Promise.all(data.map(item => {
const batch = item.batch
const itemParams = item.params || params

return this.childThreads[item.id].runBatch(batch, params = itemParams)
}));
}

Expand Down
10 changes: 5 additions & 5 deletions src/worker-thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ class WorkerThreads {
this.threadsCount = (typeof this.parallelization === 'number') ? this.parallelization : this._getThreadsCount();
}

runBatch = async (batch) => {
runBatch = async (batch, params = null) => {
// Get the amount of messages per batch.
const batchCount = (batch.length < this.threadsCount) ? 1 : batch.length / this.threadsCount;

// Create the batches
const batches = findSubsets(batch, batchCount);

// Process the batches using the threads.
return await this._processBatchesInThreads(batches);
return await this._processBatchesInThreads(batches, params);
}

removeWorkerThreads() {
Expand All @@ -44,7 +44,7 @@ class WorkerThreads {
this._removeThreadFile();
}

_processBatchesInThreads = async (batches) => {
_processBatchesInThreads = async (batches, params = null) => {
const batchesCount = batches.length;
const threadResponses = {
responses: [],
Expand All @@ -55,7 +55,7 @@ class WorkerThreads {

await new Promise((resolve, reject) => {
for (let id = 0; id < batchesCount; id++) {
const worker = new Worker(this.tmpPath, { workerData: { id, batch: batches[id] } });
const worker = new Worker(this.tmpPath, { workerData: { id, batch: batches[id], params } });
worker.on('error', (error) => {
logger({
message: `Thread #${id} error message: ${error.message}`,
Expand Down Expand Up @@ -141,7 +141,7 @@ const { workerData, parentPort } = require("worker_threads");
(async () => {
try {
const reponse = await processBatch({ batch: workerData.batch });
const reponse = await processBatch({ batch: workerData.batch, params: workerData.params });
parentPort.postMessage({ reponse, status: "SUCCESS" });
} catch (err) {
parentPort.postMessage({ status: "FAILED", errorMessage: err.toString() });
Expand Down
13 changes: 7 additions & 6 deletions test/benchmark.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
const Benchmark = require('benchmark');
const { Parallelizer, PARALLELIZER_CHILD, PARALLELIZER_THREADS } = require('../src/index');
const { batchProcessor } = require('../examples/basic/src/parallelizer-code');
const { batchProcessor, batchProcessor2 } = require('../examples/basic/src/parallelizer-code');
const path = require('path');


const relativePath = '../examples/basic/src/parallelizer-code';
const absolutePath = path.resolve(__dirname, relativePath);

const childParallelizer = new Parallelizer({ type: PARALLELIZER_CHILD, parallelizationPerCPU: 1, filePath: absolutePath, processBatchFunctionName: 'batchProcessor' });
const threadParallelizer = new Parallelizer({ type: PARALLELIZER_THREADS, parallelizationPerCPU: 1, filePath: absolutePath, processBatchFunctionName: 'batchProcessor' });
const childParallelizer = new Parallelizer({ type: PARALLELIZER_CHILD, parallelizationPerCPU: 3, filePath: absolutePath, processBatchFunctionName: 'batchProcessor2' });
const threadParallelizer = new Parallelizer({ type: PARALLELIZER_THREADS, parallelizationPerCPU: 3, filePath: absolutePath, processBatchFunctionName: 'batchProcessor2' });

const batch = [...Array(100).keys()];

Expand All @@ -33,15 +33,16 @@ suite
await threadParallelizer.run(batch);
}))
.add('Without Parallelizer', p(async () => {
await batchProcessor({ batch });
await batchProcessor2({ batch });
}))
// add listeners
.on('cycle', function (event) {
childParallelizer.removeChildThreads();
threadParallelizer.removeChildThreads();
console.log(String(event.target));
})
.on('complete', function () {
childParallelizer.removeChildThreads();
threadParallelizer.removeChildThreads();

console.log('\nResult: ');
console.log('Fastest is ' + this.filter('fastest').map('name'));
console.log('Slowest is ' + this.filter('slowest').map('name'));
Expand Down

0 comments on commit f123c01

Please sign in to comment.