Skip to content

Commit

Permalink
DITF changes
Browse files Browse the repository at this point in the history
Co-authored-by: Taras_Spashchenko <[email protected]>
  • Loading branch information
ncovercash and TarasSpashchenko committed Oct 2, 2023
1 parent 4396064 commit 320d159
Show file tree
Hide file tree
Showing 12 changed files with 517 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
Expand All @@ -19,8 +20,11 @@
import org.folio.dao.util.SortField;
import org.folio.rest.jaxrs.model.DeleteJobExecutionsResp;
import org.folio.rest.jaxrs.model.JobExecution;
import org.folio.rest.jaxrs.model.JobExecutionCompositeDetailDto;
import org.folio.rest.jaxrs.model.JobExecutionCompositeDetailsDto;
import org.folio.rest.jaxrs.model.JobExecutionDetail;
import org.folio.rest.jaxrs.model.JobExecutionDto;
import org.folio.rest.jaxrs.model.JobExecutionDto.SubordinationType;
import org.folio.rest.jaxrs.model.JobExecutionDtoCollection;
import org.folio.rest.jaxrs.model.JobExecutionUserInfo;
import org.folio.rest.jaxrs.model.JobExecutionUserInfoCollection;
Expand Down Expand Up @@ -65,6 +69,8 @@
import static org.folio.dao.util.JobExecutionDBConstants.HRID_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.ID_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.INSERT_SQL;
import static org.folio.dao.util.JobExecutionDBConstants.JOB_PART_NUMBER;
import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_COMPOSITE_DATA;
import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_DATA_TYPE_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_HIDDEN_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_ID_FIELD;
Expand All @@ -82,6 +88,8 @@
import static org.folio.dao.util.JobExecutionDBConstants.SUBORDINATION_TYPE_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.TOTAL_COUNT_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.TOTAL_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.TOTAL_JOB_PARTS;
import static org.folio.dao.util.JobExecutionDBConstants.TOTAL_RECORDS_IN_FILE;
import static org.folio.dao.util.JobExecutionDBConstants.UI_STATUS_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.UPDATE_BY_IDS_SQL;
import static org.folio.dao.util.JobExecutionDBConstants.UPDATE_PROGRESS_SQL;
Expand Down Expand Up @@ -125,6 +133,9 @@ public class JobExecutionDaoImpl implements JobExecutionDao {
public static final String DELETE_FROM_JOB_EXECUTION_TABLE = "DELETE from %s.%s where id = ANY ($1)";
public static final String JOB_EXECUTION_SOURCE_CHUNKS_TABLE_NAME = "job_execution_source_chunks";
public static final String JOURNAL_RECORDS_TABLE_NAME = "journal_records";
public static final String JOB_PROFILE_COMPOSITE_DATA_STATUS = "status";
public static final String JOB_PROFILE_COMPOSITE_DATA_TOTAL_RECORDS_COUNT = "total_records_count";
public static final String JOB_PROFILE_COMPOSITE_DATA_CURRENTLY_PROCESSED = "currently_processed";

@Autowired
private PostgresClientFactory pgClientFactory;
Expand All @@ -137,7 +148,8 @@ public Future<JobExecutionDtoCollection> getJobExecutionsWithoutParentMultiple(J
String orderByClause = buildOrderByClause(sortFields);
String jobTable = formatFullTableName(tenantId, TABLE_NAME);
String progressTable = formatFullTableName(tenantId, PROGRESS_TABLE_NAME);
String query = format(GET_JOBS_NOT_PARENT_SQL, jobTable, filterCriteria, jobTable, progressTable, filterCriteria, orderByClause);
String query = format(GET_JOBS_NOT_PARENT_SQL, jobTable, filterCriteria, jobTable, progressTable, jobTable, progressTable, filterCriteria, orderByClause);
// LOGGER.warn("------> query: " + query);
pgClientFactory.createInstance(tenantId).selectRead(query, Tuple.of(limit, offset), promise);
} catch (Exception e) {
LOGGER.warn("getJobExecutionsWithoutParentMultiple:: Error while getting Logs", e);
Expand Down Expand Up @@ -285,7 +297,7 @@ public Future<DeleteJobExecutionsResp> softDeleteJobExecutionsByIds(List<String>
try {
Map<String, String> data = new HashMap<>();
data.put(TENANT_NAME, convertToPsqlStandard(tenantId));
data.put(DB_TABLE_NAME_FIELD,TABLE_NAME);
data.put(DB_TABLE_NAME_FIELD, TABLE_NAME);
data.put(SET_FIELD_NAME, IS_DELETED);
data.put(SET_FIELD_VALUE, TRUE);
data.put(SET_CONDITIONAL_FIELD_NAME, ID);
Expand All @@ -299,6 +311,7 @@ public Future<DeleteJobExecutionsResp> softDeleteJobExecutionsByIds(List<String>
}
return promise.future().map(this::mapRowSetToDeleteChangeManagerJobExeResp);
}

@Override
public Future<JobExecutionUserInfoCollection> getRelatedUsersInfo(int offset, int limit, String tenantId) {
Promise<RowSet<Row>> promise = Promise.promise();
Expand Down Expand Up @@ -330,7 +343,7 @@ private JobExecutionUserInfo mapRowToJobExecutionUserInfoDto(Row row) {
return jobExecutionUserInfo;
}

private DeleteJobExecutionsResp mapRowSetToDeleteChangeManagerJobExeResp(RowSet<Row> rowSet){
private DeleteJobExecutionsResp mapRowSetToDeleteChangeManagerJobExeResp(RowSet<Row> rowSet) {
DeleteJobExecutionsResp deleteJobExecutionsResp = new DeleteJobExecutionsResp();
List<JobExecutionDetail> jobExecutionDetails = new ArrayList<>();
rowSet.forEach(row -> {
Expand Down Expand Up @@ -366,7 +379,10 @@ private Tuple mapToTuple(JobExecution jobExecution) {
? jobExecution.getJobProfileInfo().getDataType().toString() : null,
jobExecution.getJobProfileSnapshotWrapper() == null
? null : JsonObject.mapFrom(jobExecution.getJobProfileSnapshotWrapper()),
jobExecution.getJobProfileInfo() != null && jobExecution.getJobProfileInfo().getHidden());
jobExecution.getJobProfileInfo() != null && jobExecution.getJobProfileInfo().getHidden(),
jobExecution.getJobPartNumber(),
jobExecution.getTotalJobParts(),
jobExecution.getTotalRecordsInFile());
}

private JobExecutionDtoCollection mapToJobExecutionDtoCollection(RowSet<Row> rowSet) {
Expand Down Expand Up @@ -411,7 +427,10 @@ private JobExecution mapRowToJobExecution(Row row) {
.withTotal(row.getInteger(PROGRESS_TOTAL_FIELD)))
.withJobProfileInfo(mapRowToJobProfileInfo(row))
.withJobProfileSnapshotWrapper(row.getJsonObject(PROFILE_SNAPSHOT_WRAPPER_FIELD) == null
? null : row.getJsonObject(PROFILE_SNAPSHOT_WRAPPER_FIELD).mapTo(ProfileSnapshotWrapper.class));
? null : row.getJsonObject(PROFILE_SNAPSHOT_WRAPPER_FIELD).mapTo(ProfileSnapshotWrapper.class))
.withJobPartNumber(row.getInteger(JOB_PART_NUMBER))
.withTotalJobParts(row.getInteger(TOTAL_JOB_PARTS))
.withTotalRecordsInFile(row.getInteger(TOTAL_RECORDS_IN_FILE));
}

private JobExecutionDto mapRowToJobExecutionDto(Row row) {
Expand All @@ -433,7 +452,11 @@ private JobExecutionDto mapRowToJobExecutionDto(Row row) {
.withLastName(row.getString(JOB_USER_LAST_NAME_FIELD)))
.withUserId(row.getValue(USER_ID_FIELD).toString())
.withProgress(mapRowToProgress(row))
.withJobProfileInfo(mapRowToJobProfileInfo(row));
.withJobProfileInfo(mapRowToJobProfileInfo(row))
.withJobPartNumber(row.getInteger(JOB_PART_NUMBER))
.withTotalJobParts(row.getInteger(TOTAL_JOB_PARTS))
.withTotalRecordsInFile(row.getInteger(TOTAL_RECORDS_IN_FILE))
.withCompositeDetails(mapToJobExecutionCompositeDetailsDto(row));
}

private Date mapRowToCompletedDate(Row row) {
Expand All @@ -442,6 +465,10 @@ private Date mapRowToCompletedDate(Row row) {
}

private Progress mapRowToProgress(Row row) {
if (row.get(JobExecutionDto.SubordinationType.class, SUBORDINATION_TYPE_FIELD).equals(SubordinationType.COMPOSITE_PARENT)) {
return mapRowToProgressComposite(row);
}

Integer processedCount = row.getInteger(CURRENTLY_PROCESSED_FIELD);
Integer total = row.getInteger(TOTAL_FIELD);
if (processedCount == null) {
Expand All @@ -457,6 +484,29 @@ private Progress mapRowToProgress(Row row) {
.withTotal(total);
}

private Progress mapRowToProgressComposite(Row row) {
  // Aggregates the per-status child-job counters (the "composite_data" JSON
  // aggregate produced by GET_JOBS_NOT_PARENT_SQL) into one overall Progress
  // for a COMPOSITE_PARENT job. Returns null when no composite data is present.
  JsonArray compositeData = row.getJsonArray(JOB_PROFILE_COMPOSITE_DATA);
  if (Objects.isNull(compositeData) || compositeData.isEmpty()) {
    return null;
  }

  int processed = 0;
  int total = 0;
  for (Object element : compositeData) {
    JsonObject statusGroup = (JsonObject) element;
    // Counters may be absent (SQL SUM over zero rows yields NULL); treat as 0.
    Integer groupProcessed = statusGroup.getInteger(JOB_PROFILE_COMPOSITE_DATA_CURRENTLY_PROCESSED);
    Integer groupTotal = statusGroup.getInteger(JOB_PROFILE_COMPOSITE_DATA_TOTAL_RECORDS_COUNT);
    if (groupProcessed != null) {
      processed += groupProcessed;
    }
    if (groupTotal != null) {
      total += groupTotal;
    }
  }

  return new Progress()
    .withJobExecutionId(row.getValue(ID_FIELD).toString())
    .withCurrent(processed)
    .withTotal(total);
}

private JobProfileInfo mapRowToJobProfileInfo(Row row) {
UUID profileId = row.getUUID(JOB_PROFILE_ID_FIELD);
if (Objects.nonNull(profileId)) {
Expand All @@ -470,6 +520,61 @@ private JobProfileInfo mapRowToJobProfileInfo(Row row) {
return null;
}

private JobExecutionCompositeDetailsDto mapToJobExecutionCompositeDetailsDto(Row row) {
  // Maps the "composite_data" JSON aggregate (one entry per child-job status,
  // produced by GET_JOBS_NOT_PARENT_SQL) into a per-status details DTO for a
  // composite parent job. Returns null when the column is not selected by the
  // query at hand, or when it holds no data.
  if (row.getColumnIndex(JOB_PROFILE_COMPOSITE_DATA) == -1) {
    return null;
  }
  var compositeData = row.getJsonArray(JOB_PROFILE_COMPOSITE_DATA);
  if (Objects.nonNull(compositeData) && !compositeData.isEmpty()) {
    var detailsDto = new JobExecutionCompositeDetailsDto();
    compositeData.forEach(o -> {
      JsonObject jo = (JsonObject) o;
      var status = JobExecutionDto.Status.valueOf(jo.getString(JOB_PROFILE_COMPOSITE_DATA_STATUS));
      var stateDto = new JobExecutionCompositeDetailDto()
        // "cnt" is the count(1) alias emitted by GET_JOBS_NOT_PARENT_SQL
        .withChunksCount(jo.getInteger("cnt"))
        .withTotalRecordsCount(jo.getInteger(JOB_PROFILE_COMPOSITE_DATA_TOTAL_RECORDS_COUNT))
        .withCurrentlyProcessedCount(jo.getInteger(JOB_PROFILE_COMPOSITE_DATA_CURRENTLY_PROCESSED));
      switch (status) {
        case NEW:
          detailsDto.setNewState(stateDto);
          break;
        case FILE_UPLOADED:
          detailsDto.setFileUploadedState(stateDto);
          break;
        case PARSING_IN_PROGRESS:
          // BUGFIX: previously called setNewState here, which both clobbered the
          // NEW state and left the parsing-in-progress state unpopulated.
          detailsDto.setParsingInProgressState(stateDto);
          break;
        case PARSING_FINISHED:
          detailsDto.setParsingFinishedState(stateDto);
          break;
        case PROCESSING_IN_PROGRESS:
          detailsDto.setProcessingInProgressState(stateDto);
          break;
        case PROCESSING_FINISHED:
          detailsDto.setProcessingFinishedState(stateDto);
          break;
        case COMMIT_IN_PROGRESS:
          detailsDto.setCommitInProgressState(stateDto);
          break;
        case COMMITTED:
          detailsDto.setCommittedState(stateDto);
          break;
        case ERROR:
          detailsDto.setErrorState(stateDto);
          break;
        case DISCARDED:
          detailsDto.setDiscardedState(stateDto);
          break;
        case CANCELLED:
          detailsDto.setCancelledState(stateDto);
          break;
        default:
          // Unmapped status values are ignored rather than failing the whole mapping.
          break;
      }
    });
    return detailsDto;
  }
  return null;
}

private String formatFullTableName(String tenantId, String table) {
  // Qualifies the table name with the tenant-specific PostgreSQL schema.
  return convertToPsqlStandard(tenantId) + "." + table;
}
Expand Down Expand Up @@ -517,7 +622,7 @@ public Future<Boolean> hardDeleteJobExecutions(long diffNumberOfDays, String ten
return CompositeFuture.all(jobExecutionProgressFuture, jobExecutionSourceChunksFuture, journalRecordsFuture)
.compose(ar -> Future.<RowSet<Row>>future(rowSetPromise -> deleteFromJobExecutionTable(uuids, sqlConnection, tenantId, rowSetPromise, postgresClient)))
.map(true);
}));
}));
}

private Future<List<String>> mapRowsetValuesToListOfString(RowSet<Row> rowset) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.folio.dao;

import org.folio.rest.jaxrs.model.JobExecution;
import org.folio.rest.jaxrs.model.JobExecutionDto;

import java.text.SimpleDateFormat;
import java.util.Date;
Expand All @@ -16,6 +17,7 @@
import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_HIDDEN_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.JOB_PROFILE_ID_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.STATUS_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.SUBORDINATION_TYPE_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.UI_STATUS_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.USER_ID_FIELD;

Expand All @@ -30,6 +32,7 @@ public class JobExecutionFilter {
private String fileNamePattern;
private List<String> fileNameNotAny;
private List<String> profileIdAny;
private List<JobExecutionDto.SubordinationType> subordinationTypeNotAny;
private String userId;
private Date completedAfter;
private Date completedBefore;
Expand Down Expand Up @@ -74,6 +77,11 @@ public JobExecutionFilter withProfileIdAny(List<String> profileIdAny) {
return this;
}

/**
 * Excludes job executions whose subordination type matches any of the given values.
 *
 * @param subordinationTypeNotAny subordination types to filter out; a null or empty
 *                                list means no subordination-type filtering is applied
 * @return this filter, for fluent chaining
 */
public JobExecutionFilter withSubordinationTypeNotAny(List<JobExecutionDto.SubordinationType> subordinationTypeNotAny) {
  this.subordinationTypeNotAny = subordinationTypeNotAny;
  return this;
}

public JobExecutionFilter withUserId(String userId) {
this.userId = userId;
return this;
Expand Down Expand Up @@ -123,11 +131,15 @@ public String buildCriteria() {
}
}
if (isNotEmpty(fileNameNotAny)) {
addCondition(conditionBuilder, buildNotInCondition(FILE_NAME_FIELD,fileNameNotAny));
addCondition(conditionBuilder, buildNotInCondition(FILE_NAME_FIELD, fileNameNotAny));
}
if (isNotEmpty(profileIdAny)) {
addCondition(conditionBuilder, buildInCondition(JOB_PROFILE_ID_FIELD, profileIdAny));
}
if (isNotEmpty(subordinationTypeNotAny)) {
var subordinationTypes = subordinationTypeNotAny.stream().map(JobExecutionDto.SubordinationType::value).collect(Collectors.toList());
addCondition(conditionBuilder, buildNotInCondition(SUBORDINATION_TYPE_FIELD, subordinationTypes));
}
if (isNotEmpty(userId)) {
addCondition(conditionBuilder, buildEqualCondition(USER_ID_FIELD, userId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public final class JobExecutionDBConstants {
public static final String JOB_PROFILE_NAME_FIELD = "job_profile_name";
public static final String JOB_PROFILE_HIDDEN_FIELD = "job_profile_hidden";
public static final String JOB_PROFILE_DATA_TYPE_FIELD = "job_profile_data_type";
public static final String JOB_PROFILE_COMPOSITE_DATA = "composite_data";
public static final String PROFILE_SNAPSHOT_WRAPPER_FIELD = "job_profile_snapshot_wrapper";
public static final String SOURCE_PATH_FIELD = "source_path";
public static final String FILE_NAME_FIELD = "file_name";
Expand All @@ -29,6 +30,9 @@ public final class JobExecutionDBConstants {
public static final String CURRENTLY_PROCESSED_FIELD = "currently_processed";
public static final String TOTAL_FIELD = "total";
public static final String IS_DELETED_FIELD = "is_deleted";
public static final String JOB_PART_NUMBER = "job_part_number";
public static final String TOTAL_JOB_PARTS = "total_job_parts";
public static final String TOTAL_RECORDS_IN_FILE = "total_records_in_file";

public static final String GET_BY_ID_SQL = "SELECT * FROM %s WHERE id = $1 AND is_deleted = false";
public static final String UPDATE_BY_IDS_SQL = "UPDATE ${tenantName}.${tableName} SET ${setFieldName} = ${setFieldValue} WHERE ${setConditionalFieldName} IN ('${setConditionalFieldValues}') RETURNING ${returningFieldNames}";
Expand All @@ -37,34 +41,45 @@ public final class JobExecutionDBConstants {
"INSERT INTO %s.%s (id, hrid, parent_job_id, subordination_type, source_path, file_name, " +
"progress_current, progress_total, started_date, completed_date, status, ui_status, error_status, job_user_first_name, " +
"job_user_last_name, user_id, job_profile_id, job_profile_name, job_profile_data_type, job_profile_snapshot_wrapper, "
+ "job_profile_hidden) " +
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21)";
+ "job_profile_hidden, job_part_number, total_job_parts, total_records_in_file) " +
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24)";

public static final String UPDATE_SQL =
"UPDATE %s " +
"SET id = $1, hrid = $2, parent_job_id = $3, subordination_type = $4, source_path = $5, file_name = $6, " +
"progress_current = $7, progress_total = $8, started_date = $9, completed_date = $10, " +
"status = $11, ui_status = $12, error_status = $13, job_user_first_name = $14, job_user_last_name = $15, " +
"user_id = $16, job_profile_id = $17, job_profile_name = $18, job_profile_data_type = $19, " +
"job_profile_snapshot_wrapper = $20, job_profile_hidden = $21" +
"WHERE id = $1";
"job_profile_snapshot_wrapper = $20, job_profile_hidden = $21, job_part_number = $22, total_job_parts = $23, " +
"total_records_in_file = $24 WHERE id = $1";

public static final String GET_CHILDREN_JOBS_BY_PARENT_ID_SQL =
"WITH cte AS (SELECT count(*) AS total_count FROM %s " +
"WHERE parent_job_id = $1 AND subordination_type = 'CHILD' AND is_deleted = false) " +
"WHERE parent_job_id = $1 AND subordination_type in ('CHILD', 'COMPOSITE_CHILD') AND is_deleted = false) " +
"SELECT j.*, cte.*, p.total_records_count total, " +
"p.succeeded_records_count + p.error_records_count currently_processed " +
"FROM %s j " +
"LEFT JOIN %s p ON j.id = p.job_execution_id " +
"LEFT JOIN cte ON true " +
"WHERE parent_job_id = $1 AND subordination_type = 'CHILD' AND is_deleted = false " +
"WHERE parent_job_id = $1 AND subordination_type in ('CHILD', 'COMPOSITE_CHILD') AND is_deleted = false " +
"LIMIT $2 OFFSET $3";

public static final String GET_JOBS_NOT_PARENT_SQL =
"WITH cte AS (SELECT count(*) AS total_count FROM %s " +
"WHERE subordination_type <> 'PARENT_MULTIPLE' AND %s) " +
"SELECT j.*, cte.*, p.total_records_count total, " +
"p.succeeded_records_count + p.error_records_count currently_processed " +
"p.succeeded_records_count + p.error_records_count currently_processed, " +
"(select jsonb_agg(x) composite_data " +
"from (select status, " +
"count(1) cnt, " +
"sum(p1.total_records_count) total_records_count, " +
"sum(p1.succeeded_records_count + p1.error_records_count) currently_processed " +
"FROM %s j1 " +
"LEFT JOIN %s p1 ON j1.id = p1.job_execution_id " +
"where j1.parent_job_id = j.id " +
"and j1.id != j1.parent_job_id " +
"and j1.subordination_type = 'COMPOSITE_CHILD' " +
"group by status) x) composite_data " +
"FROM %s j " +
"LEFT JOIN %s p ON j.id = p.job_execution_id " +
"LEFT JOIN cte ON true " +
Expand Down
Loading

0 comments on commit 320d159

Please sign in to comment.