Skip to content

Commit

Permalink
[MODSOURMAN-1141] Change get_job_log_entries sql function to retrieve…
Browse files Browse the repository at this point in the history
… sourceRecordId from existing bib (#869)

* [MODSOURMAN-1141] Change get_job_log_entries sql function to retrieve sourceRecordId from existing bib

* Update ramls

---------

Co-authored-by: Kateryna Senchenko <[email protected]>
  • Loading branch information
RomanChernetskyi and KaterynaSenchenko authored Mar 12, 2024
1 parent e2c10a4 commit 4e0099a
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ private RecordProcessingLogDto mapJobLogEntryRow(Row row) {
.withSourceRecordType(entityType)
.withJobExecutionId(row.getValue(JOB_EXECUTION_ID).toString())
.withIncomingRecordId(row.getValue(INCOMING_RECORD_ID).toString())
.withSourceRecordId(row.getValue(SOURCE_ID).toString())
.withSourceRecordId(row.getValue(SOURCE_ID) != null ? row.getValue(SOURCE_ID).toString() : null)
.withSourceRecordOrder(isEmpty(row.getString(INVOICE_ACTION_STATUS))
? row.getInteger(SOURCE_RECORD_ORDER).toString()
: row.getString(INVOICE_LINE_NUMBER))
Expand Down Expand Up @@ -553,48 +553,48 @@ private static RecordProcessingLogDtoCollection processMultipleHoldingsAndItemsI
if (!ifNeedToMerge(entries)) {
return recordProcessingLogDto;
}
Map<String, List<ProcessedHoldingsInfo>> relatedHoldingsInfoBySourceRecordId =
Map<String, List<ProcessedHoldingsInfo>> relatedHoldingsInfoByIncomingRecordId =
entries.stream()
.collect(Collectors.groupingBy(
RecordProcessingLogDto::getSourceRecordId,
RecordProcessingLogDto::getIncomingRecordId,
Collectors.mapping(RecordProcessingLogDto::getRelatedHoldingsInfo,
Collectors.flatMapping(List::stream, toList())
)));

Map<String, List<ProcessedItemInfo>> relatedItemInfoBySourceId =
Map<String, List<ProcessedItemInfo>> relatedItemInfoByIncomingRecordId =
entries.stream()
.collect(Collectors.groupingBy(
RecordProcessingLogDto::getSourceRecordId,
RecordProcessingLogDto::getIncomingRecordId,
Collectors.mapping(RecordProcessingLogDto::getRelatedItemInfo,
Collectors.flatMapping(List::stream, toList())
)));

List<RecordProcessingLogDto> mergedEntries = relatedHoldingsInfoBySourceRecordId.entrySet()
List<RecordProcessingLogDto> mergedEntries = relatedHoldingsInfoByIncomingRecordId.entrySet()
.stream().map(e -> {
String sourceRecordId = e.getKey();
List<ProcessedItemInfo> relatedItemInfos = relatedItemInfoBySourceId.get(sourceRecordId);
String incomingRecordId = e.getKey();
List<ProcessedItemInfo> relatedItemInfos = relatedItemInfoByIncomingRecordId.get(incomingRecordId);

RecordProcessingLogDto firstRecordWithCurrentSourceId = entries.stream()
.filter(record -> record.getSourceRecordId().equals(sourceRecordId))
RecordProcessingLogDto firstRecordWithCurrentIncomingRecordId = entries.stream()
.filter(record -> record.getIncomingRecordId().equals(incomingRecordId))
.findFirst().orElseGet(RecordProcessingLogDto::new);

return firstRecordWithCurrentSourceId
return firstRecordWithCurrentIncomingRecordId
.withRelatedHoldingsInfo(e.getValue().stream().distinct().toList())
.withRelatedItemInfo(relatedItemInfos.stream().distinct().toList());
}).collect(toList());
return recordProcessingLogDto.withEntries(mergedEntries);
}

private static boolean ifNeedToMerge(List<RecordProcessingLogDto> entries) {
Map<String, Long> sourceRecordIdCounts = entries.stream()
Map<String, Long> holdingsIncomingRecordIdCounts = entries.stream()
.filter(e -> e.getRelatedHoldingsInfo() != null && !e.getRelatedHoldingsInfo().isEmpty())
.collect(Collectors.groupingBy(RecordProcessingLogDto::getSourceRecordId, Collectors.counting()));
.collect(Collectors.groupingBy(RecordProcessingLogDto::getIncomingRecordId, Collectors.counting()));

Map<String, Long> sourceItemRecordIdCounts = entries.stream()
Map<String, Long> itemIncomingRecordIdCounts = entries.stream()
.filter(e -> e.getRelatedItemInfo() != null && !e.getRelatedItemInfo().isEmpty())
.collect(Collectors.groupingBy(RecordProcessingLogDto::getSourceRecordId, Collectors.counting()));
.collect(Collectors.groupingBy(RecordProcessingLogDto::getIncomingRecordId, Collectors.counting()));

return sourceRecordIdCounts.values().stream().anyMatch(count -> count > 1) ||
sourceItemRecordIdCounts.values().stream().anyMatch(count -> count > 1);
return holdingsIncomingRecordIdCounts.values().stream().anyMatch(count -> count > 1) ||
itemIncomingRecordIdCounts.values().stream().anyMatch(count -> count > 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_ERROR;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_INSTANCE_UPDATED;
import static org.folio.rest.jaxrs.model.JournalRecord.ActionType.UPDATE;
import static org.folio.rest.jaxrs.model.JournalRecord.EntityType.AUTHORITY;
import static org.folio.rest.jaxrs.model.JournalRecord.EntityType.HOLDINGS;
import static org.folio.rest.jaxrs.model.JournalRecord.EntityType.INSTANCE;
Expand Down Expand Up @@ -142,7 +143,8 @@ record = Json.decodeValue(recordAsString, Record.class);
}

if (!isEmpty(entityAsString)) {
if (entityType == INSTANCE || entityType == PO_LINE || entityType == AUTHORITY) {
if (entityType == INSTANCE || entityType == PO_LINE || entityType == AUTHORITY ||
(entityType == MARC_BIBLIOGRAPHIC && actionType == UPDATE)) {
JsonObject entityJson = new JsonObject(entityAsString);
journalRecord.setEntityId(entityJson.getString(ID_KEY));
if (entityType == INSTANCE || entityType == PO_LINE) {
Expand Down Expand Up @@ -200,7 +202,7 @@ private static JournalRecord buildJournalRecordWithMarcBibType(JournalRecord.Act
String actionTypeFromContext = eventPayloadContext.get(MARC_BIB_RECORD_CREATED);

if (actionTypeFromContext.equals(Boolean.TRUE.toString())) actionTypeForMarcBib = JournalRecord.ActionType.CREATE;
else actionTypeForMarcBib = JournalRecord.ActionType.UPDATE;
else actionTypeForMarcBib = UPDATE;
}

return buildCommonJournalRecord(actionStatus, actionTypeForMarcBib, currentRecord, eventPayload, eventPayloadContext, incomingRecordId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,15 @@ WHERE tmp.entity_type = ''ITEM''
marc_holdings AS (
SELECT temp_result.job_execution_id, entity_id, title, source_record_order, action_type, error, source_id, tenant_id
FROM temp_result WHERE entity_type = ''MARC_HOLDINGS''
),
marc_bibliographic AS (
SELECT temp_result.job_execution_id, entity_id, title, source_record_order, action_type, error, source_id, tenant_id
FROM temp_result WHERE entity_type = ''MARC_BIBLIOGRAPHIC''
)
SELECT records_actions.job_execution_id AS job_execution_id,
records_actions.source_id AS source_id,
records_actions.source_id AS incoming_record_id,
coalesce(marc_bibliographic_entity_id::uuid, marc_authority_entity_id::uuid, marc_holdings_entity_id::uuid) AS source_id,
records_actions.source_record_order AS source_record_order,
'''' as invoiceline_number,
coalesce(rec_titles.title, marc_holdings_info.title) AS title,
Expand Down Expand Up @@ -292,12 +296,21 @@ FROM (
FROM marc_authority
) AS marc_authority_info ON marc_authority_info.source_id = records_actions.source_id
LEFT JOIN (
SELECT marc_bibliographic.action_type AS action_type,
marc_bibliographic.source_id AS source_id,
marc_bibliographic.title AS title,
marc_bibliographic.entity_id AS marc_bibliographic_entity_id,
marc_bibliographic.error AS marc_bibliographic_entity_error
FROM marc_bibliographic WHERE entity_id IS NOT NULL
) AS marc_bibliographic_info ON marc_bibliographic_info.source_id = records_actions.source_id
LEFT JOIN (
SELECT marc_holdings.action_type AS action_type,
marc_holdings.source_id AS source_id,
marc_holdings.title AS title,
marc_holdings.entity_id AS marc_authority_entity_id,
marc_holdings.error AS marc_authority_entity_error
marc_holdings.entity_id AS marc_holdings_entity_id,
marc_holdings.error AS marc_holdings_entity_error
FROM marc_holdings
) AS marc_holdings_info ON marc_holdings_info.source_id = records_actions.source_id
Expand All @@ -315,8 +328,8 @@ FROM (
UNION
SELECT records_actions.job_execution_id AS job_execution_id,
records_actions.source_id AS source_id,
records_actions.source_id AS incoming_record_id,
records_actions.source_id AS source_id,
source_record_order AS source_record_order,
entity_hrid as invoiceline_number,
invoice_line_info.title AS title,
Expand Down
Loading

0 comments on commit 4e0099a

Please sign in to comment.