Skip to content

Commit

Permalink
MODSOURCE-752: add domain event sending for Update record methods
Browse files Browse the repository at this point in the history
  • Loading branch information
PBobylev committed Jul 30, 2024
1 parent e988131 commit a541282
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@

import static org.folio.dao.util.QMEventTypes.QM_ERROR;
import static org.folio.dao.util.QMEventTypes.QM_SRS_MARC_RECORD_UPDATED;
import static org.folio.kafka.KafkaHeaderUtils.kafkaHeadersToMap;
import static org.folio.okapi.common.XOkapiHeaders.TENANT;
import static org.folio.services.util.EventHandlingUtil.createProducer;
import static org.folio.services.util.EventHandlingUtil.createProducerRecord;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders;

import io.vertx.core.Future;
import io.vertx.core.Promise;
Expand All @@ -19,19 +14,22 @@
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.services.RecordService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import org.folio.dao.util.QMEventTypes;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.KafkaConfig;
import org.folio.rest.jaxrs.model.Event;
import org.folio.rest.jaxrs.model.ParsedRecordDto;
import org.folio.rest.util.OkapiConnectionParams;
import org.folio.services.RecordService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class QuickMarcKafkaHandler implements AsyncRecordHandler<String, String> {
Expand Down Expand Up @@ -67,22 +65,23 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)
log.trace("handle:: Handling kafka consumerRecord {}", consumerRecord);

var kafkaHeaders = consumerRecord.headers();
var params = new OkapiConnectionParams(kafkaHeadersToMap(kafkaHeaders), vertx);
var okapiHeaders = toOkapiHeaders(kafkaHeaders, null);

return getEventPayload(consumerRecord)
.compose(eventPayload -> {
String snapshotId = eventPayload.getOrDefault(SNAPSHOT_ID_KEY, UUID.randomUUID().toString());
var tenantId = okapiHeaders.get(TENANT);
return getRecordDto(eventPayload)
.compose(recordDto -> recordService.updateSourceRecord(recordDto, snapshotId, params.getTenantId()))
.compose(recordDto -> recordService.updateSourceRecord(recordDto, snapshotId, okapiHeaders))
.compose(updatedRecord -> {
eventPayload.put(updatedRecord.getRecordType().value(), Json.encode(updatedRecord));
return sendEvent(eventPayload, QM_SRS_MARC_RECORD_UPDATED, params.getTenantId(), kafkaHeaders)
return sendEvent(eventPayload, QM_SRS_MARC_RECORD_UPDATED, tenantId, kafkaHeaders)
.map(aBoolean -> consumerRecord.key());
})
.recover(th -> {
log.warn("handle:: Failed to handle QM_RECORD_UPDATED event", th);
eventPayload.put(ERROR_KEY, th.getMessage());
return sendEvent(eventPayload, QM_ERROR, params.getTenantId(), kafkaHeaders)
return sendEvent(eventPayload, QM_ERROR, tenantId, kafkaHeaders)
.map(aBoolean -> th.getMessage());
});
})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package org.folio.dao;

import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor;
import io.reactivex.Flowable;
import io.vertx.core.Future;
import io.vertx.sqlclient.Row;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import io.vertx.sqlclient.Row;
import net.sf.jsqlparser.JSQLParserException;
import org.folio.dao.util.IdType;
import org.folio.dao.util.MatchField;
import org.folio.dao.util.RecordType;
import org.folio.rest.jaxrs.model.MarcBibCollection;
import org.folio.rest.jaxrs.model.ParsedRecord;
Expand All @@ -22,17 +25,12 @@
import org.folio.rest.jaxrs.model.StrippedParsedRecordCollection;
import org.folio.rest.jooq.enums.RecordState;
import org.folio.services.RecordSearchParameters;
import org.folio.dao.util.MatchField;
import org.folio.services.util.TypeConnection;
import org.folio.services.util.parser.ParseFieldsResult;
import org.folio.services.util.parser.ParseLeaderResult;
import org.jooq.Condition;
import org.jooq.OrderField;

import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor;
import io.reactivex.Flowable;
import io.vertx.core.Future;

