Skip to content

Commit

Permalink
MODINV-986: postSnapshotInSrs just before saving + minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
PBobylev committed Jun 18, 2024
1 parent 06f2c2a commit 0017f8d
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 72 deletions.
22 changes: 13 additions & 9 deletions ramls/instance-ingress-event.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,11 @@
"enum": ["CREATE_INSTANCE", "UPDATE_INSTANCE"],
"description": "Instance ingress event type"
},
"eventMetadata": {
"InstanceIngressEventMetadata": {
"description": "Event metadata",
"type": "object",
"additionalProperties": false,
"properties": {
"tenantId": {
"description": "Tenant id",
"type": "string"
},
"eventTTL": {
"description": "Time-to-live (TTL) for event in minutes",
"type": "integer"
Expand Down Expand Up @@ -69,7 +65,6 @@
}
},
"required": [
"tenantId",
"eventTTL",
"publishedBy"
]
Expand All @@ -78,15 +73,24 @@
"type": "object",
"description": "An instance source record container",
"$ref": "instance-ingress-payload.json"
},
"tenant": {
"description": "Tenant id",
"type": "string"
},
"ts": {
"description": "Message timestamp",
"type": "string",
"format": "date-time"
}
},
"excludedFromEqualsAndHashCode": [
"eventMetadata",
"eventPayload"
"tenant",
"ts"
],
"required": [
"id",
"eventType",
"eventMetadata"
"eventType"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.KafkaHeaderUtils;
import org.folio.processing.exceptions.EventProcessingException;
import org.folio.rest.jaxrs.model.EventMetadata;
import org.folio.rest.jaxrs.model.InstanceIngressEvent;

public class InstanceIngressEventConsumer implements AsyncRecordHandler<String, String> {
Expand Down Expand Up @@ -69,8 +68,7 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)

private static String getTenantId(InstanceIngressEvent event,
Map<String, String> kafkaHeaders) {
return Optional.ofNullable(event.getEventMetadata())
.map(EventMetadata::getTenantId)
return Optional.ofNullable(event.getTenant())
.orElseGet(() -> kafkaHeaders.get(OKAPI_TENANT_HEADER));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,32 +117,29 @@ private Future<org.folio.Instance> prepareAndExecuteMapping(MappingMetadataDto m
Record targetRecord,
InstanceIngressEvent event,
String instanceId) {
return postSnapshotInSrsAndHandleResponse(targetRecord.getId())
.compose(snapshot -> {
try {
LOGGER.info("Manipulating fields of a Record from InstanceIngressEvent with id '{}'", event.getId());
var mappingParameters = Json.decodeValue(mappingMetadata.getMappingParams(), MappingParameters.class);
AdditionalFieldsUtil.updateLatestTransactionDate(targetRecord, mappingParameters);
AdditionalFieldsUtil.move001To035(targetRecord);
AdditionalFieldsUtil.normalize035(targetRecord);
if (event.getEventPayload().getAdditionalProperties().containsKey(LINKED_DATA_ID)) {
AdditionalFieldsUtil.addFieldToMarcRecord(targetRecord, TAG_035, TAG_035_SUB,
LD + event.getEventPayload().getAdditionalProperties().get(LINKED_DATA_ID));
}

LOGGER.info("Mapping a Record from InstanceIngressEvent with id '{}' into an Instance", event.getId());
var parsedRecord = new JsonObject((String) targetRecord.getParsedRecord().getContent());
RecordMapper<org.folio.Instance> recordMapper = RecordMapperBuilder.buildMapper(MARC_BIB_RECORD_FORMAT);
var instance = recordMapper.mapRecord(parsedRecord, mappingParameters, new JsonObject(mappingMetadata.getMappingRules()));
instance.setId(instanceId);
instance.setSource(event.getEventPayload().getSourceType().value());
LOGGER.info("Mapped Instance from InstanceIngressEvent with id '{}': {}", event.getId(), instance);
return Future.succeededFuture(instance);
} catch (Exception e) {
LOGGER.warn("Error during preparing and executing mapping:", e);
return Future.failedFuture(e);
try {
LOGGER.info("Manipulating fields of a Record from InstanceIngressEvent with id '{}'", event.getId());
var mappingParameters = Json.decodeValue(mappingMetadata.getMappingParams(), MappingParameters.class);
AdditionalFieldsUtil.updateLatestTransactionDate(targetRecord, mappingParameters);
AdditionalFieldsUtil.move001To035(targetRecord);
AdditionalFieldsUtil.normalize035(targetRecord);
if (event.getEventPayload().getAdditionalProperties().containsKey(LINKED_DATA_ID)) {
AdditionalFieldsUtil.addFieldToMarcRecord(targetRecord, TAG_035, TAG_035_SUB,
LD + event.getEventPayload().getAdditionalProperties().get(LINKED_DATA_ID));
}
});

LOGGER.info("Mapping a Record from InstanceIngressEvent with id '{}' into an Instance", event.getId());
var parsedRecord = new JsonObject((String) targetRecord.getParsedRecord().getContent());
RecordMapper<org.folio.Instance> recordMapper = RecordMapperBuilder.buildMapper(MARC_BIB_RECORD_FORMAT);
var instance = recordMapper.mapRecord(parsedRecord, mappingParameters, new JsonObject(mappingMetadata.getMappingRules()));
instance.setId(instanceId);
instance.setSource(event.getEventPayload().getSourceType().value());
LOGGER.info("Mapped Instance from InstanceIngressEvent with id '{}': {}", event.getId(), instance);
return Future.succeededFuture(instance);
} catch (Exception e) {
LOGGER.warn("Error during preparing and executing mapping:", e);
return Future.failedFuture(e);
}
}

private Record constructMarcBibRecord(InstanceIngressPayload eventPayload) {
Expand Down Expand Up @@ -215,21 +212,29 @@ private Future<Instance> executeFieldsManipulation(Instance instance, Record src
private Future<Instance> saveRecordInSrsAndHandleResponse(InstanceIngressEvent event, Record srcRecord, Instance instance) {
LOGGER.info("Saving record in SRS and handling a response for an Instance with id '{}':", instance.getId());
Promise<Instance> promise = Promise.promise();
getSourceStorageRecordsClient(context.getOkapiLocation(), context.getToken(), context.getTenantId())
.postSourceStorageRecords(srcRecord)
.onComplete(ar -> {
var result = ar.result();
if (ar.succeeded() && result.statusCode() == HttpStatus.HTTP_CREATED.toInt()) {
LOGGER.info("Created MARC record in SRS with id: '{}', instanceId: '{}', from tenant: {}",
srcRecord.getId(), instance.getId(), context.getTenantId());
promise.complete(instance);
} else {
String msg = format("Failed to create MARC record in SRS, instanceId: '%s', status code: %s, Record: %s",
instance.getId(), result != null ? result.statusCode() : "", result != null ? result.bodyAsString() : "");
LOGGER.warn(msg);
super.deleteInstance(instance.getId(), event.getId(), instanceCollection);
promise.fail(msg);
}
postSnapshotInSrsAndHandleResponse(srcRecord.getSnapshotId())
.onFailure(promise::fail)
.compose(snapshot -> {
getSourceStorageRecordsClient(context.getOkapiLocation(), context.getToken(), context.getTenantId())
.postSourceStorageRecords(srcRecord)
.onComplete(ar -> {
var result = ar.result();
if (ar.succeeded() &&
result.statusCode() == HttpStatus.HTTP_CREATED.toInt()) {
LOGGER.info("Created MARC record in SRS with id: '{}', instanceId: '{}', from tenant: {}",
srcRecord.getId(), instance.getId(), context.getTenantId());
promise.complete(instance);
} else {
String msg = format(
"Failed to create MARC record in SRS, instanceId: '%s', status code: %s, Record: %s",
instance.getId(), result != null ? result.statusCode() : "", result != null ? result.bodyAsString() : "");
LOGGER.warn(msg);
super.deleteInstance(instance.getId(), event.getId(),
instanceCollection);
promise.fail(msg);
}
});
return promise.future();
});
return promise.future();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void shouldReturnFailedFuture_ifMappingMetadataWasNotFound() {
}

@Test
public void shouldReturnFailedFuture_ifSourceStorageSnapshotsClientReturnsError() throws IOException {
public void shouldReturnFailedFuture_ifInstanceValidationFails() throws IOException {
// given
var event = new InstanceIngressEvent()
.withId(UUID.randomUUID().toString())
Expand All @@ -169,27 +169,29 @@ public void shouldReturnFailedFuture_ifSourceStorageSnapshotsClientReturnsError(
.withMappingRules(mappingRules.encode())
.withMappingParams(Json.encode(new MappingParameters())))))
.when(mappingMetadataCache).getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE);

doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any());
var snapshotHttpResponse = buildHttpResponseWithBuffer(HttpStatus.SC_BAD_REQUEST);
var snapshotHttpResponse = buildHttpResponseWithBuffer(buffer(Json.encode(new Snapshot())), HttpStatus.SC_CREATED);
doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any());

var expectedMessage = "Failed to create snapshot in SRS, snapshot id: ";
var expectedMessage = "Mapped Instance is invalid: [Field 'title' is a required field and can not be null, "
+ "Field 'instanceTypeId' is a required field and can not be null], from InstanceIngressEvent with id '" + event.getId() + "'";

// when
var future = handler.handle(event);

// then
var exception = Assert.assertThrows(ExecutionException.class, future::get);
assertThat(exception.getCause().getMessage()).startsWith(expectedMessage);
assertThat(exception.getCause().getMessage()).isEqualTo(expectedMessage);
}

@Test
public void shouldReturnFailedFuture_ifInstanceValidationFails() throws IOException {
public void shouldReturnFailedFuture_ifInstanceSavingFailed() throws IOException {
// given
var event = new InstanceIngressEvent()
.withId(UUID.randomUUID().toString())
.withEventPayload(new InstanceIngressPayload()
.withSourceRecordObject("{}")
.withSourceRecordObject(TestUtil.readFileFromPath(BIB_RECORD_PATH))
.withSourceType(LINKED_DATA)
);
doReturn(succeededFuture(null)).when(idStorageService).store(anyString(), anyString(), anyString());
Expand All @@ -198,13 +200,16 @@ public void shouldReturnFailedFuture_ifInstanceValidationFails() throws IOExcept
.withMappingRules(mappingRules.encode())
.withMappingParams(Json.encode(new MappingParameters())))))
.when(mappingMetadataCache).getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE);

doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any());
var snapshotHttpResponse = buildHttpResponseWithBuffer(buffer(Json.encode(new Snapshot())), HttpStatus.SC_CREATED);
doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any());

