MODSOURMAN-1022: remove step of initial saving of incoming records to SRS #826

Merged 1 commit on Nov 28, 2023
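
In short, parsed incoming MARC bibliographic and EDIFACT records are no longer persisted to SRS before further processing: the raw-records chunk parsing service and StoredRecordChunksKafkaHandler now publish DI_INCOMING_MARC_BIB_RECORD_PARSED and DI_INCOMING_EDIFACT_RECORD_PARSED events directly (see the diffs below). The following standalone sketch illustrates that record-type-to-event-type mapping; it uses simplified stand-in enums and a hypothetical eventTypeFor helper rather than the module's real org.folio classes.

import java.util.Map;

public class IncomingRecordFlowSketch {

  // Stand-ins for org.folio.rest.jaxrs.model.Record.RecordType and DataImportEventTypes.
  enum RecordType { MARC_BIB, MARC_AUTHORITY, MARC_HOLDING, EDIFACT }

  enum EventType {
    DI_INCOMING_MARC_BIB_RECORD_PARSED,
    DI_INCOMING_EDIFACT_RECORD_PARSED,
    DI_SRS_MARC_AUTHORITY_RECORD_CREATED,
    DI_SRS_MARC_HOLDING_RECORD_CREATED,
    DI_ERROR
  }

  // Mirrors RECORD_TYPE_TO_EVENT_TYPE in StoredRecordChunksKafkaHandler after this change:
  // MARC bib and EDIFACT records are announced as "incoming ... parsed" instead of
  // "SRS ... record created", because they are no longer saved to SRS up front.
  static final Map<RecordType, EventType> RECORD_TYPE_TO_EVENT_TYPE = Map.of(
      RecordType.MARC_BIB, EventType.DI_INCOMING_MARC_BIB_RECORD_PARSED,
      RecordType.MARC_AUTHORITY, EventType.DI_SRS_MARC_AUTHORITY_RECORD_CREATED,
      RecordType.MARC_HOLDING, EventType.DI_SRS_MARC_HOLDING_RECORD_CREATED,
      RecordType.EDIFACT, EventType.DI_INCOMING_EDIFACT_RECORD_PARSED);

  static EventType eventTypeFor(RecordType recordType) {
    // A missing record type means the record could not be classified, so DI_ERROR is raised;
    // an unmapped type falls back to the MARC bib event, matching the handler's default.
    if (recordType == null) {
      return EventType.DI_ERROR;
    }
    return RECORD_TYPE_TO_EVENT_TYPE.getOrDefault(recordType, EventType.DI_INCOMING_MARC_BIB_RECORD_PARSED);
  }

  public static void main(String[] args) {
    System.out.println(eventTypeFor(RecordType.MARC_BIB));   // DI_INCOMING_MARC_BIB_RECORD_PARSED
    System.out.println(eventTypeFor(RecordType.EDIFACT));    // DI_INCOMING_EDIFACT_RECORD_PARSED
    System.out.println(eventTypeFor(null));                  // DI_ERROR
  }
}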
NEWS.md (1 addition, 0 deletions)
@@ -2,6 +2,7 @@
* [MODSOURMAN-1030](https://issues.folio.org/browse/MODSOURMAN-1030) The number of updated records is not correctly displayed in the 'SRS Marc' column in the 'Log summary' table
* [MODSOURMAN-1020](https://issues.folio.org/browse/MODSOURMAN-1020) Add table to save incoming records for DI logs
* [MODSOURMAN-1021](https://issues.folio.org/browse/MODSOURMAN-1021) Provide endpoint for getting parsed content for DI log
* [MODSOURMAN-1022](https://issues.folio.org/browse/MODSOURMAN-1022) Remove step of initial saving of incoming records to SRS

## 2023-10-13 v3.7.0
* [MODSOURMAN-1045](https://issues.folio.org/browse/MODSOURMAN-1045) Allow create action with non-matches for instance without match profile
@@ -7,6 +7,8 @@

import static org.folio.rest.RestVerticle.MODULE_SPECIFIC_ARGS;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_ERROR;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_EDIFACT_RECORD_PARSED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_FOR_DELETE_RECEIVED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_FOR_UPDATE_RECEIVED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_RAW_RECORDS_CHUNK_PARSED;
@@ -72,6 +74,7 @@
import org.folio.rest.jaxrs.model.ActionProfile.Action;
import org.folio.rest.jaxrs.model.ActionProfile.FolioRecord;
import org.folio.rest.jaxrs.model.DataImportEventPayload;
import org.folio.rest.jaxrs.model.DataImportEventTypes;
import org.folio.rest.jaxrs.model.EntityType;
import org.folio.rest.jaxrs.model.ErrorRecord;
import org.folio.rest.jaxrs.model.ExternalIdsHolder;
@@ -177,49 +180,95 @@ public Future<List<Record>> parseRawRecordsChunkForJobExecution(RawRecordsDto ch
.map(parsedRecords))
.onSuccess(parsedRecords -> {
fillParsedRecordsWithAdditionalFields(parsedRecords);

if (updateMarcActionExists(jobExecution) || updateInstanceActionExists(jobExecution) || isCreateOrUpdateItemOrHoldingsActionExists(jobExecution, parsedRecords)) {
hrIdFieldService.move001valueTo035Field(parsedRecords);
updateRecords(parsedRecords, jobExecution, params)
.onSuccess(ar -> promise.complete(parsedRecords))
.onFailure(promise::fail);
} else if (deleteMarcActionExists(jobExecution)) {
deleteRecords(parsedRecords, jobExecution, params)
.onSuccess(ar -> promise.complete(parsedRecords))
.onFailure(promise::fail);
} else {
saveRecords(params, jobExecution, parsedRecords)
.onComplete(postAr -> {
if (postAr.failed()) {
StatusDto statusDto = new StatusDto()
.withStatus(StatusDto.Status.ERROR)
.withErrorStatus(StatusDto.ErrorStatus.RECORD_UPDATE_ERROR);
jobExecutionService.updateJobExecutionStatus(jobExecution.getId(), statusDto, params)
.onComplete(r -> {
if (r.failed()) {
LOGGER.warn("parseRawRecordsChunkForJobExecution:: Error during update jobExecution and snapshot status", r.cause());
}
});
jobExecutionSourceChunkDao.getById(sourceChunkId, params.getTenantId())
.compose(optional -> optional
.map(sourceChunk -> jobExecutionSourceChunkDao
.update(sourceChunk.withState(JobExecutionSourceChunk.State.ERROR), params.getTenantId()))
.orElseThrow(() -> new NotFoundException(String.format(
"Couldn't update failed jobExecutionSourceChunk status to ERROR, jobExecutionSourceChunk with id %s was not found",
sourceChunkId))))
.onComplete(ar -> promise.fail(postAr.cause()));
} else {
promise.complete(parsedRecords);
}
});
}
processRecords(parsedRecords, jobExecution, params, sourceChunkId, promise);
}).onFailure(th -> {
LOGGER.warn("parseRawRecordsChunkForJobExecution:: Error parsing records: {}", th.getMessage());
promise.fail(th);
});
return promise.future();
}

private void processRecords(List<Record> parsedRecords, JobExecution jobExecution, OkapiConnectionParams params,
String sourceChunkId, Promise<List<Record>> promise) {
switch (getAction(parsedRecords, jobExecution)) {
case UPDATE_RECORD -> {
hrIdFieldService.move001valueTo035Field(parsedRecords);
updateRecords(parsedRecords, jobExecution, params)
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
}
case DELETE_RECORD -> deleteRecords(parsedRecords, jobExecution, params)
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
case SEND_ERROR -> sendEvents(parsedRecords, jobExecution, params, DI_ERROR)
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
case SEND_MARC_BIB -> sendEvents(parsedRecords, jobExecution, params, DI_INCOMING_MARC_BIB_RECORD_PARSED)
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
case SEND_EDIFACT -> sendEvents(parsedRecords, jobExecution, params, DI_INCOMING_EDIFACT_RECORD_PARSED)
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
default -> saveRecords(jobExecution, sourceChunkId, params, parsedRecords, promise);
}
}

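  /**
   * Determines how the parsed chunk should be handled. Precedence mirrors the checks below:
   * update actions win over delete actions; otherwise the first record's type decides whether
   * a DI_INCOMING_MARC_BIB_RECORD_PARSED or DI_INCOMING_EDIFACT_RECORD_PARSED event is sent,
   * a missing (null) record type raises DI_ERROR, and any other case (including an empty chunk)
   * falls back to saving the records to SRS as before.
   */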
private ActionType getAction(List<Record> parsedRecords, JobExecution jobExecution) {
if (updateMarcActionExists(jobExecution) || updateInstanceActionExists(jobExecution)
|| isCreateOrUpdateItemOrHoldingsActionExists(jobExecution, parsedRecords)) {
return ActionType.UPDATE_RECORD;
}
if (deleteMarcActionExists(jobExecution)) {
return ActionType.DELETE_RECORD;
}
if (parsedRecords.isEmpty()) {
return ActionType.SAVE_RECORD;
}
RecordType recordType = parsedRecords.get(0).getRecordType();
if (recordType == RecordType.MARC_BIB) {
return ActionType.SEND_MARC_BIB;
}
if (recordType == RecordType.EDIFACT) {
return ActionType.SEND_EDIFACT;
}
if (recordType == null) {
return ActionType.SEND_ERROR;
}
return ActionType.SAVE_RECORD;
}

private enum ActionType {
UPDATE_RECORD, DELETE_RECORD, SEND_ERROR, SEND_MARC_BIB, SEND_EDIFACT, SAVE_RECORD
}

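  // Fallback path kept from the previous flow: the parsed records are still saved to SRS here.
  // If saving fails, the job execution is set to ERROR with RECORD_UPDATE_ERROR and the source
  // chunk state is moved to ERROR before the promise is failed.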
private void saveRecords(JobExecution jobExecution, String sourceChunkId, OkapiConnectionParams params, List<Record> parsedRecords, Promise<List<Record>> promise) {
saveRecords(params, jobExecution, parsedRecords)
.onComplete(postAr -> {
if (postAr.failed()) {
StatusDto statusDto = new StatusDto()
.withStatus(StatusDto.Status.ERROR)
.withErrorStatus(StatusDto.ErrorStatus.RECORD_UPDATE_ERROR);
jobExecutionService.updateJobExecutionStatus(jobExecution.getId(), statusDto, params)
.onComplete(r -> {
if (r.failed()) {
LOGGER.warn("parseRawRecordsChunkForJobExecution:: Error during update jobExecution with id '{}' and snapshot status",
jobExecution.getId(), r.cause());
}
});
jobExecutionSourceChunkDao.getById(sourceChunkId, params.getTenantId())
.compose(optional -> optional
.map(sourceChunk -> jobExecutionSourceChunkDao
.update(sourceChunk.withState(JobExecutionSourceChunk.State.ERROR), params.getTenantId()))
.orElseThrow(() -> new NotFoundException(String.format(
"Couldn't update failed jobExecutionSourceChunk status to ERROR, jobExecutionSourceChunk with id %s was not found, jobExecutionId: %s",
sourceChunkId, jobExecution.getId()))))
.onComplete(ar -> promise.fail(postAr.cause()));
} else {
promise.complete(parsedRecords);
}
});
}

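  // Thin wrapper over recordsPublishingService.sendEventsWithRecords(...) used by the
  // SEND_ERROR, SEND_MARC_BIB and SEND_EDIFACT branches above.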
private Future<Boolean> sendEvents(List<Record> records, JobExecution jobExecution, OkapiConnectionParams params, DataImportEventTypes eventType) {
LOGGER.info("sendEvents:: Sending events with type: {}, jobExecutionId: {}", eventType.value(), jobExecution.getId());
return recordsPublishingService.sendEventsWithRecords(records, jobExecution.getId(), params, eventType.value());
}

private void saveIncomingAndJournalRecords(List<Record> parsedRecords, String tenantId) {
if (!parsedRecords.isEmpty()) {
incomingRecordService.saveBatch(JournalUtil.buildIncomingRecordsByRecords(parsedRecords), tenantId);
@@ -738,6 +787,7 @@ private Future<List<Record>> saveRecords(OkapiConnectionParams params, JobExecut
if (CollectionUtils.isEmpty(parsedRecords)) {
return Future.succeededFuture();
}
LOGGER.info("saveRecords:: Saving records in SRS, amount: {}, jobExecutionId: {}", parsedRecords.size(), jobExecution.getId());
RecordCollection recordCollection = new RecordCollection()
.withRecords(parsedRecords)
.withTotalRecords(parsedRecords.size());
RecordsPublishingServiceImpl.java
@@ -19,6 +19,7 @@
import org.folio.services.exceptions.RawChunkRecordsParsingException;
import org.folio.services.exceptions.RecordsPublishingException;
import org.folio.services.util.EventHandlingUtil;
import org.folio.services.util.RecordConversionUtil;
import org.folio.verticle.consumers.errorhandlers.payloadbuilders.DiErrorPayloadBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -41,24 +42,24 @@ public class RecordsPublishingServiceImpl implements RecordsPublishingService {
public static final String RECORD_ID_HEADER = "recordId";
public static final String USER_ID_HEADER = "userId";
private static final AtomicInteger indexer = new AtomicInteger();

public static final String ERROR_KEY = "ERROR";

@Value("${srm.kafka.CreatedRecordsKafkaHandler.maxDistributionNum:100}")
private int maxDistributionNum;

private JobExecutionService jobExecutionService;
private DataImportPayloadContextBuilder payloadContextBuilder;
private KafkaConfig kafkaConfig;

@Value("${srm.kafka.CreatedRecordsKafkaHandler.maxDistributionNum:100}")
private int maxDistributionNum;
@Autowired
private List<DiErrorPayloadBuilder> errorPayloadBuilders;

public RecordsPublishingServiceImpl(@Autowired JobExecutionService jobExecutionService,
@Autowired DataImportPayloadContextBuilder payloadContextBuilder,
@Autowired KafkaConfig kafkaConfig) {
@Autowired KafkaConfig kafkaConfig,
@Autowired List<DiErrorPayloadBuilder> errorPayloadBuilders) {
this.jobExecutionService = jobExecutionService;
this.payloadContextBuilder = payloadContextBuilder;
this.kafkaConfig = kafkaConfig;
this.errorPayloadBuilders = errorPayloadBuilders;
}

@Override
@@ -82,15 +83,17 @@ private Future<Boolean> sendRecords(List<Record> createdRecords, JobExecution jo
for (Record record : createdRecords) {
String key = String.valueOf(indexer.incrementAndGet() % maxDistributionNum);
try {
if (isParsedContentExists(record)) {
if (record.getRecordType() != null && isParsedContentExists(record)) {
DataImportEventPayload payload = prepareEventPayload(record, profileSnapshotWrapper, params, eventType);
params.getHeaders().set(RECORD_ID_HEADER, record.getId());
params.getHeaders().set(USER_ID_HEADER, jobExecution.getUserId());
futures.add(sendEventToKafka(params.getTenantId(), Json.encode(payload),
eventType, KafkaHeaderUtils.kafkaHeadersFromMultiMap(params.getHeaders()), kafkaConfig, key));
}
else {
futures.add(sendDiErrorEvent(new RawChunkRecordsParsingException(record.getErrorRecord().getDescription()),
} else {
String cause = record.getErrorRecord() == null
? format("Cannot send event for individual record with recordType: %s", record.getRecordType())
: record.getErrorRecord().getDescription();
futures.add(sendDiErrorEvent(new RawChunkRecordsParsingException(cause),
params, jobExecution.getId(), params.getTenantId(), record));
}
} catch (Exception e) {
@@ -151,11 +154,8 @@ private DataImportEventPayload prepareEventPayload(Record record, ProfileSnapsho
.withToken(params.getToken());
}

public Future<Boolean> sendDiErrorEvent(Throwable throwable,
OkapiConnectionParams okapiParams,
String jobExecutionId,
String tenantId,
Record currentRecord) {
public Future<Boolean> sendDiErrorEvent(Throwable throwable, OkapiConnectionParams okapiParams, String jobExecutionId,
String tenantId, Record currentRecord) {
okapiParams.getHeaders().set(RECORD_ID_HEADER, currentRecord.getId());
for (DiErrorPayloadBuilder payloadBuilder: errorPayloadBuilders) {
if (payloadBuilder.isEligible(currentRecord.getRecordType())) {
Expand All @@ -166,6 +166,25 @@ public Future<Boolean> sendDiErrorEvent(Throwable throwable,
}
}
LOGGER.warn("sendDiErrorEvent:: Appropriate DI_ERROR payload builder not found, DI_ERROR without records info will be send");
return Future.failedFuture(throwable);
sendDiError(throwable, jobExecutionId, okapiParams, currentRecord);
return Future.succeededFuture(true);
}

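  // Fallback used when no eligible DiErrorPayloadBuilder is found: a bare DI_ERROR event is
  // published with the error message (and, when the record type is known, the record itself)
  // in the payload context; after this fallback, sendDiErrorEvent now returns a succeeded
  // future instead of a failed one.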
private void sendDiError(Throwable throwable, String jobExecutionId, OkapiConnectionParams okapiParams, Record record) {
HashMap<String, String> context = new HashMap<>();
context.put(ERROR_KEY, throwable.getMessage());
if (record != null && record.getRecordType() != null) {
context.put(RecordConversionUtil.getEntityType(record).value(), Json.encode(record));
}

DataImportEventPayload payload = new DataImportEventPayload()
.withEventType(DI_ERROR.value())
.withJobExecutionId(jobExecutionId)
.withOkapiUrl(okapiParams.getOkapiUrl())
.withTenant(okapiParams.getTenantId())
.withToken(okapiParams.getToken())
.withContext(context);
EventHandlingUtil.sendEventToKafka(okapiParams.getTenantId(), Json.encode(payload), DI_ERROR.value(),
KafkaHeaderUtils.kafkaHeadersFromMultiMap(okapiParams.getHeaders()), kafkaConfig, null);
}
}
StoredRecordChunksKafkaHandler.java
@@ -35,7 +35,6 @@
import org.springframework.stereotype.Component;

import javax.ws.rs.NotFoundException;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
@@ -45,10 +44,10 @@

import static java.lang.String.format;
import static org.apache.commons.lang3.ObjectUtils.allNotNull;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_EDIFACT_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_EDIFACT_RECORD_PARSED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_BIB_FOR_ORDER_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_HOLDING_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.JournalRecord.ActionStatus.COMPLETED;
import static org.folio.rest.jaxrs.model.JournalRecord.ActionType.CREATE;
@@ -70,10 +69,10 @@ public class StoredRecordChunksKafkaHandler implements AsyncRecordHandler<String
public static final String CREATE_ACTION = "CREATE";

private static final Map<RecordType, DataImportEventTypes> RECORD_TYPE_TO_EVENT_TYPE = Map.of(
MARC_BIB, DI_SRS_MARC_BIB_RECORD_CREATED,
MARC_BIB, DI_INCOMING_MARC_BIB_RECORD_PARSED,
MARC_AUTHORITY, DI_SRS_MARC_AUTHORITY_RECORD_CREATED,
MARC_HOLDING, DI_SRS_MARC_HOLDING_RECORD_CREATED,
EDIFACT, DI_EDIFACT_RECORD_CREATED
EDIFACT, DI_INCOMING_EDIFACT_RECORD_PARSED
);

private RecordsPublishingService recordsPublishingService;
@@ -124,7 +123,7 @@ public Future<String> handle(KafkaConsumerRecord<String, String> record) {
// we only know record type by inspecting the records, assuming records are homogeneous type and defaulting to previous static value
DataImportEventTypes eventType = !storedRecords.isEmpty() && RECORD_TYPE_TO_EVENT_TYPE.containsKey(storedRecords.get(0).getRecordType())
? RECORD_TYPE_TO_EVENT_TYPE.get(storedRecords.get(0).getRecordType())
: DI_SRS_MARC_BIB_RECORD_CREATED;
: DI_INCOMING_MARC_BIB_RECORD_PARSED;

LOGGER.debug("handle:: RecordsBatchResponse has been received, starting processing chunkId: {} chunkNumber: {} jobExecutionId: {}", chunkId, chunkNumber, jobExecutionId);
saveCreatedRecordsInfoToDataImportLog(storedRecords, okapiConnectionParams.getTenantId());