Skip to content

Commit

Permalink
MODSOURMAN-1195 Save job execution progress in batches (#908)
Browse files Browse the repository at this point in the history
  • Loading branch information
okolawole-ebsco authored Jun 12, 2024
1 parent e29f875 commit 92ee1c2
Show file tree
Hide file tree
Showing 17 changed files with 826 additions and 150 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## 2024-xx-xx v3.9.0
* [MODSOURMAN-1195](https://folio-org.atlassian.net/browse/MODSOURMAN-1195) Save job execution progress in batches
* [MODSOURMAN-1166](https://folio-org.atlassian.net/browse/MODSOURMAN-1166) Sorting by Autority, Order and Error columns is not working on Log details page
* [MODDATAIMP-1029](https://folio-org.atlassian.net/browse/MODDATAIMP-1029) The authority record loaded via data-import using Default - Create SRS MARC Authority job profile is duplicated on the job-summary page
* [MODSOURMAN-1152](https://folio-org.atlassian.net/browse/MODSOURMAN-1152) The error message is not displayed in the di log summary
Expand Down
4 changes: 4 additions & 0 deletions mod-source-record-manager-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java3</artifactId>
</dependency>
<dependency>
<groupId>org.folio</groupId>
<artifactId>data-import-utils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.folio.verticle.DataImportConsumersVerticle;
import org.folio.verticle.DataImportInitConsumersVerticle;
import org.folio.verticle.DataImportJournalConsumersVerticle;
import org.folio.verticle.JobExecutionProgressVerticle;
import org.folio.verticle.QuickMarcUpdateConsumersVerticle;
import org.folio.verticle.RawMarcChunkConsumersVerticle;
import org.folio.verticle.StoredRecordChunkConsumersVerticle;
Expand All @@ -31,6 +32,8 @@

import java.util.Arrays;

import static org.folio.services.progress.JobExecutionProgressUtil.registerCodecs;

public class InitAPIImpl implements InitAPI {

private static final Logger LOGGER = LogManager.getLogger();
Expand All @@ -51,6 +54,9 @@ public class InitAPIImpl implements InitAPI {
@Value("${srm.kafka.DataImportJournalConsumersVerticle.instancesNumber:1}")
private int dataImportJournalConsumerInstancesNumber;

@Value("${srm.kafka.JobExecutionProgressVerticle.instancesNumber:1}")
private int jobExecutionProgressInstancesNumber;

@Value("${srm.kafka.QuickMarcUpdateConsumersVerticle.instancesNumber:1}")
private int quickMarcUpdateConsumerInstancesNumber;

Expand All @@ -67,6 +73,9 @@ public void init(Vertx vertx, Context context, Handler<AsyncResult<Boolean>> han
try {
SpringContextUtil.init(vertx, context, ApplicationConfig.class);
SpringContextUtil.autowireDependencies(this, context);

registerCodecs(vertx);

initJournalService(vertx);
deployConsumersVerticles(vertx)
.onSuccess(car -> {
Expand Down Expand Up @@ -99,6 +108,7 @@ private Future<?> deployConsumersVerticles(Vertx vertx) {
Promise<String> deployStoredMarcChunkConsumer = Promise.promise();
Promise<String> deployDataImportConsumer = Promise.promise();
Promise<String> deployDataImportJournalConsumer = Promise.promise();
Promise<String> deployJobExecutionProgressConsumer = Promise.promise();
Promise<String> deployQuickMarcUpdateConsumer = Promise.promise();
Promise<String> deployPeriodicDeleteJobExecution = Promise.promise();

Expand Down Expand Up @@ -127,6 +137,11 @@ private Future<?> deployConsumersVerticles(Vertx vertx) {
.setWorker(true)
.setInstances(dataImportJournalConsumerInstancesNumber), deployDataImportJournalConsumer);

vertx.deployVerticle(getVerticleName(verticleFactory, JobExecutionProgressVerticle.class),
new DeploymentOptions()
.setWorker(true)
.setInstances(jobExecutionProgressInstancesNumber), deployJobExecutionProgressConsumer);

vertx.deployVerticle(getVerticleName(verticleFactory, QuickMarcUpdateConsumersVerticle.class),
new DeploymentOptions()
.setWorker(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.folio.HttpStatus;
import org.folio.dao.JobExecutionDao;
import org.folio.dao.JobExecutionFilter;
import org.folio.dao.JobExecutionSourceChunkDao;
import org.folio.dao.util.SortField;
import org.folio.dataimport.util.OkapiConnectionParams;
import org.folio.dataimport.util.RestUtil;
Expand Down Expand Up @@ -89,10 +88,10 @@ public class JobExecutionServiceImpl implements JobExecutionService {

@Autowired
private JobExecutionDao jobExecutionDao;
@Autowired
private JobExecutionSourceChunkDao jobExecutionSourceChunkDao;
@Autowired
private JournalRecordService journalRecordService;

public JobExecutionServiceImpl(@Autowired JobExecutionDao jobExecutionDao) {
this.jobExecutionDao = jobExecutionDao;
}

@Override
public Future<JobExecutionDtoCollection> getJobExecutionsWithoutParentMultiple(JobExecutionFilter filter, List<SortField> sortFields, int offset, int limit, String tenantId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,12 @@
import org.folio.dataimport.util.OkapiConnectionParams;
import org.folio.rest.jaxrs.model.DataImportEventTypes;
import org.folio.rest.jaxrs.model.JobExecution;
import org.folio.rest.jaxrs.model.JobExecutionDto;
import org.folio.rest.jaxrs.model.JobExecutionDtoCollection;
import org.folio.rest.jaxrs.model.JobExecutionProgress;
import org.folio.rest.jaxrs.model.Progress;
import org.folio.rest.jaxrs.model.StatusDto;
import org.folio.rest.jaxrs.model.JobExecution.SubordinationType;
import org.folio.services.progress.JobExecutionProgressService;
import org.folio.util.DataImportEventPayloadWithoutCurrentNode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Arrays;
import java.util.Date;

import static java.lang.String.format;
import static org.folio.rest.jaxrs.model.JobExecution.Status.CANCELLED;
import static org.folio.rest.jaxrs.model.JobExecution.Status.COMMITTED;

@Service
public class RecordProcessedEventHandlingServiceImpl implements EventHandlingService {

Expand All @@ -37,8 +25,8 @@ public class RecordProcessedEventHandlingServiceImpl implements EventHandlingSer
public static final String ERRORS_KEY = "ERRORS";
private static final String EMPTY_ARRAY = "[]";

private JobExecutionProgressService jobExecutionProgressService;
private JobExecutionService jobExecutionService;
private final JobExecutionProgressService jobExecutionProgressService;
private final JobExecutionService jobExecutionService;

public RecordProcessedEventHandlingServiceImpl(@Autowired JobExecutionProgressService jobExecutionProgressService,
@Autowired JobExecutionService jobExecutionService) {
Expand Down Expand Up @@ -78,8 +66,7 @@ public Future<Boolean> handle(String eventContent, OkapiConnectionParams params)
return Future.succeededFuture(false);
}

jobExecutionProgressService.updateCompletionCounts(jobExecutionId, successCount, errorCount, params.getTenantId())
.compose(updatedProgress -> updateJobExecutionIfAllRecordsProcessed(jobExecutionId, updatedProgress, params))
jobExecutionProgressService.updateCompletionCounts(jobExecutionId, successCount, errorCount, params)
.onComplete(ar -> {
if (ar.failed()) {
LOGGER.warn("handle:: Failed to handle {} event", eventType, ar.cause());
Expand All @@ -103,67 +90,5 @@ private Future<JobExecution> updateJobStatusToError(String jobExecutionId, Okapi
.withErrorStatus(StatusDto.ErrorStatus.FILE_PROCESSING_ERROR), params);
}

private Future<Boolean> updateJobExecutionIfAllRecordsProcessed(String jobExecutionId, JobExecutionProgress progress, OkapiConnectionParams params) {
if (progress.getTotal().equals(progress.getCurrentlySucceeded() + progress.getCurrentlyFailed())) {
return jobExecutionService.getJobExecutionById(jobExecutionId, params.getTenantId())
.compose(jobExecutionOptional -> jobExecutionOptional
.map(jobExecution -> {
JobExecution.Status statusToUpdate;
if (jobExecution.getStatus() == CANCELLED && jobExecution.getUiStatus() == JobExecution.UiStatus.CANCELLED) {
statusToUpdate = CANCELLED;
} else if (progress.getCurrentlyFailed() == 0) {
statusToUpdate = COMMITTED;
} else {
statusToUpdate = JobExecution.Status.ERROR;
}
jobExecution.withStatus(statusToUpdate)
.withUiStatus(JobExecution.UiStatus.fromValue(Status.valueOf(statusToUpdate.name()).getUiStatus()))
.withCompletedDate(new Date())
.withProgress(new Progress()
.withJobExecutionId(jobExecutionId)
.withCurrent(progress.getCurrentlySucceeded() + progress.getCurrentlyFailed())
.withTotal(progress.getTotal()));

return jobExecutionService.updateJobExecutionWithSnapshotStatus(jobExecution, params)
.compose(updatedExecution -> {
if (updatedExecution.getSubordinationType().equals(SubordinationType.COMPOSITE_CHILD)) {

return jobExecutionService.getJobExecutionById(updatedExecution.getParentJobId(), params.getTenantId())
.map(v -> v.orElseThrow(() -> new IllegalStateException("Could not find parent job execution")))
.compose(parentExecution ->
jobExecutionService.getJobExecutionCollectionByParentId(parentExecution.getId(), 0, Integer.MAX_VALUE, params.getTenantId())
.map(JobExecutionDtoCollection::getJobExecutions)
.map(children ->
children.stream()
.filter(child -> child.getSubordinationType().equals(JobExecutionDto.SubordinationType.COMPOSITE_CHILD))
.allMatch(child ->
Arrays.asList(
JobExecutionDto.UiStatus.RUNNING_COMPLETE,
JobExecutionDto.UiStatus.CANCELLED,
JobExecutionDto.UiStatus.ERROR,
JobExecutionDto.UiStatus.DISCARDED
).contains(child.getUiStatus())
)
)
.compose(allChildrenCompleted -> {
if (Boolean.TRUE.equals(allChildrenCompleted)) {
LOGGER.info("All children for job {} have completed!", parentExecution.getId());
parentExecution.withStatus(JobExecution.Status.COMMITTED)
.withUiStatus(JobExecution.UiStatus.RUNNING_COMPLETE)
.withCompletedDate(new Date());
return jobExecutionService.updateJobExecutionWithSnapshotStatus(parentExecution, params);
}
return Future.succeededFuture(parentExecution);
})
);
} else {
return Future.succeededFuture(updatedExecution);
}
})
.map(true);
})
.orElse(Future.failedFuture(format("Couldn't find JobExecution for update status and progress with id '%s'", jobExecutionId))));
}
return Future.succeededFuture(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.folio.services.progress;

import org.apache.commons.lang3.StringUtils;
import org.folio.dataimport.util.OkapiConnectionParams;
import org.folio.rest.jaxrs.model.JobExecutionProgress;

/**
* Class that enforces the presence of tenant ID and Job execution ID. These values must be present for appropriate batch
* processing.
*/
public class BatchableJobExecutionProgress {
private final JobExecutionProgress jobExecutionProgress;
private final OkapiConnectionParams params;

public BatchableJobExecutionProgress(OkapiConnectionParams params, JobExecutionProgress jobExecutionProgress) {
if (params == null || StringUtils.isBlank(params.getTenantId())) {
throw new IllegalArgumentException("Tenant ID must be set in Okapi connection parameters");
}
if (jobExecutionProgress == null || StringUtils.isBlank(jobExecutionProgress.getJobExecutionId())) {
throw new IllegalArgumentException("job execution id must be set on JobExecutionProgress object");
}
this.params = params;
this.jobExecutionProgress = jobExecutionProgress;
}

public JobExecutionProgress getJobExecutionProgress() {
return jobExecutionProgress;
}

public OkapiConnectionParams getParams() {
return params;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.folio.services.progress;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;

/**
* Codec that won't cause any serialization/deserialization. It should only be used locally, exception will be thrown if
* message need to be sent over the wire for a clustered event bus.
*/
public class BatchableJobExecutionProgressCodec implements MessageCodec<BatchableJobExecutionProgress, BatchableJobExecutionProgress> {
@Override
public void encodeToWire(Buffer buffer, BatchableJobExecutionProgress progress) {
throw new UnsupportedOperationException(progress.toString());
}

@Override
public BatchableJobExecutionProgress decodeFromWire(int pos, Buffer buffer) {
throw new UnsupportedOperationException(buffer.toString());
}

@Override
public BatchableJobExecutionProgress transform(BatchableJobExecutionProgress progress) {
return progress;
}

@Override
public String name() {
return BatchableJobExecutionProgressCodec.class.getSimpleName();
}

@Override
public byte systemCodecID() {
return -1;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.folio.services.progress;

import io.vertx.core.Future;
import org.folio.dataimport.util.OkapiConnectionParams;
import org.folio.rest.jaxrs.model.JobExecution;
import org.folio.rest.jaxrs.model.JobExecutionProgress;

Expand Down Expand Up @@ -46,13 +47,17 @@ public interface JobExecutionProgressService {
* EXAMPLE:
* a success count of 5 and an error count of 0 means that the success count will be increased by 5 and the
* error count will remain unchanged in jobExecutionProgress entity.
*
*
* </p>
* <p>
* NOTE: An implementation of this interface {@link JobExecutionProgressServiceImpl}, returns a
* succesful future when the updates has successfully been enqueued. Actual update of job execution can happen
* a bit later.
* </p>
* @param jobExecutionId jobExecution id
* @param successCountDelta number of successful executions
* @param errorCountDelta number of failed executions
* @param tenantId tenant id
* @return future with updated jobExecutionProgress
* @param params okapi connection parameters
* @return future that returns when the notification is successful
*/
Future<JobExecutionProgress> updateCompletionCounts(String jobExecutionId, int successCountDelta, int errorCountDelta, String tenantId);
Future<Void> updateCompletionCounts(String jobExecutionId, int successCountDelta, int errorCountDelta, OkapiConnectionParams params);
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package org.folio.services.progress;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.MessageProducer;
import org.folio.dao.JobExecutionDao;
import org.folio.dao.JobExecutionProgressDao;
import org.folio.dao.util.DbUtil;
import org.folio.dao.util.PostgresClientFactory;
import org.folio.dataimport.util.OkapiConnectionParams;
import org.folio.rest.jaxrs.model.JobExecutionProgress;
import org.folio.rest.jaxrs.model.Progress;
import org.folio.rest.persist.PostgresClient;
Expand All @@ -15,6 +18,7 @@
import java.util.function.UnaryOperator;

import static java.lang.String.format;
import static org.folio.services.progress.JobExecutionProgressUtil.getBatchJobProgressProducer;

@Service
public class JobExecutionProgressServiceImpl implements JobExecutionProgressService {
Expand All @@ -27,6 +31,12 @@ public class JobExecutionProgressServiceImpl implements JobExecutionProgressServ
@Autowired
private JobExecutionDao jobExecutionDao;

private final MessageProducer<BatchableJobExecutionProgress> jobExecutionProgressMessageProducer;

public JobExecutionProgressServiceImpl(@Autowired Vertx vertx) {
this.jobExecutionProgressMessageProducer = getBatchJobProgressProducer(vertx);
}

@Override
public Future<JobExecutionProgress> getByJobExecutionId(String jobExecutionId, String tenantId) {
return jobExecutionProgressDao.getByJobExecutionId(jobExecutionId, tenantId)
Expand All @@ -53,8 +63,11 @@ public Future<JobExecutionProgress> updateJobExecutionProgress(String jobExecuti
}

@Override
public Future<JobExecutionProgress> updateCompletionCounts(String jobExecutionId, int successCountDelta, int errorCountDelta, String tenantId) {
return jobExecutionProgressDao.updateCompletionCounts(jobExecutionId, successCountDelta, errorCountDelta, tenantId);
public Future<Void> updateCompletionCounts(String jobExecutionId, int successCountDelta, int errorCountDelta, OkapiConnectionParams params) {
JobExecutionProgress jobExecutionProgress = new JobExecutionProgress().withJobExecutionId(jobExecutionId)
.withCurrentlySucceeded(successCountDelta)
.withCurrentlyFailed(errorCountDelta);
BatchableJobExecutionProgress batchableJobExecutionProgress = new BatchableJobExecutionProgress(params, jobExecutionProgress);
return jobExecutionProgressMessageProducer.write(batchableJobExecutionProgress);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.folio.services.progress;

import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.MessageProducer;

public class JobExecutionProgressUtil {

public static final String BATCH_JOB_PROGRESS_ADDRESS = "batchJobProgress";

private JobExecutionProgressUtil() {}

/**
* Creates a message producer that will enqueue {@link BatchableJobExecutionProgress} objects
* to be processed asynchronously. Messages are only processed locally.
*/
public static MessageProducer<BatchableJobExecutionProgress> getBatchJobProgressProducer(Vertx vertx) {
return vertx.eventBus().sender(BATCH_JOB_PROGRESS_ADDRESS, new DeliveryOptions()
.setCodecName(BatchableJobExecutionProgressCodec.class.getSimpleName())
.setLocalOnly(true));
}

/**
* Register needed message codecs for job execution progress processing
*/
public static void registerCodecs(Vertx vertx) {
vertx.eventBus().registerCodec(new BatchableJobExecutionProgressCodec());
}

}
Loading

0 comments on commit 92ee1c2

Please sign in to comment.