var expectedMessage = "Mapped Instance is invalid: [Field 'title' is a required field and can not be null, "
+ "Field 'instanceTypeId' is a required field and can not be null], from InstanceIngressEvent with id '" + event.getId() + "'";
var expectedMessage = "Some failure";
doAnswer(i -> {
Consumer<Failure> failureHandler = i.getArgument(2);
failureHandler.accept(new Failure(expectedMessage, 400));
return null;
}).when(instanceCollection).add(any(), any(), any());

// when
var future = handler.handle(event);
Expand All @@ -215,7 +220,7 @@ public void shouldReturnFailedFuture_ifInstanceValidationFails() throws IOExcept
}

@Test
public void shouldReturnFailedFuture_ifInstanceSavingFailed() throws IOException {
public void shouldReturnFailedFuture_ifCreatePrecedingSucceedingTitlesFailed() throws IOException {
// given
var event = new InstanceIngressEvent()
.withId(UUID.randomUUID().toString())
Expand All @@ -232,13 +237,13 @@ public void shouldReturnFailedFuture_ifInstanceSavingFailed() throws IOException
doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any());
var snapshotHttpResponse = buildHttpResponseWithBuffer(buffer(Json.encode(new Snapshot())), HttpStatus.SC_CREATED);
doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any());

var expectedMessage = "Some failure";
doAnswer(i -> {
Consumer<Failure> failureHandler = i.getArgument(2);
failureHandler.accept(new Failure(expectedMessage, 400));
Consumer<Success<Instance>> sucessHandler = i.getArgument(1);
sucessHandler.accept(new Success<>(i.getArgument(0)));
return null;
}).when(instanceCollection).add(any(), any(), any());
var expectedMessage = "Some failure";
doReturn(failedFuture(expectedMessage)).when(precedingSucceedingTitlesHelper).createPrecedingSucceedingTitles(any(), any());

