Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DuckDb implementation #125

Merged
merged 26 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8c6c930
POC duckdb
alexisszmundy Jun 4, 2024
0454229
csv export like before (Double quotes header + correct column order)
alexisszmundy Jun 5, 2024
dc0cc7a
Add SQL tests + fixes
alexisszmundy Jun 6, 2024
5ed6121
File by file support + Kraftwerktests fixes + add file by file tests
alexisszmundy Jun 6, 2024
a399c6d
add paper data tests
alexisszmundy Jun 7, 2024
233a754
Add paper data parser unit tests
alexisszmundy Jun 7, 2024
0f948d8
Fix paper data not parsed
alexisszmundy Jun 7, 2024
fe929b2
fix CSV output + add tests to check format + update changelog
alexisszmundy Jun 7, 2024
d59fdd3
remade csv transformations functions + add & fix problematic \n in data
alexisszmundy Jun 7, 2024
b26648d
fix file by file data not appended when first vtl binding is empty + …
alexisszmundy Jun 11, 2024
b0404b9
fix parquet reading in testing
alexisszmundy Jun 11, 2024
4705993
Merge branch 'main' into devParquetDuckdb
alexisszmundy Jun 11, 2024
74ff03c
Update MainDefinitions.java
alexisszmundy Jun 11, 2024
c99d0d3
Reading and writing into 2 separate databases + appender to insert
alexisszmundy Jun 13, 2024
92d6e21
Merge branch 'main' into devParquetDuckdb
alexisszmundy Jun 13, 2024
51115eb
Put opencsv back, remove reading database
alexisszmundy Jun 13, 2024
601b714
Merge branch 'main' into devParquetDuckdb
alexisszmundy Jun 21, 2024
115f628
Update pom.xml
alexisszmundy Jun 21, 2024
d83edab
Merge branch 'main' into devParquetDuckdb
alexisszmundy Jul 5, 2024
7b1a304
Add bug resolution test
alexisszmundy Jul 5, 2024
9e420bd
Merge branch 'devNumericFormatBug' into devParquetDuckdb
alexisszmundy Jul 5, 2024
bec49dd
added .0 to double format test
alexisszmundy Jul 5, 2024
d22fddb
SQL insert sequence
alexisszmundy Jul 5, 2024
527bdb2
refactors
alexisszmundy Jul 8, 2024
b060c34
Extract the query builder
alicela Jul 8, 2024
e7b3d7d
Changed way to ignore column case in sql schema
alexisszmundy Jul 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
# Changelog
## 2.2.0 [TODO] - DuckDB implementation for output
### Added
- Transfer Vtl datasets into DuckDB before output step
- SQL util class for SQL operations

### Changed
- (File-by-file) Kraftwerk now exports only one .parquet file
- Output is now made from DuckDB instead of VTL dataset

