Skip to content

Commit

Permalink
MODSOURMAN-1022: remove step of initial saving of incoming records to…
Browse files Browse the repository at this point in the history
… SRS (#826)
  • Loading branch information
yaroslav-epam committed Jan 15, 2024
1 parent 28a0240 commit 0dce100
Show file tree
Hide file tree
Showing 12 changed files with 273 additions and 183 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* [MODSOURMAN-1085](https://issues.folio.org/browse/MODSOURMAN-1085) MARC record with a 100 tag without a $a is being discarded on import.
* [MODSOURMAN-1020](https://issues.folio.org/browse/MODSOURMAN-1020) Add table to save incoming records for DI logs
* [MODSOURMAN-1021](https://issues.folio.org/browse/MODSOURMAN-1021) Provide endpoint for getting parsed content for DI log
* [MODSOURMAN-1022](https://issues.folio.org/browse/MODSOURMAN-1022) Remove step of initial saving of incoming records to SRS
* [MODSOURMAN-1030](https://issues.folio.org/browse/MODSOURMAN-1030) The number of updated records is not correct displayed in the 'SRS Marc' column in the 'Log summary' table
* [MODSOURMAN-976](https://issues.folio.org/browse/MODSOURMAN-976) Incorrect error counts
* [MODSOURMAN-1093](https://issues.folio.org/browse/MODSOURMAN-1093) EventHandlingUtil hangs forever on error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.folio.rest.RestVerticle.MODULE_SPECIFIC_ARGS;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_ERROR;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_EDIFACT_RECORD_PARSED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_FOR_DELETE_RECEIVED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_FOR_UPDATE_RECEIVED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_RAW_RECORDS_CHUNK_PARSED;
Expand Down Expand Up @@ -64,6 +66,7 @@
import org.folio.rest.jaxrs.model.ActionProfile.Action;
import org.folio.rest.jaxrs.model.ActionProfile.FolioRecord;
import org.folio.rest.jaxrs.model.DataImportEventPayload;
import org.folio.rest.jaxrs.model.DataImportEventTypes;
import org.folio.rest.jaxrs.model.EntityType;
import org.folio.rest.jaxrs.model.ErrorRecord;
import org.folio.rest.jaxrs.model.ExternalIdsHolder;
Expand Down Expand Up @@ -175,50 +178,95 @@ public Future<List<Record>> parseRawRecordsChunkForJobExecution(RawRecordsDto ch
.map(parsedRecords))
.onSuccess(parsedRecords -> {
fillParsedRecordsWithAdditionalFields(parsedRecords);

if (updateMarcActionExists(jobExecution) || updateInstanceActionExists(jobExecution)
|| isCreateOrUpdateItemOrHoldingsActionExists(jobExecution, parsedRecords) || isMarcAuthorityMatchProfile(jobExecution)) {
hrIdFieldService.move001valueTo035Field(parsedRecords);
updateRecords(parsedRecords, jobExecution, params)
.onSuccess(ar -> promise.complete(parsedRecords))
.onFailure(promise::fail);
} else if (deleteMarcActionExists(jobExecution)) {
deleteRecords(parsedRecords, jobExecution, params)
.onSuccess(ar -> promise.complete(parsedRecords))
.onFailure(promise::fail);
} else {
saveRecords(params, jobExecution, parsedRecords)
.onComplete(postAr -> {
if (postAr.failed()) {
StatusDto statusDto = new StatusDto()
.withStatus(StatusDto.Status.ERROR)
.withErrorStatus(StatusDto.ErrorStatus.RECORD_UPDATE_ERROR);
jobExecutionService.updateJobExecutionStatus(jobExecution.getId(), statusDto, params)
.onComplete(r -> {
if (r.failed()) {
LOGGER.warn("parseRawRecordsChunkForJobExecution:: Error during update jobExecution and snapshot status", r.cause());
}
});
jobExecutionSourceChunkDao.getById(sourceChunkId, params.getTenantId())
.compose(optional -> optional
.map(sourceChunk -> jobExecutionSourceChunkDao
.update(sourceChunk.withState(JobExecutionSourceChunk.State.ERROR), params.getTenantId()))
.orElseThrow(() -> new NotFoundException(String.format(
"Couldn't update failed jobExecutionSourceChunk status to ERROR, jobExecutionSourceChunk with id %s was not found",
sourceChunkId))))
.onComplete(ar -> promise.fail(postAr.cause()));
} else {
promise.complete(parsedRecords);
}
});
}
processRecords(parsedRecords, jobExecution, params, sourceChunkId, promise);
}).onFailure(th -> {
LOGGER.warn("parseRawRecordsChunkForJobExecution:: Error parsing records: {}", th.getMessage());
promise.fail(th);
});
return promise.future();
}

private void processRecords(List<Record> parsedRecords, JobExecution jobExecution, OkapiConnectionParams params,
String sourceChunkId, Promise<List<Record>> promise) {
switch (getAction(parsedRecords, jobExecution)) {
case UPDATE_RECORD -> {
hrIdFieldService.move001valueTo035Field(parsedRecords);
updateRecords(parsedRecords, jobExecution, params)
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
}
case DELETE_RECORD -> deleteRecords(parsedRecords, jobExecution, params)
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
case SEND_ERROR -> sendEvents(parsedRecords, jobExecution, params, DI_ERROR)
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
case SEND_MARC_BIB -> sendEvents(parsedRecords, jobExecution, params, DI_INCOMING_MARC_BIB_RECORD_PARSED)
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
case SEND_EDIFACT -> sendEvents(parsedRecords, jobExecution, params, DI_INCOMING_EDIFACT_RECORD_PARSED)
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
default -> saveRecords(jobExecution, sourceChunkId, params, parsedRecords, promise);
}
}

private ActionType getAction(List<Record> parsedRecords, JobExecution jobExecution) {
if (updateMarcActionExists(jobExecution) || updateInstanceActionExists(jobExecution)
|| isCreateOrUpdateItemOrHoldingsActionExists(jobExecution, parsedRecords) || isMarcAuthorityMatchProfile(jobExecution)) {
return ActionType.UPDATE_RECORD;
}
if (deleteMarcActionExists(jobExecution)) {
return ActionType.DELETE_RECORD;
}
if (parsedRecords.isEmpty()) {
return ActionType.SAVE_RECORD;
}
RecordType recordType = parsedRecords.get(0).getRecordType();
if (recordType == RecordType.MARC_BIB) {
return ActionType.SEND_MARC_BIB;
}
if (recordType == RecordType.EDIFACT) {
return ActionType.SEND_EDIFACT;
}
if (recordType == null) {
return ActionType.SEND_ERROR;
}
return ActionType.SAVE_RECORD;
}

private enum ActionType {
UPDATE_RECORD, DELETE_RECORD, SEND_ERROR, SEND_MARC_BIB, SEND_EDIFACT, SAVE_RECORD
}

private void saveRecords(JobExecution jobExecution, String sourceChunkId, OkapiConnectionParams params, List<Record> parsedRecords, Promise<List<Record>> promise) {
saveRecords(params, jobExecution, parsedRecords)
.onComplete(postAr -> {
if (postAr.failed()) {
StatusDto statusDto = new StatusDto()
.withStatus(StatusDto.Status.ERROR)
.withErrorStatus(StatusDto.ErrorStatus.RECORD_UPDATE_ERROR);
jobExecutionService.updateJobExecutionStatus(jobExecution.getId(), statusDto, params)
.onComplete(r -> {
if (r.failed()) {
LOGGER.warn("parseRawRecordsChunkForJobExecution:: Error during update jobExecution with id '{}' and snapshot status",
jobExecution.getId(), r.cause());
}
});
jobExecutionSourceChunkDao.getById(sourceChunkId, params.getTenantId())
.compose(optional -> optional
.map(sourceChunk -> jobExecutionSourceChunkDao
.update(sourceChunk.withState(JobExecutionSourceChunk.State.ERROR), params.getTenantId()))
.orElseThrow(() -> new NotFoundException(String.format(
"Couldn't update failed jobExecutionSourceChunk status to ERROR, jobExecutionSourceChunk with id %s was not found, jobExecutionId: %s",
sourceChunkId, jobExecution.getId()))))
.onComplete(ar -> promise.fail(postAr.cause()));
} else {
promise.complete(parsedRecords);
}
});
}

private Future<Boolean> sendEvents(List<Record> records, JobExecution jobExecution, OkapiConnectionParams params, DataImportEventTypes eventType) {
LOGGER.info("sendEvents:: Sending events with type: {}, jobExecutionId: {}", eventType.value(), jobExecution.getId());
return recordsPublishingService.sendEventsWithRecords(records, jobExecution.getId(), params, eventType.value());
}

private void saveIncomingAndJournalRecords(List<Record> parsedRecords, String tenantId) {
if (!parsedRecords.isEmpty()) {
incomingRecordService.saveBatch(JournalUtil.buildIncomingRecordsByRecords(parsedRecords), tenantId);
Expand Down Expand Up @@ -759,6 +807,7 @@ private Future<List<Record>> saveRecords(OkapiConnectionParams params, JobExecut
if (CollectionUtils.isEmpty(parsedRecords)) {
return Future.succeededFuture();
}
LOGGER.info("saveRecords:: Saving records in SRS, amount: {}, jobExecutionId: {}", parsedRecords.size(), jobExecution.getId());
RecordCollection recordCollection = new RecordCollection()
.withRecords(parsedRecords)
.withTotalRecords(parsedRecords.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.folio.services.exceptions.RawChunkRecordsParsingException;
import org.folio.services.exceptions.RecordsPublishingException;
import org.folio.services.util.EventHandlingUtil;
import org.folio.services.util.RecordConversionUtil;
import org.folio.verticle.consumers.errorhandlers.payloadbuilders.DiErrorPayloadBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -41,24 +42,24 @@ public class RecordsPublishingServiceImpl implements RecordsPublishingService {
public static final String RECORD_ID_HEADER = "recordId";
public static final String USER_ID_HEADER = "userId";
private static final AtomicInteger indexer = new AtomicInteger();

public static final String ERROR_KEY = "ERROR";

@Value("${srm.kafka.CreatedRecordsKafkaHandler.maxDistributionNum:100}")
private int maxDistributionNum;

private JobExecutionService jobExecutionService;
private DataImportPayloadContextBuilder payloadContextBuilder;
private KafkaConfig kafkaConfig;

@Value("${srm.kafka.CreatedRecordsKafkaHandler.maxDistributionNum:100}")
private int maxDistributionNum;
@Autowired
private List<DiErrorPayloadBuilder> errorPayloadBuilders;

public RecordsPublishingServiceImpl(@Autowired JobExecutionService jobExecutionService,
@Autowired DataImportPayloadContextBuilder payloadContextBuilder,
@Autowired KafkaConfig kafkaConfig) {
@Autowired KafkaConfig kafkaConfig,
@Autowired List<DiErrorPayloadBuilder> errorPayloadBuilders) {
this.jobExecutionService = jobExecutionService;
this.payloadContextBuilder = payloadContextBuilder;
this.kafkaConfig = kafkaConfig;
this.errorPayloadBuilders = errorPayloadBuilders;
}

@Override
Expand All @@ -82,15 +83,17 @@ private Future<Boolean> sendRecords(List<Record> createdRecords, JobExecution jo
for (Record record : createdRecords) {
String key = String.valueOf(indexer.incrementAndGet() % maxDistributionNum);
try {
if (isParsedContentExists(record)) {
if (record.getRecordType() != null && isParsedContentExists(record)) {
DataImportEventPayload payload = prepareEventPayload(record, profileSnapshotWrapper, params, eventType);
params.getHeaders().set(RECORD_ID_HEADER, record.getId());
params.getHeaders().set(USER_ID_HEADER, jobExecution.getUserId());
futures.add(sendEventToKafka(params.getTenantId(), Json.encode(payload),
eventType, KafkaHeaderUtils.kafkaHeadersFromMultiMap(params.getHeaders()), kafkaConfig, key));
}
else {
futures.add(sendDiErrorEvent(new RawChunkRecordsParsingException(record.getErrorRecord().getDescription()),
} else {
String cause = record.getErrorRecord() == null
? format("Cannot send event for individual record with recordType: %s", record.getRecordType())
: record.getErrorRecord().getDescription();
futures.add(sendDiErrorEvent(new RawChunkRecordsParsingException(cause),
params, jobExecution.getId(), params.getTenantId(), record));
}
} catch (Exception e) {
Expand Down Expand Up @@ -151,11 +154,8 @@ private DataImportEventPayload prepareEventPayload(Record record, ProfileSnapsho
.withToken(params.getToken());
}

public Future<Boolean> sendDiErrorEvent(Throwable throwable,
OkapiConnectionParams okapiParams,
String jobExecutionId,
String tenantId,
Record currentRecord) {
public Future<Boolean> sendDiErrorEvent(Throwable throwable, OkapiConnectionParams okapiParams, String jobExecutionId,
String tenantId, Record currentRecord) {
okapiParams.getHeaders().set(RECORD_ID_HEADER, currentRecord.getId());
for (DiErrorPayloadBuilder payloadBuilder: errorPayloadBuilders) {
if (payloadBuilder.isEligible(currentRecord.getRecordType())) {
Expand All @@ -166,6 +166,25 @@ public Future<Boolean> sendDiErrorEvent(Throwable throwable,
}
}
LOGGER.warn("sendDiErrorEvent:: Appropriate DI_ERROR payload builder not found, DI_ERROR without records info will be send");
return Future.failedFuture(throwable);
sendDiError(throwable, jobExecutionId, okapiParams, currentRecord);
return Future.succeededFuture(true);
}

private void sendDiError(Throwable throwable, String jobExecutionId, OkapiConnectionParams okapiParams, Record record) {
HashMap<String, String> context = new HashMap<>();
context.put(ERROR_KEY, throwable.getMessage());
if (record != null && record.getRecordType() != null) {
context.put(RecordConversionUtil.getEntityType(record).value(), Json.encode(record));
}

DataImportEventPayload payload = new DataImportEventPayload()
.withEventType(DI_ERROR.value())
.withJobExecutionId(jobExecutionId)
.withOkapiUrl(okapiParams.getOkapiUrl())
.withTenant(okapiParams.getTenantId())
.withToken(okapiParams.getToken())
.withContext(context);
EventHandlingUtil.sendEventToKafka(okapiParams.getTenantId(), Json.encode(payload), DI_ERROR.value(),
KafkaHeaderUtils.kafkaHeadersFromMultiMap(okapiParams.getHeaders()), kafkaConfig, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.springframework.stereotype.Component;

import javax.ws.rs.NotFoundException;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -45,10 +44,10 @@

import static java.lang.String.format;
import static org.apache.commons.lang3.ObjectUtils.allNotNull;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_EDIFACT_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_EDIFACT_RECORD_PARSED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_BIB_FOR_ORDER_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_HOLDING_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.JournalRecord.ActionStatus.COMPLETED;
import static org.folio.rest.jaxrs.model.JournalRecord.ActionType.CREATE;
Expand All @@ -70,10 +69,10 @@ public class StoredRecordChunksKafkaHandler implements AsyncRecordHandler<String
public static final String CREATE_ACTION = "CREATE";

private static final Map<RecordType, DataImportEventTypes> RECORD_TYPE_TO_EVENT_TYPE = Map.of(
MARC_BIB, DI_SRS_MARC_BIB_RECORD_CREATED,
MARC_BIB, DI_INCOMING_MARC_BIB_RECORD_PARSED,
MARC_AUTHORITY, DI_SRS_MARC_AUTHORITY_RECORD_CREATED,
MARC_HOLDING, DI_SRS_MARC_HOLDING_RECORD_CREATED,
EDIFACT, DI_EDIFACT_RECORD_CREATED
EDIFACT, DI_INCOMING_EDIFACT_RECORD_PARSED
);

private RecordsPublishingService recordsPublishingService;
Expand Down Expand Up @@ -124,7 +123,7 @@ public Future<String> handle(KafkaConsumerRecord<String, String> record) {
// we only know record type by inspecting the records, assuming records are homogeneous type and defaulting to previous static value
DataImportEventTypes eventType = !storedRecords.isEmpty() && RECORD_TYPE_TO_EVENT_TYPE.containsKey(storedRecords.get(0).getRecordType())
? RECORD_TYPE_TO_EVENT_TYPE.get(storedRecords.get(0).getRecordType())
: DI_SRS_MARC_BIB_RECORD_CREATED;
: DI_INCOMING_MARC_BIB_RECORD_PARSED;

LOGGER.debug("handle:: RecordsBatchResponse has been received, starting processing chunkId: {} chunkNumber: {} jobExecutionId: {}", chunkId, chunkNumber, jobExecutionId);
saveCreatedRecordsInfoToDataImportLog(storedRecords, okapiConnectionParams.getTenantId());
Expand Down
Loading

0 comments on commit 0dce100

Please sign in to comment.