feat: single connection for batch orchestrator
Nolife999 committed Nov 25, 2024
1 parent 903c785 commit 54f782b
Showing 5 changed files with 106 additions and 94 deletions.
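
The pattern of the change: the orchestrator opens one JDBC Connection for its whole run and threads it through every DAO and helper call that previously received null (a null presumably let each callee open its own short-lived connection). The static BatchArcDao query methods become instance methods bound to that connection. A minimal sketch of the DAO shape implied by the call sites below — only the constructor and getter are actually visible in this diff; the query methods are elided:

    import java.sql.Connection;

    // Sketch inferred from the call sites in this commit, not the real class body.
    public class BatchArcDao {

        // the single connection shared by the whole batch orchestrator
        private final Connection batchConnection;

        public BatchArcDao(Connection batchConnection) {
            this.batchConnection = batchConnection;
        }

        // handed to helpers (BddPatcher, BDParameters, ...) that used to receive null
        public Connection getBatchConnection() {
            return batchConnection;
        }
    }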
66 changes: 36 additions & 30 deletions arc-batch/src/main/java/fr/insee/arc/batch/BatchARC.java
@@ -6,6 +6,7 @@
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.sql.Connection;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
@@ -18,7 +19,6 @@
import org.springframework.beans.factory.annotation.Autowired;

import fr.insee.arc.batch.dao.BatchArcDao;
-import fr.insee.arc.batch.operation.PhaseInitializationOperation;
import fr.insee.arc.batch.threadrunners.PhaseParameterKeys;
import fr.insee.arc.batch.threadrunners.PhaseThreadFactory;
import fr.insee.arc.core.model.TraitementEtat;
@@ -35,6 +35,7 @@
import fr.insee.arc.core.util.BDParameters;
import fr.insee.arc.utils.batch.IReturnCode;
import fr.insee.arc.utils.consumer.ThrowingRunnable;
+import fr.insee.arc.utils.dao.UtilitaireDao;
import fr.insee.arc.utils.database.ArcDatabase;
import fr.insee.arc.utils.exception.ArcException;
import fr.insee.arc.utils.exception.ArcExceptionMessage;
@@ -124,14 +125,19 @@ private static void message(String msg) {
LoggerHelper.warn(LOGGER, msg);
}

+private BatchArcDao dao;


/**
* Lanceur MAIN arc
*
* @param args
*/
void execute() {

-try {
+try (Connection batchConnection = UtilitaireDao.get(0).getDriverConnexion();)
+{
+
+dao = new BatchArcDao(batchConnection);

batchAvoidDnsSpam();
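
Because the connection is acquired in a try-with-resources header, it is closed on every exit path of execute(), normal or exceptional, so the orchestrator cannot leak its single connection. Schematically (a sketch of the guarantee, not the full method body):

    // try-with-resources closes the resource on normal exit and on exception alike
    try (Connection batchConnection = UtilitaireDao.get(0).getDriverConnexion()) {
        dao = new BatchArcDao(batchConnection);
        // ... the whole batch lifecycle runs on this one connection ...
    } // implicit batchConnection.close()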

@@ -235,14 +241,14 @@ private void batchPatchDatabaseAndFileSystem() throws ArcException {

message("Patching database");

-new BddPatcher().bddScript(null);
+new BddPatcher().bddScript(dao.getBatchConnection());

BDParameters bdParameters = new BDParameters(ArcDatabase.COORDINATOR);

// either we take env and envExecution from database or properties
// default is from properties
-if (Boolean.parseBoolean(bdParameters.getString(null, "LanceurARC.envFromDatabase", "false"))) {
-envExecution = bdParameters.getString(null, "LanceurARC.envExecution", "arc_prod");
+if (Boolean.parseBoolean(bdParameters.getString(dao.getBatchConnection(), "LanceurARC.envFromDatabase", "false"))) {
+envExecution = bdParameters.getString(dao.getBatchConnection(), "LanceurARC.envExecution", "arc_prod");
} else {
envExecution = properties.getBatchExecutionEnvironment();
}
@@ -255,11 +261,11 @@ private void batchPatchDatabaseAndFileSystem() throws ArcException {
message("Patching filesytem");
envExecution=SecurityDao.validateEnvironnement(envExecution);

-new BddPatcher().bddScript(null, envExecution);
+new BddPatcher().bddScript(dao.getBatchConnection(), envExecution);


// build sandbox filesystem
-new BuildFileSystem(null, new String[] {this.envExecution}).execute();
+new BuildFileSystem(dao.getBatchConnection(), new String[] {this.envExecution}).execute();

}

@@ -273,40 +279,40 @@ private void batchParametersGet() {
BDParameters bdParameters = new BDParameters(ArcDatabase.COORDINATOR);

boolean keepInDatabase = Boolean
-.parseBoolean(bdParameters.getString(null, "LanceurARC.keepInDatabase", "false"));
+.parseBoolean(bdParameters.getString(dao.getBatchConnection(), "LanceurARC.keepInDatabase", "false"));

// pour le batch en cours, l'ensemble des enveloppes traitées ne peut pas
// excéder une certaine taille
-int tailleMaxReceptionEnMb = bdParameters.getInt(null, "LanceurARC.tailleMaxReceptionEnMb", 10);
+int tailleMaxReceptionEnMb = bdParameters.getInt(dao.getBatchConnection(), "LanceurARC.tailleMaxReceptionEnMb", 10);

// Maximum number of files to load
-int maxFilesToLoad = bdParameters.getInt(null, "LanceurARC.maxFilesToLoad", 101);
+int maxFilesToLoad = bdParameters.getInt(dao.getBatchConnection(), "LanceurARC.maxFilesToLoad", 101);

// Maximum number of files processed in each phase iteration
-int maxFilesPerPhase = bdParameters.getInt(null, "LanceurARC.maxFilesPerPhase", 1000000);
+int maxFilesPerPhase = bdParameters.getInt(dao.getBatchConnection(), "LanceurARC.maxFilesPerPhase", 1000000);

// fréquence à laquelle les phases sont démarrées
-this.poolingDelay = bdParameters.getInt(null, "LanceurARC.poolingDelay", 1000);
+this.poolingDelay = bdParameters.getInt(dao.getBatchConnection(), "LanceurARC.poolingDelay", 1000);

// heure d'initalisation en production
-hourToTriggerInitializationInProduction = bdParameters.getInt(null,
+hourToTriggerInitializationInProduction = bdParameters.getInt(dao.getBatchConnection(),
"ApiService.HEURE_INITIALISATION_PRODUCTION", 22);

// interval entre chaque initialisation en nb de jours
-intervalForInitializationInDay = bdParameters.getInt(null, "LanceurARC.INTERVAL_JOUR_INITIALISATION", 7);
+intervalForInitializationInDay = bdParameters.getInt(dao.getBatchConnection(), "LanceurARC.INTERVAL_JOUR_INITIALISATION", 7);

// nombre d'iteration de la boucle batch entre chaque routine de maintenance de
// la base de données
-numberOfIterationBewteenDatabaseMaintenanceRoutine = bdParameters.getInt(null,
+numberOfIterationBewteenDatabaseMaintenanceRoutine = bdParameters.getInt(dao.getBatchConnection(),
"LanceurARC.DATABASE_MAINTENANCE_ROUTINE_INTERVAL", 500);

// nombre d'iteration de la boucle batch entre chaque routine de vérification du
// reste à faire
-numberOfIterationBewteenCheckTodo = bdParameters.getInt(null, "LanceurARC.DATABASE_CHECKTODO_ROUTINE_INTERVAL",
+numberOfIterationBewteenCheckTodo = bdParameters.getInt(dao.getBatchConnection(), "LanceurARC.DATABASE_CHECKTODO_ROUTINE_INTERVAL",
10);

// wait executor pods
-waitExecutorTimerInMS = bdParameters.getInt(null, "LanceurARC.DATABASE_WAIT_FOR_EXECUTORS_IN_MS",
+waitExecutorTimerInMS = bdParameters.getInt(dao.getBatchConnection(), "LanceurARC.DATABASE_WAIT_FOR_EXECUTORS_IN_MS",
30000);
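
Every BDParameters lookup in batchParametersGet() now passes the shared connection where it previously passed null. The BDParameters internals are not part of this diff, but a plausible reading is that a null connection forces a one-shot connection per lookup, while a non-null one is reused — roughly this pattern (fetchOne() is a hypothetical placeholder, and error handling is elided):

    // Hypothetical illustration only; not a real BDParameters method.
    String lookup(Connection shared, String key, String defaultValue) throws Exception {
        if (shared != null) {
            return fetchOne(shared, key, defaultValue); // reuse the orchestrator's connection
        }
        try (Connection oneShot = UtilitaireDao.get(0).getDriverConnexion()) {
            return fetchOne(oneShot, key, defaultValue); // legacy: one connection per lookup
        }
    }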


Expand Down Expand Up @@ -371,11 +377,11 @@ private void batchEnvironmentPrepare() throws ArcException {
*/
private void resetPendingFilesFromPilotage() throws ArcException {

-BatchArcDao.execQueryResetPendingFilesInPilotageTable(envExecution);
+dao.execQueryResetPendingFilesInPilotageTable(envExecution);

// if volatile mode on, put back all the not fully proceeded files in reception
// phase
-executeIfVolatile(() -> BatchArcDao.execQueryResetPendingFilesInPilotageTableVolatile(envExecution));
+executeIfVolatile(() -> dao.execQueryResetPendingFilesInPilotageTableVolatile(envExecution));
}

private void executorsDatabaseCreate() throws ArcException {
@@ -426,10 +432,10 @@ private void exportToParquet() throws ArcException {
private void maintenanceTablePilotageBatch() throws ArcException {

// postgres catalog maintenance
-DatabaseMaintenance.maintenancePgCatalogAllNods(null, FormatSQL.VACUUM_OPTION_FULL);
+DatabaseMaintenance.maintenancePgCatalogAllNods(dao.getBatchConnection(), FormatSQL.VACUUM_OPTION_FULL);

// arc pilotage table maintenance
-DatabaseMaintenance.maintenancePilotage(null, envExecution, FormatSQL.VACUUM_OPTION_NONE);
+DatabaseMaintenance.maintenancePilotage(dao.getBatchConnection(), envExecution, FormatSQL.VACUUM_OPTION_NONE);

}

@@ -530,7 +536,7 @@ private void phaseInitializationExecute() throws ArcException {
// date programmée d'initialisation (last_init)
// on ne la lance que s'il n'y a rien en cours (pas essentiel mais plus
// sécurisé)
-if ((!dejaEnCours && PhaseInitializationOperation.isInitializationMustTrigger(this.envExecution))) {
+if ((!dejaEnCours && dao.isInitializationMustTrigger(this.envExecution))) {
message("Initialisation en cours");

PhaseThreadFactory initialiser = new PhaseThreadFactory(mapParam, TraitementPhase.INITIALISATION);
@@ -539,7 +545,7 @@

message("Initialisation terminée : " + initialiser.getReport().getDuree() + " ms");

-BatchArcDao.execUpdateLastInitialisationTimestamp(envExecution, intervalForInitializationInDay,
+dao.execUpdateLastInitialisationTimestamp(envExecution, intervalForInitializationInDay,
hourToTriggerInitializationInProduction);

return;
@@ -560,7 +566,7 @@ private void synchronizeExecutorsMetadata() throws ArcException {

message("Synchronization vers les executeurs en cours");

-new SynchronizeRulesAndMetadataOperation(new Sandbox(null, this.envExecution))
+new SynchronizeRulesAndMetadataOperation(new Sandbox(dao.getBatchConnection(), this.envExecution))
.synchroniserSchemaExecutionAllNods();

message("Synchronization terminé");
@@ -576,8 +582,8 @@ private void synchronizeExecutorsMetadata() throws ArcException {
private void deplacerFichiersNonTraites() throws ArcException {

List<String> aBouger = exportOn ? //
-BatchArcDao.execQuerySelectArchiveNotExported(envExecution) //
-: BatchArcDao.execQuerySelectArchiveEnCours(envExecution);
+dao.execQuerySelectArchiveNotExported(envExecution) //
+: dao.execQuerySelectArchiveEnCours(envExecution);

dejaEnCours = (!aBouger.isEmpty());

@@ -587,7 +593,7 @@ private void deplacerFichiersNonTraites() throws ArcException {
}

// si le s3 est actif, on sauvegarde les archives pending ou KO vers le s3
-List<String> aBougerToS3 = ArcS3.INPUT_BUCKET.isS3Off() ? new ArrayList<>():BatchArcDao.execQuerySelectArchivePendingOrKO(envExecution);
+List<String> aBougerToS3 = ArcS3.INPUT_BUCKET.isS3Off() ? new ArrayList<>():dao.execQuerySelectArchivePendingOrKO(envExecution);
if (!aBougerToS3.isEmpty()) {
savePendingOrKOArchivesToS3(envExecution, repertoire, aBougerToS3);
}
@@ -793,7 +799,7 @@ public void run() {
*/
private boolean isNothingLeftToDo(String envExecution) {
boolean isNothingLeftToDo = false;
-if (BatchArcDao.execQueryAnythingLeftTodo(envExecution) == 0) {
+if (dao.execQueryAnythingLeftTodo(envExecution) == 0) {
isNothingLeftToDo = true;
}
return isNothingLeftToDo;
@@ -806,7 +812,7 @@ private boolean isNothingLeftToDo(String envExecution) {
* @throws ArcException
*/
private boolean isProductionOn() throws ArcException {
-this.productionOn = BatchArcDao.execQueryIsProductionOn(this.envExecution);
+this.productionOn = dao.execQueryIsProductionOn(this.envExecution);
return productionOn;
}
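
The remaining hunks repeat one migration uniformly: static BatchArcDao.execQuery*(...) calls become instance calls on dao, and isInitializationMustTrigger moves from the removed PhaseInitializationOperation import into the DAO, so every query runs on the single batch connection. Call-site before/after, condensed from the diff:

    // before: static helper, free to open its own connection
    BatchArcDao.execQueryResetPendingFilesInPilotageTable(envExecution);

    // after: instance method bound to the orchestrator's connection
    BatchArcDao dao = new BatchArcDao(batchConnection);
    dao.execQueryResetPendingFilesInPilotageTable(envExecution);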

(Diffs for the remaining 4 changed files were not loaded.)
