From 8f467e97bdc5782195c6bc5b22b5ae28d6b20e30 Mon Sep 17 00:00:00 2001
From: inv-jishnu <31100916+inv-jishnu@users.noreply.github.com>
Date: Thu, 20 Feb 2025 13:31:06 +0530
Subject: [PATCH] Add export tasks (#2450)

Co-authored-by: Peckstadt Yves
---
 .../com/scalar/db/common/error/CoreError.java |   8 +
 .../core/DataLoaderObjectMapper.java          |  14 +
 .../core/dataexport/CsvExportManager.java     |  94 +++++++
 .../core/dataexport/ExportManager.java        | 243 ++++++++++++++++++
 .../core/dataexport/ExportReport.java         |   4 +-
 .../core/dataexport/JsonExportManager.java    |  43 ++++
 .../dataexport/JsonLineExportManager.java     |  39 +++
 .../dataexport/producer/CsvProducerTask.java  | 152 +++++++++++
 .../producer/JsonLineProducerTask.java        | 124 +++++++++
 .../dataexport/producer/JsonProducerTask.java | 135 ++++++++++
 .../dataexport/producer/ProducerResult.java   |  13 +
 .../dataexport/producer/ProducerTask.java     |  38 +++
 .../producer/ProducerTaskFactory.java         |  59 +++++
 .../core/dataexport/CsvExportManagerTest.java | 132 ++++++++++
 .../core/dataexport/ExportReportTest.java     |   6 +-
 .../dataexport/JsonExportManagerTest.java     | 133 ++++++++++
 .../dataexport/JsonLineExportManagerTest.java | 132 ++++++++++
 .../producer/CsvProducerTaskTest.java         |  63 +++++
 .../producer/JsonLineProducerTaskTest.java    |  63 +++++
 .../producer/JsonProducerTaskTest.java        |  62 +++++
 .../producer/ProducerTaskFactoryTest.java     |  55 ++++
 21 files changed, 1607 insertions(+), 5 deletions(-)
 create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderObjectMapper.java
 create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java
 create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java
 create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java
 create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java
 create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/CsvProducerTask.java
 create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonLineProducerTask.java
 create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonProducerTask.java
 create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerResult.java
 create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTask.java
 create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTaskFactory.java
 create mode 100644 data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java
 create mode 100644 data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java
 create mode 100644 data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java
 create mode 100644 data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/CsvProducerTaskTest.java
 create mode 100644 data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/JsonLineProducerTaskTest.java
 create mode 100644 data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/JsonProducerTaskTest.java
 create mode 100644 data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTaskFactoryTest.java
diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java
index d10969ddd..ae04d8847 100644
--- a/core/src/main/java/com/scalar/db/common/error/CoreError.java
+++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java
@@ -756,6 +756,14 @@ public enum CoreError implements ScalarDbError {
       ""),
   DATA_LOADER_MISSING_COLUMN(
       Category.USER_ERROR, "0176", "Missing field or column mapping for %s", "", ""),
+  DATA_LOADER_VALUE_TO_STRING_CONVERSION_FAILED(
+      Category.USER_ERROR,
+      "0177",
+      "Something went wrong while converting the ScalarDB values to strings. The table metadata and Value datatype probably do not match. Details: %s",
+      "",
+      ""),
+  DATA_LOADER_FILE_FORMAT_NOT_SUPPORTED(
+      Category.USER_ERROR, "0178", "The provided file format is not supported: %s", "", ""),
 
   //
   // Errors for the concurrency error category
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderObjectMapper.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderObjectMapper.java
new file mode 100644
index 000000000..d90fd49b6
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderObjectMapper.java
@@ -0,0 +1,14 @@
+package com.scalar.db.dataloader.core;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+
+public class DataLoaderObjectMapper extends ObjectMapper {
+
+  public DataLoaderObjectMapper() {
+    super();
+    this.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+    this.registerModule(new JavaTimeModule());
+  }
+}
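The subclass above only pins down two Jackson behaviors: null values are omitted and java.time types become serializable. A minimal sketch of the effect, using a throwaway Row class that is not part of this patch:

    import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
    import java.time.LocalDate;

    class Row {
      public Integer col1 = 1;
      public String col2 = null; // dropped from the output by Include.NON_NULL
      public LocalDate col3 = LocalDate.of(2025, 2, 20); // needs JavaTimeModule
    }

    // new DataLoaderObjectMapper().writeValueAsString(new Row())
    // -> {"col1":1,"col3":[2025,2,20]}
    // (a plain ObjectMapper would instead fail on LocalDate and keep "col2":null)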
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java
new file mode 100644
index 000000000..81c7ab9ac
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java
@@ -0,0 +1,94 @@
+package com.scalar.db.dataloader.core.dataexport;
+
+import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
+import com.scalar.db.dataloader.core.util.CsvUtil;
+import com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Iterator;
+import java.util.List;
+
+public class CsvExportManager extends ExportManager {
+  public CsvExportManager(
+      DistributedStorage storage, ScalarDBDao dao, ProducerTaskFactory producerTaskFactory) {
+    super(storage, dao, producerTaskFactory);
+  }
+
+  /**
+   * Create and add the header part for the export file
+   *
+   * @param exportOptions Export options for the data export
+   * @param tableMetadata Metadata of the table to export
+   * @param writer File writer object
+   * @throws IOException If any IO exception occurs
+   */
+  @Override
+  void processHeader(ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer)
+      throws IOException {
+    String header = createCsvHeaderRow(exportOptions, tableMetadata);
+    writer.append(header);
+    writer.flush();
+  }
+
+  /**
+   * Create and add the footer part for the export file (not needed for CSV)
+   *
+   * @param exportOptions Export options for the data export
+   * @param tableMetadata Metadata of the table to export
+   * @param writer File writer object
+   * @throws IOException If any IO exception occurs
+   */
+  @Override
+  void processFooter(ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer)
+      throws IOException {}
+
+  /**
+   * Generate the header row of the CSV export file
+   *
+   * @param exportOptions export options
+   * @param tableMetadata metadata of the table
+   * @return generated CSV header row
+   */
+  private String createCsvHeaderRow(ExportOptions exportOptions, TableMetadata tableMetadata) {
+    StringBuilder headerRow = new StringBuilder();
+    List<String> projections = exportOptions.getProjectionColumns();
+    Iterator<String> iterator = tableMetadata.getColumnNames().iterator();
+    while (iterator.hasNext()) {
+      String columnName = iterator.next();
+      if (shouldIgnoreColumn(
+          exportOptions.isIncludeTransactionMetadata(), columnName, tableMetadata, projections)) {
+        continue;
+      }
+      headerRow.append(columnName);
+      if (iterator.hasNext()) {
+        headerRow.append(exportOptions.getDelimiter());
+      }
+    }
+    CsvUtil.removeTrailingDelimiter(headerRow, exportOptions.getDelimiter());
+    headerRow.append("\n");
+    return headerRow.toString();
+  }
+
+  /**
+   * Decide whether a column should be ignored, e.g. because it is a transaction-metadata column
+   * or because it is not included in the selected projections
+   *
+   * @param isIncludeTransactionMetadata whether to include transaction metadata
+   * @param columnName column name
+   * @param tableMetadata table metadata
+   * @param projections selected columns for projection
+   * @return whether the column should be ignored
+   */
+  private boolean shouldIgnoreColumn(
+      boolean isIncludeTransactionMetadata,
+      String columnName,
+      TableMetadata tableMetadata,
+      List<String> projections) {
+    return (!isIncludeTransactionMetadata
+            && ConsensusCommitUtils.isTransactionMetaColumn(columnName, tableMetadata))
+        || (!projections.isEmpty() && !projections.contains(columnName));
+  }
+}
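To make the header logic concrete: for a hypothetical table with columns (id, name, before_id), where before_id is assumed to be a transaction-metadata column, the method yields:

    // includeTransactionMetadata = false, projections = []     -> "id,name\n"
    // includeTransactionMetadata = false, projections = ["id"] -> "id\n"
    //   (the trailing "," left when the last column is skipped is removed by
    //    CsvUtil.removeTrailingDelimiter before "\n" is appended)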
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java
new file mode 100644
index 000000000..f66efdc9d
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java
@@ -0,0 +1,243 @@
+package com.scalar.db.dataloader.core.dataexport;
+
+import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.Result;
+import com.scalar.db.api.Scanner;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.dataloader.core.dataexport.producer.ProducerTask;
+import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
+import com.scalar.db.dataloader.core.dataexport.validation.ExportOptionsValidationException;
+import com.scalar.db.dataloader.core.dataexport.validation.ExportOptionsValidator;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
+import com.scalar.db.dataloader.core.util.TableMetadataUtil;
+import com.scalar.db.io.DataType;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.RequiredArgsConstructor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RequiredArgsConstructor
+public abstract class ExportManager {
+  private static final Logger logger = LoggerFactory.getLogger(ExportManager.class);
+
+  private final DistributedStorage storage;
+  private final ScalarDBDao dao;
+  private final ProducerTaskFactory producerTaskFactory;
+  private final Object lock = new Object();
+
+  /**
+   * Create and add the header part for the export file
+   *
+   * @param exportOptions Export options for the data export
+   * @param tableMetadata Metadata of the table to export
+   * @param writer File writer object
+   * @throws IOException If any IO exception occurs
+   */
+  abstract void processHeader(
+      ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer) throws IOException;
+
+  /**
+   * Create and add the footer part for the export file
+   *
+   * @param exportOptions Export options for the data export
+   * @param tableMetadata Metadata of the table to export
+   * @param writer File writer object
+   * @throws IOException If any IO exception occurs
+   */
+  abstract void processFooter(
+      ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer) throws IOException;
+
+  /**
+   * Starts the export process and returns a report with the exported row count
+   *
+   * @param exportOptions Export options
+   * @param tableMetadata Metadata for a single ScalarDB table
+   * @param writer Writer to write the exported data
+   */
+  public ExportReport startExport(
+      ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer) {
+    ExportReport exportReport = new ExportReport();
+    try {
+      validateExportOptions(exportOptions, tableMetadata);
+      Map<String, DataType> dataTypeByColumnName = tableMetadata.getColumnDataTypes();
+      handleTransactionMetadata(exportOptions, tableMetadata);
+      processHeader(exportOptions, tableMetadata, writer);
+
+      int maxThreadCount =
+          exportOptions.getMaxThreadCount() == 0
+              ? Runtime.getRuntime().availableProcessors()
+              : exportOptions.getMaxThreadCount();
+      ExecutorService executorService = Executors.newFixedThreadPool(maxThreadCount);
+
+      BufferedWriter bufferedWriter = new BufferedWriter(writer);
+      boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON;
+
+      try (Scanner scanner = createScanner(exportOptions, dao, storage)) {
+
+        Iterator<Result> iterator = scanner.iterator();
+        AtomicBoolean isFirstBatch = new AtomicBoolean(true);
+
+        while (iterator.hasNext()) {
+          List<Result> dataChunk = fetchDataChunk(iterator, exportOptions.getDataChunkSize());
+          executorService.submit(
+              () ->
+                  processDataChunk(
+                      exportOptions,
+                      tableMetadata,
+                      dataTypeByColumnName,
+                      dataChunk,
+                      bufferedWriter,
+                      isJson,
+                      isFirstBatch,
+                      exportReport));
+        }
+        executorService.shutdown();
+        if (executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
+          logger.info("All tasks completed");
+        } else {
+          logger.error("Timeout occurred while waiting for tasks to complete");
+          // TODO: handle this
+        }
+        processFooter(exportOptions, tableMetadata, bufferedWriter);
+      } catch (InterruptedException | IOException e) {
+        logger.error("Error during export: {}", e.getMessage());
+      } finally {
+        bufferedWriter.flush();
+      }
+    } catch (ExportOptionsValidationException | IOException | ScalarDBDaoException e) {
+      logger.error("Error during export: {}", e.getMessage());
+    }
+    return exportReport;
+  }
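startExport is essentially a chunked producer loop: the scanner is drained in dataChunkSize batches, each batch is formatted on a pool thread, and a shared lock serializes the writes. A stripped-down, self-contained sketch of the same pattern, with plain Strings standing in for ScalarDB Result rows (illustrative only, not ScalarDB code):

    import java.util.*;
    import java.util.concurrent.*;

    public class ChunkedWriterSketch {
      public static void main(String[] args) throws Exception {
        Iterator<String> rows = Arrays.asList("r1", "r2", "r3", "r4", "r5").iterator();
        int chunkSize = 2;
        ExecutorService pool = Executors.newFixedThreadPool(2);
        StringBuilder out = new StringBuilder(); // stands in for the BufferedWriter
        Object lock = new Object();

        while (rows.hasNext()) {
          List<String> chunk = new ArrayList<>();
          while (rows.hasNext() && chunk.size() < chunkSize) {
            chunk.add(rows.next()); // fetchDataChunk's role
          }
          pool.submit(
              () -> {
                String formatted = String.join(",", chunk) + "\n"; // the "producer task"
                synchronized (lock) { // one writer, many producers
                  out.append(formatted);
                }
              });
        }
        pool.shutdown();
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        System.out.print(out); // note: chunk order may differ from scan order
      }
    }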
+
+  /**
+   * Process a result data chunk
+   *
+   * @param exportOptions export options
+   * @param tableMetadata metadata of the table
+   * @param dataTypeByColumnName map of columns and their data types
+   * @param dataChunk a list with result data
+   * @param bufferedWriter writer object
+   * @param isJson whether the output format is JSON
+   * @param isFirstBatch whether the data chunk to be processed is the first batch
+   * @param exportReport export report which will be updated once the data chunk is processed
+   */
+  private void processDataChunk(
+      ExportOptions exportOptions,
+      TableMetadata tableMetadata,
+      Map<String, DataType> dataTypeByColumnName,
+      List<Result> dataChunk,
+      BufferedWriter bufferedWriter,
+      boolean isJson,
+      AtomicBoolean isFirstBatch,
+      ExportReport exportReport) {
+    ProducerTask producerTask =
+        producerTaskFactory.createProducerTask(
+            exportOptions.getOutputFileFormat(),
+            exportOptions.getProjectionColumns(),
+            tableMetadata,
+            dataTypeByColumnName);
+    String dataChunkContent = producerTask.process(dataChunk);
+
+    try {
+      synchronized (lock) {
+        if (isJson && !isFirstBatch.getAndSet(false)) {
+          bufferedWriter.write(",");
+        }
+        bufferedWriter.write(dataChunkContent);
+        exportReport.updateExportedRowCount(dataChunk.size());
+      }
+    } catch (IOException e) {
+      logger.error("Error while writing data chunk: {}", e.getMessage());
+    }
+  }
+
+  /**
+   * Split the scan results into batches
+   *
+   * @param iterator iterator over the scan results
+   * @param batchSize size of a batch
+   * @return a list of results forming one batch
+   */
+  private List<Result> fetchDataChunk(Iterator<Result> iterator, int batchSize) {
+    List<Result> batch = new ArrayList<>();
+    int count = 0;
+    while (iterator.hasNext() && count < batchSize) {
+      batch.add(iterator.next());
+      count++;
+    }
+    return batch;
+  }
+
+  /**
+   * Validate the export options
+   *
+   * @param exportOptions export options
+   * @param tableMetadata metadata of the table
+   * @throws ExportOptionsValidationException thrown if any of the export option validations fail
+   */
+  private void validateExportOptions(ExportOptions exportOptions, TableMetadata tableMetadata)
+      throws ExportOptionsValidationException {
+    ExportOptionsValidator.validate(exportOptions, tableMetadata);
+  }
+
+  /**
+   * Update the projection columns of the export options when transaction metadata is included
+   *
+   * @param exportOptions export options
+   * @param tableMetadata metadata of the table
+   */
+  private void handleTransactionMetadata(ExportOptions exportOptions, TableMetadata tableMetadata) {
+    if (exportOptions.isIncludeTransactionMetadata()
+        && !exportOptions.getProjectionColumns().isEmpty()) {
+      List<String> projectionMetadata =
+          TableMetadataUtil.populateProjectionsWithMetadata(
+              tableMetadata, exportOptions.getProjectionColumns());
+      exportOptions.setProjectionColumns(projectionMetadata);
+    }
+  }
+
+  /**
+   * Create a scanner object
+   *
+   * @param exportOptions export options
+   * @param dao ScalarDB dao object
+   * @param storage distributed storage object
+   * @return created scanner
+   * @throws ScalarDBDaoException if an issue occurs while creating the scanner object
+   */
+  private Scanner createScanner(
+      ExportOptions exportOptions, ScalarDBDao dao, DistributedStorage storage)
+      throws ScalarDBDaoException {
+    boolean isScanAll = exportOptions.getScanPartitionKey() == null;
+    if (isScanAll) {
+      return dao.createScanner(
+          exportOptions.getNamespace(),
+          exportOptions.getTableName(),
+          exportOptions.getProjectionColumns(),
+          exportOptions.getLimit(),
+          storage);
+    } else {
+      return dao.createScanner(
+          exportOptions.getNamespace(),
+          exportOptions.getTableName(),
+          exportOptions.getScanPartitionKey(),
+          exportOptions.getScanRange(),
+          exportOptions.getSortOrders(),
+          exportOptions.getProjectionColumns(),
+          exportOptions.getLimit(),
+          storage);
+    }
+  }
+}
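End to end, a caller picks a concrete manager and hands it a writer. A hypothetical wiring, assuming storage, dao, and tableMetadata have already been obtained from the usual ScalarDB setup and that the builder defaults suffice for a full-table scan (the constructor arguments mirror the tests below):

    ProducerTaskFactory factory = new ProducerTaskFactory(",", false, false);
    ExportManager manager = new CsvExportManager(storage, dao, factory);
    ExportOptions exportOptions =
        ExportOptions.builder("namespace", "table", null, FileFormat.CSV).build();
    try (BufferedWriter writer = Files.newBufferedWriter(Paths.get("output.csv"))) {
      ExportReport report = manager.startExport(exportOptions, tableMetadata, writer);
      System.out.println("Exported rows: " + report.getExportedRowCount());
    }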
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportReport.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportReport.java
index 9907fe189..d856b7b0f 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportReport.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportReport.java
@@ -20,7 +20,7 @@ public long getExportedRowCount() {
     return exportedRowCount.sum();
   }
 
-  public void increaseExportedRowCount() {
-    this.exportedRowCount.increment();
+  public void updateExportedRowCount(long count) {
+    this.exportedRowCount.add(count);
   }
 }
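The report counter moves from per-row increments to per-chunk adds because many producer threads now update it concurrently. The field's declared type is not visible in this hunk, but the increment/add/sum calls match java.util.concurrent.atomic.LongAdder, which keeps per-thread cells to avoid contention; a minimal illustration:

    LongAdder exportedRowCount = new LongAdder();
    exportedRowCount.add(10);        // one processed data chunk of 10 rows
    exportedRowCount.add(20);        // another chunk, possibly from another thread
    long total = exportedRowCount.sum(); // 30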
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java
new file mode 100644
index 000000000..13e580452
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java
@@ -0,0 +1,43 @@
+package com.scalar.db.dataloader.core.dataexport;
+
+import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
+import java.io.IOException;
+import java.io.Writer;
+
+public class JsonExportManager extends ExportManager {
+  public JsonExportManager(
+      DistributedStorage storage, ScalarDBDao dao, ProducerTaskFactory producerTaskFactory) {
+    super(storage, dao, producerTaskFactory);
+  }
+
+  /**
+   * Create and add the header part for the export file (the opening bracket of the JSON array)
+   *
+   * @param exportOptions Export options for the data export
+   * @param tableMetadata Metadata of the table to export
+   * @param writer File writer object
+   * @throws IOException If any IO exception occurs
+   */
+  @Override
+  void processHeader(ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer)
+      throws IOException {
+    writer.write("[");
+  }
+
+  /**
+   * Create and add the footer part for the export file (the closing bracket of the JSON array)
+   *
+   * @param exportOptions Export options for the data export
+   * @param tableMetadata Metadata of the table to export
+   * @param writer File writer object
+   * @throws IOException If any IO exception occurs
+   */
+  @Override
+  void processFooter(ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer)
+      throws IOException {
+    writer.write("]");
+  }
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java
new file mode 100644
index 000000000..98f514cbe
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java
@@ -0,0 +1,39 @@
+package com.scalar.db.dataloader.core.dataexport;
+
+import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
+import java.io.IOException;
+import java.io.Writer;
+
+public class JsonLineExportManager extends ExportManager {
+  public JsonLineExportManager(
+      DistributedStorage storage, ScalarDBDao dao, ProducerTaskFactory producerTaskFactory) {
+    super(storage, dao, producerTaskFactory);
+  }
+
+  /**
+   * Create and add the header part for the export file (not needed for JSON Lines)
+   *
+   * @param exportOptions Export options for the data export
+   * @param tableMetadata Metadata of the table to export
+   * @param writer File writer object
+   * @throws IOException If any IO exception occurs
+   */
+  @Override
+  void processHeader(ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer)
+      throws IOException {}
+
+  /**
+   * Create and add the footer part for the export file (not needed for JSON Lines)
+   *
+   * @param exportOptions Export options for the data export
+   * @param tableMetadata Metadata of the table to export
+   * @param writer File writer object
+   * @throws IOException If any IO exception occurs
+   */
+  @Override
+  void processFooter(ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer)
+      throws IOException {}
+}
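Taken together, the three managers differ only in how they frame the chunks the producer tasks emit; with c1 and c2 standing for two chunks of formatted rows:

    CSV   : "col1,col2\n" + c1 + c2    (header row, empty footer)
    JSON  : "[" + c1 + "," + c2 + "]"  (ExportManager inserts the comma before
                                        every chunk after the first; see isFirstBatch)
    JSONL : c1 + c2                    (each row is already a self-contained
                                        JSON object on its own line)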
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/CsvProducerTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/CsvProducerTask.java
new file mode 100644
index 000000000..23a82e18f
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/CsvProducerTask.java
@@ -0,0 +1,152 @@
+package com.scalar.db.dataloader.core.dataexport.producer;
+
+import com.scalar.db.api.Result;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.error.CoreError;
+import com.scalar.db.dataloader.core.util.CsvUtil;
+import com.scalar.db.dataloader.core.util.DecimalUtil;
+import com.scalar.db.io.DataType;
+import com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils;
+import java.nio.charset.Charset;
+import java.util.Base64;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Producer that converts ScalarDB scan results to CSV content. The generated content is returned
+ * to the caller, which writes it to the export file
+ */
+public class CsvProducerTask extends ProducerTask {
+
+  private static final Logger logger = LoggerFactory.getLogger(CsvProducerTask.class);
+
+  private final String delimiter;
+
+  /**
+   * Class constructor
+   *
+   * @param includeMetadata Include metadata in the exported data
+   * @param tableMetadata Metadata for a single ScalarDB table
+   * @param columnDataTypes Map of data types for all columns in a ScalarDB table
+   * @param delimiter Delimiter used in the CSV content
+   */
+  public CsvProducerTask(
+      boolean includeMetadata,
+      List<String> projectColumns,
+      TableMetadata tableMetadata,
+      Map<String, DataType> columnDataTypes,
+      String delimiter) {
+    super(includeMetadata, projectColumns, tableMetadata, columnDataTypes);
+    this.delimiter = delimiter;
+  }
+
+  /**
+   * Process ScalarDB scan result data and return CSV data
+   *
+   * @param dataChunk list of results
+   * @return result converted to a string
+   */
+  @Override
+  public String process(List<Result> dataChunk) {
+    StringBuilder csvContent = new StringBuilder();
+    for (Result result : dataChunk) {
+      String csvRow = convertResultToCsv(result);
+      csvContent.append(csvRow);
+    }
+    return csvContent.toString();
+  }
+
+  /**
+   * Convert a ScalarDB scan result to CSV
+   *
+   * @param result ScalarDB scan result
+   * @return CSV string
+   */
+  private String convertResultToCsv(Result result) {
+    // Initialization
+    StringBuilder stringBuilder = new StringBuilder();
+    LinkedHashSet<String> tableColumnNames = tableMetadata.getColumnNames();
+    Iterator<String> iterator = tableColumnNames.iterator();
+
+    try {
+      // Loop over the result data list
+      while (iterator.hasNext()) {
+        String columnName = iterator.next();
+
+        // Skip the field if it can be ignored based on check
+        boolean columnNotProjected = !projectedColumnsSet.contains(columnName);
+        boolean isMetadataColumn =
+            ConsensusCommitUtils.isTransactionMetaColumn(columnName, tableMetadata);
+        if (columnNotProjected || (!includeMetadata && isMetadataColumn)) {
+          continue;
+        }
+
+        // Convert each value to a string value and add to the StringBuilder
+        stringBuilder.append(
+            convertToString(result, columnName, dataTypeByColumnName.get(columnName)));
+
+        if (iterator.hasNext()) {
+          stringBuilder.append(delimiter);
+        }
+      }
+
+      // Double check and remove the character if it's a delimiter. This can occur when the last
+      // added column was not the last iterated field and so received a delimiter
+      CsvUtil.removeTrailingDelimiter(stringBuilder, delimiter);
+
+      stringBuilder.append(System.lineSeparator());
+
+      return stringBuilder.toString();
+    } catch (UnsupportedOperationException e) {
+      logger.error(
+          CoreError.DATA_LOADER_VALUE_TO_STRING_CONVERSION_FAILED.buildMessage(e.getMessage()));
+    }
+    return "";
+  }
+
+  /**
+   * Convert a result column value to a string
+   *
+   * @param result ScalarDB result
+   * @param columnName column name
+   * @param dataType datatype of the column
+   * @return value of the result converted to a string
+   */
+  private String convertToString(Result result, String columnName, DataType dataType) {
+    if (result.isNull(columnName)) {
+      return null;
+    }
+    String value = "";
+    switch (dataType) {
+      case INT:
+        value = Integer.toString(result.getInt(columnName));
+        break;
+      case BIGINT:
+        value = Long.toString(result.getBigInt(columnName));
+        break;
+      case FLOAT:
+        value = DecimalUtil.convertToNonScientific(result.getFloat(columnName));
+        break;
+      case DOUBLE:
+        value = DecimalUtil.convertToNonScientific(result.getDouble(columnName));
+        break;
+      case BLOB:
+        byte[] encoded = Base64.getEncoder().encode(result.getBlobAsBytes(columnName));
+        value = new String(encoded, Charset.defaultCharset());
+        break;
+      case BOOLEAN:
+        value = Boolean.toString(result.getBoolean(columnName));
+        break;
+      case TEXT:
+        value = result.getText(columnName);
+        break;
+      default:
+        throw new AssertionError("Unknown data type: " + dataType);
+    }
+    return value;
+  }
+}
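One detail worth calling out from convertToString: BLOB values are Base64-encoded so that arbitrary bytes survive a text format. Round trip, using the same literal the tests below expect:

    byte[] blob = "blob test value".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    String encoded = java.util.Base64.getEncoder().encodeToString(blob);
    // encoded -> "YmxvYiB0ZXN0IHZhbHVl"
    byte[] decoded = java.util.Base64.getDecoder().decode(encoded); // original bytes back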
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonLineProducerTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonLineProducerTask.java
new file mode 100644
index 000000000..455860b56
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonLineProducerTask.java
@@ -0,0 +1,124 @@
+package com.scalar.db.dataloader.core.dataexport.producer;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.scalar.db.api.Result;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
+import com.scalar.db.io.DataType;
+import com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils;
+import java.nio.charset.Charset;
+import java.util.Base64;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JsonLineProducerTask extends ProducerTask {
+
+  private final DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper();
+  private static final Logger logger = LoggerFactory.getLogger(JsonLineProducerTask.class);
+
+  /**
+   * Class constructor
+   *
+   * @param includeMetadata Include metadata in the exported data
+   * @param tableMetadata Metadata for a single ScalarDB table
+   * @param columnDataTypes Map of data types for all columns in a ScalarDB table
+   */
+  public JsonLineProducerTask(
+      boolean includeMetadata,
+      List<String> projectionColumns,
+      TableMetadata tableMetadata,
+      Map<String, DataType> columnDataTypes) {
+    super(includeMetadata, projectionColumns, tableMetadata, columnDataTypes);
+  }
+
+  /**
+   * Process ScalarDB scan result data and return JSON Lines data
+   *
+   * @param dataChunk list of results
+   * @return result converted to a string
+   */
+  @Override
+  public String process(List<Result> dataChunk) {
+    StringBuilder jsonLines = new StringBuilder();
+
+    for (Result result : dataChunk) {
+      ObjectNode objectNode = generateJsonForResult(result);
+      jsonLines.append(objectNode.toString());
+      jsonLines.append(System.lineSeparator());
+    }
+    return jsonLines.toString();
+  }
+
+  /**
+   * Generate a Json Object based on a ScalarDB Result
+   *
+   * @param result ScalarDB Result object instance
+   * @return JsonObject containing the ScalarDB result data
+   */
+  private ObjectNode generateJsonForResult(Result result) {
+    LinkedHashSet<String> tableColumns = tableMetadata.getColumnNames();
+
+    ObjectNode objectNode = objectMapper.createObjectNode();
+
+    // Loop through all the columns and add them to the json object
+    for (String columnName : tableColumns) {
+      // Skip the field if it can be ignored based on check
+      boolean columnNotProjected = !projectedColumnsSet.contains(columnName);
+      boolean isMetadataColumn =
+          ConsensusCommitUtils.isTransactionMetaColumn(columnName, tableMetadata);
+      if (columnNotProjected || (!includeMetadata && isMetadataColumn)) {
+        continue;
+      }
+
+      DataType dataType = dataTypeByColumnName.get(columnName);
+      addToObjectNode(objectNode, result, columnName, dataType);
+    }
+    return objectNode;
+  }
+
+  /**
+   * Add a result column name and value to the json object node
+   *
+   * @param result ScalarDB result
+   * @param columnName column name
+   * @param dataType datatype of the column
+   */
+  private void addToObjectNode(
+      ObjectNode objectNode, Result result, String columnName, DataType dataType) {
+
+    if (result.isNull(columnName)) {
+      return;
+    }
+
+    switch (dataType) {
+      case BOOLEAN:
+        objectNode.put(columnName, result.getBoolean(columnName));
+        break;
+      case INT:
+        objectNode.put(columnName, result.getInt(columnName));
+        break;
+      case BIGINT:
+        objectNode.put(columnName, result.getBigInt(columnName));
+        break;
+      case FLOAT:
+        objectNode.put(columnName, result.getFloat(columnName));
+        break;
+      case DOUBLE:
+        objectNode.put(columnName, result.getDouble(columnName));
+        break;
+      case TEXT:
+        objectNode.put(columnName, result.getText(columnName));
+        break;
+      case BLOB:
+        // convert to base64 string
+        byte[] encoded = Base64.getEncoder().encode(result.getBlobAsBytes(columnName));
+        objectNode.put(columnName, new String(encoded, Charset.defaultCharset()));
+        break;
+      default:
+        throw new AssertionError("Unknown data type: " + dataType);
+    }
+  }
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonProducerTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonProducerTask.java
new file mode 100644
index 000000000..353bf2d7d
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonProducerTask.java
@@ -0,0 +1,135 @@
+package com.scalar.db.dataloader.core.dataexport.producer;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.scalar.db.api.Result;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
+import com.scalar.db.io.DataType;
+import com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils;
+import java.nio.charset.Charset;
+import java.util.Base64;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JsonProducerTask extends ProducerTask {
+
+  private final DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper();
+  private final boolean prettyPrintJson;
+  private static final Logger logger = LoggerFactory.getLogger(JsonProducerTask.class);
+
+  /**
+   * Class constructor
+   *
+   * @param includeMetadata Include metadata in the exported data
+   * @param tableMetadata Metadata for a single ScalarDB table
+   * @param columnDataTypes Map of data types for all columns in a ScalarDB table
+   */
+  public JsonProducerTask(
+      boolean includeMetadata,
+      List<String> projectionColumns,
+      TableMetadata tableMetadata,
+      Map<String, DataType> columnDataTypes,
+      boolean prettyPrintJson) {
+    super(includeMetadata, projectionColumns, tableMetadata, columnDataTypes);
+    this.prettyPrintJson = prettyPrintJson;
+  }
+
+  /**
+   * Process ScalarDB scan result data and return JSON data
+   *
+   * @param dataChunk list of results
+   * @return result converted to a string
+   */
+  @Override
+  public String process(List<Result> dataChunk) {
+    ArrayNode arrayNode = objectMapper.createArrayNode();
+
+    for (Result result : dataChunk) {
+      ObjectNode objectNode = generateJsonForResult(result);
+      arrayNode.add(objectNode);
+    }
+
+    if (prettyPrintJson) {
+      String json = arrayNode.toPrettyString();
+      return json.substring(1, json.length() - 1);
+    }
+
+    String json = arrayNode.toString();
+    // Remove the [] from the json string
+    return json.substring(1, json.length() - 1);
+  }
+
+  /**
+   * Generate a Json Object based on a ScalarDB Result
+   *
+   * @param result ScalarDB Result object instance
+   * @return JsonObject containing the ScalarDB result data
+   */
+  private ObjectNode generateJsonForResult(Result result) {
+    LinkedHashSet<String> tableColumns = tableMetadata.getColumnNames();
+
+    ObjectNode objectNode = objectMapper.createObjectNode();
+
+    // Loop through all the columns and add them to the json object
+    for (String columnName : tableColumns) {
+      // Skip the field if it can be ignored based on check
+      boolean columnNotProjected = !projectedColumnsSet.contains(columnName);
+      boolean isMetadataColumn =
+          ConsensusCommitUtils.isTransactionMetaColumn(columnName, tableMetadata);
+      if (columnNotProjected || (!includeMetadata && isMetadataColumn)) {
+        continue;
+      }
+
+      DataType dataType = dataTypeByColumnName.get(columnName);
+      addToObjectNode(objectNode, result, columnName, dataType);
+    }
+    return objectNode;
+  }
+
+  /**
+   * Add a result column name and value to the json object node
+   *
+   * @param result ScalarDB result
+   * @param columnName column name
+   * @param dataType datatype of the column
+   */
+  private void addToObjectNode(
+      ObjectNode objectNode, Result result, String columnName, DataType dataType) {
+
+    if (result.isNull(columnName)) {
+      return;
+    }
+
+    switch (dataType) {
+      case BOOLEAN:
+        objectNode.put(columnName, result.getBoolean(columnName));
+        break;
+      case INT:
+        objectNode.put(columnName, result.getInt(columnName));
+        break;
+      case BIGINT:
+        objectNode.put(columnName, result.getBigInt(columnName));
+        break;
+      case FLOAT:
+        objectNode.put(columnName, result.getFloat(columnName));
+        break;
+      case DOUBLE:
+        objectNode.put(columnName, result.getDouble(columnName));
+        break;
+      case TEXT:
+        objectNode.put(columnName, result.getText(columnName));
+        break;
+      case BLOB:
+        // convert to base64 string
+        byte[] encoded = Base64.getEncoder().encode(result.getBlobAsBytes(columnName));
+        objectNode.put(columnName, new String(encoded, Charset.defaultCharset()));
+        break;
+      default:
+        throw new AssertionError("Unknown data type: " + dataType);
+    }
+  }
+}
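The substring(1, json.length() - 1) calls strip the enclosing [ and ] so that each chunk contributes only array elements; JsonExportManager writes the single outer pair and ExportManager adds the commas, letting chunks stream into one valid array:

    chunk 1 -> {"id":1},{"id":2}
    chunk 2 -> {"id":3}
    file    -> "[" + chunk1 + "," + chunk2 + "]"  =  [{"id":1},{"id":2},{"id":3}]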
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerResult.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerResult.java
new file mode 100644
index 000000000..9506fcd72
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerResult.java
@@ -0,0 +1,13 @@
+package com.scalar.db.dataloader.core.dataexport.producer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import lombok.Builder;
+import lombok.Value;
+
+@Builder
+@Value
+public class ProducerResult {
+  JsonNode jsonNode;
+  String csvSource;
+  boolean poisonPill;
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTask.java
new file mode 100644
index 000000000..2ebf01ff1
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTask.java
@@ -0,0 +1,38 @@
+package com.scalar.db.dataloader.core.dataexport.producer;
+
+import com.scalar.db.api.Result;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.io.DataType;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class ProducerTask {
+
+  protected final TableMetadata tableMetadata;
+  protected final Map<String, DataType> dataTypeByColumnName;
+  protected final boolean includeMetadata;
+  protected final Set<String> projectedColumnsSet;
+
+  /**
+   * Class constructor
+   *
+   * @param includeMetadata Include metadata in the exported data
+   * @param projectionColumns List of column names for projection
+   * @param tableMetadata Metadata of the ScalarDB table
+   * @param columnDataTypes Map of data types for all columns in a ScalarDB table
+   */
+  protected ProducerTask(
+      boolean includeMetadata,
+      List<String> projectionColumns,
+      TableMetadata tableMetadata,
+      Map<String, DataType> columnDataTypes) {
+    this.includeMetadata = includeMetadata;
+    this.projectedColumnsSet = new HashSet<>(projectionColumns);
+    this.tableMetadata = tableMetadata;
+    this.dataTypeByColumnName = columnDataTypes;
+  }
+
+  public abstract String process(List<Result> dataChunk);
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTaskFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTaskFactory.java
new file mode 100644
index 000000000..18adc8de6
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTaskFactory.java
@@ -0,0 +1,59 @@
+package com.scalar.db.dataloader.core.dataexport.producer;
+
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.error.CoreError;
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.io.DataType;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public class ProducerTaskFactory {
+
+  private final String delimiter;
+  private final boolean includeMetadata;
+  private final boolean prettyPrintJson;
+
+  /**
+   * Create a producer task object based on the file format
+   *
+   * @param fileFormat file format
+   * @param projectionColumns column names that are selected
+   * @param tableMetadata metadata of the table
+   * @param dataTypeByColumnName map of columns with data types
+   * @return producer task object for the provided file format
+   */
+  public ProducerTask createProducerTask(
+      FileFormat fileFormat,
+      List<String> projectionColumns,
+      TableMetadata tableMetadata,
+      Map<String, DataType> dataTypeByColumnName) {
+    ProducerTask producerTask;
+    switch (fileFormat) {
+      case JSON:
+        producerTask =
+            new JsonProducerTask(
+                includeMetadata,
+                projectionColumns,
+                tableMetadata,
+                dataTypeByColumnName,
+                prettyPrintJson);
+        break;
+      case JSONL:
+        producerTask =
+            new JsonLineProducerTask(
+                includeMetadata, projectionColumns, tableMetadata, dataTypeByColumnName);
+        break;
+      case CSV:
+        producerTask =
+            new CsvProducerTask(
+                includeMetadata, projectionColumns, tableMetadata, dataTypeByColumnName, delimiter);
+        break;
+      default:
+        throw new IllegalArgumentException(
+            CoreError.DATA_LOADER_FILE_FORMAT_NOT_SUPPORTED.buildMessage(fileFormat.toString()));
+    }
+    return producerTask;
+  }
+}
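A short sketch of how the factory is meant to be driven; tableMetadata and dataTypeByColumnName are assumed to come from the table's metadata, exactly as ExportManager.startExport obtains them:

    ProducerTaskFactory factory = new ProducerTaskFactory(",", false, false);
    ProducerTask task =
        factory.createProducerTask(
            FileFormat.CSV, projectionColumns, tableMetadata, dataTypeByColumnName);
    String chunk = task.process(dataChunk); // formatted rows, ready to append to the file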
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java
new file mode 100644
index 000000000..86a943a9e
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java
@@ -0,0 +1,132 @@
+package com.scalar.db.dataloader.core.dataexport;
+
+import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.Result;
+import com.scalar.db.api.Scanner;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.ResultImpl;
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.dataloader.core.ScanRange;
+import com.scalar.db.dataloader.core.UnitTestUtils;
+import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
+import com.scalar.db.io.Column;
+import com.scalar.db.io.IntColumn;
+import com.scalar.db.io.Key;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+
+public class CsvExportManagerTest {
+  TableMetadata mockData;
+  DistributedStorage storage;
+  @Spy ScalarDBDao dao;
+  ProducerTaskFactory producerTaskFactory;
+  ExportManager exportManager;
+
+  @BeforeEach
+  void setup() {
+    storage = Mockito.mock(DistributedStorage.class);
+    mockData = UnitTestUtils.createTestTableMetadata();
+    dao = Mockito.mock(ScalarDBDao.class);
+    producerTaskFactory = new ProducerTaskFactory(",", false, false);
+  }
+
+  @Test
+  void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile()
+      throws IOException, ScalarDBDaoException {
+    exportManager = new CsvExportManager(storage, dao, producerTaskFactory);
+    Scanner scanner = Mockito.mock(Scanner.class);
+    String filePath = Paths.get("").toAbsolutePath() + "/output.csv";
+    Map<String, Column<?>> values = UnitTestUtils.createTestValues();
+    Result result = new ResultImpl(values, mockData);
+    List<Result> results = Collections.singletonList(result);
+    ExportOptions exportOptions =
+        ExportOptions.builder("namespace", "table", null, FileFormat.CSV)
+            .sortOrders(Collections.emptyList())
+            .scanRange(new ScanRange(null, null, false, false))
+            .build();
+
+    Mockito.when(
+            dao.createScanner(
+                exportOptions.getNamespace(),
+                exportOptions.getTableName(),
+                exportOptions.getProjectionColumns(),
+                exportOptions.getLimit(),
+                storage))
+        .thenReturn(scanner);
+    Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+    try (BufferedWriter writer =
+        new BufferedWriter(
+            Files.newBufferedWriter(
+                Paths.get(filePath),
+                Charset.defaultCharset(), // Explicitly use the default charset
+                StandardOpenOption.CREATE,
+                StandardOpenOption.APPEND))) {
+      exportManager.startExport(exportOptions, mockData, writer);
+    }
+    File file = new File(filePath);
+    Assertions.assertTrue(file.exists());
+    Assertions.assertTrue(file.delete());
+  }
+
+  @Test
+  void startExport_givenPartitionKey_shouldGenerateOutputFile()
+      throws IOException, ScalarDBDaoException {
+    producerTaskFactory = new ProducerTaskFactory(",", false, false);
+    exportManager = new CsvExportManager(storage, dao, producerTaskFactory);
+    Scanner scanner = Mockito.mock(Scanner.class);
+    String filePath = Paths.get("").toAbsolutePath() + "/output.csv";
+    Map<String, Column<?>> values = UnitTestUtils.createTestValues();
+    Result result = new ResultImpl(values, mockData);
+    List<Result> results = Collections.singletonList(result);
+
+    ExportOptions exportOptions =
+        ExportOptions.builder(
+                "namespace",
+                "table",
+                Key.newBuilder().add(IntColumn.of("col1", 1)).build(),
+                FileFormat.CSV)
+            .sortOrders(Collections.emptyList())
+            .scanRange(new ScanRange(null, null, false, false))
+            .build();
+
+    Mockito.when(
+            dao.createScanner(
+                exportOptions.getNamespace(),
+                exportOptions.getTableName(),
+                exportOptions.getScanPartitionKey(),
+                exportOptions.getScanRange(),
+                exportOptions.getSortOrders(),
+                exportOptions.getProjectionColumns(),
+                exportOptions.getLimit(),
+                storage))
+        .thenReturn(scanner);
+    Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+    try (BufferedWriter writer =
+        new BufferedWriter(
+            Files.newBufferedWriter(
+                Paths.get(filePath),
+                Charset.defaultCharset(), // Explicitly use the default charset
+                StandardOpenOption.CREATE,
+                StandardOpenOption.APPEND))) {
+      exportManager.startExport(exportOptions, mockData, writer);
+    }
+    File file = new File(filePath);
+    Assertions.assertTrue(file.exists());
+    Assertions.assertTrue(file.delete());
+  }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/ExportReportTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/ExportReportTest.java
index e9820582c..9a4150d3c 100644
--- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/ExportReportTest.java
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/ExportReportTest.java
@@ -14,8 +14,8 @@ void getExportedRowCount_afterInitialisation_ShouldBeZero() {
   @Test
-  void getExportedRowCount_afterIncrementingTwice_ShouldBeTwo() {
+  void getExportedRowCount_afterUpdatingTwice_ShouldReturnTotal() {
     ExportReport exportReport = new ExportReport();
-    exportReport.increaseExportedRowCount();
-    exportReport.increaseExportedRowCount();
-    Assertions.assertEquals(2, exportReport.getExportedRowCount());
+    exportReport.updateExportedRowCount(10);
+    exportReport.updateExportedRowCount(20);
+    Assertions.assertEquals(30, exportReport.getExportedRowCount());
   }
 }
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java
new file mode 100644
index 000000000..ac620458a
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java
@@ -0,0 +1,133 @@
+package com.scalar.db.dataloader.core.dataexport;
+
+import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.Result;
+import com.scalar.db.api.Scanner;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.ResultImpl;
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.dataloader.core.ScanRange;
+import com.scalar.db.dataloader.core.UnitTestUtils;
+import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
+import com.scalar.db.io.Column;
+import com.scalar.db.io.IntColumn;
+import com.scalar.db.io.Key;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+
+public class JsonExportManagerTest {
+
+  TableMetadata mockData;
+  DistributedStorage storage;
+  @Spy ScalarDBDao dao;
+  ProducerTaskFactory producerTaskFactory;
+  ExportManager exportManager;
+
+  @BeforeEach
+  void setup() {
+    storage = Mockito.mock(DistributedStorage.class);
+    mockData = UnitTestUtils.createTestTableMetadata();
+    dao = Mockito.mock(ScalarDBDao.class);
+    producerTaskFactory = new ProducerTaskFactory(null, false, true);
+  }
+
+  @Test
+  void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile()
+      throws IOException, ScalarDBDaoException {
+    exportManager = new JsonExportManager(storage, dao, producerTaskFactory);
+    Scanner scanner = Mockito.mock(Scanner.class);
+    String filePath = Paths.get("").toAbsolutePath() + "/output.json";
+    Map<String, Column<?>> values = UnitTestUtils.createTestValues();
+    Result result = new ResultImpl(values, mockData);
+    List<Result> results = Collections.singletonList(result);
+
+    ExportOptions exportOptions =
+        ExportOptions.builder("namespace", "table", null, FileFormat.JSON)
+            .sortOrders(Collections.emptyList())
+            .scanRange(new ScanRange(null, null, false, false))
+            .build();
+
+    Mockito.when(
+            dao.createScanner(
+                exportOptions.getNamespace(),
+                exportOptions.getTableName(),
+                exportOptions.getProjectionColumns(),
+                exportOptions.getLimit(),
+                storage))
+        .thenReturn(scanner);
+    Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+    try (BufferedWriter writer =
+        new BufferedWriter(
+            Files.newBufferedWriter(
+                Paths.get(filePath),
+                Charset.defaultCharset(), // Explicitly use the default charset
+                StandardOpenOption.CREATE,
+                StandardOpenOption.APPEND))) {
+      exportManager.startExport(exportOptions, mockData, writer);
+    }
+    File file = new File(filePath);
+    Assertions.assertTrue(file.exists());
+    Assertions.assertTrue(file.delete());
+  }
+
+  @Test
+  void startExport_givenPartitionKey_shouldGenerateOutputFile()
+      throws IOException, ScalarDBDaoException {
+    exportManager = new JsonExportManager(storage, dao, producerTaskFactory);
+    Scanner scanner = Mockito.mock(Scanner.class);
+    String filePath = Paths.get("").toAbsolutePath() + "/output.json";
+    Map<String, Column<?>> values = UnitTestUtils.createTestValues();
+    Result result = new ResultImpl(values, mockData);
+    List<Result> results = Collections.singletonList(result);
+
+    ExportOptions exportOptions =
+        ExportOptions.builder(
+                "namespace",
+                "table",
+                Key.newBuilder().add(IntColumn.of("col1", 1)).build(),
+                FileFormat.JSON)
+            .sortOrders(Collections.emptyList())
+            .scanRange(new ScanRange(null, null, false, false))
+            .build();
+
+    Mockito.when(
+            dao.createScanner(
+                exportOptions.getNamespace(),
+                exportOptions.getTableName(),
+                exportOptions.getScanPartitionKey(),
+                exportOptions.getScanRange(),
+                exportOptions.getSortOrders(),
+                exportOptions.getProjectionColumns(),
+                exportOptions.getLimit(),
+                storage))
+        .thenReturn(scanner);
+    Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+    try (BufferedWriter writer =
+        new BufferedWriter(
+            Files.newBufferedWriter(
+                Paths.get(filePath),
+                Charset.defaultCharset(), // Explicitly use the default charset
+                StandardOpenOption.CREATE,
+                StandardOpenOption.APPEND))) {
+      exportManager.startExport(exportOptions, mockData, writer);
+    }
+    File file = new File(filePath);
+    Assertions.assertTrue(file.exists());
+    Assertions.assertTrue(file.delete());
+  }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java
new file mode 100644
index 000000000..36f01e7c6
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java
@@ -0,0 +1,132 @@
+package com.scalar.db.dataloader.core.dataexport;
+
+import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.Result;
+import com.scalar.db.api.Scanner;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.ResultImpl;
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.dataloader.core.ScanRange;
+import com.scalar.db.dataloader.core.UnitTestUtils;
+import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
+import com.scalar.db.io.Column;
+import com.scalar.db.io.IntColumn;
+import com.scalar.db.io.Key;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+
+public class JsonLineExportManagerTest {
+  TableMetadata mockData;
+  DistributedStorage storage;
+  @Spy ScalarDBDao dao;
+  ProducerTaskFactory producerTaskFactory;
+  ExportManager exportManager;
+
+  @BeforeEach
+  void setup() {
+    storage = Mockito.mock(DistributedStorage.class);
+    mockData = UnitTestUtils.createTestTableMetadata();
+    dao = Mockito.mock(ScalarDBDao.class);
+    producerTaskFactory = new ProducerTaskFactory(null, false, true);
+  }
+
+  @Test
+  void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile()
+      throws IOException, ScalarDBDaoException {
+    exportManager = new JsonLineExportManager(storage, dao, producerTaskFactory);
+    Scanner scanner = Mockito.mock(Scanner.class);
+    String filePath = Paths.get("").toAbsolutePath() + "/output.jsonl";
+    Map<String, Column<?>> values = UnitTestUtils.createTestValues();
+    Result result = new ResultImpl(values, mockData);
+    List<Result> results = Collections.singletonList(result);
+
+    ExportOptions exportOptions =
+        ExportOptions.builder("namespace", "table", null, FileFormat.JSONL)
+            .sortOrders(Collections.emptyList())
+            .scanRange(new ScanRange(null, null, false, false))
+            .build();
+
+    Mockito.when(
+            dao.createScanner(
+                exportOptions.getNamespace(),
+                exportOptions.getTableName(),
+                exportOptions.getProjectionColumns(),
+                exportOptions.getLimit(),
+                storage))
+        .thenReturn(scanner);
+    Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+    try (BufferedWriter writer =
+        new BufferedWriter(
+            Files.newBufferedWriter(
+                Paths.get(filePath),
+                Charset.defaultCharset(), // Explicitly use the default charset
+                StandardOpenOption.CREATE,
+                StandardOpenOption.APPEND))) {
+      exportManager.startExport(exportOptions, mockData, writer);
+    }
+    File file = new File(filePath);
+    Assertions.assertTrue(file.exists());
+    Assertions.assertTrue(file.delete());
+  }
+
+  @Test
+  void startExport_givenPartitionKey_shouldGenerateOutputFile()
+      throws IOException, ScalarDBDaoException {
+    exportManager = new JsonLineExportManager(storage, dao, producerTaskFactory);
+    Scanner scanner = Mockito.mock(Scanner.class);
+    String filePath = Paths.get("").toAbsolutePath() + "/output.jsonl";
+    Map<String, Column<?>> values = UnitTestUtils.createTestValues();
+    Result result = new ResultImpl(values, mockData);
+    List<Result> results = Collections.singletonList(result);
+
+    ExportOptions exportOptions =
+        ExportOptions.builder(
+                "namespace",
+                "table",
+                Key.newBuilder().add(IntColumn.of("col1", 1)).build(),
+                FileFormat.JSONL)
+            .sortOrders(Collections.emptyList())
+            .scanRange(new ScanRange(null, null, false, false))
+            .build();
+
+    Mockito.when(
+            dao.createScanner(
+                exportOptions.getNamespace(),
+                exportOptions.getTableName(),
+                exportOptions.getScanPartitionKey(),
+                exportOptions.getScanRange(),
+                exportOptions.getSortOrders(),
+                exportOptions.getProjectionColumns(),
+                exportOptions.getLimit(),
+                storage))
+        .thenReturn(scanner);
+    Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+    try (BufferedWriter writer =
+        new BufferedWriter(
+            Files.newBufferedWriter(
+                Paths.get(filePath),
+                Charset.defaultCharset(), // Explicitly use the default charset
+                StandardOpenOption.CREATE,
+                StandardOpenOption.APPEND))) {
+      exportManager.startExport(exportOptions, mockData, writer);
+    }
+    File file = new File(filePath);
+    Assertions.assertTrue(file.exists());
+    Assertions.assertTrue(file.delete());
+  }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/CsvProducerTaskTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/CsvProducerTaskTest.java
new file mode 100644
index 000000000..f0132d8ee
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/CsvProducerTaskTest.java
@@ -0,0 +1,63 @@
+package com.scalar.db.dataloader.core.dataexport.producer;
+
+import com.scalar.db.api.Result;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.ResultImpl;
+import com.scalar.db.dataloader.core.UnitTestUtils;
+import com.scalar.db.io.Column;
+import com.scalar.db.io.DataType;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class CsvProducerTaskTest {
+
+  TableMetadata mockMetadata;
+  List<String> projectedColumns;
+  Map<String, DataType> columnData;
+  CsvProducerTask csvProducerTask;
+
+  @BeforeEach
+  void setup() {
+    mockMetadata = UnitTestUtils.createTestTableMetadata();
+    projectedColumns = UnitTestUtils.getColumnsListOfMetadata();
+    columnData =
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/CsvProducerTaskTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/CsvProducerTaskTest.java
new file mode 100644
index 000000000..f0132d8ee
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/CsvProducerTaskTest.java
@@ -0,0 +1,63 @@
+package com.scalar.db.dataloader.core.dataexport.producer;
+
+import com.scalar.db.api.Result;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.ResultImpl;
+import com.scalar.db.dataloader.core.UnitTestUtils;
+import com.scalar.db.io.Column;
+import com.scalar.db.io.DataType;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class CsvProducerTaskTest {
+
+  TableMetadata mockMetadata;
+  List<String> projectedColumns;
+  Map<String, DataType> columnData;
+  CsvProducerTask csvProducerTask;
+
+  @BeforeEach
+  void setup() {
+    mockMetadata = UnitTestUtils.createTestTableMetadata();
+    projectedColumns = UnitTestUtils.getColumnsListOfMetadata();
+    columnData = UnitTestUtils.getColumnData();
+    csvProducerTask = new CsvProducerTask(false, projectedColumns, mockMetadata, columnData, ",");
+  }
+
+  @Test
+  void process_withEmptyResultList_shouldReturnEmptyString() {
+    List<Result> results = Collections.emptyList();
+    String output = csvProducerTask.process(results);
+    Assertions.assertEquals("", output);
+  }
+
+  @Test
+  void process_withValidResultList_shouldReturnValidCsvString() {
+    String expectedOutput =
+        "9007199254740992,2147483647,true,0.000000000000000000000000000000000000000000001401298464324817,0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000049,test value,YmxvYiB0ZXN0IHZhbHVl";
+    Map<String, Column<?>> values = UnitTestUtils.createTestValues();
+    Result result = new ResultImpl(values, mockMetadata);
+    List<Result> resultList = new ArrayList<>();
+    resultList.add(result);
+    String output = csvProducerTask.process(resultList);
+    Assertions.assertEquals(expectedOutput, output.trim());
+  }
+
+  @Test
+  void process_withValidResultListWithMetadata_shouldReturnValidCsvString() {
+    csvProducerTask = new CsvProducerTask(true, projectedColumns, mockMetadata, columnData, ",");
+    String expectedOutput =
+        "9007199254740992,2147483647,true,0.000000000000000000000000000000000000000000001401298464324817,0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000049,test value,YmxvYiB0ZXN0IHZhbHVl,0.000000000000000000000000000000000000000000001401298464324817,0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000049,test value,YmxvYiB0ZXN0IHZhbHVl,txt value 464654654,2147483647,2147483647,9007199254740992,9007199254740992,test value,2147483647,2147483647,9007199254740992,9007199254740992";
+    Map<String, Column<?>> values = UnitTestUtils.createTestValues();
+    Result result = new ResultImpl(values, mockMetadata);
+    List<Result> resultList = new ArrayList<>();
+    resultList.add(result);
+    String output = csvProducerTask.process(resultList);
+    Assertions.assertEquals(expectedOutput, output.trim());
+  }
+}
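These tests fix the constructor shape as CsvProducerTask(includeMetadata, projectedColumns, tableMetadata, columnDataTypes, delimiter); the parameter names are inferred from usage, and the second expected-output string shows that enabling the first flag appends the transaction-metadata columns to every row. A sketch reusing the same fixtures with a different delimiter:

    // Sketch only: same fixtures as the tests, semicolon-delimited output.
    CsvProducerTask task =
        new CsvProducerTask(false, projectedColumns, mockMetadata, columnData, ";");
    String csvRow = task.process(Collections.singletonList(result));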
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/JsonLineProducerTaskTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/JsonLineProducerTaskTest.java
new file mode 100644
index 000000000..4a955311e
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/JsonLineProducerTaskTest.java
@@ -0,0 +1,63 @@
+package com.scalar.db.dataloader.core.dataexport.producer;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.scalar.db.api.Result;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.ResultImpl;
+import com.scalar.db.dataloader.core.UnitTestUtils;
+import com.scalar.db.io.Column;
+import com.scalar.db.io.DataType;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class JsonLineProducerTaskTest {
+  TableMetadata mockMetadata;
+  List<String> projectedColumns;
+  Map<String, DataType> columnData;
+  JsonLineProducerTask jsonLineProducerTask;
+
+  @BeforeEach
+  void setup() {
+    mockMetadata = UnitTestUtils.createTestTableMetadata();
+    projectedColumns = UnitTestUtils.getColumnsListOfMetadata();
+    columnData = UnitTestUtils.getColumnData();
+    jsonLineProducerTask =
+        new JsonLineProducerTask(false, projectedColumns, mockMetadata, columnData);
+  }
+
+  @Test
+  void process_withEmptyResultList_shouldReturnEmptyString() {
+    List<Result> results = Collections.emptyList();
+    String output = jsonLineProducerTask.process(results);
+    Assertions.assertEquals("", output);
+  }
+
+  @Test
+  void process_withValidResultList_shouldReturnValidJsonLineString() {
+    ObjectNode rootNode = UnitTestUtils.getOutputDataWithoutMetadata();
+    Map<String, Column<?>> values = UnitTestUtils.createTestValues();
+    Result result = new ResultImpl(values, mockMetadata);
+    List<Result> resultList = new ArrayList<>();
+    resultList.add(result);
+    String output = jsonLineProducerTask.process(resultList);
+    Assertions.assertEquals(rootNode.toString(), output.trim());
+  }
+
+  @Test
+  void process_withValidResultListWithMetadata_shouldReturnValidJsonLineString() {
+    jsonLineProducerTask =
+        new JsonLineProducerTask(true, projectedColumns, mockMetadata, columnData);
+    ObjectNode rootNode = UnitTestUtils.getOutputDataWithMetadata();
+    Map<String, Column<?>> values = UnitTestUtils.createTestValues();
+    Result result = new ResultImpl(values, mockMetadata);
+    List<Result> resultList = new ArrayList<>();
+    resultList.add(result);
+    String output = jsonLineProducerTask.process(resultList);
+    Assertions.assertEquals(rootNode.toString(), output.trim());
+  }
+}
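The JSON Lines assertions above compare against rootNode.toString(), while the JSON assertions below compare against rootNode.toPrettyString(); that formatting choice is the observable difference between the two producers. A minimal Jackson illustration of the two shapes (not patch code):

    // The two serializations the assertions rely on.
    ObjectNode node = JsonNodeFactory.instance.objectNode().put("col1", 1);
    String compact = node.toString();       // {"col1":1} -- one JSONL record
    String pretty = node.toPrettyString();  // indented, multi-line JSON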
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/JsonProducerTaskTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/JsonProducerTaskTest.java
new file mode 100644
index 000000000..cded71a92
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/JsonProducerTaskTest.java
@@ -0,0 +1,62 @@
+package com.scalar.db.dataloader.core.dataexport.producer;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.scalar.db.api.Result;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.ResultImpl;
+import com.scalar.db.dataloader.core.UnitTestUtils;
+import com.scalar.db.io.Column;
+import com.scalar.db.io.DataType;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class JsonProducerTaskTest {
+  TableMetadata mockMetadata;
+  List<String> projectedColumns;
+  Map<String, DataType> columnData;
+  JsonProducerTask jsonProducerTask;
+
+  @BeforeEach
+  void setup() {
+    mockMetadata = UnitTestUtils.createTestTableMetadata();
+    projectedColumns = UnitTestUtils.getColumnsListOfMetadata();
+    columnData = UnitTestUtils.getColumnData();
+    jsonProducerTask =
+        new JsonProducerTask(false, projectedColumns, mockMetadata, columnData, true);
+  }
+
+  @Test
+  void process_withEmptyResultList_shouldReturnEmptyString() {
+    List<Result> results = Collections.emptyList();
+    String output = jsonProducerTask.process(results);
+    Assertions.assertEquals(" ", output);
+  }
+
+  @Test
+  void process_withValidResultList_shouldReturnValidJsonString() {
+    ObjectNode rootNode = UnitTestUtils.getOutputDataWithoutMetadata();
+    Map<String, Column<?>> values = UnitTestUtils.createTestValues();
+    Result result = new ResultImpl(values, mockMetadata);
+    List<Result> resultList = new ArrayList<>();
+    resultList.add(result);
+    String output = jsonProducerTask.process(resultList);
+    Assertions.assertEquals(rootNode.toPrettyString(), output.trim());
+  }
+
+  @Test
+  void process_withValidResultListWithMetadata_shouldReturnValidJsonString() {
+    jsonProducerTask = new JsonProducerTask(true, projectedColumns, mockMetadata, columnData, true);
+    ObjectNode rootNode = UnitTestUtils.getOutputDataWithMetadata();
+    Map<String, Column<?>> values = UnitTestUtils.createTestValues();
+    Result result = new ResultImpl(values, mockMetadata);
+    List<Result> resultList = new ArrayList<>();
+    resultList.add(result);
+    String output = jsonProducerTask.process(resultList);
+    Assertions.assertEquals(rootNode.toPrettyString(), output.trim());
+  }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTaskFactoryTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTaskFactoryTest.java
new file mode 100644
index 000000000..f97e80a1d
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTaskFactoryTest.java
@@ -0,0 +1,55 @@
+package com.scalar.db.dataloader.core.dataexport.producer;
+
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.dataloader.core.UnitTestUtils;
+import com.scalar.db.io.DataType;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ProducerTaskFactoryTest {
+
+  TableMetadata mockMetadata;
+  List<String> projectedColumns;
+  Map<String, DataType> columnData;
+
+  @BeforeEach
+  void setup() {
+    mockMetadata = UnitTestUtils.createTestTableMetadata();
+    projectedColumns = UnitTestUtils.getColumnsListOfMetadata();
+    columnData = UnitTestUtils.getColumnData();
+  }
+
+  @Test
+  void createProducerTask_withJsonFileFormat_shouldReturnJsonProducerTask() {
+    ProducerTaskFactory producerTaskFactory = new ProducerTaskFactory(null, false, true);
+    Assertions.assertEquals(
+        JsonProducerTask.class,
+        producerTaskFactory
+            .createProducerTask(FileFormat.JSON, projectedColumns, mockMetadata, columnData)
+            .getClass());
+  }
+
+  @Test
+  void createProducerTask_withJsonLinesFileFormat_shouldReturnJsonLineProducerTask() {
+    ProducerTaskFactory producerTaskFactory = new ProducerTaskFactory(null, false, false);
+    Assertions.assertEquals(
+        JsonLineProducerTask.class,
+        producerTaskFactory
+            .createProducerTask(FileFormat.JSONL, projectedColumns, mockMetadata, columnData)
+            .getClass());
+  }
+
+  @Test
+  void createProducerTask_withCsvFileFormat_shouldReturnCsvProducerTask() {
+    ProducerTaskFactory producerTaskFactory = new ProducerTaskFactory(",", false, false);
+    Assertions.assertEquals(
+        CsvProducerTask.class,
+        producerTaskFactory
+            .createProducerTask(FileFormat.CSV, projectedColumns, mockMetadata, columnData)
+            .getClass());
+  }
+}
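Taken together, the factory tests pin the constructor down as ProducerTaskFactory(csvDelimiter, includeMetadata, prettyPrintJson), with parameter names inferred from usage across these test files, and show that createProducerTask dispatches purely on the FileFormat argument. A caller-side sketch, assuming process(List<Result>) is declared on the ProducerTask base class as the subclass tests suggest:

    // Sketch only: format-driven dispatch with the fixtures used above.
    ProducerTaskFactory factory = new ProducerTaskFactory(",", false, true);
    ProducerTask task =
        factory.createProducerTask(FileFormat.CSV, projectedColumns, mockMetadata, columnData);
    String chunk = task.process(Collections.singletonList(result)); // one CSV row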