/**
* Data access object for {@link Record}
*/
Expand Down Expand Up @@ -372,9 +370,10 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
* @param txQE query execution
* @param newRecord new Record to create
* @param oldRecord old Record that has to be marked as "old"
* @param okapiHeaders okapi headers
* @return future with new "updated" Record
*/
Future<Record> saveUpdatedRecord(ReactiveClassicGenericQueryExecutor txQE, Record newRecord, Record oldRecord);
Future<Record> saveUpdatedRecord(ReactiveClassicGenericQueryExecutor txQE, Record newRecord, Record oldRecord, Map<String, String> okapiHeaders);

/**
* Change suppress from discovery flag for record by external relation id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,7 @@ public Future<Optional<Record>> getRecordByCondition(ReactiveClassicGenericQuery
public Future<Record> saveRecord(Record record, Map<String, String> okapiHeaders) {
var tenantId = okapiHeaders.get(TENANT);
LOG.trace("saveRecord:: Saving {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId);
return getQueryExecutor(tenantId).transaction(txQE -> saveRecord(txQE, record, okapiHeaders))
.onSuccess(created -> recordDomainEventPublisher.publishRecordCreated(created, okapiHeaders));
return getQueryExecutor(tenantId).transaction(txQE -> saveRecord(txQE, record, okapiHeaders));
}

@Override
Expand Down Expand Up @@ -971,8 +970,9 @@ public Future<Record> updateRecord(Record record, Map<String, String> okapiHeade
LOG.trace("updateRecord:: Updating {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId);
return getQueryExecutor(tenantId).transaction(txQE -> getRecordById(txQE, record.getId())
.compose(optionalRecord -> optionalRecord
.map(r -> saveRecord(txQE, record, okapiHeaders))
.orElse(Future.failedFuture(new NotFoundException(format(RECORD_NOT_FOUND_TEMPLATE, record.getId()))))));
.map(r -> insertOrUpdateRecord(txQE, record))
.orElse(Future.failedFuture(new NotFoundException(format(RECORD_NOT_FOUND_TEMPLATE, record.getId()))))))
.onSuccess(updated -> recordDomainEventPublisher.publishRecordUpdated(updated, okapiHeaders));
}

@Override
Expand Down Expand Up @@ -1299,9 +1299,10 @@ private MarcBibCollection toMarcBibCollection(QueryResult result) {
}

@Override
public Future<Record> saveUpdatedRecord(ReactiveClassicGenericQueryExecutor txQE, Record newRecord, Record oldRecord) {
public Future<Record> saveUpdatedRecord(ReactiveClassicGenericQueryExecutor txQE, Record newRecord, Record oldRecord, Map<String, String> okapiHeaders) {
LOG.trace("saveUpdatedRecord:: Saving updated record {}", newRecord.getId());
return insertOrUpdateRecord(txQE, oldRecord).compose(r -> insertOrUpdateRecord(txQE, newRecord));
return insertOrUpdateRecord(txQE, oldRecord).compose(r -> insertOrUpdateRecord(txQE, newRecord))
.onSuccess(r -> recordDomainEventPublisher.publishRecordUpdated(r, okapiHeaders));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,10 @@ public interface RecordService {
*
* @param parsedRecordDto parsed record DTO containing updates to parsed record
* @param snapshotId snapshot id to which new Record should be linked
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with updated Record
*/
Future<Record> updateSourceRecord(ParsedRecordDto parsedRecordDto, String snapshotId, String tenantId);
Future<Record> updateSourceRecord(ParsedRecordDto parsedRecordDto, String snapshotId, Map<String, String> okapiHeaders);

