Skip to content

Commit

Permalink
MODSOURMAN-1063: MULTIPLE fixed.
Browse files Browse the repository at this point in the history
  • Loading branch information
VRohach committed Jan 10, 2024
1 parent 943140c commit 8be4663
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -138,6 +139,7 @@ public class JournalRecordDaoImpl implements JournalRecordDao {
public static final String SOURCE_ENTITY_ERROR = "source_entity_error";
public static final String INCOMING_RECORD_ID = "incoming_record_id";
public static final String HOLDINGS_ENTITY_HRID = "holdings_entity_hrid";
public static final String ITEM_HOLDINGS_ID = "item_holdings_id";
private final Set<String> sortableFields = Set.of("source_record_order", "action_type", "error");
private final Set<String> jobLogEntrySortableFields = Set.of("source_record_order", "title", "source_record_action_status",
"instance_action_status", "holdings_action_status", "item_action_status", "order_action_status", "invoice_action_status", "error");
Expand Down Expand Up @@ -318,15 +320,15 @@ private String prepareSortingClause(String sortBy, String order) {
}

private RecordProcessingLogDtoCollection mapRowSetToRecordProcessingLogDtoCollection(RowSet<Row> rowSet) {
var recordProcessingLogDto = new RecordProcessingLogDtoCollection()
.withTotalRecords(0);
var recordProcessingLogDto = new RecordProcessingLogDtoCollection().withTotalRecords(0);

rowSet.forEach(row ->
recordProcessingLogDto
.withTotalRecords(row.getInteger(TOTAL_COUNT))
.getEntries().add(mapJobLogEntryRow(row))
);
return recordProcessingLogDto;

return processMultipleHoldingsAndItemsIfNeeded(recordProcessingLogDto);
}

private RecordProcessingLogDto mapJobLogEntryRow(Row row) {
Expand Down Expand Up @@ -366,7 +368,7 @@ private RecordProcessingLogDto mapJobLogEntryRow(Row row) {
? null : row.getValue(INVOICE_LINE_JOURNAL_RECORD_ID).toString());

ProcessedHoldingsInfo processedHoldings = constructProcessedHoldingsInfoBasedOnEntityType(row, HOLDINGS_ACTION_STATUS, HOLDINGS_ENTITY_ID, JournalRecordsColumns.HOLDINGS_ENTITY_HRID, HOLDINGS_PERMANENT_LOCATION_ID, HOLDINGS_ENTITY_ERROR);
ProcessedItemInfo processedItem = constructProcessedItemInfoBasedOnEntityType(row, ITEM_ACTION_STATUS, ITEM_ENTITY_ID, ITEM_ENTITY_HRID, HOLDINGS_ENTITY_ID, ITEM_ENTITY_ERROR);
ProcessedItemInfo processedItem = constructProcessedItemInfoBasedOnEntityType(row, ITEM_ACTION_STATUS, ITEM_ENTITY_ID, ITEM_ENTITY_HRID, ITEM_HOLDINGS_ID, ITEM_ENTITY_ERROR);
if (Objects.nonNull(processedHoldings.getActionStatus()) || processedItem.getActionStatus() == UPDATED) {
processedHoldingsInfo.add(processedHoldings);
}
Expand Down Expand Up @@ -543,4 +545,56 @@ private EntityProcessingSummary mapToEntityProcessingSummary(Row row, String tot
.withTotalDiscardedEntities(totalDiscarded)
.withTotalErrors(totalErrors);
}

private static RecordProcessingLogDtoCollection processMultipleHoldingsAndItemsIfNeeded(RecordProcessingLogDtoCollection recordProcessingLogDto) {
List<RecordProcessingLogDto> entries = recordProcessingLogDto.getEntries();
boolean needToMerge = ifNeedToMerge(entries);

if (needToMerge) {
Map<String, List<ProcessedHoldingsInfo>> relatedHoldingsInfoBySourceRecordId =
entries.stream()
.collect(Collectors.groupingBy(
RecordProcessingLogDto::getSourceRecordId,
Collectors.mapping(RecordProcessingLogDto::getRelatedHoldingsInfo,
Collectors.flatMapping(List::stream, Collectors.toList())
)));

Map<String, List<ProcessedItemInfo>> relatedItemInfoBySourceId =
entries.stream()
.collect(Collectors.groupingBy(
RecordProcessingLogDto::getSourceRecordId,
Collectors.mapping(RecordProcessingLogDto::getRelatedItemInfo,
Collectors.flatMapping(List::stream, Collectors.toList())
)));

List<RecordProcessingLogDto> mergedEntries = new ArrayList<>();
for (String sourceRecordId : relatedHoldingsInfoBySourceRecordId.keySet()) {
List<ProcessedHoldingsInfo> relatedHoldingsInfos = relatedHoldingsInfoBySourceRecordId.get(sourceRecordId);
List<ProcessedItemInfo> relatedItemInfos = relatedItemInfoBySourceId.get(sourceRecordId);

RecordProcessingLogDto firstRecordWithCurrentSourceId = entries.stream().filter(record ->
record.getSourceRecordId().equals(sourceRecordId)).findFirst().get();
RecordProcessingLogDto newRecord = firstRecordWithCurrentSourceId
.withRelatedHoldingsInfo(relatedHoldingsInfos.stream().distinct().toList())
.withRelatedItemInfo(relatedItemInfos.stream().distinct().toList());
mergedEntries.add(newRecord);
}
return recordProcessingLogDto.withEntries(mergedEntries);
} else {
return recordProcessingLogDto;
}
}

private static boolean ifNeedToMerge(List<RecordProcessingLogDto> entries) {
Map<String, Long> sourceRecordIdCounts = entries.stream()
.filter(e -> e.getRelatedHoldingsInfo() != null && !e.getRelatedHoldingsInfo().isEmpty())
.collect(Collectors.groupingBy(RecordProcessingLogDto::getSourceRecordId, Collectors.counting()));

Map<String, Long> sourceItemRecordIdCounts = entries.stream()
.filter(e -> e.getRelatedItemInfo() != null && !e.getRelatedItemInfo().isEmpty())
.collect(Collectors.groupingBy(RecordProcessingLogDto::getSourceRecordId, Collectors.counting()));

return sourceRecordIdCounts.values().stream().anyMatch(count -> count > 1) ||
sourceItemRecordIdCounts.values().stream().anyMatch(count -> count > 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ CREATE OR REPLACE FUNCTION get_job_log_entries(jobExecutionId uuid, sortingField
RETURNS TABLE(job_execution_id uuid, incoming_record_id uuid, source_id uuid, source_record_order integer, invoiceline_number text, title text,
source_record_action_status text, source_entity_error text, source_record_tenant_id text,instance_action_status text, instance_entity_id text, instance_entity_hrid text, instance_entity_error text,
instance_entity_tenant_id text, holdings_action_status text, holdings_entity_id text, holdings_entity_hrid text, holdings_permanent_location_id text,
holdings_entity_error text, item_action_status text, item_entity_hrid text, item_entity_id text, item_entity_error text, authority_action_status text,
holdings_entity_error text, item_action_status text, item_entity_id text, item_entity_hrid text, item_entity_error text, item_holdings_id text, authority_action_status text,
authority_entity_id text, authority_entity_error text, po_line_action_status text, po_line_entity_id text, po_line_entity_hrid text, po_line_entity_error text,
order_entity_id text, invoice_action_status text, invoice_entity_id text[], invoice_entity_hrid text[], invoice_entity_error text, invoice_line_action_status text,
invoice_line_entity_id text, invoice_line_entity_hrid text, invoice_line_entity_error text, total_count bigint,
Expand Down Expand Up @@ -75,7 +75,6 @@ WITH
SELECT action_type, entity_id, entity_hrid, error, instance_id, permanent_location_id, temp_result.job_execution_id, temp_result.source_id, temp_result.title, temp_result.source_record_order
FROM temp_result WHERE entity_type = ''HOLDINGS''
),
items AS (
SELECT action_type, entity_id, holdings_id, entity_hrid, error, instance_id, temp_result.job_execution_id, temp_result.source_id, temp_result.title, temp_result.source_record_order
FROM temp_result WHERE entity_type = ''ITEM''
Expand All @@ -88,17 +87,13 @@ WITH
SELECT action_type, entity_id, temp_result.source_id, error, temp_result.job_execution_id, temp_result.title, temp_result.source_record_order
FROM temp_result WHERE entity_type = ''AUTHORITY''
),
marc AS (
SELECT temp_result.job_execution_id, entity_id, title, source_record_order, action_type, error, source_id, tenant_id
FROM temp_result WHERE entity_type IN (''MARC_BIBLIOGRAPHIC'', ''MARC_HOLDINGS'', ''MARC_AUTHORITY'')
),
marc_authority AS (
SELECT temp_result.job_execution_id, entity_id, title, source_record_order, action_type, error, source_id, tenant_id
FROM temp_result WHERE entity_type IN ( ''MARC_AUTHORITY'')
FROM temp_result WHERE entity_type = ''MARC_AUTHORITY''
),
marc_holdings AS (
SELECT temp_result.job_execution_id, entity_id, title, source_record_order, action_type, error, source_id, tenant_id
FROM temp_result WHERE entity_type IN ( ''MARC_HOLDINGS'')
FROM temp_result WHERE entity_type = ''MARC_HOLDINGS''
)
SELECT records_actions.job_execution_id AS job_execution_id,
Expand All @@ -114,24 +109,25 @@ SELECT records_actions.job_execution_id AS job_execution_id,
END AS source_record_action_status,
null AS source_entity_error,
null AS source_record_tenant_id,
get_entity_status(instance_actions, instance_errors_number) AS instance_action_status,
instance_info.action_type AS instance_action_status,
instance_info.instance_entity_id AS instance_entity_id,
instance_info.instance_entity_hrid AS instance_entity_hrid,
instance_info.instance_entity_error AS instance_entity_error,
instance_info.instance_entity_tenant_id AS instance_entity_tenant_id,
get_entity_status(holdings_actions, holdings_errors_number) AS holdings_action_status,
holdings_info.action_type AS holdings_action_status,
holdings_info.holdings_entity_id AS holdings_entity_id,
holdings_info.holdings_entity_hrid AS holdings_entity_hrid,
holdings_info.holdings_permanent_location_id AS holdings_permanent_location_id,
holdings_info.holdings_entity_error AS holdings_entity_error,
get_entity_status(item_actions, item_errors_number) AS item_action_status,
items_info.action_type AS item_action_status,
items_info.items_entity_id AS item_entity_id,
items_info.items_entity_hrid AS item_entity_hrid,
items_info.items_entity_error AS item_entity_error,
get_entity_status(authority_actions, authority_errors_number) AS authority_action_status,
items_info.item_holdings_id AS item_holdings_id,
authority_info.action_type AS authority_action_status,
coalesce(authority_info.authority_entity_id, marc_authority_info.marc_authority_entity_id) AS authority_entity_id,
coalesce(authority_info.authority_entity_error, marc_authority_info.marc_authority_entity_error) AS authority_entity_error,
get_entity_status(po_line_actions, po_line_errors_number) AS po_line_action_status,
po_lines_info.action_type AS po_line_action_status,
po_lines_info.po_lines_entity_id AS po_lines_entity_id,
po_lines_info.po_lines_entity_hrid AS po_lines_entity_hrid,
po_lines_info.po_lines_entity_error AS po_lines_entity_error,
Expand Down Expand Up @@ -176,7 +172,8 @@ FROM (
WHERE journal_records.job_execution_id = ''%1$s'') AS rec_titles
ON rec_titles.source_id = records_actions.source_id AND rec_titles.title IS NOT NULL
LEFT JOIN (
SELECT instances.job_execution_id AS job_execution_id,
SELECT instances.action_type AS action_type,
instances.job_execution_id AS job_execution_id,
instances.title AS title,
instances.source_id AS source_id,
instances.entity_id AS instance_entity_id,
Expand All @@ -188,7 +185,9 @@ FROM (
LEFT JOIN (
SELECT holdings.source_id AS source_id,
SELECT
holdings.action_type AS action_type,
holdings.source_id AS source_id,
holdings.title AS title,
holdings.entity_id AS holdings_entity_id,
holdings.entity_hrid AS holdings_entity_hrid,
Expand All @@ -198,16 +197,19 @@ FROM (
) AS holdings_info ON holdings_info.source_id = records_actions.source_id
LEFT JOIN (
SELECT items.source_id AS source_id,
SELECT items.action_type AS action_type,
items.source_id AS source_id,
items.title AS title,
items.entity_id AS items_entity_id,
items.entity_hrid AS items_entity_hrid,
items.error AS items_entity_error
items.error AS items_entity_error,
items.holdings_id AS item_holdings_id
FROM items
) AS items_info ON items_info.source_id = records_actions.source_id
LEFT JOIN (
SELECT po_lines.source_id AS source_id,
SELECT po_lines.action_type AS action_type,
po_lines.source_id AS source_id,
po_lines.title AS title,
po_lines.entity_id AS po_lines_entity_id,
po_lines.entity_hrid AS po_lines_entity_hrid,
Expand All @@ -217,23 +219,26 @@ FROM (
LEFT JOIN (
SELECT authorities.source_id AS source_id,
SELECT authorities.action_type AS action_type,
authorities.source_id AS source_id,
authorities.title AS title,
authorities.entity_id AS authority_entity_id,
authorities.error AS authority_entity_error
FROM authorities
) AS authority_info ON authority_info.source_id = records_actions.source_id
LEFT JOIN (
SELECT marc_authority.source_id AS source_id,
SELECT marc_authority.action_type AS action_type,
marc_authority.source_id AS source_id,
marc_authority.title AS title,
marc_authority.entity_id AS marc_authority_entity_id,
marc_authority.error AS marc_authority_entity_error
FROM marc_authority
) AS marc_authority_info ON marc_authority_info.source_id = records_actions.source_id
LEFT JOIN (
SELECT marc_holdings.source_id AS source_id,
SELECT marc_holdings.action_type AS action_type,
marc_holdings.source_id AS source_id,
marc_holdings.title AS title,
marc_holdings.entity_id AS marc_authority_entity_id,
marc_holdings.error AS marc_authority_entity_error
Expand Down Expand Up @@ -272,14 +277,15 @@ SELECT records_actions.job_execution_id AS job_execution_id,
null AS instance_entity_error,
null AS instance_entity_tenant_id,
null AS holdings_action_status,
null AS holdings_entity_hrid,
null AS holdings_entity_id,
null AS holdings_entity_hrid,
null AS holdings_permanent_location_id,
null AS holdings_entity_error,
null AS item_action_status,
null AS item_entity_hrid,
null AS item_entity_id,
null AS item_entity_hrid,
null AS item_entity_error,
null AS item_holdings_id,
null AS authority_action_status,
null AS authority_entity_id,
null AS authority_entity_error,
Expand Down
Loading

0 comments on commit 8be4663

Please sign in to comment.