Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: csv.addRow #72

Merged
merged 8 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 4 additions & 42 deletions src/appmixer/utils/csv/AddRow/AddRow.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict';
const CSVProcessor = require('../CSVProcessor');
const { expressionTransformer } = require('../helpers');

module.exports = {

Expand All @@ -23,47 +22,10 @@ module.exports = {
parseNumbers,
parseBooleans
});
let lock;
let lockExtendInterval;
try {
lock = await context.lock(fileId, {
ttl: parseInt(context.config.lockTTL, 10) || 1000 * 60 * 1,
retryDelay: 1000
});
await processor.loadHeaders();

let rowAsArray;

if (withHeaders) {
const headers = processor.getHeaders();
const parsed = expressionTransformer(rowWithColumns);
rowAsArray = headers.map(item => '');
parsed.forEach(item => {
const idx = processor.getHeaderIndex(item.column);
rowAsArray[idx] = item.value;
});
} else {
rowAsArray = row.split(delimiter);
}
const savedFile = await processor.addRow({ row, rowWithColumns }, (idx, currentRow, isEndOfFile) => {
return isEndOfFile;
});

for (let i = 0; i < rowAsArray.length; i++) {
const item = rowAsArray[i];
if (item === undefined || item === null) {
rowAsArray[i] = '';
}
}
lockExtendInterval = setInterval(async () => {
if (lock) {
await lock.extend(parseInt(context.config.lockExtendTime, 10) || 1000 * 60 * 1);
}
}, parseInt(context.config.lockExtendInterval, 10) || 30000);
const savedFile = await processor.addRow(rowAsArray, (idx, currentRow, isEndOfFile) => {
return isEndOfFile;
});
return context.sendJson({ fileId: savedFile.fileId }, 'fileId');
} finally {
lock && await lock.unlock();
clearInterval(lockExtendInterval);
}
return context.sendJson({ fileId: savedFile.fileId }, 'fileId');
}
};
90 changes: 67 additions & 23 deletions src/appmixer/utils/csv/CSVProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const AutoDetectDecoderStream = require('autodetect-decoder-stream');
const CsvReadableStream = require('csv-reader');
const stream = require('stream');
const { PassThrough, pipeline } = stream;
const { passesFilter, indexExpressionToArray, passesIndexFilter } = require('./helpers');
const { passesFilter, indexExpressionToArray, passesIndexFilter, expressionTransformer } = require('./helpers');

module.exports = class CSVProcessor {

Expand Down Expand Up @@ -367,34 +367,78 @@ module.exports = class CSVProcessor {
* @return {Promise<*>}
* @public
*/
async addRow(newRow, closure) {
async addRow({ row, rowWithColumns }, closure) {

const stream = await this.loadFile();
const config = this.context.config;
const lockOptions = {
ttl: parseInt(config.lockTTL, 10) || 60000, // Default 1 minute TTL
retryDelay: 500
};
const lock = await this.context.lock(this.fileId, lockOptions);

let idx = 0;
const writeStream = new PassThrough();
stream.on('data', (row) => {
try {
writeStream.write(row.join(this.delimiter) + '\n');
if (closure(idx, row, false)) {
writeStream.write(newRow.join(this.delimiter) + '\n');
}
idx += 1;
} catch (err) {
stream.destroy(err);
}
}).on('error', (err) => {
stream.destroy(err);
}).on('end', async () => {
if (closure(idx, null, true)) {
writeStream.write(newRow.join(this.delimiter) + '\n');
try {
const stream = await this.loadFile();
await this.loadHeaders();

let rowAsArray;

// Process row data based on whether headers are included
if (this.withHeaders) {
const headers = this.getHeaders();
const parsed = expressionTransformer(rowWithColumns);
rowAsArray = headers.map(() => ''); // Initialize rowAsArray with empty strings
parsed.forEach(({ column, value }) => {
const idx = this.getHeaderIndex(column);
if (idx !== -1) { // Ensure the column exists in headers
rowAsArray[idx] = value;
}
});
} else {
rowAsArray = row.split(this.delimiter);
}
writeStream.end();
});

return await this.context.replaceFileStream(this.fileId, writeStream);
// Ensure each item in rowAsArray is not undefined or null
rowAsArray = rowAsArray.map(item => item ?? '');

let idx = 0;
const writeStream = new PassThrough();

stream.on('data', (rowData) => {
writeStream.write(rowData.join(this.delimiter) + '\n');
if (closure(idx, rowData, false)) {
writeStream.write(rowAsArray.join(this.delimiter) + '\n');
}
idx++;

// Extend lock periodically
if (idx % 1000 === 0) {
vtalas marked this conversation as resolved.
Show resolved Hide resolved
vtalas marked this conversation as resolved.
Show resolved Hide resolved
lock.extend(parseInt(config.lockExtendTime, 10) || 120000); // Default 2 minutes extend time
}
});

stream.on('error', (err) => {
lock.unlock();
writeStream.end();
throw err; // Propagate the error
});

stream.on('end', () => {
if (closure(idx, null, true)) {
writeStream.write(rowAsArray.join(this.delimiter) + '\n');
}
writeStream.end();
});

// Replace file stream with writeStream
return await this.context.replaceFileStream(this.fileId, writeStream);
} catch (err) {
throw err; // Propagate the error
} finally {
lock.unlock();
}
}


/**
* @return {Promise<Stream>}
* @protected
Expand Down
Loading