Skip to content

Commit

Permalink
Merge pull request #154 from InseeFr/devExecutionContext
Browse files Browse the repository at this point in the history
Merged errors list and kraftwerk execution log into context
  • Loading branch information
alexisszmundy authored Aug 22, 2024
2 parents 83b6a88 + 7e10aa1 commit c4cd066
Show file tree
Hide file tree
Showing 37 changed files with 286 additions and 251 deletions.
2 changes: 1 addition & 1 deletion kraftwerk-api/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# Config
#src/main/resources/kraftwerk.properties TODO
src/main/resources/kraftwerk.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package fr.insee.kraftwerk.api.process;

import fr.insee.kraftwerk.core.KraftwerkError;
import fr.insee.kraftwerk.core.exceptions.KraftwerkException;
import fr.insee.kraftwerk.core.inputs.ModeInputs;
import fr.insee.kraftwerk.core.inputs.UserInputsFile;
Expand All @@ -14,7 +13,7 @@
import fr.insee.kraftwerk.core.utils.SqlUtils;
import fr.insee.kraftwerk.core.utils.TextFileWriter;
import fr.insee.kraftwerk.core.utils.files.FileUtilsInterface;
import fr.insee.kraftwerk.core.utils.log.KraftwerkExecutionLog;
import fr.insee.kraftwerk.core.utils.log.KraftwerkExecutionContext;
import fr.insee.kraftwerk.core.vtl.VtlBindings;

