Skip to content

Commit

Permalink
Merge branch 'master' into MODBULKOPS-448
Browse files Browse the repository at this point in the history
  • Loading branch information
khandramai authored Jan 29, 2025
2 parents e607e80 + caedd25 commit 9c12b07
Show file tree
Hide file tree
Showing 29 changed files with 275 additions and 167 deletions.
2 changes: 1 addition & 1 deletion descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"provides": [
{
"id": "bulk-operations",
"version": "1.5",
"version": "1.6",
"handlers": [
{
"methods": [ "POST" ],
Expand Down
6 changes: 0 additions & 6 deletions src/main/java/org/folio/bulkops/client/BulkEditClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@
import java.util.UUID;

import org.folio.bulkops.configs.FeignClientConfiguration;
import org.folio.bulkops.domain.dto.Errors;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.multipart.MultipartFile;

Expand All @@ -20,7 +17,4 @@ public interface BulkEditClient {

@PostMapping(value = "/{jobId}/start")
void startJob(@PathVariable UUID jobId);

@GetMapping(value = "/{jobId}/errors")
Errors getErrorsPreview(@PathVariable UUID jobId, @RequestParam int limit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.folio.bulkops.domain.dto.EntityType.INSTANCE;
import static org.folio.bulkops.domain.dto.EntityType.INSTANCE_MARC;
import static org.folio.bulkops.domain.dto.FileContentType.COMMITTED_RECORDS_FILE;
import static org.folio.bulkops.domain.dto.FileContentType.COMMITTED_RECORDS_MARC_FILE;
import static org.folio.bulkops.domain.dto.FileContentType.COMMITTING_CHANGES_ERROR_FILE;
import static org.folio.bulkops.domain.dto.FileContentType.MATCHED_RECORDS_FILE;
import static org.folio.bulkops.domain.dto.FileContentType.PROPOSED_CHANGES_FILE;
Expand Down Expand Up @@ -164,9 +165,9 @@ public ResponseEntity<Resource> downloadFileByOperationId(
} else if (fileContentType == PROPOSED_CHANGES_MARC_FILE) {
path = bulkOperation.getLinkToModifiedRecordsMarcFile();
} else if (fileContentType == COMMITTED_RECORDS_FILE) {
path = INSTANCE_MARC.equals(bulkOperation.getEntityType()) ?
bulkOperation.getLinkToCommittedRecordsMarcFile() :
bulkOperation.getLinkToCommittedRecordsCsvFile();
path = bulkOperation.getLinkToCommittedRecordsCsvFile();
} else if (fileContentType == COMMITTED_RECORDS_MARC_FILE) {
path = bulkOperation.getLinkToCommittedRecordsMarcFile();
} else if (fileContentType == COMMITTING_CHANGES_ERROR_FILE) {
path = bulkOperation.getLinkToCommittedRecordsErrorsCsvFile();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import static java.lang.Boolean.TRUE;
import static java.lang.Boolean.parseBoolean;
import static java.lang.String.format;
import static org.folio.bulkops.domain.dto.EntityType.INSTANCE_MARC;
import static org.folio.bulkops.domain.dto.UpdateOptionType.SUPPRESS_FROM_DISCOVERY;
import static org.folio.bulkops.util.Constants.APPLY_TO_HOLDINGS;
import static org.folio.bulkops.util.Constants.APPLY_TO_ITEMS;
import static org.folio.bulkops.util.Constants.GET_HOLDINGS_BY_INSTANCE_ID_QUERY;
import static org.folio.bulkops.util.Constants.GET_ITEMS_BY_HOLDING_ID_QUERY;
import static org.folio.bulkops.util.Constants.MARC;
import static org.folio.bulkops.util.Constants.MSG_NO_ADMINISTRATIVE_CHANGE_REQUIRED;
import static org.folio.bulkops.util.Constants.MSG_NO_CHANGE_REQUIRED;
import static org.folio.bulkops.util.FolioExecutionContextUtil.prepareContextForTenant;
import static org.folio.bulkops.util.RuleUtils.fetchParameters;
Expand Down Expand Up @@ -83,7 +85,7 @@ public void updateAssociatedRecords(ExtendedInstance extendedInstance, BulkOpera
.filter(rule -> applyRuleToAssociatedRecords(extendedInstance, rule, operation))
.isPresent();
if (notChanged) {
var errorMessage = buildErrorMessage(recordsUpdated, instance.getDiscoverySuppress());
var errorMessage = buildErrorMessage(recordsUpdated, instance.getDiscoverySuppress(), operation.getEntityType());
errorService.saveError(operation.getId(), instance.getIdentifier(operation.getIdentifierType()), errorMessage, ErrorType.WARNING);
}
}
Expand Down Expand Up @@ -168,11 +170,14 @@ private boolean suppressItemsIfRequired(List<HoldingsRecord> holdingsRecords, bo
return true;
}

private String buildErrorMessage(boolean recordsUpdated, boolean newValue) {
private String buildErrorMessage(boolean recordsUpdated, boolean newValue, EntityType entityType) {
var affectedState = TRUE.equals(newValue) ? "unsuppressed" : "suppressed";
var message = INSTANCE_MARC.equals(entityType) ?
MSG_NO_ADMINISTRATIVE_CHANGE_REQUIRED :
MSG_NO_CHANGE_REQUIRED;
return recordsUpdated ?
format(ERROR_MESSAGE_TEMPLATE, affectedState) :
MSG_NO_CHANGE_REQUIRED;
message;
}

private List<HoldingsRecord> getHoldingsSourceFolioByInstanceId(String instanceId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.folio.spring.data.OffsetRequest;
import org.springframework.data.domain.Page;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;

@Repository
Expand All @@ -16,6 +17,7 @@ public interface BulkOperationExecutionContentRepository extends JpaRepository<B
Page<BulkOperationExecutionContent> findByBulkOperationIdAndErrorMessageIsNotNullAndErrorTypeIsOrderByErrorType(UUID bulkOperationId, OffsetRequest offsetRequest, ErrorType errorType);
Optional<BulkOperationExecutionContent> findFirstByBulkOperationIdAndIdentifier(UUID bulkOperationId, String identifier);
int countAllByBulkOperationIdAndErrorMessageIsNotNullAndErrorTypeIs(UUID bulkOperationId, ErrorType errorType);

@Query("SELECT COUNT(i) FROM BulkOperationExecutionContent i WHERE i.bulkOperationId = :bulkOperationId")
long countByBulkOperationId(UUID bulkOperationId);
void deleteByBulkOperationId(UUID bulkOperationId);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.folio.bulkops.repository;

import java.util.List;
import java.util.Optional;
import java.util.UUID;

Expand All @@ -10,4 +11,5 @@
@Repository
public interface BulkOperationExecutionRepository extends JpaRepository<BulkOperationExecution, UUID> {
Optional<BulkOperationExecution> findByBulkOperationId(UUID uuid);
List<BulkOperationExecution> findAllByBulkOperationId(UUID uuid);
}
71 changes: 48 additions & 23 deletions src/main/java/org/folio/bulkops/service/BulkOperationService.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.folio.bulkops.service;

import static java.lang.String.format;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.commons.lang3.StringUtils.LF;
Expand Down Expand Up @@ -152,7 +153,7 @@ public class BulkOperationService {
private static final String PREVIEW_CSV_PATH_TEMPLATE = "%s/%s-Updates-Preview-CSV-%s.csv";
private static final String PREVIEW_MARC_PATH_TEMPLATE = "%s/%s-Updates-Preview-MARC-%s.mrc";
private static final String CHANGED_JSON_PATH_TEMPLATE = "%s/json/%s-Changed-Records-%s.json";
private static final String CHANGED_CSV_PATH_TEMPLATE = "%s/%s-Changed-Records-%s.csv";
private static final String CHANGED_CSV_PATH_TEMPLATE = "%s/%s-Changed-Records-CSV-%s.csv";

private final ExecutorService executor = Executors.newCachedThreadPool();

Expand Down Expand Up @@ -379,16 +380,17 @@ public void commit(BulkOperation operation) {
var operationId = operation.getId();
operation.setCommittedNumOfRecords(0);
operation.setStatus(OperationStatusType.APPLY_CHANGES);
operation.setTotalNumOfRecords(operation.getMatchedNumOfRecords());

operation = bulkOperationRepository.save(operation);
var totalNumOfRecords = operation.getMatchedNumOfRecords();

if (operation.getEntityType() == INSTANCE_MARC) {
if (INSTANCE_MARC.equals(operation.getEntityType())) {
marcUpdateService.saveErrorsForFolioInstances(operation);
marcUpdateService.commitForInstanceMarc(operation);
return;
totalNumOfRecords *= getProgressMultiplier(operation);
}

operation.setTotalNumOfRecords(totalNumOfRecords);
operation = bulkOperationRepository.save(operation);

if (StringUtils.isNotEmpty(operation.getLinkToModifiedRecordsJsonFile())) {
var entityClass = resolveEntityClass(operation.getEntityType());
var extendedClass = resolveExtendedEntityClass(operation.getEntityType());
Expand Down Expand Up @@ -462,8 +464,6 @@ public void commit(BulkOperation operation) {
}

execution.setProcessedRecords(processedNumOfRecords);
operation.setProcessedNumOfRecords(operation.getCommittedNumOfRecords());
operation.setEndTime(LocalDateTime.now());
if (operation.getCommittedNumOfRecords() > 0) {
operation.setLinkToCommittedRecordsCsvFile(resultCsvFileName);
operation.setLinkToCommittedRecordsJsonFile(resultJsonFileName);
Expand All @@ -479,15 +479,21 @@ public void commit(BulkOperation operation) {
executionRepository.save(execution);
}

var linkToCommittingErrorsFile = errorService.uploadErrorsToStorage(operationId);
operation.setLinkToCommittedRecordsErrorsCsvFile(linkToCommittingErrorsFile);
operation.setCommittedNumOfErrors(errorService.getCommittedNumOfErrors(operationId));
operation.setCommittedNumOfWarnings(errorService.getCommittedNumOfWarnings(operationId));
marcUpdateService.commitForInstanceMarc(operation);

if (!FAILED.equals(operation.getStatus())) {
operation.setStatus(isEmpty(linkToCommittingErrorsFile) ? COMPLETED : COMPLETED_WITH_ERRORS);
if (!INSTANCE_MARC.equals(operation.getEntityType()) || isNull(operation.getLinkToModifiedRecordsMarcFile())) {
operation.setProcessedNumOfRecords(operation.getCommittedNumOfRecords());
operation.setEndTime(LocalDateTime.now());

var linkToCommittingErrorsFile = errorService.uploadErrorsToStorage(operationId);
operation.setLinkToCommittedRecordsErrorsCsvFile(linkToCommittingErrorsFile);
operation.setCommittedNumOfErrors(errorService.getCommittedNumOfErrors(operationId));

if (!FAILED.equals(operation.getStatus())) {
operation.setStatus(isEmpty(linkToCommittingErrorsFile) ? COMPLETED : COMPLETED_WITH_ERRORS);
}
bulkOperationRepository.save(operation);
}
bulkOperationRepository.save(operation);
}

private boolean isCurrentTenantNotCentral(String tenantId) {
Expand Down Expand Up @@ -658,17 +664,22 @@ public BulkOperation getOperationById(UUID bulkOperationId) {
yield operation;
}
case APPLY_CHANGES -> {
if (INSTANCE_MARC.equals(operation.getEntityType())) {
var executions = metadataProviderService.getJobExecutions(operation.getDataImportJobProfileId());
updateBulkOperationBasedOnDataImportState(executions, operation);
} else {
var execution = executionRepository.findByBulkOperationId(bulkOperationId);
if (execution.isPresent() && StatusType.ACTIVE.equals(execution.get().getStatus())) {
operation.setProcessedNumOfRecords(execution.get().getProcessedRecords());
var execution = executionRepository.findByBulkOperationId(bulkOperationId);
if (execution.isPresent() && StatusType.ACTIVE.equals(execution.get().getStatus())) {
var processedNumOfRecords = execution.get().getProcessedRecords();
if (INSTANCE_MARC.equals(operation.getEntityType())) {
var numOfErrors = operation.getCommittedNumOfErrors() * getProgressMultiplier(operation);
operation.setProcessedNumOfRecords(numOfErrors + processedNumOfRecords);
}
operation.setProcessedNumOfRecords(processedNumOfRecords);
}
yield bulkOperationRepository.save(operation);
}
case APPLY_MARC_CHANGES -> {
var executions = metadataProviderService.getJobExecutions(operation.getDataImportJobProfileId());
updateBulkOperationBasedOnDataImportState(executions, operation);
yield bulkOperationRepository.save(operation);
}
default -> operation;
};
}
Expand All @@ -692,8 +703,15 @@ public void processDataImportResult(UUID dataImportJobProfileId) {
}

private void updateBulkOperationBasedOnDataImportState(List<DataImportJobExecution> executions, BulkOperation operation) {
var numOfCommittedAdministrativeUpdates = executionRepository.findAllByBulkOperationId(operation.getId())
.stream()
.filter(execution -> StatusType.COMPLETED.equals(execution.getStatus()))
.map(BulkOperationExecution::getProcessedRecords)
.max(Integer::compareTo)
.orElse(0);
var processedNumOfRecords = metadataProviderService.calculateProgress(executions).getCurrent();
operation.setProcessedNumOfRecords(operation.getCommittedNumOfErrors() * 2 + processedNumOfRecords);
operation.setProcessedNumOfRecords(operation.getCommittedNumOfErrors() * getProgressMultiplier(operation) +
numOfCommittedAdministrativeUpdates + processedNumOfRecords);
}

public BulkOperation getBulkOperationOrThrow(UUID operationId) {
Expand Down Expand Up @@ -801,4 +819,11 @@ private synchronized void saveLinks(BulkOperation source) {
private boolean isCompletedSuccessfully(List<BulkOperationDataProcessing> list) {
return list.stream().allMatch(p -> StatusType.COMPLETED.equals(p.getStatus()));
}

private int getProgressMultiplier(BulkOperation operation) {
if (nonNull(operation.getLinkToModifiedRecordsMarcFile())) {
return nonNull(operation.getLinkToModifiedRecordsJsonFile()) ? 3 : 2;
}
return 1;
}
}
40 changes: 25 additions & 15 deletions src/main/java/org/folio/bulkops/service/ErrorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.stream.Collectors.toList;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.commons.lang3.StringUtils.LF;
import static org.folio.bulkops.domain.dto.OperationStatusType.COMPLETED;
Expand All @@ -10,9 +11,13 @@
import static org.folio.bulkops.domain.dto.OperationStatusType.REVIEWED_NO_MARC_RECORDS;
import static org.folio.bulkops.domain.dto.OperationStatusType.REVIEW_CHANGES;
import static org.folio.bulkops.util.Constants.DATA_IMPORT_ERROR_DISCARDED;
import static org.folio.bulkops.util.Constants.MSG_NO_ADMINISTRATIVE_CHANGE_REQUIRED;
import static org.folio.bulkops.util.Constants.MSG_NO_MARC_CHANGE_REQUIRED;
import static org.folio.bulkops.util.Constants.MSG_NO_CHANGE_REQUIRED;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -23,10 +28,8 @@
import lombok.extern.log4j.Log4j2;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.folio.bulkops.client.BulkEditClient;
import org.folio.bulkops.client.MetadataProviderClient;
import org.folio.bulkops.client.RemoteFileSystemClient;
import org.folio.bulkops.domain.bean.BulkOperationsEntity;
import org.folio.bulkops.domain.bean.JobLogEntry;
import org.folio.bulkops.domain.bean.StateType;
import org.folio.bulkops.domain.dto.Error;
Expand Down Expand Up @@ -59,11 +62,11 @@ public class ErrorService {
private final BulkOperationRepository operationRepository;
private final RemoteFileSystemClient remoteFileSystemClient;
private final BulkOperationExecutionContentRepository executionContentRepository;
private final BulkEditClient bulkEditClient;
private final MetadataProviderClient metadataProviderClient;

public void saveError(UUID bulkOperationId, String identifier, String errorMessage, String uiErrorMessage, String link, ErrorType errorType) {
if (MSG_NO_CHANGE_REQUIRED.equals(errorMessage) && executionContentRepository.findFirstByBulkOperationIdAndIdentifier(bulkOperationId, identifier).isPresent()) {
if (Set.of(MSG_NO_CHANGE_REQUIRED, MSG_NO_ADMINISTRATIVE_CHANGE_REQUIRED, MSG_NO_MARC_CHANGE_REQUIRED).contains(errorMessage)
&& executionContentRepository.findFirstByBulkOperationIdAndIdentifier(bulkOperationId, identifier).isPresent()) {
return;
}
executionContentRepository.save(BulkOperationExecutionContent.builder()
Expand Down Expand Up @@ -95,11 +98,17 @@ public Errors getErrorsPreviewByBulkOperationId(UUID bulkOperationId, int limit,
var bulkOperation = operationRepository.findById(bulkOperationId)
.orElseThrow(() -> new NotFoundException("BulkOperation was not found by id=" + bulkOperationId));
if (Set.of(DATA_MODIFICATION, REVIEW_CHANGES, REVIEWED_NO_MARC_RECORDS).contains(bulkOperation.getStatus()) || COMPLETED_WITH_ERRORS == bulkOperation.getStatus() && noCommittedErrors(bulkOperation) && noCommittedWarnings(bulkOperation)) {
var errors = bulkEditClient.getErrorsPreview(bulkOperation.getDataExportJobId(), limit);
return new Errors().errors(errors.getErrors().stream()
.map(this::prepareInternalErrorRepresentation)
.toList())
.totalRecords(errors.getTotalRecords());
var errors = new BufferedReader(new InputStreamReader(remoteFileSystemClient.get(bulkOperation.getLinkToMatchedRecordsErrorsCsvFile())))
.lines()
.skip(offset)
.limit(limit)
.map(message -> {
var error = message.split(Constants.COMMA_DELIMETER);
return new Error().message(error[1]).parameters(List.of(new Parameter().key(IDENTIFIER).value(error[0])));
})
.toList();
return new Errors().errors(errors)
.totalRecords(remoteFileSystemClient.getNumOfLines(bulkOperation.getLinkToMatchedRecordsErrorsCsvFile()));
} else if (COMPLETED == bulkOperation.getStatus() || COMPLETED_WITH_ERRORS == bulkOperation.getStatus()) {
return getExecutionErrors(bulkOperationId, limit, offset, errorType);
} else {
Expand Down Expand Up @@ -147,18 +156,19 @@ private boolean noCommittedWarnings(BulkOperation bulkOperation) {
return isNull(bulkOperation.getCommittedNumOfWarnings()) || bulkOperation.getCommittedNumOfWarnings() == 0;
}

private Error prepareInternalErrorRepresentation(Error e) {
var error= e.getMessage().split(Constants.COMMA_DELIMETER);
return new Error().message(error[1]).parameters(List.of(new Parameter().key(IDENTIFIER).value(error[0])));
}

public String getErrorsCsvByBulkOperationId(UUID bulkOperationId, int offset, ErrorType errorType) {
return getErrorsPreviewByBulkOperationId(bulkOperationId, Integer.MAX_VALUE, offset, errorType).getErrors().stream()
.map(error -> String.join(Constants.COMMA_DELIMETER, ObjectUtils.isEmpty(error.getParameters()) ? EMPTY : error.getParameters().get(0).getValue(), error.getMessage()))
.collect(Collectors.joining(Constants.NEW_LINE_SEPARATOR));
}

private Errors getExecutionErrors(UUID bulkOperationId, int limit, int offset, ErrorType errorType) {
var totalRecords = (int) executionContentRepository.countByBulkOperationId(bulkOperationId);
if (limit == 0) {
return new Errors()
.errors(List.of())
.totalRecords(totalRecords);
}
Page<BulkOperationExecutionContent> errorPage;
if (isNull(errorType)) {
errorPage = executionContentRepository.findByBulkOperationIdAndErrorMessageIsNotNullOrderByErrorType(bulkOperationId, OffsetRequest.of(offset, limit));
Expand All @@ -170,7 +180,7 @@ private Errors getExecutionErrors(UUID bulkOperationId, int limit, int offset, E
.toList();
return new Errors()
.errors(errors)
.totalRecords((int) errorPage.getTotalElements());
.totalRecords(totalRecords);
}

/**
Expand Down
Loading

0 comments on commit 9c12b07

Please sign in to comment.