/**
* Find marc bib ids by incoming arrays from SRM and exclude all valid marc bib and return only marc bib ids,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ public Future<Record> saveRecord(Record record, Map<String, String> okapiHeaders
if (generation > 0) {
return recordDao.getRecordByMatchedId(txQE, record.getMatchedId())
.compose(optionalMatchedRecord -> optionalMatchedRecord
.map(matchedRecord -> recordDao.saveUpdatedRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation)), matchedRecord.withState(Record.State.OLD)))
.map(matchedRecord -> recordDao.saveUpdatedRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation)),
matchedRecord.withState(Record.State.OLD), okapiHeaders))
.orElseGet(() -> recordDao.saveRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation)), okapiHeaders)));
} else {
return recordDao.saveRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation)), okapiHeaders);
Expand Down Expand Up @@ -293,7 +294,7 @@ public Future<Void> deleteRecordsByExternalId(String externalId, String tenantId
}

@Override
public Future<Record> updateSourceRecord(ParsedRecordDto parsedRecordDto, String snapshotId, String tenantId) {
public Future<Record> updateSourceRecord(ParsedRecordDto parsedRecordDto, String snapshotId, Map<String, String> okapiHeaders) {
String newRecordId = UUID.randomUUID().toString();
return recordDao.executeInTransaction(txQE -> recordDao.getRecordByMatchedId(txQE, parsedRecordDto.getId())
.compose(optionalRecord -> optionalRecord
Expand All @@ -313,9 +314,9 @@ public Future<Record> updateSourceRecord(ParsedRecordDto parsedRecordDto, String
.withParsedRecord(new ParsedRecord().withId(newRecordId).withContent(parsedRecordDto.getParsedRecord().getContent()))
.withExternalIdsHolder(parsedRecordDto.getExternalIdsHolder())
.withAdditionalInfo(parsedRecordDto.getAdditionalInfo())
.withMetadata(parsedRecordDto.getMetadata()), existingRecord.withState(Record.State.OLD))))
.withMetadata(parsedRecordDto.getMetadata()), existingRecord.withState(Record.State.OLD), okapiHeaders)))
.orElse(Future.failedFuture(new NotFoundException(
format(RECORD_NOT_FOUND_TEMPLATE, parsedRecordDto.getId()))))), tenantId);
format(RECORD_NOT_FOUND_TEMPLATE, parsedRecordDto.getId()))))), okapiHeaders.get(TENANT));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -965,8 +965,8 @@ public void shouldUpdateRecordState(TestContext context) {
var okapiHeaders = Map.of(TENANT, TENANT_ID);

recordDao.saveRecord(original, okapiHeaders)
.compose(ar -> recordService.updateSourceRecord(parsedRecordDto, snapshotId, TENANT_ID))
.compose(ar -> recordService.updateSourceRecord(parsedRecordDto, snapshotId, TENANT_ID))
.compose(ar -> recordService.updateSourceRecord(parsedRecordDto, snapshotId, okapiHeaders))
.compose(ar -> recordService.updateSourceRecord(parsedRecordDto, snapshotId, okapiHeaders))
.compose(ar -> recordService.updateRecordsState(original.getMatchedId(), RecordState.DRAFT, RecordType.MARC_BIB, TENANT_ID))
.onComplete(update -> {
if (update.failed()) {
Expand Down Expand Up @@ -1002,7 +1002,7 @@ public void shouldUpdateMarcAuthorityRecordStateToDeleted(TestContext context) {
var okapiHeaders = Map.of(TENANT, TENANT_ID);

recordDao.saveRecord(original, okapiHeaders)
.compose(ar -> recordService.updateSourceRecord(parsedRecordDto, snapshotId, TENANT_ID))
.compose(ar -> recordService.updateSourceRecord(parsedRecordDto, snapshotId, okapiHeaders))
.compose(ar -> recordService.updateRecordsState(original.getMatchedId(), RecordState.DELETED, RecordType.MARC_AUTHORITY, TENANT_ID))
.onComplete(update -> {
if (update.failed()) {
Expand Down Expand Up @@ -1191,7 +1191,7 @@ public void shouldGetMarcBibSourceRecordByMatchedIdNotEqualToId(TestContext cont
var okapiHeaders = Map.of(TENANT, TENANT_ID);

recordDao.saveRecord(expected, okapiHeaders)
.compose(ar -> recordService.updateSourceRecord(parsedRecordDto, snapshotId, TENANT_ID))
.compose(ar -> recordService.updateSourceRecord(parsedRecordDto, snapshotId, okapiHeaders))
.onComplete(update -> {
if (update.failed()) {
context.fail(update.cause());
Expand Down Expand Up @@ -1361,7 +1361,7 @@ public void shouldUpdateSourceRecord(TestContext context) {
.withAdditionalInfo(expected.getAdditionalInfo())
.withExternalIdsHolder(expected.getExternalIdsHolder())
.withMetadata(expected.getMetadata());
recordService.updateSourceRecord(parsedRecordDto, snapshotId, TENANT_ID).onComplete(update -> {
recordService.updateSourceRecord(parsedRecordDto, snapshotId, okapiHeaders).onComplete(update -> {
if (update.failed()) {
context.fail(update.cause());
}
Expand Down

0 comments on commit a541282

Please sign in to comment.