import fr.insee.bpm.metadata.model.MetadataModel;
Expand Down Expand Up @@ -50,8 +49,7 @@ public class MainProcessing {
List<UserInputsFile> userInputsFileList; // for file by file process
@Getter
private VtlBindings vtlBindings = new VtlBindings();
private KraftwerkExecutionLog kraftwerkExecutionLog;
private final List<KraftwerkError> errors = new ArrayList<>();
private KraftwerkExecutionContext kraftwerkExecutionContext;
private LocalDateTime executionDateTime;
private final FileUtilsInterface fileUtilsInterface;

Expand Down Expand Up @@ -105,7 +103,7 @@ public void runMain() throws KraftwerkException {
outputFileWriter(writeDatabase);
}
writeErrors();
kraftwerkExecutionLog.setEndTimeStamp(System.currentTimeMillis());
kraftwerkExecutionContext.setEndTimeStamp(System.currentTimeMillis());
writeLog();
} catch (SQLException e) {
log.error(e.toString());
Expand All @@ -115,7 +113,7 @@ public void runMain() throws KraftwerkException {

/* Step 1 : Init */
public void init() throws KraftwerkException {
kraftwerkExecutionLog = new KraftwerkExecutionLog(); //Init logger
kraftwerkExecutionContext = new KraftwerkExecutionContext(); //Init logger
this.executionDateTime = LocalDateTime.now();

inDirectory = controlInputSequence.getInDirectory(inDirectoryParam);
Expand Down Expand Up @@ -149,16 +147,18 @@ private void unimodalProcess() throws KraftwerkException {
BuildBindingsSequence buildBindingsSequence = new BuildBindingsSequence(withAllReportingData, fileUtilsInterface);
for (String dataMode : userInputsFile.getModeInputsMap().keySet()) {
MetadataModel metadataForMode = metadataModels.get(dataMode);
buildBindingsSequence.buildVtlBindings(userInputsFile, dataMode, vtlBindings, metadataForMode, withDDI, kraftwerkExecutionLog);
buildBindingsSequence.buildVtlBindings(userInputsFile, dataMode, vtlBindings, metadataForMode, withDDI, kraftwerkExecutionContext);
UnimodalSequence unimodal = new UnimodalSequence();
unimodal.applyUnimodalSequence(userInputsFile, dataMode, vtlBindings, errors, metadataModels, fileUtilsInterface);
unimodal.applyUnimodalSequence(userInputsFile, dataMode, vtlBindings, kraftwerkExecutionContext,
metadataModels,
fileUtilsInterface);
}
}

/* Step 3 : multimodal VTL data processing */
private void multimodalProcess(){
MultimodalSequence multimodalSequence = new MultimodalSequence();
multimodalSequence.multimodalProcessing(userInputsFile, vtlBindings, errors, metadataModels, fileUtilsInterface);
multimodalSequence.multimodalProcessing(userInputsFile, vtlBindings, kraftwerkExecutionContext, metadataModels, fileUtilsInterface);
}

/* Step 4 : Insert into SQL database */
Expand All @@ -170,17 +170,17 @@ private void insertDatabase(Statement database) {
/* Step 5 : Write output files */
private void outputFileWriter(Statement database) throws KraftwerkException {
WriterSequence writerSequence = new WriterSequence();
writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputsFile.getModeInputsMap(), metadataModels, errors, kraftwerkExecutionLog, database, fileUtilsInterface);
writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputsFile.getModeInputsMap(), metadataModels, kraftwerkExecutionContext, database, fileUtilsInterface);
}

/* Step 5 : Write errors */
private void writeErrors() {
TextFileWriter.writeErrorsFile(inDirectory, executionDateTime, errors, fileUtilsInterface);
TextFileWriter.writeErrorsFile(inDirectory, executionDateTime, kraftwerkExecutionContext, fileUtilsInterface);
}


/* Step 6 : Write log */
private void writeLog() {TextFileWriter.writeLogFile(inDirectory, executionDateTime, kraftwerkExecutionLog, fileUtilsInterface);}
private void writeLog() {TextFileWriter.writeLogFile(inDirectory, executionDateTime, kraftwerkExecutionContext, fileUtilsInterface);}

private static List<UserInputsFile> getUserInputsFile(UserInputsFile source, boolean fileByFile) throws KraftwerkException {
List<UserInputsFile> userInputsFileList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import fr.insee.kraftwerk.core.utils.files.FileUtilsInterface;
import fr.insee.kraftwerk.core.utils.SqlUtils;
import fr.insee.kraftwerk.core.utils.TextFileWriter;
import fr.insee.kraftwerk.core.utils.log.KraftwerkExecutionContext;
import fr.insee.kraftwerk.core.vtl.VtlBindings;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -33,7 +34,6 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand All @@ -44,13 +44,14 @@ public class MainProcessingGenesis {
private ControlInputSequenceGenesis controlInputSequenceGenesis;
@Getter
private VtlBindings vtlBindings = new VtlBindings();
private final List<KraftwerkError> errors = new ArrayList<>();
@Getter
private UserInputsGenesis userInputs;
private LocalDateTime executionDateTime;
private final FileUtilsInterface fileUtilsInterface;
private Statement database;

private KraftwerkExecutionContext kraftwerkExecutionContext;

/* SPECIFIC VARIABLES */
@Getter
private Path inDirectory;
Expand Down Expand Up @@ -122,14 +123,15 @@ private void unimodalProcess(List<SurveyUnitUpdateLatest> suLatest) throws Kraft
for (String dataMode : userInputs.getModeInputsMap().keySet()) {
buildBindingsSequenceGenesis.buildVtlBindings(dataMode, vtlBindings, metadataModels, suLatest, inDirectory);
UnimodalSequence unimodal = new UnimodalSequence();
unimodal.applyUnimodalSequence(userInputs, dataMode, vtlBindings, errors, metadataModels, fileUtilsInterface);
unimodal.applyUnimodalSequence(userInputs, dataMode, vtlBindings, kraftwerkExecutionContext, metadataModels, fileUtilsInterface);
}
}

/* Step 3 : multimodal VTL data processing */
private void multimodalProcess() {
MultimodalSequence multimodalSequence = new MultimodalSequence();
multimodalSequence.multimodalProcessing(userInputs, vtlBindings, errors, metadataModels, fileUtilsInterface);
multimodalSequence.multimodalProcessing(userInputs, vtlBindings, kraftwerkExecutionContext, metadataModels,
fileUtilsInterface);
}

/* Step 4 : Insert into SQL database */
Expand All @@ -141,12 +143,12 @@ private void insertDatabase(){
/* Step 5 : Write output files */
private void outputFileWriter() throws KraftwerkException {
WriterSequence writerSequence = new WriterSequence();
writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputs.getModeInputsMap(), metadataModels, errors, null, database, fileUtilsInterface);
writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputs.getModeInputsMap(), metadataModels, null, database, fileUtilsInterface);
}

/* Step 6 : Write errors */
private void writeErrors() {
TextFileWriter.writeErrorsFile(inDirectory, executionDateTime, errors, fileUtilsInterface);
TextFileWriter.writeErrorsFile(inDirectory, executionDateTime, kraftwerkExecutionContext, fileUtilsInterface);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import fr.insee.bpm.metadata.model.MetadataModel;
import fr.insee.kraftwerk.api.configuration.MinioConfig;
import fr.insee.kraftwerk.api.process.MainProcessing;
import fr.insee.kraftwerk.core.KraftwerkError;
import fr.insee.kraftwerk.core.dataprocessing.StepEnum;
import fr.insee.kraftwerk.core.exceptions.KraftwerkException;
import fr.insee.kraftwerk.core.inputs.UserInputsFile;
Expand All @@ -14,6 +13,7 @@
import fr.insee.kraftwerk.core.utils.files.MinioImpl;
import fr.insee.kraftwerk.core.utils.SqlUtils;
import fr.insee.kraftwerk.core.utils.TextFileWriter;
import fr.insee.kraftwerk.core.utils.log.KraftwerkExecutionContext;
import fr.insee.kraftwerk.core.vtl.VtlBindings;
import io.minio.MinioClient;
import io.swagger.v3.oas.annotations.Operation;
Expand All @@ -26,11 +26,9 @@

import java.io.File;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -82,10 +80,11 @@ public ResponseEntity<String> buildVtlBindings(
//Process
BuildBindingsSequence buildBindingsSequence = new BuildBindingsSequence(withAllReportingData, fileUtilsInterface);
VtlReaderWriterSequence vtlWriterSequence = new VtlReaderWriterSequence(fileUtilsInterface);
KraftwerkExecutionContext kraftwerkExecutionContext = new KraftwerkExecutionContext();

for (String dataMode : mp.getUserInputsFile().getModeInputsMap().keySet()) {
try{
buildBindingsSequence.buildVtlBindings(mp.getUserInputsFile(), dataMode, mp.getVtlBindings(),mp.getMetadataModels().get(dataMode), withDDI, null);
buildBindingsSequence.buildVtlBindings(mp.getUserInputsFile(), dataMode, mp.getVtlBindings(),mp.getMetadataModels().get(dataMode), withDDI, kraftwerkExecutionContext);
} catch (KraftwerkException e){
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
}
Expand Down Expand Up @@ -116,6 +115,7 @@ public ResponseEntity<String> buildVtlBindingsByDataMode(
fileUtilsInterface = new FileSystemImpl();
}

KraftwerkExecutionContext kraftwerkExecutionContext = new KraftwerkExecutionContext();
MainProcessing mp = new MainProcessing(inDirectoryParam, fileByFile,withAllReportingData,withDDI, defaultDirectory, limitSize, fileUtilsInterface);
try {
mp.init();
Expand All @@ -126,7 +126,7 @@ public ResponseEntity<String> buildVtlBindingsByDataMode(
//Process
BuildBindingsSequence buildBindingsSequence = new BuildBindingsSequence(withAllReportingData, fileUtilsInterface);
try{
buildBindingsSequence.buildVtlBindings(mp.getUserInputsFile(), dataMode, mp.getVtlBindings(), mp.getMetadataModels().get(dataMode), withDDI, null);
buildBindingsSequence.buildVtlBindings(mp.getUserInputsFile(), dataMode, mp.getVtlBindings(), mp.getMetadataModels().get(dataMode), withDDI, kraftwerkExecutionContext);
} catch (KraftwerkException e) {
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
}
Expand All @@ -152,6 +152,7 @@ public ResponseEntity<String> unimodalProcessing(
}else{
fileUtilsInterface = new FileSystemImpl();
}
KraftwerkExecutionContext kraftwerkExecutionContext = new KraftwerkExecutionContext();

//Read data in JSON file
Path inDirectory;
Expand All @@ -167,7 +168,6 @@ public ResponseEntity<String> unimodalProcessing(
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
}
VtlBindings vtlBindings = new VtlBindings();
List<KraftwerkError> errors = new ArrayList<>();

VtlReaderWriterSequence vtlReaderSequence = new VtlReaderWriterSequence(fileUtilsInterface);
vtlReaderSequence.readDataset(FileUtilsInterface.transformToTemp(inDirectory).toString(),dataMode, StepEnum.BUILD_BINDINGS, vtlBindings);
Expand All @@ -176,12 +176,12 @@ public ResponseEntity<String> unimodalProcessing(

//Process
UnimodalSequence unimodal = new UnimodalSequence();
unimodal.applyUnimodalSequence(userInputsFile, dataMode, vtlBindings, errors, metadataModelMap, fileUtilsInterface);
unimodal.applyUnimodalSequence(userInputsFile, dataMode, vtlBindings, kraftwerkExecutionContext, metadataModelMap, fileUtilsInterface);

//Write technical outputs
VtlReaderWriterSequence vtlWriterSequence = new VtlReaderWriterSequence(fileUtilsInterface);
vtlWriterSequence.writeTempBindings(inDirectory, dataMode, vtlBindings, StepEnum.UNIMODAL_PROCESSING);
TextFileWriter.writeErrorsFile(inDirectory, LocalDateTime.now(), errors, fileUtilsInterface);
TextFileWriter.writeErrorsFile(inDirectory, LocalDateTime.now(), kraftwerkExecutionContext, fileUtilsInterface);

return ResponseEntity.ok(inDirectoryParam+ " - "+dataMode);

Expand Down Expand Up @@ -214,7 +214,7 @@ public ResponseEntity<String> multimodalProcessing(
} catch (KraftwerkException e) {
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
}
List<KraftwerkError> errors = new ArrayList<>();
KraftwerkExecutionContext kraftwerkExecutionContext = new KraftwerkExecutionContext();


VtlReaderWriterSequence vtlReaderWriterSequence = new VtlReaderWriterSequence(fileUtilsInterface);
Expand All @@ -229,13 +229,13 @@ public ResponseEntity<String> multimodalProcessing(

//Process
MultimodalSequence multimodalSequence = new MultimodalSequence();
multimodalSequence.multimodalProcessing(userInputsFile, vtlBindings, errors, metadataModelMap, fileUtilsInterface);
multimodalSequence.multimodalProcessing(userInputsFile, vtlBindings, kraftwerkExecutionContext, metadataModelMap, fileUtilsInterface);

//Write technical fils
for (String datasetName : vtlBindings.getDatasetNames()) {
vtlReaderWriterSequence.writeTempBindings(inDirectory, datasetName, vtlBindings, StepEnum.MULTIMODAL_PROCESSING);
}
TextFileWriter.writeErrorsFile(inDirectory, LocalDateTime.now(), errors, fileUtilsInterface);
TextFileWriter.writeErrorsFile(inDirectory, LocalDateTime.now(), kraftwerkExecutionContext, fileUtilsInterface);

return ResponseEntity.ok(inDirectoryParam);

Expand All @@ -262,7 +262,8 @@ public ResponseEntity<String> writeOutputFiles(
}
LocalDateTime executionDateTime = LocalDateTime.now();
VtlBindings vtlBindings = new VtlBindings();
List<KraftwerkError> errors = new ArrayList<>();
KraftwerkExecutionContext kraftwerkExecutionContext = new KraftwerkExecutionContext();

// Read all bindings necessary to produce output
String path = FileUtilsInterface.transformToTemp(inDirectory).toString();
List<String> fileNames = fileUtilsInterface.listFileNames(path);
Expand All @@ -282,7 +283,7 @@ public ResponseEntity<String> writeOutputFiles(
}
Map<String, MetadataModel> metadataModelMap = MetadataUtils.getMetadata(userInputsFile.getModeInputsMap(), fileUtilsInterface);
try (Statement database = SqlUtils.openConnection().createStatement()) {
writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputsFile.getModeInputsMap(), metadataModelMap, errors, null, database, fileUtilsInterface);
writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputsFile.getModeInputsMap(), metadataModelMap, kraftwerkExecutionContext, database, fileUtilsInterface);
}
return ResponseEntity.ok(inDirectoryParam);

Expand Down
2 changes: 1 addition & 1 deletion kraftwerk-api/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ [email protected]@ @project.version@
fr.insee.kraftwerk.lang=fr

# Import Ops properties
spring.config.import=classpath:i18n/messages_${fr.insee.kraftwerk.lang}.properties,kraftwerk.properties,optional:file:${catalina.base}/webapps/kraftwerk.properties
spring.config.import=classpath:i18n/messages_${fr.insee.kraftwerk.lang}.properties,optional:kraftwerk.properties,optional:file:${catalina.base}/webapps/kraftwerk.properties

# Config Swagger (only for display)
fr.insee.kraftwerk.version=@project.version@
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
##########################################
## Properties that are fixed by OPS ##
##########################################
##################################################
## Properties that are fixed by OPS ##
## For local usage, create kraftwerk.properties ##
##################################################

# Folders for in and out
fr.insee.postcollecte.files = ***
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package fr.insee.kraftwerk.core.dataprocessing;

import fr.insee.bpm.metadata.model.CalculatedVariables;

import fr.insee.kraftwerk.core.KraftwerkError;
import fr.insee.kraftwerk.core.utils.files.FileUtilsInterface;
import fr.insee.kraftwerk.core.utils.log.KraftwerkExecutionContext;
import fr.insee.kraftwerk.core.vtl.VtlBindings;
import fr.insee.kraftwerk.core.vtl.VtlScript;
import lombok.extern.log4j.Log4j2;
Expand All @@ -18,7 +17,7 @@ public class CalculatedProcessing extends DataProcessing {

/** Maximal number of iterations to resolve the order of execution of VTL expressions. */
public static final int MAXIMAL_RESOLVING_ITERATIONS = 100;
private CalculatedVariables calculatedVariables;
private final CalculatedVariables calculatedVariables;

public CalculatedProcessing(VtlBindings vtlBindings, CalculatedVariables calculatedVariables, FileUtilsInterface fileUtilsInterface) {
super(vtlBindings, fileUtilsInterface);
Expand All @@ -31,13 +30,13 @@ public String getStepName() {
}


public String applyCalculatedVtlTransformations(String bindingName, Path userVtlInstructionsPath, List<KraftwerkError> errors){
public String applyCalculatedVtlTransformations(String bindingName, Path userVtlInstructionsPath, KraftwerkExecutionContext kraftwerkExecutionContext){
// First step
String automatedVtlInstructions = applyAutomatedVtlInstructions(bindingName, errors);
String automatedVtlInstructions = applyAutomatedVtlInstructions(bindingName, kraftwerkExecutionContext);
// Second step
if(userVtlInstructionsPath != null) {
applyUserVtlInstructions(userVtlInstructionsPath, errors);
applyAutomatedVtlInstructions(bindingName, errors);
applyUserVtlInstructions(userVtlInstructionsPath, kraftwerkExecutionContext);
applyAutomatedVtlInstructions(bindingName, kraftwerkExecutionContext);
} else {
log.info(String.format("No user VTL instructions given for dataset named %s (step %s).",
bindingName, getStepName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import fr.insee.bpm.metadata.model.MetadataModel;
import fr.insee.bpm.metadata.model.PaperUcq;
import fr.insee.bpm.metadata.model.VariablesMap;
import fr.insee.kraftwerk.core.KraftwerkError;
import fr.insee.kraftwerk.core.utils.files.FileUtilsInterface;
import fr.insee.kraftwerk.core.utils.log.KraftwerkExecutionContext;
import fr.insee.kraftwerk.core.vtl.VtlBindings;
import fr.insee.kraftwerk.core.vtl.VtlScript;
import lombok.extern.log4j.Log4j2;
Expand Down Expand Up @@ -45,11 +45,11 @@ public String getStepName() {
* @param userVtlInstructionsPath User vtl script (none for this step).
*/
@Override
public String applyVtlTransformations(String bindingName, Path userVtlInstructionsPath, List<KraftwerkError> errors) {
public String applyVtlTransformations(String bindingName, Path userVtlInstructionsPath, KraftwerkExecutionContext kraftwerkExecutionContext) {
// Remove paper UCQ variables in vtl multimode dataset
VtlScript cleanUpScript = generateVtlInstructions(bindingName);
log.debug("Automated clean up instructions after step {} : {}", getStepName(), cleanUpScript);
vtlExecute.evalVtlScript(cleanUpScript, vtlBindings, errors);
vtlExecute.evalVtlScript(cleanUpScript, vtlBindings, kraftwerkExecutionContext);
// Remove corresponding variables in VariablesMap
removePaperUcqVariables();
// Remove unimodal datasets
Expand Down
Loading

0 comments on commit c4cd066

Please sign in to comment.