MODDATAIMP-957: add Orders creation (#846)
* MODDATAIMP-957: add Orders creation

* MODDATAIMP-957: rename DI_MARC_BIB_FOR_ORDER_CREATED to DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED (#305)

* MODDATAIMP-957: remove functionality setting Order creation type in StoredRecordChunksKafkaHandler
yaroslav-epam authored Jan 26, 2024
1 parent cacca35 commit 27b9204
Showing 5 changed files with 75 additions and 85 deletions.
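Taken together, the commit-message items above describe one behavioral change: order detection moves out of StoredRecordChunksKafkaHandler and into the chunk-processing service (apparently ChangeEngineServiceImpl, per the test imports in this diff), and the resulting event is renamed from DI_MARC_BIB_FOR_ORDER_CREATED to DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED. A minimal sketch of the new dispatch path, using only names that appear in the diff below; the surrounding method shape is simplified for illustration:

    // Sketch only: mirrors the CREATE_ORDER branch added to processRecords below.
    ActionType action = getAction(parsedRecords, jobExecution);
    if (action == ActionType.CREATE_ORDER) {
      // Parsed MARC bibs destined for an order are published under the renamed event type.
      sendEvents(parsedRecords, jobExecution, params, DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED);
    }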
@@ -7,6 +7,7 @@
import static org.folio.rest.RestVerticle.MODULE_SPECIFIC_ARGS;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_ERROR;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_EDIFACT_RECORD_PARSED;
+import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_FOR_DELETE_RECEIVED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_FOR_UPDATE_RECEIVED;
@@ -24,6 +25,10 @@
import static org.folio.services.afterprocessing.AdditionalFieldsUtil.getValue;
import static org.folio.services.afterprocessing.AdditionalFieldsUtil.hasIndicator;
import static org.folio.services.util.EventHandlingUtil.sendEventToKafka;
+import static org.folio.verticle.consumers.StoredRecordChunksKafkaHandler.ACTION_FIELD;
+import static org.folio.verticle.consumers.StoredRecordChunksKafkaHandler.CREATE_ACTION;
+import static org.folio.verticle.consumers.StoredRecordChunksKafkaHandler.FOLIO_RECORD;
+import static org.folio.verticle.consumers.StoredRecordChunksKafkaHandler.ORDER_TYPE;

import com.google.common.collect.Lists;
import io.vertx.core.CompositeFuture;
@@ -196,6 +201,8 @@ private void processRecords(List<Record> parsedRecords, JobExecution jobExecutio
}
case DELETE_RECORD -> deleteRecords(parsedRecords, jobExecution, params)
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
+case CREATE_ORDER -> sendEvents(parsedRecords, jobExecution, params, DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED)
+.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
case SEND_ERROR -> sendEvents(parsedRecords, jobExecution, params, DI_ERROR)
.onSuccess(ar -> promise.complete(parsedRecords)).onFailure(promise::fail);
case SEND_MARC_BIB -> sendEvents(parsedRecords, jobExecution, params, DI_INCOMING_MARC_BIB_RECORD_PARSED)
@@ -214,6 +221,9 @@ private ActionType getAction(List<Record> parsedRecords, JobExecution jobExecuti
if (deleteMarcActionExists(jobExecution)) {
return ActionType.DELETE_RECORD;
}
+if (createOrderActionExists(jobExecution)) {
+return ActionType.CREATE_ORDER;
+}
if (parsedRecords.isEmpty()) {
return ActionType.SAVE_RECORD;
}
@@ -231,7 +241,7 @@ private ActionType getAction(List<Record> parsedRecords, JobExecution jobExecuti
}

private enum ActionType {
-UPDATE_RECORD, DELETE_RECORD, SEND_ERROR, SEND_MARC_BIB, SEND_EDIFACT, SAVE_RECORD
+UPDATE_RECORD, DELETE_RECORD, SEND_ERROR, SEND_MARC_BIB, SEND_EDIFACT, SAVE_RECORD, CREATE_ORDER
}

private void saveRecords(JobExecution jobExecution, String sourceChunkId, OkapiConnectionParams params, List<Record> parsedRecords, Promise<List<Record>> promise) {
@@ -274,6 +284,29 @@ private void saveIncomingAndJournalRecords(List<Record> parsedRecords, String te
}
}

+private boolean createOrderActionExists(JobExecution jobExecution) {
+if (jobExecution.getJobProfileSnapshotWrapper() != null) {
+List<ProfileSnapshotWrapper> actionProfiles = jobExecution.getJobProfileSnapshotWrapper().getChildSnapshotWrappers()
+.stream().filter(wrapper -> wrapper.getContentType() == ProfileSnapshotWrapper.ContentType.ACTION_PROFILE).toList();
+
+if (!actionProfiles.isEmpty() && ifOrderCreateActionProfileExists(actionProfiles)) {
+LOGGER.debug("createOrderActionExists:: Event type for Order's logic set by jobExecutionId {} ", jobExecution.getId());
+return true;
+}
+}
+return false;
+}
+
+private static boolean ifOrderCreateActionProfileExists(List<ProfileSnapshotWrapper> profiles) {
+for (ProfileSnapshotWrapper profile : profiles) {
+Map<String, String> content = DatabindCodec.mapper().convertValue(profile.getContent(), HashMap.class);
+if (content.get(FOLIO_RECORD).equals(ORDER_TYPE) && content.get(ACTION_FIELD).equals(CREATE_ACTION)) {
+return true;
+}
+}
+return false;
+}
+
/**
* Checks whether job profile snapshot is compatible with record type of the specified {@code records}.
* Returns {@code true} for the specified records that have not been parsed successfully and therefore
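A note on the matching logic added above: ifOrderCreateActionProfileExists converts each action profile's content into a Map and compares the FOLIO_RECORD and ACTION_FIELD entries against ORDER_TYPE and CREATE_ACTION. Those constants are imported from StoredRecordChunksKafkaHandler and their values are not shown in this diff; assuming the conventional "folioRecord" and "action" keys, a matching content map would look like this sketch (the same shape the new test builds with Map.of further down):

    // Hypothetical content that makes createOrderActionExists(...) return true;
    // the string literals are assumed values of the imported constants,
    // only the Map shape is confirmed by the test fixture in this commit.
    Map<String, String> content = Map.of(
        "folioRecord", "ORDER",   // FOLIO_RECORD -> ORDER_TYPE
        "action", "CREATE");      // ACTION_FIELD -> CREATE_ACTION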
@@ -16,10 +16,8 @@
import org.folio.kafka.KafkaHeaderUtils;
import org.folio.rest.jaxrs.model.DataImportEventTypes;
import org.folio.rest.jaxrs.model.Event;
-import org.folio.rest.jaxrs.model.JobExecution;
import org.folio.rest.jaxrs.model.JournalRecord;
import org.folio.rest.jaxrs.model.JournalRecord.EntityType;
-import org.folio.rest.jaxrs.model.ProfileSnapshotWrapper;
import org.folio.rest.jaxrs.model.Record;
import org.folio.rest.jaxrs.model.Record.RecordType;
import org.folio.rest.jaxrs.model.RecordsBatchResponse;
@@ -36,17 +34,15 @@

import javax.ws.rs.NotFoundException;
import java.util.Date;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static java.lang.String.format;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_EDIFACT_RECORD_PARSED;
-import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_BIB_FOR_ORDER_CREATED;
-import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
+import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_HOLDING_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.JournalRecord.ActionStatus.COMPLETED;
import static org.folio.rest.jaxrs.model.JournalRecord.ActionType.CREATE;
@@ -126,13 +122,7 @@ public Future<String> handle(KafkaConsumerRecord<String, byte[]> record) {

LOGGER.debug("handle:: RecordsBatchResponse has been received, starting processing chunkId: {} chunkNumber: {} jobExecutionId: {}", chunkId, chunkNumber, jobExecutionId);
saveCreatedRecordsInfoToDataImportLog(storedRecords, okapiConnectionParams.getTenantId());
-return Future.succeededFuture(jobExecution)
-.compose(jobExecutionFuture -> {
-LOGGER.debug("handle:: JobExecution found by id {}: chunkId:{} chunkNumber: {} ", jobExecutionId, chunkId, chunkNumber);
-return setOrderEventTypeIfNeeded(jobExecutionFuture, eventType);
-})
-.compose(eventTypes -> recordsPublishingService.sendEventsWithRecords(storedRecords, jobExecutionId,
-okapiConnectionParams, eventTypes.value()))
+return recordsPublishingService.sendEventsWithRecords(storedRecords, jobExecutionId, okapiConnectionParams, eventType.value())
.compose(b -> {
LOGGER.debug("handle:: RecordsBatchResponse processing has been completed chunkId: {} chunkNumber: {} jobExecutionId: {}", chunkId, chunkNumber, jobExecutionId);
return Future.succeededFuture(chunkId);
@@ -152,33 +142,6 @@ public Future<String> handle(KafkaConsumerRecord<String, byte[]> record) {
}));
}

-private Future<DataImportEventTypes> setOrderEventTypeIfNeeded(JobExecution jobExecution, DataImportEventTypes dataImportEventTypes) {
-if (jobExecution.getJobProfileSnapshotWrapper() != null) {
-ProfileSnapshotWrapper profileSnapshotWrapper = DatabindCodec.mapper().convertValue(jobExecution.getJobProfileSnapshotWrapper(), ProfileSnapshotWrapper.class);
-List<ProfileSnapshotWrapper> actionProfiles = profileSnapshotWrapper
-.getChildSnapshotWrappers()
-.stream()
-.filter(e -> e.getContentType() == ProfileSnapshotWrapper.ContentType.ACTION_PROFILE)
-.collect(Collectors.toList());
-
-if (!actionProfiles.isEmpty() && checkIfOrderCreateActionProfileExists(actionProfiles)) {
-dataImportEventTypes = DI_MARC_BIB_FOR_ORDER_CREATED;
-LOGGER.debug("setOrderEventTypeIfNeeded:: Event type for Order's logic set by jobExecutionId {} ", jobExecution.getId());
-}
-}
-return Future.succeededFuture(dataImportEventTypes);
-}
-
-private static boolean checkIfOrderCreateActionProfileExists(List<ProfileSnapshotWrapper> actionProfiles) {
-for (ProfileSnapshotWrapper actionProfile : actionProfiles) {
-LinkedHashMap<String, String> content = DatabindCodec.mapper().convertValue(actionProfile.getContent(), LinkedHashMap.class);
-if (content.get(FOLIO_RECORD).equals(ORDER_TYPE) && content.get(ACTION_FIELD).equals(CREATE_ACTION)) {
-return true;
-}
-}
-return false;
-}
-
private void saveCreatedRecordsInfoToDataImportLog(List<Record> storedRecords, String tenantId) {
MappingRuleCacheKey cacheKey = new MappingRuleCacheKey(tenantId, storedRecords.get(0).getRecordType());
mappingRuleCache.get(cacheKey).onComplete(rulesAr -> {
@@ -209,7 +172,7 @@ private JsonArray buildJournalRecords(List<Record> storedRecords, Optional<JsonO
.map(JsonObject.class::cast)
.filter(fieldMappingRule -> fieldMappingRule.getString("target").equals(INSTANCE_TITLE_FIELD_PATH))
.flatMap(fieldMappingRule -> fieldMappingRule.getJsonArray("subfield").stream())
-.map(subfieldCode -> subfieldCode.toString())
+.map(Object::toString)
.collect(Collectors.toList());
}
}
Expand Down Expand Up @@ -238,17 +201,12 @@ private JsonArray buildJournalRecords(List<Record> storedRecords, Optional<JsonO
}

private EntityType getEntityType(List<Record> storedRecords) {
-switch (storedRecords.get(0).getRecordType()) {
-case EDIFACT:
-return EntityType.EDIFACT;
-case MARC_AUTHORITY:
-return EntityType.MARC_AUTHORITY;
-case MARC_HOLDING:
-return EntityType.MARC_HOLDINGS;
-case MARC_BIB:
-default:
-return EntityType.MARC_BIBLIOGRAPHIC;
-}
+return switch (storedRecords.get(0).getRecordType()) {
+case EDIFACT -> EntityType.EDIFACT;
+case MARC_AUTHORITY -> EntityType.MARC_AUTHORITY;
+case MARC_HOLDING -> EntityType.MARC_HOLDINGS;
+default -> EntityType.MARC_BIBLIOGRAPHIC;
+};
}

private Optional<String> getTitleFieldTagByInstanceFieldPath(JsonObject mappingRules) {
@@ -3,6 +3,7 @@
import static org.folio.rest.jaxrs.model.ActionProfile.Action.CREATE;
import static org.folio.rest.jaxrs.model.ActionProfile.FolioRecord.AUTHORITY;
import static org.folio.rest.jaxrs.model.ActionProfile.FolioRecord.MARC_AUTHORITY;
+import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_FOR_UPDATE_RECEIVED;
import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.ACTION_PROFILE;
@@ -11,6 +12,10 @@
import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.MATCH_PROFILE;
import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ReactTo.NON_MATCH;
import static org.folio.services.ChangeEngineServiceImpl.RECORD_ID_HEADER;
+import static org.folio.verticle.consumers.StoredRecordChunksKafkaHandler.ACTION_FIELD;
+import static org.folio.verticle.consumers.StoredRecordChunksKafkaHandler.CREATE_ACTION;
+import static org.folio.verticle.consumers.StoredRecordChunksKafkaHandler.FOLIO_RECORD;
+import static org.folio.verticle.consumers.StoredRecordChunksKafkaHandler.ORDER_TYPE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
@@ -38,6 +43,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.folio.MatchProfile;
@@ -199,6 +205,30 @@ public void shouldReturnMarcHoldingsRecordWhenProfileHasUpdateAction() {
assertThat(actual.get(0).getErrorRecord(), nullValue());
}

+@Test
+public void shouldCreateOrderWhenActionCreateOrder() {
+RawRecordsDto rawRecordsDto = getTestRawRecordsDto(MARC_BIB_REC_WITHOUT_FF);
+JobExecution jobExecution = getTestJobExecution();
+jobExecution.setJobProfileSnapshotWrapper(new ProfileSnapshotWrapper()
+.withChildSnapshotWrappers(List.of(new ProfileSnapshotWrapper()
+.withContentType(ACTION_PROFILE)
+.withContent(Map.of(FOLIO_RECORD, ORDER_TYPE, ACTION_FIELD, CREATE_ACTION)))
+));
+
+when(marcRecordAnalyzer.process(any())).thenReturn(MarcRecordType.BIB);
+when(jobExecutionSourceChunkDao.getById(any(), any())).thenReturn(Future.succeededFuture(Optional.of(new JobExecutionSourceChunk())));
+when(jobExecutionSourceChunkDao.update(any(), any())).thenReturn(Future.succeededFuture(new JobExecutionSourceChunk()));
+when(recordsPublishingService.sendEventsWithRecords(any(), any(), any(), any())).thenReturn(Future.succeededFuture(true));
+
+Future<List<Record>> serviceFuture = executeWithKafkaMock(rawRecordsDto, jobExecution, Future.succeededFuture(true));
+
+var actual = serviceFuture.result();
+assertThat(actual, hasSize(1));
+assertThat(actual.get(0).getRecordType(), equalTo(Record.RecordType.MARC_BIB));
+assertThat(actual.get(0).getErrorRecord(), nullValue());
+verify(recordsPublishingService).sendEventsWithRecords(any(), eq(jobExecution.getId()), any(), eq(DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED.value()));
+}
+
@Test
public void shouldReturnMarcAuthorityRecordWhenProfileHasDeleteAction() {
RawRecordsDto rawRecordsDto = getTestRawRecordsDto(MARC_AUTHORITY_REC_VALID);
@@ -1,8 +1,6 @@
package org.folio.verticle.consumers;

import com.github.tomakehurst.wiremock.client.WireMock;
-import com.github.tomakehurst.wiremock.matching.RegexPattern;
-import com.github.tomakehurst.wiremock.matching.UrlPathPattern;
import io.restassured.RestAssured;
import io.vertx.core.json.Json;
import io.vertx.ext.unit.junit.VertxUnitRunner;
@@ -35,13 +33,10 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;

-import static com.github.tomakehurst.wiremock.client.WireMock.get;
-import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_ERROR;
-import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_BIB_FOR_ORDER_CREATED;
-import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_PARSED_RECORDS_CHUNK_SAVED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
+import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_PARSED_RECORDS_CHUNK_SAVED;
import static org.folio.rest.jaxrs.model.Record.RecordType.MARC_BIB;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
@@ -175,32 +170,6 @@ public void shouldObserveOnlySingleEventInCaseOfDuplicates() throws InterruptedE
assertEquals(DI_INCOMING_MARC_BIB_RECORD_PARSED.value(), eventPayload.getEventType());
}

-@Test
-public void shouldSendEventForOrderIfOrderActionProfileExists() throws InterruptedException {
-WireMock.stubFor(post(new UrlPathPattern(new RegexPattern(PROFILE_SNAPSHOT_URL + "/.*"), true))
-.willReturn(WireMock.created().withBody(Json.encode(orderProfileSnapshotWrapperResponse))));
-WireMock.stubFor(get(new UrlPathPattern(new RegexPattern(PROFILE_SNAPSHOT_URL + "/.*"), true))
-.willReturn(WireMock.ok().withBody(Json.encode(orderProfileSnapshotWrapperResponse))));
-linkJobProfileToJobExecution();
-// given
-String parsedContent = "{\"leader\":\"00115nam 22000731a 4500\",\"fields\":[{\"003\":\"in001\"},{\"507\":{\"subfields\":[{\"a\":\"data\"}],\"ind1\":\" \",\"ind2\":\" \"}},{\"500\":{\"subfields\":[{\"a\":\"data\"}],\"ind1\":\" \",\"ind2\":\" \"}}]}";
-RecordsBatchResponse recordsBatch = getRecordsBatchResponse(parsedContent, 1);
-
-SendKeyValues<String, String> request = getRequest(jobExec.getId(), recordsBatch);
-
-// when
-kafkaCluster.send(request);
-
-// then
-List<String> observedValues = observeValuesAndFilterByLeader("00115nam 22000731a 4500", DI_MARC_BIB_FOR_ORDER_CREATED, 1);
-Event obtainedEvent = Json.decodeValue(observedValues.get(0), Event.class);
-DataImportEventPayload eventPayload = Json.decodeValue(obtainedEvent.getEventPayload(), DataImportEventPayload.class);
-assertEquals(DI_MARC_BIB_FOR_ORDER_CREATED.value(), eventPayload.getEventType());
-assertEquals(TENANT_ID, eventPayload.getTenant());
-assertNotNull(eventPayload.getContext().get(EntityType.MARC_BIBLIOGRAPHIC.value()));
-assertNotNull(eventPayload.getContext().get(JOB_PROFILE_SNAPSHOT_ID));
-}
-
private RecordsBatchResponse getRecordsBatchResponse(String parsedContent, Integer totalRecords) {
List<Record> records = new ArrayList<>();
for (int i = 0; i < totalRecords; i++) {
ramls/raml-storage: 2 changes (1 addition & 1 deletion)
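No diff body is shown for ramls/raml-storage, which appears to be a git submodule pointer update (exactly one addition and one deletion). In textual form such a change would read like the sketch below, where both revision values are placeholders rather than hashes taken from this commit:

    -Subproject commit <previous-raml-storage-revision>
    +Subproject commit <new-raml-storage-revision>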