Skip to content

Commit

Permalink
MODINV-986: minor hotfix
Browse files Browse the repository at this point in the history
  • Loading branch information
PBobylev committed Jun 17, 2024
1 parent cc441b0 commit 06f2c2a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.constructContext;
import static org.folio.rest.jaxrs.model.InstanceIngressEvent.EventType.CREATE_INSTANCE;
import static org.folio.rest.jaxrs.model.InstanceIngressEvent.EventType.UPDATE_INSTANCE;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER;

Expand All @@ -13,6 +14,8 @@
import io.vertx.core.json.Json;
import io.vertx.ext.web.client.WebClient;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.Map;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.common.Context;
Expand All @@ -28,6 +31,7 @@
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.KafkaHeaderUtils;
import org.folio.processing.exceptions.EventProcessingException;
import org.folio.rest.jaxrs.model.EventMetadata;
import org.folio.rest.jaxrs.model.InstanceIngressEvent;

public class InstanceIngressEventConsumer implements AsyncRecordHandler<String, String> {
Expand All @@ -52,7 +56,7 @@ public InstanceIngressEventConsumer(Vertx vertx,
public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
var kafkaHeaders = KafkaHeaderUtils.kafkaHeadersToMap(consumerRecord.headers());
var event = Json.decodeValue(consumerRecord.value(), InstanceIngressEvent.class);
var context = constructContext(event.getEventMetadata().getTenantId(),
var context = constructContext(getTenantId(event, kafkaHeaders),
kafkaHeaders.get(OKAPI_TOKEN_HEADER), kafkaHeaders.get(OKAPI_URL_HEADER));
LOGGER.info("Instance ingress event has been received with event type: {}", event.getEventType());
return Future.succeededFuture(event.getEventPayload())
Expand All @@ -63,6 +67,13 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)
});
}

private static String getTenantId(InstanceIngressEvent event,
Map<String, String> kafkaHeaders) {
return Optional.ofNullable(event.getEventMetadata())
.map(EventMetadata::getTenantId)
.orElseGet(() -> kafkaHeaders.get(OKAPI_TENANT_HEADER));
}

private Future<InstanceIngressEvent.EventType> processEvent(InstanceIngressEvent event, Context context) {
try {
Promise<InstanceIngressEvent.EventType> promise = Promise.promise();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_TYPE;
import static org.folio.rest.jaxrs.model.EntityType.MARC_BIBLIOGRAPHIC;
import static org.folio.rest.jaxrs.model.Record.RecordType.MARC_BIB;
import static org.folio.rest.jaxrs.model.Snapshot.Status.PROCESSING_IN_PROGRESS;
import static org.folio.rest.jaxrs.model.Snapshot.Status.PROCESSING_FINISHED;

import io.vertx.core.Future;
import io.vertx.core.Promise;
Expand Down Expand Up @@ -117,17 +117,17 @@ private Future<org.folio.Instance> prepareAndExecuteMapping(MappingMetadataDto m
Record targetRecord,
InstanceIngressEvent event,
String instanceId) {
return postSnapshotInSrsAndHandleResponse(event.getId())
return postSnapshotInSrsAndHandleResponse(targetRecord.getId())
.compose(snapshot -> {
try {
LOGGER.info("Manipulating fields of a Record from InstanceIngressEvent with id '{}'", event.getId());
var mappingParameters = Json.decodeValue(mappingMetadata.getMappingParams(), MappingParameters.class);
AdditionalFieldsUtil.updateLatestTransactionDate(targetRecord, mappingParameters);
AdditionalFieldsUtil.move001To035(targetRecord);
AdditionalFieldsUtil.normalize035(targetRecord);
if (event.getEventPayload().getAdditionalProperties().containsKey(
LINKED_DATA_ID)) {AdditionalFieldsUtil.addFieldToMarcRecord(targetRecord, TAG_035, TAG_035_SUB,
LD + event.getEventPayload().getAdditionalProperties().get(LINKED_DATA_ID));
if (event.getEventPayload().getAdditionalProperties().containsKey(LINKED_DATA_ID)) {
AdditionalFieldsUtil.addFieldToMarcRecord(targetRecord, TAG_035, TAG_035_SUB,
LD + event.getEventPayload().getAdditionalProperties().get(LINKED_DATA_ID));
}

LOGGER.info("Mapping a Record from InstanceIngressEvent with id '{}' into an Instance", event.getId());
Expand Down Expand Up @@ -207,7 +207,7 @@ private Future<Instance> saveInstance(Instance instance, InstanceIngressEvent ev
private Future<Instance> executeFieldsManipulation(Instance instance, Record srcRecord,
Map<String, Object> eventProperties) {
if (eventProperties.containsKey(LINKED_DATA_ID)) {
AdditionalFieldsUtil.addFieldToMarcRecord(srcRecord, TAG_999, SUBFIELD_L, (String) eventProperties.get(LINKED_DATA_ID));
AdditionalFieldsUtil.addFieldToMarcRecord(srcRecord, TAG_999, SUBFIELD_L, String.valueOf(eventProperties.get(LINKED_DATA_ID)));
}
return super.executeFieldsManipulation(instance, srcRecord);
}
Expand Down Expand Up @@ -238,7 +238,7 @@ private Future<Snapshot> postSnapshotInSrsAndHandleResponse(String id) {
var snapshot = new Snapshot()
.withJobExecutionId(id)
.withProcessingStartedDate(new Date())
.withStatus(PROCESSING_IN_PROGRESS);
.withStatus(PROCESSING_FINISHED);
return super.postSnapshotInSrsAndHandleResponse(context.getOkapiLocation(),
context.getToken(), snapshot, context.getTenantId());
}
Expand Down

0 comments on commit 06f2c2a

Please sign in to comment.