From 4c50759506c0fa67683b353f59343548ff8ba442 Mon Sep 17 00:00:00 2001 From: pbobylev Date: Thu, 1 Aug 2024 17:04:36 +0500 Subject: [PATCH] MODSOURCE-752: updateParsedRecord & updateParsedRecords methods handled --- .../main/java/org/folio/dao/RecordDao.java | 8 ++-- .../java/org/folio/dao/RecordDaoImpl.java | 37 +++++++++++-------- .../rest/impl/SourceStorageBatchImpl.java | 6 ++- .../org/folio/services/RecordService.java | 8 ++-- .../org/folio/services/RecordServiceImpl.java | 8 ++-- .../AbstractPostProcessingEventHandler.java | 2 +- .../org/folio/services/RecordServiceTest.java | 3 +- ...nstancePostProcessingEventHandlerTest.java | 4 +- 8 files changed, 44 insertions(+), 32 deletions(-) diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java index d784997fc..2517ba4c2 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java @@ -234,19 +234,19 @@ Future getMatchedRecordsIdentifiers(MatchField mat * Updates {@link ParsedRecord} in the db * * @param record record dto from which {@link ParsedRecord} will be updated - * @param tenantId tenant id + * @param okapiHeaders okapi headers * @return future with updated ParsedRecord */ - Future updateParsedRecord(Record record, String tenantId); + Future updateParsedRecord(Record record, Map okapiHeaders); /** * Update parsed records from collection of records and external relations ids in one transaction * * @param recordCollection collection of records from which parsed records will be updated - * @param tenantId tenant id + * @param okapiHeaders okapi headers * @return future with response containing list of successfully updated records and error messages for records that were not updated */ - Future updateParsedRecords(RecordCollection recordCollection, String tenantId); + Future updateParsedRecords(RecordCollection recordCollection, Map okapiHeaders); /** * Searches for {@link Record} by id of external entity which was created from desired record diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java index a7a7fb3e5..f29114e92 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java @@ -1077,21 +1077,26 @@ public Future calculateGeneration(ReactiveClassicGenericQueryExecutor t } @Override - public Future updateParsedRecord(Record record, String tenantId) { - LOG.trace("updateParsedRecord:: Updating {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId); + public Future updateParsedRecord(Record record, Map okapiHeaders) { + var tenantId = okapiHeaders.get(TENANT); + LOG.trace("updateParsedRecord:: Updating {} record {} for tenant {}", record.getRecordType(), + record.getId(), tenantId); return getQueryExecutor(tenantId).transaction(txQE -> GenericCompositeFuture.all(Lists.newArrayList( updateExternalIdsForRecord(txQE, record), ParsedRecordDaoUtil.update(txQE, record.getParsedRecord(), ParsedRecordDaoUtil.toRecordType(record)) - )).map(res -> record.getParsedRecord())); + )).onSuccess(updated -> recordDomainEventPublisher.publishRecordUpdated(record, okapiHeaders)) + .map(res -> record.getParsedRecord())); } @Override - public Future updateParsedRecords(RecordCollection recordCollection, String tenantId) { + public Future updateParsedRecords(RecordCollection recordCollection, Map okapiHeaders) { + var tenantId = okapiHeaders.get(TENANT); logRecordCollection("updateParsedRecords:: Updating", recordCollection, tenantId); Promise promise = Promise.promise(); Context context = Vertx.currentContext(); if(context == null) return Future.failedFuture("updateParsedRecords must be called by a vertx thread"); + var recordsUpdated = new ArrayList(); context.owner().executeBlocking(blockingPromise -> { Set recordTypes = new HashSet<>(); @@ -1105,7 +1110,7 @@ public Future updateParsedRecords(RecordCollection r Field prtId = field(name(ID), UUID.class); Field prtContent = field(name(CONTENT), JSONB.class); - List parsedRecords = recordCollection.getRecords() + List processedRecords = recordCollection.getRecords() .stream() .map(this::validateParsedRecordId) .peek(record -> { @@ -1187,9 +1192,9 @@ public Future updateParsedRecords(RecordCollection r .setId(null); } - }).map(Record::getParsedRecord) - .filter(parsedRecord -> Objects.nonNull(parsedRecord.getId())) - .collect(Collectors.toList()); + }) + .filter(processedRecord -> Objects.nonNull(processedRecord.getParsedRecord().getId())) + .toList(); try (Connection connection = getConnection(tenantId)) { DSL.using(connection).transaction(ctx -> { @@ -1210,21 +1215,20 @@ public Future updateParsedRecords(RecordCollection r int[] parsedRecordUpdateResults = dsl.batch(parsedRecordUpdates).execute(); // check parsed record update results - List parsedRecordsUpdated = new ArrayList<>(); for (int i = 0; i < parsedRecordUpdateResults.length; i++) { int result = parsedRecordUpdateResults[i]; - ParsedRecord parsedRecord = parsedRecords.get(i); + var processedRecord = processedRecords.get(i); if (result == 0) { - errorMessages.add(format("Parsed Record with id '%s' was not updated", parsedRecord.getId())); + errorMessages.add(format("Parsed Record with id '%s' was not updated", processedRecord.getId())); } else { - parsedRecordsUpdated.add(parsedRecord); + recordsUpdated.add(processedRecord); } } blockingPromise.complete(new ParsedRecordsBatchResponse() .withErrorMessages(errorMessages) - .withParsedRecords(parsedRecordsUpdated) - .withTotalRecords(parsedRecordsUpdated.size())); + .withParsedRecords(recordsUpdated.stream().map(Record::getParsedRecord).toList()) + .withTotalRecords(recordsUpdated.size())); }); } catch (SQLException e) { LOG.warn("updateParsedRecords:: Failed to update records", e); @@ -1242,7 +1246,10 @@ public Future updateParsedRecords(RecordCollection r } }); - return promise.future(); + return promise.future() + .onSuccess(response -> + recordsUpdated.forEach(updated -> recordDomainEventPublisher.publishRecordUpdated(updated, okapiHeaders)) + ); } @Override diff --git a/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/SourceStorageBatchImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/SourceStorageBatchImpl.java index b4317a84e..7e4afd4f7 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/SourceStorageBatchImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/SourceStorageBatchImpl.java @@ -1,5 +1,7 @@ package org.folio.rest.impl; +import static org.folio.okapi.common.XOkapiHeaders.TENANT; + import java.util.List; import java.util.Map; @@ -57,6 +59,7 @@ public void postSourceStorageBatchVerifiedRecords(List marcBibIds, Map okapiHeaders, Handler> asyncResultHandler, Context vertxContext) { vertxContext.runOnContext(v -> { + okapiHeaders.putIfAbsent(TENANT, tenantId); try { MetadataUtil.populateMetadata(entity.getRecords(), okapiHeaders); recordService.saveRecords(entity, okapiHeaders) @@ -82,9 +85,10 @@ public void postSourceStorageBatchRecords(RecordCollection entity, Map okapiHeaders, Handler> asyncResultHandler, Context vertxContext) { vertxContext.runOnContext(v -> { + okapiHeaders.putIfAbsent(TENANT, tenantId); try { MetadataUtil.populateMetadata(entity.getRecords(), okapiHeaders); - recordService.updateParsedRecords(entity, tenantId) + recordService.updateParsedRecords(entity, okapiHeaders) .map(parsedRecordsBatchResponse -> { if (!parsedRecordsBatchResponse.getParsedRecords().isEmpty()) { return PutSourceStorageBatchParsedRecordsResponse.respond200WithApplicationJson(parsedRecordsBatchResponse); diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java index c1cf7294c..14ded8a17 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java @@ -173,19 +173,19 @@ public interface RecordService { * Updates {@link ParsedRecord} in the db * * @param record record dto from which {@link ParsedRecord} will be updated - * @param tenantId tenant id + * @param okapiHeaders okapi headers * @return future with updated ParsedRecord */ - Future updateParsedRecord(Record record, String tenantId); + Future updateParsedRecord(Record record, Map okapiHeaders); /** * Update parsed records from collection of records and external relations ids in one transaction * * @param recordCollection collection of records from which parsed records will be updated - * @param tenantId tenant id + * @param okapiHeaders okapi headers * @return future with response containing list of successfully updated records and error messages for records that were not updated */ - Future updateParsedRecords(RecordCollection recordCollection, String tenantId); + Future updateParsedRecords(RecordCollection recordCollection, Map okapiHeaders); /** * Fetch stripped parsed records by ids and filter marc fields by provided range of fields diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java index f8b8f39d2..1eabfc84a 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java @@ -238,18 +238,18 @@ public Future> getSourceRecordById(String id, IdType idTy } @Override - public Future updateParsedRecord(Record record, String tenantId) { - return recordDao.updateParsedRecord(record, tenantId); + public Future updateParsedRecord(Record record, Map okapiHeaders) { + return recordDao.updateParsedRecord(record, okapiHeaders); } @Override - public Future updateParsedRecords(RecordCollection recordCollection, String tenantId) { + public Future updateParsedRecords(RecordCollection recordCollection, Map okapiHeaders) { if (recordCollection.getRecords().isEmpty()) { Promise promise = Promise.promise(); promise.complete(new ParsedRecordsBatchResponse().withTotalRecords(0)); return promise.future(); } - return recordDao.updateParsedRecords(recordCollection, tenantId); + return recordDao.updateParsedRecords(recordCollection, okapiHeaders); } @Override diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/AbstractPostProcessingEventHandler.java b/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/AbstractPostProcessingEventHandler.java index adfdbf87e..58a8a630b 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/AbstractPostProcessingEventHandler.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/AbstractPostProcessingEventHandler.java @@ -319,7 +319,7 @@ private Future saveRecord(Record record, Map okapiHeader return recordService.getRecordById(record.getId(), tenantId) .compose(r -> { if (r.isPresent()) { - return recordService.updateParsedRecord(record, tenantId).map(record.withGeneration(r.get().getGeneration())); + return recordService.updateParsedRecord(record, okapiHeaders).map(record.withGeneration(r.get().getGeneration())); } else { record.getRawRecord().setId(record.getId()); return recordService.saveRecord(record, okapiHeaders).map(record); diff --git a/mod-source-record-storage-server/src/test/java/org/folio/services/RecordServiceTest.java b/mod-source-record-storage-server/src/test/java/org/folio/services/RecordServiceTest.java index f56caef20..131ec2c5f 100644 --- a/mod-source-record-storage-server/src/test/java/org/folio/services/RecordServiceTest.java +++ b/mod-source-record-storage-server/src/test/java/org/folio/services/RecordServiceTest.java @@ -1938,7 +1938,8 @@ private void updateParsedMarcRecords(TestContext context, Record.RecordType reco List expected = updated.stream() .map(Record::getParsedRecord) .collect(Collectors.toList()); - recordService.updateParsedRecords(recordCollection, TENANT_ID).onComplete(update -> { + var okapiHeaders = Map.of(TENANT, TENANT_ID); + recordService.updateParsedRecords(recordCollection, okapiHeaders).onComplete(update -> { if (update.failed()) { context.fail(update.cause()); } diff --git a/mod-source-record-storage-server/src/test/java/org/folio/services/handlers/InstancePostProcessingEventHandlerTest.java b/mod-source-record-storage-server/src/test/java/org/folio/services/handlers/InstancePostProcessingEventHandlerTest.java index f202445fc..7ce917c14 100644 --- a/mod-source-record-storage-server/src/test/java/org/folio/services/handlers/InstancePostProcessingEventHandlerTest.java +++ b/mod-source-record-storage-server/src/test/java/org/folio/services/handlers/InstancePostProcessingEventHandlerTest.java @@ -201,7 +201,7 @@ public void shouldProceedIfConsortiumTrackExists(TestContext context) { doAnswer(invocationOnMock -> Future.succeededFuture(Optional.of(record))).when(mockedRecordService).getRecordById(anyString(), anyString()); - doAnswer(invocationOnMock -> Future.succeededFuture(record.getParsedRecord())).when(mockedRecordService).updateParsedRecord(any(), anyString()); + doAnswer(invocationOnMock -> Future.succeededFuture(record.getParsedRecord())).when(mockedRecordService).updateParsedRecord(any(), any()); doAnswer(invocationOnMock -> Future.succeededFuture(recordCollection)).when(mockedRecordService).getRecords(any(), any(), any(), anyInt(), anyInt(), anyString()); @@ -237,7 +237,7 @@ public void shouldProceedIfConsortiumTrackExists(TestContext context) { if (e != null) { context.fail(e); } - verify(mockedRecordService, times(1)).updateParsedRecord(any(), anyString()); + verify(mockedRecordService, times(1)).updateParsedRecord(any(), any()); context.assertNull(payload.getContext().get(CENTRAL_TENANT_INSTANCE_UPDATED_FLAG)); context.assertEquals(expectedCentralTenantId, payload.getContext().get(CENTRAL_TENANT_ID)); async.complete();