### Removed
- Avro
## 2.1.0 [2024-06-11] - Change Lunatic reader
### Changed
- Refactor of Lunatic reader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import fr.insee.kraftwerk.core.metadata.MetadataModel;
import fr.insee.kraftwerk.core.metadata.MetadataUtils;
import fr.insee.kraftwerk.core.sequence.*;
import fr.insee.kraftwerk.core.utils.SqlUtils;
import fr.insee.kraftwerk.core.utils.TextFileWriter;
import fr.insee.kraftwerk.core.utils.log.KraftwerkExecutionLog;
import fr.insee.kraftwerk.core.vtl.VtlBindings;
Expand All @@ -17,6 +18,9 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -26,14 +30,14 @@
@Log4j2
public class MainProcessing {

private ControlInputSequence controlInputSequence;
private boolean fileByFile;
private boolean withAllReportingData;
private boolean withDDI;
private final ControlInputSequence controlInputSequence;
private final boolean fileByFile;
private final boolean withAllReportingData;
private final boolean withDDI;


/* SPECIFIC VARIABLES */
private String inDirectoryParam;
private final String inDirectoryParam;
@Getter
private Path inDirectory;

Expand All @@ -43,8 +47,9 @@ public class MainProcessing {
@Getter
private VtlBindings vtlBindings = new VtlBindings();
private KraftwerkExecutionLog kraftwerkExecutionLog;
private List<KraftwerkError> errors = new ArrayList<>();
private final List<KraftwerkError> errors = new ArrayList<>();
private LocalDateTime executionDateTime;


/**
* Map by mode
Expand Down Expand Up @@ -77,51 +82,59 @@ public MainProcessing(String inDirectoryParam, boolean fileByFile, String defaul

public void runMain() throws KraftwerkException {
init();
if (Boolean.TRUE.equals(fileByFile)) { //iterate on files
//iterate on file(s)
try (Connection writeDatabaseConnection = SqlUtils.openConnection()) {
for (UserInputsFile userFile : userInputsFileList) {
this.userInputsFile = userFile;
vtlBindings = new VtlBindings();
unimodalProcess();
multimodalProcess();
outputFileWriter();
try(Statement writeDatabase = writeDatabaseConnection.createStatement()){
insertDatabase(writeDatabase);
}
}
} else {
unimodalProcess();
multimodalProcess();
outputFileWriter();
//Export from database
try(Statement writeDatabase = writeDatabaseConnection.createStatement()){
outputFileWriter(writeDatabase);
}
writeErrors();
kraftwerkExecutionLog.setEndTimeStamp(System.currentTimeMillis());
writeLog();
} catch (SQLException e) {
log.error(e.toString());
throw new KraftwerkException(500, "SQL Error");
}
writeErrors();
kraftwerkExecutionLog.setEndTimeStamp(System.currentTimeMillis());
writeLog();
}

/* Step 1 : Init */
public void init() throws KraftwerkException {
kraftwerkExecutionLog = new KraftwerkExecutionLog(); //Init logger
this.executionDateTime = LocalDateTime.now();

inDirectory = controlInputSequence.getInDirectory(inDirectoryParam);

String campaignName = inDirectory.getFileName().toString();
log.info("Kraftwerk main service started for campaign: " + campaignName);

userInputsFile = controlInputSequence.getUserInputs(inDirectory);
if (withDDI) metadataModels = MetadataUtils.getMetadata(userInputsFile.getModeInputsMap());
if (!withDDI) metadataModels = MetadataUtils.getMetadataFromLunatic(userInputsFile.getModeInputsMap());

if (fileByFile) userInputsFileList = getUserInputsFile(userInputsFile);
metadataModels = withDDI ? MetadataUtils.getMetadata(userInputsFile.getModeInputsMap()) : MetadataUtils.getMetadataFromLunatic(userInputsFile.getModeInputsMap());

userInputsFileList = getUserInputsFile(userInputsFile, fileByFile);

// Check size of data files and throw an exception if it is too big. Limit is 400 Mo for one processing (one file, or the data folder if not file-by-file).
//In case of file-by-file processing we check the size of each file.
if (fileByFile) {
if (Boolean.TRUE.equals(fileByFile)) {
for (UserInputsFile userInputs : userInputsFileList) {
isDataTooBig(userInputs,"At least one file size is greater than 400Mo. Split data files greater than 400Mo.", limitSize);
}
}
//In case of main processing we check the folder
if (!fileByFile) {
}else{
//In case of main processing we check the folder
isDataTooBig(userInputsFile,"Data folder size is greater than 400Mo. Use file-by-file processing.", limitSize);
}



}

/* Step 2 : unimodal data */
Expand All @@ -136,15 +149,21 @@ private void unimodalProcess() throws KraftwerkException {
}

/* Step 3 : multimodal VTL data processing */
private void multimodalProcess() {
private void multimodalProcess() {
    // Step 3: apply the multimodal VTL processing to the combined bindings,
    // accumulating any VTL errors into the shared error list.
    new MultimodalSequence().multimodalProcessing(userInputsFile, vtlBindings, errors, metadataModels);
}

/* Step 4 : Write output files */
private void outputFileWriter() throws KraftwerkException {
/* Step 4 : Insert into SQL database */
private void insertDatabase(Statement database) throws SQLException {
    // Load the current VTL bindings into the write database via the dedicated sequence.
    new InsertDatabaseSequence().insertDatabaseProcessing(vtlBindings, database);
}

/* Step 5 : Write output files */
private void outputFileWriter(Statement database) throws KraftwerkException {
WriterSequence writerSequence = new WriterSequence();
writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputsFile.getModeInputsMap(), metadataModels, errors, kraftwerkExecutionLog);
writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputsFile.getModeInputsMap(), metadataModels, errors, kraftwerkExecutionLog,database);
}

/* Step 5 : Write errors */
Expand All @@ -156,29 +175,33 @@ private void writeErrors() {
/* Step 6 : Write log */
// Delegates to TextFileWriter to persist the KraftwerkExecutionLog for this run,
// using the input directory and the execution timestamp captured at init().
private void writeLog() {TextFileWriter.writeLogFile(inDirectory, executionDateTime, kraftwerkExecutionLog);}

private static List<UserInputsFile> getUserInputsFile(UserInputsFile source) throws KraftwerkException {
private static List<UserInputsFile> getUserInputsFile(UserInputsFile source, boolean fileByFile) throws KraftwerkException {
List<UserInputsFile> userInputsFileList = new ArrayList<>();
for (String dataMode : source.getModeInputsMap().keySet()) {
List<Path> dataFiles = getFilesToProcess(source, dataMode);
for (Path dataFile : dataFiles) {
UserInputsFile currentFileInputs = new UserInputsFile(source.getUserInputFile(),source.getUserInputFile().getParent());
currentFileInputs.setVtlReconciliationFile(source.getVtlReconciliationFile());
currentFileInputs.setVtlInformationLevelsFile(source.getVtlInformationLevelsFile());
currentFileInputs.setVtlTransformationsFile(source.getVtlTransformationsFile());
currentFileInputs.setMultimodeDatasetName(source.getMultimodeDatasetName());
ModeInputs sourceModeInputs = source.getModeInputs(dataMode);
ModeInputs currentFileModeInputs = new ModeInputs();
currentFileModeInputs.setDataFile(dataFile);
currentFileModeInputs.setDdiUrl(sourceModeInputs.getDdiUrl());
currentFileModeInputs.setLunaticFile(sourceModeInputs.getLunaticFile());
currentFileModeInputs.setDataFormat(sourceModeInputs.getDataFormat().toString());
currentFileModeInputs.setDataMode(sourceModeInputs.getDataMode());
currentFileModeInputs.setModeVtlFile(sourceModeInputs.getModeVtlFile());
currentFileModeInputs.setParadataFolder(sourceModeInputs.getParadataFolder());
currentFileModeInputs.setReportingDataFile(sourceModeInputs.getReportingDataFile());
currentFileInputs.getModeInputsMap().put(dataMode, currentFileModeInputs);
userInputsFileList.add(currentFileInputs);
if(Boolean.TRUE.equals(fileByFile)){
for (String dataMode : source.getModeInputsMap().keySet()) {
List<Path> dataFiles = getFilesToProcess(source, dataMode);
for (Path dataFile : dataFiles) {
UserInputsFile currentFileInputs = new UserInputsFile(source.getUserInputFile(), source.getUserInputFile().getParent());
currentFileInputs.setVtlReconciliationFile(source.getVtlReconciliationFile());
currentFileInputs.setVtlInformationLevelsFile(source.getVtlInformationLevelsFile());
currentFileInputs.setVtlTransformationsFile(source.getVtlTransformationsFile());
currentFileInputs.setMultimodeDatasetName(source.getMultimodeDatasetName());
ModeInputs sourceModeInputs = source.getModeInputs(dataMode);
ModeInputs currentFileModeInputs = new ModeInputs();
currentFileModeInputs.setDataFile(dataFile);
currentFileModeInputs.setDdiUrl(sourceModeInputs.getDdiUrl());
currentFileModeInputs.setLunaticFile(sourceModeInputs.getLunaticFile());
currentFileModeInputs.setDataFormat(sourceModeInputs.getDataFormat().toString());
currentFileModeInputs.setDataMode(sourceModeInputs.getDataMode());
currentFileModeInputs.setModeVtlFile(sourceModeInputs.getModeVtlFile());
currentFileModeInputs.setParadataFolder(sourceModeInputs.getParadataFolder());
currentFileModeInputs.setReportingDataFile(sourceModeInputs.getReportingDataFile());
currentFileInputs.getModeInputsMap().put(dataMode, currentFileModeInputs);
userInputsFileList.add(currentFileInputs);
}
}
}else{
userInputsFileList.add(source);
}
return userInputsFileList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@
import fr.insee.kraftwerk.core.data.model.SurveyUnitId;
import fr.insee.kraftwerk.core.data.model.SurveyUnitUpdateLatest;
import fr.insee.kraftwerk.core.exceptions.KraftwerkException;
import fr.insee.kraftwerk.core.exceptions.NullException;
import fr.insee.kraftwerk.core.inputs.UserInputsGenesis;
import fr.insee.kraftwerk.core.metadata.MetadataModel;
import fr.insee.kraftwerk.core.metadata.MetadataUtilsGenesis;
import fr.insee.kraftwerk.core.sequence.BuildBindingsSequenceGenesis;
import fr.insee.kraftwerk.core.sequence.ControlInputSequenceGenesis;
import fr.insee.kraftwerk.core.sequence.InsertDatabaseSequence;
import fr.insee.kraftwerk.core.sequence.MultimodalSequence;
import fr.insee.kraftwerk.core.sequence.UnimodalSequence;
import fr.insee.kraftwerk.core.sequence.WriterSequence;
import fr.insee.kraftwerk.core.utils.SqlUtils;
import fr.insee.kraftwerk.core.utils.TextFileWriter;
import fr.insee.kraftwerk.core.vtl.VtlBindings;
import lombok.Getter;
Expand All @@ -26,6 +27,9 @@

import java.io.IOException;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -39,10 +43,11 @@ public class MainProcessingGenesis {
private ControlInputSequenceGenesis controlInputSequenceGenesis;
@Getter
private VtlBindings vtlBindings = new VtlBindings();
private List<KraftwerkError> errors = new ArrayList<>();
private final List<KraftwerkError> errors = new ArrayList<>();
@Getter
private UserInputsGenesis userInputs;
private LocalDateTime executionDateTime;
private Statement database;

/* SPECIFIC VARIABLES */
@Getter
Expand All @@ -53,7 +58,7 @@ public class MainProcessingGenesis {
@Getter
private Map<String, MetadataModel> metadataModels;

private GenesisClient client;
private final GenesisClient client;

public MainProcessingGenesis(ConfigProperties config) {
this.client = new GenesisClient(new RestTemplateBuilder(), config);
Expand All @@ -77,26 +82,34 @@ public void runMain(String idCampaign) throws KraftwerkException, IOException {
// We limit the size of the batch to 1000 survey units at a time
int batchSize = 1000;
init(idCampaign);
List<String> questionnaireModelIds = client.getQuestionnaireModelIds(idCampaign);
if(questionnaireModelIds.isEmpty()){
throw new KraftwerkException(204, null);
//Try with resources to close database when done
try (Connection tryDatabase = SqlUtils.openConnection()) {
this.database = tryDatabase.createStatement();
List<String> questionnaireModelIds = client.getQuestionnaireModelIds(idCampaign);
if (questionnaireModelIds.isEmpty()) {
throw new KraftwerkException(204, null);
}
for (String questionnaireId : questionnaireModelIds) {
List<SurveyUnitId> ids = client.getSurveyUnitIds(questionnaireId);
List<List<SurveyUnitId>> listIds = ListUtils.partition(ids, batchSize);
for (List<SurveyUnitId> listId : listIds) {
List<SurveyUnitUpdateLatest> suLatest = client.getUEsLatestState(questionnaireId, listId);
log.info("Number of documents retrieved from database : {}", suLatest.size());
vtlBindings = new VtlBindings();
unimodalProcess(suLatest);
multimodalProcess();
insertDatabase();
outputFileWriter();
writeErrors();
}
}
}catch (SQLException e){
log.error(e.toString());
throw new KraftwerkException(500,"SQL error");
}
for (String questionnaireId : questionnaireModelIds) {
List<SurveyUnitId> ids = client.getSurveyUnitIds(questionnaireId);
List<List<SurveyUnitId>> listIds = ListUtils.partition(ids, batchSize);
for (List<SurveyUnitId> listId : listIds) {
List<SurveyUnitUpdateLatest> suLatest = client.getUEsLatestState(questionnaireId, listId);
log.info("Number of documents retrieved from database : {}", suLatest.size());
vtlBindings = new VtlBindings();
unimodalProcess(suLatest);
multimodalProcess();
outputFileWriter();
writeErrors();
}
}
}
}

private void unimodalProcess(List<SurveyUnitUpdateLatest> suLatest) throws NullException {
private void unimodalProcess(List<SurveyUnitUpdateLatest> suLatest) throws KraftwerkException {
BuildBindingsSequenceGenesis buildBindingsSequenceGenesis = new BuildBindingsSequenceGenesis();
for (String dataMode : userInputs.getModeInputsMap().keySet()) {
buildBindingsSequenceGenesis.buildVtlBindings(dataMode, vtlBindings, metadataModels, suLatest, inDirectory);
Expand All @@ -111,13 +124,19 @@ private void multimodalProcess() {
multimodalSequence.multimodalProcessing(userInputs, vtlBindings, errors, metadataModels);
}

/* Step 4 : Write output files */
/* Step 4 : Insert into SQL database */
private void insertDatabase() {
    // Push the current VTL bindings into the SQL database through the dedicated sequence,
    // using the Statement opened in runMain().
    new InsertDatabaseSequence().insertDatabaseProcessing(vtlBindings, database);
}

/* Step 5 : Write output files */
private void outputFileWriter() throws KraftwerkException {
WriterSequence writerSequence = new WriterSequence();
writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputs.getModeInputsMap(), metadataModels, errors);
writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputs.getModeInputsMap(), metadataModels, errors, null, database);
}

/* Step 5 : Write errors */
/* Step 6 : Write errors */
// Persists the KraftwerkError list accumulated during processing by passing
// the input directory and execution timestamp to TextFileWriter.writeErrorsFile.
private void writeErrors() {
TextFileWriter.writeErrorsFile(inDirectory, executionDateTime, errors);
}
Expand Down
Loading
Loading