Skip to content

Commit

Permalink
MODSOURMAN-1022: remove step of initial saving of incoming records to…
Browse files Browse the repository at this point in the history
… SRS
  • Loading branch information
yaroslav-epam committed Nov 21, 2023
1 parent e3732fa commit 8d101ec
Show file tree
Hide file tree
Showing 12 changed files with 224 additions and 162 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* [MODSOURMAN-1030](https://issues.folio.org/browse/MODSOURMAN-1030) The number of updated records is not correct displayed in the 'SRS Marc' column in the 'Log summary' table
* [MODSOURMAN-1020](https://issues.folio.org/browse/MODSOURMAN-1020) Add table to save incoming records for DI logs
* [MODSOURMAN-1021](https://issues.folio.org/browse/MODSOURMAN-1021) Provide endpoint for getting parsed content for DI log
* [MODSOURMAN-1022](https://issues.folio.org/browse/MODSOURMAN-1022) Remove step of initial saving of incoming records to SRS

## 2023-10-13 v3.7.0
* [MODSOURMAN-1045](https://issues.folio.org/browse/MODSOURMAN-1045) Allow create action with non-matches for instance without match profile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import static org.folio.rest.RestVerticle.MODULE_SPECIFIC_ARGS;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_ERROR;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_EDIFACT_RECORD_PARSED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_FOR_DELETE_RECEIVED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_FOR_UPDATE_RECEIVED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_RAW_RECORDS_CHUNK_PARSED;
Expand Down Expand Up @@ -72,6 +74,7 @@
import org.folio.rest.jaxrs.model.ActionProfile.Action;
import org.folio.rest.jaxrs.model.ActionProfile.FolioRecord;
import org.folio.rest.jaxrs.model.DataImportEventPayload;
import org.folio.rest.jaxrs.model.DataImportEventTypes;
import org.folio.rest.jaxrs.model.EntityType;
import org.folio.rest.jaxrs.model.ErrorRecord;
import org.folio.rest.jaxrs.model.ExternalIdsHolder;
Expand Down Expand Up @@ -181,37 +184,23 @@ public Future<List<Record>> parseRawRecordsChunkForJobExecution(RawRecordsDto ch
if (updateMarcActionExists(jobExecution) || updateInstanceActionExists(jobExecution) || isCreateOrUpdateItemOrHoldingsActionExists(jobExecution, parsedRecords)) {
hrIdFieldService.move001valueTo035Field(parsedRecords);
updateRecords(parsedRecords, jobExecution, params)
.onSuccess(ar -> promise.complete(parsedRecords))
.onFailure(promise::fail);
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
} else if (deleteMarcActionExists(jobExecution)) {
deleteRecords(parsedRecords, jobExecution, params)
.onSuccess(ar -> promise.complete(parsedRecords))
.onFailure(promise::fail);
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
} else if (parsedRecords.isEmpty()) {
saveRecords(jobExecution, sourceChunkId, params, parsedRecords, promise);
} else if (parsedRecords.get(0).getRecordType() == MARC_BIB) {
sendEvents(parsedRecords, jobExecution, params, DI_INCOMING_MARC_BIB_RECORD_PARSED)
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
} else if (parsedRecords.get(0).getRecordType() == RecordType.EDIFACT) {
sendEvents(parsedRecords, jobExecution, params, DI_INCOMING_EDIFACT_RECORD_PARSED)
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
} else if (parsedRecords.get(0).getRecordType() == null) {
sendEvents(parsedRecords, jobExecution, params, DI_ERROR)
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
} else {
saveRecords(params, jobExecution, parsedRecords)
.onComplete(postAr -> {
if (postAr.failed()) {
StatusDto statusDto = new StatusDto()
.withStatus(StatusDto.Status.ERROR)
.withErrorStatus(StatusDto.ErrorStatus.RECORD_UPDATE_ERROR);
jobExecutionService.updateJobExecutionStatus(jobExecution.getId(), statusDto, params)
.onComplete(r -> {
if (r.failed()) {
LOGGER.warn("parseRawRecordsChunkForJobExecution:: Error during update jobExecution and snapshot status", r.cause());
}
});
jobExecutionSourceChunkDao.getById(sourceChunkId, params.getTenantId())
.compose(optional -> optional
.map(sourceChunk -> jobExecutionSourceChunkDao
.update(sourceChunk.withState(JobExecutionSourceChunk.State.ERROR), params.getTenantId()))
.orElseThrow(() -> new NotFoundException(String.format(
"Couldn't update failed jobExecutionSourceChunk status to ERROR, jobExecutionSourceChunk with id %s was not found",
sourceChunkId))))
.onComplete(ar -> promise.fail(postAr.cause()));
} else {
promise.complete(parsedRecords);
}
});
saveRecords(jobExecution, sourceChunkId, params, parsedRecords, promise);
}
}).onFailure(th -> {
LOGGER.warn("parseRawRecordsChunkForJobExecution:: Error parsing records: {}", th.getMessage());
Expand All @@ -220,6 +209,38 @@ public Future<List<Record>> parseRawRecordsChunkForJobExecution(RawRecordsDto ch
return promise.future();
}

private void saveRecords(JobExecution jobExecution, String sourceChunkId, OkapiConnectionParams params, List<Record> parsedRecords, Promise<List<Record>> promise) {
saveRecords(params, jobExecution, parsedRecords)
.onComplete(postAr -> {
if (postAr.failed()) {
StatusDto statusDto = new StatusDto()
.withStatus(StatusDto.Status.ERROR)
.withErrorStatus(StatusDto.ErrorStatus.RECORD_UPDATE_ERROR);
jobExecutionService.updateJobExecutionStatus(jobExecution.getId(), statusDto, params)
.onComplete(r -> {
if (r.failed()) {
LOGGER.warn("parseRawRecordsChunkForJobExecution:: Error during update jobExecution and snapshot status", r.cause());
}
});
jobExecutionSourceChunkDao.getById(sourceChunkId, params.getTenantId())
.compose(optional -> optional
.map(sourceChunk -> jobExecutionSourceChunkDao
.update(sourceChunk.withState(JobExecutionSourceChunk.State.ERROR), params.getTenantId()))
.orElseThrow(() -> new NotFoundException(String.format(
"Couldn't update failed jobExecutionSourceChunk status to ERROR, jobExecutionSourceChunk with id %s was not found",
sourceChunkId))))
.onComplete(ar -> promise.fail(postAr.cause()));
} else {
promise.complete(parsedRecords);
}
});
}

private Future<Boolean> sendEvents(List<Record> records, JobExecution jobExecution, OkapiConnectionParams params, DataImportEventTypes eventType) {
LOGGER.info("sendEvents:: Sending events with type: {}", eventType.value());
return recordsPublishingService.sendEventsWithRecords(records, jobExecution.getId(), params, eventType.value());
}

private void saveIncomingAndJournalRecords(List<Record> parsedRecords, String tenantId) {
if (!parsedRecords.isEmpty()) {
incomingRecordService.saveBatch(JournalUtil.buildIncomingRecordsByRecords(parsedRecords), tenantId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.folio.services.exceptions.RawChunkRecordsParsingException;
import org.folio.services.exceptions.RecordsPublishingException;
import org.folio.services.util.EventHandlingUtil;
import org.folio.services.util.RecordConversionUtil;
import org.folio.verticle.consumers.errorhandlers.payloadbuilders.DiErrorPayloadBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -41,24 +42,24 @@ public class RecordsPublishingServiceImpl implements RecordsPublishingService {
public static final String RECORD_ID_HEADER = "recordId";
public static final String USER_ID_HEADER = "userId";
private static final AtomicInteger indexer = new AtomicInteger();

public static final String ERROR_KEY = "ERROR";

@Value("${srm.kafka.CreatedRecordsKafkaHandler.maxDistributionNum:100}")
private int maxDistributionNum;

private JobExecutionService jobExecutionService;
private DataImportPayloadContextBuilder payloadContextBuilder;
private KafkaConfig kafkaConfig;

@Value("${srm.kafka.CreatedRecordsKafkaHandler.maxDistributionNum:100}")
private int maxDistributionNum;
@Autowired
private List<DiErrorPayloadBuilder> errorPayloadBuilders;

public RecordsPublishingServiceImpl(@Autowired JobExecutionService jobExecutionService,
@Autowired DataImportPayloadContextBuilder payloadContextBuilder,
@Autowired KafkaConfig kafkaConfig) {
@Autowired KafkaConfig kafkaConfig,
@Autowired List<DiErrorPayloadBuilder> errorPayloadBuilders) {
this.jobExecutionService = jobExecutionService;
this.payloadContextBuilder = payloadContextBuilder;
this.kafkaConfig = kafkaConfig;
this.errorPayloadBuilders = errorPayloadBuilders;
}

@Override
Expand All @@ -82,15 +83,17 @@ private Future<Boolean> sendRecords(List<Record> createdRecords, JobExecution jo
for (Record record : createdRecords) {
String key = String.valueOf(indexer.incrementAndGet() % maxDistributionNum);
try {
if (isParsedContentExists(record)) {
if (record.getRecordType() != null && isParsedContentExists(record)) {
DataImportEventPayload payload = prepareEventPayload(record, profileSnapshotWrapper, params, eventType);
params.getHeaders().set(RECORD_ID_HEADER, record.getId());
params.getHeaders().set(USER_ID_HEADER, jobExecution.getUserId());
futures.add(sendEventToKafka(params.getTenantId(), Json.encode(payload),
eventType, KafkaHeaderUtils.kafkaHeadersFromMultiMap(params.getHeaders()), kafkaConfig, key));
}
else {
futures.add(sendDiErrorEvent(new RawChunkRecordsParsingException(record.getErrorRecord().getDescription()),
} else {
String cause = record.getErrorRecord() == null
? format("Cannot send event for individual record with recordType: %s", record.getRecordType())
: record.getErrorRecord().getDescription();
futures.add(sendDiErrorEvent(new RawChunkRecordsParsingException(cause),
params, jobExecution.getId(), params.getTenantId(), record));
}
} catch (Exception e) {
Expand Down Expand Up @@ -151,11 +154,8 @@ private DataImportEventPayload prepareEventPayload(Record record, ProfileSnapsho
.withToken(params.getToken());
}

public Future<Boolean> sendDiErrorEvent(Throwable throwable,
OkapiConnectionParams okapiParams,
String jobExecutionId,
String tenantId,
Record currentRecord) {
public Future<Boolean> sendDiErrorEvent(Throwable throwable, OkapiConnectionParams okapiParams, String jobExecutionId,
String tenantId, Record currentRecord) {
okapiParams.getHeaders().set(RECORD_ID_HEADER, currentRecord.getId());
for (DiErrorPayloadBuilder payloadBuilder: errorPayloadBuilders) {
if (payloadBuilder.isEligible(currentRecord.getRecordType())) {
Expand All @@ -166,6 +166,25 @@ public Future<Boolean> sendDiErrorEvent(Throwable throwable,
}
}
LOGGER.warn("sendDiErrorEvent:: Appropriate DI_ERROR payload builder not found, DI_ERROR without records info will be send");
return Future.failedFuture(throwable);
sendDiError(throwable, jobExecutionId, okapiParams, currentRecord);
return Future.succeededFuture(true);
}

private void sendDiError(Throwable throwable, String jobExecutionId, OkapiConnectionParams okapiParams, Record record) {
HashMap<String, String> context = new HashMap<>();
context.put(ERROR_KEY, throwable.getMessage());
if (record != null && record.getRecordType() != null) {
context.put(RecordConversionUtil.getEntityType(record).value(), Json.encode(record));
}

DataImportEventPayload payload = new DataImportEventPayload()
.withEventType(DI_ERROR.value())
.withJobExecutionId(jobExecutionId)
.withOkapiUrl(okapiParams.getOkapiUrl())
.withTenant(okapiParams.getTenantId())
.withToken(okapiParams.getToken())
.withContext(context);
EventHandlingUtil.sendEventToKafka(okapiParams.getTenantId(), Json.encode(payload), DI_ERROR.value(),
KafkaHeaderUtils.kafkaHeadersFromMultiMap(okapiParams.getHeaders()), kafkaConfig, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.springframework.stereotype.Component;

import javax.ws.rs.NotFoundException;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -45,10 +44,10 @@

import static java.lang.String.format;
import static org.apache.commons.lang3.ObjectUtils.allNotNull;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_EDIFACT_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_EDIFACT_RECORD_PARSED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_BIB_FOR_ORDER_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_HOLDING_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.JournalRecord.ActionStatus.COMPLETED;
import static org.folio.rest.jaxrs.model.JournalRecord.ActionType.CREATE;
Expand All @@ -70,10 +69,10 @@ public class StoredRecordChunksKafkaHandler implements AsyncRecordHandler<String
public static final String CREATE_ACTION = "CREATE";

private static final Map<RecordType, DataImportEventTypes> RECORD_TYPE_TO_EVENT_TYPE = Map.of(
MARC_BIB, DI_SRS_MARC_BIB_RECORD_CREATED,
MARC_BIB, DI_INCOMING_MARC_BIB_RECORD_PARSED,
MARC_AUTHORITY, DI_SRS_MARC_AUTHORITY_RECORD_CREATED,
MARC_HOLDING, DI_SRS_MARC_HOLDING_RECORD_CREATED,
EDIFACT, DI_EDIFACT_RECORD_CREATED
EDIFACT, DI_INCOMING_EDIFACT_RECORD_PARSED
);

private RecordsPublishingService recordsPublishingService;
Expand Down Expand Up @@ -124,7 +123,7 @@ public Future<String> handle(KafkaConsumerRecord<String, String> record) {
// we only know record type by inspecting the records, assuming records are homogeneous type and defaulting to previous static value
DataImportEventTypes eventType = !storedRecords.isEmpty() && RECORD_TYPE_TO_EVENT_TYPE.containsKey(storedRecords.get(0).getRecordType())
? RECORD_TYPE_TO_EVENT_TYPE.get(storedRecords.get(0).getRecordType())
: DI_SRS_MARC_BIB_RECORD_CREATED;
: DI_INCOMING_MARC_BIB_RECORD_PARSED;

LOGGER.debug("handle:: RecordsBatchResponse has been received, starting processing chunkId: {} chunkNumber: {} jobExecutionId: {}", chunkId, chunkNumber, jobExecutionId);
saveCreatedRecordsInfoToDataImportLog(storedRecords, okapiConnectionParams.getTenantId());
Expand Down
Loading

0 comments on commit 8d101ec

Please sign in to comment.