Merge pull request #101 from InseeFr/scalable_data_retrieval_webservice
Scalable data retrieval webservice
Nolife999 authored Dec 4, 2023
2 parents 55f64a1 + 50cec34 commit 1a75ea0
Showing 29 changed files with 1,422 additions and 344 deletions.
@@ -150,6 +150,7 @@ public enum ViewEnum {
, WS_INFO("ws_info", SchemaEnum.SANDBOX_GENERATED)
, WS_PENDING("ws_pending", SchemaEnum.SANDBOX_GENERATED)
, WS_KO("ws_ko", SchemaEnum.SANDBOX_GENERATED)
, WS_TRACKING("ws_tracking", SchemaEnum.SANDBOX_GENERATED)

;

@@ -205,8 +206,12 @@ public String getFullName(String schema) {
return normalizeTableName(schema + SQL.DOT.getSqlCode() + this.tableName);
}

public static String getFullNameNotNormalized(String schema, String providedTableName) {
return providedTableName.contains(SQL.DOT.getSqlCode())? providedTableName : schema + SQL.DOT.getSqlCode() + providedTableName;
}

public static String getFullName(String schema, String providedTableName) {
return normalizeTableName(providedTableName.contains(SQL.DOT.getSqlCode())? providedTableName : schema + SQL.DOT.getSqlCode() + providedTableName);
return normalizeTableName(getFullNameNotNormalized(schema, providedTableName));
}

public static String normalizeTableName(String providedTableName)
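The split above separates schema qualification from normalization. A minimal sketch of the contrast, assuming normalizeTableName lower-cases the qualified name (which the updated TableNaming test below suggests); the argument values are illustrative:

// qualification only: the schema prefix is added unless the name already contains a dot; case is preserved
String raw = ViewEnum.getFullNameNotNormalized("arc_bas2", "ARTEMIS_1701_test");
// raw == "arc_bas2.ARTEMIS_1701_test"

// qualification followed by normalization (assumed here to lower-case)
String normalized = ViewEnum.getFullName("arc_bas2", "ARTEMIS_1701_test");
// normalized == "arc_bas2.artemis_1701_test"

// an already qualified name is returned as-is
String kept = ViewEnum.getFullNameNotNormalized("arc_bas2", "other_schema.t");
// kept == "other_schema.t"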
@@ -61,7 +61,7 @@ public static String buildTableNameWithTokens(String schema, String mainSuffix,
}
s.append(mainSuffix);

return ViewEnum.getFullName(schema, s.toString());
return ViewEnum.getFullNameNotNormalized(schema, s.toString());

}

@@ -44,10 +44,8 @@ public static int dispatchOnNods(Connection coordinatorConnexion, ThrowingConsum
actionOnCoordinator.accept(coordinatorConnexion);
}


// dispatch when scaled
int numberOfExecutorNods = ArcDatabase.numberOfExecutorNods();

// meta data copy is only necessary when scaled
if (numberOfExecutorNods==0)
{
return numberOfExecutorNods;
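For context, this is the dispatch pattern the method implements, reconstructed as a hedged sketch from the loop bounds used later in this diff (createImages); runOnNod is a hypothetical helper, while the ArcDatabase calls are taken from the diff:

// always run the action on the coordinator first
actionOnCoordinator.accept(coordinatorConnexion);

// not scaled: no executor nods, nothing to dispatch
int numberOfExecutorNods = ArcDatabase.numberOfExecutorNods();
if (numberOfExecutorNods == 0) {
    return numberOfExecutorNods;
}

// scaled: repeat the action on each executor nod
for (int executorConnectionId = ArcDatabase.EXECUTOR.getIndex();
        executorConnectionId < ArcDatabase.EXECUTOR.getIndex() + numberOfExecutorNods;
        executorConnectionId++) {
    runOnNod(executorConnectionId); // hypothetical helper
}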
@@ -150,7 +150,7 @@ private static void copyMetaDataToExecutors(Connection coordinatorConnexion, Con

GenericBean gb = SynchronizeRulesAndMetadataDao.execQuerySelectMetaDataOnlyFrom(coordinatorConnexion, table);

CopyObjectsToDatabase.execCopyFromGenericBeanIfTableNotExists(executorConnection, table, gb);
CopyObjectsToDatabase.execCopyFromGenericBeanWithoutDroppingTargetTable(executorConnection, table, gb);
}

}
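The rename matters here because several clients may push the same metadata tables to an executor concurrently. A hedged contrast of the two entry points, assuming the replaceTargetTable flag switches the CREATE between drop-and-recreate and create-if-absent:

// drops and recreates the target table before loading (replaceTargetTable = true)
CopyObjectsToDatabase.execCopyFromGenericBean(executorConnection, table, gb);

// keeps an existing target table and loads into it (replaceTargetTable = false),
// so a table already present on the executor is not destroyed mid-use
CopyObjectsToDatabase.execCopyFromGenericBeanWithoutDroppingTargetTable(executorConnection, table, gb);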
@@ -15,10 +15,10 @@ public void buildTableNameTokensSuffix() {
String client = "ARTEMIS";
long timestamp = System.currentTimeMillis();

assertEquals("arc_bas2.artemis_"+timestamp+"_pilotage_fichier", TableNaming.buildTableNameWithTokens("arc_bas2", ViewEnum.PILOTAGE_FICHIER, client, timestamp));
assertEquals("arc_bas2.artemis_"+timestamp+"_id_source", TableNaming.buildTableNameWithTokens("arc_bas2", ColumnEnum.ID_SOURCE, client, timestamp));
assertEquals("arc_bas2.ARTEMIS_"+timestamp+"_pilotage_fichier", TableNaming.buildTableNameWithTokens("arc_bas2", ViewEnum.PILOTAGE_FICHIER, client, timestamp));
assertEquals("arc_bas2.ARTEMIS_"+timestamp+"_id_source", TableNaming.buildTableNameWithTokens("arc_bas2", ColumnEnum.ID_SOURCE, client, timestamp));
assertEquals(null, TableNaming.buildTableNameWithTokens("arc_bas2", ColumnEnum.ID_SOURCE, null, timestamp));
assertEquals("arc_bas2.artemis_"+timestamp+"_test", TableNaming.buildTableNameWithTokens("arc_bas2", "test", client, timestamp));
assertEquals("arc_bas2.ARTEMIS_"+timestamp+"_test", TableNaming.buildTableNameWithTokens("arc_bas2", "test", client, timestamp));
assertEquals("arc_bas2.test", TableNaming.buildTableNameWithTokens("arc_bas2", "TEST"));

}
@@ -26,7 +26,7 @@ public void copyMetadataToExecutorsTestNotScalable() throws SQLException, ArcExc
@Test
public void copyMetadataToExecutorsTestScalable() throws SQLException, ArcException {

buildPropertiesWithScalability(null);
buildPropertiesWithOneExecutor(null);

BddPatcherTest.initializeDatabaseForRetrieveTablesFromSchemaTest(u);

@@ -18,35 +18,35 @@ private CopyObjectsToDatabase() {
/**
* execute copy by chunk. It is mandatory for large GenericBean objects
* @param connection
* @param tableName
* @param gb
* @param targetTableName
* @param genericBeanContainingData
* @throws ArcException
*/
public static void execCopyFromGenericBean(Connection connection, String tableName, GenericBean gb)
public static void execCopyFromGenericBean(Connection connection, String targetTableName, GenericBean genericBeanContainingData)
throws ArcException {
execCopyFromGenericBean(connection, tableName, gb, CHUNK_SIZE, true);
execCopyFromGenericBean(connection, targetTableName, genericBeanContainingData, CHUNK_SIZE, true);
}

public static void execCopyFromGenericBeanIfTableNotExists(Connection connection, String tableName, GenericBean gb)
public static void execCopyFromGenericBeanWithoutDroppingTargetTable(Connection targetConnection, String targetTableName, GenericBean genericBeanContainingData)
throws ArcException {
execCopyFromGenericBean(connection, tableName, gb, CHUNK_SIZE, false);
execCopyFromGenericBean(targetConnection, targetTableName, genericBeanContainingData, CHUNK_SIZE, false);
}


/**
* execute copy from GenericBean to database by chunk of size @param chunkSize
*
* @param connection
* @param tableName
* @param gb
* @param targetConnection
* @param targetTableName
* @param genericBeanContainingData
* @param chunkSize
* @throws ArcException
*/
private static void execCopyFromGenericBean(Connection connection, String tableName, GenericBean gb, int chunkSize, boolean replaceTargetTable)
private static void execCopyFromGenericBean(Connection targetConnection, String targetTableName, GenericBean genericBeanContainingData, int chunkSize, boolean replaceTargetTable)
throws ArcException {
GenericPreparedStatementBuilder query = new GenericPreparedStatementBuilder();

query.append(query.createWithGenericBean(tableName, gb, replaceTargetTable));
query.append(query.createWithGenericBean(targetTableName, genericBeanContainingData, replaceTargetTable));

int cursor = 0;
boolean stillToDo = true;
@@ -55,18 +55,18 @@ private static void execCopyFromGenericBean(Connection connection, String tableN
int startChunk = cursor;
int endChunk = cursor + chunkSize;
cursor = endChunk;
stillToDo=(cursor < gb.getContent().size());
stillToDo=(cursor < genericBeanContainingData.getContent().size());

query.insertWithGenericBeanByChunk(tableName, gb, startChunk, endChunk);
query.insertWithGenericBeanByChunk(targetTableName, genericBeanContainingData, startChunk, endChunk);

// analyze on the table at the end
if (!stillToDo)
{
query.append(SQL.COMMIT).append(SQL.END_QUERY);
query.append(FormatSQL.analyzeSecured(tableName));
query.append(FormatSQL.analyzeSecured(targetTableName));
}

UtilitaireDao.get(0).executeImmediate(connection, query);
UtilitaireDao.get(0).executeImmediate(targetConnection, query);
query = new GenericPreparedStatementBuilder();

} while (stillToDo);
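The loop above, reduced to its skeleton as a hedged sketch; insertChunk stands in for the GenericPreparedStatementBuilder calls and is hypothetical:

int cursor = 0;
int totalRows = genericBeanContainingData.getContent().size();
while (cursor < totalRows) {
    int startChunk = cursor;
    int endChunk = cursor + chunkSize; // overshoot on the last chunk is tolerated
    insertChunk(targetTableName, genericBeanContainingData, startChunk, endChunk);
    cursor = endChunk;
}
// the real method also appends COMMIT and an ANALYZE after the last chunk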
@@ -87,6 +87,7 @@ public enum ArcExceptionMessage {

WS_RETRIEVE_DATA_FAMILY_FORBIDDEN("Vous ne pouvez pas accéder à cette famille de norme"),
WS_RETRIEVE_DATA_FAMILY_CREATION_FAILED("Les tables de la famille de norme n'ont pas pu être créées"),
WS_RETRIEVE_DATA_SCALABLE_TABLE_MUST_BE_EXPORT_IN_CSV("Scalable tables can only be retrieved in csv_gzip mode"),

IHM_NMCL_COLUMN_IN_FILE_BUT_NOT_IN_SCHEMA("La colonne %s n'est pas déclarée dans le schéma"),
IHM_NMCL_COLUMN_IN_SCHEMA_BUT_NOT_IN_FILE("La colonne est déclarée dans le schéma mais absente du fichier"),
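The new message is raised like the other retrieval errors in this enum, for example (the guard condition is illustrative; the throw pattern mirrors createMetaFamilyTables later in this diff):

if (isScaledTable && !isCsvGzipExport) { // illustrative condition
    throw new ArcException(ArcExceptionMessage.WS_RETRIEVE_DATA_SCALABLE_TABLE_MUST_BE_EXPORT_IN_CSV);
}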
@@ -25,26 +25,36 @@ public class InitializeQueryTest {

public static Connection e;

public static Connection f;


@Test
public void testConnection()
{
assertNotNull(c);
}

protected static void buildPropertiesWithoutScalability(String repertoire) throws SQLException
public static void buildPropertiesWithoutScalability(String repertoire) throws SQLException
{
buildProperties(repertoire, new Connection[] {c});
}


protected static void buildPropertiesWithScalability(String repertoire) throws SQLException
public static void buildPropertiesWithOneExecutor(String repertoire) throws SQLException
{
e = new TestDatabase().testConnection;
buildProperties(repertoire, new Connection[] {c, e});
}

protected static void buildProperties(String repertoire, Connection[] connections) throws SQLException
public static void buildPropertiesWithTwoExecutors(String repertoire) throws SQLException
{
e = new TestDatabase().testConnection;
f = new TestDatabase().testConnection;
buildProperties(repertoire, new Connection[] {c, e, f});
}


private static void buildProperties(String repertoire, Connection[] connections) throws SQLException
{
PropertiesHandler testProperties=PropertiesHandler.getInstance();

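With the helpers now public, a test picks its database topology in one line; a short usage sketch (null repertoire, as in the updated executor test above):

InitializeQueryTest.buildPropertiesWithoutScalability(null); // coordinator only
InitializeQueryTest.buildPropertiesWithOneExecutor(null);    // coordinator + executor e
InitializeQueryTest.buildPropertiesWithTwoExecutors(null);   // coordinator + executors e and f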
14 changes: 14 additions & 0 deletions arc-ws/pom.xml
@@ -19,6 +19,20 @@
</properties>

<dependencies>
<dependency>
<groupId>fr.insee.arc</groupId>
<artifactId>arc-utils</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>fr.insee.arc</groupId>
<artifactId>arc-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>fr.insee.arc</groupId>
<artifactId>arc-utils</artifactId>
@@ -8,6 +8,7 @@
import org.json.JSONArray;
import org.json.JSONObject;

import fr.insee.arc.core.dataobjects.ArcDatabase;
import fr.insee.arc.core.dataobjects.SchemaEnum;
import fr.insee.arc.core.model.Delimiters;
import fr.insee.arc.utils.exception.ArcException;
@@ -36,7 +37,7 @@ public ImportStep1InitializeClientTablesService(JSONObject dsnRequest) {

this.dsnRequest = dsnRequest;

this.arcClientIdentifier = new ArcClientIdentifier(dsnRequest);
this.arcClientIdentifier = new ArcClientIdentifier(dsnRequest, true);

this.sources = makeSource(dsnRequest);

@@ -63,25 +64,68 @@ private static List<String> makeSource(JSONObject dsnRequest) {
}

public void execute(SendResponse resp) throws ArcException {
this.clientDao.dropPendingClientTables();

// drop the client tables that were requested by a former call
dropPendingClientTables();

this.clientDao.createTableWsStatus();
// create the table that tracks which data tables have been built and retrieved
createTrackTable();

// create the wsinfo and the wspending tables
// the wspending table will be deleted when all client tables have been retrieved
createWsTables();

if (!arcClientIdentifier.getEnvironnement().equalsIgnoreCase(SchemaEnum.ARC_METADATA.getSchemaName())) {
clientDao.verificationClientFamille();
tablesMetierNames = clientDao.getIdSrcTableMetier(dsnRequest);
}
// create the tables needed to retrieve the family data tables
createMetaFamilyTables();

// create the data tables in an asynchronous parallel thread
startTableCreationInParallel();

// return the client id along with its timestamp
resp.send(arcClientIdentifier.getEnvironnement() + Delimiters.SQL_SCHEMA_DELIMITER
+ arcClientIdentifier.getClient() + Delimiters.SQL_TOKEN_DELIMITER
+ arcClientIdentifier.getClientIdentifier() + Delimiters.SQL_TOKEN_DELIMITER
+ arcClientIdentifier.getTimestamp());

resp.endSending();
}
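The response sent above is a single token string of the form environnement.clientIdentifier_timestamp. A hedged client-side sketch of splitting it apart, assuming SQL_SCHEMA_DELIMITER is "." and SQL_TOKEN_DELIMITER is "_" (the sample value is illustrative):

String response = "arc_bas2.ARTEMIS_1701684000000"; // illustrative
int schemaSep = response.indexOf('.');
String environnement = response.substring(0, schemaSep);
String rest = response.substring(schemaSep + 1);
int tokenSep = rest.lastIndexOf('_'); // the client identifier may itself contain '_'
String clientIdentifier = rest.substring(0, tokenSep);
long timestamp = Long.parseLong(rest.substring(tokenSep + 1));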

/**
* 1. check if the client has the right to retrieve the family. If so:
* 2. build the table of id_source rows to be retrieved in the family data tables
* 3. return the list of family data tables to retrieve
* @throws ArcException
*/
private void createMetaFamilyTables() throws ArcException {
if (!arcClientIdentifier.getEnvironnement().equalsIgnoreCase(SchemaEnum.ARC_METADATA.getSchemaName())) {

if (!clientDao.verificationClientFamille()) {
throw new ArcException(ArcExceptionMessage.WS_RETRIEVE_DATA_FAMILY_FORBIDDEN);
}

clientDao.createTableOfIdSource(dsnRequest);
tablesMetierNames = clientDao.selectBusinessDataTables();
}
}

/**
* create the table that tracks the client tables which have been built;
* when the data of a table has been retrieved by the client, its entry is deleted from the tracking table
* @throws ArcException
*/
private void createTrackTable() throws ArcException {
clientDao.createTableTrackRetrievedTables();
}

/**
* create the wsinfo and wspending tables;
* wspending will be deleted when all client tables have been retrieved;
* the wsinfo table is repeatedly transferred to the client until the wspending table is dropped
* @throws ArcException
*/
private void createWsTables() throws ArcException {
this.clientDao.createTableWsInfo();
}

/**
* Will send a handshake to the client every @HANDSHAKE_TIMER_IN_MS milliseconds.
* Ugly, but we failed at fixing that in front of an F5 controller
@@ -94,28 +138,21 @@ private void startTableCreationInParallel() {
public void run() {
try {
if (tablesMetierNames != null) {
executeIf(ExportSource.MAPPING, () -> clientDao.createImages(tablesMetierNames));

executeIf(ExportSource.MAPPING, () -> createImages(tablesMetierNames));
executeIf(ExportSource.METADATA, () -> clientDao.createTableMetier());
executeIf(ExportSource.METADATA, () -> clientDao.createVarMetier());
executeIf(ExportSource.METADATA, () -> clientDao.createTableVarMetier());
}
executeIf(ExportSource.NOMENCLATURE, () -> clientDao.createNmcl());
executeIf(ExportSource.NOMENCLATURE, () -> clientDao.createTableNmcl());
executeIf(ExportSource.METADATA, () -> clientDao.createTableFamille());
executeIf(ExportSource.METADATA, () -> clientDao.createTablePeriodicite());
} catch (ArcException e) {
try {
clientDao.createTableWsKO();
} catch (ArcException e1) {
new ArcException(ArcExceptionMessage.DATABASE_CONNECTION_FAILED).logFullException();
}
clientDao.registerWsKO();
} finally {
try {
clientDao.dropTableWsPending();
} catch (ArcException e) {
try {
clientDao.createTableWsKO();
} catch (ArcException e1) {
new ArcException(ArcExceptionMessage.DATABASE_CONNECTION_FAILED).logFullException();
}
clientDao.registerWsKO();
}
}

@@ -125,4 +162,46 @@ public void run() {
maintenance.start();
}


/**
* drop tables on the coordinator and executors if they exist
* @throws ArcException
*/
private void dropPendingClientTables() throws ArcException {

this.clientDao.dropPendingClientTables(0);

int numberOfExecutorNods = ArcDatabase.numberOfExecutorNods();
for (int executorConnectionId = ArcDatabase.EXECUTOR.getIndex(); executorConnectionId < ArcDatabase.EXECUTOR
.getIndex() + numberOfExecutorNods; executorConnectionId++) {
this.clientDao.dropPendingClientTables(executorConnectionId);
}
}



/**
* create image tables on the executor nods if the connection is scaled, on the
* coordinator nod if not
*
* @param tablesMetierNames
* @throws ArcException
*/
private void createImages(List<String> tablesMetierNames) throws ArcException {
int numberOfExecutorNods = ArcDatabase.numberOfExecutorNods();
if (numberOfExecutorNods == 0) {
clientDao.createImages(tablesMetierNames, ArcDatabase.COORDINATOR.getIndex());
} else {
for (int executorConnectionId = ArcDatabase.EXECUTOR.getIndex(); executorConnectionId < ArcDatabase.EXECUTOR
.getIndex() + numberOfExecutorNods; executorConnectionId++) {

// copy the table containing the id_source values to be retrieved onto the executor nod
clientDao.copyTableOfIdSourceToExecutorNod(executorConnectionId);

// create the business tables containing the data of the id_source values found in tableOfIdSource
clientDao.createImages(tablesMetierNames, executorConnectionId);
}
}
}

}