Skip to content

Commit

Permalink
utils.csv: fix addRow (#72)
Browse files Browse the repository at this point in the history
* fix: csv.addRow

* update unlock logic

* update version

* add lock extend

* update lock config

* update

* add quota and update lock extend

---------

Co-authored-by: vladimir talas <[email protected]>
  • Loading branch information
harshaio and vtalas authored Apr 24, 2024
1 parent 728bddc commit a24afd1
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 52 deletions.
27 changes: 1 addition & 26 deletions src/appmixer/utils/csv/AddRow/AddRow.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict';
const CSVProcessor = require('../CSVProcessor');
const { expressionTransformer } = require('../helpers');

module.exports = {

Expand All @@ -23,31 +22,7 @@ module.exports = {
parseNumbers,
parseBooleans
});

await processor.loadHeaders();

let rowAsArray;

if (withHeaders) {
const headers = processor.getHeaders();
const parsed = expressionTransformer(rowWithColumns);
rowAsArray = headers.map(item => '');
parsed.forEach(item => {
const idx = processor.getHeaderIndex(item.column);
rowAsArray[idx] = item.value;
});
} else {
rowAsArray = row.split(delimiter);
}

for (let i = 0; i < rowAsArray.length; i++) {
const item = rowAsArray[i];
if (item === undefined || item === null) {
rowAsArray[i] = '';
}
}

const savedFile = await processor.addRow(rowAsArray, (idx, currentRow, isEndOfFile) => {
const savedFile = await processor.addRow({ row, rowWithColumns }, (idx, currentRow, isEndOfFile) => {
return isEndOfFile;
});

Expand Down
5 changes: 5 additions & 0 deletions src/appmixer/utils/csv/AddRow/component.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
"module": "1.0.1"
},
"private": false,
"quota": {
"manager": "appmixer:utils:csv",
"resources": "loadFiles",
"maxWait": "120000"
},
"properties": {
"schema": {
"properties": {
Expand Down
90 changes: 65 additions & 25 deletions src/appmixer/utils/csv/CSVProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const AutoDetectDecoderStream = require('autodetect-decoder-stream');
const CsvReadableStream = require('csv-reader');
const stream = require('stream');
const { PassThrough, pipeline } = stream;
const { passesFilter, indexExpressionToArray, passesIndexFilter } = require('./helpers');
const { passesFilter, indexExpressionToArray, passesIndexFilter, expressionTransformer } = require('./helpers');

module.exports = class CSVProcessor {

Expand Down Expand Up @@ -367,40 +367,80 @@ module.exports = class CSVProcessor {
* @return {Promise<*>}
* @public
*/
async addRow(newRow, closure) {
async addRow({ row, rowWithColumns }, closure) {

const lock = await this.context.lock(this.fileId);
const stream = await this.loadFile();
const config = this.context.config;
const lockOptions = {
ttl: parseInt(config.lockTTL, 10) || 60000, // Default 1 minute TTL
retryDelay: 500
};
const lock = await this.context.lock(this.fileId, lockOptions);
let lockExtendInterval;

let idx = 0;
const writeStream = new PassThrough();
stream.on('data', (row) => {
try {
writeStream.write(row.join(this.delimiter) + '\n');
if (closure(idx, row, false)) {
writeStream.write(newRow.join(this.delimiter) + '\n');
}
idx += 1;
} catch (err) {
stream.destroy(err);
}
}).on('error', (err) => {
lock.unlock();
stream.destroy(err);
}).on('end', async () => {
if (closure(idx, null, true)) {
writeStream.write(newRow.join(this.delimiter) + '\n');
try {
lockExtendInterval = setInterval(async () => {

await lock.extend(parseInt(context.config.lockExtendTime, 10) || 1000 * 60 * 1);
}, context.config.lockExtendInterval || 30000);

const stream = await this.loadFile();
await this.loadHeaders();

let rowAsArray;

// Process row data based on whether headers are included
if (this.withHeaders) {
const headers = this.getHeaders();
const parsed = expressionTransformer(rowWithColumns);
rowAsArray = headers.map(() => ''); // Initialize rowAsArray with empty strings
parsed.forEach(({ column, value }) => {
const idx = this.getHeaderIndex(column);
if (idx !== -1) { // Ensure the column exists in headers
rowAsArray[idx] = value;
}
});
} else {
rowAsArray = row.split(this.delimiter);
}
writeStream.end();
});

try {
// Ensure each item in rowAsArray is not undefined or null
rowAsArray = rowAsArray.map(item => item ?? '');

let idx = 0;
const writeStream = new PassThrough();

stream.on('data', (rowData) => {
writeStream.write(rowData.join(this.delimiter) + '\n');
if (closure(idx, rowData, false)) {
writeStream.write(rowAsArray.join(this.delimiter) + '\n');
}
idx++;
});

stream.on('error', (err) => {
lock.unlock();
writeStream.end();
throw err; // Propagate the error
});

stream.on('end', () => {
if (closure(idx, null, true)) {
writeStream.write(rowAsArray.join(this.delimiter) + '\n');
}
writeStream.end();
});

// Replace file stream with writeStream
return await this.context.replaceFileStream(this.fileId, writeStream);
} catch (err) {
throw err; // Propagate the error
} finally {
clearInterval(lockExtendInterval);
lock.unlock();
}
}


/**
* @return {Promise<Stream>}
* @protected
Expand Down
5 changes: 4 additions & 1 deletion src/appmixer/utils/csv/bundle.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "appmixer.utils.csv",
"version": "1.2.4",
"version": "1.2.5",
"changelog": {
"1.0.0": [
"Initial version"
Expand All @@ -22,6 +22,9 @@
],
"1.2.4": [
"Fix issue with CSV module on Appmixer 5.0.0 and higher."
],
"1.2.5": [
"Added lock mechanism to avoid race condition in 'AddRow' component."
]
}
}
13 changes: 13 additions & 0 deletions src/appmixer/utils/csv/quota.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
'use strict';

// Quota rules for the appmixer:utils:csv module. Components opt in via their
// component.json "quota" section (manager "appmixer:utils:csv",
// resources "loadFiles") — TODO confirm all consumers against component.json.
module.exports = {

    rules: [
        {
            // At most 5 operations may hold the 'loadFiles' resource concurrently.
            limit: 5,
            throttling: 'limit-concurrency',
            // Waiting requests are admitted in first-in, first-out order.
            queueing: 'fifo',
            resource: 'loadFiles'
        }
    ]
};

0 comments on commit a24afd1

Please sign in to comment.