Add export tasks #2450 (Merged)
Commits (74):
753618b  Util classes for data loader (inv-jishnu)
8d39d02  Fix spotbug issue (inv-jishnu)
bf94c49  Removed error message and added core error (inv-jishnu)
47be388  Applied spotless (inv-jishnu)
913eb1c  Fixed unit test failures (inv-jishnu)
1f204b8  Merge branch 'master' into feat/data-loader/utils (ypeckstadt)
6cfa83a  Basic data import enum and exception (inv-jishnu)
d381b2b  Removed exception class for now (inv-jishnu)
67f2474  Added DECIMAL_FORMAT (inv-jishnu)
14e3593  Path util class updated (inv-jishnu)
a096d51  Feedback changes (inv-jishnu)
dbf1940  Merge branch 'master' into feat/data-loader/utils (ypeckstadt)
cd8add9  Merge branch 'master' into feat/data-loader/utils (ypeckstadt)
52890c8  Changes (inv-jishnu)
5114639  Merge branch 'master' into feat/data-loader/import-data-1 (inv-jishnu)
4f9cd75  Merge branch 'feat/data-loader/utils' into feat/data-loader/scaladb-dao (inv-jishnu)
1997eb8  Added ScalarDB Dao (inv-jishnu)
91e6310  Merge branch 'master' into feat/data-loader/scaladb-dao (inv-jishnu)
8a7338b  Remove unnecessary files (inv-jishnu)
2b52eeb  Initial commit [skip ci] (inv-jishnu)
e206073  Changes (inv-jishnu)
26d3144  Changes (inv-jishnu)
b86487d  spotbugs exclude (inv-jishnu)
818a2b4  spotbugs exclude -2 (inv-jishnu)
90c4105  Added a file [skip ci] (inv-jishnu)
3d5d3e0  Added unit test files [skip ci] (inv-jishnu)
6495202  Spotbug fixes (inv-jishnu)
90abd9e  Removed use of List.of to fix CI error (inv-jishnu)
ba2b3dd  Merged changes from master after resolving conflict (inv-jishnu)
b1b811b  Merge branch 'master' into feat/data-loader/metadata-service (inv-jishnu)
30db988  Applied spotless (inv-jishnu)
e9bb004  Added export options validator (inv-jishnu)
03324e1  Minor change in test (inv-jishnu)
d6aaf85  Applied spotless on CoreError (inv-jishnu)
4439dea  Make constructor private and improve javadocs (ypeckstadt)
ccb1ace  Improve javadocs (ypeckstadt)
a374f1a  Add private constructor to TableMetadataUtil (ypeckstadt)
a65c9b5  Apply spotless fix (ypeckstadt)
b3279ba  Fix the validation for partition and clustering keys (ypeckstadt)
78a8170  Fix spotless format (ypeckstadt)
acedabe  Partial feedback changes (inv-jishnu)
a95a858  Resolved conflicts and merged latest changes from main (inv-jishnu)
c05286d  Merge branch 'feat/data-loader/scaladb-dao' into feat/data-loader/exp… (inv-jishnu)
0d3f79e  Export tasks added (inv-jishnu)
2365460  Merge branch 'feat/data-loader/metadata-service' into feat/data-loade… (inv-jishnu)
95022a9  Initial commit [skip ci] (inv-jishnu)
89fea78  Added changes (inv-jishnu)
29a8c25  Fix spot less issue (inv-jishnu)
45adc95  Merge branch 'master' into feat/data-loader/export-tasks (inv-jishnu)
f6c54ec  Updated test code to remove warning (inv-jishnu)
b92758c  Merged latest changes from main after resolving conflicts (inv-jishnu)
9c4ae23  Resolved conflicts and merge latest changes from master (inv-jishnu)
c9d01cb  Added default case in switch to resolve sportbugs warning (inv-jishnu)
0984d51  Resolved conflicts and merged latest changes from main (inv-jishnu)
3ff03d9  Merge branch 'master' into feat/data-loader/export-tasks (inv-jishnu)
ba9fa8f  Removed scalardb manager class as was removed earlier [skip ci] (inv-jishnu)
1bcd3a7  Changes (inv-jishnu)
6f0066d  Export count related changes (inv-jishnu)
c5d3fcb  Revert "Export count related changes" (inv-jishnu)
e0a9e27  Export row count addition updated (inv-jishnu)
f93a9ae  Newline added again [skip ci] (inv-jishnu)
9c7a94b  Newline added again [skip ci] (inv-jishnu)
af51049  Merge branch 'master' into feat/data-loader/export-tasks (inv-jishnu)
4e5e06d  Resolved conflicts and merged lastest changes from master (inv-jishnu)
c2248f3  Removed duplicated build code from core error (inv-jishnu)
636e737  Updated java doc (inv-jishnu)
05eee13  Update ExportManager logic (inv-jishnu)
a3a17f8  changes (inv-jishnu)
de52b56  Resolved conflicts and merged latest changes from main (inv-jishnu)
a4b33d2  Core error formatting (inv-jishnu)
4b550bd  Core error class formatted (inv-jishnu)
15b0df7  Feedback changes (inv-jishnu)
2828244  Removed unused error code (inv-jishnu)
10a546d  Merge branch 'master' into feat/data-loader/export-tasks (inv-jishnu)
data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderObjectMapper.java (new file: 14 additions, 0 deletions)
package com.scalar.db.dataloader.core;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

public class DataLoaderObjectMapper extends ObjectMapper {

  public DataLoaderObjectMapper() {
    super();
    this.setSerializationInclusion(JsonInclude.Include.NON_NULL);
    this.registerModule(new JavaTimeModule());
  }
}
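The mapper configures two behaviors: null-valued fields are omitted from serialized output (`JsonInclude.Include.NON_NULL`), and `java.time` types are handled via `JavaTimeModule`. As a stdlib-only illustration of what the NON_NULL setting means, here is a hand-rolled sketch of null-skipping serialization (not Jackson's implementation; the `toJson` helper and its flat-map input are hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class NonNullSketch {

  // Emits a flat JSON object, skipping null values -- the same effect
  // JsonInclude.Include.NON_NULL has on the DataLoaderObjectMapper.
  static String toJson(Map<String, Object> fields) {
    StringJoiner joiner = new StringJoiner(",", "{", "}");
    for (Map.Entry<String, Object> e : fields.entrySet()) {
      Object v = e.getValue();
      if (v == null) {
        continue; // NON_NULL: drop null-valued fields entirely
      }
      String value = v instanceof String ? "\"" + v + "\"" : String.valueOf(v);
      joiner.add("\"" + e.getKey() + "\":" + value);
    }
    return joiner.toString();
  }

  public static void main(String[] args) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("id", 1);
    row.put("name", "alice");
    row.put("deleted_at", null);
    // deleted_at is omitted from the output
    System.out.println(toJson(row));
  }
}
```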
...-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java (new file: 94 additions, 0 deletions)
package com.scalar.db.dataloader.core.dataexport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
import com.scalar.db.dataloader.core.util.CsvUtil;
import com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils;
import java.io.IOException;
import java.io.Writer;
import java.util.Iterator;
import java.util.List;

public class CsvExportManager extends ExportManager {
  public CsvExportManager(
      DistributedStorage storage, ScalarDBDao dao, ProducerTaskFactory producerTaskFactory) {
    super(storage, dao, producerTaskFactory);
  }

  /**
   * Creates and writes the header part of the export file.
   *
   * @param exportOptions export options for the data export
   * @param tableMetadata metadata of the table to export
   * @param writer file writer object
   * @throws IOException if any IO exception occurs
   */
  @Override
  void processHeader(ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer)
      throws IOException {
    String header = createCsvHeaderRow(exportOptions, tableMetadata);
    writer.append(header);
    writer.flush();
  }

  /**
   * Creates and writes the footer part of the export file. A CSV export has no footer, so this is
   * a no-op.
   *
   * @param exportOptions export options for the data export
   * @param tableMetadata metadata of the table to export
   * @param writer file writer object
   * @throws IOException if any IO exception occurs
   */
  @Override
  void processFooter(ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer)
      throws IOException {}

  /**
   * Generates the header row of the CSV export file.
   *
   * @param exportOptions export options
   * @param tableMetadata metadata of the table
   * @return the generated CSV header row
   */
  private String createCsvHeaderRow(ExportOptions exportOptions, TableMetadata tableMetadata) {
    StringBuilder headerRow = new StringBuilder();
    List<String> projections = exportOptions.getProjectionColumns();
    Iterator<String> iterator = tableMetadata.getColumnNames().iterator();
    while (iterator.hasNext()) {
      String columnName = iterator.next();
      if (shouldIgnoreColumn(
          exportOptions.isIncludeTransactionMetadata(), columnName, tableMetadata, projections)) {
        continue;
      }
      headerRow.append(columnName);
      if (iterator.hasNext()) {
        headerRow.append(exportOptions.getDelimiter());
      }
    }
    CsvUtil.removeTrailingDelimiter(headerRow, exportOptions.getDelimiter());
    headerRow.append("\n");
    return headerRow.toString();
  }

  /**
   * Decides whether a column should be ignored, i.e. when it is a transaction-metadata column that
   * should not be included, or when it is not part of the selected projections.
   *
   * @param isIncludeTransactionMetadata whether transaction metadata should be included
   * @param columnName column name
   * @param tableMetadata table metadata
   * @param projections selected columns for projection
   * @return whether to ignore the column
   */
  private boolean shouldIgnoreColumn(
      boolean isIncludeTransactionMetadata,
      String columnName,
      TableMetadata tableMetadata,
      List<String> projections) {
    return (!isIncludeTransactionMetadata
            && ConsensusCommitUtils.isTransactionMetaColumn(columnName, tableMetadata))
        || (!projections.isEmpty() && !projections.contains(columnName));
  }
}
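The header-row logic can be exercised in isolation. Below is a minimal stdlib-only sketch of the same column-filtering rule; `ConsensusCommitUtils.isTransactionMetaColumn` is stubbed with a hypothetical `tx_` prefix check (an assumption for illustration, not the real metadata detection):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;

public class CsvHeaderSketch {

  // Stand-in for ConsensusCommitUtils.isTransactionMetaColumn: here we simply
  // treat columns prefixed with "tx_" as transaction metadata (an assumption).
  static boolean isTransactionMetaColumn(String columnName) {
    return columnName.startsWith("tx_");
  }

  // Mirrors createCsvHeaderRow: join the visible columns with the delimiter,
  // skipping metadata columns and columns outside the projection list.
  static String createCsvHeaderRow(
      List<String> columnNames,
      List<String> projections,
      boolean includeTransactionMetadata,
      char delimiter) {
    StringJoiner joiner = new StringJoiner(String.valueOf(delimiter));
    for (String columnName : columnNames) {
      boolean ignore =
          (!includeTransactionMetadata && isTransactionMetaColumn(columnName))
              || (!projections.isEmpty() && !projections.contains(columnName));
      if (!ignore) {
        joiner.add(columnName);
      }
    }
    return joiner.toString() + "\n";
  }

  public static void main(String[] args) {
    List<String> columns = Arrays.asList("id", "name", "tx_id", "tx_state");
    // tx_* columns are dropped when transaction metadata is excluded
    System.out.print(createCsvHeaderRow(columns, Collections.emptyList(), false, ','));
  }
}
```

Note that `StringJoiner` never emits a dangling separator, which sidesteps the trailing-delimiter cleanup the original handles with `CsvUtil.removeTrailingDelimiter`.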
data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java (new file: 243 additions, 0 deletions)
package com.scalar.db.dataloader.core.dataexport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scanner;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTask;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataexport.validation.ExportOptionsValidationException;
import com.scalar.db.dataloader.core.dataexport.validation.ExportOptionsValidator;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
import com.scalar.db.io.DataType;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RequiredArgsConstructor
public abstract class ExportManager {
  private static final Logger logger = LoggerFactory.getLogger(ExportManager.class);

  private final DistributedStorage storage;
  private final ScalarDBDao dao;
  private final ProducerTaskFactory producerTaskFactory;
  private final Object lock = new Object();

  /**
   * Creates and writes the header part of the export file.
   *
   * @param exportOptions export options for the data export
   * @param tableMetadata metadata of the table to export
   * @param writer file writer object
   * @throws IOException if any IO exception occurs
   */
  abstract void processHeader(
      ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer) throws IOException;

  /**
   * Creates and writes the footer part of the export file.
   *
   * @param exportOptions export options for the data export
   * @param tableMetadata metadata of the table to export
   * @param writer file writer object
   * @throws IOException if any IO exception occurs
   */
  abstract void processFooter(
      ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer) throws IOException;

  /**
   * Starts the export process.
   *
   * @param exportOptions export options
   * @param tableMetadata metadata for a single ScalarDB table
   * @param writer writer to write the exported data
   * @return the export report, including the number of exported rows
   */
  public ExportReport startExport(
      ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer) {
    ExportReport exportReport = new ExportReport();
    try {
      validateExportOptions(exportOptions, tableMetadata);
      Map<String, DataType> dataTypeByColumnName = tableMetadata.getColumnDataTypes();
      handleTransactionMetadata(exportOptions, tableMetadata);
      processHeader(exportOptions, tableMetadata, writer);

      int maxThreadCount =
          exportOptions.getMaxThreadCount() == 0
              ? Runtime.getRuntime().availableProcessors()
              : exportOptions.getMaxThreadCount();
      ExecutorService executorService = Executors.newFixedThreadPool(maxThreadCount);

      BufferedWriter bufferedWriter = new BufferedWriter(writer);
      boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON;

      try (Scanner scanner = createScanner(exportOptions, dao, storage)) {

        Iterator<Result> iterator = scanner.iterator();
        AtomicBoolean isFirstBatch = new AtomicBoolean(true);

        while (iterator.hasNext()) {
          List<Result> dataChunk = fetchDataChunk(iterator, exportOptions.getDataChunkSize());
          executorService.submit(
              () ->
                  processDataChunk(
                      exportOptions,
                      tableMetadata,
                      dataTypeByColumnName,
                      dataChunk,
                      bufferedWriter,
                      isJson,
                      isFirstBatch,
                      exportReport));
        }
        executorService.shutdown();
        if (executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
          logger.info("All tasks completed");
        } else {
          logger.error("Timeout occurred while waiting for tasks to complete");
          // TODO: handle this
        }
        processFooter(exportOptions, tableMetadata, bufferedWriter);
      } catch (InterruptedException | IOException e) {
        logger.error("Error during export: {}", e.getMessage());
      } finally {
        bufferedWriter.flush();
      }
    } catch (ExportOptionsValidationException | IOException | ScalarDBDaoException e) {
      logger.error("Error during export: {}", e.getMessage());
    }
    return exportReport;
  }

  /**
   * Processes a chunk of result data.
   *
   * @param exportOptions export options
   * @param tableMetadata metadata of the table
   * @param dataTypeByColumnName map of columns and their data types
   * @param dataChunk a list with result data
   * @param bufferedWriter writer object
   * @param isJson whether the output format is JSON
   * @param isFirstBatch whether the data chunk being processed is the first batch
   * @param exportReport export report, updated once the data chunk is processed
   */
  private void processDataChunk(
      ExportOptions exportOptions,
      TableMetadata tableMetadata,
      Map<String, DataType> dataTypeByColumnName,
      List<Result> dataChunk,
      BufferedWriter bufferedWriter,
      boolean isJson,
      AtomicBoolean isFirstBatch,
      ExportReport exportReport) {
    ProducerTask producerTask =
        producerTaskFactory.createProducerTask(
            exportOptions.getOutputFileFormat(),
            exportOptions.getProjectionColumns(),
            tableMetadata,
            dataTypeByColumnName);
    String dataChunkContent = producerTask.process(dataChunk);

    try {
      synchronized (lock) {
        if (isJson && !isFirstBatch.getAndSet(false)) {
          bufferedWriter.write(",");
        }
        bufferedWriter.write(dataChunkContent);
        exportReport.updateExportedRowCount(dataChunk.size());
      }
    } catch (IOException e) {
      logger.error("Error while writing data chunk: {}", e.getMessage());
    }
  }

  /**
   * Splits the scan results into batches.
   *
   * @param iterator iterator over the scan results
   * @param batchSize size of a batch
   * @return a list containing the next batch of results
   */
  private List<Result> fetchDataChunk(Iterator<Result> iterator, int batchSize) {
    List<Result> batch = new ArrayList<>();
    int count = 0;
    while (iterator.hasNext() && count < batchSize) {
      batch.add(iterator.next());
      count++;
    }
    return batch;
  }

  /**
   * Validates the export options.
   *
   * @param exportOptions export options
   * @param tableMetadata metadata of the table
   * @throws ExportOptionsValidationException if export options validation fails
   */
  private void validateExportOptions(ExportOptions exportOptions, TableMetadata tableMetadata)
      throws ExportOptionsValidationException {
    ExportOptionsValidator.validate(exportOptions, tableMetadata);
  }

  /**
   * Updates the projection columns of the export options when the include-transaction-metadata
   * option is enabled.
   *
   * @param exportOptions export options
   * @param tableMetadata metadata of the table
   */
  private void handleTransactionMetadata(ExportOptions exportOptions, TableMetadata tableMetadata) {
    if (exportOptions.isIncludeTransactionMetadata()
        && !exportOptions.getProjectionColumns().isEmpty()) {
      List<String> projectionMetadata =
          TableMetadataUtil.populateProjectionsWithMetadata(
              tableMetadata, exportOptions.getProjectionColumns());
      exportOptions.setProjectionColumns(projectionMetadata);
    }
  }

  /**
   * Creates a scanner object, either a scan-all scanner or a partition-scoped scanner depending on
   * whether a scan partition key is set.
   *
   * @param exportOptions export options
   * @param dao ScalarDB dao object
   * @param storage distributed storage object
   * @return the created scanner
   * @throws ScalarDBDaoException if an issue occurs while creating the scanner object
   */
  private Scanner createScanner(
      ExportOptions exportOptions, ScalarDBDao dao, DistributedStorage storage)
      throws ScalarDBDaoException {
    boolean isScanAll = exportOptions.getScanPartitionKey() == null;
    if (isScanAll) {
      return dao.createScanner(
          exportOptions.getNamespace(),
          exportOptions.getTableName(),
          exportOptions.getProjectionColumns(),
          exportOptions.getLimit(),
          storage);
    } else {
      return dao.createScanner(
          exportOptions.getNamespace(),
          exportOptions.getTableName(),
          exportOptions.getScanPartitionKey(),
          exportOptions.getScanRange(),
          exportOptions.getSortOrders(),
          exportOptions.getProjectionColumns(),
          exportOptions.getLimit(),
          storage);
    }
  }
}
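The chunking loop in `fetchDataChunk` is the piece that bounds memory use: each submitted task receives at most `batchSize` results drained from the scanner's iterator. A stdlib-only sketch of the same pattern, with `Integer` standing in for `Result`:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class DataChunkSketch {

  // Same shape as ExportManager.fetchDataChunk: pull up to batchSize items
  // from a shared iterator into a fresh list.
  static <T> List<T> fetchDataChunk(Iterator<T> iterator, int batchSize) {
    List<T> batch = new ArrayList<>();
    while (iterator.hasNext() && batch.size() < batchSize) {
      batch.add(iterator.next());
    }
    return batch;
  }

  public static void main(String[] args) {
    List<Integer> source = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      source.add(i);
    }
    Iterator<Integer> it = source.iterator();
    List<List<Integer>> chunks = new ArrayList<>();
    while (it.hasNext()) {
      chunks.add(fetchDataChunk(it, 4)); // 10 items in chunks of 4 -> 4, 4, 2
    }
    System.out.println(chunks.size());
  }
}
```

In the real `ExportManager` the iterator is drained on the calling thread while only the per-chunk formatting runs on the executor, which is why the shared `lock` plus the `isFirstBatch` flag are needed when the chunks are written back out.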
Review comment: "It seems that this message is no longer needed."
Reply: "I have removed it in 2828244. Thank you."