MODSOURCE-817: Fix data consistency in handling and updating Marc Bib records for links.instance-authority event #652

Merged 26 commits into master from MODSOURCE-817 on Dec 3, 2024

Commits (26)
5f43cc4
modelinks-115: disable concurrent links event processing
mukhiddin-yusuf Oct 25, 2024
880c315
Merge branch 'master' into modelinks-115
mukhiddin-yusuf Oct 25, 2024
ad1d0a0
modelinks-115: test
mukhiddin-yusuf Oct 30, 2024
8a5a6dd
modelinks-115: force completing the authority link event handling
mukhiddin-yusuf Oct 31, 2024
3529099
modsource-817: debug with logs
mukhiddin-yusuf Nov 1, 2024
51721e0
modsource-817: add blocking get and update service method
mukhiddin-yusuf Nov 5, 2024
58675c2
fix(marc-bib-update): Fix data consistency in handling and updating M…
mukhiddin-yusuf Nov 5, 2024
8b6bfe1
Merge branch 'master' into modsource-817
mukhiddin-yusuf Nov 5, 2024
a076434
modsource-817: test blocking in handler
mukhiddin-yusuf Nov 5, 2024
9f310bc
modsource-817: test blocking in handler
mukhiddin-yusuf Nov 5, 2024
5e8b715
modsource-817: add separate save records method for running blocking …
mukhiddin-yusuf Nov 6, 2024
076a9fc
Merge branch 'modelinks-115' into modsource-817
mukhiddin-yusuf Nov 6, 2024
e627eca
modsource-817: add new save records method
mukhiddin-yusuf Nov 6, 2024
06d3e5c
modsource-817: refactor
mukhiddin-yusuf Nov 14, 2024
890e9a6
MODSOURCE-817: update pom and refactor
mukhiddin-yusuf Nov 14, 2024
776ebf3
Merge branch 'master' into MODSOURCE-817
mukhiddin-yusuf Nov 19, 2024
76b0504
Merge branch 'master' into MODSOURCE-817
mukhiddin-yusuf Nov 22, 2024
207ce8f
MODSOURCE-817: refactor
mukhiddin-yusuf Nov 22, 2024
b645f28
MODSOURCE-817: fix test
mukhiddin-yusuf Nov 22, 2024
83c6176
MODSOURCE-817: fix test
mukhiddin-yusuf Nov 22, 2024
50ea5c6
MODSOURCE-817: fix test
mukhiddin-yusuf Nov 25, 2024
4edac7f
MODSOURCE-817: fix test
mukhiddin-yusuf Nov 25, 2024
969bd50
MODSOURCE-817: fix test
mukhiddin-yusuf Nov 25, 2024
c04d831
Merge branch 'master' into MODSOURCE-817
mukhiddin-yusuf Nov 26, 2024
80c6847
MODSOURCE-817: fix sonar quality gate issues
mukhiddin-yusuf Nov 26, 2024
d432c83
Merge branch 'master' into MODSOURCE-817
mukhiddin-yusuf Nov 26, 2024
3 changes: 2 additions & 1 deletion NEWS.md
@@ -1,4 +1,5 @@
## 2025-xx-xx 5.10.0-SNAPSHOT
## 2025-XX-XX 5.10.0-SNAPSHOT
* [MODSOURCE-817](https://folio-org.atlassian.net/browse/MODSOURCE-817) Fix data consistency in handling and updating Marc Bib records for links.instance-authority event
* [MODSOURCE-816](https://folio-org.atlassian.net/browse/MODSOURCE-816) [RRT] Optimize execution plan for streaming SQL
* [MODSOURCE-824](https://folio-org.atlassian.net/browse/MODSOURCE-824) Endpoint /batch/parsed-records/fetch does not return deleted records

@@ -1,6 +1,5 @@
package org.folio.consumers;

import static java.util.Collections.emptyList;
import static java.util.Objects.nonNull;
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.EMPTY;
@@ -34,8 +33,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.dao.util.IdType;
import org.folio.dao.util.RecordDaoUtil;
import org.folio.dao.util.RecordType;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.KafkaConfig;
@@ -54,6 +51,7 @@
import org.folio.rest.jaxrs.model.UpdateTarget;
import org.folio.services.RecordService;
import org.folio.services.SnapshotService;
import org.folio.services.entities.RecordsModifierOperator;
import org.folio.services.handlers.links.DeleteLinkProcessor;
import org.folio.services.handlers.links.LinkProcessor;
import org.folio.services.handlers.links.UpdateLinkProcessor;
@@ -86,21 +84,27 @@ public AuthorityLinkChunkKafkaHandler(RecordService recordService, KafkaConfig k
@Override
public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
LOGGER.trace("handle:: Handling kafka record: {}", consumerRecord);
LOGGER.info("handle:: Start Handling kafka record");
var userId = extractHeaderValue(XOkapiHeaders.USER_ID, consumerRecord.headers());
return mapToEvent(consumerRecord)

var result = mapToEvent(consumerRecord)
.compose(this::createSnapshot)
.compose(event -> retrieveRecords(event, event.getTenant())
.compose(recordCollection -> mapRecordFieldsChanges(event, recordCollection, userId))
.compose(recordCollection -> recordService.saveRecords(recordCollection,
toOkapiHeaders(consumerRecord.headers(), event.getTenant())))
.map(recordsBatchResponse -> sendReports(recordsBatchResponse, event, consumerRecord.headers()))
.map(recordsBatchResponse -> mapRecordsToBibUpdateEvents(recordsBatchResponse, event))
.compose(marcBibUpdates -> sendEvents(marcBibUpdates, event, consumerRecord))
).recover(th -> {
LOGGER.error("Failed to handle {} event", MARC_BIB.moduleTopicName(), th);
return Future.failedFuture(th);
}
);
.compose(linksUpdate -> {
var instanceIds = getBibRecordExternalIds(linksUpdate);
var okapiHeaders = toOkapiHeaders(consumerRecord.headers(), linksUpdate.getTenant());
RecordsModifierOperator recordsModifier = recordsCollection ->
this.mapRecordFieldsChanges(linksUpdate, recordsCollection, userId);

return recordService.saveRecordsByExternalIds(instanceIds, RecordType.MARC_BIB, recordsModifier, okapiHeaders)
.compose(recordsBatchResponse -> {
sendReports(recordsBatchResponse, linksUpdate, consumerRecord.headers());
var marcBibUpdateStats = mapRecordsToBibUpdateEvents(recordsBatchResponse, linksUpdate);
return sendEvents(marcBibUpdateStats, linksUpdate, consumerRecord);
});
});

LOGGER.info("handle:: Finish Handling kafka record");
return result;
}

private Future<BibAuthorityLinksUpdate> mapToEvent(KafkaConsumerRecord<String, String> consumerRecord) {
@@ -115,98 +119,103 @@ private Future<BibAuthorityLinksUpdate> mapToEvent(KafkaConsumerRecord<String, S
}
}

private Future<RecordCollection> retrieveRecords(BibAuthorityLinksUpdate bibAuthorityLinksUpdate, String tenantId) {
LOGGER.trace("Retrieving bibs for jobId {}, authorityId {}",
bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId());
var instanceIds = bibAuthorityLinksUpdate.getUpdateTargets().stream()
private Future<BibAuthorityLinksUpdate> createSnapshot(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) {
var now = new Date();
var snapshot = new Snapshot()
.withJobExecutionId(bibAuthorityLinksUpdate.getJobId())
.withStatus(Snapshot.Status.COMMITTED)
.withProcessingStartedDate(now)
.withMetadata(new Metadata()
.withCreatedDate(now)
.withUpdatedDate(now));

return snapshotService.saveSnapshot(snapshot, bibAuthorityLinksUpdate.getTenant())
.map(result -> bibAuthorityLinksUpdate);
}

private List<String> getBibRecordExternalIds(BibAuthorityLinksUpdate linksUpdate) {
return linksUpdate.getUpdateTargets().stream()
.flatMap(updateTarget -> updateTarget.getLinks().stream()
.map(Link::getInstanceId))
.distinct()
.collect(Collectors.toList());

var condition = RecordDaoUtil.getExternalIdsCondition(instanceIds, IdType.INSTANCE)
.and(RecordDaoUtil.filterRecordByDeleted(false));
return recordService.getRecords(condition, RecordType.MARC_BIB, emptyList(), 0, instanceIds.size(), tenantId);
.toList();
}

private Future<RecordCollection> mapRecordFieldsChanges(BibAuthorityLinksUpdate bibAuthorityLinksUpdate,
RecordCollection recordCollection, String userId) {
private RecordCollection mapRecordFieldsChanges(BibAuthorityLinksUpdate bibAuthorityLinksUpdate,
RecordCollection recordCollection, String userId) {
LOGGER.debug("Retrieved {} bib records for jobId {}, authorityId {}",
recordCollection.getTotalRecords(), bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId());

return getLinkProcessorForEvent(bibAuthorityLinksUpdate).map(linkProcessor -> {
recordCollection.getRecords().forEach(bibRecord -> {
var newRecordId = UUID.randomUUID().toString();
var instanceId = bibRecord.getExternalIdsHolder().getInstanceId();
var parsedRecord = bibRecord.getParsedRecord();
var parsedRecordContent = readParsedContentToObjectRepresentation(bibRecord);
var fields = new LinkedList<>(parsedRecordContent.getDataFields());

var updateTargetFieldCodes = extractUpdateTargetFieldCodesForInstance(bibAuthorityLinksUpdate, instanceId);
var subfieldChanges = bibAuthorityLinksUpdate.getSubfieldsChanges().stream()
.filter(subfieldsChange -> updateTargetFieldCodes.contains(subfieldsChange.getField()))
.collect(Collectors.toMap(SubfieldsChange::getField, SubfieldsChange::getSubfields));

fields.forEach(field -> {
if (!updateTargetFieldCodes.contains(field.getTag())) {
return;
}

var subfields = field.getSubfields();
if (isEmpty(subfields)) {
return;
}

var authorityId = getAuthorityIdSubfield(subfields);
if (authorityId.isEmpty() || !bibAuthorityLinksUpdate.getAuthorityId().equals(authorityId.get().getData())) {
return;
}

var newSubfields = linkProcessor.process(field.getTag(), subfieldChanges.get(field.getTag()), subfields);
LOGGER.trace("JobId {}, AuthorityId {}, instanceId {}, field {}, old subfields: {}, new subfields: {}",
bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId(),
instanceId, field.getTag(), subfields, newSubfields);

var newField = new DataFieldImpl(field.getTag(), field.getIndicator1(), field.getIndicator2());
newSubfields.forEach(newField::addSubfield);

var dataFields = parsedRecordContent.getDataFields();
var fieldPosition = dataFields.indexOf(field);
dataFields.remove(fieldPosition);
dataFields.add(fieldPosition, newField);
});

parsedRecord.setContent(mapObjectRepresentationToParsedContentJsonString(parsedRecordContent));
parsedRecord.setFormattedContent(EMPTY);
parsedRecord.setId(newRecordId);
bibRecord.setId(newRecordId);
bibRecord.getRawRecord().setId(newRecordId);
bibRecord.setSnapshotId(bibAuthorityLinksUpdate.getJobId());
setUpdatedBy(bibRecord, userId);
var linkProcessor = getLinkProcessorForEvent(bibAuthorityLinksUpdate);
recordCollection.getRecords().forEach(bibRecord -> {
var newRecordId = UUID.randomUUID().toString();
var instanceId = bibRecord.getExternalIdsHolder().getInstanceId();
var parsedRecord = bibRecord.getParsedRecord();
var parsedRecordContent = readParsedContentToObjectRepresentation(bibRecord);
var fields = new LinkedList<>(parsedRecordContent.getDataFields());

var updateTargetFieldCodes = extractUpdateTargetFieldCodesForInstance(bibAuthorityLinksUpdate, instanceId);
var subfieldChanges = bibAuthorityLinksUpdate.getSubfieldsChanges().stream()
.filter(subfieldsChange -> updateTargetFieldCodes.contains(subfieldsChange.getField()))
.collect(Collectors.toMap(SubfieldsChange::getField, SubfieldsChange::getSubfields));

fields.forEach(field -> {
if (!updateTargetFieldCodes.contains(field.getTag())) {
return;
}

var subfields = field.getSubfields();
if (isEmpty(subfields)) {
return;
}

var authorityId = getAuthorityIdSubfield(subfields);
if (authorityId.isEmpty() || !bibAuthorityLinksUpdate.getAuthorityId().equals(authorityId.get().getData())) {
return;
}

var newSubfields = linkProcessor.process(field.getTag(), subfieldChanges.get(field.getTag()), subfields);
LOGGER.trace("JobId {}, AuthorityId {}, instanceId {}, field {}, old subfields: {}, new subfields: {}",
bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId(),
instanceId, field.getTag(), subfields, newSubfields);

var newField = new DataFieldImpl(field.getTag(), field.getIndicator1(), field.getIndicator2());
newSubfields.forEach(newField::addSubfield);

var dataFields = parsedRecordContent.getDataFields();
var fieldPosition = dataFields.indexOf(field);
dataFields.remove(fieldPosition);
dataFields.add(fieldPosition, newField);
});

return recordCollection;
parsedRecord.setContent(mapObjectRepresentationToParsedContentJsonString(parsedRecordContent));
parsedRecord.setFormattedContent(EMPTY);
parsedRecord.setId(newRecordId);
bibRecord.setId(newRecordId);
bibRecord.getRawRecord().setId(newRecordId);
bibRecord.setSnapshotId(bibAuthorityLinksUpdate.getJobId());
setUpdatedBy(bibRecord, userId);
});
return recordCollection;
}

private Future<LinkProcessor> getLinkProcessorForEvent(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) {
private LinkProcessor getLinkProcessorForEvent(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) {
var eventType = bibAuthorityLinksUpdate.getType();
switch (eventType) {
case DELETE: {
case DELETE -> {
LOGGER.debug("Precessing DELETE event for jobId {}, authorityId {}",
bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId());
return Future.succeededFuture(new DeleteLinkProcessor());
return new DeleteLinkProcessor();
}
case UPDATE: {
case UPDATE -> {
LOGGER.debug("Precessing UPDATE event for jobId {}, authorityId {}",
bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId());
return Future.succeededFuture(new UpdateLinkProcessor());
return new UpdateLinkProcessor();
}
default: {
return Future.failedFuture(new IllegalArgumentException(
default ->
throw new IllegalArgumentException(
String.format("Unsupported event type: %s for jobId %s, authorityId %s",
eventType, bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId())));
}
eventType, bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId()));
}
}

@@ -216,7 +225,7 @@ private List<String> extractUpdateTargetFieldCodesForInstance(BibAuthorityLinksU
.filter(updateTarget -> updateTarget.getLinks().stream()
.anyMatch(link -> link.getInstanceId().equals(instanceId)))
.map(UpdateTarget::getField)
.collect(Collectors.toList());
.toList();
}

private List<MarcBibUpdate> mapRecordsToBibUpdateEvents(RecordsBatchResponse batchResponse,
@@ -252,7 +261,7 @@ private List<MarcBibUpdate> toMarcBibUpdateEvents(RecordsBatchResponse batchResp
.withTs(bibAuthorityLinksUpdate.getTs())
.withRecord(bibRecord);
})
.collect(Collectors.toList());
.toList();
}

private List<LinkUpdateReport> toFailedLinkUpdateReports(List<Record> errorRecords,
@@ -273,21 +282,7 @@ private List<LinkUpdateReport> toFailedLinkUpdateReports(List<Record> errorRecor
.withFailCause(bibRecord.getErrorRecord().getDescription())
.withStatus(FAIL);
})
.collect(Collectors.toList());
}

private Future<BibAuthorityLinksUpdate> createSnapshot(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) {
var now = new Date();
var snapshot = new Snapshot()
.withJobExecutionId(bibAuthorityLinksUpdate.getJobId())
.withStatus(Snapshot.Status.COMMITTED)
.withProcessingStartedDate(now)
.withMetadata(new Metadata()
.withCreatedDate(now)
.withUpdatedDate(now));

return snapshotService.saveSnapshot(snapshot, bibAuthorityLinksUpdate.getTenant())
.map(result -> bibAuthorityLinksUpdate);
.toList();
}

private void setUpdatedBy(Record changedRecord, String userId) {
@@ -300,8 +295,8 @@ private void setUpdatedBy(Record changedRecord, String userId) {
}
}

private RecordsBatchResponse sendReports(RecordsBatchResponse batchResponse, BibAuthorityLinksUpdate event,
List<KafkaHeader> headers) {
private void sendReports(RecordsBatchResponse batchResponse, BibAuthorityLinksUpdate event,
List<KafkaHeader> headers) {
var errorRecords = getErrorRecords(batchResponse);
if (!errorRecords.isEmpty()) {
LOGGER.info("Errors detected. Sending {} linking reports for jobId {}, authorityId {}",
@@ -310,7 +305,6 @@ private RecordsBatchResponse sendReports(RecordsBatchResponse batchResponse, Bib
toFailedLinkUpdateReports(errorRecords, event).forEach(report ->
sendEventToKafka(LINKS_STATS, report.getTenant(), report.getJobId(), report, headers));
}
return batchResponse;
}

private Future<String> sendEvents(List<MarcBibUpdate> marcBibUpdateEvents, BibAuthorityLinksUpdate event,
@@ -368,7 +362,7 @@ private KafkaProducerRecord<String, String> createKafkaProducerRecord(KafkaTopic
private List<Record> getErrorRecords(RecordsBatchResponse batchResponse) {
return batchResponse.getRecords().stream()
.filter(marcRecord -> nonNull(marcRecord.getErrorRecord()))
.collect(Collectors.toList());
.toList();
}

}
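
The handler above hands the field-mapping step to recordService.saveRecordsByExternalIds as a RecordsModifierOperator (org.folio.services.entities.RecordsModifierOperator). The definition of that interface is not part of this excerpt; based on its usage in AuthorityLinkChunkKafkaHandler, where a lambda takes a RecordCollection and returns a RecordCollection, it is presumably a single-abstract-method interface along the following lines. This is a sketch under that assumption; the method name and the RecordCollection package are not confirmed by the PR.

package org.folio.services.entities;

import org.folio.rest.jaxrs.model.RecordCollection;

/**
 * Hypothetical sketch only; the actual definition is not shown in this PR excerpt.
 */
@FunctionalInterface
public interface RecordsModifierOperator {

  /**
   * Applies field-level changes to the fetched records and returns the modified collection.
   */
  RecordCollection modify(RecordCollection recordsCollection);
}

Handing the modification step to the save method as an operator lets the record service fetch, modify, and persist the MARC bib records in one place rather than in separate handler stages, which is presumably how the data-consistency problem named in the PR title is addressed.
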
@@ -9,6 +9,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import net.sf.jsqlparser.JSQLParserException;
import org.folio.dao.util.IdType;
import org.folio.dao.util.MatchField;
@@ -26,6 +27,7 @@
import org.folio.rest.jaxrs.model.StrippedParsedRecordCollection;
import org.folio.rest.jooq.enums.RecordState;
import org.folio.services.RecordSearchParameters;
import org.folio.services.entities.RecordsModifierOperator;
import org.folio.services.util.TypeConnection;
import org.folio.services.util.parser.ParseFieldsResult;
import org.folio.services.util.parser.ParseLeaderResult;
@@ -209,14 +211,28 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
Future<Record> saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record, Map<String, String> okapiHeaders);

/**
* Saves {@link RecordCollection} to the db
* Saves {@link RecordCollection} to the db.
*
* @param recordCollection Record collection to save
* @param okapiHeaders okapi headers
* @return future with saved {@link RecordsBatchResponse}
*/
Future<RecordsBatchResponse> saveRecords(RecordCollection recordCollection, Map<String, String> okapiHeaders);

/**
* Saves {@link RecordCollection} to the db.
*
* @param externalIds external relation ids
* @param recordType record type
* @param recordsModifier records collection modifier operator
* @param okapiHeaders okapi headers
* @return future with saved {@link RecordsBatchResponse}
*/
Future<RecordsBatchResponse> saveRecordsByExternalIds(List<String> externalIds,
RecordType recordType,
RecordsModifierOperator recordsModifier,
Map<String, String> okapiHeaders);

/**
* Updates {{@link Record} in the db
*
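
For reference, a caller-side sketch of the saveRecordsByExternalIds method declared above, mirroring how AuthorityLinkChunkKafkaHandler uses it. Variable names, the example modification, and the log messages are illustrative only and assume the RecordsModifierOperator shape sketched earlier.

import java.util.List;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.dao.util.RecordType;
import org.folio.services.RecordService;
import org.folio.services.entities.RecordsModifierOperator;

public class SaveByExternalIdsExample {

  private static final Logger LOGGER = LogManager.getLogger(SaveByExternalIdsExample.class);

  // Illustrative only: fetches MARC bib records by instance ids, applies a
  // modification, and saves them through the new blocking get-and-update path.
  public void updateBibs(RecordService recordService, List<String> instanceIds,
                         String jobId, Map<String, String> okapiHeaders) {
    RecordsModifierOperator modifier = recordsCollection -> {
      // Example modification only: point every fetched record at the job's snapshot.
      recordsCollection.getRecords().forEach(bibRecord -> bibRecord.setSnapshotId(jobId));
      return recordsCollection;
    };

    recordService.saveRecordsByExternalIds(instanceIds, RecordType.MARC_BIB, modifier, okapiHeaders)
      .onSuccess(batch -> LOGGER.info("Saved {} records for jobId {}", batch.getRecords().size(), jobId))
      .onFailure(th -> LOGGER.error("Failed to save records for jobId {}", jobId, th));
  }
}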