Skip to content

Commit

Permalink
feat: keep intermediate data
Browse files Browse the repository at this point in the history
  • Loading branch information
Nolife999 committed Dec 2, 2024
1 parent edc47e4 commit 0f77194
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 18 deletions.
3 changes: 2 additions & 1 deletion arc-batch/src/main/java/fr/insee/arc/batch/BatchARC.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import fr.insee.arc.batch.dao.BatchArcDao;
import fr.insee.arc.batch.threadrunners.PhaseParameterKeys;
import fr.insee.arc.batch.threadrunners.PhaseThreadFactory;
import fr.insee.arc.core.model.BatchMode;
import fr.insee.arc.core.model.TraitementEtat;
import fr.insee.arc.core.model.TraitementPhase;
import fr.insee.arc.core.service.global.bo.Sandbox;
Expand Down Expand Up @@ -319,7 +320,7 @@ private void batchParametersGet() {
repertoire = properties.getBatchParametersDirectory();

mapParam.put(PhaseParameterKeys.KEY_FOR_DIRECTORY_LOCATION, repertoire);
mapParam.put(PhaseParameterKeys.KEY_FOR_BATCH_CHUNK_ID, new SimpleDateFormat("yyyyMMddHH").format(new Date()));
mapParam.put(PhaseParameterKeys.KEY_FOR_BATCH_MODE, BatchMode.NORMAL);
mapParam.put(PhaseParameterKeys.KEY_FOR_EXECUTION_ENVIRONMENT, envExecution);
mapParam.put(PhaseParameterKeys.KEY_FOR_MAX_SIZE_RECEPTION, String.valueOf(tailleMaxReceptionEnMb));
mapParam.put(PhaseParameterKeys.KEY_FOR_MAX_FILES_TO_LOAD, String.valueOf(maxFilesToLoad));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public class PhaseParameterKeys {
// keys name for the hashmap mapParam containing the batch parameters
public static final String KEY_FOR_EXECUTION_ENVIRONMENT = "envExecution";
public static final String KEY_FOR_DIRECTORY_LOCATION = "repertoire";
public static final String KEY_FOR_BATCH_CHUNK_ID = "numlot";
public static final String KEY_FOR_BATCH_MODE = "numlot";
public static final String KEY_FOR_MAX_SIZE_RECEPTION = "tailleMaxReceptionEnMb";
public static final String KEY_FOR_MAX_FILES_TO_LOAD = "maxFilesToLoad";
public static final String KEY_FOR_MAX_FILES_PER_PHASE = "maxFilesPerPhase";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void execute() {
mapParam.get(PhaseParameterKeys.KEY_FOR_EXECUTION_ENVIRONMENT), //
Integer.parseInt(mapParam.get(capacityParameterName())), //
Boolean.parseBoolean(PhaseParameterKeys.KEY_FOR_KEEP_IN_DATABASE) ? null
: mapParam.get(PhaseParameterKeys.KEY_FOR_BATCH_CHUNK_ID)) //
: mapParam.get(PhaseParameterKeys.KEY_FOR_BATCH_MODE)) //
.invokeApi();

}
Expand Down
27 changes: 27 additions & 0 deletions arc-core/src/main/java/fr/insee/arc/core/model/BatchMode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package fr.insee.arc.core.model;

