From 320d159b00d33f4ae9544d6826b617c0e9ffadd9 Mon Sep 17 00:00:00 2001 From: Noah Overcash Date: Mon, 2 Oct 2023 13:26:13 -0400 Subject: [PATCH] DITF changes Co-authored-by: Taras_Spashchenko --- .../org/folio/dao/JobExecutionDaoImpl.java | 119 ++++++++++- .../org/folio/dao/JobExecutionFilter.java | 14 +- .../dao/util/JobExecutionDBConstants.java | 29 ++- .../folio/rest/impl/MetadataProviderImpl.java | 18 +- .../services/JobExecutionServiceImpl.java | 195 ++++++++++++------ ...cordProcessedEventHandlingServiceImpl.java | 40 +++- ...job_executions_table_for_chunk_support.sql | 11 + ...job_executions_table_for_total_records.sql | 2 + .../templates/db_scripts/schema.json | 10 + .../org/folio/rest/impl/AbstractRestTest.java | 36 ++++ .../changeManager/ChangeManagerAPITest.java | 123 ++++++++++- ramls/metadata-provider.raml | 5 + 12 files changed, 517 insertions(+), 85 deletions(-) create mode 100644 mod-source-record-manager-server/src/main/resources/templates/db_scripts/alter_job_executions_table_for_chunk_support.sql create mode 100644 mod-source-record-manager-server/src/main/resources/templates/db_scripts/alter_job_executions_table_for_total_records.sql diff --git a/mod-source-record-manager-server/src/main/java/org/folio/dao/JobExecutionDaoImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/dao/JobExecutionDaoImpl.java index a8213aed5..e6cb85fd5 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/dao/JobExecutionDaoImpl.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/dao/JobExecutionDaoImpl.java @@ -4,6 +4,7 @@ import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Promise; +import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.RowSet; @@ -19,8 +20,11 @@ import org.folio.dao.util.SortField; import org.folio.rest.jaxrs.model.DeleteJobExecutionsResp; import org.folio.rest.jaxrs.model.JobExecution; 
+import org.folio.rest.jaxrs.model.JobExecutionCompositeDetailDto; +import org.folio.rest.jaxrs.model.JobExecutionCompositeDetailsDto; import org.folio.rest.jaxrs.model.JobExecutionDetail; import org.folio.rest.jaxrs.model.JobExecutionDto; +import org.folio.rest.jaxrs.model.JobExecutionDto.SubordinationType; import org.folio.rest.jaxrs.model.JobExecutionDtoCollection; import org.folio.rest.jaxrs.model.JobExecutionUserInfo; import org.folio.rest.jaxrs.model.JobExecutionUserInfoCollection; @@ -65,6 +69,8 @@ import static org.folio.dao.util.JobExecutionDBConstants.HRID_FIELD; import static org.folio.dao.util.JobExecutionDBConstants.ID_FIELD; import static org.folio.dao.util.JobExecutionDBConstants.INSERT_SQL; +import static org.folio.dao.util.JobExecutionDBConstants.JOB_PART_NUMBER; +import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_COMPOSITE_DATA; import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_DATA_TYPE_FIELD; import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_HIDDEN_FIELD; import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_ID_FIELD; @@ -82,6 +88,8 @@ import static org.folio.dao.util.JobExecutionDBConstants.SUBORDINATION_TYPE_FIELD; import static org.folio.dao.util.JobExecutionDBConstants.TOTAL_COUNT_FIELD; import static org.folio.dao.util.JobExecutionDBConstants.TOTAL_FIELD; +import static org.folio.dao.util.JobExecutionDBConstants.TOTAL_JOB_PARTS; +import static org.folio.dao.util.JobExecutionDBConstants.TOTAL_RECORDS_IN_FILE; import static org.folio.dao.util.JobExecutionDBConstants.UI_STATUS_FIELD; import static org.folio.dao.util.JobExecutionDBConstants.UPDATE_BY_IDS_SQL; import static org.folio.dao.util.JobExecutionDBConstants.UPDATE_PROGRESS_SQL; @@ -125,6 +133,9 @@ public class JobExecutionDaoImpl implements JobExecutionDao { public static final String DELETE_FROM_JOB_EXECUTION_TABLE = "DELETE from %s.%s where id = ANY ($1)"; public static final String 
JOB_EXECUTION_SOURCE_CHUNKS_TABLE_NAME = "job_execution_source_chunks"; public static final String JOURNAL_RECORDS_TABLE_NAME = "journal_records"; + public static final String JOB_PROFILE_COMPOSITE_DATA_STATUS = "status"; + public static final String JOB_PROFILE_COMPOSITE_DATA_TOTAL_RECORDS_COUNT = "total_records_count"; + public static final String JOB_PROFILE_COMPOSITE_DATA_CURRENTLY_PROCESSED = "currently_processed"; @Autowired private PostgresClientFactory pgClientFactory; @@ -137,7 +148,8 @@ public Future getJobExecutionsWithoutParentMultiple(J String orderByClause = buildOrderByClause(sortFields); String jobTable = formatFullTableName(tenantId, TABLE_NAME); String progressTable = formatFullTableName(tenantId, PROGRESS_TABLE_NAME); - String query = format(GET_JOBS_NOT_PARENT_SQL, jobTable, filterCriteria, jobTable, progressTable, filterCriteria, orderByClause); + String query = format(GET_JOBS_NOT_PARENT_SQL, jobTable, filterCriteria, jobTable, progressTable, jobTable, progressTable, filterCriteria, orderByClause); +// LOGGER.warn("------> query: " + query); pgClientFactory.createInstance(tenantId).selectRead(query, Tuple.of(limit, offset), promise); } catch (Exception e) { LOGGER.warn("getJobExecutionsWithoutParentMultiple:: Error while getting Logs", e); @@ -285,7 +297,7 @@ public Future softDeleteJobExecutionsByIds(List try { Map data = new HashMap<>(); data.put(TENANT_NAME, convertToPsqlStandard(tenantId)); - data.put(DB_TABLE_NAME_FIELD,TABLE_NAME); + data.put(DB_TABLE_NAME_FIELD, TABLE_NAME); data.put(SET_FIELD_NAME, IS_DELETED); data.put(SET_FIELD_VALUE, TRUE); data.put(SET_CONDITIONAL_FIELD_NAME, ID); @@ -299,6 +311,7 @@ public Future softDeleteJobExecutionsByIds(List } return promise.future().map(this::mapRowSetToDeleteChangeManagerJobExeResp); } + @Override public Future getRelatedUsersInfo(int offset, int limit, String tenantId) { Promise> promise = Promise.promise(); @@ -330,7 +343,7 @@ private JobExecutionUserInfo 
mapRowToJobExecutionUserInfoDto(Row row) { return jobExecutionUserInfo; } - private DeleteJobExecutionsResp mapRowSetToDeleteChangeManagerJobExeResp(RowSet rowSet){ + private DeleteJobExecutionsResp mapRowSetToDeleteChangeManagerJobExeResp(RowSet rowSet) { DeleteJobExecutionsResp deleteJobExecutionsResp = new DeleteJobExecutionsResp(); List jobExecutionDetails = new ArrayList<>(); rowSet.forEach(row -> { @@ -366,7 +379,10 @@ private Tuple mapToTuple(JobExecution jobExecution) { ? jobExecution.getJobProfileInfo().getDataType().toString() : null, jobExecution.getJobProfileSnapshotWrapper() == null ? null : JsonObject.mapFrom(jobExecution.getJobProfileSnapshotWrapper()), - jobExecution.getJobProfileInfo() != null && jobExecution.getJobProfileInfo().getHidden()); + jobExecution.getJobProfileInfo() != null && jobExecution.getJobProfileInfo().getHidden(), + jobExecution.getJobPartNumber(), + jobExecution.getTotalJobParts(), + jobExecution.getTotalRecordsInFile()); } private JobExecutionDtoCollection mapToJobExecutionDtoCollection(RowSet rowSet) { @@ -411,7 +427,10 @@ private JobExecution mapRowToJobExecution(Row row) { .withTotal(row.getInteger(PROGRESS_TOTAL_FIELD))) .withJobProfileInfo(mapRowToJobProfileInfo(row)) .withJobProfileSnapshotWrapper(row.getJsonObject(PROFILE_SNAPSHOT_WRAPPER_FIELD) == null - ? null : row.getJsonObject(PROFILE_SNAPSHOT_WRAPPER_FIELD).mapTo(ProfileSnapshotWrapper.class)); + ? 
null : row.getJsonObject(PROFILE_SNAPSHOT_WRAPPER_FIELD).mapTo(ProfileSnapshotWrapper.class)) + .withJobPartNumber(row.getInteger(JOB_PART_NUMBER)) + .withTotalJobParts(row.getInteger(TOTAL_JOB_PARTS)) + .withTotalRecordsInFile(row.getInteger(TOTAL_RECORDS_IN_FILE)); } private JobExecutionDto mapRowToJobExecutionDto(Row row) { @@ -433,7 +452,11 @@ private JobExecutionDto mapRowToJobExecutionDto(Row row) { .withLastName(row.getString(JOB_USER_LAST_NAME_FIELD))) .withUserId(row.getValue(USER_ID_FIELD).toString()) .withProgress(mapRowToProgress(row)) - .withJobProfileInfo(mapRowToJobProfileInfo(row)); + .withJobProfileInfo(mapRowToJobProfileInfo(row)) + .withJobPartNumber(row.getInteger(JOB_PART_NUMBER)) + .withTotalJobParts(row.getInteger(TOTAL_JOB_PARTS)) + .withTotalRecordsInFile(row.getInteger(TOTAL_RECORDS_IN_FILE)) + .withCompositeDetails(mapToJobExecutionCompositeDetailsDto(row)); } private Date mapRowToCompletedDate(Row row) { @@ -442,6 +465,10 @@ private Date mapRowToCompletedDate(Row row) { } private Progress mapRowToProgress(Row row) { + if (row.get(JobExecutionDto.SubordinationType.class, SUBORDINATION_TYPE_FIELD).equals(SubordinationType.COMPOSITE_PARENT)) { + return mapRowToProgressComposite(row); + } + Integer processedCount = row.getInteger(CURRENTLY_PROCESSED_FIELD); Integer total = row.getInteger(TOTAL_FIELD); if (processedCount == null) { @@ -457,6 +484,29 @@ private Progress mapRowToProgress(Row row) { .withTotal(total); } + private Progress mapRowToProgressComposite(Row row) { + JsonArray compositeData = row.getJsonArray(JOB_PROFILE_COMPOSITE_DATA); + + if (Objects.nonNull(compositeData) && !compositeData.isEmpty()) { + Progress progressDto = new Progress() + .withJobExecutionId(row.getValue(ID_FIELD).toString()); + + int processed = 0; + int total = 0; + + for (Object o : compositeData) { + JsonObject jo = (JsonObject) o; + + processed += Optional.ofNullable(jo.getInteger(JOB_PROFILE_COMPOSITE_DATA_CURRENTLY_PROCESSED)).orElse(0); + total += 
Optional.ofNullable(jo.getInteger(JOB_PROFILE_COMPOSITE_DATA_TOTAL_RECORDS_COUNT)).orElse(0); + } + + return progressDto.withCurrent(processed).withTotal(total); + } + + return null; + } + private JobProfileInfo mapRowToJobProfileInfo(Row row) { UUID profileId = row.getUUID(JOB_PROFILE_ID_FIELD); if (Objects.nonNull(profileId)) { @@ -470,6 +520,61 @@ private JobProfileInfo mapRowToJobProfileInfo(Row row) { return null; } + private JobExecutionCompositeDetailsDto mapToJobExecutionCompositeDetailsDto(Row row) { + if (row.getColumnIndex(JOB_PROFILE_COMPOSITE_DATA) == -1) { + return null; + } + var compositeData = row.getJsonArray(JOB_PROFILE_COMPOSITE_DATA); + if (Objects.nonNull(compositeData) && !compositeData.isEmpty()) { + var detailsDto = new JobExecutionCompositeDetailsDto(); + compositeData.forEach(o -> { + JsonObject jo = (JsonObject) o; + var status = JobExecutionDto.Status.valueOf(jo.getString(JOB_PROFILE_COMPOSITE_DATA_STATUS)); + var stateDto = new JobExecutionCompositeDetailDto() + .withChunksCount(jo.getInteger("cnt")) + .withTotalRecordsCount(jo.getInteger(JOB_PROFILE_COMPOSITE_DATA_TOTAL_RECORDS_COUNT)) + .withCurrentlyProcessedCount(jo.getInteger(JOB_PROFILE_COMPOSITE_DATA_CURRENTLY_PROCESSED)); + switch (status) { + case NEW: + detailsDto.setNewState(stateDto); + break; + case FILE_UPLOADED: + detailsDto.setFileUploadedState(stateDto); + break; + case PARSING_IN_PROGRESS: + detailsDto.setParsingInProgressState(stateDto); + break; + case PARSING_FINISHED: + detailsDto.setParsingFinishedState(stateDto); + break; + case PROCESSING_IN_PROGRESS: + detailsDto.setProcessingInProgressState(stateDto); + break; + case PROCESSING_FINISHED: + detailsDto.setProcessingFinishedState(stateDto); + break; + case COMMIT_IN_PROGRESS: + detailsDto.setCommitInProgressState(stateDto); + break; + case COMMITTED: + detailsDto.setCommittedState(stateDto); + break; + case ERROR: + detailsDto.setErrorState(stateDto); + break; + case DISCARDED: + detailsDto.setDiscardedState(stateDto); + 
break; + case CANCELLED: + detailsDto.setCancelledState(stateDto); + break; + } + }); + return detailsDto; + } + return null; + } + private String formatFullTableName(String tenantId, String table) { return format("%s.%s", convertToPsqlStandard(tenantId), table); } @@ -517,7 +622,7 @@ public Future hardDeleteJobExecutions(long diffNumberOfDays, String ten return CompositeFuture.all(jobExecutionProgressFuture, jobExecutionSourceChunksFuture, journalRecordsFuture) .compose(ar -> Future.>future(rowSetPromise -> deleteFromJobExecutionTable(uuids, sqlConnection, tenantId, rowSetPromise, postgresClient))) .map(true); - })); + })); } private Future> mapRowsetValuesToListOfString(RowSet rowset) { diff --git a/mod-source-record-manager-server/src/main/java/org/folio/dao/JobExecutionFilter.java b/mod-source-record-manager-server/src/main/java/org/folio/dao/JobExecutionFilter.java index c7366467b..b705bd18a 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/dao/JobExecutionFilter.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/dao/JobExecutionFilter.java @@ -1,6 +1,7 @@ package org.folio.dao; import org.folio.rest.jaxrs.model.JobExecution; +import org.folio.rest.jaxrs.model.JobExecutionDto; import java.text.SimpleDateFormat; import java.util.Date; @@ -16,6 +17,7 @@ import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_HIDDEN_FIELD; import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_ID_FIELD; import static org.folio.dao.util.JobExecutionDBConstants.STATUS_FIELD; +import static org.folio.dao.util.JobExecutionDBConstants.SUBORDINATION_TYPE_FIELD; import static org.folio.dao.util.JobExecutionDBConstants.UI_STATUS_FIELD; import static org.folio.dao.util.JobExecutionDBConstants.USER_ID_FIELD; @@ -30,6 +32,7 @@ public class JobExecutionFilter { private String fileNamePattern; private List fileNameNotAny; private List profileIdAny; + private List subordinationTypeNotAny; private String userId; private Date 
completedAfter; private Date completedBefore; @@ -74,6 +77,11 @@ public JobExecutionFilter withProfileIdAny(List profileIdAny) { return this; } + public JobExecutionFilter withSubordinationTypeNotAny(List subordinationTypeNotAny) { + this.subordinationTypeNotAny = subordinationTypeNotAny; + return this; + } + public JobExecutionFilter withUserId(String userId) { this.userId = userId; return this; @@ -123,11 +131,15 @@ public String buildCriteria() { } } if (isNotEmpty(fileNameNotAny)) { - addCondition(conditionBuilder, buildNotInCondition(FILE_NAME_FIELD,fileNameNotAny)); + addCondition(conditionBuilder, buildNotInCondition(FILE_NAME_FIELD, fileNameNotAny)); } if (isNotEmpty(profileIdAny)) { addCondition(conditionBuilder, buildInCondition(JOB_PROFILE_ID_FIELD, profileIdAny)); } + if (isNotEmpty(subordinationTypeNotAny)) { + var subordinationTypes = subordinationTypeNotAny.stream().map(JobExecutionDto.SubordinationType::value).collect(Collectors.toList()); + addCondition(conditionBuilder, buildNotInCondition(SUBORDINATION_TYPE_FIELD, subordinationTypes)); + } if (isNotEmpty(userId)) { addCondition(conditionBuilder, buildEqualCondition(USER_ID_FIELD, userId)); } diff --git a/mod-source-record-manager-server/src/main/java/org/folio/dao/util/JobExecutionDBConstants.java b/mod-source-record-manager-server/src/main/java/org/folio/dao/util/JobExecutionDBConstants.java index 980d89403..e5ac63696 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/dao/util/JobExecutionDBConstants.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/dao/util/JobExecutionDBConstants.java @@ -10,6 +10,7 @@ public final class JobExecutionDBConstants { public static final String JOB_PROFILE_NAME_FIELD = "job_profile_name"; public static final String JOB_PROFILE_HIDDEN_FIELD = "job_profile_hidden"; public static final String JOB_PROFILE_DATA_TYPE_FIELD = "job_profile_data_type"; + public static final String JOB_PROFILE_COMPOSITE_DATA = "composite_data"; public 
static final String PROFILE_SNAPSHOT_WRAPPER_FIELD = "job_profile_snapshot_wrapper"; public static final String SOURCE_PATH_FIELD = "source_path"; public static final String FILE_NAME_FIELD = "file_name"; @@ -29,6 +30,9 @@ public final class JobExecutionDBConstants { public static final String CURRENTLY_PROCESSED_FIELD = "currently_processed"; public static final String TOTAL_FIELD = "total"; public static final String IS_DELETED_FIELD = "is_deleted"; + public static final String JOB_PART_NUMBER = "job_part_number"; + public static final String TOTAL_JOB_PARTS = "total_job_parts"; + public static final String TOTAL_RECORDS_IN_FILE = "total_records_in_file"; public static final String GET_BY_ID_SQL = "SELECT * FROM %s WHERE id = $1 AND is_deleted = false"; public static final String UPDATE_BY_IDS_SQL = "UPDATE ${tenantName}.${tableName} SET ${setFieldName} = ${setFieldValue} WHERE ${setConditionalFieldName} IN ('${setConditionalFieldValues}') RETURNING ${returningFieldNames}"; @@ -37,8 +41,8 @@ public final class JobExecutionDBConstants { "INSERT INTO %s.%s (id, hrid, parent_job_id, subordination_type, source_path, file_name, " + "progress_current, progress_total, started_date, completed_date, status, ui_status, error_status, job_user_first_name, " + "job_user_last_name, user_id, job_profile_id, job_profile_name, job_profile_data_type, job_profile_snapshot_wrapper, " - + "job_profile_hidden) " + - "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21)"; + + "job_profile_hidden, job_part_number, total_job_parts, total_records_in_file) " + + "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24)"; public static final String UPDATE_SQL = "UPDATE %s " + @@ -46,25 +50,36 @@ public final class JobExecutionDBConstants { "progress_current = $7, progress_total = $8, started_date = $9, completed_date = $10, " + "status = $11, ui_status = $12, error_status = $13, 
job_user_first_name = $14, job_user_last_name = $15, " + "user_id = $16, job_profile_id = $17, job_profile_name = $18, job_profile_data_type = $19, " + - "job_profile_snapshot_wrapper = $20, job_profile_hidden = $21" + - "WHERE id = $1"; + "job_profile_snapshot_wrapper = $20, job_profile_hidden = $21, job_part_number = $22, total_job_parts = $23, " + + "total_records_in_file = $24 WHERE id = $1"; public static final String GET_CHILDREN_JOBS_BY_PARENT_ID_SQL = "WITH cte AS (SELECT count(*) AS total_count FROM %s " + - "WHERE parent_job_id = $1 AND subordination_type = 'CHILD' AND is_deleted = false) " + + "WHERE parent_job_id = $1 AND subordination_type in ('CHILD', 'COMPOSITE_CHILD') AND is_deleted = false) " + "SELECT j.*, cte.*, p.total_records_count total, " + "p.succeeded_records_count + p.error_records_count currently_processed " + "FROM %s j " + "LEFT JOIN %s p ON j.id = p.job_execution_id " + "LEFT JOIN cte ON true " + - "WHERE parent_job_id = $1 AND subordination_type = 'CHILD' AND is_deleted = false " + + "WHERE parent_job_id = $1 AND subordination_type in ('CHILD', 'COMPOSITE_CHILD') AND is_deleted = false " + "LIMIT $2 OFFSET $3"; public static final String GET_JOBS_NOT_PARENT_SQL = "WITH cte AS (SELECT count(*) AS total_count FROM %s " + "WHERE subordination_type <> 'PARENT_MULTIPLE' AND %s) " + "SELECT j.*, cte.*, p.total_records_count total, " + - "p.succeeded_records_count + p.error_records_count currently_processed " + + "p.succeeded_records_count + p.error_records_count currently_processed, " + + "(select jsonb_agg(x) composite_data " + + "from (select status, " + + "count(1) cnt, " + + "sum(p1.total_records_count) total_records_count, " + + "sum(p1.succeeded_records_count + p1.error_records_count) currently_processed " + + "FROM %s j1 " + + "LEFT JOIN %s p1 ON j1.id = p1.job_execution_id " + + "where j1.parent_job_id = j.id " + + "and j1.id != j1.parent_job_id " + + "and j1.subordination_type = 'COMPOSITE_CHILD' " + + "group by status) x) 
composite_data " + "FROM %s j " + "LEFT JOIN %s p ON j.id = p.job_execution_id " + "LEFT JOIN cte ON true " + diff --git a/mod-source-record-manager-server/src/main/java/org/folio/rest/impl/MetadataProviderImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/rest/impl/MetadataProviderImpl.java index c8b3b9336..21c6a274a 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/rest/impl/MetadataProviderImpl.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/rest/impl/MetadataProviderImpl.java @@ -12,6 +12,7 @@ import org.folio.dao.util.SortField; import org.folio.dataimport.util.ExceptionHelper; import org.folio.rest.jaxrs.model.JobExecution; +import org.folio.rest.jaxrs.model.JobExecutionDto; import org.folio.rest.jaxrs.model.MetadataProviderJobLogEntriesJobExecutionIdGetOrder; import org.folio.rest.jaxrs.model.MetadataProviderJournalRecordsJobExecutionIdGetOrder; import org.folio.rest.jaxrs.model.MetadataProviderJobLogEntriesJobExecutionIdGetEntityType; @@ -60,15 +61,15 @@ public MetadataProviderImpl(Vertx vertx, String tenantId) { //NOSONAR @Override public void getMetadataProviderJobExecutions(List statusAny, List profileIdNotAny, String statusNot, List uiStatusAny, String hrId, String fileName, List fileNameNotAny, - List profileIdAny, String userId, Date completedAfter, Date completedBefore, - List sortBy, String totalRecords, int offset, int limit, - Map okapiHeaders, + List profileIdAny, List subordinationTypeNotAny, String userId, Date completedAfter, + Date completedBefore, List sortBy, String totalRecords, int offset, int limit, Map okapiHeaders, Handler> asyncResultHandler, Context vertxContext) { vertxContext.runOnContext(v -> { try { LOGGER.debug("getMetadataProviderJobExecutions:: sortBy {}", sortBy); List sortFields = mapSortQueryToSortFields(sortBy); - JobExecutionFilter filter = buildJobExecutionFilter(statusAny, profileIdNotAny, statusNot, uiStatusAny, hrId, fileName, fileNameNotAny, profileIdAny, 
userId, completedAfter, completedBefore); + JobExecutionFilter filter = buildJobExecutionFilter(statusAny, profileIdNotAny, statusNot, uiStatusAny, hrId, fileName, fileNameNotAny, profileIdAny, + subordinationTypeNotAny, userId, completedAfter, completedBefore); jobExecutionService.getJobExecutionsWithoutParentMultiple(filter, sortFields, offset, limit, tenantId) .map(GetMetadataProviderJobExecutionsResponse::respond200WithApplicationJson) .map(Response.class::cast) @@ -198,8 +199,8 @@ public void getMetadataProviderJobExecutionsUsers(String totalRecords, int offse private JobExecutionFilter buildJobExecutionFilter(List statusAny, List profileIdNotAny, String statusNot, List uiStatusAny, String hrIdPattern, String fileNamePattern, - List fileNameNotAny, List profileIdAny, String userId, - Date completedAfter, Date completedBefore) { + List fileNameNotAny, List profileIdAny, List subordinationTypeNotAny, + String userId, Date completedAfter, Date completedBefore) { List statuses = statusAny.stream() .map(JobExecution.Status::fromValue) .collect(Collectors.toList()); @@ -208,6 +209,10 @@ private JobExecutionFilter buildJobExecutionFilter(List statusAny, List< .map(JobExecution.UiStatus::fromValue) .collect(Collectors.toList()); + var subordinationTypes = subordinationTypeNotAny.stream() + .map(JobExecutionDto.SubordinationType::fromValue) + .collect(Collectors.toList()); + return new JobExecutionFilter() .withStatusAny(statuses) .withProfileIdNotAny(profileIdNotAny) @@ -217,6 +222,7 @@ private JobExecutionFilter buildJobExecutionFilter(List statusAny, List< .withFileNamePattern(fileNamePattern) .withFileNameNotAny(fileNameNotAny) .withProfileIdAny(profileIdAny) + .withSubordinationTypeNotAny(subordinationTypes) .withUserId(userId) .withCompletedAfter(completedAfter) .withCompletedBefore(completedBefore); diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/JobExecutionServiceImpl.java 
b/mod-source-record-manager-server/src/main/java/org/folio/services/JobExecutionServiceImpl.java index 7ca9b6158..f9f415e73 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/services/JobExecutionServiceImpl.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/services/JobExecutionServiceImpl.java @@ -5,8 +5,8 @@ import io.vertx.core.http.HttpMethod; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.handler.HttpException; - import org.apache.commons.io.FilenameUtils; +import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.folio.HttpStatus; @@ -25,24 +25,25 @@ import org.folio.rest.jaxrs.model.InitJobExecutionsRqDto; import org.folio.rest.jaxrs.model.InitJobExecutionsRsDto; import org.folio.rest.jaxrs.model.JobExecution; +import org.folio.rest.jaxrs.model.JobExecutionDto; import org.folio.rest.jaxrs.model.JobExecutionDtoCollection; import org.folio.rest.jaxrs.model.JobExecutionUserInfoCollection; import org.folio.rest.jaxrs.model.JobProfile; import org.folio.rest.jaxrs.model.JobProfileInfo; +import org.folio.rest.jaxrs.model.JobProfileInfoCollection; import org.folio.rest.jaxrs.model.ProfileSnapshotWrapper; import org.folio.rest.jaxrs.model.Progress; import org.folio.rest.jaxrs.model.RunBy; import org.folio.rest.jaxrs.model.Snapshot; import org.folio.rest.jaxrs.model.StatusDto; import org.folio.rest.jaxrs.model.UserInfo; -import org.folio.rest.jaxrs.model.JobProfileInfoCollection; +import org.folio.rest.jaxrs.model.JobExecution.SubordinationType; import org.folio.services.exceptions.JobDuplicateUpdateException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.ws.rs.BadRequestException; import javax.ws.rs.NotFoundException; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -78,6 +79,13 @@ public class JobExecutionServiceImpl 
implements JobExecutionService { private static final String DEFAULT_JOB_PROFILE_ID = "22fafcc3-f582-493d-88b0-3c538480cd83"; private static final String NO_FILE_NAME = "No file name"; + private static final List COMPLETE_STATUSES = Collections.unmodifiableList(Arrays.asList( + JobExecution.UiStatus.CANCELLED, + JobExecution.UiStatus.DISCARDED, + JobExecution.UiStatus.ERROR, + JobExecution.UiStatus.RUNNING_COMPLETE + )); + @Autowired private JobExecutionDao jobExecutionDao; @Autowired @@ -98,7 +106,8 @@ public Future initializeJobExecutions(InitJobExecutionsR LOGGER.warn(errorMessage); return Future.failedFuture(new BadRequestException(errorMessage)); } else { - String parentJobExecutionId = UUID.randomUUID().toString(); + String parentJobId = jobExecutionsRqDto.getParentJobId(); + String parentJobExecutionId = StringUtils.isNotBlank(parentJobId) ? parentJobId : UUID.randomUUID().toString(); return lookupUser(jobExecutionsRqDto.getUserId(), params) .compose(userInfo -> { List jobExecutions = @@ -148,7 +157,8 @@ public Future getJobExecutionCollectionByParentId(Str return jobExecutionDao.getJobExecutionById(parentId, tenantId) .compose(optionalJobExecution -> optionalJobExecution .map(jobExec -> { - if (JobExecution.SubordinationType.PARENT_MULTIPLE.equals(jobExec.getSubordinationType())) { + var subordinationType = jobExec.getSubordinationType(); + if (JobExecution.SubordinationType.PARENT_MULTIPLE == subordinationType || JobExecution.SubordinationType.COMPOSITE_PARENT == subordinationType) { return jobExecutionDao.getChildrenJobExecutionsByParentId(jobExec.getId(), offset, limit, tenantId); } else { return Future.succeededFuture(new JobExecutionDtoCollection().withTotalRecords(0)); @@ -168,26 +178,59 @@ public Future updateJobExecutionStatus(String jobExecutionId, Stat return Future.failedFuture(new BadRequestException(errorMessage)); } else { return jobExecutionDao.updateBlocking(jobExecutionId, jobExecution -> { - Promise promise = Promise.promise(); - try { - 
if (JobExecution.Status.PARENT.name().equals(jobExecution.getStatus().name())) { - String message = format("JobExecution %s current status is PARENT and cannot be updated", jobExecutionId); - LOGGER.warn(message); - promise.fail(new BadRequestException(message)); - } else { - jobExecution.setStatus(JobExecution.Status.fromValue(status.getStatus().name())); - jobExecution.setUiStatus(JobExecution.UiStatus.fromValue(Status.valueOf(status.getStatus().name()).getUiStatus())); - updateJobExecutionIfErrorExist(status, jobExecution); - promise.complete(jobExecution); + Promise promise = Promise.promise(); + try { + if (JobExecution.Status.PARENT.name().equals(jobExecution.getStatus().name())) { + String message = format("JobExecution %s current status is PARENT and cannot be updated", jobExecutionId); + LOGGER.warn(message); + promise.fail(new BadRequestException(message)); + } else { + jobExecution.setStatus(JobExecution.Status.fromValue(status.getStatus().name())); + jobExecution.setUiStatus(JobExecution.UiStatus.fromValue(Status.valueOf(status.getStatus().name()).getUiStatus())); + updateJobExecutionIfErrorExist(status, jobExecution); + promise.complete(jobExecution); + } + } catch (Exception e) { + String errorMessage = "Error updating JobExecution with id " + jobExecutionId; + LOGGER.warn(errorMessage, e); + promise.fail(errorMessage); } - } catch (Exception e) { - String errorMessage = "Error updating JobExecution with id " + jobExecutionId; - LOGGER.warn(errorMessage, e); - promise.fail(errorMessage); - } - return promise.future(); - }, params.getTenantId()) - .compose(jobExecution -> updateSnapshotStatus(jobExecution, params)); + return promise.future(); + }, params.getTenantId()) + .compose(jobExecution -> updateSnapshotStatus(jobExecution, params)) + .compose(jobExecution -> { + // if this composite child finished, check if all other children are finished + // if so, then mark the composite parent as completed + if 
(jobExecution.getSubordinationType().equals(SubordinationType.COMPOSITE_CHILD) && + COMPLETE_STATUSES.contains(jobExecution.getUiStatus())) { + return this.getJobExecutionById(jobExecution.getParentJobId(), params.getTenantId()) + .map(v -> v.orElseThrow(() -> new IllegalStateException("Could not find parent job execution"))) + .compose(parentExecution -> + this.getJobExecutionCollectionByParentId(parentExecution.getId(), 0, Integer.MAX_VALUE, params.getTenantId()) + .map(JobExecutionDtoCollection::getJobExecutions) + // ensure all other children are completed + .map(children -> + children.stream() + .filter(child -> child.getSubordinationType().equals(JobExecutionDto.SubordinationType.COMPOSITE_CHILD)) + .map(JobExecutionDto::getUiStatus) + .map(JobExecutionDto.UiStatus::toString) + .map(JobExecution.UiStatus::fromValue) + .allMatch(COMPLETE_STATUSES::contains) + ) + .compose(allChildrenCompleted -> { + if (Boolean.TRUE.equals(allChildrenCompleted)) { + LOGGER.info("All children for job {} have completed!", parentExecution.getId()); + parentExecution.withStatus(JobExecution.Status.COMMITTED) + .withUiStatus(JobExecution.UiStatus.RUNNING_COMPLETE) + .withCompletedDate(new Date()); + return this.updateJobExecutionWithSnapshotStatus(parentExecution, params); + } + return Future.succeededFuture(parentExecution); + }) + ); + } + return Future.succeededFuture(jobExecution); + }); } } @@ -195,16 +238,16 @@ public Future updateJobExecutionStatus(String jobExecutionId, Stat public Future setJobProfileToJobExecution(String jobExecutionId, JobProfileInfo jobProfile, OkapiConnectionParams params) { LOGGER.debug("setJobProfileToJobExecution:: jobExecutionId {}, jobProfileId {}", jobExecutionId, jobProfile.getId()); return loadJobProfileById(jobProfile.getId(), params) - .map(profile-> jobProfile.withName(profile.getName())) - .compose(v-> jobExecutionDao.updateBlocking(jobExecutionId, jobExecution -> { - if (jobExecution.getJobProfileSnapshotWrapper() != null) { - throw new 
BadRequestException(String.format("JobExecution already associated to JobProfile with id '%s'", jobProfile.getId())); - } - return createJobProfileSnapshotWrapper(jobProfile, params) - .map(profileSnapshotWrapper -> jobExecution - .withJobProfileInfo(jobProfile) - .withJobProfileSnapshotWrapper(profileSnapshotWrapper)); - }, params.getTenantId()) + .map(profile -> jobProfile.withName(profile.getName())) + .compose(v -> jobExecutionDao.updateBlocking(jobExecutionId, jobExecution -> { + if (jobExecution.getJobProfileSnapshotWrapper() != null) { + throw new BadRequestException(String.format("JobExecution already associated to JobProfile with id '%s'", jobProfile.getId())); + } + return createJobProfileSnapshotWrapper(jobProfile, params) + .map(profileSnapshotWrapper -> jobExecution + .withJobProfileInfo(jobProfile) + .withJobProfileSnapshotWrapper(profileSnapshotWrapper)); + }, params.getTenantId()) .recover(throwable -> { StatusDto statusDto = new StatusDto().withStatus(ERROR).withErrorStatus(PROFILE_SNAPSHOT_CREATING_ERROR); return updateJobExecutionStatus(jobExecutionId, statusDto, params) @@ -224,7 +267,7 @@ private Future createJobProfileSnapshotWrapper(JobProfil client.postDataImportProfilesJobProfileSnapshotsById(jobProfile.getId(), response -> { if (response.result().statusCode() == HTTP_CREATED.toInt()) { - promise.handle(Try.itGet(() -> response.result().bodyAsJsonObject().mapTo(ProfileSnapshotWrapper.class))); + promise.handle(Try.itGet(() -> response.result().bodyAsJsonObject().mapTo(ProfileSnapshotWrapper.class))); } else { String message = String.format("Error creating ProfileSnapshotWrapper by JobProfile id '%s', response code %s", jobProfile.getId(), response.result().statusCode()); LOGGER.warn(message); @@ -293,27 +336,48 @@ public Future getRelatedUsersInfo(int offset, in */ private List prepareJobExecutionList(String parentJobExecutionId, List files, UserInfo userInfo, InitJobExecutionsRqDto dto) { String userId = dto.getUserId(); - if 
(dto.getSourceType().equals(InitJobExecutionsRqDto.SourceType.ONLINE)) { - JobProfileInfo jobProfileInfo = dto.getJobProfileInfo(); - if (jobProfileInfo != null && jobProfileInfo.getId().equals(DEFAULT_JOB_PROFILE_ID)) { - jobProfileInfo.withName(DEFAULT_JOB_PROFILE); + var sourceType = dto.getSourceType(); + var runBy = buildRunByFromUserInfo(userInfo); + switch (sourceType) { + case ONLINE: { + JobProfileInfo jobProfileInfo = dto.getJobProfileInfo(); + if (jobProfileInfo != null && jobProfileInfo.getId().equals(DEFAULT_JOB_PROFILE_ID)) { + jobProfileInfo.withName(DEFAULT_JOB_PROFILE); + } + return Collections.singletonList(buildNewJobExecution(true, true, false, parentJobExecutionId, NO_FILE_NAME, userId) + .withJobProfileInfo(jobProfileInfo) + .withRunBy(runBy)); } - return Collections.singletonList(buildNewJobExecution(true, true, parentJobExecutionId, NO_FILE_NAME, userId) - .withJobProfileInfo(jobProfileInfo) - .withRunBy(buildRunByFromUserInfo(userInfo))); - } - List result = new ArrayList<>(); - if (files.size() > 1) { - for (File file : files) { - result.add(buildNewJobExecution(false, false, parentJobExecutionId, file.getName(), userId)); + + case COMPOSITE: { + var parentJobId = dto.getParentJobId(); + var isParent = StringUtils.isBlank(parentJobId); + File file = files.get(0); + var jobExecution = buildNewJobExecution(isParent, true, true, parentJobExecutionId, file.getName(), userId) + .withJobPartNumber(dto.getJobPartNumber()) + .withTotalJobParts(dto.getTotalJobParts()) + .withRunBy(runBy); + + return Collections.singletonList(jobExecution); } - result.add(buildNewJobExecution(true, false, parentJobExecutionId, null, userId)); - } else { - File file = files.get(0); - result.add(buildNewJobExecution(true, true, parentJobExecutionId, file.getName(), userId)); + + case FILES: { + List result = new ArrayList<>(); + if (files.size() > 1) { + for (File file : files) { + result.add(buildNewJobExecution(false, false, false, parentJobExecutionId, 
file.getName(), userId).withRunBy(runBy)); + } + result.add(buildNewJobExecution(true, false, false, parentJobExecutionId, null, userId).withRunBy(runBy)); + } else { + File file = files.get(0); + result.add(buildNewJobExecution(true, true, false, parentJobExecutionId, file.getName(), userId).withRunBy(runBy)); + } +// result.forEach(job -> job.setRunBy(runBy)); + return result; + } + default: + throw new IllegalArgumentException("InitJobExecutionsRqDto.getSourceType() can not be null"); } - result.forEach(job -> job.setRunBy(buildRunByFromUserInfo(userInfo))); - return result; } private RunBy buildRunByFromUserInfo(UserInfo info) { @@ -356,7 +420,7 @@ private Future lookupUser(String userId, OkapiConnectionParams params) String userName = jsonUser.getString("username"); UserInfo userInfo = new UserInfo() .withFirstName(Objects.isNull(userPersonalInfo) - ? userName : userPersonalInfo.getString("firstName")) + ? userName : userPersonalInfo.getString("firstName")) .withLastName(Objects.isNull(userPersonalInfo) ? DEFAULT_LASTNAME : userPersonalInfo.getString("lastName")) .withUserName(userName); @@ -371,7 +435,7 @@ private Future lookupUser(String userId, OkapiConnectionParams params) /** * Create new JobExecution object and fill fields */ - private JobExecution buildNewJobExecution(boolean isParent, boolean isSingle, String parentJobExecutionId, String fileName, String userId) { + private JobExecution buildNewJobExecution(boolean isParent, boolean isSingle, boolean isComposite, String parentJobExecutionId, String fileName, String userId) { LOGGER.debug("buildNewJobExecution:: parentJobExecutionId {}, fileName {}, userId {}", parentJobExecutionId, fileName, userId); JobExecution job = new JobExecution() .withId(isParent ? 
parentJobExecutionId : UUID.randomUUID().toString()) @@ -384,15 +448,17 @@ private JobExecution buildNewJobExecution(boolean isParent, boolean isSingle, St .withUserId(userId) .withStartedDate(new Date()); if (!isParent) { - job.withSubordinationType(JobExecution.SubordinationType.CHILD) + job.withSubordinationType(isComposite ? JobExecution.SubordinationType.COMPOSITE_CHILD : JobExecution.SubordinationType.CHILD) .withStatus(JobExecution.Status.NEW) .withUiStatus(JobExecution.UiStatus.valueOf(Status.valueOf(JobExecution.Status.NEW.value()).getUiStatus())); } else { - job.withSubordinationType(isSingle ? JobExecution.SubordinationType.PARENT_SINGLE : JobExecution.SubordinationType.PARENT_MULTIPLE) + job.withSubordinationType(isComposite ? + JobExecution.SubordinationType.COMPOSITE_PARENT : + isSingle ? JobExecution.SubordinationType.PARENT_SINGLE : JobExecution.SubordinationType.PARENT_MULTIPLE) .withStatus(isSingle ? JobExecution.Status.NEW : JobExecution.Status.PARENT) .withUiStatus(isSingle ? - JobExecution.UiStatus.valueOf(Status.valueOf(JobExecution.Status.NEW.value()).getUiStatus()) - : JobExecution.UiStatus.valueOf(Status.valueOf(JobExecution.Status.PARENT.value()).getUiStatus())); + JobExecution.UiStatus.valueOf(Status.valueOf(JobExecution.Status.NEW.value()).getUiStatus()) : + JobExecution.UiStatus.valueOf(Status.valueOf(JobExecution.Status.PARENT.value()).getUiStatus())); } return job; } @@ -491,6 +557,12 @@ private Future updateSnapshotStatus(JobExecution jobExecution, Oka if (response.result().statusCode() == HttpStatus.HTTP_OK.toInt()) { promise.complete(jobExecution); } else { + LOGGER.warn( + "Update snapshot status failed for jobExecution with id {}. 
Response code={}", + jobExecution.getId(), + response.result().statusCode() + ); + LOGGER.warn("Response: {}", response.result().bodyAsString()); jobExecutionDao.updateBlocking(jobExecution.getId(), jobExec -> { Promise jobExecutionPromise = Promise.promise(); jobExec.setErrorStatus(JobExecution.ErrorStatus.SNAPSHOT_UPDATE_ERROR); @@ -515,7 +587,7 @@ private Future updateSnapshotStatus(JobExecution jobExecution, Oka private JobExecution verifyJobExecution(JobExecution jobExecution) { if (jobExecution.getStatus() == JobExecution.Status.ERROR || jobExecution.getStatus() == COMMITTED - || jobExecution.getStatus() == JobExecution.Status.CANCELLED) { + || jobExecution.getStatus() == JobExecution.Status.CANCELLED) { String msg = String.format("JobExecution with status '%s' cannot be forcibly completed", jobExecution.getStatus()); LOGGER.info(msg); throw new JobDuplicateUpdateException(msg); @@ -572,8 +644,7 @@ private void updateJobExecutionIfErrorExist(StatusDto status, JobExecution jobEx if (jobExecution.getErrorStatus().equals(JobExecution.ErrorStatus.FILE_PROCESSING_ERROR)) { jobExecution.setProgress(jobExecution.getProgress().withTotal(0)); } - } - else if (status.getStatus() == CANCELLED) { + } else if (status.getStatus() == CANCELLED) { jobExecution.setCompletedDate(new Date()); } } diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/RecordProcessedEventHandlingServiceImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/services/RecordProcessedEventHandlingServiceImpl.java index 8a4e73412..a64673f75 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/services/RecordProcessedEventHandlingServiceImpl.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/services/RecordProcessedEventHandlingServiceImpl.java @@ -11,14 +11,17 @@ import org.folio.dataimport.util.OkapiConnectionParams; import org.folio.rest.jaxrs.model.DataImportEventTypes; import org.folio.rest.jaxrs.model.JobExecution; +import 
org.folio.rest.jaxrs.model.JobExecutionDto; +import org.folio.rest.jaxrs.model.JobExecutionDtoCollection; import org.folio.rest.jaxrs.model.JobExecutionProgress; import org.folio.rest.jaxrs.model.Progress; import org.folio.rest.jaxrs.model.StatusDto; +import org.folio.rest.jaxrs.model.JobExecution.SubordinationType; import org.folio.services.progress.JobExecutionProgressService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.io.IOException; +import java.util.Arrays; import java.util.Date; import static java.lang.String.format; @@ -121,6 +124,41 @@ private Future updateJobExecutionIfAllRecordsProcessed(String jobExecut .withTotal(progress.getTotal())); return jobExecutionService.updateJobExecutionWithSnapshotStatus(jobExecution, params) + .compose(updatedExecution -> { + if (updatedExecution.getSubordinationType().equals(SubordinationType.COMPOSITE_CHILD)) { + + return jobExecutionService.getJobExecutionById(updatedExecution.getParentJobId(), params.getTenantId()) + .map(v -> v.orElseThrow(() -> new IllegalStateException("Could not find parent job execution"))) + .compose(parentExecution -> + jobExecutionService.getJobExecutionCollectionByParentId(parentExecution.getId(), 0, Integer.MAX_VALUE, params.getTenantId()) + .map(JobExecutionDtoCollection::getJobExecutions) + .map(children -> + children.stream() + .filter(child -> child.getSubordinationType().equals(JobExecutionDto.SubordinationType.COMPOSITE_CHILD)) + .allMatch(child -> + Arrays.asList( + JobExecutionDto.UiStatus.RUNNING_COMPLETE, + JobExecutionDto.UiStatus.CANCELLED, + JobExecutionDto.UiStatus.ERROR, + JobExecutionDto.UiStatus.DISCARDED + ).contains(child.getUiStatus()) + ) + ) + .compose(allChildrenCompleted -> { + if (Boolean.TRUE.equals(allChildrenCompleted)) { + LOGGER.info("All children for job {} have completed!", parentExecution.getId()); + parentExecution.withStatus(JobExecution.Status.COMMITTED) + 
.withUiStatus(JobExecution.UiStatus.RUNNING_COMPLETE) + .withCompletedDate(new Date()); + return jobExecutionService.updateJobExecutionWithSnapshotStatus(parentExecution, params); + } + return Future.succeededFuture(parentExecution); + }) + ); + } else { + return Future.succeededFuture(updatedExecution); + } + }) .map(true); }) .orElse(Future.failedFuture(format("Couldn't find JobExecution for update status and progress with id '%s'", jobExecutionId)))); diff --git a/mod-source-record-manager-server/src/main/resources/templates/db_scripts/alter_job_executions_table_for_chunk_support.sql b/mod-source-record-manager-server/src/main/resources/templates/db_scripts/alter_job_executions_table_for_chunk_support.sql new file mode 100644 index 000000000..d1e41a5bb --- /dev/null +++ b/mod-source-record-manager-server/src/main/resources/templates/db_scripts/alter_job_executions_table_for_chunk_support.sql @@ -0,0 +1,11 @@ +alter type job_execution_subordination_type + add value if not exists 'COMPOSITE_PARENT'; + +alter type job_execution_subordination_type + add value if not exists 'COMPOSITE_CHILD'; + +alter table job_execution + add column if not exists job_part_number integer default 1; + +alter table job_execution + add column if not exists total_job_parts integer default 1; diff --git a/mod-source-record-manager-server/src/main/resources/templates/db_scripts/alter_job_executions_table_for_total_records.sql b/mod-source-record-manager-server/src/main/resources/templates/db_scripts/alter_job_executions_table_for_total_records.sql new file mode 100644 index 000000000..bc8eaa912 --- /dev/null +++ b/mod-source-record-manager-server/src/main/resources/templates/db_scripts/alter_job_executions_table_for_total_records.sql @@ -0,0 +1,2 @@ +alter table job_execution + add column if not exists total_records_in_file integer; diff --git a/mod-source-record-manager-server/src/main/resources/templates/db_scripts/schema.json 
b/mod-source-record-manager-server/src/main/resources/templates/db_scripts/schema.json index bc7f0db9c..30b052c43 100644 --- a/mod-source-record-manager-server/src/main/resources/templates/db_scripts/schema.json +++ b/mod-source-record-manager-server/src/main/resources/templates/db_scripts/schema.json @@ -261,6 +261,16 @@ "run": "after", "snippetPath": "create_get_job_log_entries_function.sql", "fromModuleVersion": "mod-source-record-manager-3.7.0" + }, + { + "run": "after", + "snippetPath": "alter_job_executions_table_for_chunk_support.sql", + "fromModuleVersion": "mod-source-record-manager-3.7.0" + }, + { + "run": "after", + "snippetPath": "alter_job_executions_table_for_total_records.sql", + "fromModuleVersion": "mod-source-record-manager-3.7.0" } ] } diff --git a/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/AbstractRestTest.java b/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/AbstractRestTest.java index f575341d2..a8bb46d95 100644 --- a/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/AbstractRestTest.java +++ b/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/AbstractRestTest.java @@ -61,6 +61,7 @@ import java.io.IOException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -533,6 +534,41 @@ protected InitJobExecutionsRsDto constructAndPostInitJobExecutionRqDto(int files .when().post(JOB_EXECUTION_PATH).body().as(InitJobExecutionsRsDto.class); } + /** Returns list of [parent, child1, ..., childN] */ + protected List constructAndPostCompositeInitJobExecutionRqDto(String filename, int children) { + InitJobExecutionsRqDto requestDto = new InitJobExecutionsRqDto(); + requestDto.getFiles().add(new File().withName(filename)); + requestDto.setUserId(okapiUserIdHeader); + requestDto.setSourceType(InitJobExecutionsRqDto.SourceType.COMPOSITE); + + JobExecution 
parent = RestAssured.given() + .spec(spec) + .body(JsonObject.mapFrom(requestDto).toString()) + .when().post(JOB_EXECUTION_PATH).body().as(InitJobExecutionsRsDto.class).getJobExecutions().get(0); + + List result = new ArrayList<>(); + result.add(parent); + + for (int i = 1; i <= children; i++) { + InitJobExecutionsRqDto childRequestDto = new InitJobExecutionsRqDto(); + childRequestDto.getFiles().add(new File().withName(filename + i)); + childRequestDto.setUserId(okapiUserIdHeader); + childRequestDto.setSourceType(InitJobExecutionsRqDto.SourceType.COMPOSITE); + childRequestDto.setParentJobId(parent.getId()); + childRequestDto.setJobPartNumber(i); + childRequestDto.setTotalJobParts(children); + + result.add( + RestAssured.given() + .spec(spec) + .body(JsonObject.mapFrom(childRequestDto).toString()) + .when().post(JOB_EXECUTION_PATH).body().as(InitJobExecutionsRsDto.class).getJobExecutions().get(0) + ); + } + + return result; + } + protected io.restassured.response.Response updateJobExecutionStatus(JobExecution jobExecution, StatusDto statusDto) { return RestAssured.given() .spec(spec) diff --git a/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/changeManager/ChangeManagerAPITest.java b/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/changeManager/ChangeManagerAPITest.java index c2740e193..77be0ad6b 100644 --- a/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/changeManager/ChangeManagerAPITest.java +++ b/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/changeManager/ChangeManagerAPITest.java @@ -14,6 +14,7 @@ import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import net.mguenther.kafka.junit.ObserveKeyValues; + import org.apache.http.HttpStatus; import org.folio.MatchProfile; import org.folio.TestUtil; @@ -87,6 +88,7 @@ import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static 
org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertEquals; @@ -656,6 +658,126 @@ public void shouldUpdateStatusOfChild() { .body("uiStatus", is(Status.valueOf(status.getStatus().name()).getUiStatus())); } + @Test + public void shouldUpdateStatusOfCompositeParent() { + List executions = constructAndPostCompositeInitJobExecutionRqDto("test", 1); + assertThat(executions.size(), is(2)); + JobExecution parent = executions.get(0); + JobExecution child = executions.get(1); + + // update child to a non-completion state; should NOT update parent + StatusDto progressStatus = new StatusDto().withStatus(StatusDto.Status.PARSING_IN_PROGRESS); + RestAssured.given() + .spec(spec) + .body(JsonObject.mapFrom(progressStatus).toString()) + .when() + .put(JOB_EXECUTION_PATH + child.getId() + STATUS_PATH) + .then() + .statusCode(HttpStatus.SC_OK); + + RestAssured.given() + .spec(spec) + .when() + .get(JOB_EXECUTION_PATH + parent.getId()) + .then() + .statusCode(HttpStatus.SC_OK) + .body("status", is(not(JobExecution.Status.COMMITTED))) + .body("uiStatus", is(not(JobExecution.UiStatus.RUNNING_COMPLETE))); + + // update child to a completion state; should now update parent + StatusDto completeStatus = new StatusDto().withStatus(StatusDto.Status.COMMITTED); + RestAssured.given() + .spec(spec) + .body(JsonObject.mapFrom(completeStatus).toString()) + .when() + .put(JOB_EXECUTION_PATH + child.getId() + STATUS_PATH) + .then() + .statusCode(HttpStatus.SC_OK); + + RestAssured.given() + .spec(spec) + .when() + .get(JOB_EXECUTION_PATH + parent.getId()) + .then() + .statusCode(HttpStatus.SC_OK) + .body("status", is(JobExecution.Status.COMMITTED.toString())) + .body("uiStatus", is(JobExecution.UiStatus.RUNNING_COMPLETE.toString())); + } + + @Test + public void shouldUpdateStatusOfCompositeParentMultipleChildren() { + List executions = constructAndPostCompositeInitJobExecutionRqDto("test", 2); + 
assertThat(executions.size(), is(3)); + JobExecution parent = executions.get(0); + JobExecution child1 = executions.get(1); + JobExecution child2 = executions.get(2); + + // complete child1; parent should NOT update yet though, since child 2 is not completed + StatusDto completeStatus = new StatusDto().withStatus(StatusDto.Status.COMMITTED); + RestAssured.given() + .spec(spec) + .body(JsonObject.mapFrom(completeStatus).toString()) + .when() + .put(JOB_EXECUTION_PATH + child1.getId() + STATUS_PATH) + .then() + .statusCode(HttpStatus.SC_OK); + + RestAssured.given() + .spec(spec) + .when() + .get(JOB_EXECUTION_PATH + parent.getId()) + .then() + .statusCode(HttpStatus.SC_OK) + .body("status", is(not(JobExecution.Status.COMMITTED.toString()))) + .body("uiStatus", is(not(JobExecution.UiStatus.RUNNING_COMPLETE.toString()))); + + // after this, both children are completed so the parent should complete + RestAssured.given() + .spec(spec) + .body(JsonObject.mapFrom(completeStatus).toString()) + .when() + .put(JOB_EXECUTION_PATH + child2.getId() + STATUS_PATH) + .then() + .statusCode(HttpStatus.SC_OK); + + RestAssured.given() + .spec(spec) + .when() + .get(JOB_EXECUTION_PATH + parent.getId()) + .then() + .statusCode(HttpStatus.SC_OK) + .body("status", is(JobExecution.Status.COMMITTED.toString())) + .body("uiStatus", is(JobExecution.UiStatus.RUNNING_COMPLETE.toString())); + } + + @Test + public void shouldNotUpdateStatusOfCompositeUnknownParent() { + InitJobExecutionsRqDto childRequestDto = new InitJobExecutionsRqDto(); + childRequestDto.getFiles().add(new File().withName("test")); + childRequestDto.setUserId(okapiUserIdHeader); + childRequestDto.setSourceType(InitJobExecutionsRqDto.SourceType.COMPOSITE); + childRequestDto.setParentJobId("aaaaaaaa-bbbb-1ccc-8ddd-eeeeeeeeeeee"); + childRequestDto.setJobPartNumber(1); + childRequestDto.setTotalJobParts(1); + + JobExecution child = + RestAssured.given() + .spec(spec) + .body(JsonObject.mapFrom(childRequestDto).toString()) + 
.when().post(JOB_EXECUTION_PATH).body().as(InitJobExecutionsRsDto.class).getJobExecutions().get(0); + + // update child to a completion state; should now attempt to update parent + // the parent does not exist, though, so it should throw a 500 + StatusDto completeStatus = new StatusDto().withStatus(StatusDto.Status.COMMITTED); + RestAssured.given() + .spec(spec) + .body(JsonObject.mapFrom(completeStatus).toString()) + .when() + .put(JOB_EXECUTION_PATH + child.getId() + STATUS_PATH) + .then() + .statusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR); + } + @Test public void shouldSetCompletedDateToJobExecutionOnUpdateStatusToError() { InitJobExecutionsRsDto response = @@ -2482,5 +2604,4 @@ public void shouldNotUpdateJobExecutionOfDeletedRecords(){ .then() .statusCode(HttpStatus.SC_NOT_FOUND); } - } diff --git a/ramls/metadata-provider.raml b/ramls/metadata-provider.raml index 0180ee390..370ab6500 100644 --- a/ramls/metadata-provider.raml +++ b/ramls/metadata-provider.raml @@ -80,6 +80,11 @@ resourceTypes: type: string[] example: ["d0ebb7b0-2f0f-11eb-adc1-0242ac120002", "91f9b8d6-d80e-4727-9783-73fb53e3c786"] required: false + subordinationTypeNotAny: + description: Filter by specified SubordinationTypes + type: string[] + example: ["COMPOSITE_CHILD"] + required: false userId: description: Filter by user id type: string