Skip to content

Commit

Permalink
MODSOURCE-752: Use RAML definition for event DTO
Browse files Browse the repository at this point in the history
  • Loading branch information
PBobylev committed Jul 29, 2024
1 parent 6fbbd2c commit 1c5558b
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,55 @@
import static org.folio.okapi.common.XOkapiHeaders.TENANT;
import static org.folio.okapi.common.XOkapiHeaders.TOKEN;
import static org.folio.okapi.common.XOkapiHeaders.URL;
import static org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType.SOURCE_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType.SOURCE_RECORD_UPDATED;
import static org.folio.services.util.EventHandlingUtil.sendEventToKafka;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.json.Json;
import io.vertx.kafka.client.producer.KafkaHeader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.kafka.KafkaConfig;
import org.folio.rest.jaxrs.model.Record;
import org.folio.rest.jaxrs.model.SourceRecordDomainEvent;
import org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RecordDomainEventPublisher {

public static final String RECORD_DOMAIN_TOPIC = "srs.source_records";
public static final String SOURCE_RECORD_CREATED = "SOURCE_RECORD_CREATED";
public static final String SOURCE_RECORD_UPDATED = "SOURCE_RECORD_UPDATED";
private static final String RECORD_TYPE = "folio.srs.recordType";
private static final Logger LOG = LogManager.getLogger();

@Autowired
private KafkaConfig kafkaConfig;

public void publishRecordCreated(Record created, Map<String, String> okapiHeaders) {
publishRecord(created, okapiHeaders, SOURCE_RECORD_CREATED);
}

public void publishRecordUpdated(Record updated, Map<String, String> okapiHeaders) {
publishRecord(updated, okapiHeaders, SOURCE_RECORD_UPDATED);
}

private void publishRecord(Record aRecord, Map<String, String> okapiHeaders, EventType eventType) {
Vertx.vertx().executeBlocking(() -> {
var kafkaHeaders = getKafkaHeaders(okapiHeaders, created.getRecordType());
var key = created.getId();
return sendEventToKafka(okapiHeaders.get(TENANT), Json.encode(created), SOURCE_RECORD_CREATED, kafkaHeaders,
kafkaConfig, key);
try {
var kafkaHeaders = getKafkaHeaders(okapiHeaders, aRecord.getRecordType());
var key = aRecord.getId();
return sendEventToKafka(okapiHeaders.get(TENANT), getEvent(aRecord, eventType),
eventType.value(), kafkaHeaders, kafkaConfig, key);
} catch (Exception e) {
LOG.error("Exception during Record domain event sending", e);
return Future.failedFuture(e);
}
});
}

Expand All @@ -45,4 +64,12 @@ private List<KafkaHeader> getKafkaHeaders(Map<String, String> okapiHeaders, Reco
);
}

private String getEvent(Record eventRecord, EventType type) {
var event = new SourceRecordDomainEvent()
.withId(eventRecord.getId())
.withEventType(type)
.withEventPayload((String) eventRecord.getParsedRecord().getContent());
return Json.encode(event);
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package org.folio.services.util;

import static java.util.Arrays.stream;
import static java.util.Objects.nonNull;
import static org.folio.okapi.common.XOkapiHeaders.TENANT;
import static org.folio.okapi.common.XOkapiHeaders.TOKEN;
import static org.folio.okapi.common.XOkapiHeaders.URL;
import static org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType;
import static org.folio.services.domainevent.RecordDomainEventPublisher.RECORD_DOMAIN_TOPIC;
import static org.folio.services.domainevent.RecordDomainEventPublisher.SOURCE_RECORD_CREATED;
import static org.folio.services.domainevent.RecordDomainEventPublisher.SOURCE_RECORD_UPDATED;
import static org.folio.services.util.KafkaUtil.extractHeaderValue;

import io.vertx.core.Future;
Expand Down Expand Up @@ -107,7 +107,7 @@ public static String constructModuleName() {
}

public static String createTopicName(String eventType, String tenantId, KafkaConfig kafkaConfig) {
if (SOURCE_RECORD_CREATED.equals(eventType) || SOURCE_RECORD_UPDATED.equals(eventType)) {
if (stream(EventType.values()).anyMatch(et -> et.value().equals(eventType))) {
return KafkaTopicNameHelper.formatTopicName(kafkaConfig.getEnvId(), tenantId, RECORD_DOMAIN_TOPIC);
}
return KafkaTopicNameHelper.formatTopicName(kafkaConfig.getEnvId(), KafkaTopicNameHelper.getDefaultNameSpace(),
Expand Down
97 changes: 97 additions & 0 deletions ramls/source-record-domain-event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Source record domain event data model",
"javaType": "org.folio.rest.jaxrs.model.SourceRecordDomainEvent",
"type": "object",
"additionalProperties": false,
"properties": {
"id": {
"description": "UUID",
"type": "string",
"pattern": "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$"
},
"eventType": {
"type": "string",
"enum": ["SOURCE_RECORD_CREATED", "SOURCE_RECORD_UPDATED"],
"description": "Source record domain event type"
},
"sourceRecordDomainEventMetadata": {
"description": "Event metadata",
"type": "object",
"additionalProperties": false,
"properties": {
"eventTTL": {
"description": "Time-to-live (TTL) for event in minutes",
"type": "integer"
},
"correlationId": {
"description": "Id to track related events, can be a meaningful string or a UUID",
"type": "string"
},
"originalEventId": {
"description": "Id of the event that started the sequence of related events",
"type": "string",
"pattern": "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$"
},
"publisherCallback": {
"description": "Allows a publisher to provide a callback endpoint or an error Event Type to be notified that despite the fact that there are subscribers for such an event type no one has received the event within the specified period of time",
"type": "object",
"properties": {
"endpoint": {
"description": "Callback endpoint",
"type": "string"
},
"eventType": {
"description": "Error Event Type",
"type": "string"
}
}
},
"createdDate": {
"description": "Timestamp when event was created",
"type": "string",
"format": "date-time"
},
"publishedDate": {
"description": "Timestamp when event was initially published to the underlying topic",
"type": "string",
"format": "date-time"
},
"createdBy": {
"description": "Username of the user whose action caused an event",
"type": "string"
},
"publishedBy": {
"description": "Name and version of the module that published an event",
"type": "string"
}
},
"required": [
"eventTTL",
"publishedBy"
]
},
"eventPayload": {
"type": "string",
"description": "The source record JSON string"
},
"tenant": {
"description": "Tenant id",
"type": "string"
},
"ts": {
"description": "Message timestamp",
"type": "string",
"format": "date-time"
}
},
"excludedFromEqualsAndHashCode": [
"eventMetadata",
"tenant",
"ts"
],
"required": [
"id",
"eventType"
]
}
1 change: 1 addition & 0 deletions ramls/source-record-storage-records.raml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ types:
linkUpdateReport: !include raml-storage/schemas/mod-source-record-storage/linkUpdateReport.json
recordMatchingDto: !include raml-storage/schemas/dto/recordMatchingRqDto.json
recordsIdentifiersCollection: !include raml-storage/schemas/dto/recordsIdentifiersCollection.json
sourceRecordDomainEvent: !include source-record-domain-event.json

traits:
validate: !include raml-storage/raml-util/traits/validation.raml
Expand Down

0 comments on commit 1c5558b

Please sign in to comment.