Skip to content

Commit

Permalink
[MODSOURMAN-1049] Add DITF composite job capabilities and logic
Browse files Browse the repository at this point in the history
* Update submodule

* DITF changes

Co-authored-by: Taras_Spashchenko <[email protected]>

* Sonar fixes

* Add additional tests

* Update interface

* Reference better ticket for DI changes

* Minor cleanup

---------

Co-authored-by: Taras_Spashchenko <[email protected]>
  • Loading branch information
ncovercash and TarasSpashchenko authored Oct 9, 2023
1 parent f1d5168 commit e51ec72
Show file tree
Hide file tree
Showing 16 changed files with 687 additions and 100 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* [MODSOURMAN-974](https://issues.folio.org/browse/MODSOURMAN-974) MARC bib $9 handling | Remove $9 subfields from linkable fields
* [MODSOURMAN-971](https://issues.folio.org/browse/MODSOURMAN-971) Adjust journal records population to create multiple journal records for each Holdings/Item
* [MODSOURMAN-1014](https://issues.folio.org/browse/MODSOURMAN-1014) Upgrade folio-kafka-wrapper to 3.0.0 version
* [MODDATAIMP-866](https://issues.folio.org/browse/MODDATAIMP-866) Add composite job types to support DI splitting workflow (bump interface `source-manager-job-executions` to version `3.3`)

## 2023-03-xx v3.6.1-SNAPSHOT
* [MODSOURMAN-957](https://issues.folio.org/browse/MODSOURMAN-957) The '1' number of SRS MARC and Instance are displayed in cells in the row with the 'Updated' row header at the individual import job's log
Expand Down
2 changes: 1 addition & 1 deletion descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
"provides": [
{
"id": "source-manager-job-executions",
"version": "3.2",
"version": "3.3",
"handlers": [
{
"methods": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
Expand All @@ -19,8 +20,11 @@
import org.folio.dao.util.SortField;
import org.folio.rest.jaxrs.model.DeleteJobExecutionsResp;
import org.folio.rest.jaxrs.model.JobExecution;
import org.folio.rest.jaxrs.model.JobExecutionCompositeDetailDto;
import org.folio.rest.jaxrs.model.JobExecutionCompositeDetailsDto;
import org.folio.rest.jaxrs.model.JobExecutionDetail;
import org.folio.rest.jaxrs.model.JobExecutionDto;
import org.folio.rest.jaxrs.model.JobExecutionDto.SubordinationType;
import org.folio.rest.jaxrs.model.JobExecutionDtoCollection;
import org.folio.rest.jaxrs.model.JobExecutionUserInfo;
import org.folio.rest.jaxrs.model.JobExecutionUserInfoCollection;
Expand Down Expand Up @@ -65,6 +69,8 @@
import static org.folio.dao.util.JobExecutionDBConstants.HRID_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.ID_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.INSERT_SQL;
import static org.folio.dao.util.JobExecutionDBConstants.JOB_PART_NUMBER;
import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_COMPOSITE_DATA;
import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_DATA_TYPE_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_HIDDEN_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_ID_FIELD;
Expand All @@ -82,6 +88,8 @@
import static org.folio.dao.util.JobExecutionDBConstants.SUBORDINATION_TYPE_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.TOTAL_COUNT_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.TOTAL_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.TOTAL_JOB_PARTS;
import static org.folio.dao.util.JobExecutionDBConstants.TOTAL_RECORDS_IN_FILE;
import static org.folio.dao.util.JobExecutionDBConstants.UI_STATUS_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.UPDATE_BY_IDS_SQL;
import static org.folio.dao.util.JobExecutionDBConstants.UPDATE_PROGRESS_SQL;
Expand Down Expand Up @@ -125,6 +133,9 @@ public class JobExecutionDaoImpl implements JobExecutionDao {
public static final String DELETE_FROM_JOB_EXECUTION_TABLE = "DELETE from %s.%s where id = ANY ($1)";
public static final String JOB_EXECUTION_SOURCE_CHUNKS_TABLE_NAME = "job_execution_source_chunks";
public static final String JOURNAL_RECORDS_TABLE_NAME = "journal_records";
public static final String JOB_PROFILE_COMPOSITE_DATA_STATUS = "status";
public static final String JOB_PROFILE_COMPOSITE_DATA_TOTAL_RECORDS_COUNT = "total_records_count";
public static final String JOB_PROFILE_COMPOSITE_DATA_CURRENTLY_PROCESSED = "currently_processed";

@Autowired
private PostgresClientFactory pgClientFactory;
Expand All @@ -137,7 +148,8 @@ public Future<JobExecutionDtoCollection> getJobExecutionsWithoutParentMultiple(J
String orderByClause = buildOrderByClause(sortFields);
String jobTable = formatFullTableName(tenantId, TABLE_NAME);
String progressTable = formatFullTableName(tenantId, PROGRESS_TABLE_NAME);
String query = format(GET_JOBS_NOT_PARENT_SQL, jobTable, filterCriteria, jobTable, progressTable, filterCriteria, orderByClause);
String query = format(GET_JOBS_NOT_PARENT_SQL, jobTable, filterCriteria, jobTable, progressTable, jobTable, progressTable, filterCriteria, orderByClause);

pgClientFactory.createInstance(tenantId).selectRead(query, Tuple.of(limit, offset), promise);
} catch (Exception e) {
LOGGER.warn("getJobExecutionsWithoutParentMultiple:: Error while getting Logs", e);
Expand Down Expand Up @@ -285,7 +297,7 @@ public Future<DeleteJobExecutionsResp> softDeleteJobExecutionsByIds(List<String>
try {
Map<String, String> data = new HashMap<>();
data.put(TENANT_NAME, convertToPsqlStandard(tenantId));
data.put(DB_TABLE_NAME_FIELD,TABLE_NAME);
data.put(DB_TABLE_NAME_FIELD, TABLE_NAME);
data.put(SET_FIELD_NAME, IS_DELETED);
data.put(SET_FIELD_VALUE, TRUE);
data.put(SET_CONDITIONAL_FIELD_NAME, ID);
Expand All @@ -299,6 +311,7 @@ public Future<DeleteJobExecutionsResp> softDeleteJobExecutionsByIds(List<String>
}
return promise.future().map(this::mapRowSetToDeleteChangeManagerJobExeResp);
}

@Override
public Future<JobExecutionUserInfoCollection> getRelatedUsersInfo(int offset, int limit, String tenantId) {
Promise<RowSet<Row>> promise = Promise.promise();
Expand Down Expand Up @@ -330,7 +343,7 @@ private JobExecutionUserInfo mapRowToJobExecutionUserInfoDto(Row row) {
return jobExecutionUserInfo;
}

private DeleteJobExecutionsResp mapRowSetToDeleteChangeManagerJobExeResp(RowSet<Row> rowSet){
private DeleteJobExecutionsResp mapRowSetToDeleteChangeManagerJobExeResp(RowSet<Row> rowSet) {
DeleteJobExecutionsResp deleteJobExecutionsResp = new DeleteJobExecutionsResp();
List<JobExecutionDetail> jobExecutionDetails = new ArrayList<>();
rowSet.forEach(row -> {
Expand Down Expand Up @@ -366,7 +379,10 @@ private Tuple mapToTuple(JobExecution jobExecution) {
? jobExecution.getJobProfileInfo().getDataType().toString() : null,
jobExecution.getJobProfileSnapshotWrapper() == null
? null : JsonObject.mapFrom(jobExecution.getJobProfileSnapshotWrapper()),
jobExecution.getJobProfileInfo() != null && jobExecution.getJobProfileInfo().getHidden());
jobExecution.getJobProfileInfo() != null && jobExecution.getJobProfileInfo().getHidden(),
jobExecution.getJobPartNumber(),
jobExecution.getTotalJobParts(),
jobExecution.getTotalRecordsInFile());
}

private JobExecutionDtoCollection mapToJobExecutionDtoCollection(RowSet<Row> rowSet) {
Expand Down Expand Up @@ -411,7 +427,10 @@ private JobExecution mapRowToJobExecution(Row row) {
.withTotal(row.getInteger(PROGRESS_TOTAL_FIELD)))
.withJobProfileInfo(mapRowToJobProfileInfo(row))
.withJobProfileSnapshotWrapper(row.getJsonObject(PROFILE_SNAPSHOT_WRAPPER_FIELD) == null
? null : row.getJsonObject(PROFILE_SNAPSHOT_WRAPPER_FIELD).mapTo(ProfileSnapshotWrapper.class));
? null : row.getJsonObject(PROFILE_SNAPSHOT_WRAPPER_FIELD).mapTo(ProfileSnapshotWrapper.class))
.withJobPartNumber(row.getInteger(JOB_PART_NUMBER))
.withTotalJobParts(row.getInteger(TOTAL_JOB_PARTS))
.withTotalRecordsInFile(row.getInteger(TOTAL_RECORDS_IN_FILE));
}

private JobExecutionDto mapRowToJobExecutionDto(Row row) {
Expand All @@ -433,7 +452,11 @@ private JobExecutionDto mapRowToJobExecutionDto(Row row) {
.withLastName(row.getString(JOB_USER_LAST_NAME_FIELD)))
.withUserId(row.getValue(USER_ID_FIELD).toString())
.withProgress(mapRowToProgress(row))
.withJobProfileInfo(mapRowToJobProfileInfo(row));
.withJobProfileInfo(mapRowToJobProfileInfo(row))
.withJobPartNumber(row.getInteger(JOB_PART_NUMBER))
.withTotalJobParts(row.getInteger(TOTAL_JOB_PARTS))
.withTotalRecordsInFile(row.getInteger(TOTAL_RECORDS_IN_FILE))
.withCompositeDetails(mapToJobExecutionCompositeDetailsDto(row));
}

private Date mapRowToCompletedDate(Row row) {
Expand All @@ -442,6 +465,10 @@ private Date mapRowToCompletedDate(Row row) {
}

private Progress mapRowToProgress(Row row) {
if (row.get(JobExecutionDto.SubordinationType.class, SUBORDINATION_TYPE_FIELD).equals(SubordinationType.COMPOSITE_PARENT)) {
return mapRowToProgressComposite(row);
}

Integer processedCount = row.getInteger(CURRENTLY_PROCESSED_FIELD);
Integer total = row.getInteger(TOTAL_FIELD);
if (processedCount == null) {
Expand All @@ -457,6 +484,29 @@ private Progress mapRowToProgress(Row row) {
.withTotal(total);
}

/**
 * Builds an aggregated {@link Progress} for a COMPOSITE_PARENT job by summing the
 * per-status progress entries of its COMPOSITE_CHILD executions, as returned in the
 * {@code composite_data} jsonb column.
 *
 * @param row result-set row containing the aggregated {@code composite_data} array
 * @return aggregated progress, or {@code null} when no child data is present
 *         (e.g. the parent has no composite children yet)
 */
private Progress mapRowToProgressComposite(Row row) {
  JsonArray compositeData = row.getJsonArray(JOB_PROFILE_COMPOSITE_DATA);

  if (Objects.isNull(compositeData) || compositeData.isEmpty()) {
    return null;
  }

  int processedSum = 0;
  int totalSum = 0;

  // Each element is one per-status summary produced by the GET_JOBS_NOT_PARENT_SQL
  // subquery; missing counters are treated as zero.
  for (Object element : compositeData) {
    JsonObject childSummary = (JsonObject) element;
    processedSum += Optional.ofNullable(childSummary.getInteger(JOB_PROFILE_COMPOSITE_DATA_CURRENTLY_PROCESSED)).orElse(0);
    totalSum += Optional.ofNullable(childSummary.getInteger(JOB_PROFILE_COMPOSITE_DATA_TOTAL_RECORDS_COUNT)).orElse(0);
  }

  return new Progress()
    .withJobExecutionId(row.getValue(ID_FIELD).toString())
    .withCurrent(processedSum)
    .withTotal(totalSum);
}

private JobProfileInfo mapRowToJobProfileInfo(Row row) {
UUID profileId = row.getUUID(JOB_PROFILE_ID_FIELD);
if (Objects.nonNull(profileId)) {
Expand All @@ -470,6 +520,44 @@ private JobProfileInfo mapRowToJobProfileInfo(Row row) {
return null;
}

/**
 * Maps the aggregated {@code composite_data} column of a row into a
 * {@link JobExecutionCompositeDetailsDto}, with one detail entry per child status.
 *
 * @param row result-set row; may or may not contain the {@code composite_data} column
 * @return composite details, or {@code null} when the column is absent, null, or empty
 * @throws IllegalStateException when a child row carries a status with no
 *         corresponding field on the details DTO
 */
private JobExecutionCompositeDetailsDto mapToJobExecutionCompositeDetailsDto(Row row) {
  // Not every query selects composite_data; bail out when the column is absent.
  if (row.getColumnIndex(JOB_PROFILE_COMPOSITE_DATA) == -1) {
    return null;
  }

  JsonArray compositeData = row.getJsonArray(JOB_PROFILE_COMPOSITE_DATA);
  if (Objects.isNull(compositeData) || compositeData.isEmpty()) {
    return null;
  }

  JobExecutionCompositeDetailsDto detailsDto = new JobExecutionCompositeDetailsDto();

  for (Object element : compositeData) {
    JsonObject childSummary = (JsonObject) element;

    JobExecutionCompositeDetailDto stateDto = new JobExecutionCompositeDetailDto()
      .withChunksCount(childSummary.getInteger("cnt"))
      .withTotalRecordsCount(childSummary.getInteger(JOB_PROFILE_COMPOSITE_DATA_TOTAL_RECORDS_COUNT))
      .withCurrentlyProcessedCount(childSummary.getInteger(JOB_PROFILE_COMPOSITE_DATA_CURRENTLY_PROCESSED));

    JobExecutionDto.Status status = JobExecutionDto.Status.valueOf(childSummary.getString(JOB_PROFILE_COMPOSITE_DATA_STATUS));

    // Route the summary to the DTO slot matching the child status.
    switch (status) {
      case NEW -> detailsDto.setNewState(stateDto);
      case FILE_UPLOADED -> detailsDto.setFileUploadedState(stateDto);
      case PARSING_IN_PROGRESS -> detailsDto.setParsingInProgressState(stateDto);
      case PARSING_FINISHED -> detailsDto.setParsingFinishedState(stateDto);
      case PROCESSING_IN_PROGRESS -> detailsDto.setProcessingInProgressState(stateDto);
      case PROCESSING_FINISHED -> detailsDto.setProcessingFinishedState(stateDto);
      case COMMIT_IN_PROGRESS -> detailsDto.setCommitInProgressState(stateDto);
      case COMMITTED -> detailsDto.setCommittedState(stateDto);
      case ERROR -> detailsDto.setErrorState(stateDto);
      case DISCARDED -> detailsDto.setDiscardedState(stateDto);
      case CANCELLED -> detailsDto.setCancelledState(stateDto);
      default -> throw new IllegalStateException("Invalid child status: " + status);
    }
  }

  return detailsDto;
}

/**
 * Builds the fully qualified table name {@code <schema>.<table>} for the given tenant,
 * where the schema is derived from the tenant id.
 */
private String formatFullTableName(String tenantId, String table) {
  String schema = convertToPsqlStandard(tenantId);
  return schema + "." + table;
}
Expand Down Expand Up @@ -517,7 +605,7 @@ public Future<Boolean> hardDeleteJobExecutions(long diffNumberOfDays, String ten
return CompositeFuture.all(jobExecutionProgressFuture, jobExecutionSourceChunksFuture, journalRecordsFuture)
.compose(ar -> Future.<RowSet<Row>>future(rowSetPromise -> deleteFromJobExecutionTable(uuids, sqlConnection, tenantId, rowSetPromise, postgresClient)))
.map(true);
}));
}));
}

private Future<List<String>> mapRowsetValuesToListOfString(RowSet<Row> rowset) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.folio.dao;

import org.folio.rest.jaxrs.model.JobExecution;
import org.folio.rest.jaxrs.model.JobExecutionDto;

import java.text.SimpleDateFormat;
import java.util.Date;
Expand All @@ -16,6 +17,7 @@
import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_HIDDEN_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_ID_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.STATUS_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.SUBORDINATION_TYPE_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.UI_STATUS_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.USER_ID_FIELD;

Expand All @@ -30,6 +32,7 @@ public class JobExecutionFilter {
private String fileNamePattern;
private List<String> fileNameNotAny;
private List<String> profileIdAny;
private List<JobExecutionDto.SubordinationType> subordinationTypeNotAny;
private String userId;
private Date completedAfter;
private Date completedBefore;
Expand Down Expand Up @@ -74,6 +77,11 @@ public JobExecutionFilter withProfileIdAny(List<String> profileIdAny) {
return this;
}

/**
 * Excludes job executions whose subordination type matches any of the given values
 * (applied as a NOT IN condition when building the filter criteria).
 *
 * @param subordinationTypeNotAny subordination types to filter out; the reference is
 *        stored as-is, without a defensive copy
 * @return this filter, for chaining
 */
public JobExecutionFilter withSubordinationTypeNotAny(List<JobExecutionDto.SubordinationType> subordinationTypeNotAny) {
  this.subordinationTypeNotAny = subordinationTypeNotAny;
  return this;
}

public JobExecutionFilter withUserId(String userId) {
this.userId = userId;
return this;
Expand All @@ -94,7 +102,7 @@ public String buildCriteria() {
if (isNotEmpty(statusAny)) {
List<String> statuses = statusAny.stream()
.map(JobExecution.Status::toString)
.collect(Collectors.toList());
.toList();

addCondition(conditionBuilder, buildInCondition(STATUS_FIELD, statuses));
}
Expand All @@ -107,7 +115,7 @@ public String buildCriteria() {
if (isNotEmpty(uiStatusAny)) {
List<String> uiStatuses = uiStatusAny.stream()
.map(JobExecution.UiStatus::toString)
.collect(Collectors.toList());
.toList();

addCondition(conditionBuilder, buildInCondition(UI_STATUS_FIELD, uiStatuses));
}
Expand All @@ -123,11 +131,15 @@ public String buildCriteria() {
}
}
if (isNotEmpty(fileNameNotAny)) {
addCondition(conditionBuilder, buildNotInCondition(FILE_NAME_FIELD,fileNameNotAny));
addCondition(conditionBuilder, buildNotInCondition(FILE_NAME_FIELD, fileNameNotAny));
}
if (isNotEmpty(profileIdAny)) {
addCondition(conditionBuilder, buildInCondition(JOB_PROFILE_ID_FIELD, profileIdAny));
}
if (isNotEmpty(subordinationTypeNotAny)) {
List<String> subordinationTypes = subordinationTypeNotAny.stream().map(JobExecutionDto.SubordinationType::value).toList();
addCondition(conditionBuilder, buildNotInCondition(SUBORDINATION_TYPE_FIELD, subordinationTypes));
}
if (isNotEmpty(userId)) {
addCondition(conditionBuilder, buildEqualCondition(USER_ID_FIELD, userId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public final class JobExecutionDBConstants {
public static final String JOB_PROFILE_NAME_FIELD = "job_profile_name";
public static final String JOB_PROFILE_HIDDEN_FIELD = "job_profile_hidden";
public static final String JOB_PROFILE_DATA_TYPE_FIELD = "job_profile_data_type";
public static final String JOB_PROFILE_COMPOSITE_DATA = "composite_data";
public static final String PROFILE_SNAPSHOT_WRAPPER_FIELD = "job_profile_snapshot_wrapper";
public static final String SOURCE_PATH_FIELD = "source_path";
public static final String FILE_NAME_FIELD = "file_name";
Expand All @@ -29,6 +30,9 @@ public final class JobExecutionDBConstants {
public static final String CURRENTLY_PROCESSED_FIELD = "currently_processed";
public static final String TOTAL_FIELD = "total";
public static final String IS_DELETED_FIELD = "is_deleted";
public static final String JOB_PART_NUMBER = "job_part_number";
public static final String TOTAL_JOB_PARTS = "total_job_parts";
public static final String TOTAL_RECORDS_IN_FILE = "total_records_in_file";

public static final String GET_BY_ID_SQL = "SELECT * FROM %s WHERE id = $1 AND is_deleted = false";
public static final String UPDATE_BY_IDS_SQL = "UPDATE ${tenantName}.${tableName} SET ${setFieldName} = ${setFieldValue} WHERE ${setConditionalFieldName} IN ('${setConditionalFieldValues}') RETURNING ${returningFieldNames}";
Expand All @@ -37,34 +41,45 @@ public final class JobExecutionDBConstants {
"INSERT INTO %s.%s (id, hrid, parent_job_id, subordination_type, source_path, file_name, " +
"progress_current, progress_total, started_date, completed_date, status, ui_status, error_status, job_user_first_name, " +
"job_user_last_name, user_id, job_profile_id, job_profile_name, job_profile_data_type, job_profile_snapshot_wrapper, "
+ "job_profile_hidden) " +
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21)";
+ "job_profile_hidden, job_part_number, total_job_parts, total_records_in_file) " +
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24)";

public static final String UPDATE_SQL =
"UPDATE %s " +
"SET id = $1, hrid = $2, parent_job_id = $3, subordination_type = $4, source_path = $5, file_name = $6, " +
"progress_current = $7, progress_total = $8, started_date = $9, completed_date = $10, " +
"status = $11, ui_status = $12, error_status = $13, job_user_first_name = $14, job_user_last_name = $15, " +
"user_id = $16, job_profile_id = $17, job_profile_name = $18, job_profile_data_type = $19, " +
"job_profile_snapshot_wrapper = $20, job_profile_hidden = $21" +
"WHERE id = $1";
"job_profile_snapshot_wrapper = $20, job_profile_hidden = $21, job_part_number = $22, total_job_parts = $23, " +
"total_records_in_file = $24 WHERE id = $1";

public static final String GET_CHILDREN_JOBS_BY_PARENT_ID_SQL =
"WITH cte AS (SELECT count(*) AS total_count FROM %s " +
"WHERE parent_job_id = $1 AND subordination_type = 'CHILD' AND is_deleted = false) " +
"WHERE parent_job_id = $1 AND subordination_type in ('CHILD', 'COMPOSITE_CHILD') AND is_deleted = false) " +
"SELECT j.*, cte.*, p.total_records_count total, " +
"p.succeeded_records_count + p.error_records_count currently_processed " +
"FROM %s j " +
"LEFT JOIN %s p ON j.id = p.job_execution_id " +
"LEFT JOIN cte ON true " +
"WHERE parent_job_id = $1 AND subordination_type = 'CHILD' AND is_deleted = false " +
"WHERE parent_job_id = $1 AND subordination_type in ('CHILD', 'COMPOSITE_CHILD') AND is_deleted = false " +
"LIMIT $2 OFFSET $3";

public static final String GET_JOBS_NOT_PARENT_SQL =
"WITH cte AS (SELECT count(*) AS total_count FROM %s " +
"WHERE subordination_type <> 'PARENT_MULTIPLE' AND %s) " +
"SELECT j.*, cte.*, p.total_records_count total, " +
"p.succeeded_records_count + p.error_records_count currently_processed " +
"p.succeeded_records_count + p.error_records_count currently_processed, " +
"(select jsonb_agg(x) composite_data " +
"from (select status, " +
"count(1) cnt, " +
"sum(p1.total_records_count) total_records_count, " +
"sum(p1.succeeded_records_count + p1.error_records_count) currently_processed " +
"FROM %s j1 " +
"LEFT JOIN %s p1 ON j1.id = p1.job_execution_id " +
"where j1.parent_job_id = j.id " +
"and j1.id != j1.parent_job_id " +
"and j1.subordination_type = 'COMPOSITE_CHILD' " +
"group by status) x) composite_data " +
"FROM %s j " +
"LEFT JOIN %s p ON j.id = p.job_execution_id " +
"LEFT JOIN cte ON true " +
Expand Down
Loading

0 comments on commit e51ec72

Please sign in to comment.