From e0238f4079f342bda32049b069f8743d097faa16 Mon Sep 17 00:00:00 2001 From: Yaroslav_Kiriak Date: Thu, 16 Nov 2023 16:22:29 +0200 Subject: [PATCH] MODSOURMAN-1022: remove step of initial saving of incoming records to SRS --- NEWS.md | 1 + .../services/ChangeEngineServiceImpl.java | 122 ++++++++++++------ .../RecordsPublishingServiceImpl.java | 51 +++++--- .../StoredRecordChunksKafkaHandler.java | 11 +- .../changeManager/ChangeManagerAPITest.java | 114 ++++++++-------- .../services/ChangeEngineServiceImplTest.java | 49 +++++-- ...tDrivenChunkProcessingServiceImplTest.java | 10 +- ...ProcessedEventHandlingServiceImplTest.java | 7 +- ...DataImportJournalConsumerVerticleTest.java | 15 +-- .../RawMarcChunkConsumersVerticleTest.java | 38 +++--- ...toredRecordChunkConsumersVerticleTest.java | 12 +- .../util/MarcImportEventsHandlerTest.java | 6 +- 12 files changed, 268 insertions(+), 168 deletions(-) diff --git a/NEWS.md b/NEWS.md index 17e6b9b6a..67ec8757c 100644 --- a/NEWS.md +++ b/NEWS.md @@ -2,6 +2,7 @@ * [MODSOURMAN-1030](https://issues.folio.org/browse/MODSOURMAN-1030) The number of updated records is not correct displayed in the 'SRS Marc' column in the 'Log summary' table * [MODSOURMAN-1020](https://issues.folio.org/browse/MODSOURMAN-1020) Add table to save incoming records for DI logs * [MODSOURMAN-1021](https://issues.folio.org/browse/MODSOURMAN-1021) Provide endpoint for getting parsed content for DI log +* [MODSOURMAN-1022](https://issues.folio.org/browse/MODSOURMAN-1022) Remove step of initial saving of incoming records to SRS ## 2023-10-13 v3.7.0 * [MODSOURMAN-1045](https://issues.folio.org/browse/MODSOURMAN-1045) Allow create action with non-matches for instance without match profile diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/ChangeEngineServiceImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/services/ChangeEngineServiceImpl.java index 1756c469d..da6232707 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/services/ChangeEngineServiceImpl.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/services/ChangeEngineServiceImpl.java @@ -7,6 +7,8 @@ import static org.folio.rest.RestVerticle.MODULE_SPECIFIC_ARGS; import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_ERROR; +import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_EDIFACT_RECORD_PARSED; +import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED; import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_FOR_DELETE_RECEIVED; import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_FOR_UPDATE_RECEIVED; import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_RAW_RECORDS_CHUNK_PARSED; @@ -72,6 +74,7 @@ import org.folio.rest.jaxrs.model.ActionProfile.Action; import org.folio.rest.jaxrs.model.ActionProfile.FolioRecord; import org.folio.rest.jaxrs.model.DataImportEventPayload; +import org.folio.rest.jaxrs.model.DataImportEventTypes; import org.folio.rest.jaxrs.model.EntityType; import org.folio.rest.jaxrs.model.ErrorRecord; import org.folio.rest.jaxrs.model.ExternalIdsHolder; @@ -177,42 +180,7 @@ public Future> parseRawRecordsChunkForJobExecution(RawRecordsDto ch .map(parsedRecords)) .onSuccess(parsedRecords -> { fillParsedRecordsWithAdditionalFields(parsedRecords); - - if (updateMarcActionExists(jobExecution) || updateInstanceActionExists(jobExecution) || isCreateOrUpdateItemOrHoldingsActionExists(jobExecution, parsedRecords)) { - 
hrIdFieldService.move001valueTo035Field(parsedRecords); - updateRecords(parsedRecords, jobExecution, params) - .onSuccess(ar -> promise.complete(parsedRecords)) - .onFailure(promise::fail); - } else if (deleteMarcActionExists(jobExecution)) { - deleteRecords(parsedRecords, jobExecution, params) - .onSuccess(ar -> promise.complete(parsedRecords)) - .onFailure(promise::fail); - } else { - saveRecords(params, jobExecution, parsedRecords) - .onComplete(postAr -> { - if (postAr.failed()) { - StatusDto statusDto = new StatusDto() - .withStatus(StatusDto.Status.ERROR) - .withErrorStatus(StatusDto.ErrorStatus.RECORD_UPDATE_ERROR); - jobExecutionService.updateJobExecutionStatus(jobExecution.getId(), statusDto, params) - .onComplete(r -> { - if (r.failed()) { - LOGGER.warn("parseRawRecordsChunkForJobExecution:: Error during update jobExecution and snapshot status", r.cause()); - } - }); - jobExecutionSourceChunkDao.getById(sourceChunkId, params.getTenantId()) - .compose(optional -> optional - .map(sourceChunk -> jobExecutionSourceChunkDao - .update(sourceChunk.withState(JobExecutionSourceChunk.State.ERROR), params.getTenantId())) - .orElseThrow(() -> new NotFoundException(String.format( - "Couldn't update failed jobExecutionSourceChunk status to ERROR, jobExecutionSourceChunk with id %s was not found", - sourceChunkId)))) - .onComplete(ar -> promise.fail(postAr.cause())); - } else { - promise.complete(parsedRecords); - } - }); - } + processRecords(parsedRecords, jobExecution, params, sourceChunkId, promise); }).onFailure(th -> { LOGGER.warn("parseRawRecordsChunkForJobExecution:: Error parsing records: {}", th.getMessage()); promise.fail(th); @@ -220,6 +188,87 @@ public Future> parseRawRecordsChunkForJobExecution(RawRecordsDto ch return promise.future(); } + private void processRecords(List parsedRecords, JobExecution jobExecution, OkapiConnectionParams params, + String sourceChunkId, Promise> promise) { + switch (getAction(parsedRecords, jobExecution)) { + case UPDATE_RECORD -> { + hrIdFieldService.move001valueTo035Field(parsedRecords); + updateRecords(parsedRecords, jobExecution, params) + .onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail); + } + case DELETE_RECORD -> deleteRecords(parsedRecords, jobExecution, params) + .onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail); + case SEND_ERROR -> sendEvents(parsedRecords, jobExecution, params, DI_ERROR) + .onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail); + case SEND_MARC_BIB -> sendEvents(parsedRecords, jobExecution, params, DI_INCOMING_MARC_BIB_RECORD_PARSED) + .onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail); + case SEND_EDIFACT -> sendEvents(parsedRecords, jobExecution, params, DI_INCOMING_EDIFACT_RECORD_PARSED) + .onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail); + default -> saveRecords(jobExecution, sourceChunkId, params, parsedRecords, promise); + } + } + + private ActionType getAction(List parsedRecords, JobExecution jobExecution) { + if (updateMarcActionExists(jobExecution) || updateInstanceActionExists(jobExecution) + || isCreateOrUpdateItemOrHoldingsActionExists(jobExecution, parsedRecords)) { + return ActionType.UPDATE_RECORD; + } + if (deleteMarcActionExists(jobExecution)) { + return ActionType.DELETE_RECORD; + } + if (parsedRecords.isEmpty()) { + return ActionType.SAVE_RECORD; + } + RecordType recordType = parsedRecords.get(0).getRecordType(); + if (recordType == RecordType.MARC_BIB) { + return 
ActionType.SEND_MARC_BIB; + } + if (recordType == RecordType.EDIFACT) { + return ActionType.SEND_EDIFACT; + } + if (recordType == null) { + return ActionType.SEND_ERROR; + } + return ActionType.SAVE_RECORD; + } + + private enum ActionType { + UPDATE_RECORD, DELETE_RECORD, SEND_ERROR, SEND_MARC_BIB, SEND_EDIFACT, SAVE_RECORD + } + + private void saveRecords(JobExecution jobExecution, String sourceChunkId, OkapiConnectionParams params, List parsedRecords, Promise> promise) { + saveRecords(params, jobExecution, parsedRecords) + .onComplete(postAr -> { + if (postAr.failed()) { + StatusDto statusDto = new StatusDto() + .withStatus(StatusDto.Status.ERROR) + .withErrorStatus(StatusDto.ErrorStatus.RECORD_UPDATE_ERROR); + jobExecutionService.updateJobExecutionStatus(jobExecution.getId(), statusDto, params) + .onComplete(r -> { + if (r.failed()) { + LOGGER.warn("parseRawRecordsChunkForJobExecution:: Error during update jobExecution with id '{}' and snapshot status", + jobExecution.getId(), r.cause()); + } + }); + jobExecutionSourceChunkDao.getById(sourceChunkId, params.getTenantId()) + .compose(optional -> optional + .map(sourceChunk -> jobExecutionSourceChunkDao + .update(sourceChunk.withState(JobExecutionSourceChunk.State.ERROR), params.getTenantId())) + .orElseThrow(() -> new NotFoundException(String.format( + "Couldn't update failed jobExecutionSourceChunk status to ERROR, jobExecutionSourceChunk with id %s was not found, jobExecutionId: %s", + sourceChunkId, jobExecution.getId())))) + .onComplete(ar -> promise.fail(postAr.cause())); + } else { + promise.complete(parsedRecords); + } + }); + } + + private Future sendEvents(List records, JobExecution jobExecution, OkapiConnectionParams params, DataImportEventTypes eventType) { + LOGGER.info("sendEvents:: Sending events with type: {}, jobExecutionId: {}", eventType.value(), jobExecution.getId()); + return recordsPublishingService.sendEventsWithRecords(records, jobExecution.getId(), params, eventType.value()); + } + private void saveIncomingAndJournalRecords(List parsedRecords, String tenantId) { if (!parsedRecords.isEmpty()) { incomingRecordService.saveBatch(JournalUtil.buildIncomingRecordsByRecords(parsedRecords), tenantId); @@ -738,6 +787,7 @@ private Future> saveRecords(OkapiConnectionParams params, JobExecut if (CollectionUtils.isEmpty(parsedRecords)) { return Future.succeededFuture(); } + LOGGER.info("saveRecords:: Saving records in SRS, amount: {}, jobExecutionId: {}", parsedRecords.size(), jobExecution.getId()); RecordCollection recordCollection = new RecordCollection() .withRecords(parsedRecords) .withTotalRecords(parsedRecords.size()); diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/RecordsPublishingServiceImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/services/RecordsPublishingServiceImpl.java index 6ffa06fe1..326254bb7 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/services/RecordsPublishingServiceImpl.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/services/RecordsPublishingServiceImpl.java @@ -19,6 +19,7 @@ import org.folio.services.exceptions.RawChunkRecordsParsingException; import org.folio.services.exceptions.RecordsPublishingException; import org.folio.services.util.EventHandlingUtil; +import org.folio.services.util.RecordConversionUtil; import org.folio.verticle.consumers.errorhandlers.payloadbuilders.DiErrorPayloadBuilder; import org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.beans.factory.annotation.Value; @@ -41,24 +42,24 @@ public class RecordsPublishingServiceImpl implements RecordsPublishingService { public static final String RECORD_ID_HEADER = "recordId"; public static final String USER_ID_HEADER = "userId"; private static final AtomicInteger indexer = new AtomicInteger(); - public static final String ERROR_KEY = "ERROR"; + @Value("${srm.kafka.CreatedRecordsKafkaHandler.maxDistributionNum:100}") + private int maxDistributionNum; + private JobExecutionService jobExecutionService; private DataImportPayloadContextBuilder payloadContextBuilder; private KafkaConfig kafkaConfig; - - @Value("${srm.kafka.CreatedRecordsKafkaHandler.maxDistributionNum:100}") - private int maxDistributionNum; - @Autowired private List errorPayloadBuilders; public RecordsPublishingServiceImpl(@Autowired JobExecutionService jobExecutionService, @Autowired DataImportPayloadContextBuilder payloadContextBuilder, - @Autowired KafkaConfig kafkaConfig) { + @Autowired KafkaConfig kafkaConfig, + @Autowired List errorPayloadBuilders) { this.jobExecutionService = jobExecutionService; this.payloadContextBuilder = payloadContextBuilder; this.kafkaConfig = kafkaConfig; + this.errorPayloadBuilders = errorPayloadBuilders; } @Override @@ -82,15 +83,17 @@ private Future sendRecords(List createdRecords, JobExecution jo for (Record record : createdRecords) { String key = String.valueOf(indexer.incrementAndGet() % maxDistributionNum); try { - if (isParsedContentExists(record)) { + if (record.getRecordType() != null && isParsedContentExists(record)) { DataImportEventPayload payload = prepareEventPayload(record, profileSnapshotWrapper, params, eventType); params.getHeaders().set(RECORD_ID_HEADER, record.getId()); params.getHeaders().set(USER_ID_HEADER, jobExecution.getUserId()); futures.add(sendEventToKafka(params.getTenantId(), Json.encode(payload), eventType, KafkaHeaderUtils.kafkaHeadersFromMultiMap(params.getHeaders()), kafkaConfig, key)); - } - else { - futures.add(sendDiErrorEvent(new RawChunkRecordsParsingException(record.getErrorRecord().getDescription()), + } else { + String cause = record.getErrorRecord() == null + ? 
format("Cannot send event for individual record with recordType: %s", record.getRecordType()) + : record.getErrorRecord().getDescription(); + futures.add(sendDiErrorEvent(new RawChunkRecordsParsingException(cause), params, jobExecution.getId(), params.getTenantId(), record)); } } catch (Exception e) { @@ -151,11 +154,8 @@ private DataImportEventPayload prepareEventPayload(Record record, ProfileSnapsho .withToken(params.getToken()); } - public Future sendDiErrorEvent(Throwable throwable, - OkapiConnectionParams okapiParams, - String jobExecutionId, - String tenantId, - Record currentRecord) { + public Future sendDiErrorEvent(Throwable throwable, OkapiConnectionParams okapiParams, String jobExecutionId, + String tenantId, Record currentRecord) { okapiParams.getHeaders().set(RECORD_ID_HEADER, currentRecord.getId()); for (DiErrorPayloadBuilder payloadBuilder: errorPayloadBuilders) { if (payloadBuilder.isEligible(currentRecord.getRecordType())) { @@ -166,6 +166,25 @@ public Future sendDiErrorEvent(Throwable throwable, } } LOGGER.warn("sendDiErrorEvent:: Appropriate DI_ERROR payload builder not found, DI_ERROR without records info will be send"); - return Future.failedFuture(throwable); + sendDiError(throwable, jobExecutionId, okapiParams, currentRecord); + return Future.succeededFuture(true); + } + + private void sendDiError(Throwable throwable, String jobExecutionId, OkapiConnectionParams okapiParams, Record record) { + HashMap context = new HashMap<>(); + context.put(ERROR_KEY, throwable.getMessage()); + if (record != null && record.getRecordType() != null) { + context.put(RecordConversionUtil.getEntityType(record).value(), Json.encode(record)); + } + + DataImportEventPayload payload = new DataImportEventPayload() + .withEventType(DI_ERROR.value()) + .withJobExecutionId(jobExecutionId) + .withOkapiUrl(okapiParams.getOkapiUrl()) + .withTenant(okapiParams.getTenantId()) + .withToken(okapiParams.getToken()) + .withContext(context); + EventHandlingUtil.sendEventToKafka(okapiParams.getTenantId(), Json.encode(payload), DI_ERROR.value(), + KafkaHeaderUtils.kafkaHeadersFromMultiMap(okapiParams.getHeaders()), kafkaConfig, null); } } diff --git a/mod-source-record-manager-server/src/main/java/org/folio/verticle/consumers/StoredRecordChunksKafkaHandler.java b/mod-source-record-manager-server/src/main/java/org/folio/verticle/consumers/StoredRecordChunksKafkaHandler.java index 767447cc0..308b53ac3 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/verticle/consumers/StoredRecordChunksKafkaHandler.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/verticle/consumers/StoredRecordChunksKafkaHandler.java @@ -35,7 +35,6 @@ import org.springframework.stereotype.Component; import javax.ws.rs.NotFoundException; -import java.util.Arrays; import java.util.Date; import java.util.LinkedHashMap; import java.util.List; @@ -45,10 +44,10 @@ import static java.lang.String.format; import static org.apache.commons.lang3.ObjectUtils.allNotNull; -import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_EDIFACT_RECORD_CREATED; +import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_EDIFACT_RECORD_PARSED; import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_BIB_FOR_ORDER_CREATED; import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_CREATED; -import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_CREATED; +import static 
org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED; import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_HOLDING_RECORD_CREATED; import static org.folio.rest.jaxrs.model.JournalRecord.ActionStatus.COMPLETED; import static org.folio.rest.jaxrs.model.JournalRecord.ActionType.CREATE; @@ -70,10 +69,10 @@ public class StoredRecordChunksKafkaHandler implements AsyncRecordHandler RECORD_TYPE_TO_EVENT_TYPE = Map.of( - MARC_BIB, DI_SRS_MARC_BIB_RECORD_CREATED, + MARC_BIB, DI_INCOMING_MARC_BIB_RECORD_PARSED, MARC_AUTHORITY, DI_SRS_MARC_AUTHORITY_RECORD_CREATED, MARC_HOLDING, DI_SRS_MARC_HOLDING_RECORD_CREATED, - EDIFACT, DI_EDIFACT_RECORD_CREATED + EDIFACT, DI_INCOMING_EDIFACT_RECORD_PARSED ); private RecordsPublishingService recordsPublishingService; @@ -124,7 +123,7 @@ public Future handle(KafkaConsumerRecord record) { // we only know record type by inspecting the records, assuming records are homogeneous type and defaulting to previous static value DataImportEventTypes eventType = !storedRecords.isEmpty() && RECORD_TYPE_TO_EVENT_TYPE.containsKey(storedRecords.get(0).getRecordType()) ? RECORD_TYPE_TO_EVENT_TYPE.get(storedRecords.get(0).getRecordType()) - : DI_SRS_MARC_BIB_RECORD_CREATED; + : DI_INCOMING_MARC_BIB_RECORD_PARSED; LOGGER.debug("handle:: RecordsBatchResponse has been received, starting processing chunkId: {} chunkNumber: {} jobExecutionId: {}", chunkId, chunkNumber, jobExecutionId); saveCreatedRecordsInfoToDataImportLog(storedRecords, okapiConnectionParams.getTenantId()); diff --git a/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/changeManager/ChangeManagerAPITest.java b/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/changeManager/ChangeManagerAPITest.java index 77be0ad6b..26379893a 100644 --- a/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/changeManager/ChangeManagerAPITest.java +++ b/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/changeManager/ChangeManagerAPITest.java @@ -74,6 +74,8 @@ import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED; +import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_ERROR; import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_FOR_UPDATE_RECEIVED; import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_RAW_RECORDS_CHUNK_PARSED; import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.ACTION_PROFILE; @@ -91,6 +93,10 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; /** * REST tests for ChangeManager to manager JobExecution entities initialization @@ -1760,21 +1766,17 @@ private void fillInRecordOrderIfAtLeastOneRecordHasNoOrder(String rawRecord) thr .then() .statusCode(HttpStatus.SC_NO_CONTENT); - String topicToObserve = formatToKafkaTopicName(DI_RAW_RECORDS_CHUNK_PARSED.value()); - List observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 1) + String topicToObserve = formatToKafkaTopicName(DI_INCOMING_MARC_BIB_RECORD_PARSED.value()); + List observedValues = 
kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 3) .observeFor(30, TimeUnit.SECONDS) .build()); - Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class); - assertEquals(DI_RAW_RECORDS_CHUNK_PARSED.value(), obtainedEvent.getEventType()); - - RecordCollection processedRecords = Json - .decodeValue(obtainedEvent.getEventPayload(), RecordCollection.class); - assertEquals(3, processedRecords.getRecords().size()); - - assertEquals(4, processedRecords.getRecords().get(0).getOrder().intValue()); - assertEquals(5, processedRecords.getRecords().get(1).getOrder().intValue()); - assertEquals(6, processedRecords.getRecords().get(2).getOrder().intValue()); + Event obtainedEvent = Json.decodeValue(observedValues.get(2), Event.class); + assertEquals(DI_INCOMING_MARC_BIB_RECORD_PARSED.value(), obtainedEvent.getEventType()); + DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class); + assertNotNull(eventPayload.getContext()); + JsonObject record = new JsonObject(eventPayload.getContext().get("MARC_BIBLIOGRAPHIC")); + assertNotEquals(0, record.getInteger("order").intValue()); } @Test @@ -2139,19 +2141,21 @@ public void shouldHaveErrorRecordIf999ffsFieldExistsAndCreateInstanceActionProfi .statusCode(HttpStatus.SC_NO_CONTENT); async.complete(); - String topicToObserve = formatToKafkaTopicName(DI_RAW_RECORDS_CHUNK_PARSED.value()); - List observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 1) + String topicToObserve = formatToKafkaTopicName(DI_ERROR.value()); + List observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 2) .observeFor(30, TimeUnit.SECONDS) .build()); - Event obtainedEvent = Json.decodeValue(observedValues.get(5), Event.class); - assertEquals(DI_RAW_RECORDS_CHUNK_PARSED.value(), obtainedEvent.getEventType()); - RecordCollection recordCollection = Json - .decodeValue(obtainedEvent.getEventPayload(), RecordCollection.class); - Assert.assertNull(recordCollection.getRecords().get(0).getMatchedId()); - Assert.assertNotNull(recordCollection.getRecords().get(0).getErrorRecord()); - Assert.assertEquals("{\"error\":\"A new Instance was not created because the incoming record already contained a 999ff$s or 999ff$i field\"}", - recordCollection.getRecords().get(0).getErrorRecord().getDescription()); + Event obtainedEvent = Json.decodeValue(observedValues.get(1), Event.class); + assertEquals(DI_ERROR.value(), obtainedEvent.getEventType()); + DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class); + + assertNotNull(eventPayload.getContext()); + JsonObject record = new JsonObject(eventPayload.getContext().get("MARC_BIBLIOGRAPHIC")); + assertNull(record.getString("matchedId")); + assertFalse(record.getJsonObject("errorRecord").isEmpty()); + assertEquals("A new Instance was not created because the incoming record already contained a 999ff$s or 999ff$i field", + new JsonObject(eventPayload.getContext().get("ERROR")).getString("error")); } @Test @@ -2195,15 +2199,16 @@ public void shouldHaveErrorRecordIsNullIf999ffsFieldExistsAndCreateInstanceActio .statusCode(HttpStatus.SC_NO_CONTENT); async.complete(); - String topicToObserve = formatToKafkaTopicName(DI_RAW_RECORDS_CHUNK_PARSED.value()); + String topicToObserve = formatToKafkaTopicName(DI_INCOMING_MARC_BIB_RECORD_PARSED.value()); List observedValues = kafkaCluster.observeValues( - ObserveKeyValues.on(topicToObserve, 1).observeFor(30, 
TimeUnit.SECONDS).build()); + ObserveKeyValues.on(topicToObserve, 57).observeFor(30, TimeUnit.SECONDS).build()); - Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class); - assertEquals(DI_RAW_RECORDS_CHUNK_PARSED.value(), obtainedEvent.getEventType()); - RecordCollection recordCollection = Json.decodeValue(obtainedEvent.getEventPayload(), RecordCollection.class); - Assert.assertNull(recordCollection.getRecords().get(0).getMatchedId()); - Assert.assertNull(recordCollection.getRecords().get(0).getErrorRecord()); + Event obtainedEvent = Json.decodeValue(observedValues.get(56), Event.class); + assertEquals(DI_INCOMING_MARC_BIB_RECORD_PARSED.value(), obtainedEvent.getEventType()); + DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class); + JsonObject record = new JsonObject(eventPayload.getContext().get("MARC_BIBLIOGRAPHIC")); + assertNull(record.getString("matchedId")); + assertNull(record.getJsonObject("errorRecord")); } @Test @@ -2214,8 +2219,7 @@ public void shouldHaveErrorRecordIf999ffsFieldExistsAndCreateMarcAuthorityAction assertThat(createdJobExecutions.size(), is(1)); JobExecution jobExec = createdJobExecutions.get(0); - - WireMock.stubFor(WireMock.get("/data-import-profiles/jobProfiles/"+ DEFAULT_MARC_AUTHORITY_JOB_PROFILE_ID +"?withRelations=false&") + WireMock.stubFor(WireMock.get("/data-import-profiles/jobProfiles/" + DEFAULT_MARC_AUTHORITY_JOB_PROFILE_ID + "?withRelations=false&") .willReturn(WireMock.ok().withBody(Json.encode(new JobProfile().withId(DEFAULT_MARC_AUTHORITY_JOB_PROFILE_ID).withName("Default - Create SRS MARC Authority"))))); WireMock.stubFor(post(RECORDS_SERVICE_URL) @@ -2249,17 +2253,17 @@ public void shouldHaveErrorRecordIf999ffsFieldExistsAndCreateMarcAuthorityAction async.complete(); String topicToObserve = formatToKafkaTopicName(DI_RAW_RECORDS_CHUNK_PARSED.value()); - List observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 3) + List observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 1) .observeFor(30, TimeUnit.SECONDS) .build()); - Event obtainedEvent = Json.decodeValue(observedValues.get(3), Event.class); + Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class); assertEquals(DI_RAW_RECORDS_CHUNK_PARSED.value(), obtainedEvent.getEventType()); - RecordCollection recordCollection = Json - .decodeValue(obtainedEvent.getEventPayload(), RecordCollection.class); - Assert.assertNull(recordCollection.getRecords().get(0).getMatchedId()); - Assert.assertNotNull(recordCollection.getRecords().get(0).getErrorRecord()); - Assert.assertEquals( "{\"error\":\"A new MARC-Authority was not created because the incoming record already contained a 999ff$s or 999ff$i field\"}", recordCollection.getRecords().get(0).getErrorRecord().getDescription()); + RecordCollection recordCollection = Json.decodeValue(obtainedEvent.getEventPayload(), RecordCollection.class); + assertNull(recordCollection.getRecords().get(0).getMatchedId()); + assertNotNull(recordCollection.getRecords().get(0).getErrorRecord()); + assertEquals("{\"error\":\"A new MARC-Authority was not created because the incoming record already contained a 999ff$s or 999ff$i field\"}", + recordCollection.getRecords().get(0).getErrorRecord().getDescription()); } @Test @@ -2296,16 +2300,15 @@ public void shouldSetErrorToRecordWithInvalidLeaderLine(TestContext testContext) .statusCode(HttpStatus.SC_NO_CONTENT); async.complete(); - String topicToObserve = 
formatToKafkaTopicName(DI_RAW_RECORDS_CHUNK_PARSED.value()); - List observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 3) - .observeFor(30, TimeUnit.SECONDS) - .build()); + String topicToObserve = formatToKafkaTopicName(DI_ERROR.value()); + List observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 1) + .observeFor(30, TimeUnit.SECONDS).build()); - Event obtainedEvent = Json.decodeValue(observedValues.get(2), Event.class); - assertEquals(DI_RAW_RECORDS_CHUNK_PARSED.value(), obtainedEvent.getEventType()); - RecordCollection recordCollection = Json.decodeValue(obtainedEvent.getEventPayload(), RecordCollection.class); - assertEquals(1, recordCollection.getRecords().size()); - MatcherAssert.assertThat(recordCollection.getRecords().get(0).getErrorRecord().getDescription(), containsString("Error during analyze leader line for determining record type")); + Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class); + assertEquals(DI_ERROR.value(), obtainedEvent.getEventType()); + DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class); + MatcherAssert.assertThat(new JsonObject(eventPayload.getContext().get("ERROR")).getString("message"), + containsString("Error during analyze leader line for determining record type for record with id")); } @Test @@ -2316,8 +2319,7 @@ public void shouldHaveErrorRecordIf999ffsFieldExistsAndCreateMarcHoldingsActionP assertThat(createdJobExecutions.size(), is(1)); JobExecution jobExec = createdJobExecutions.get(0); - - WireMock.stubFor(WireMock.get("/data-import-profiles/jobProfiles/"+ DEFAULT_MARC_HOLDINGS_JOB_PROFILE_ID +"?withRelations=false&") + WireMock.stubFor(WireMock.get("/data-import-profiles/jobProfiles/" + DEFAULT_MARC_HOLDINGS_JOB_PROFILE_ID + "?withRelations=false&") .willReturn(WireMock.ok().withBody(Json.encode(new JobProfile().withId(DEFAULT_MARC_HOLDINGS_JOB_PROFILE_ID).withName("Default - Create Holdings and SRS MARC Holdings"))))); WireMock.stubFor(post(RECORDS_SERVICE_URL) @@ -2351,17 +2353,17 @@ public void shouldHaveErrorRecordIf999ffsFieldExistsAndCreateMarcHoldingsActionP async.complete(); String topicToObserve = formatToKafkaTopicName(DI_RAW_RECORDS_CHUNK_PARSED.value()); - List observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 7) + List observedValues = kafkaCluster.observeValues(ObserveKeyValues.on(topicToObserve, 2) .observeFor(30, TimeUnit.SECONDS) .build()); - Event obtainedEvent = Json.decodeValue(observedValues.get(7), Event.class); + Event obtainedEvent = Json.decodeValue(observedValues.get(1), Event.class); assertEquals(DI_RAW_RECORDS_CHUNK_PARSED.value(), obtainedEvent.getEventType()); - RecordCollection recordCollection = Json - .decodeValue(obtainedEvent.getEventPayload(), RecordCollection.class); - Assert.assertNull(recordCollection.getRecords().get(0).getMatchedId()); - Assert.assertNotNull(recordCollection.getRecords().get(0).getErrorRecord()); - Assert.assertEquals( "{\"error\":\"A new MARC-Holding was not created because the incoming record already contained a 999ff$s or 999ff$i field\"}", recordCollection.getRecords().get(0).getErrorRecord().getDescription()); + RecordCollection recordCollection = Json.decodeValue(obtainedEvent.getEventPayload(), RecordCollection.class); + assertNull(recordCollection.getRecords().get(0).getMatchedId()); + assertNotNull(recordCollection.getRecords().get(0).getErrorRecord()); + assertEquals("{\"error\":\"A new MARC-Holding was not 
created because the incoming record already contained a 999ff$s or 999ff$i field\"}", + recordCollection.getRecords().get(0).getErrorRecord().getDescription()); } @Test diff --git a/mod-source-record-manager-server/src/test/java/org/folio/services/ChangeEngineServiceImplTest.java b/mod-source-record-manager-server/src/test/java/org/folio/services/ChangeEngineServiceImplTest.java index 45ac39e52..15b79a327 100644 --- a/mod-source-record-manager-server/src/test/java/org/folio/services/ChangeEngineServiceImplTest.java +++ b/mod-source-record-manager-server/src/test/java/org/folio/services/ChangeEngineServiceImplTest.java @@ -6,6 +6,7 @@ import io.vertx.core.json.JsonObject; import io.vertx.kafka.client.producer.KafkaHeader; import org.folio.MatchProfile; +import org.folio.TestUtil; import org.folio.dao.JobExecutionSourceChunkDao; import org.folio.dataimport.util.OkapiConnectionParams; import org.folio.dataimport.util.marc.MarcRecordAnalyzer; @@ -38,12 +39,14 @@ import org.mockito.junit.MockitoJUnitRunner; import org.springframework.test.util.ReflectionTestUtils; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Optional; import java.util.UUID; +import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED; import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_FOR_UPDATE_RECEIVED; import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.ACTION_PROFILE; import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.JOB_PROFILE; @@ -62,6 +65,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @@ -79,6 +83,7 @@ public class ChangeEngineServiceImplTest { "01119cam a2200349Li 4500001001300000003000600013005001700019008004100036020001800077020001500095035002100110037002200131040002700153043001200180050002700192082001600219090002200235100003300257245002700290264003800317300002300355336002600378337002800404338002700432651006400459945004300523960006200566961001600628980003900644981002300683999006300706\u001Eocn922152790\u001EOCoLC\u001E20150927051630.4\u001E150713s2015 enk 000 f eng d\u001E \u001Fa9780241146064\u001E \u001Fa0241146062\u001E \u001Fa(OCoLC)922152790\u001E \u001Fa12370236\u001Fbybp\u001F5NU\u001E \u001FaYDXCP\u001Fbeng\u001Ferda\u001FcYDXCP\u001E \u001Fae-uk-en\u001E 4\u001FaPR6052.A6488\u001FbN66 2015\u001E04\u001Fa823.914\u001F223\u001E 4\u001Fa823.914\u001FcB2557\u001Fb1\u001E1 \u001FaBarker, Pat,\u001Fd1943-\u001Feauthor.\u001E10\u001FaNoonday /\u001FcPat Barker.\u001E 1\u001FaLondon :\u001FbHamish Hamilton,\u001Fc2015.\u001E \u001Fa258 pages ;\u001Fc24 cm\u001E \u001Fatext\u001Fbtxt\u001F2rdacontent\u001E \u001Faunmediated\u001Fbn\u001F2rdamedia\u001E \u001Favolume\u001Fbnc\u001F2rdacarrier\u001E 0\u001FaLondon (England)\u001FxHistory\u001FyBombardment, 1940-1941\u001FvFiction.\u001E \u001Ffh\u001Fg1\u001Fi0000000618391828\u001Flfhgen\u001Fr3\u001Fsv\u001Ft1\u001E \u001Fap\u001Fda\u001Fgh\u001Fim\u001Fjn\u001Fka\u001Fla\u001Fmo\u001Ftfhgen\u001Fo1\u001Fs15.57\u001Fu7ART\u001Fvukapf\u001FzGBP\u001E \u001FbGBP\u001Fm633761\u001E \u001Fa160128\u001Fb1899\u001Fd156\u001Fe1713\u001Ff654270\u001Fg1\u001E \u001Faukapf\u001Fb7ART\u001Fcfhgen\u001E 
\u001Fdm\u001Fea\u001Ffx\u001Fgeng\u001FiTesting with subfield i\u001FsAnd with subfield s\u001E\u001D"; private static final String MARC_BIB_REC_WITH_FF = "00861cam a2200193S1 45 0001000700000002000900007003000400016008004100020035002200061035001300083099001600096245005600112500011600168500019600284600003500480610003400515610003900549999007900588\u001E304162\u001E00320061\u001EPBL\u001E020613n 000 0 eng u\u001E \u001Fa(Sirsi)sc99900001\u001E \u001Fa(Sirsi)1\u001E \u001FaSC LVF M698\u001E00\u001FaMohler, Harold S. (Lehigh Collection Vertical File)\u001E \u001FaMaterial on this topic is contained in the Lehigh Collection Vertical File. See Special Collections for access.\u001E \u001FaContains press releases, versions of resumes, clippings, biographical information. L-in-Life program, and memorial service program -- Documents related Hershey Food Corporation. In two parts.\u001E10\u001FaMohler, Harold S.,\u001Fd1919-1988.\u001E20\u001FaLehigh University.\u001FbTrustees.\u001E20\u001FaLehigh University.\u001FbClass of 1948.\u001Eff\u001Fi29573076-a7ee-462a-8f9b-2659ab7df23c\u001Fs7ca42730-9ba6-4bc8-98d3-f068728504c9\u001E\u001D"; + private static final String RAW_EDIFACT_RECORD_PATH = "src/test/resources/records/edifact/565751us20210122.edi"; @Mock private JobExecutionSourceChunkDao jobExecutionSourceChunkDao; @@ -298,6 +303,8 @@ public void shouldReturnMarcBibRecord() { when(jobExecutionSourceChunkDao.getById(any(), any())) .thenReturn(Future.succeededFuture(Optional.of(new JobExecutionSourceChunk()))); when(jobExecutionSourceChunkDao.update(any(), any())).thenReturn(Future.succeededFuture(new JobExecutionSourceChunk())); + when(recordsPublishingService.sendEventsWithRecords(any(), any(), any(), any())) + .thenReturn(Future.succeededFuture()); Future> serviceFuture = executeWithKafkaMock(rawRecordsDto, jobExecution, Future.succeededFuture(true)); @@ -308,8 +315,34 @@ public void shouldReturnMarcBibRecord() { } @Test - public void shouldReturnMarcBibRecordWith999ByAcceptInstanceId() { + public void shouldReturnEdifactRecord() throws IOException { + RawRecordsDto rawRecordsDto = new RawRecordsDto() + .withId(UUID.randomUUID().toString()) + .withRecordsMetadata(new RecordsMetadata().withContentType(RecordsMetadata.ContentType.EDIFACT_RAW)) + .withInitialRecords(Collections.singletonList(new InitialRecord().withRecord(TestUtil.readFileFromPath(RAW_EDIFACT_RECORD_PATH)))); + JobExecution jobExecution = new JobExecution() + .withId(UUID.randomUUID().toString()) + .withUserId(UUID.randomUUID().toString()) + .withJobProfileSnapshotWrapper(new ProfileSnapshotWrapper()) + .withJobProfileInfo(new JobProfileInfo().withId(UUID.randomUUID().toString()) + .withName("test").withDataType(JobProfileInfo.DataType.EDIFACT)); + + when(jobExecutionSourceChunkDao.getById(any(), any())) + .thenReturn(Future.succeededFuture(Optional.of(new JobExecutionSourceChunk()))); + when(jobExecutionSourceChunkDao.update(any(), any())).thenReturn(Future.succeededFuture(new JobExecutionSourceChunk())); + when(recordsPublishingService.sendEventsWithRecords(any(), any(), any(), any())) + .thenReturn(Future.succeededFuture()); + + Future> serviceFuture = executeWithKafkaMock(rawRecordsDto, jobExecution, Future.succeededFuture(true)); + var actual = serviceFuture.result(); + assertThat(actual, hasSize(1)); + assertThat(actual.get(0).getRecordType(), equalTo(Record.RecordType.EDIFACT)); + assertThat(actual.get(0).getErrorRecord(), nullValue()); + } + + @Test + public void shouldReturnMarcBibRecordWith999ByAcceptInstanceId() { 
RawRecordsDto rawRecordsDto = getTestRawRecordsDto(MARC_BIB_REC_WITH_FF); JobExecution jobExecution = new JobExecution() .withId(UUID.randomUUID().toString()) @@ -318,15 +351,15 @@ public void shouldReturnMarcBibRecordWith999ByAcceptInstanceId() { .withJobProfileInfo(new JobProfileInfo().withId(UUID.randomUUID().toString()) .withName("test").withDataType(JobProfileInfo.DataType.MARC)); - boolean acceptInstanceId = true; - when(marcRecordAnalyzer.process(any())).thenReturn(MarcRecordType.BIB); when(jobExecutionSourceChunkDao.getById(any(), any())) .thenReturn(Future.succeededFuture(Optional.of(new JobExecutionSourceChunk()))); when(jobExecutionSourceChunkDao.update(any(), any())).thenReturn(Future.succeededFuture(new JobExecutionSourceChunk())); + when(recordsPublishingService.sendEventsWithRecords(any(), any(), any(), any())) + .thenReturn(Future.succeededFuture()); Future> serviceFuture = - executeWithKafkaMock(rawRecordsDto, jobExecution, Future.succeededFuture(true), acceptInstanceId); + executeWithKafkaMock(rawRecordsDto, jobExecution, Future.succeededFuture(true), true); var actual = serviceFuture.result(); assertThat(actual, hasSize(1)); @@ -345,6 +378,8 @@ public void shouldReturnMarcBibRecordWithIds() { when(jobExecutionSourceChunkDao.getById(any(), any())) .thenReturn(Future.succeededFuture(Optional.of(new JobExecutionSourceChunk()))); when(jobExecutionSourceChunkDao.update(any(), any())).thenReturn(Future.succeededFuture(new JobExecutionSourceChunk())); + when(recordsPublishingService.sendEventsWithRecords(any(), any(), any(), any())) + .thenReturn(Future.succeededFuture()); Future> serviceFuture = executeWithKafkaMock(rawRecordsDto, jobExecution, Future.succeededFuture(true)); @@ -419,7 +454,6 @@ public void shouldNotUpdateIfRecordTypeIsNotMarcBib() { .thenReturn(Future.succeededFuture(Optional.of(new JobExecutionSourceChunk()))); when(jobExecutionSourceChunkDao.update(any(), any())).thenReturn(Future.succeededFuture(new JobExecutionSourceChunk())); - try (var mockedStatic = Mockito.mockStatic(EventHandlingUtil.class)) { mockedStatic.when(() -> EventHandlingUtil.sendEventToKafka(any(), any(), any(), kafkaHeadersCaptor.capture(), any(), any())) .thenReturn(Future.succeededFuture(true)); @@ -449,7 +483,7 @@ public void shouldNotUpdateIfCreateInstanceActionExist() { service.parseRawRecordsChunkForJobExecution(rawRecordsDto, jobExecution, "1", false, okapiConnectionParams).result(); } - verify(recordsPublishingService, never()).sendEventsWithRecords(any(), any(), any(), any()); + verify(recordsPublishingService, times(1)).sendEventsWithRecords(any(), any(), any(), eq(DI_INCOMING_MARC_BIB_RECORD_PARSED.value())); } @Test @@ -548,7 +582,6 @@ private void mockServicesForParseRawRecordsChunkForJobExecution() { when(jobExecutionSourceChunkDao.getById(any(), any())) .thenReturn(Future.succeededFuture(Optional.of(new JobExecutionSourceChunk()))); when(jobExecutionSourceChunkDao.update(any(), any())).thenReturn(Future.succeededFuture(new JobExecutionSourceChunk())); - } ProfileSnapshotWrapper constructCreateInstanceSnapshotWrapper() { @@ -565,7 +598,7 @@ ProfileSnapshotWrapper constructCreateInstanceSnapshotWrapper() { .withAction(ActionProfile.Action.CREATE) .withFolioRecord(ActionProfile.FolioRecord.INSTANCE))).getMap()) )); - }; + } private ProfileSnapshotWrapper constructCreateMarcHoldingsAndInstanceSnapshotWrapper() { return new ProfileSnapshotWrapper() diff --git a/mod-source-record-manager-server/src/test/java/org/folio/services/EventDrivenChunkProcessingServiceImplTest.java 
b/mod-source-record-manager-server/src/test/java/org/folio/services/EventDrivenChunkProcessingServiceImplTest.java index 899adc9d8..8ea0deae5 100644 --- a/mod-source-record-manager-server/src/test/java/org/folio/services/EventDrivenChunkProcessingServiceImplTest.java +++ b/mod-source-record-manager-server/src/test/java/org/folio/services/EventDrivenChunkProcessingServiceImplTest.java @@ -9,6 +9,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; import static com.github.tomakehurst.wiremock.client.WireMock.verify; +import static java.util.Collections.emptyList; import static org.folio.dataimport.util.RestUtil.OKAPI_URL_HEADER; import static org.folio.rest.jaxrs.model.StatusDto.Status.PARSING_IN_PROGRESS; import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER; @@ -141,8 +142,6 @@ public class EventDrivenChunkProcessingServiceImplTest extends AbstractRestTest @InjectMocks private MappingParamsSnapshotDaoImpl mappingParamsSnapshotDao; @Spy - private RecordsPublishingService recordsPublishingService; - @Spy @InjectMocks private FieldModificationServiceImpl fieldModificationService; @@ -190,12 +189,15 @@ public void setUp() throws IOException { mappingParametersProvider = when(mock(MappingParametersProvider.class).get(anyString(), any(OkapiConnectionParams.class))).thenReturn(Future.succeededFuture(new MappingParameters())).getMock(); mappingMetadataService = new MappingMetadataServiceImpl(mappingParametersProvider, mappingRuleService, mappingRulesSnapshotDao, mappingParamsSnapshotDao); - JobProfileSnapshotValidationServiceImpl jobProfileSnapshotValidationService = new JobProfileSnapshotValidationServiceImpl(); + + RecordsPublishingService recordsPublishingService = new RecordsPublishingServiceImpl(jobExecutionService, + new DataImportPayloadContextBuilderImpl(marcRecordAnalyzer), kafkaConfig, emptyList()); changeEngineService = new ChangeEngineServiceImpl(jobExecutionSourceChunkDao, jobExecutionService, marcRecordAnalyzer, - hrIdFieldService, recordsPublishingService, mappingMetadataService, jobProfileSnapshotValidationService, kafkaConfig, + hrIdFieldService, recordsPublishingService, mappingMetadataService, new JobProfileSnapshotValidationServiceImpl(), kafkaConfig, fieldModificationService, incomingRecordService, journalRecordService); ReflectionTestUtils.setField(changeEngineService, "maxDistributionNum", 10); ReflectionTestUtils.setField(changeEngineService, "batchSize", 100); + ReflectionTestUtils.setField(recordsPublishingService, "maxDistributionNum", 100); chunkProcessingService = new EventDrivenChunkProcessingServiceImpl(jobExecutionSourceChunkDao, jobExecutionService, changeEngineService, jobExecutionProgressService); HashMap headers = new HashMap<>(); diff --git a/mod-source-record-manager-server/src/test/java/org/folio/services/RecordProcessedEventHandlingServiceImplTest.java b/mod-source-record-manager-server/src/test/java/org/folio/services/RecordProcessedEventHandlingServiceImplTest.java index 29f414631..9df4c1c83 100644 --- a/mod-source-record-manager-server/src/test/java/org/folio/services/RecordProcessedEventHandlingServiceImplTest.java +++ b/mod-source-record-manager-server/src/test/java/org/folio/services/RecordProcessedEventHandlingServiceImplTest.java @@ -6,6 +6,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.post; import static com.github.tomakehurst.wiremock.client.WireMock.putRequestedFor; import static 
com.github.tomakehurst.wiremock.client.WireMock.verify; +import static java.util.Collections.emptyList; import static org.folio.dataimport.util.RestUtil.OKAPI_URL_HEADER; import static org.folio.rest.jaxrs.model.JobExecution.Status.COMMITTED; import static org.folio.rest.jaxrs.model.JobExecution.Status.ERROR; @@ -146,9 +147,6 @@ public class RecordProcessedEventHandlingServiceImplTest extends AbstractRestTes @Spy @InjectMocks private IncomingRecordDaoImpl incomingRecordDao; - - @Spy - RecordsPublishingService recordsPublishingService; private ChunkProcessingService chunkProcessingService; private RecordProcessedEventHandlingServiceImpl recordProcessedEventHandlingService; private OkapiConnectionParams params; @@ -191,11 +189,14 @@ public void setUp() throws IOException { mappingParametersProvider = when(mock(MappingParametersProvider.class).get(anyString(), any(OkapiConnectionParams.class))).thenReturn(Future.succeededFuture(new MappingParameters())).getMock(); MappingMetadataService mappingMetadataService = new MappingMetadataServiceImpl(mappingParametersProvider, mappingRuleService, mappingRulesSnapshotDao, mappingParamsSnapshotDao); JobProfileSnapshotValidationServiceImpl jobProfileSnapshotValidationService = new JobProfileSnapshotValidationServiceImpl(); + RecordsPublishingService recordsPublishingService = new RecordsPublishingServiceImpl(jobExecutionService, + new DataImportPayloadContextBuilderImpl(marcRecordAnalyzer), kafkaConfig, emptyList()); ChangeEngineService changeEngineService = new ChangeEngineServiceImpl(jobExecutionSourceChunkDao, jobExecutionService, marcRecordAnalyzer, hrIdFieldService, recordsPublishingService, mappingMetadataService, jobProfileSnapshotValidationService, kafkaConfig, fieldModificationService, incomingRecordService, journalRecordService); ReflectionTestUtils.setField(changeEngineService, "maxDistributionNum", 10); ReflectionTestUtils.setField(changeEngineService, "batchSize", 100); + ReflectionTestUtils.setField(recordsPublishingService, "maxDistributionNum", 100); chunkProcessingService = new EventDrivenChunkProcessingServiceImpl(jobExecutionSourceChunkDao, jobExecutionService, changeEngineService, jobExecutionProgressService); recordProcessedEventHandlingService = new RecordProcessedEventHandlingServiceImpl(jobExecutionProgressService, jobExecutionService); HashMap headers = new HashMap<>(); diff --git a/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/DataImportJournalConsumerVerticleTest.java b/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/DataImportJournalConsumerVerticleTest.java index 73028baba..921f1cbec 100644 --- a/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/DataImportJournalConsumerVerticleTest.java +++ b/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/DataImportJournalConsumerVerticleTest.java @@ -9,7 +9,6 @@ import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.header.internals.RecordHeader; import org.folio.ActionProfile; import org.folio.DataImportEventPayload; import org.folio.dao.JobExecutionDaoImpl; @@ -30,11 +29,9 @@ import org.junit.Test; import org.junit.runner.RunWith; -import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.UUID; -import java.util.concurrent.ExecutionException; import static 
org.folio.kafka.KafkaTopicNameHelper.getDefaultNameSpace; import static org.folio.rest.jaxrs.model.DataImportEventTypes.*; @@ -127,7 +124,7 @@ public void testJournalInventoryInstanceCreatedAction(TestContext context) { } @Test - public void testJournalMarcBibRecordUpdatedAction(TestContext context) throws IOException { + public void testJournalMarcBibRecordUpdatedAction(TestContext context) { Async async = context.async(); // given @@ -155,7 +152,7 @@ public void testJournalMarcBibRecordUpdatedAction(TestContext context) throws IO } @Test - public void testJournalMarcHoldingsRecordCreatedAction(TestContext context) throws IOException { + public void testJournalMarcHoldingsRecordCreatedAction(TestContext context) { Async async = context.async(); // given @@ -183,7 +180,7 @@ public void testJournalMarcHoldingsRecordCreatedAction(TestContext context) thro } @Test - public void testJournalCompletedAction(TestContext context) throws IOException, ExecutionException, InterruptedException { + public void testJournalCompletedAction(TestContext context) { Async async = context.async(); // given @@ -222,7 +219,7 @@ public void testJournalCompletedAction(TestContext context) throws IOException, } @Test - public void testJournalErrorAction(TestContext context) throws IOException, ExecutionException, InterruptedException { + public void testJournalErrorAction(TestContext context) { Async async = context.async(); // given @@ -244,7 +241,7 @@ public void testJournalErrorAction(TestContext context) throws IOException, Exec .withId(UUID.randomUUID().toString()) .withContentType(ACTION_PROFILE) .withContent(JsonObject.mapFrom(new ActionProfile().withFolioRecord(ActionProfile.FolioRecord.HOLDINGS)))) - .withEventsChain(List.of(DI_SRS_MARC_BIB_RECORD_CREATED.value(), DI_INVENTORY_HOLDING_CREATED.value())); + .withEventsChain(List.of(DI_INCOMING_MARC_BIB_RECORD_PARSED.value(), DI_INVENTORY_HOLDING_CREATED.value())); // when KafkaConsumerRecord kafkaConsumerRecord = buildKafkaConsumerRecord(eventPayload); @@ -260,7 +257,7 @@ public void testJournalErrorAction(TestContext context) throws IOException, Exec } @Test - public void testJournalRecordMappingError(TestContext context) throws IOException, ExecutionException, InterruptedException { + public void testJournalRecordMappingError(TestContext context) { Async async = context.async(); // given diff --git a/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/RawMarcChunkConsumersVerticleTest.java b/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/RawMarcChunkConsumersVerticleTest.java index 3c9e2dca2..ec77224a8 100644 --- a/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/RawMarcChunkConsumersVerticleTest.java +++ b/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/RawMarcChunkConsumersVerticleTest.java @@ -22,7 +22,6 @@ import org.folio.rest.jaxrs.model.DataImportEventPayload; import org.folio.rest.jaxrs.model.DataImportEventTypes; import org.folio.rest.jaxrs.model.EntityType; -import org.folio.rest.jaxrs.model.ErrorRecord; import org.folio.rest.jaxrs.model.Event; import org.folio.rest.jaxrs.model.InitJobExecutionsRsDto; import org.folio.rest.jaxrs.model.InitialRecord; @@ -34,7 +33,6 @@ import org.folio.rest.jaxrs.model.ProfileSnapshotWrapper; import org.folio.rest.jaxrs.model.RawRecordsDto; import org.folio.rest.jaxrs.model.Record; -import org.folio.rest.jaxrs.model.RecordCollection; import org.folio.rest.jaxrs.model.RecordsMetadata; import 
org.folio.verticle.consumers.errorhandlers.RawMarcChunksErrorHandler;
 import org.junit.Before;
@@ -67,7 +65,7 @@ public class RawMarcChunkConsumersVerticleTest extends AbstractRestTest {
   private static final String RAW_RECORD_WITH_999_ff_field = "00948nam a2200241 a 4500001000800000003000400008005001700012008004100029035002100070035002000091040002300111041001300134100002300147245007900170260005800249300002400307440007100331650003600402650005500438650006900493655006500562999007900627\u001E1007048\u001EICU\u001E19950912000000.0\u001E891218s1983 wyu d 00010 eng d\u001E \u001Fa(ICU)BID12424550\u001E \u001Fa(OCoLC)16105467\u001E \u001FaPAU\u001FcPAU\u001Fdm/c\u001FdICU\u001E0 \u001Faeng\u001Faarp\u001E1 \u001FaSalzmann, Zdeněk\u001E10\u001FaDictionary of contemporary Arapaho usage /\u001Fccompiled by Zdeněk Salzmann.\u001E0 \u001FaWind River, Wyoming :\u001FbWind River Reservation,\u001Fc1983.\u001E \u001Fav, 231 p. ;\u001Fc28 cm.\u001E 0\u001FaArapaho language and culture instructional materials series\u001Fvno. 4\u001E 0\u001FaArapaho language\u001FxDictionaries.\u001E 0\u001FaIndians of North America\u001FxLanguages\u001FxDictionaries.\u001E 7\u001FaArapaho language.\u001F2fast\u001F0http://id.worldcat.org/fast/fst00812722\u001E 7\u001FaDictionaries.\u001F2fast\u001F0http://id.worldcat.org/fast/fst01423826\u001Eff\u001Fie27a5374-0857-462e-ac84-fb4795229c7a\u001Fse27a5374-0857-462e-ac84-fb4795229c7a\u001E\u001D";
-
+  private static final String CORRECT_RAW_RECORD = "01240cas a2200397 450000100070000000500170000700800410002401000170006502200140008203500260009603500220012203500110014403500190015504000440017405000150021808200110023322200420024424500430028626000470032926500380037630000150041431000220042932100250045136200230047657000290049965000330052865000450056165500420060670000450064885300180069386300230071190200160073490500210075094800370077195000340080836683220141106221425.0750907c19509999enkqr p 0 a0eng d a 58020553  a0022-0469 a(CStRLIN)NYCX1604275S a(NIC)notisABP6388 a366832 a(OCoLC)1604275 dCtYdMBTIdCtYdMBTIdNICdCStRLINdNIC0 aBR140b.J6 a270.0504aThe Journal of ecclesiastical history04aThe Journal of ecclesiastical history. aLondon,bCambridge University Press [etc.] a32 East 57th St., New York, 10022 av.b25 cm. aQuarterly,b1970- aSemiannual,b1950-690 av. 1- Apr. 1950- aEditor: C. W. Dugmore. 0aChurch historyxPeriodicals. 7aChurch history2fast0(OCoLC)fst00860740 7aPeriodicals2fast0(OCoLC)fst014116411 aDugmore, C. W.q(Clifford William),eed.0381av.i(year)4081a1-49i1950-1998 apfndbLintz a19890510120000.02 a20141106bmdbatcheltsxaddfast lOLINaBR140b.J86h01/01/01 N01542ccm a2200361 ";
   private static final String INVALID_RECORD = "00557nam a22002053i 4500001001200000005001700012008004100029020001800070040002100088041000800109100001900117245004400136250001200180264001800192336002600210337002800236338002700264700001900291999004100310\u001E00000010150\u001E20230724074007.2\u001E230724|2020|||||||||||| |||||und||\u001E \u001Fa9788408232421\u001E\\\\\u001FaCC-ClU\u001Fbspa\u001Ferda\u001E\\\\\u001Faspa\u001E1 \u001FaChicot, Marcos\u001E00\u001FaEl asesinato de Platón / Chicot Marcos\u001E \u001FaPrimera\u001E 1\u001FbPlaneta\u001Fc2020\u001E \u001Fatext\u001Fbtxt\u001F2rdacontent\u001E \u001Faunmediated\u001Fbn\u001F2rdamedia\u001E \u001Favolume\u001Fbnc\u001F2rdacarrier\u001E1 \u001FaChicot, Marcos\u001Eff\u001Fi7e1ea9dd-f65d-4758-a738-fa1d61365267\u001E\u001D";
   private static final String RAW_EDIFACT_RECORD_PATH = "src/test/resources/records/edifact/565751us20210122.edi";
   private static final String JOB_PROFILE_PATH = "/jobProfile";
@@ -162,11 +160,11 @@ public void shouldNotFillInInstanceIdAndInstanceHridWhenRecordContains999FieldWi
     kafkaCluster.send(request);

     // then
-    Event obtainedEvent = checkEventWithTypeSent(DI_RAW_RECORDS_CHUNK_PARSED);
-    RecordCollection recordCollection = Json.decodeValue(obtainedEvent.getEventPayload(), RecordCollection.class);
-    assertEquals(1, recordCollection.getRecords().size());
-    Record record = recordCollection.getRecords().get(0);
-    assertNull(record.getExternalIdsHolder());
+    Event obtainedEvent = checkEventWithTypeSent(DI_ERROR);
+    DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class);
+    assertEquals("A new Instance was not created because the incoming record already contained a 999ff$s or 999ff$i field",
+      new JsonObject(eventPayload.getContext().get("ERROR")).getString("error"));
+    assertNull(new JsonObject(eventPayload.getContext().get("MARC_BIBLIOGRAPHIC")).getString("externalIdsHolder"));
   }

   @Test
@@ -182,11 +180,10 @@ public void shouldParseAndPublishChunkWithEdifactRecord() throws InterruptedExce
     kafkaCluster.send(request);

     // then
-    Event obtainedEvent = checkEventWithTypeSent(DI_RAW_RECORDS_CHUNK_PARSED);
-    RecordCollection recordCollection = Json.decodeValue(obtainedEvent.getEventPayload(), RecordCollection.class);
-    assertEquals(1, recordCollection.getRecords().size());
-    Record record = recordCollection.getRecords().get(0);
-    assertEquals(EDIFACT, record.getRecordType());
+    Event obtainedEvent = checkEventWithTypeSent(DI_INCOMING_EDIFACT_RECORD_PARSED);
+    DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class);
+    JsonObject record = new JsonObject(eventPayload.getContext().get("EDIFACT_INVOICE"));
+    assertEquals(EDIFACT, Record.RecordType.valueOf(record.getString("recordType")));
   }

   @Test
@@ -212,11 +209,10 @@ public void shouldCreateErrorRecordsWhenRecordNotParsed() throws InterruptedExce
     kafkaCluster.send(request);

     // then
-    Event obtainedEvent = checkEventWithTypeSent(DI_RAW_RECORDS_CHUNK_PARSED);
-    RecordCollection recordCollection = Json.decodeValue(obtainedEvent.getEventPayload(), RecordCollection.class);
-    assertEquals(1, recordCollection.getRecords().size());
-    ErrorRecord errorRecord = recordCollection.getRecords().get(0).getErrorRecord();
-    assertTrue(errorRecord.getDescription().contains("org.marc4j.MarcException"));
+    Event obtainedEvent = checkEventWithTypeSent(DI_ERROR);
+    DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class);
+    JsonObject error = new JsonObject(eventPayload.getContext().get("ERROR"));
+    assertTrue(error.getString("errors").contains("org.marc4j.MarcException"));
   }

   @Test
