diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5d592072..fbe44696 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,10 @@
# Changelog
-## 2.2.0 [TODO] - DuckDB implementation for output
+## 3.0.0 [TODO] - Kubernetes support
### Added
+- Launch a treatment from the command line
+- S3/MinIO support: all file system calls go through an interface with both OS file system and MinIO implementations
+
+- DuckDB implementation for output
- Transfer Vtl datasets into DuckDB before output step
- SQL util class for SQL operations
@@ -10,7 +14,7 @@
### Removed
- Avro
--
+
## 2.1.0 [2024-06-11] - Change Lunatic reader
### Changed
- Refactor of Lunatic reader
diff --git a/Dockerfile b/Dockerfile
index cc12cfc5..673500f9 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,7 +1,9 @@
# Java 21
FROM gitlab-registry.insee.fr:443/kubernetes/images/run/jre:21.0.1_12-jre-jammy-rootless
-COPY --chown=$JAVA_USER:$JAVA_USER Kraftwerk/kraftwerk-api/target/kraftwerk-api-*app-to-import.jar kraftwerk.jar
+ARG VERSION_APPLICATION
+COPY --chown=$JAVA_USER:$JAVA_USER Kraftwerk/kraftwerk-api/target/kraftwerk-api-$VERSION_APPLICATION.jar kraftwerk.jar
EXPOSE 8080
+# Run the container without additional parameters to start in REST API mode; append the batch parameters to start in batch mode
ENTRYPOINT ["java","-jar","/kraftwerk.jar"]
\ No newline at end of file
diff --git a/README.md b/README.md
index 4d2622b7..50b437fb 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,18 @@ User documentation and functional tests are still in an [inhouse project](https:
Kraftwerk uses [Lombok](https://projectlombok.org/).
+## Launch
+
+If no argument is specified in the `java -jar` command, Kraftwerk will launch
+as a REST API.
+Otherwise, it will launch in batch mode and apply treatments to one campaign
+with the specified arguments.
+The required arguments for batch mode are as follows (in order):
+1. Service to use (`MAIN`, `FILE_BY_FILE`, `GENESIS`, `LUNATIC_ONLY`)
+2. Archive at end of execution (`false` or `true`)
+3. Integrate all reporting data (`false` or `true`)
+4. Campaign name
+
:fr:
Kraftwerk est une application Java Spring conçue pour valider et traiter des données provenant d'enquêtes multimodes, afin de générer des tableaux de données prêts à être utilisés à des fins statistiques.
@@ -37,3 +49,14 @@ La documentation utilisateur et les tests fonctionnels sont encore dans un [proj
* Maven 3.6 +
Kraftwerk utilise [Lombok](https://projectlombok.org/).
+
+## Lancement
+
+Si aucun paramètre n'est spécifié dans la commande `java -jar`, Kraftwerk se lancera
+en tant qu'API REST.
+Sinon, il va se lancer en mode batch et appliquer les traitements sur une campagne
+avec les paramètres spécifiés. Les paramètres requis pour le mode batch sont les suivants (dans l'ordre) :
+1. Service à utiliser (`MAIN`, `FILE_BY_FILE`, `GENESIS`, `LUNATIC_ONLY`)
+2. Archiver à la fin de l'exécution (`false` ou `true`)
+3. Intégrer toutes les données de reporting (`false` ou `true`)
+4. Nom de la campagne
\ No newline at end of file
diff --git a/kraftwerk-api/pom.xml b/kraftwerk-api/pom.xml
index 120e3cf4..1dec62d3 100644
--- a/kraftwerk-api/pom.xml
+++ b/kraftwerk-api/pom.xml
@@ -71,6 +71,9 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
+<configuration>
+    <layout>JAR</layout>
+</configuration>
diff --git a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/KraftwerkApi.java b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/KraftwerkApi.java
index d52cce83..7df38182 100644
--- a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/KraftwerkApi.java
+++ b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/KraftwerkApi.java
@@ -1,13 +1,16 @@
package fr.insee.kraftwerk.api;
+import fr.insee.kraftwerk.api.configuration.MinioConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
@SpringBootApplication
@ConfigurationPropertiesScan
+@EnableConfigurationProperties(MinioConfig.class)
public class KraftwerkApi extends SpringBootServletInitializer {
diff --git a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/batch/KraftwerkBatch.java b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/batch/KraftwerkBatch.java
new file mode 100644
index 00000000..cd199d59
--- /dev/null
+++ b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/batch/KraftwerkBatch.java
@@ -0,0 +1,159 @@
+package fr.insee.kraftwerk.api.batch;
+
+import fr.insee.kraftwerk.api.configuration.ConfigProperties;
+import fr.insee.kraftwerk.api.configuration.MinioConfig;
+import fr.insee.kraftwerk.api.process.MainProcessing;
+import fr.insee.kraftwerk.api.process.MainProcessingGenesis;
+import fr.insee.kraftwerk.api.services.KraftwerkService;
+import fr.insee.kraftwerk.core.utils.files.FileSystemImpl;
+import fr.insee.kraftwerk.core.utils.files.FileUtilsInterface;
+import fr.insee.kraftwerk.core.utils.files.MinioImpl;
+import io.minio.MinioClient;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Component;
+
+import java.util.Arrays;
+
+/**
+ * Command line entry point of Kraftwerk.
+ * <p>
+ * Without positional arguments this runner does nothing and the application keeps running
+ * as a REST API. Otherwise the arguments describe a batch request: the requested treatment
+ * is executed once and the JVM exits with code {@code 0} on success or {@code 1} on failure.
+ */
+@Component
+@Slf4j
+public class KraftwerkBatch implements CommandLineRunner {
+
+    /** Prefix of Spring option arguments such as {@code --spring.profiles.active=dev}. */
+    private static final String OPTION_PREFIX = "--";
+
+    ConfigProperties configProperties;
+    MinioConfig minioConfig;
+    /** MinIO client; stays {@code null} when MinIO is disabled. */
+    MinioClient minioClient;
+
+    @Value("${fr.insee.postcollecte.files}")
+    protected String defaultDirectory;
+
+    @Value("${fr.insee.postcollecte.size-limit}")
+    protected long limitSize;
+
+    /**
+     * Stores the configuration and creates the MinIO client when MinIO is enabled.
+     *
+     * @param configProperties configuration handed to the Genesis processing
+     * @param minioConfig MinIO settings (enable flag, endpoint, credentials, bucket name)
+     */
+    @Autowired
+    public KraftwerkBatch(ConfigProperties configProperties, MinioConfig minioConfig) {
+        this.configProperties = configProperties;
+        this.minioConfig = minioConfig;
+        if (minioConfig.isEnable()) {
+            minioClient = MinioClient.builder()
+                    .endpoint(minioConfig.getEndpoint())
+                    .credentials(minioConfig.getAccessKey(), minioConfig.getSecretKey())
+                    .build();
+        }
+    }
+
+    /**
+     * Runs the batch treatment described by the positional arguments, if there are any.
+     * <p>
+     * Expected positional arguments, in order:
+     * <ol>
+     *   <li>service to use (name of a {@link KraftwerkServiceType} constant)</li>
+     *   <li>archive at end of execution ({@code true} or {@code false})</li>
+     *   <li>integrate all reporting data ({@code true} or {@code false})</li>
+     *   <li>campaign name</li>
+     * </ol>
+     *
+     * @param args raw command line arguments of the application
+     */
+    @Override
+    public void run(String... args) {
+        // Spring option arguments ("--key=value") configure the application itself: they must
+        // neither switch Kraftwerk to batch mode nor be counted as batch parameters.
+        String[] batchArgs = Arrays.stream(args)
+                .filter(arg -> !arg.startsWith(OPTION_PREFIX))
+                .toArray(String[]::new);
+        if (batchArgs.length == 0) {
+            log.info("Launching Kraftwerk in API mode...");
+            return;
+        }
+
+        try {
+            log.info("Launching Kraftwerk in CLI mode...");
+            checkArgs(batchArgs);
+
+            KraftwerkServiceType kraftwerkServiceType = KraftwerkServiceType.valueOf(batchArgs[0]);
+            String inDirectory = batchArgs[3];
+            // Archiving is never done for GENESIS; full reporting data is only used by MAIN.
+            boolean archiveAtEnd = Boolean.parseBoolean(batchArgs[1])
+                    && kraftwerkServiceType != KraftwerkServiceType.GENESIS;
+            boolean withAllReportingData = Boolean.parseBoolean(batchArgs[2])
+                    && kraftwerkServiceType == KraftwerkServiceType.MAIN;
+            boolean fileByFile = kraftwerkServiceType == KraftwerkServiceType.FILE_BY_FILE;
+            boolean withDDI = kraftwerkServiceType != KraftwerkServiceType.LUNATIC_ONLY;
+
+            // One file access object for the whole run, matching the configured storage.
+            FileUtilsInterface fileUtilsInterface = buildFileUtils();
+
+            if (kraftwerkServiceType == KraftwerkServiceType.GENESIS) {
+                new MainProcessingGenesis(configProperties, fileUtilsInterface).runMain(inDirectory);
+            } else {
+                new MainProcessing(inDirectory, fileByFile, withAllReportingData, withDDI,
+                        defaultDirectory, limitSize, fileUtilsInterface).runMain();
+            }
+
+            if (archiveAtEnd) {
+                // NOTE(review): this service is created outside of Spring, so its
+                // @PostConstruct initialization presumably does not run - confirm archive() works.
+                KraftwerkService kraftwerkService = new KraftwerkService(minioConfig);
+                var archiveResponse = kraftwerkService.archive(inDirectory, fileUtilsInterface);
+                if (archiveResponse.getStatusCode().isError()) {
+                    throw new IllegalStateException("Archiving failed : %s".formatted(archiveResponse.getBody()));
+                }
+            }
+            System.exit(0);
+        } catch (Exception e) {
+            // Pass the exception itself so that the stack trace is logged, not only toString()
+            log.error("Kraftwerk batch execution failed", e);
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Selects the file access implementation matching the configuration.
+     *
+     * @return a MinIO implementation when a MinIO client was created, the OS file system otherwise
+     */
+    private FileUtilsInterface buildFileUtils() {
+        if (minioClient != null) {
+            return new MinioImpl(minioClient, minioConfig.getBucketName());
+        }
+        return new FileSystemImpl();
+    }
+
+    /**
+     * Validates the number of batch arguments and the two boolean arguments.
+     * The service type is validated afterwards by {@code KraftwerkServiceType.valueOf}.
+     *
+     * @param args positional batch arguments
+     * @throws IllegalArgumentException if an argument is invalid (e.g. unparseable boolean)
+     */
+    private static void checkArgs(String[] args) throws IllegalArgumentException{
+        if (args.length != 4) {
+            throw new IllegalArgumentException("Invalid number of arguments ! Got %s instead of 4 !".formatted(args.length));
+        }
+        if (!args[1].equals("true") && !args[1].equals("false")) {
+            throw new IllegalArgumentException("Invalid archiveAtEnd boolean argument ! : %s".formatted(args[1]));
+        }
+        if (!args[2].equals("true") && !args[2].equals("false")) {
+            throw new IllegalArgumentException("Invalid withAllReportingData boolean argument ! %s".formatted(args[2]));
+        }
+    }
+}
diff --git a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/batch/KraftwerkServiceType.java b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/batch/KraftwerkServiceType.java
new file mode 100644
index 00000000..031f6294
--- /dev/null
+++ b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/batch/KraftwerkServiceType.java
@@ -0,0 +1,16 @@
+package fr.insee.kraftwerk.api.batch;
+
+/**
+ * Treatments that can be requested when Kraftwerk is launched from the command line.
+ * The constant name is the exact value expected as first batch argument.
+ */
+public enum KraftwerkServiceType {
+    /** Standard processing; the only type for which all reporting data may be integrated. */
+    MAIN,
+    /** Processing that runs without DDI. */
+    LUNATIC_ONLY,
+    /** Processing run through the Genesis main processing; never archived at the end. */
+    GENESIS,
+    /** Processing run in file-by-file mode. */
+    FILE_BY_FILE
+}
diff --git a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/configuration/MinioConfig.java b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/configuration/MinioConfig.java
new file mode 100644
index 00000000..4f20355a
--- /dev/null
+++ b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/configuration/MinioConfig.java
@@ -0,0 +1,35 @@
+package fr.insee.kraftwerk.api.configuration;
+
+import lombok.Getter;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * MinIO (S3) connection settings read from the {@code fr.insee.postcollecte.minio.*} properties.
+ * <p>
+ * Every placeholder has a default so that the application still starts when MinIO is not
+ * configured at all; in that case {@link #isEnable()} returns {@code false}.
+ */
+@ConfigurationProperties
+@Getter
+public class MinioConfig {
+    /** Endpoint passed to the MinIO client builder. */
+    @Value("${fr.insee.postcollecte.minio.endpoint:}")
+    private String endpoint;
+
+    /** Access key passed to the MinIO client builder. */
+    @Value("${fr.insee.postcollecte.minio.access_key:}")
+    private String accessKey;
+
+    /** Secret key passed to the MinIO client builder. */
+    @Value("${fr.insee.postcollecte.minio.secret_key:}")
+    private String secretKey;
+
+    /** Whether file access goes through MinIO instead of the OS file system. */
+    @Value("${fr.insee.postcollecte.minio.enable:false}")
+    private boolean enable;
+
+    /** Name of the bucket used by the MinIO file access implementation. */
+    @Value("${fr.insee.postcollecte.minio.bucket_name:}")
+    private String bucketName;
+}
diff --git a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/process/MainProcessing.java b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/process/MainProcessing.java
index afbba818..b96438bb 100644
--- a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/process/MainProcessing.java
+++ b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/process/MainProcessing.java
@@ -7,16 +7,14 @@
import fr.insee.kraftwerk.core.metadata.MetadataModel;
import fr.insee.kraftwerk.core.metadata.MetadataUtils;
import fr.insee.kraftwerk.core.sequence.*;
+import fr.insee.kraftwerk.core.utils.files.FileUtilsInterface;
import fr.insee.kraftwerk.core.utils.SqlUtils;
import fr.insee.kraftwerk.core.utils.TextFileWriter;
import fr.insee.kraftwerk.core.utils.log.KraftwerkExecutionLog;
import fr.insee.kraftwerk.core.vtl.VtlBindings;
import lombok.Getter;
import lombok.extern.log4j.Log4j2;
-import org.apache.commons.io.FileUtils;
-import java.io.IOException;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
@@ -25,7 +23,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.stream.Stream;
@Log4j2
public class MainProcessing {
@@ -49,6 +46,7 @@ public class MainProcessing {
private KraftwerkExecutionLog kraftwerkExecutionLog;
private final List errors = new ArrayList<>();
private LocalDateTime executionDateTime;
+ private final FileUtilsInterface fileUtilsInterface;
/**
@@ -59,24 +57,26 @@ public class MainProcessing {
private final long limitSize;
- public MainProcessing(String inDirectoryParam, boolean fileByFile,boolean withAllReportingData,boolean withDDI, String defaultDirectory, long limitSize) {
+ public MainProcessing(String inDirectoryParam, boolean fileByFile,boolean withAllReportingData,boolean withDDI, String defaultDirectory, long limitSize, FileUtilsInterface fileUtilsInterface) {
super();
this.inDirectoryParam = inDirectoryParam;
this.fileByFile = fileByFile;
this.withAllReportingData = withAllReportingData;
this.withDDI=withDDI;
this.limitSize = limitSize;
- controlInputSequence = new ControlInputSequence(defaultDirectory);
+ controlInputSequence = new ControlInputSequence(defaultDirectory, fileUtilsInterface);
+ this.fileUtilsInterface = fileUtilsInterface;
}
- public MainProcessing(String inDirectoryParam, boolean fileByFile, String defaultDirectory, long limitSize) {
+ public MainProcessing(String inDirectoryParam, boolean fileByFile, String defaultDirectory, long limitSize, FileUtilsInterface fileUtilsInterface) {
super();
this.inDirectoryParam = inDirectoryParam;
this.fileByFile = fileByFile;
this.withAllReportingData = !fileByFile;
this.withDDI=true;
this.limitSize = limitSize;
- controlInputSequence = new ControlInputSequence(defaultDirectory);
+ controlInputSequence = new ControlInputSequence(defaultDirectory, fileUtilsInterface);
+ this.fileUtilsInterface = fileUtilsInterface;
}
@@ -116,9 +116,9 @@ public void init() throws KraftwerkException {
String campaignName = inDirectory.getFileName().toString();
log.info("Kraftwerk main service started for campaign: " + campaignName);
- userInputsFile = controlInputSequence.getUserInputs(inDirectory);
+ userInputsFile = controlInputSequence.getUserInputs(inDirectory, fileUtilsInterface);
- metadataModels = withDDI ? MetadataUtils.getMetadata(userInputsFile.getModeInputsMap()) : MetadataUtils.getMetadataFromLunatic(userInputsFile.getModeInputsMap());
+ metadataModels = withDDI ? MetadataUtils.getMetadata(userInputsFile.getModeInputsMap(), fileUtilsInterface) : MetadataUtils.getMetadataFromLunatic(userInputsFile.getModeInputsMap(), fileUtilsInterface);
userInputsFileList = getUserInputsFile(userInputsFile, fileByFile);
@@ -139,19 +139,19 @@ public void init() throws KraftwerkException {
/* Step 2 : unimodal data */
private void unimodalProcess() throws KraftwerkException {
- BuildBindingsSequence buildBindingsSequence = new BuildBindingsSequence(withAllReportingData);
+ BuildBindingsSequence buildBindingsSequence = new BuildBindingsSequence(withAllReportingData, fileUtilsInterface);
for (String dataMode : userInputsFile.getModeInputsMap().keySet()) {
MetadataModel metadataForMode = metadataModels.get(dataMode);
buildBindingsSequence.buildVtlBindings(userInputsFile, dataMode, vtlBindings, metadataForMode, withDDI, kraftwerkExecutionLog);
UnimodalSequence unimodal = new UnimodalSequence();
- unimodal.applyUnimodalSequence(userInputsFile, dataMode, vtlBindings, errors, metadataModels);
+ unimodal.applyUnimodalSequence(userInputsFile, dataMode, vtlBindings, errors, metadataModels, fileUtilsInterface);
}
}
/* Step 3 : multimodal VTL data processing */
private void multimodalProcess(){
MultimodalSequence multimodalSequence = new MultimodalSequence();
- multimodalSequence.multimodalProcessing(userInputsFile, vtlBindings, errors, metadataModels);
+ multimodalSequence.multimodalProcessing(userInputsFile, vtlBindings, errors, metadataModels, fileUtilsInterface);
}
/* Step 4 : Insert into SQL database */
@@ -163,17 +163,17 @@ private void insertDatabase(Statement database) throws SQLException {
/* Step 5 : Write output files */
private void outputFileWriter(Statement database) throws KraftwerkException {
WriterSequence writerSequence = new WriterSequence();
- writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputsFile.getModeInputsMap(), metadataModels, errors, kraftwerkExecutionLog,database);
+ writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputsFile.getModeInputsMap(), metadataModels, errors, kraftwerkExecutionLog, database, fileUtilsInterface);
}
/* Step 5 : Write errors */
private void writeErrors() {
- TextFileWriter.writeErrorsFile(inDirectory, executionDateTime, errors);
+ TextFileWriter.writeErrorsFile(inDirectory, executionDateTime, errors, fileUtilsInterface);
}
/* Step 6 : Write log */
- private void writeLog() {TextFileWriter.writeLogFile(inDirectory, executionDateTime, kraftwerkExecutionLog);}
+ private void writeLog() {TextFileWriter.writeLogFile(inDirectory, executionDateTime, kraftwerkExecutionLog, fileUtilsInterface);}
private static List getUserInputsFile(UserInputsFile source, boolean fileByFile) throws KraftwerkException {
List userInputsFileList = new ArrayList<>();
@@ -181,7 +181,7 @@ private static List getUserInputsFile(UserInputsFile source, boo
for (String dataMode : source.getModeInputsMap().keySet()) {
List dataFiles = getFilesToProcess(source, dataMode);
for (Path dataFile : dataFiles) {
- UserInputsFile currentFileInputs = new UserInputsFile(source.getUserInputFile(), source.getUserInputFile().getParent());
+ UserInputsFile currentFileInputs = new UserInputsFile(source.getUserInputFile(), source.getUserInputFile().getParent(), source.getFileUtilsInterface());
currentFileInputs.setVtlReconciliationFile(source.getVtlReconciliationFile());
currentFileInputs.setVtlInformationLevelsFile(source.getVtlInformationLevelsFile());
currentFileInputs.setVtlTransformationsFile(source.getVtlTransformationsFile());
@@ -214,26 +214,25 @@ private static List getFilesToProcess(UserInputsFile userInputsFile, Strin
log.error("Datapath is null");
return files;
}
- if (Files.isRegularFile(dataPath)) {
- files.add(dataPath);
- } else if (Files.isDirectory(dataPath)) {
- try (Stream stream = Files.list(dataPath)) {
- stream.forEach(files::add);
- } catch (IOException e) {
- log.error(String.format("IOException occurred when trying to list data files of folder: %s",
- dataPath));
- }
- } else {
+ if(userInputsFile.getFileUtilsInterface().isDirectory(dataPath.toString()) == null){
log.warn(String.format("Data path given could not be identified as a file or folder: %s", dataPath));
+ return files;
+ }
+ if(Boolean.TRUE.equals(userInputsFile.getFileUtilsInterface().isDirectory(dataPath.toString()))){
+ for(String path : userInputsFile.getFileUtilsInterface().listFilePaths(dataPath.toString())){
+ files.add(Path.of(path));
+ }
+ }else{
+ files.add(dataPath);
}
return files;
}
private void isDataTooBig(UserInputsFile userInputsFile, String errorMessage, long limitSize) throws KraftwerkException {
for (String dataMode : userInputsFile.getModeInputsMap().keySet()){
- long dataSize = FileUtils.sizeOf(userInputsFile.getModeInputs(dataMode).getDataFile().toFile());
+ long dataSize = userInputsFile.getFileUtilsInterface().getSizeOf(userInputsFile.getModeInputs(dataMode).getDataFile().toString());
if (dataSize > limitSize) {
- log.error("Size of data folder/file {} : {}",userInputsFile.getModeInputs(dataMode).getDataFile(), FileUtils.byteCountToDisplaySize(dataSize));
+ log.error("Size of data folder/file {} : {}",userInputsFile.getModeInputs(dataMode).getDataFile(), dataSize);
throw new KraftwerkException(413,errorMessage);
}
}
diff --git a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/process/MainProcessingGenesis.java b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/process/MainProcessingGenesis.java
index 40561aa1..1410032c 100644
--- a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/process/MainProcessingGenesis.java
+++ b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/process/MainProcessingGenesis.java
@@ -15,6 +15,7 @@
import fr.insee.kraftwerk.core.sequence.MultimodalSequence;
import fr.insee.kraftwerk.core.sequence.UnimodalSequence;
import fr.insee.kraftwerk.core.sequence.WriterSequence;
+import fr.insee.kraftwerk.core.utils.files.FileUtilsInterface;
import fr.insee.kraftwerk.core.utils.SqlUtils;
import fr.insee.kraftwerk.core.utils.TextFileWriter;
import fr.insee.kraftwerk.core.vtl.VtlBindings;
@@ -23,7 +24,6 @@
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections4.ListUtils;
import org.springframework.boot.web.client.RestTemplateBuilder;
-import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.file.Path;
@@ -36,7 +36,6 @@
import java.util.Map;
@Log4j2
-@Component
public class MainProcessingGenesis {
@Setter
@@ -47,6 +46,7 @@ public class MainProcessingGenesis {
@Getter
private UserInputsGenesis userInputs;
private LocalDateTime executionDateTime;
+ private final FileUtilsInterface fileUtilsInterface;
private Statement database;
/* SPECIFIC VARIABLES */
@@ -60,19 +60,21 @@ public class MainProcessingGenesis {
private final GenesisClient client;
- public MainProcessingGenesis(ConfigProperties config) {
+ public MainProcessingGenesis(ConfigProperties config, FileUtilsInterface fileUtilsInterface) {
this.client = new GenesisClient(new RestTemplateBuilder(), config);
+ this.fileUtilsInterface = fileUtilsInterface;
}
- public void init(String idCampaign) throws KraftwerkException, IOException {
+ public void init(String idCampaign) throws KraftwerkException {
log.info("Kraftwerk main service started for campaign: " + idCampaign);
+ this.controlInputSequenceGenesis = new ControlInputSequenceGenesis("", fileUtilsInterface);
this.executionDateTime = LocalDateTime.now();
inDirectory = controlInputSequenceGenesis.getInDirectory(idCampaign);
//First we check the modes present in database for the given questionnaire
//We build userInputs for the given questionnaire
- userInputs = new UserInputsGenesis(controlInputSequenceGenesis.isHasConfigFile(), inDirectory, client.getModes(idCampaign));
+ userInputs = new UserInputsGenesis(controlInputSequenceGenesis.isHasConfigFile(), inDirectory, client.getModes(idCampaign), fileUtilsInterface);
if (!userInputs.getModes().isEmpty()) {
- metadataModels = MetadataUtilsGenesis.getMetadata(userInputs.getModeInputsMap());
+ metadataModels = MetadataUtilsGenesis.getMetadata(userInputs.getModeInputsMap(), fileUtilsInterface);
} else {
log.error("No source found for campaign " + idCampaign);
}
@@ -110,18 +112,18 @@ public void runMain(String idCampaign) throws KraftwerkException, IOException {
}
private void unimodalProcess(List suLatest) throws KraftwerkException {
- BuildBindingsSequenceGenesis buildBindingsSequenceGenesis = new BuildBindingsSequenceGenesis();
+ BuildBindingsSequenceGenesis buildBindingsSequenceGenesis = new BuildBindingsSequenceGenesis(fileUtilsInterface);
for (String dataMode : userInputs.getModeInputsMap().keySet()) {
buildBindingsSequenceGenesis.buildVtlBindings(dataMode, vtlBindings, metadataModels, suLatest, inDirectory);
UnimodalSequence unimodal = new UnimodalSequence();
- unimodal.applyUnimodalSequence(userInputs, dataMode, vtlBindings, errors, metadataModels);
+ unimodal.applyUnimodalSequence(userInputs, dataMode, vtlBindings, errors, metadataModels, fileUtilsInterface);
}
}
/* Step 3 : multimodal VTL data processing */
private void multimodalProcess() {
MultimodalSequence multimodalSequence = new MultimodalSequence();
- multimodalSequence.multimodalProcessing(userInputs, vtlBindings, errors, metadataModels);
+ multimodalSequence.multimodalProcessing(userInputs, vtlBindings, errors, metadataModels, fileUtilsInterface);
}
/* Step 4 : Insert into SQL database */
@@ -133,12 +135,12 @@ private void insertDatabase(){
/* Step 5 : Write output files */
private void outputFileWriter() throws KraftwerkException {
WriterSequence writerSequence = new WriterSequence();
- writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputs.getModeInputsMap(), metadataModels, errors, null, database);
+ writerSequence.writeOutputFiles(inDirectory, executionDateTime, vtlBindings, userInputs.getModeInputsMap(), metadataModels, errors, null, database, fileUtilsInterface);
}
/* Step 6 : Write errors */
private void writeErrors() {
- TextFileWriter.writeErrorsFile(inDirectory, executionDateTime, errors);
+ TextFileWriter.writeErrorsFile(inDirectory, executionDateTime, errors, fileUtilsInterface);
}
}
diff --git a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/services/HealthcheckService.java b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/services/HealthcheckService.java
index 3dc61a99..9ea54933 100644
--- a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/services/HealthcheckService.java
+++ b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/services/HealthcheckService.java
@@ -32,7 +32,8 @@ public class HealthcheckService extends KraftwerkService {
@Autowired
public HealthcheckService(ConfigProperties configProperties) {
- this.configProperties = configProperties;
+ super(null);
+ this.configProperties = configProperties;
this.client = new GenesisClient(new RestTemplateBuilder(), configProperties);
}
diff --git a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/services/KraftwerkService.java b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/services/KraftwerkService.java
index d0632af9..dd1a39b6 100644
--- a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/services/KraftwerkService.java
+++ b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/services/KraftwerkService.java
@@ -1,14 +1,19 @@
package fr.insee.kraftwerk.api.services;
+import fr.insee.kraftwerk.api.configuration.MinioConfig;
import fr.insee.kraftwerk.core.Constants;
import fr.insee.kraftwerk.core.exceptions.KraftwerkException;
import fr.insee.kraftwerk.core.sequence.ControlInputSequence;
-import fr.insee.kraftwerk.core.utils.FileUtils;
+import fr.insee.kraftwerk.core.utils.files.FileSystemImpl;
+import fr.insee.kraftwerk.core.utils.files.FileUtilsInterface;
+import fr.insee.kraftwerk.core.utils.files.MinioImpl;
+import io.minio.MinioClient;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import jakarta.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
@@ -37,17 +42,32 @@ public class KraftwerkService {
@Value("${fr.insee.postcollecte.size-limit}")
protected long limitSize;
+ MinioConfig minioConfig;
+
+
protected ControlInputSequence controlInputSequence ;
+
+ @Autowired
+ public KraftwerkService(MinioConfig minioConfig){
+ this.minioConfig = minioConfig;
+ }
@PostConstruct
public void initializeWithProperties() {
+ FileUtilsInterface fileUtilsInterface;
+ if(minioConfig != null && minioConfig.isEnable()){ // a subclass may pass a null config (HealthcheckService does)
+ MinioClient minioClient = MinioClient.builder().endpoint(minioConfig.getEndpoint()).credentials(minioConfig.getAccessKey(), minioConfig.getSecretKey()).build();
+ fileUtilsInterface = new MinioImpl(minioClient, minioConfig.getBucketName());
+ }else{
+ fileUtilsInterface = new FileSystemImpl();
+ }
if (StringUtils.isNotEmpty(csvOutputsQuoteChar)) {
Constants.setCsvOutputQuoteChar(csvOutputsQuoteChar.trim().charAt(0));
}
- controlInputSequence = new ControlInputSequence(defaultDirectory);
+ controlInputSequence = new ControlInputSequence(defaultDirectory, fileUtilsInterface);
}
- protected ResponseEntity archive(String inDirectoryParam) {
+ public ResponseEntity archive(String inDirectoryParam, FileUtilsInterface fileUtilsInterface) {
Path inDirectory;
try {
inDirectory = controlInputSequence.getInDirectory(inDirectoryParam);
@@ -56,19 +76,19 @@ protected ResponseEntity archive(String inDirectoryParam) {
}
/* Step 4.3 : move kraftwerk.json to a secondary folder */
- FileUtils.renameInputFile(inDirectory);
+ fileUtilsInterface.renameInputFile(inDirectory);
/* Step 4.4 : move differential data to a secondary folder */
try {
- FileUtils.archiveInputFiles(controlInputSequence.getUserInputs(inDirectory));
+ fileUtilsInterface.archiveInputFiles(controlInputSequence.getUserInputs(inDirectory, fileUtilsInterface));
} catch (KraftwerkException e) {
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
}
//delete temp directory
- Path tempOutputPath = FileUtils.transformToTemp(inDirectory);
+ Path tempOutputPath = FileUtilsInterface.transformToTemp(inDirectory);
try {
- FileUtils.deleteDirectory(tempOutputPath);
+ fileUtilsInterface.deleteDirectory(tempOutputPath);
} catch (KraftwerkException e) {
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
}
diff --git a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/services/MainService.java b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/services/MainService.java
index 07569e29..c287520a 100644
--- a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/services/MainService.java
+++ b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/services/MainService.java
@@ -2,13 +2,19 @@
import fr.insee.kraftwerk.api.configuration.ConfigProperties;
+import fr.insee.kraftwerk.api.configuration.MinioConfig;
import fr.insee.kraftwerk.api.process.MainProcessing;
import fr.insee.kraftwerk.api.process.MainProcessingGenesis;
import fr.insee.kraftwerk.core.exceptions.KraftwerkException;
import fr.insee.kraftwerk.core.sequence.ControlInputSequenceGenesis;
+import fr.insee.kraftwerk.core.utils.files.FileSystemImpl;
+import fr.insee.kraftwerk.core.utils.files.FileUtilsInterface;
+import fr.insee.kraftwerk.core.utils.files.MinioImpl;
+import io.minio.MinioClient;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@@ -21,14 +27,28 @@
@RestController
+@Slf4j
@Tag(name = "${tag.main}")
public class MainService extends KraftwerkService {
ConfigProperties configProperties;
+ MinioClient minioClient;
+ boolean useMinio;
+
@Autowired
- public MainService(ConfigProperties configProperties) {
- this.configProperties = configProperties;
+ public MainService(ConfigProperties configProperties, MinioConfig minioConfig) {
+ super(minioConfig);
+ this.configProperties = configProperties;
+ this.minioConfig = minioConfig;
+ useMinio = false;
+ if(minioConfig == null){
+ log.warn("Minio config null !");
+ }
+ if(minioConfig != null && minioConfig.isEnable()){
+ minioClient = MinioClient.builder().endpoint(minioConfig.getEndpoint()).credentials(minioConfig.getAccessKey(), minioConfig.getSecretKey()).build();
+ useMinio = true;
+ }
}
@PutMapping(value = "/main")
@@ -40,7 +60,14 @@ public ResponseEntity mainService(
) {
boolean fileByFile = false;
boolean withDDI = true;
- MainProcessing mp = new MainProcessing(inDirectoryParam, fileByFile,withAllReportingData,withDDI, defaultDirectory, limitSize);
+ FileUtilsInterface fileUtilsInterface;
+ if(Boolean.TRUE.equals(useMinio)){
+ fileUtilsInterface = new MinioImpl(minioClient, minioConfig.getBucketName());
+ }else{
+ fileUtilsInterface = new FileSystemImpl();
+ }
+
+ MainProcessing mp = new MainProcessing(inDirectoryParam, fileByFile,withAllReportingData,withDDI, defaultDirectory, limitSize, fileUtilsInterface);
try {
mp.runMain();
} catch (KraftwerkException e) {
@@ -48,7 +75,7 @@ public ResponseEntity mainService(
}
/* Step 4.3- 4.4 : Archive */
- if (Boolean.TRUE.equals(archiveAtEnd)) archive(inDirectoryParam);
+ if (Boolean.TRUE.equals(archiveAtEnd)) archive(inDirectoryParam,fileUtilsInterface);
return ResponseEntity.ok(inDirectoryParam);
}
@@ -62,14 +89,21 @@ public ResponseEntity mainFileByFile(
boolean fileByFile = true;
boolean withAllReportingData = false;
boolean withDDI = true;
- MainProcessing mp = new MainProcessing(inDirectoryParam, fileByFile,withAllReportingData,withDDI, defaultDirectory, limitSize);
+ FileUtilsInterface fileUtilsInterface;
+ if(Boolean.TRUE.equals(useMinio)){
+ fileUtilsInterface = new MinioImpl(minioClient, minioConfig.getBucketName());
+ }else{
+ fileUtilsInterface = new FileSystemImpl();
+ }
+
+ MainProcessing mp = new MainProcessing(inDirectoryParam, fileByFile,withAllReportingData,withDDI, defaultDirectory, limitSize, fileUtilsInterface);
try {
mp.runMain();
} catch (KraftwerkException e) {
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
}
/* Step 4.3- 4.4 : Archive */
- if (Boolean.TRUE.equals(archiveAtEnd)) archive(inDirectoryParam);
+ if (Boolean.TRUE.equals(archiveAtEnd)) archive(inDirectoryParam, fileUtilsInterface);
return ResponseEntity.ok(inDirectoryParam);
}
@@ -83,14 +117,21 @@ public ResponseEntity mainLunaticOnly(
boolean withDDI = false;
boolean fileByFile = false;
boolean withAllReportingData = false;
- MainProcessing mp = new MainProcessing(inDirectoryParam, fileByFile,withAllReportingData,withDDI, defaultDirectory, limitSize);
+ FileUtilsInterface fileUtilsInterface;
+ if(Boolean.TRUE.equals(useMinio)){
+ fileUtilsInterface = new MinioImpl(minioClient, minioConfig.getBucketName());
+ }else{
+ fileUtilsInterface = new FileSystemImpl();
+ }
+
+ MainProcessing mp = new MainProcessing(inDirectoryParam, fileByFile,withAllReportingData,withDDI, defaultDirectory, limitSize, fileUtilsInterface);
try {
mp.runMain();
} catch (KraftwerkException e) {
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
}
/* Step 4.3- 4.4 : Archive */
- if (Boolean.TRUE.equals(archiveAtEnd)) archive(inDirectoryParam);
+ if (Boolean.TRUE.equals(archiveAtEnd)) archive(inDirectoryParam, fileUtilsInterface);
return ResponseEntity.ok(inDirectoryParam);
}
@@ -99,9 +140,16 @@ public ResponseEntity mainLunaticOnly(
@Operation(operationId = "mainGenesis", summary = "${summary.mainGenesis}", description = "${description.mainGenesis}")
public ResponseEntity mainGenesis(
@Parameter(description = "${param.idCampaign}", required = true, example = INDIRECTORY_EXAMPLE) @RequestBody String idCampaign) {
- MainProcessingGenesis mpGenesis = new MainProcessingGenesis(configProperties);
+ FileUtilsInterface fileUtilsInterface;
+ if(Boolean.TRUE.equals(useMinio)){
+ fileUtilsInterface = new MinioImpl(minioClient, minioConfig.getBucketName());
+ }else{
+ fileUtilsInterface = new FileSystemImpl();
+ }
+
+ MainProcessingGenesis mpGenesis = new MainProcessingGenesis(configProperties, fileUtilsInterface);
+
try {
- mpGenesis.setControlInputSequenceGenesis(new ControlInputSequenceGenesis(defaultDirectory));
mpGenesis.runMain(idCampaign);
} catch (KraftwerkException e) {
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
diff --git a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/services/SplitterService.java b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/services/SplitterService.java
index ea4ff382..c8fdbca8 100644
--- a/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/services/SplitterService.java
+++ b/kraftwerk-api/src/main/java/fr/insee/kraftwerk/api/services/SplitterService.java
@@ -1,9 +1,16 @@
package fr.insee.kraftwerk.api.services;
-import fr.insee.kraftwerk.core.utils.XMLSplitter;
+import fr.insee.kraftwerk.api.configuration.MinioConfig;
+import fr.insee.kraftwerk.core.utils.files.FileSystemImpl;
+import fr.insee.kraftwerk.core.utils.files.FileSystemType;
+import fr.insee.kraftwerk.core.utils.files.FileUtilsInterface;
+import fr.insee.kraftwerk.core.utils.files.MinioImpl;
+import fr.insee.kraftwerk.core.utils.xml.XmlSplitter;
+import io.minio.MinioClient;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.log4j.Log4j2;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PutMapping;
@@ -15,15 +22,27 @@
@Log4j2
public class SplitterService extends KraftwerkService{
+ @Autowired
+ public SplitterService(MinioConfig minioConfig) {
+ super(minioConfig);
+ }
+
@Operation(summary = "Split a XML file into smaller ones")
@PutMapping(path = "/split/lunatic-xml")
public ResponseEntity