feat: retrieval ws refactor + test initialisation
Nolife999 committed Nov 30, 2023
1 parent bb0981b commit a34b3dd
Showing 18 changed files with 654 additions and 263 deletions.
@@ -150,6 +150,7 @@ public enum ViewEnum {
, WS_INFO("ws_info", SchemaEnum.SANDBOX_GENERATED)
, WS_PENDING("ws_pending", SchemaEnum.SANDBOX_GENERATED)
, WS_KO("ws_ko", SchemaEnum.SANDBOX_GENERATED)
, WS_TRACKING("ws_tracking", SchemaEnum.SANDBOX_GENERATED)

;

@@ -205,8 +206,12 @@ public String getFullName(String schema) {
return normalizeTableName(schema + SQL.DOT.getSqlCode() + this.tableName);
}

public static String getFullNameNotNormalized(String schema, String providedTableName) {
return providedTableName.contains(SQL.DOT.getSqlCode())? providedTableName : schema + SQL.DOT.getSqlCode() + providedTableName;
}

public static String getFullName(String schema, String providedTableName) {
return normalizeTableName(providedTableName.contains(SQL.DOT.getSqlCode())? providedTableName : schema + SQL.DOT.getSqlCode() + providedTableName);
return normalizeTableName(getFullNameNotNormalized(schema, providedTableName));
}

public static String normalizeTableName(String providedTableName)
@@ -61,7 +61,7 @@ public static String buildTableNameWithTokens(String schema, String mainSuffix,
}
s.append(mainSuffix);

return ViewEnum.getFullName(schema, s.toString());
return ViewEnum.getFullNameNotNormalized(schema, s.toString());

}
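
The split above lets buildTableNameWithTokens return a schema-qualified name whose casing is preserved, while getFullName keeps its normalizing behaviour. A minimal sketch of the difference, assuming normalizeTableName lower-cases identifiers (its body is folded out of this diff):

    // Illustrative only; the exact behaviour of normalizeTableName is an assumption.
    String qualified = ViewEnum.getFullNameNotNormalized("sandbox", "Ws_Tracking");
    // -> "sandbox.Ws_Tracking" : schema-qualified, casing untouched
    String normalized = ViewEnum.getFullName("sandbox", "Ws_Tracking");
    // -> "sandbox.ws_tracking", assuming lower-casing normalization
    // a name that already contains a dot is returned unchanged by the non-normalized variant
    String asIs = ViewEnum.getFullNameNotNormalized("sandbox", "other.table");
    // -> "other.table"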

@@ -44,10 +44,8 @@ public static int dispatchOnNods(Connection coordinatorConnexion, ThrowingConsum
actionOnCoordinator.accept(coordinatorConnexion);
}


// dispatch when scaled
int numberOfExecutorNods = ArcDatabase.numberOfExecutorNods();

// metadata copy is only necessary when scaled
if (numberOfExecutorNods==0)
{
return numberOfExecutorNods;
@@ -150,7 +150,7 @@ private static void copyMetaDataToExecutors(Connection coordinatorConnexion, Con

GenericBean gb = SynchronizeRulesAndMetadataDao.execQuerySelectMetaDataOnlyFrom(coordinatorConnexion, table);

CopyObjectsToDatabase.execCopyFromGenericBeanIfTableNotExists(executorConnection, table, gb);
CopyObjectsToDatabase.execCopyFromGenericBeanWithoutDroppingTargetTable(executorConnection, table, gb);
}

}
@@ -18,35 +18,35 @@ private CopyObjectsToDatabase() {
/**
* execute the copy in chunks; this is mandatory for large GenericBean objects
* @param connection
* @param tableName
* @param gb
* @param targetTableName
* @param genericBeanContainingData
* @throws ArcException
*/
public static void execCopyFromGenericBean(Connection connection, String tableName, GenericBean gb)
public static void execCopyFromGenericBean(Connection connection, String targetTableName, GenericBean genericBeanContainingData)
throws ArcException {
execCopyFromGenericBean(connection, tableName, gb, CHUNK_SIZE, true);
execCopyFromGenericBean(connection, targetTableName, genericBeanContainingData, CHUNK_SIZE, true);
}

public static void execCopyFromGenericBeanIfTableNotExists(Connection connection, String tableName, GenericBean gb)
public static void execCopyFromGenericBeanWithoutDroppingTargetTable(Connection targetConnection, String targetTableName, GenericBean genericBeanContainingData)
throws ArcException {
execCopyFromGenericBean(connection, tableName, gb, CHUNK_SIZE, false);
execCopyFromGenericBean(targetConnection, targetTableName, genericBeanContainingData, CHUNK_SIZE, false);
}


/**
* execute the copy from a GenericBean to the database in chunks of size chunkSize
*
* @param connection
* @param tableName
* @param gb
* @param targetConnection
* @param targetTableName
* @param genericBeanContainingData
* @param chunkSize
* @throws ArcException
*/
private static void execCopyFromGenericBean(Connection connection, String tableName, GenericBean gb, int chunkSize, boolean replaceTargetTable)
private static void execCopyFromGenericBean(Connection targetConnection, String targetTableName, GenericBean genericBeanContainingData, int chunkSize, boolean replaceTargetTable)
throws ArcException {
GenericPreparedStatementBuilder query = new GenericPreparedStatementBuilder();

query.append(query.createWithGenericBean(tableName, gb, replaceTargetTable));
query.append(query.createWithGenericBean(targetTableName, genericBeanContainingData, replaceTargetTable));

int cursor = 0;
boolean stillToDo = true;
@@ -55,18 +55,18 @@ private static void execCopyFromGenericBean(Connection connection, String tableN
int startChunk = cursor;
int endChunk = cursor + chunkSize;
cursor = endChunk;
stillToDo=(cursor < gb.getContent().size());
stillToDo=(cursor < genericBeanContainingData.getContent().size());

query.insertWithGenericBeanByChunk(tableName, gb, startChunk, endChunk);
query.insertWithGenericBeanByChunk(targetTableName, genericBeanContainingData, startChunk, endChunk);

// run an analyze on the table at the end
if (!stillToDo)
{
query.append(SQL.COMMIT).append(SQL.END_QUERY);
query.append(FormatSQL.analyzeSecured(tableName));
query.append(FormatSQL.analyzeSecured(targetTableName));
}

UtilitaireDao.get(0).executeImmediate(connection, query);
UtilitaireDao.get(0).executeImmediate(targetConnection, query);
query = new GenericPreparedStatementBuilder();

} while (stillToDo);
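
The rename from execCopyFromGenericBeanIfTableNotExists to execCopyFromGenericBeanWithoutDroppingTargetTable makes the contract explicit: both entry points insert the GenericBean content in chunks of CHUNK_SIZE, but only the first drops and recreates the target table. A hedged usage sketch reusing the variables from the copyMetaDataToExecutors hunk above:

    // Sketch only; gb and table come from the copyMetaDataToExecutors hunk above.
    GenericBean gb = SynchronizeRulesAndMetadataDao.execQuerySelectMetaDataOnlyFrom(coordinatorConnexion, table);
    // full refresh: drop and recreate the target table, then insert in chunks
    CopyObjectsToDatabase.execCopyFromGenericBean(executorConnection, table, gb);
    // incremental: keep the target table (create it only if missing) and insert the rows,
    // which is what the metadata synchronization now does on executor nodes
    CopyObjectsToDatabase.execCopyFromGenericBeanWithoutDroppingTargetTable(executorConnection, table, gb);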
@@ -87,6 +87,7 @@ public enum ArcExceptionMessage {

WS_RETRIEVE_DATA_FAMILY_FORBIDDEN("Vous ne pouvez pas accéder à cette famille de norme"),
WS_RETRIEVE_DATA_FAMILY_CREATION_FAILED("Les tables de la famille de norme n'ont pas pu être créées"),
WS_RETRIEVE_DATA_SCALABLE_TABLE_MUST_BE_EXPORT_IN_CSV("Scalable tables can only be retrieved in csv_gzip mode"),

IHM_NMCL_COLUMN_IN_FILE_BUT_NOT_IN_SCHEMA("La colonne %s n'est pas déclarée dans le schéma"),
IHM_NMCL_COLUMN_IN_SCHEMA_BUT_NOT_IN_FILE("La colonne est déclarée dans le schéma mais absente du fichier"),
14 changes: 14 additions & 0 deletions arc-ws/pom.xml
@@ -19,6 +19,20 @@
</properties>

<dependencies>
<dependency>
<groupId>fr.insee.arc</groupId>
<artifactId>arc-utils</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>fr.insee.arc</groupId>
<artifactId>arc-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>fr.insee.arc</groupId>
<artifactId>arc-utils</artifactId>
@@ -8,6 +8,7 @@
import org.json.JSONArray;
import org.json.JSONObject;

import fr.insee.arc.core.dataobjects.ArcDatabase;
import fr.insee.arc.core.dataobjects.SchemaEnum;
import fr.insee.arc.core.model.Delimiters;
import fr.insee.arc.utils.exception.ArcException;
@@ -36,7 +37,7 @@ public ImportStep1InitializeClientTablesService(JSONObject dsnRequest) {

this.dsnRequest = dsnRequest;

this.arcClientIdentifier = new ArcClientIdentifier(dsnRequest);
this.arcClientIdentifier = new ArcClientIdentifier(dsnRequest, true);

this.sources = makeSource(dsnRequest);

@@ -63,25 +64,68 @@ private static List<String> makeSource(JSONObject dsnRequest) {
}

public void execute(SendResponse resp) throws ArcException {
this.clientDao.dropPendingClientTables();

// drop the client tables that had been requested in a former call
dropPendingClientTables();

this.clientDao.createTableWsStatus();
// create the table that will track the data tables which have been built and retrieved
createTrackTable();

// create the wsinfo and the wspending tables
// the wspending table will be deleted when all client tables have been retrieved
createWsTables();

if (!arcClientIdentifier.getEnvironnement().equalsIgnoreCase(SchemaEnum.ARC_METADATA.getSchemaName())) {
clientDao.verificationClientFamille();
tablesMetierNames = clientDao.getIdSrcTableMetier(dsnRequest);
}
// create the tables needed to retrieve the family data tables
createMetaFamilyTables();

// create the data tables in an asynchronous parallel thread
startTableCreationInParallel();

// return the client id along with its timestamp
resp.send(arcClientIdentifier.getEnvironnement() + Delimiters.SQL_SCHEMA_DELIMITER
+ arcClientIdentifier.getClient() + Delimiters.SQL_TOKEN_DELIMITER
+ arcClientIdentifier.getClientIdentifier() + Delimiters.SQL_TOKEN_DELIMITER
+ arcClientIdentifier.getTimestamp());

resp.endSending();
}
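
The reply is the namespace token under which every table for this client is created. A hedged illustration, assuming SQL_SCHEMA_DELIMITER is "." and SQL_TOKEN_DELIMITER is "_" (the Delimiters values are folded out of this diff):

    // Illustrative only; delimiter values and the client name are assumptions.
    // resp.send(...) above yields something like:
    //   "arc_bas1.ARTEMIS_1701340200000"
    // i.e. <environnement>.<clientIdentifier>_<timestamp>, the prefix under which
    // the parallel thread below creates the client's tables.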

/**
* 1. check if the client has the right to retrieve the family. If so:
* 2. build the table of the id_source to be retrieved in the family data tables
* 3. return the list of family data tables to retrieve
* @throws ArcException
*/
private void createMetaFamilyTables() throws ArcException {
if (!arcClientIdentifier.getEnvironnement().equalsIgnoreCase(SchemaEnum.ARC_METADATA.getSchemaName())) {

if (!clientDao.verificationClientFamille()) {
throw new ArcException(ArcExceptionMessage.WS_RETRIEVE_DATA_FAMILY_FORBIDDEN);
}

clientDao.createTableOfIdSource(dsnRequest);
tablesMetierNames = clientDao.selectBusinessDataTables();
}
}

/**
* create the table that tracks the client tables which have been built.
* when the data of a table has been retrieved by the client, its entry is deleted from the track table
* @throws ArcException
*/
private void createTrackTable() throws ArcException {
clientDao.createTableTrackRetrievedTables();
}

/**
* create the wsinfo and wspending tables.
* wspending will be deleted when all the client tables have been retrieved.
* the wsinfo table is transferred to the client in a loop until the wspending table is dropped
* @throws ArcException
*/
private void createWsTables() throws ArcException {
this.clientDao.createTableWsInfo();
}
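
Together, the track, wsinfo and wspending tables implement a simple polling contract with step 2 of the service: the client keeps asking for a table name until it receives a blank response. A client-side sketch, where callGetTableNameService() and retrieveTable() are hypothetical helpers, not part of this commit:

    // Hypothetical client loop; both helpers are assumptions.
    String response;
    while (!(response = callGetTableNameService()).isBlank()) {
        // while tables remain, the response is "<tableName> <column/type metadata>"
        String tableName = response.split(" ")[0];
        retrieveTable(tableName);
    }
    // a blank response means every client table has been retrieved and the
    // tracking tables have been cleaned up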

/**
* Will send a handshake to the client every @HANDSHAKE_TIMER_IN_MS milliseconds.
* Ugly, but we failed at fixing that in front of an F5 controller
@@ -94,11 +138,12 @@ private void startTableCreationInParallel() {
public void run() {
try {
if (tablesMetierNames != null) {
executeIf(ExportSource.MAPPING, () -> clientDao.createImages(tablesMetierNames));

executeIf(ExportSource.MAPPING, () -> createImages(tablesMetierNames));
executeIf(ExportSource.METADATA, () -> clientDao.createTableMetier());
executeIf(ExportSource.METADATA, () -> clientDao.createVarMetier());
executeIf(ExportSource.METADATA, () -> clientDao.createTableVarMetier());
}
executeIf(ExportSource.NOMENCLATURE, () -> clientDao.createNmcl());
executeIf(ExportSource.NOMENCLATURE, () -> clientDao.createTableNmcl());
executeIf(ExportSource.METADATA, () -> clientDao.createTableFamille());
executeIf(ExportSource.METADATA, () -> clientDao.createTablePeriodicite());
} catch (ArcException e) {
@@ -125,4 +170,46 @@ public void run() {
maintenance.start();
}


/**
* drop tables on the coordinator and on the executors if they exist
* @throws ArcException
*/
private void dropPendingClientTables() throws ArcException {

this.clientDao.dropPendingClientTables(0);

int numberOfExecutorNods = ArcDatabase.numberOfExecutorNods();
for (int executorConnectionId = ArcDatabase.EXECUTOR.getIndex(); executorConnectionId < ArcDatabase.EXECUTOR
.getIndex() + numberOfExecutorNods; executorConnectionId++) {
this.clientDao.dropPendingClientTables(executorConnectionId);
}
}



/**
* create image tables on executor nodes if the connection is scaled, on the
* coordinator node if not
*
* @param tablesMetierNames
* @throws ArcException
*/
private void createImages(List<String> tablesMetierNames) throws ArcException {
int numberOfExecutorNods = ArcDatabase.numberOfExecutorNods();
if (numberOfExecutorNods == 0) {
clientDao.createImages(tablesMetierNames, ArcDatabase.COORDINATOR.getIndex());
} else {
for (int executorConnectionId = ArcDatabase.EXECUTOR.getIndex(); executorConnectionId < ArcDatabase.EXECUTOR
.getIndex() + numberOfExecutorNods; executorConnectionId++) {

// copy the table containing the id_source to be retrieved onto the executor node
clientDao.copyTableOfIdSourceToExecutorNod(executorConnectionId);

// create the business tables containing the data of the id_source found in tableOfIdSource
clientDao.createImages(tablesMetierNames, executorConnectionId);
}
}
}

}
@@ -7,9 +7,12 @@
import org.json.JSONObject;

import fr.insee.arc.utils.exception.ArcException;
import fr.insee.arc.utils.exception.ArcExceptionMessage;
import fr.insee.arc.ws.services.importServlet.actions.SendResponse;
import fr.insee.arc.ws.services.importServlet.bo.ArcClientIdentifier;
import fr.insee.arc.ws.services.importServlet.bo.ExportTrackingType;
import fr.insee.arc.ws.services.importServlet.bo.JsonKeys;
import fr.insee.arc.ws.services.importServlet.bo.TableToRetrieve;
import fr.insee.arc.ws.services.importServlet.dao.ClientDao;
import fr.insee.arc.ws.services.importServlet.dao.NameDao;

@@ -29,7 +32,7 @@ public ImportStep2GetTableNameService(JSONObject dsnRequest) {

this.dsnRequest = dsnRequest;

this.arcClientIdentifier = new ArcClientIdentifier(dsnRequest);
this.arcClientIdentifier = new ArcClientIdentifier(dsnRequest, false);

reprise = this.dsnRequest.getBoolean(JsonKeys.REPRISE.getKey());

@@ -39,26 +42,20 @@ public void execute(SendResponse resp) throws ArcException {

public void execute(SendResponse resp) throws ArcException {

StringBuilder type = new StringBuilder();

String tableName = this.clientDao.getAClientTable();

if (tableName == null) {
tableName = this.clientDao.getIdTable();

if (!reprise) {
this.clientDao.updatePilotage(tableName);
}
// check whether a table creation ended in KO
if (this.clientDao.getAClientTableByType(ExportTrackingType.KO).getTableName() != null) {
throw new ArcException(ArcExceptionMessage.WS_RETRIEVE_DATA_FAMILY_CREATION_FAILED);
}

// try to get a data table
TableToRetrieve table = this.clientDao.getAClientTableByType(ExportTrackingType.DATA);

this.clientDao.dropTable(tableName);
if (table.getTableName() != null) {

resp.send(" ");
resp.endSending();
return;
StringBuilder type = new StringBuilder();

} else {
// retrieve the type
List<List<String>> metadataOnlyTable = NameDao.execQuerySelectMetadata(tableName);
List<List<String>> metadataOnlyTable = NameDao.execQuerySelectMetadata(table);

for (int j = 0; j < metadataOnlyTable.get(0).size(); j++) {
if (j > 0) {
@@ -69,11 +66,31 @@ public void execute(SendResponse resp) throws ArcException {
type.append(" " + metadataOnlyTable.get(i).get(j));
}
}

// return a client table name if one remains
resp.send(table.getTableName() + " " + type);
resp.endSending();

return;
}

// return a client table name if one remains
resp.send(tableName + " " + type);
// if no data table was found, get the id_source table to register
table = this.clientDao.getAClientTableByType(ExportTrackingType.ID_SOURCE);

if (table.getTableName() != null) {
if (!reprise) {
this.clientDao.updatePilotage(table.getTableName());
}

this.clientDao.dropTable(table.getTableName());
}

table = this.clientDao.getAClientTableByType(ExportTrackingType.TRACK);
this.clientDao.dropTable(table.getTableName());

resp.send(" ");
resp.endSending();

}

}
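
Seen from the client, the step-2 reply is positional: the first token is the fully qualified name of a retrievable table, the remainder is the column/type metadata assembled above, and a blank reply means nothing is left. A parsing sketch under those assumptions (the reply literal is invented and the exact metadata separator sits in the folded lines above):

    // Illustrative parse of a non-blank step-2 reply; the literal is invented.
    String reply = "arc_bas1.artemis_1701340200000_mapping_dsn_ok id_source text validite text";
    int firstSpace = reply.indexOf(' ');
    String tableToDownload = reply.substring(0, firstSpace);
    String columnMetadata = reply.substring(firstSpace + 1); // "<column> <type>" tokens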