
MODSOURCE-752: Emit Domain Events For Source Records #635

Merged: 8 commits merged on Aug 7, 2024
1 change: 1 addition & 0 deletions NEWS.md
@@ -8,6 +8,7 @@
* [MODINV-1049](https://folio-org.atlassian.net/browse/MODINV-1049) Existing "035" field is not retained the original position in imported record
* [MODSOURCE-785](https://folio-org.atlassian.net/browse/MODSOURCE-785) Update 005 field when set MARC for deletion
* [MODSOURMAN-783](https://folio-org.atlassian.net/browse/MODSOURCE-783) Extend MARC-MARC search query to account for qualifiers
* [MODSOURCE-752](https://folio-org.atlassian.net/browse/MODSOURCE-752) Emit Domain Events For Source Records

## 2024-03-20 5.8.0
* [MODSOURCE-733](https://issues.folio.org/browse/MODSOURCE-733) Reduce Memory Allocation of Strings
2 changes: 2 additions & 0 deletions README.md
@@ -141,7 +141,9 @@ After setup, it is good to check logs in all related modules for errors. Data im
* DI_SRS_MARC_HOLDINGS_RECORD_MATCHED
* DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED
* DI_SRS_MARC_AUTHORITY_RECORD_UPDATED
* SRS_SOURCE_RECORDS_PARTITIONS
The default value for all partitions is 1.
* DOMAIN_EVENTS_ENABLED env variable defines whether Source Record Domain Event publishing should occur. True by default (see the sketch below).
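
As a rough illustration only, the two variables above are plain environment settings; the parsing code below is an assumption, not taken from the module:

```java
// Hedged sketch: reads the two environment variables documented above
// with their stated defaults (names from the README, code assumed).
public class DomainEventsEnvExample {
  public static void main(String[] args) {
    int partitions = Integer.parseInt(
        System.getenv().getOrDefault("SRS_SOURCE_RECORDS_PARTITIONS", "1")); // default is 1
    boolean domainEventsEnabled = Boolean.parseBoolean(
        System.getenv().getOrDefault("DOMAIN_EVENTS_ENABLED", "true"));      // publishing on by default
    System.out.printf("partitions=%d, domainEventsEnabled=%b%n", partitions, domainEventsEnabled);
  }
}
```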
## Database schemas

The mod-source-record-storage module uses a relational approach and Liquibase to define database schemas.
@@ -10,6 +10,7 @@
import static org.folio.consumers.RecordMappingUtils.readParsedContentToObjectRepresentation;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.FAIL;
import static org.folio.services.util.EventHandlingUtil.createProducer;
import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders;
import static org.folio.services.util.KafkaUtil.extractHeaderValue;

import io.vertx.core.Future;
@@ -92,7 +93,8 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)
.compose(this::createSnapshot)
.compose(event -> retrieveRecords(event, event.getTenant())
.compose(recordCollection -> mapRecordFieldsChanges(event, recordCollection, userId))
.compose(recordCollection -> recordService.saveRecords(recordCollection, event.getTenant()))
.compose(recordCollection -> recordService.saveRecords(recordCollection,
toOkapiHeaders(consumerRecord.headers(), event.getTenant())))
.map(recordsBatchResponse -> sendReports(recordsBatchResponse, event, consumerRecord.headers()))
.map(recordsBatchResponse -> mapRecordsToBibUpdateEvents(recordsBatchResponse, event))
.compose(marcBibUpdates -> sendEvents(marcBibUpdates, event, consumerRecord))
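The call site above now builds okapi headers from the Kafka consumer record instead of passing a bare tenant id to saveRecords. The diff only shows the import and the call sites of EventHandlingUtil.toOkapiHeaders, so the helper below is a hedged sketch of what such a conversion could look like (assumed implementation, hypothetical class name):

```java
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;

import io.vertx.kafka.client.producer.KafkaHeader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a toOkapiHeaders-style helper; the real
// EventHandlingUtil implementation is not part of this diff.
public final class OkapiHeadersSketch {

  private OkapiHeadersSketch() {
  }

  static Map<String, String> toOkapiHeaders(List<KafkaHeader> kafkaHeaders, String tenantId) {
    Map<String, String> okapiHeaders = new HashMap<>();
    // Copy every Kafka header into a plain okapi header map
    kafkaHeaders.forEach(header -> okapiHeaders.put(header.key(), header.value().toString()));
    if (tenantId != null) {
      // Ensure the tenant header is present so that downstream domain
      // event publishing can resolve the tenant
      okapiHeaders.put(OKAPI_TENANT_HEADER, tenantId);
    }
    return okapiHeaders;
  }
}
```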
@@ -35,6 +35,7 @@

import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_PARSED_RECORDS_CHUNK_SAVED;
import static org.folio.services.util.EventHandlingUtil.constructModuleName;
import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders;
import static org.folio.services.util.KafkaUtil.extractHeaderValue;

@Component
@@ -85,7 +86,7 @@ public Future<String> handle(KafkaConsumerRecord<String, byte[]> targetRecord) {
LOGGER.debug("handle:: RecordCollection has been received with event: '{}', jobExecutionId '{}', chunkId: '{}', starting processing... chunkNumber '{}'-'{}'",
event.getEventType(), jobExecutionId, chunkId, chunkNumber, key);
setUserMetadata(recordCollection, userId);
return recordService.saveRecords(recordCollection, tenantId)
return recordService.saveRecords(recordCollection, toOkapiHeaders(kafkaHeaders))
.compose(recordsBatchResponse -> sendBackRecordsBatchResponse(recordsBatchResponse, kafkaHeaders, tenantId, chunkNumber, event.getEventType(), targetRecord));
} catch (Exception e) {
LOGGER.warn("handle:: RecordCollection processing has failed with errors jobExecutionId '{}', chunkId: '{}', chunkNumber '{}'-'{}'",
@@ -2,15 +2,10 @@

import static org.folio.dao.util.QMEventTypes.QM_ERROR;
import static org.folio.dao.util.QMEventTypes.QM_SRS_MARC_RECORD_UPDATED;
import static org.folio.kafka.KafkaHeaderUtils.kafkaHeadersToMap;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
import static org.folio.services.util.EventHandlingUtil.createProducer;
import static org.folio.services.util.EventHandlingUtil.createProducerRecord;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders;

import io.vertx.core.Future;
import io.vertx.core.Promise;
@@ -19,19 +14,22 @@
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.services.RecordService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import org.folio.dao.util.QMEventTypes;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.KafkaConfig;
import org.folio.rest.jaxrs.model.Event;
import org.folio.rest.jaxrs.model.ParsedRecordDto;
import org.folio.rest.util.OkapiConnectionParams;
import org.folio.services.RecordService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class QuickMarcKafkaHandler implements AsyncRecordHandler<String, String> {
@@ -67,22 +65,23 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)
log.trace("handle:: Handling kafka consumerRecord {}", consumerRecord);

var kafkaHeaders = consumerRecord.headers();
var params = new OkapiConnectionParams(kafkaHeadersToMap(kafkaHeaders), vertx);
var okapiHeaders = toOkapiHeaders(kafkaHeaders);

return getEventPayload(consumerRecord)
.compose(eventPayload -> {
String snapshotId = eventPayload.getOrDefault(SNAPSHOT_ID_KEY, UUID.randomUUID().toString());
var tenantId = okapiHeaders.get(OKAPI_TENANT_HEADER);
return getRecordDto(eventPayload)
.compose(recordDto -> recordService.updateSourceRecord(recordDto, snapshotId, params.getTenantId()))
.compose(recordDto -> recordService.updateSourceRecord(recordDto, snapshotId, okapiHeaders))
.compose(updatedRecord -> {
eventPayload.put(updatedRecord.getRecordType().value(), Json.encode(updatedRecord));
return sendEvent(eventPayload, QM_SRS_MARC_RECORD_UPDATED, params.getTenantId(), kafkaHeaders)
return sendEvent(eventPayload, QM_SRS_MARC_RECORD_UPDATED, tenantId, kafkaHeaders)
.map(aBoolean -> consumerRecord.key());
})
.recover(th -> {
log.warn("handle:: Failed to handle QM_RECORD_UPDATED event", th);
eventPayload.put(ERROR_KEY, th.getMessage());
return sendEvent(eventPayload, QM_ERROR, params.getTenantId(), kafkaHeaders)
return sendEvent(eventPayload, QM_ERROR, tenantId, kafkaHeaders)
.map(aBoolean -> th.getMessage());
});
})
@@ -1,13 +1,17 @@
package org.folio.dao;

import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor;
import io.reactivex.Flowable;
import io.vertx.core.Future;
import io.vertx.sqlclient.Row;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import io.vertx.sqlclient.Row;
import net.sf.jsqlparser.JSQLParserException;
import org.folio.dao.util.IdType;
import org.folio.dao.util.MatchField;
import org.folio.dao.util.RecordType;
import org.folio.rest.jaxrs.model.MarcBibCollection;
import org.folio.rest.jaxrs.model.ParsedRecord;
Expand All @@ -21,17 +25,12 @@
import org.folio.rest.jaxrs.model.StrippedParsedRecordCollection;
import org.folio.rest.jooq.enums.RecordState;
import org.folio.services.RecordSearchParameters;
import org.folio.dao.util.MatchField;
import org.folio.services.util.TypeConnection;
import org.folio.services.util.parser.ParseFieldsResult;
import org.folio.services.util.parser.ParseLeaderResult;
import org.jooq.Condition;
import org.jooq.OrderField;

import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor;
import io.reactivex.Flowable;
import io.vertx.core.Future;

/**
* Data access object for {@link Record}
*/
@@ -189,10 +188,10 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
* Saves {@link Record} to the db
*
* @param record Record to save
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with saved Record
*/
Future<Record> saveRecord(Record record, String tenantId);
Future<Record> saveRecord(Record record, Map<String, String> okapiHeaders);

/**
* Saves {@link Record} to the db using {@link ReactiveClassicGenericQueryExecutor}
@@ -201,25 +200,25 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
* @param record Record to save
* @return future with saved Record
*/
Future<Record> saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record);
Future<Record> saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record, Map<String, String> okapiHeaders);

/**
* Saves {@link RecordCollection} to the db
*
* @param recordCollection Record collection to save
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with saved {@link RecordsBatchResponse}
*/
Future<RecordsBatchResponse> saveRecords(RecordCollection recordCollection, String tenantId);
Future<RecordsBatchResponse> saveRecords(RecordCollection recordCollection, Map<String, String> okapiHeaders);

/**
* Updates {@link Record} in the db
*
* @param record Record to update
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with updated Record
*/
Future<Record> updateRecord(Record record, String tenantId);
Future<Record> updateRecord(Record record, Map<String, String> okapiHeaders);

/**
* Increments generation in case a record with the same matchedId exists
@@ -235,19 +234,19 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
* Updates {@link ParsedRecord} in the db
*
* @param record record dto from which {@link ParsedRecord} will be updated
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with updated ParsedRecord
*/
Future<ParsedRecord> updateParsedRecord(Record record, String tenantId);
Future<ParsedRecord> updateParsedRecord(Record record, Map<String, String> okapiHeaders);

/**
* Update parsed records from collection of records and external relations ids in one transaction
*
* @param recordCollection collection of records from which parsed records will be updated
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with response containing list of successfully updated records and error messages for records that were not updated
*/
Future<ParsedRecordsBatchResponse> updateParsedRecords(RecordCollection recordCollection, String tenantId);
Future<ParsedRecordsBatchResponse> updateParsedRecords(RecordCollection recordCollection, Map<String, String> okapiHeaders);

/**
* Searches for {@link Record} by id of external entity which was created from desired record
@@ -371,9 +370,10 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
* @param txQE query execution
* @param newRecord new Record to create
* @param oldRecord old Record that has to be marked as "old"
* @param okapiHeaders okapi headers
* @return future with new "updated" Record
*/
Future<Record> saveUpdatedRecord(ReactiveClassicGenericQueryExecutor txQE, Record newRecord, Record oldRecord);
Future<Record> saveUpdatedRecord(ReactiveClassicGenericQueryExecutor txQE, Record newRecord, Record oldRecord, Map<String, String> okapiHeaders);

/**
* Change suppress from discovery flag for record by external relation id
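Because the RecordDao methods above now take a Map<String, String> of okapi headers rather than a tenant id, callers are expected to supply at least the tenant header. The call-site sketch below is illustrative only; the class and header values are assumptions, not part of this PR:

```java
import java.util.Map;

import org.folio.dao.RecordDao;
import org.folio.rest.jaxrs.model.RecordCollection;

// Hypothetical caller of the reworked saveRecords signature;
// header values below are placeholders.
class SaveRecordsExample {

  void save(RecordDao recordDao, RecordCollection recordCollection) {
    Map<String, String> okapiHeaders = Map.of(
        "x-okapi-tenant", "diku",            // tenant the records belong to
        "x-okapi-url", "http://okapi:9130"); // assumed okapi URL
    recordDao.saveRecords(recordCollection, okapiHeaders)
        .onSuccess(batch -> System.out.println("Records batch saved"))
        .onFailure(err -> System.err.println("Failed to save records: " + err.getMessage()));
  }
}
```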