Skip to content

Commit

Permalink
Merge pull request #125 from InseeFr/devParquetDuckdb
Browse files Browse the repository at this point in the history
DuckDb implementation
  • Loading branch information
alexisszmundy authored Jul 8, 2024
2 parents e4c2d82 + e7b3d7d commit a77a40f
Show file tree
Hide file tree
Showing 62 changed files with 201,502 additions and 1,329 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
# Changelog
## 2.2.0 [TODO] - DuckDB implementation for output
### Added
- Transfer VTL datasets into DuckDB before the output step
- SQL util class for SQL operations

### Changed
- (File-by-file) Kraftwerk now exports only one .parquet file
- Output is now generated from DuckDB instead of the VTL datasets

### Removed
- Avro
## 2.1.0 [2024-06-11] - Change Lunatic reader
### Changed
- Refactor of Lunatic reader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import fr.insee.kraftwerk.core.metadata.MetadataModel;
import fr.insee.kraftwerk.core.metadata.MetadataUtils;
import fr.insee.kraftwerk.core.sequence.*;
import fr.insee.kraftwerk.core.utils.SqlUtils;
import fr.insee.kraftwerk.core.utils.TextFileWriter;
import fr.insee.kraftwerk.core.utils.log.KraftwerkExecutionLog;
import fr.insee.kraftwerk.core.vtl.VtlBindings;
Expand All @@ -17,6 +18,9 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -26,14 +30,14 @@
@Log4j2
public class MainProcessing {

private ControlInputSequence controlInputSequence;
private boolean fileByFile;
private boolean withAllReportingData;
private boolean withDDI;
private final ControlInputSequence controlInputSequence;
private final boolean fileByFile;
private final boolean withAllReportingData;
private final boolean withDDI;


/* SPECIFIC VARIABLES */
private String inDirectoryParam;
private final String inDirectoryParam;
@Getter
private Path inDirectory;

Expand All @@ -43,8 +47,9 @@ public class MainProcessing {
@Getter
private VtlBindings vtlBindings = new VtlBindings();
private KraftwerkExecutionLog kraftwerkExecutionLog;
private List<KraftwerkError> errors = new ArrayList<>();
private final List<KraftwerkError> errors = new ArrayList<>();
private LocalDateTime executionDateTime;


/**
* Map by mode
Expand Down Expand Up @@ -77,51 +82,59 @@ public MainProcessing(String inDirectoryParam, boolean fileByFile, String defaul

public void runMain() throws KraftwerkException {
init();
if (Boolean.TRUE.equals(fileByFile)) { //iterate on files
//iterate on file(s)
try (Connection writeDatabaseConnection = SqlUtils.openConnection()) {
for (UserInputsFile userFile : userInputsFileList) {
this.userInputsFile = userFile;
vtlBindings = new VtlBindings();
unimodalProcess();
multimodalProcess();
outputFileWriter();
try(Statement writeDatabase = writeDatabaseConnection.createStatement()){
insertDatabase(writeDatabase);
}
}
} else {
unimodalProcess();
multimodalProcess();
outputFileWriter();
//Export from database
try(Statement writeDatabase = writeDatabaseConnection.createStatement()){
outputFileWriter(writeDatabase);
}
writeErrors();
kraftwerkExecutionLog.setEndTimeStamp(System.currentTimeMillis());
writeLog();
} catch (SQLException e) {
log.error(e.toString());
throw new KraftwerkException(500, "SQL Error");
}
writeErrors();
kraftwerkExecutionLog.setEndTimeStamp(System.currentTimeMillis());
writeLog();
}

/* Step 1 : Init */
public void init() throws KraftwerkException {
kraftwerkExecutionLog = new KraftwerkExecutionLog(); //Init logger
this.executionDateTime = LocalDateTime.now();

inDirectory = controlInputSequence.getInDirectory(inDirectoryParam);

String campaignName = inDirectory.getFileName().toString();
log.info("Kraftwerk main service started for campaign: " + campaignName);

userInputsFile = controlInputSequence.getUserInputs(inDirectory);
if (withDDI) metadataModels = MetadataUtils.getMetadata(userInputsFile.getModeInputsMap());
if (!withDDI) metadataModels = MetadataUtils.getMetadataFromLunatic(userInputsFile.getModeInputsMap());

if (fileByFile) userInputsFileList = getUserInputsFile(userInputsFile);
metadataModels = withDDI ? MetadataUtils.getMetadata(userInputsFile.getModeInputsMap()) : MetadataUtils.getMetadataFromLunatic(userInputsFile.getModeInputsMap());

userInputsFileList = getUserInputsFile(userInputsFile, fileByFile);

// Check size of data files and throw an exception if it is too big. Limit is 400 Mo for one processing (one file, or the data folder if not file by file).
//In case of file-by-file processing we check the size of each file.
if (fileByFile) {
if (Boolean.TRUE.equals(fileByFile)) {
for (UserInputsFile userInputs : userInputsFileList) {
isDataTooBig(userInputs,"At least one file size is greater than 400Mo. Split data files greater than 400Mo.", limitSize);
}
}
//In case of main processing we check the folder
if (!fileByFile) {
}else{
//In case of main processing we check the folder
isDataTooBig(userInputsFile,"Data folder size is greater than 400Mo. Use file-by-file processing.", limitSize);
}



}

/* Step 2 : unimodal data */
Expand All @@ -136,15 +149,21 @@ private void unimodalProcess() throws KraftwerkException {
}

/* Step 3 : multimodal VTL data processing */
private void multimodalProcess() {
private void multimodalProcess(){
MultimodalSequence multimodalSequence = new MultimodalSequence();
multimodalSequence.multimodalProcessing(userInputsFile, vtlBindings, errors, metadataModels);
}

/* Step 4 : Write output files */
private void outputFileWriter() throws KraftwerkException {
/* Step 4 : Insert into SQL database */
private void insertDatabase(Statement database) throws SQLException {
InsertDatabaseSequence insertDatabaseSequence = new InsertDatabaseSequence();
insertDatabaseSequence.insertDatabaseProcessing(vtlBindings, database);
}

/* Step 5 : Write output files */
private void outputFileWriter(Statement database) throws KraftwerkException {
WriterSequence writerSequence = new WriterSequence();
writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputsFile.getModeInputsMap(), metadataModels, errors, kraftwerkExecutionLog);
writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputsFile.getModeInputsMap(), metadataModels, errors, kraftwerkExecutionLog,database);
}

/* Step 6 : Write errors */
Expand All @@ -156,29 +175,33 @@ private void writeErrors() {
/* Step 7 : Write log */
private void writeLog() {TextFileWriter.writeLogFile(inDirectory, executionDateTime, kraftwerkExecutionLog);}

private static List<UserInputsFile> getUserInputsFile(UserInputsFile source) throws KraftwerkException {
private static List<UserInputsFile> getUserInputsFile(UserInputsFile source, boolean fileByFile) throws KraftwerkException {
List<UserInputsFile> userInputsFileList = new ArrayList<>();
for (String dataMode : source.getModeInputsMap().keySet()) {
List<Path> dataFiles = getFilesToProcess(source, dataMode);
for (Path dataFile : dataFiles) {
UserInputsFile currentFileInputs = new UserInputsFile(source.getUserInputFile(),source.getUserInputFile().getParent());
currentFileInputs.setVtlReconciliationFile(source.getVtlReconciliationFile());
currentFileInputs.setVtlInformationLevelsFile(source.getVtlInformationLevelsFile());
currentFileInputs.setVtlTransformationsFile(source.getVtlTransformationsFile());
currentFileInputs.setMultimodeDatasetName(source.getMultimodeDatasetName());
ModeInputs sourceModeInputs = source.getModeInputs(dataMode);
ModeInputs currentFileModeInputs = new ModeInputs();
currentFileModeInputs.setDataFile(dataFile);
currentFileModeInputs.setDdiUrl(sourceModeInputs.getDdiUrl());
currentFileModeInputs.setLunaticFile(sourceModeInputs.getLunaticFile());
currentFileModeInputs.setDataFormat(sourceModeInputs.getDataFormat().toString());
currentFileModeInputs.setDataMode(sourceModeInputs.getDataMode());
currentFileModeInputs.setModeVtlFile(sourceModeInputs.getModeVtlFile());
currentFileModeInputs.setParadataFolder(sourceModeInputs.getParadataFolder());
currentFileModeInputs.setReportingDataFile(sourceModeInputs.getReportingDataFile());
currentFileInputs.getModeInputsMap().put(dataMode, currentFileModeInputs);
userInputsFileList.add(currentFileInputs);
if(Boolean.TRUE.equals(fileByFile)){
for (String dataMode : source.getModeInputsMap().keySet()) {
List<Path> dataFiles = getFilesToProcess(source, dataMode);
for (Path dataFile : dataFiles) {
UserInputsFile currentFileInputs = new UserInputsFile(source.getUserInputFile(), source.getUserInputFile().getParent());
currentFileInputs.setVtlReconciliationFile(source.getVtlReconciliationFile());
currentFileInputs.setVtlInformationLevelsFile(source.getVtlInformationLevelsFile());
currentFileInputs.setVtlTransformationsFile(source.getVtlTransformationsFile());
currentFileInputs.setMultimodeDatasetName(source.getMultimodeDatasetName());
ModeInputs sourceModeInputs = source.getModeInputs(dataMode);
ModeInputs currentFileModeInputs = new ModeInputs();
currentFileModeInputs.setDataFile(dataFile);
currentFileModeInputs.setDdiUrl(sourceModeInputs.getDdiUrl());
currentFileModeInputs.setLunaticFile(sourceModeInputs.getLunaticFile());
currentFileModeInputs.setDataFormat(sourceModeInputs.getDataFormat().toString());
currentFileModeInputs.setDataMode(sourceModeInputs.getDataMode());
currentFileModeInputs.setModeVtlFile(sourceModeInputs.getModeVtlFile());
currentFileModeInputs.setParadataFolder(sourceModeInputs.getParadataFolder());
currentFileModeInputs.setReportingDataFile(sourceModeInputs.getReportingDataFile());
currentFileInputs.getModeInputsMap().put(dataMode, currentFileModeInputs);
userInputsFileList.add(currentFileInputs);
}
}
}else{
userInputsFileList.add(source);
}
return userInputsFileList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@
import fr.insee.kraftwerk.core.data.model.SurveyUnitId;
import fr.insee.kraftwerk.core.data.model.SurveyUnitUpdateLatest;
import fr.insee.kraftwerk.core.exceptions.KraftwerkException;
import fr.insee.kraftwerk.core.exceptions.NullException;
import fr.insee.kraftwerk.core.inputs.UserInputsGenesis;
import fr.insee.kraftwerk.core.metadata.MetadataModel;
import fr.insee.kraftwerk.core.metadata.MetadataUtilsGenesis;
import fr.insee.kraftwerk.core.sequence.BuildBindingsSequenceGenesis;
import fr.insee.kraftwerk.core.sequence.ControlInputSequenceGenesis;
import fr.insee.kraftwerk.core.sequence.InsertDatabaseSequence;
import fr.insee.kraftwerk.core.sequence.MultimodalSequence;
import fr.insee.kraftwerk.core.sequence.UnimodalSequence;
import fr.insee.kraftwerk.core.sequence.WriterSequence;
import fr.insee.kraftwerk.core.utils.SqlUtils;
import fr.insee.kraftwerk.core.utils.TextFileWriter;
import fr.insee.kraftwerk.core.vtl.VtlBindings;
import lombok.Getter;
Expand All @@ -26,6 +27,9 @@

import java.io.IOException;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -39,10 +43,11 @@ public class MainProcessingGenesis {
private ControlInputSequenceGenesis controlInputSequenceGenesis;
@Getter
private VtlBindings vtlBindings = new VtlBindings();
private List<KraftwerkError> errors = new ArrayList<>();
private final List<KraftwerkError> errors = new ArrayList<>();
@Getter
private UserInputsGenesis userInputs;
private LocalDateTime executionDateTime;
private Statement database;

/* SPECIFIC VARIABLES */
@Getter
Expand All @@ -53,7 +58,7 @@ public class MainProcessingGenesis {
@Getter
private Map<String, MetadataModel> metadataModels;

private GenesisClient client;
private final GenesisClient client;

public MainProcessingGenesis(ConfigProperties config) {
this.client = new GenesisClient(new RestTemplateBuilder(), config);
Expand All @@ -77,26 +82,34 @@ public void runMain(String idCampaign) throws KraftwerkException, IOException {
// We limit the size of the batch to 1000 survey units at a time
int batchSize = 1000;
init(idCampaign);
List<String> questionnaireModelIds = client.getQuestionnaireModelIds(idCampaign);
if(questionnaireModelIds.isEmpty()){
throw new KraftwerkException(204, null);
//Try with resources to close database when done
try (Connection tryDatabase = SqlUtils.openConnection()) {
this.database = tryDatabase.createStatement();
List<String> questionnaireModelIds = client.getQuestionnaireModelIds(idCampaign);
if (questionnaireModelIds.isEmpty()) {
throw new KraftwerkException(204, null);
}
for (String questionnaireId : questionnaireModelIds) {
List<SurveyUnitId> ids = client.getSurveyUnitIds(questionnaireId);
List<List<SurveyUnitId>> listIds = ListUtils.partition(ids, batchSize);
for (List<SurveyUnitId> listId : listIds) {
List<SurveyUnitUpdateLatest> suLatest = client.getUEsLatestState(questionnaireId, listId);
log.info("Number of documents retrieved from database : {}", suLatest.size());
vtlBindings = new VtlBindings();
unimodalProcess(suLatest);
multimodalProcess();
insertDatabase();
outputFileWriter();
writeErrors();
}
}
}catch (SQLException e){
log.error(e.toString());
throw new KraftwerkException(500,"SQL error");
}
for (String questionnaireId : questionnaireModelIds) {
List<SurveyUnitId> ids = client.getSurveyUnitIds(questionnaireId);
List<List<SurveyUnitId>> listIds = ListUtils.partition(ids, batchSize);
for (List<SurveyUnitId> listId : listIds) {
List<SurveyUnitUpdateLatest> suLatest = client.getUEsLatestState(questionnaireId, listId);
log.info("Number of documents retrieved from database : {}", suLatest.size());
vtlBindings = new VtlBindings();
unimodalProcess(suLatest);
multimodalProcess();
outputFileWriter();
writeErrors();
}
}
}
}

private void unimodalProcess(List<SurveyUnitUpdateLatest> suLatest) throws NullException {
private void unimodalProcess(List<SurveyUnitUpdateLatest> suLatest) throws KraftwerkException {
BuildBindingsSequenceGenesis buildBindingsSequenceGenesis = new BuildBindingsSequenceGenesis();
for (String dataMode : userInputs.getModeInputsMap().keySet()) {
buildBindingsSequenceGenesis.buildVtlBindings(dataMode, vtlBindings, metadataModels, suLatest, inDirectory);
Expand All @@ -111,13 +124,19 @@ private void multimodalProcess() {
multimodalSequence.multimodalProcessing(userInputs, vtlBindings, errors, metadataModels);
}

/* Step 4 : Write output files */
/* Step 4 : Insert into SQL database */
private void insertDatabase(){
InsertDatabaseSequence insertDatabaseSequence = new InsertDatabaseSequence();
insertDatabaseSequence.insertDatabaseProcessing(vtlBindings, database);
}

/* Step 5 : Write output files */
private void outputFileWriter() throws KraftwerkException {
WriterSequence writerSequence = new WriterSequence();
writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputs.getModeInputsMap(), metadataModels, errors);
writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputs.getModeInputsMap(), metadataModels, errors, null, database);
}

/* Step 5 : Write errors */
/* Step 6 : Write errors */
private void writeErrors() {
TextFileWriter.writeErrorsFile(inDirectory, executionDateTime, errors);
}
Expand Down
Loading

0 comments on commit a77a40f

Please sign in to comment.