From f67499251ae6732d75f7728b2d72302fcdc984d0 Mon Sep 17 00:00:00 2001 From: divyamadala30 <71040191+divyamadala30@users.noreply.github.com> Date: Wed, 26 Oct 2022 14:48:32 -0400 Subject: [PATCH] redirected request/sample updates to updates channel (#816) * redirected request/sample updates to updates channel Signed-off-by: Divya Madala <71040191+divyamadala30@users.noreply.github.com> * logic fixes Signed-off-by: Divya Madala <71040191+divyamadala30@users.noreply.github.com> * checkstyle Signed-off-by: Divya Madala <71040191+divyamadala30@users.noreply.github.com> Signed-off-by: Divya Madala <71040191+divyamadala30@users.noreply.github.com> --- .../ResearchMessageHandlingServiceImpl.java | 89 ++++++++++--------- 1 file changed, 47 insertions(+), 42 deletions(-) diff --git a/service/src/main/java/org/mskcc/smile/service/impl/ResearchMessageHandlingServiceImpl.java b/service/src/main/java/org/mskcc/smile/service/impl/ResearchMessageHandlingServiceImpl.java index 76fe10f3..c6ebf068 100644 --- a/service/src/main/java/org/mskcc/smile/service/impl/ResearchMessageHandlingServiceImpl.java +++ b/service/src/main/java/org/mskcc/smile/service/impl/ResearchMessageHandlingServiceImpl.java @@ -3,6 +3,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.nats.client.Message; import java.nio.charset.StandardCharsets; +import java.util.AbstractMap; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -80,10 +83,12 @@ public class ResearchMessageHandlingServiceImpl implements ResearchMessageHandli new LinkedBlockingQueue(); private static final BlockingQueue promotedRequestQueue = new LinkedBlockingQueue(); - private static final BlockingQueue requestUpdateQueue = - new LinkedBlockingQueue(); - private static final BlockingQueue researchSampleUpdateQueue = - new LinkedBlockingQueue(); + // Boolean in requestUpdateQueue Map.Entry refers to arg fromLims + private static final BlockingQueue> requestUpdateQueue = + new LinkedBlockingQueue>(); + // Boolean in researchSampleUpdateQueue Map.Entry refers to arg fromLims + private static final BlockingQueue> researchSampleUpdateQueue = + new LinkedBlockingQueue>(); private static CountDownLatch newRequestHandlerShutdownLatch; private static CountDownLatch promotedRequestHandlerShutdownLatch; @@ -124,37 +129,31 @@ public void run() { if (existingRequest == null) { LOG.info("Persisting new request: " + request.getIgoRequestId()); requestService.saveRequest(request); + // publish saved request to consistency checker or promoted request topic + String requestJson = mapper.writeValueAsString( + requestService.getPublishedSmileRequestById(request.getIgoRequestId())); + switch (smileRequestDest) { + case NEW_REQUEST_DEST: + LOG.info("Publishing request to: " + CONSISTENCY_CHECK_NEW_REQUEST); + messagingGateway.publish(request.getIgoRequestId(), + CONSISTENCY_CHECK_NEW_REQUEST, requestJson); + break; + case PROMOTED_REQUEST_DEST: + LOG.info("Publishing request to: " + CONSUMERS_PROMOTED_REQUEST_TOPIC); + messagingGateway.publish(request.getIgoRequestId(), + CONSUMERS_PROMOTED_REQUEST_TOPIC, requestJson); + break; + default: + break; + } } else { - // request-service and sample-service methods will check for updates and persist - // them if applicable (including patient swapping) - // Boolean arg in updateRequestMetadata refers to fromLims - requestService.updateRequestMetadata(request.getLatestRequestMetadata(), - Boolean.TRUE); + requestUpdateQueue.add(new AbstractMap.SimpleImmutableEntry<>( + Boolean.TRUE, request.getLatestRequestMetadata())); for (SmileSample sample : request.getSmileSampleList()) { - // Boolean arg in updateSampleMetadata refers to fromLims - sampleService.updateSampleMetadata(sample.getLatestSampleMetadata(), - Boolean.TRUE); - sampleService.createSampleRequestRelationship(sample.getSmileSampleId(), - existingRequest.getSmileRequestId()); + researchSampleUpdateQueue.add(new AbstractMap.SimpleImmutableEntry<>( + Boolean.TRUE, sample.getLatestSampleMetadata())); } } - // publish updated/saved request to consistency checker or promoted request topic - String requestJson = mapper.writeValueAsString( - requestService.getPublishedSmileRequestById(request.getIgoRequestId())); - switch (smileRequestDest) { - case NEW_REQUEST_DEST: - LOG.info("Publishing request to: " + CONSISTENCY_CHECK_NEW_REQUEST); - messagingGateway.publish(request.getIgoRequestId(), - CONSISTENCY_CHECK_NEW_REQUEST, requestJson); - break; - case PROMOTED_REQUEST_DEST: - LOG.info("Publishing request to: " + CONSUMERS_PROMOTED_REQUEST_TOPIC); - messagingGateway.publish(request.getIgoRequestId(), - CONSUMERS_PROMOTED_REQUEST_TOPIC, requestJson); - break; - default: - break; - } } if (interrupted && requestQueue.isEmpty()) { break; @@ -182,14 +181,17 @@ public void run() { phaser.arrive(); while (true) { try { - RequestMetadata requestMetadata = requestUpdateQueue.poll(100, TimeUnit.MILLISECONDS); - if (requestMetadata != null) { + Entry requestMetadataEntry = + requestUpdateQueue.poll(100, TimeUnit.MILLISECONDS); + if (requestMetadataEntry != null) { // Boolean arg in updateRequestMetadata refers to fromLims - if (requestService.updateRequestMetadata(requestMetadata, Boolean.FALSE)) { + if (requestService.updateRequestMetadata(requestMetadataEntry.getValue(), + requestMetadataEntry.getKey())) { LOG.info("Publishing Request-level Metadata updates " + "to " + CMO_REQUEST_UPDATE_TOPIC); SmileRequest existingRequest = - requestService.getSmileRequestById(requestMetadata.getIgoRequestId()); + requestService.getSmileRequestById( + requestMetadataEntry.getValue().getIgoRequestId()); // publish request-level metadata history to CMO_REQUEST_UPDATE_TOPIC messagingGateway.publish(existingRequest.getIgoRequestId(), CMO_REQUEST_UPDATE_TOPIC, @@ -224,15 +226,17 @@ public void run() { phaser.arrive(); while (true) { try { - SampleMetadata sampleMetadata = researchSampleUpdateQueue.poll( + Entry sampleMetadataEntry = researchSampleUpdateQueue.poll( 100, TimeUnit.MILLISECONDS); - if (sampleMetadata != null) { + if (sampleMetadataEntry != null) { // Boolean arg in updateSampleMetadata refers to fromLims - if (sampleService.updateSampleMetadata(sampleMetadata, Boolean.FALSE)) { + if (sampleService.updateSampleMetadata(sampleMetadataEntry.getValue(), + sampleMetadataEntry.getKey())) { SmileSample existingSample = sampleService.getResearchSampleByRequestAndIgoId( - sampleMetadata.getIgoRequestId(), sampleMetadata.getPrimaryId()); + sampleMetadataEntry.getValue().getIgoRequestId(), + sampleMetadataEntry.getValue().getPrimaryId()); LOG.info("Publishing sample-level metadata history for research sample: " - + sampleMetadata.getPrimaryId()); + + sampleMetadataEntry.getValue().getPrimaryId()); // publish sample-level metadata history to CMO_REQUEST_UPDATE_TOPIC messagingGateway.publish(CMO_SAMPLE_UPDATE_TOPIC, mapper.writeValueAsString(existingSample.getSampleMetadataList())); @@ -298,7 +302,7 @@ public void requestUpdateHandler(RequestMetadata requestMetadata) throws Excepti throw new IllegalStateException("Message Handling Service has not been initialized"); } if (!shutdownInitiated) { - requestUpdateQueue.put(requestMetadata); + requestUpdateQueue.put(new AbstractMap.SimpleImmutableEntry<>(Boolean.FALSE, requestMetadata)); } else { LOG.error("Shutdown initiated, not accepting request update: " + requestMetadata); throw new IllegalStateException("Shutdown initiated, not handling any more requests"); @@ -311,7 +315,8 @@ public void researchSampleUpdateHandler(SampleMetadata sampleMetadata) throws Ex throw new IllegalStateException("Message Handling Service has not been initialized"); } if (!shutdownInitiated) { - researchSampleUpdateQueue.put(sampleMetadata); + researchSampleUpdateQueue.put(new AbstractMap.SimpleImmutableEntry<>( + Boolean.FALSE, sampleMetadata)); } else { LOG.error("Shutdown initiated, not accepting research sample update: " + sampleMetadata); throw new IllegalStateException("Shutdown initiated, not handling any more samples");