// when
var future = handler.handle(event);
Expand All @@ -249,7 +254,7 @@ public void shouldReturnFailedFuture_ifInstanceSavingFailed() throws IOException
}

@Test
public void shouldReturnFailedFuture_ifCreatePrecedingSucceedingTitlesFailed() throws IOException {
public void shouldReturnFailedFuture_ifSourceStorageSnapshotsClientReturnsError() throws IOException {
// given
var event = new InstanceIngressEvent()
.withId(UUID.randomUUID().toString())
Expand All @@ -264,22 +269,24 @@ public void shouldReturnFailedFuture_ifCreatePrecedingSucceedingTitlesFailed() t
.withMappingParams(Json.encode(new MappingParameters())))))
.when(mappingMetadataCache).getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE);
doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any());
var snapshotHttpResponse = buildHttpResponseWithBuffer(buffer(Json.encode(new Snapshot())), HttpStatus.SC_CREATED);
doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any());
doAnswer(i -> {
Consumer<Success<Instance>> sucessHandler = i.getArgument(1);
sucessHandler.accept(new Success<>(i.getArgument(0)));
return null;
}).when(instanceCollection).add(any(), any(), any());
var expectedMessage = "Some failure";
doReturn(failedFuture(expectedMessage)).when(precedingSucceedingTitlesHelper).createPrecedingSucceedingTitles(any(), any());
doReturn(succeededFuture()).when(precedingSucceedingTitlesHelper).createPrecedingSucceedingTitles(any(), any());
doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any());
var snapshotHttpResponse = buildHttpResponseWithBuffer(HttpStatus.SC_BAD_REQUEST);
doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any());

var expectedMessage = "Failed to create snapshot in SRS, snapshot id: ";

// when
var future = handler.handle(event);

// then
var exception = Assert.assertThrows(ExecutionException.class, future::get);
assertThat(exception.getCause().getMessage()).isEqualTo(expectedMessage);
assertThat(exception.getCause().getMessage()).startsWith(expectedMessage);
}

@Test
Expand Down

0 comments on commit 0017f8d

Please sign in to comment.