@@ -281,7 +277,7 @@ public void shouldNotObserveValuesWhenJobExecutionIdNotCreated() throws Interrup
   @Test
   public void shouldNotSendAnyEventsForDuplicates() throws InterruptedException {
     // given
-    RawRecordsDto chunk = getChunk(RecordsMetadata.ContentType.MARC_RAW, RAW_RECORD_WITH_999_ff_field);
+    RawRecordsDto chunk = getChunk(RecordsMetadata.ContentType.MARC_RAW, CORRECT_RAW_RECORD);
     JobExecutionSourceChunkDao jobExecutionSourceChunkDao = getBeanFromSpringContext(vertx, org.folio.dao.JobExecutionSourceChunkDao.class);
     jobExecutionSourceChunkDao.save(new JobExecutionSourceChunk()
       .withId(chunk.getId())
@@ -301,7 +297,7 @@ public void shouldNotSendAnyEventsForDuplicates() throws InterruptedException {
   @Test
   public void shouldNotSendDIErrorWhenReceiveDuplicateEvent() throws InterruptedException {
     // given
-    RawRecordsDto chunk = getChunk(RecordsMetadata.ContentType.MARC_RAW, RAW_RECORD_WITH_999_ff_field);
+    RawRecordsDto chunk = getChunk(RecordsMetadata.ContentType.MARC_RAW, CORRECT_RAW_RECORD);
     SendKeyValues request = prepareWithSpecifiedEventPayload(JobProfileInfo.DataType.MARC, Json.encode(chunk));
     String jobExecutionId = getJobExecutionId(request);
@@ -310,7 +306,7 @@ public void shouldNotSendDIErrorWhenReceiveDuplicateEx
     kafkaCluster.send(request);

     // then
-    checkEventWithTypeSent(DI_RAW_RECORDS_CHUNK_PARSED);
+    checkEventWithTypeSent(DI_INCOMING_MARC_BIB_RECORD_PARSED);
     checkEventWithTypeWasNotSend(jobExecutionId, DI_ERROR);
   }
diff --git a/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/StoredRecordChunkConsumersVerticleTest.java b/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/StoredRecordChunkConsumersVerticleTest.java
index fcee45e72..5c6d652b0 100644
--- a/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/StoredRecordChunkConsumersVerticleTest.java
+++ b/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/StoredRecordChunkConsumersVerticleTest.java
@@ -41,7 +41,7 @@
 import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_ERROR;
 import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_BIB_FOR_ORDER_CREATED;
 import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_PARSED_RECORDS_CHUNK_SAVED;
-import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_CREATED;
+import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
 import static org.folio.rest.jaxrs.model.Record.RecordType.MARC_BIB;
 import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
 import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
@@ -126,7 +126,7 @@ public void shouldPublishCoupleOfSuccessEventsAndCoupleOfDiErrorEvents() throws
     kafkaCluster.send(request);

     // then
-    List successValues = observeValuesAndFilterByLeader("00116nam 22000731a 4700", DI_SRS_MARC_BIB_RECORD_CREATED, 3);
+    List successValues = observeValuesAndFilterByLeader("00116nam 22000731a 4700", DI_INCOMING_MARC_BIB_RECORD_PARSED, 3);
     assertEquals(3, successValues.size());
     List diErrorValues = observeValuesAndFilterByLeader("13113c7m a2200553Ii 4800", DI_ERROR, 7);
@@ -146,10 +146,10 @@ public void shouldSendEventsWithRecords() throws InterruptedException {
     kafkaCluster.send(request);

     // then
-    List observedValues = observeValuesAndFilterByLeader("00115nam 22000731a 4500", DI_SRS_MARC_BIB_RECORD_CREATED, 1);
+    List observedValues = observeValuesAndFilterByLeader("00115nam 22000731a 4500", DI_INCOMING_MARC_BIB_RECORD_PARSED, 1);
     Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class);
     DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class);
-    assertEquals(DI_SRS_MARC_BIB_RECORD_CREATED.value(), eventPayload.getEventType());
+    assertEquals(DI_INCOMING_MARC_BIB_RECORD_PARSED.value(), eventPayload.getEventType());
     assertEquals(TENANT_ID, eventPayload.getTenant());
     assertNotNull(eventPayload.getContext().get(EntityType.MARC_BIBLIOGRAPHIC.value()));
     assertNotNull(eventPayload.getContext().get(JOB_PROFILE_SNAPSHOT_ID));
@@ -169,10 +169,10 @@ public void shouldObserveOnlySingleEventInCaseOfDuplicates() throws InterruptedE
     kafkaCluster.send(request);

     // then
-    List observedValues = observeValuesAndFilterByLeader("00115nam 22000731a 4500", DI_SRS_MARC_BIB_RECORD_CREATED, 1);
+    List observedValues = observeValuesAndFilterByLeader("00115nam 22000731a 4500", DI_INCOMING_MARC_BIB_RECORD_PARSED, 1);
     Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class);
     DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class);
