From 1c5558b525ce85920cee08509f46ba27e38ed578 Mon Sep 17 00:00:00 2001 From: pbobylev Date: Mon, 29 Jul 2024 16:50:37 +0500 Subject: [PATCH] MODSOURCE-752: Use RAML definition for event DTO --- .../RecordDomainEventPublisher.java | 39 ++++++-- .../services/util/EventHandlingUtil.java | 6 +- ramls/source-record-domain-event.json | 97 +++++++++++++++++++ ramls/source-record-storage-records.raml | 1 + 4 files changed, 134 insertions(+), 9 deletions(-) create mode 100644 ramls/source-record-domain-event.json diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/domainevent/RecordDomainEventPublisher.java b/mod-source-record-storage-server/src/main/java/org/folio/services/domainevent/RecordDomainEventPublisher.java index cc1cf072b..adc483814 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/domainevent/RecordDomainEventPublisher.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/domainevent/RecordDomainEventPublisher.java @@ -3,16 +3,23 @@ import static org.folio.okapi.common.XOkapiHeaders.TENANT; import static org.folio.okapi.common.XOkapiHeaders.TOKEN; import static org.folio.okapi.common.XOkapiHeaders.URL; +import static org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType.SOURCE_RECORD_CREATED; +import static org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType.SOURCE_RECORD_UPDATED; import static org.folio.services.util.EventHandlingUtil.sendEventToKafka; +import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.json.Json; import io.vertx.kafka.client.producer.KafkaHeader; import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.folio.kafka.KafkaConfig; import org.folio.rest.jaxrs.model.Record; +import org.folio.rest.jaxrs.model.SourceRecordDomainEvent; +import org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -20,19 +27,31 @@ public class RecordDomainEventPublisher { public static final String RECORD_DOMAIN_TOPIC = "srs.source_records"; - public static final String SOURCE_RECORD_CREATED = "SOURCE_RECORD_CREATED"; - public static final String SOURCE_RECORD_UPDATED = "SOURCE_RECORD_UPDATED"; private static final String RECORD_TYPE = "folio.srs.recordType"; + private static final Logger LOG = LogManager.getLogger(); @Autowired private KafkaConfig kafkaConfig; public void publishRecordCreated(Record created, Map okapiHeaders) { + publishRecord(created, okapiHeaders, SOURCE_RECORD_CREATED); + } + + public void publishRecordUpdated(Record updated, Map okapiHeaders) { + publishRecord(updated, okapiHeaders, SOURCE_RECORD_UPDATED); + } + + private void publishRecord(Record aRecord, Map okapiHeaders, EventType eventType) { Vertx.vertx().executeBlocking(() -> { - var kafkaHeaders = getKafkaHeaders(okapiHeaders, created.getRecordType()); - var key = created.getId(); - return sendEventToKafka(okapiHeaders.get(TENANT), Json.encode(created), SOURCE_RECORD_CREATED, kafkaHeaders, - kafkaConfig, key); + try { + var kafkaHeaders = getKafkaHeaders(okapiHeaders, aRecord.getRecordType()); + var key = aRecord.getId(); + return sendEventToKafka(okapiHeaders.get(TENANT), getEvent(aRecord, eventType), + eventType.value(), kafkaHeaders, kafkaConfig, key); + } catch (Exception e) { + LOG.error("Exception during Record domain event sending", e); + return Future.failedFuture(e); + } }); } @@ -45,4 +64,12 @@ private List getKafkaHeaders(Map okapiHeaders, Reco ); } + private String getEvent(Record eventRecord, EventType type) { + var event = new SourceRecordDomainEvent() + .withId(eventRecord.getId()) + .withEventType(type) + .withEventPayload((String) eventRecord.getParsedRecord().getContent()); + return Json.encode(event); + } + } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java index 69e0bd0e4..3008bf1eb 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java @@ -1,12 +1,12 @@ package org.folio.services.util; +import static java.util.Arrays.stream; import static java.util.Objects.nonNull; import static org.folio.okapi.common.XOkapiHeaders.TENANT; import static org.folio.okapi.common.XOkapiHeaders.TOKEN; import static org.folio.okapi.common.XOkapiHeaders.URL; +import static org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType; import static org.folio.services.domainevent.RecordDomainEventPublisher.RECORD_DOMAIN_TOPIC; -import static org.folio.services.domainevent.RecordDomainEventPublisher.SOURCE_RECORD_CREATED; -import static org.folio.services.domainevent.RecordDomainEventPublisher.SOURCE_RECORD_UPDATED; import static org.folio.services.util.KafkaUtil.extractHeaderValue; import io.vertx.core.Future; @@ -107,7 +107,7 @@ public static String constructModuleName() { } public static String createTopicName(String eventType, String tenantId, KafkaConfig kafkaConfig) { - if (SOURCE_RECORD_CREATED.equals(eventType) || SOURCE_RECORD_UPDATED.equals(eventType)) { + if (stream(EventType.values()).anyMatch(et -> et.value().equals(eventType))) { return KafkaTopicNameHelper.formatTopicName(kafkaConfig.getEnvId(), tenantId, RECORD_DOMAIN_TOPIC); } return KafkaTopicNameHelper.formatTopicName(kafkaConfig.getEnvId(), KafkaTopicNameHelper.getDefaultNameSpace(), diff --git a/ramls/source-record-domain-event.json b/ramls/source-record-domain-event.json new file mode 100644 index 000000000..24fa5f1e4 --- /dev/null +++ b/ramls/source-record-domain-event.json @@ -0,0 +1,97 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "description": "Source record domain event data model", + "javaType": "org.folio.rest.jaxrs.model.SourceRecordDomainEvent", + "type": "object", + "additionalProperties": false, + "properties": { + "id": { + "description": "UUID", + "type": "string", + "pattern": "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$" + }, + "eventType": { + "type": "string", + "enum": ["SOURCE_RECORD_CREATED", "SOURCE_RECORD_UPDATED"], + "description": "Source record domain event type" + }, + "sourceRecordDomainEventMetadata": { + "description": "Event metadata", + "type": "object", + "additionalProperties": false, + "properties": { + "eventTTL": { + "description": "Time-to-live (TTL) for event in minutes", + "type": "integer" + }, + "correlationId": { + "description": "Id to track related events, can be a meaningful string or a UUID", + "type": "string" + }, + "originalEventId": { + "description": "Id of the event that started the sequence of related events", + "type": "string", + "pattern": "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$" + }, + "publisherCallback": { + "description": "Allows a publisher to provide a callback endpoint or an error Event Type to be notified that despite the fact that there are subscribers for such an event type no one has received the event within the specified period of time", + "type": "object", + "properties": { + "endpoint": { + "description": "Callback endpoint", + "type": "string" + }, + "eventType": { + "description": "Error Event Type", + "type": "string" + } + } + }, + "createdDate": { + "description": "Timestamp when event was created", + "type": "string", + "format": "date-time" + }, + "publishedDate": { + "description": "Timestamp when event was initially published to the underlying topic", + "type": "string", + "format": "date-time" + }, + "createdBy": { + "description": "Username of the user whose action caused an event", + "type": "string" + }, + "publishedBy": { + "description": "Name and version of the module that published an event", + "type": "string" + } + }, + "required": [ + "eventTTL", + "publishedBy" + ] + }, + "eventPayload": { + "type": "string", + "description": "The source record JSON string" + }, + "tenant": { + "description": "Tenant id", + "type": "string" + }, + "ts": { + "description": "Message timestamp", + "type": "string", + "format": "date-time" + } + }, + "excludedFromEqualsAndHashCode": [ + "eventMetadata", + "tenant", + "ts" + ], + "required": [ + "id", + "eventType" + ] +} diff --git a/ramls/source-record-storage-records.raml b/ramls/source-record-storage-records.raml index 1d7a99083..964125f86 100644 --- a/ramls/source-record-storage-records.raml +++ b/ramls/source-record-storage-records.raml @@ -29,6 +29,7 @@ types: linkUpdateReport: !include raml-storage/schemas/mod-source-record-storage/linkUpdateReport.json recordMatchingDto: !include raml-storage/schemas/dto/recordMatchingRqDto.json recordsIdentifiersCollection: !include raml-storage/schemas/dto/recordsIdentifiersCollection.json + sourceRecordDomainEvent: !include source-record-domain-event.json traits: validate: !include raml-storage/raml-util/traits/validation.raml