public class BatchMode {

private BatchMode() {
throw new IllegalStateException("Utility class");
}

public final static String UNSET = null;
public final static String NORMAL = "N";
public final static String KEEP_INTERMEDIATE_DATA = "K";


public static String computeBatchMode(boolean isBatchMode, boolean isBatchModeMustKeepIntermediateData)
{
if (!isBatchMode) {
return BatchMode.UNSET;
}

if (isBatchModeMustKeepIntermediateData) {
return BatchMode.KEEP_INTERMEDIATE_DATA;
}

return BatchMode.NORMAL;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import org.json.JSONArray;

import fr.insee.arc.core.model.BatchMode;
import fr.insee.arc.core.util.BDParameters;
import fr.insee.arc.utils.database.ArcDatabase;

Expand Down Expand Up @@ -39,20 +40,35 @@ public String getSchema() {

/** Return true if the environment is defined as a production environment.*/
public boolean isEnvSetForProduction() {
JSONArray j=new JSONArray(new BDParameters(ArcDatabase.COORDINATOR).getString(this.connection, "ArcAction.productionEnvironments",DEFAULT_PRODUCTION_ENVIRONMENTS));
Set<String> found=new HashSet<>();
return isEnvSetForGivenParameter("ArcAction.productionEnvironments", DEFAULT_PRODUCTION_ENVIRONMENTS);

j.forEach(item -> {
if (item.toString().equals(this.schema))
{
found.add(item.toString());
}
});
return !found.isEmpty();
}

public String isEnvSetForBatch() {
JSONArray j=new JSONArray(new BDParameters(ArcDatabase.COORDINATOR).getString(this.connection, "ArcAction.batchMode", "[]"));
public String computeBatchMode() {

boolean isBatchMode = isEnvSetForGivenParameter("ArcAction.batchMode", "[]");
boolean isBatchModeMustKeepIntermediateData = isEnvSetForGivenParameter("ArcAction.batchModeKeepIntermediateData", "[]");

if (!isBatchMode) {
return BatchMode.UNSET;
}

if (isBatchModeMustKeepIntermediateData) {
return BatchMode.KEEP_INTERMEDIATE_DATA;
}

return BatchMode.KEEP_INTERMEDIATE_DATA;

}

/**
* Check if environment is declared
* @param parameterName
* @param defaultParameterValue
* @return
*/
public boolean isEnvSetForGivenParameter(String parameterName, String defaultParameterValue) {
JSONArray j=new JSONArray(new BDParameters(ArcDatabase.COORDINATOR).getString(this.connection, parameterName, defaultParameterValue));
Set<String> found=new HashSet<>();

j.forEach(item -> {
Expand All @@ -61,7 +77,7 @@ public String isEnvSetForBatch() {
found.add(item.toString());
}
});
return (found.isEmpty() ? null : "1");
return !found.isEmpty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import fr.insee.arc.core.dataobjects.ArcPreparedStatementBuilder;
import fr.insee.arc.core.dataobjects.ColumnEnum;
import fr.insee.arc.core.model.BatchMode;
import fr.insee.arc.core.model.TraitementEtat;
import fr.insee.arc.core.service.global.bo.ArcDateFormat;
import fr.insee.arc.core.service.global.scalability.ScalableConnection;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void marquageFinalDefaultDao(ArcPreparedStatementBuilder query) throws Ar
query.append(marquageFinal(tablePilotageGlobale, tablePilotageThread, idSource));
}

if (paramBatch != null) {
if (paramBatch != null && !paramBatch.equals(BatchMode.KEEP_INTERMEDIATE_DATA)) {
query.append("DROP TABLE IF EXISTS "+HashFileNameConversion.tableOfIdSource(this.tablePrevious,idSource)+";");
}

Expand All @@ -125,6 +126,19 @@ public void marquageFinalDefaultDao(ArcPreparedStatementBuilder query) throws Ar
UtilitaireDao.get(0).executeBlock(connexion.getCoordinatorConnection(), query);
}
}


/**
* Test batch mode to know if the data output of the previous phase must be dropped
* @param paramBatch
* @return
*/
protected static boolean checkPreviousPhaseDataDropCondition(String paramBatch)
{
return paramBatch != null && !paramBatch.equals(BatchMode.KEEP_INTERMEDIATE_DATA);
}



/**
* clean temporary thread objects connexion
Expand Down
3 changes: 3 additions & 0 deletions arc-core/src/main/resources/BdD/script_global.sql
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ UPDATE arc.parameter set description='parameter.ihm.sandbox.sandboxListWithProdu
INSERT INTO arc.parameter VALUES ('ArcAction.batchMode','[]');
UPDATE arc.parameter set description='parameter.ihm.sandbox.sandboxListWithBatchMode' where key='ArcAction.batchMode';

INSERT INTO arc.parameter VALUES ('ArcAction.batchModeKeepIntermediateData','[]');
UPDATE arc.parameter set description='parameter.ihm.sandbox.sandboxListAsBatchKeepIntermediateData' where key='ArcAction.batchModeKeepIntermediateData';

-- parallelism parameters
INSERT INTO arc.parameter VALUES ('ApiChargementService.MAX_PARALLEL_WORKERS','2');
UPDATE arc.parameter set description='parameter.parallel.numberOfThread.p1.load' where key='ApiChargementService.MAX_PARALLEL_WORKERS';
Expand Down
29 changes: 29 additions & 0 deletions arc-core/src/test/java/fr/insee/arc/core/model/BatchModeTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package fr.insee.arc.core.model;

import static org.junit.Assert.assertEquals;

import java.lang.reflect.InvocationTargetException;

import org.junit.Test;

import fr.insee.arc.utils.exception.ArcException;
import fr.insee.arc.utils.utils.PrivateConstructorTest;

public class BatchModeTest {

@Test
public void testBatchModeIsUtilityClass() throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
PrivateConstructorTest.testConstructorIsPrivate(BatchMode.class);
}

@Test
public void computeBatchModeTest() throws ArcException
{
assertEquals(BatchMode.UNSET, BatchMode.computeBatchMode(false,true));
assertEquals(BatchMode.UNSET, BatchMode.computeBatchMode(false,false));
assertEquals(BatchMode.NORMAL, BatchMode.computeBatchMode(true,false));
assertEquals(BatchMode.KEEP_INTERMEDIATE_DATA, BatchMode.computeBatchMode(true,true));
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package fr.insee.arc.core.service.global.dao;

import static org.junit.Assert.*;

import org.junit.Test;

import fr.insee.arc.core.model.BatchMode;

public class ThreadOperationsTest {

@Test
public void checkPreviousPhaseDataDropConditionTest() {

assertEquals(false,ThreadOperations.checkPreviousPhaseDataDropCondition(BatchMode.UNSET));
assertEquals(false,ThreadOperations.checkPreviousPhaseDataDropCondition(BatchMode.KEEP_INTERMEDIATE_DATA));
assertEquals(true,ThreadOperations.checkPreviousPhaseDataDropCondition(BatchMode.NORMAL));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public abstract class ArcWebGenericService<T extends ArcModel, D extends IDao> i

/** Is the current environment a production environment?*/
private boolean isEnvProd;
/** Is the current environment must be executed as a batch ? */
/** Is the current environment must be executed as a batch ? What is the batch mode ? */
private String isEnvBatch;

/** Are the Kubernetes options enabled?*/
Expand Down Expand Up @@ -185,7 +185,7 @@ public void initializeModel(@ModelAttribute T arcModel, Model model,
Sandbox s = new Sandbox(null, this.bacASable);

this.isEnvProd = s.isEnvSetForProduction();
this.isEnvBatch = s.isEnvSetForBatch();
this.isEnvBatch = s.computeBatchMode();

this.isKube = PropertiesHandler.getInstance().getKubernetesExecutorNumber() > 0;

Expand Down
5 changes: 5 additions & 0 deletions arc-web/src/main/resources/messages_en.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ action.enterPilotageBAS=enterPilotageBAS
action.selectExport=selectExport
action.selectParameters=selectParameters
action.selectOperations=selectOperations
action.selectReport=selectReport

tabNavScope.phase=viewChargement;viewNormage;viewControle;viewMapping;viewExpression;-viewJeuxDeReglesCopie;

Expand All @@ -28,6 +29,7 @@ header.operations=Execute
header.filesystem=Filesystem Management
header.query=Database Management
header.maintenance=Application maintenance
header.reportManagement=Manage reports

header.sandboxChoice=Choose your working environment
header.database.ok=DB connected
Expand Down Expand Up @@ -321,6 +323,8 @@ view.userlist=Users in the selected group

view.parameters=Parameters for application operation

view.report=Archives declared in slips but not received

INITIALISATION=Initialize
RA_INITIALISATION=Reset Sandbox
RECEPTION=Register files
Expand Down Expand Up @@ -349,6 +353,7 @@ MAPPING\u0020KO=Map KO
parameter.ihm.sandbox.maxNumberOfFilesRegisteredAtTheSameTime=Maximum number of file that can be registered by the reception module in the sandbox execution
parameter.ihm.sandbox.sandboxListWithProductionGUI=List of the sandboxes with a production type GUI
parameter.ihm.sandbox.sandboxListWithBatchMode=List of the sandboxes with batch mode
parameter.ihm.sandbox.sandboxListAsBatchKeepIntermediateData=List of the sandboxes where to keep intermediate data in batch mode
parameter.parallel.numberOfThread.p1.load=Number of threads allocated to the module #1 "Load"
parameter.database.version.global=Current global database version identifier corresponding to the ARC version identifier on git
parameter.parallel.numberOfThread.p2.xmlStructurize=Number of threads allocated to the module #2 "Structurize XML"
Expand Down
5 changes: 5 additions & 0 deletions arc-web/src/main/resources/messages_fr.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ action.enterPilotageBAS=enterPilotageBAS
action.selectExport=selectExport
action.selectParameters=selectParameters
action.selectOperations=selectOperations
action.selectReport=selectReport

tabNavScope.phase=viewChargement;viewNormage;viewControle;viewMapping;viewExpression;-viewJeuxDeReglesCopie;

Expand All @@ -28,6 +29,7 @@ header.operations=Maintenir
header.filesystem=G\u00e9rer le syst\u00e8me de fichiers
header.query=G\u00e9rer la base de donn\u00e9es
header.maintenance=Maintenance applicative
header.reportManagement=G\u00e9rer les bordereaux

header.sandboxChoice=Choisissez votre environnement
header.database.ok=BD connect\u00e9e
Expand Down Expand Up @@ -322,6 +324,8 @@ view.userlist=Les agents du groupe s\u00e9lectionn\u00e9

view.parameters=Param\u00e9trage du fonctionnement de l''application

view.report=Archives d\u00e9clar\u00e9es aux bordereaux mais non re\u00e7ues

INITIALISATION=Initialisation
RA_INITIALISATION=R\u00e9initialiser
RECEPTION=R\u00e9ception
Expand Down Expand Up @@ -350,6 +354,7 @@ MAPPING\u0020KO=Mapping KO
parameter.ihm.sandbox.maxNumberOfFilesRegisteredAtTheSameTime=Nombre maximum de fichiers pouvant \u00eatre enregistr\u00e9s par le module de r\u00e9ception dans l''ex\u00e9cution du sandbox
parameter.ihm.sandbox.sandboxListWithProductionGUI=Liste des bacs \u00e0 sable avec une interface graphique de type production
parameter.ihm.sandbox.sandboxListWithBatchMode=Liste des bacs \u00e0 sable en mode batch
parameter.ihm.sandbox.sandboxListAsBatchKeepIntermediateData=Liste des bacs \u00e0 sable pour lesquels sont gard\u00e9es les donn\u00e9es interm\u00e9diaires en mode batch
parameter.parallel.numberOfThread.p1.load=Nombre de threads allou\u00e9s au module #1 "Load"
parameter.database.version.global=Identifiant de version de base de donn\u00e9es globale actuelle correspondant \u00e0 l''identifiant de version ARC sur git
parameter.parallel.numberOfThread.p2.xmlStructurize=Nombre de threads allou\u00e9s au module #2 "Structurer XML"
Expand Down

0 comments on commit 0f77194

Please sign in to comment.