-    assertEquals(DI_SRS_MARC_BIB_RECORD_CREATED.value(), eventPayload.getEventType());
+    assertEquals(DI_INCOMING_MARC_BIB_RECORD_PARSED.value(), eventPayload.getEventType());
   }

   @Test
diff --git a/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/util/MarcImportEventsHandlerTest.java b/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/util/MarcImportEventsHandlerTest.java
index ce7339fc6..3ea2463a9 100644
--- a/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/util/MarcImportEventsHandlerTest.java
+++ b/mod-source-record-manager-server/src/test/java/org/folio/verticle/consumers/util/MarcImportEventsHandlerTest.java
@@ -11,7 +11,7 @@
 import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_ORDER_CREATED_READY_FOR_POST_PROCESSING;
 import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_PENDING_ORDER_CREATED;
 import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_CREATED;
-import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_CREATED;
+import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.ArgumentMatchers.any;
@@ -300,7 +300,7 @@ var record = new Record()
     payloadContext.put("HOLDINGS","[{\"instanceId\":\"946c4945-b711-4e67-bfb9-83fa30be6332\",\"hrid\":\"ho001\",\"id\":\"946c4945-b711-4e67-bfb9-83fa37be6312\"},{\"instanceId\":\"946c4945-b711-4e67-bfb9-83fa30be6331\",\"hrid\":\"ho002\",\"id\":\"946c4945-b111-4e67-bfb9-83fa30be6312\"}]");
     payloadContext.put("NOT_MATCHED_NUMBER","3");
     return new DataImportEventPayload()
