Skip to content

Commit

Permalink
MODSOURCE-817: Fix data consistency in handling and updating Marc Bib…
Browse files Browse the repository at this point in the history
… records for links.instance-authority event (#652)

MODSOURCE-817: Fix data consistency in handling and updating Marc Bib records for links.instance-authority event

- handle events for instance-authority links so that no Marc Bib modifications are lost due to the concurrent nature of event handling
- make sure that the first event in a sequence of updates sent by the producer and then received by the consumer is handled, including persisting the changes to the related bibs, before the events produced/consumed later

Closes: MODSOURCE-817
  • Loading branch information
mukhiddin-yusuf authored Dec 3, 2024
1 parent e85657d commit 43120f4
Show file tree
Hide file tree
Showing 16 changed files with 717 additions and 465 deletions.
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## 2025-xx-xx 5.10.0-SNAPSHOT
## 2025-XX-XX 5.10.0-SNAPSHOT
* [MODSOURCE-817](https://folio-org.atlassian.net/browse/MODSOURCE-817) Fix data consistency in handling and updating Marc Bib records for links.instance-authority event
* [MODSOURCE-816](https://folio-org.atlassian.net/browse/MODSOURCE-816) [RRT] Optimize execution plan for streaming SQL
* [MODSOURCE-824](https://folio-org.atlassian.net/browse/MODSOURCE-824) Endpoint /batch/parsed-records/fetch does not return deleted records

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.folio.consumers;

import static java.util.Collections.emptyList;
import static java.util.Objects.nonNull;
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.EMPTY;
Expand Down Expand Up @@ -34,8 +33,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.dao.util.IdType;
import org.folio.dao.util.RecordDaoUtil;
import org.folio.dao.util.RecordType;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.KafkaConfig;
Expand All @@ -54,6 +51,7 @@
import org.folio.rest.jaxrs.model.UpdateTarget;
import org.folio.services.RecordService;
import org.folio.services.SnapshotService;
import org.folio.services.entities.RecordsModifierOperator;
import org.folio.services.handlers.links.DeleteLinkProcessor;
import org.folio.services.handlers.links.LinkProcessor;
import org.folio.services.handlers.links.UpdateLinkProcessor;
Expand Down Expand Up @@ -86,21 +84,27 @@ public AuthorityLinkChunkKafkaHandler(RecordService recordService, KafkaConfig k
@Override
public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
LOGGER.trace("handle:: Handling kafka record: {}", consumerRecord);
LOGGER.info("handle:: Start Handling kafka record");
var userId = extractHeaderValue(XOkapiHeaders.USER_ID, consumerRecord.headers());
return mapToEvent(consumerRecord)

var result = mapToEvent(consumerRecord)
.compose(this::createSnapshot)
.compose(event -> retrieveRecords(event, event.getTenant())
.compose(recordCollection -> mapRecordFieldsChanges(event, recordCollection, userId))
.compose(recordCollection -> recordService.saveRecords(recordCollection,
toOkapiHeaders(consumerRecord.headers(), event.getTenant())))
.map(recordsBatchResponse -> sendReports(recordsBatchResponse, event, consumerRecord.headers()))
.map(recordsBatchResponse -> mapRecordsToBibUpdateEvents(recordsBatchResponse, event))
.compose(marcBibUpdates -> sendEvents(marcBibUpdates, event, consumerRecord))
).recover(th -> {
LOGGER.error("Failed to handle {} event", MARC_BIB.moduleTopicName(), th);
return Future.failedFuture(th);
}
);
.compose(linksUpdate -> {
var instanceIds = getBibRecordExternalIds(linksUpdate);
var okapiHeaders = toOkapiHeaders(consumerRecord.headers(), linksUpdate.getTenant());
RecordsModifierOperator recordsModifier = recordsCollection ->
this.mapRecordFieldsChanges(linksUpdate, recordsCollection, userId);

return recordService.saveRecordsByExternalIds(instanceIds, RecordType.MARC_BIB, recordsModifier, okapiHeaders)
.compose(recordsBatchResponse -> {
sendReports(recordsBatchResponse, linksUpdate, consumerRecord.headers());
var marcBibUpdateStats = mapRecordsToBibUpdateEvents(recordsBatchResponse, linksUpdate);
return sendEvents(marcBibUpdateStats, linksUpdate, consumerRecord);
});
});

LOGGER.info("handle:: Finish Handling kafka record");
return result;
}

private Future<BibAuthorityLinksUpdate> mapToEvent(KafkaConsumerRecord<String, String> consumerRecord) {
Expand All @@ -115,98 +119,103 @@ private Future<BibAuthorityLinksUpdate> mapToEvent(KafkaConsumerRecord<String, S
}
}

private Future<RecordCollection> retrieveRecords(BibAuthorityLinksUpdate bibAuthorityLinksUpdate, String tenantId) {
LOGGER.trace("Retrieving bibs for jobId {}, authorityId {}",
bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId());
var instanceIds = bibAuthorityLinksUpdate.getUpdateTargets().stream()
/**
 * Saves a {@link Snapshot} for the given links-update event and passes the event through.
 *
 * <p>The snapshot uses the event's job id as its job execution id, is created with status
 * {@code COMMITTED}, and carries a single timestamp for its processing-started, created and
 * updated dates.
 *
 * @param bibAuthorityLinksUpdate event supplying the job id and the tenant
 * @return future that yields the same {@code bibAuthorityLinksUpdate} once
 *         {@code snapshotService.saveSnapshot} has succeeded
 */
private Future<BibAuthorityLinksUpdate> createSnapshot(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) {
// One timestamp is reused so that all three date fields of the snapshot are identical.
var now = new Date();
var snapshot = new Snapshot()
.withJobExecutionId(bibAuthorityLinksUpdate.getJobId())
.withStatus(Snapshot.Status.COMMITTED)
.withProcessingStartedDate(now)
.withMetadata(new Metadata()
.withCreatedDate(now)
.withUpdatedDate(now));

// The saved snapshot itself is discarded; the original event is returned so the caller can keep chaining on it.
return snapshotService.saveSnapshot(snapshot, bibAuthorityLinksUpdate.getTenant())
.map(result -> bibAuthorityLinksUpdate);
}

private List<String> getBibRecordExternalIds(BibAuthorityLinksUpdate linksUpdate) {
return linksUpdate.getUpdateTargets().stream()
.flatMap(updateTarget -> updateTarget.getLinks().stream()
.map(Link::getInstanceId))
.distinct()
.collect(Collectors.toList());

var condition = RecordDaoUtil.getExternalIdsCondition(instanceIds, IdType.INSTANCE)
.and(RecordDaoUtil.filterRecordByDeleted(false));
return recordService.getRecords(condition, RecordType.MARC_BIB, emptyList(), 0, instanceIds.size(), tenantId);
.toList();
}

private Future<RecordCollection> mapRecordFieldsChanges(BibAuthorityLinksUpdate bibAuthorityLinksUpdate,
RecordCollection recordCollection, String userId) {
private RecordCollection mapRecordFieldsChanges(BibAuthorityLinksUpdate bibAuthorityLinksUpdate,
RecordCollection recordCollection, String userId) {
LOGGER.debug("Retrieved {} bib records for jobId {}, authorityId {}",
recordCollection.getTotalRecords(), bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId());

return getLinkProcessorForEvent(bibAuthorityLinksUpdate).map(linkProcessor -> {
recordCollection.getRecords().forEach(bibRecord -> {
var newRecordId = UUID.randomUUID().toString();
var instanceId = bibRecord.getExternalIdsHolder().getInstanceId();
var parsedRecord = bibRecord.getParsedRecord();
var parsedRecordContent = readParsedContentToObjectRepresentation(bibRecord);
var fields = new LinkedList<>(parsedRecordContent.getDataFields());

var updateTargetFieldCodes = extractUpdateTargetFieldCodesForInstance(bibAuthorityLinksUpdate, instanceId);
var subfieldChanges = bibAuthorityLinksUpdate.getSubfieldsChanges().stream()
.filter(subfieldsChange -> updateTargetFieldCodes.contains(subfieldsChange.getField()))
.collect(Collectors.toMap(SubfieldsChange::getField, SubfieldsChange::getSubfields));

fields.forEach(field -> {
if (!updateTargetFieldCodes.contains(field.getTag())) {
return;
}

var subfields = field.getSubfields();
if (isEmpty(subfields)) {
return;
}

var authorityId = getAuthorityIdSubfield(subfields);
if (authorityId.isEmpty() || !bibAuthorityLinksUpdate.getAuthorityId().equals(authorityId.get().getData())) {
return;
}

var newSubfields = linkProcessor.process(field.getTag(), subfieldChanges.get(field.getTag()), subfields);
LOGGER.trace("JobId {}, AuthorityId {}, instanceId {}, field {}, old subfields: {}, new subfields: {}",
bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId(),
instanceId, field.getTag(), subfields, newSubfields);

var newField = new DataFieldImpl(field.getTag(), field.getIndicator1(), field.getIndicator2());
newSubfields.forEach(newField::addSubfield);

var dataFields = parsedRecordContent.getDataFields();
var fieldPosition = dataFields.indexOf(field);
dataFields.remove(fieldPosition);
dataFields.add(fieldPosition, newField);
});

parsedRecord.setContent(mapObjectRepresentationToParsedContentJsonString(parsedRecordContent));
parsedRecord.setFormattedContent(EMPTY);
parsedRecord.setId(newRecordId);
bibRecord.setId(newRecordId);
bibRecord.getRawRecord().setId(newRecordId);
bibRecord.setSnapshotId(bibAuthorityLinksUpdate.getJobId());
setUpdatedBy(bibRecord, userId);
var linkProcessor = getLinkProcessorForEvent(bibAuthorityLinksUpdate);
recordCollection.getRecords().forEach(bibRecord -> {
var newRecordId = UUID.randomUUID().toString();
var instanceId = bibRecord.getExternalIdsHolder().getInstanceId();
var parsedRecord = bibRecord.getParsedRecord();
var parsedRecordContent = readParsedContentToObjectRepresentation(bibRecord);
var fields = new LinkedList<>(parsedRecordContent.getDataFields());

var updateTargetFieldCodes = extractUpdateTargetFieldCodesForInstance(bibAuthorityLinksUpdate, instanceId);
var subfieldChanges = bibAuthorityLinksUpdate.getSubfieldsChanges().stream()
.filter(subfieldsChange -> updateTargetFieldCodes.contains(subfieldsChange.getField()))
.collect(Collectors.toMap(SubfieldsChange::getField, SubfieldsChange::getSubfields));

fields.forEach(field -> {
if (!updateTargetFieldCodes.contains(field.getTag())) {
return;
}

var subfields = field.getSubfields();
if (isEmpty(subfields)) {
return;
}

var authorityId = getAuthorityIdSubfield(subfields);
if (authorityId.isEmpty() || !bibAuthorityLinksUpdate.getAuthorityId().equals(authorityId.get().getData())) {
return;
}

var newSubfields = linkProcessor.process(field.getTag(), subfieldChanges.get(field.getTag()), subfields);
LOGGER.trace("JobId {}, AuthorityId {}, instanceId {}, field {}, old subfields: {}, new subfields: {}",
bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId(),
instanceId, field.getTag(), subfields, newSubfields);

var newField = new DataFieldImpl(field.getTag(), field.getIndicator1(), field.getIndicator2());
newSubfields.forEach(newField::addSubfield);

var dataFields = parsedRecordContent.getDataFields();
var fieldPosition = dataFields.indexOf(field);
dataFields.remove(fieldPosition);
dataFields.add(fieldPosition, newField);
});

return recordCollection;
parsedRecord.setContent(mapObjectRepresentationToParsedContentJsonString(parsedRecordContent));
parsedRecord.setFormattedContent(EMPTY);
parsedRecord.setId(newRecordId);
bibRecord.setId(newRecordId);
bibRecord.getRawRecord().setId(newRecordId);
bibRecord.setSnapshotId(bibAuthorityLinksUpdate.getJobId());
setUpdatedBy(bibRecord, userId);
});
return recordCollection;
}

private Future<LinkProcessor> getLinkProcessorForEvent(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) {
private LinkProcessor getLinkProcessorForEvent(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) {
var eventType = bibAuthorityLinksUpdate.getType();
switch (eventType) {
case DELETE: {
case DELETE -> {
LOGGER.debug("Precessing DELETE event for jobId {}, authorityId {}",
bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId());
return Future.succeededFuture(new DeleteLinkProcessor());
return new DeleteLinkProcessor();
}
case UPDATE: {
case UPDATE -> {
LOGGER.debug("Precessing UPDATE event for jobId {}, authorityId {}",
bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId());
return Future.succeededFuture(new UpdateLinkProcessor());
return new UpdateLinkProcessor();
}
default: {
return Future.failedFuture(new IllegalArgumentException(
default ->
throw new IllegalArgumentException(
String.format("Unsupported event type: %s for jobId %s, authorityId %s",
eventType, bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId())));
}
eventType, bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId()));
}
}

Expand All @@ -216,7 +225,7 @@ private List<String> extractUpdateTargetFieldCodesForInstance(BibAuthorityLinksU
.filter(updateTarget -> updateTarget.getLinks().stream()
.anyMatch(link -> link.getInstanceId().equals(instanceId)))
.map(UpdateTarget::getField)
.collect(Collectors.toList());
.toList();
}

private List<MarcBibUpdate> mapRecordsToBibUpdateEvents(RecordsBatchResponse batchResponse,
Expand Down Expand Up @@ -252,7 +261,7 @@ private List<MarcBibUpdate> toMarcBibUpdateEvents(RecordsBatchResponse batchResp
.withTs(bibAuthorityLinksUpdate.getTs())
.withRecord(bibRecord);
})
.collect(Collectors.toList());
.toList();
}

private List<LinkUpdateReport> toFailedLinkUpdateReports(List<Record> errorRecords,
Expand All @@ -273,21 +282,7 @@ private List<LinkUpdateReport> toFailedLinkUpdateReports(List<Record> errorRecor
.withFailCause(bibRecord.getErrorRecord().getDescription())
.withStatus(FAIL);
})
.collect(Collectors.toList());
}

private Future<BibAuthorityLinksUpdate> createSnapshot(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) {
var now = new Date();
var snapshot = new Snapshot()
.withJobExecutionId(bibAuthorityLinksUpdate.getJobId())
.withStatus(Snapshot.Status.COMMITTED)
.withProcessingStartedDate(now)
.withMetadata(new Metadata()
.withCreatedDate(now)
.withUpdatedDate(now));

return snapshotService.saveSnapshot(snapshot, bibAuthorityLinksUpdate.getTenant())
.map(result -> bibAuthorityLinksUpdate);
.toList();
}

private void setUpdatedBy(Record changedRecord, String userId) {
Expand All @@ -300,8 +295,8 @@ private void setUpdatedBy(Record changedRecord, String userId) {
}
}

private RecordsBatchResponse sendReports(RecordsBatchResponse batchResponse, BibAuthorityLinksUpdate event,
List<KafkaHeader> headers) {
private void sendReports(RecordsBatchResponse batchResponse, BibAuthorityLinksUpdate event,
List<KafkaHeader> headers) {
var errorRecords = getErrorRecords(batchResponse);
if (!errorRecords.isEmpty()) {
LOGGER.info("Errors detected. Sending {} linking reports for jobId {}, authorityId {}",
Expand All @@ -310,7 +305,6 @@ private RecordsBatchResponse sendReports(RecordsBatchResponse batchResponse, Bib
toFailedLinkUpdateReports(errorRecords, event).forEach(report ->
sendEventToKafka(LINKS_STATS, report.getTenant(), report.getJobId(), report, headers));
}
return batchResponse;
}

private Future<String> sendEvents(List<MarcBibUpdate> marcBibUpdateEvents, BibAuthorityLinksUpdate event,
Expand Down Expand Up @@ -368,7 +362,7 @@ private KafkaProducerRecord<String, String> createKafkaProducerRecord(KafkaTopic
private List<Record> getErrorRecords(RecordsBatchResponse batchResponse) {
return batchResponse.getRecords().stream()
.filter(marcRecord -> nonNull(marcRecord.getErrorRecord()))
.collect(Collectors.toList());
.toList();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import net.sf.jsqlparser.JSQLParserException;
import org.folio.dao.util.IdType;
import org.folio.dao.util.MatchField;
Expand All @@ -26,6 +27,7 @@
import org.folio.rest.jaxrs.model.StrippedParsedRecordCollection;
import org.folio.rest.jooq.enums.RecordState;
import org.folio.services.RecordSearchParameters;
import org.folio.services.entities.RecordsModifierOperator;
import org.folio.services.util.TypeConnection;
import org.folio.services.util.parser.ParseFieldsResult;
import org.folio.services.util.parser.ParseLeaderResult;
Expand Down Expand Up @@ -209,14 +211,28 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
Future<Record> saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record, Map<String, String> okapiHeaders);

/**
* Saves {@link RecordCollection} to the db
* Saves {@link RecordCollection} to the db.
*
* @param recordCollection Record collection to save
* @param okapiHeaders okapi headers
* @return future with saved {@link RecordsBatchResponse}
*/
Future<RecordsBatchResponse> saveRecords(RecordCollection recordCollection, Map<String, String> okapiHeaders);

/**
* Saves records identified by their external ids, using the supplied modifier operator.
*
* NOTE(review): the implementation is not visible here. Presumably the records of the given
* type are looked up by {@code externalIds}, passed through {@code recordsModifier}, and the
* modified collection is then persisted - confirm against the implementing class.
*
* @param externalIds external relation ids of the records to process
* @param recordType record type
* @param recordsModifier records collection modifier operator
* @param okapiHeaders okapi headers
* @return future with saved {@link RecordsBatchResponse}
*/
Future<RecordsBatchResponse> saveRecordsByExternalIds(List<String> externalIds,
RecordType recordType,
RecordsModifierOperator recordsModifier,
Map<String, String> okapiHeaders);

/**
* Updates {{@link Record} in the db
*
Expand Down
Loading

0 comments on commit 43120f4

Please sign in to comment.