Skip to content

Commit

Permalink
MODSOURCE-752: topic creation + fixes + UT
Browse files Browse the repository at this point in the history
  • Loading branch information
PBobylev committed Aug 1, 2024
1 parent ab9cace commit bcee5f1
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 28 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* [MODINV-1049](https://folio-org.atlassian.net/browse/MODINV-1049) Existing "035" field is not retained the original position in imported record
* [MODSOURCE-785](https://folio-org.atlassian.net/browse/MODSOURCE-785) Update 005 field when set MARC for deletion
* [MODSOURMAN-783](https://folio-org.atlassian.net/browse/MODSOURCE-783) Extend MARC-MARC search query to account for qualifiers
* [MODSOURCE-752](https://folio-org.atlassian.net/browse/MODSOURCE-752) Emit Domain Events For Source Records

## 2024-03-20 5.8.0
* [MODSOURCE-733](https://issues.folio.org/browse/MODSOURCE-733) Reduce Memory Allocation of Strings
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ After setup, it is good to check logs in all related modules for errors. Data im
* DI_SRS_MARC_HOLDINGS_RECORD_MATCHED
* DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED
* DI_SRS_MARC_AUTHORITY_RECORD_UPDATED
* SRS_SOURCE_RECORDS_PARTITIONS
Default value for all partitions is 1
* DOMAIN_EVENTS_ENABLED env variable defines if Source Record Domain Event publishing should occur. True by default.
## Database schemas

The mod-source-record-storage module uses relational approach and Liquibase to define database schemas.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_UPDATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_MODIFIED_READY_FOR_POST_PROCESSING;
import static org.folio.services.domainevent.RecordDomainEventPublisher.RECORD_DOMAIN_EVENT_TOPIC;

import org.folio.kafka.services.KafkaTopic;
import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -74,6 +75,9 @@ public class SRSKafkaTopicService {
@Value("${di_marc_authority_record_updated.partitions}")
private Integer diMarcAuthorityRecordUpdatedPartitions;

@Value("${source_records.partitions}")
private Integer sourceRecordsPartitions;

public KafkaTopic[] createTopicObjects() {
return new KafkaTopic[] {
MARC_BIB,
Expand All @@ -91,18 +95,27 @@ public KafkaTopic[] createTopicObjects() {
new SRSKafkaTopic(DI_LOG_SRS_MARC_AUTHORITY_RECORD_UPDATED.value(), diLogSrsMarcAuthorityRecordUpdatedPartitions),
new SRSKafkaTopic(DI_SRS_MARC_HOLDINGS_RECORD_MATCHED.value(), diMarcHoldingsMatchedPartitions),
new SRSKafkaTopic(DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED.value(), diMarcHoldingsNotMatchedPartitions),
new SRSKafkaTopic(DI_SRS_MARC_AUTHORITY_RECORD_UPDATED.value(), diMarcAuthorityRecordUpdatedPartitions)
new SRSKafkaTopic(DI_SRS_MARC_AUTHORITY_RECORD_UPDATED.value(), diMarcAuthorityRecordUpdatedPartitions),
new SRSKafkaTopic(RECORD_DOMAIN_EVENT_TOPIC, sourceRecordsPartitions, false)
};
}

public static class SRSKafkaTopic implements KafkaTopic {

private final String topic;
private final int numPartitions;
private final boolean includeNamespace;

public SRSKafkaTopic(String topic, int numPartitions) {
this.topic = topic;
this.numPartitions = numPartitions;
this.includeNamespace = true;
}

public SRSKafkaTopic(String topic, int numPartitions, boolean includeNamespace) {
this.topic = topic;
this.numPartitions = numPartitions;
this.includeNamespace = includeNamespace;
}

@Override
Expand All @@ -122,7 +135,11 @@ public int numPartitions() {

@Override
public String fullTopicName(String tenant) {
return formatTopicName(environment(), getDefaultNameSpace(), tenant, topicName());
if (includeNamespace) {
return formatTopicName(environment(), getDefaultNameSpace(), tenant, topicName());
} else {
return formatTopicName(environment(), tenant, topicName());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
package org.folio.services.domainevent;

import static java.util.Objects.isNull;
import static org.folio.okapi.common.XOkapiHeaders.TENANT;
import static org.folio.okapi.common.XOkapiHeaders.TOKEN;
import static org.folio.okapi.common.XOkapiHeaders.URL;
import static org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType.SOURCE_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType.SOURCE_RECORD_UPDATED;
import static org.folio.services.util.EventHandlingUtil.sendEventToKafka;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.json.Json;
import io.vertx.kafka.client.producer.KafkaHeader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.kafka.KafkaConfig;
import org.folio.services.kafka.KafkaSender;
import org.folio.rest.jaxrs.model.Record;
import org.folio.rest.jaxrs.model.SourceRecordDomainEvent;
import org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType;
Expand All @@ -27,14 +24,13 @@
@Component
public class RecordDomainEventPublisher {

public static final String RECORD_DOMAIN_TOPIC = "srs.source_records";
public static final String RECORD_DOMAIN_EVENT_TOPIC = "srs.source_records";
private static final String RECORD_TYPE = "folio.srs.recordType";
private static final Logger LOG = LogManager.getLogger();
@Value("${ENABLE_DOMAIN_EVENTS:true}")
private boolean enableDomainEvents;

@Value("${DOMAIN_EVENTS_ENABLED:true}")
private boolean domainEventsEnabled;
@Autowired
private KafkaConfig kafkaConfig;
private KafkaSender kafkaSender;

public void publishRecordCreated(Record created, Map<String, String> okapiHeaders) {
publishRecord(created, okapiHeaders, SOURCE_RECORD_CREATED);
Expand All @@ -45,33 +41,50 @@ public void publishRecordUpdated(Record updated, Map<String, String> okapiHeader
}

private void publishRecord(Record aRecord, Map<String, String> okapiHeaders, EventType eventType) {
Vertx.vertx().executeBlocking(() -> {
try {
var kafkaHeaders = getKafkaHeaders(okapiHeaders, aRecord.getRecordType());
var key = aRecord.getId();
return sendEventToKafka(okapiHeaders.get(TENANT), getEvent(aRecord, eventType),
eventType.value(), kafkaHeaders, kafkaConfig, key);
} catch (Exception e) {
LOG.error("Exception during Record domain event sending", e);
return Future.failedFuture(e);
}
});
if (!domainEventsEnabled || notValidForPublishing(aRecord)) {
return;
}
try {
var kafkaHeaders = getKafkaHeaders(okapiHeaders, aRecord.getRecordType());
var key = aRecord.getId();
kafkaSender.sendEventToKafka(okapiHeaders.get(TENANT), getEvent(aRecord, eventType), eventType.value(),
kafkaHeaders, key);
} catch (Exception e) {
LOG.error("Exception during Record domain event sending", e);
}
}

private boolean notValidForPublishing(Record aRecord) {
if (isNull(aRecord.getRecordType())) {
LOG.error("Record [with id {}] contains no type information and won't be sent as domain event", aRecord.getId());
return true;
}
if (isNull(aRecord.getRawRecord())) {
LOG.error("Record [with id {}] contains no raw record and won't be sent as domain event", aRecord.getId());
return true;
}
if (isNull(aRecord.getRawRecord().getContent())) {
LOG.error("Record [with id {}] contains no raw record content and won't be sent as domain event",
aRecord.getId());
return true;
}
return false;
}

private List<KafkaHeader> getKafkaHeaders(Map<String, String> okapiHeaders, Record.RecordType recordType) {
return new ArrayList<>(List.of(
return List.of(
KafkaHeader.header(URL, okapiHeaders.get(URL)),
KafkaHeader.header(TENANT, okapiHeaders.get(TENANT)),
KafkaHeader.header(TOKEN, okapiHeaders.get(TOKEN)),
KafkaHeader.header(RECORD_TYPE, recordType.value()))
KafkaHeader.header(RECORD_TYPE, recordType.value())
);
}

private String getEvent(Record eventRecord, EventType type) {
var event = new SourceRecordDomainEvent()
.withId(eventRecord.getId())
.withEventType(type)
.withEventPayload((String) eventRecord.getParsedRecord().getContent());
.withEventPayload(eventRecord.getRawRecord().getContent());
return Json.encode(event);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.folio.services.kafka;

import io.vertx.core.Future;
import io.vertx.kafka.client.producer.KafkaHeader;
import java.util.List;
import org.folio.kafka.KafkaConfig;
import org.folio.services.util.EventHandlingUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class KafkaSender {

@Autowired
private KafkaConfig kafkaConfig;

public Future<Boolean> sendEventToKafka(String tenantId, String eventPayload, String eventType,
List<KafkaHeader> kafkaHeaders, String key) {
return EventHandlingUtil.sendEventToKafka(tenantId, eventPayload, eventType, kafkaHeaders, kafkaConfig, key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import static org.folio.okapi.common.XOkapiHeaders.TOKEN;
import static org.folio.okapi.common.XOkapiHeaders.URL;
import static org.folio.rest.jaxrs.model.SourceRecordDomainEvent.EventType;
import static org.folio.services.domainevent.RecordDomainEventPublisher.RECORD_DOMAIN_TOPIC;
import static org.folio.services.domainevent.RecordDomainEventPublisher.RECORD_DOMAIN_EVENT_TOPIC;
import static org.folio.services.util.KafkaUtil.extractHeaderValue;

import io.vertx.core.Future;
Expand Down Expand Up @@ -108,7 +108,7 @@ public static String constructModuleName() {

public static String createTopicName(String eventType, String tenantId, KafkaConfig kafkaConfig) {
if (stream(EventType.values()).anyMatch(et -> et.value().equals(eventType))) {
return KafkaTopicNameHelper.formatTopicName(kafkaConfig.getEnvId(), tenantId, RECORD_DOMAIN_TOPIC);
return KafkaTopicNameHelper.formatTopicName(kafkaConfig.getEnvId(), tenantId, RECORD_DOMAIN_EVENT_TOPIC);
}
return KafkaTopicNameHelper.formatTopicName(kafkaConfig.getEnvId(), KafkaTopicNameHelper.getDefaultNameSpace(),
tenantId, eventType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ di_logs_srs_marc_authority_record_updated.partitions = ${DI_LOG_SRS_MARC_AUTHORI
di_marc_holdings_matched.partitions = ${DI_SRS_MARC_HOLDINGS_RECORD_MATCHED:1}
di_marc_holdings_not_matched.partitions = ${DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED:1}
di_marc_authority_record_updated.partitions = ${DI_SRS_MARC_AUTHORITY_RECORD_UPDATED:1}
source_records.partitions = ${SRS_SOURCE_RECORDS_PARTITIONS:1}
Loading

0 comments on commit bcee5f1

Please sign in to comment.