-      .withEventsChain(List.of(DI_SRS_MARC_BIB_RECORD_CREATED.value()))
+      .withEventsChain(List.of(DI_INCOMING_MARC_BIB_RECORD_PARSED.value()))
       .withEventType(DI_INVENTORY_HOLDING_MATCHED.value())
       .withContext(payloadContext);
   }
@@ -315,7 +315,7 @@ var record = new Record()
     payloadContext.put("ITEM","[{\"holdingsRecordId\":\"946c4945-b711-4e67-bfb9-83fa30be633c\",\"hrid\":\"it001\",\"id\":\"946c4945-b711-4e67-bfb9-83fa30be4312\"},{\"holdingsRecordId\":\"946c4945-b711-4e67-bfb9-83fa30be633b\",\"hrid\":\"it002\",\"id\":\"946c4945-b711-4e67-bfb9-83fa30be6312\"}]");
     payloadContext.put("NOT_MATCHED_NUMBER","5");
     return new DataImportEventPayload()
-      .withEventsChain(List.of(DI_SRS_MARC_BIB_RECORD_CREATED.value()))
+      .withEventsChain(List.of(DI_INCOMING_MARC_BIB_RECORD_PARSED.value()))
       .withEventType(DI_INVENTORY_ITEM_MATCHED.value())
       .withContext(payloadContext);
   }