Skip to content

Commit

Permalink
redirected request/sample updates to updates channel (#816)
Browse files Browse the repository at this point in the history
* redirected request/sample updates to updates channel

Signed-off-by: Divya Madala <[email protected]>

* logic fixes

Signed-off-by: Divya Madala <[email protected]>

* checkstyle

Signed-off-by: Divya Madala <[email protected]>

Signed-off-by: Divya Madala <[email protected]>
  • Loading branch information
divyamadala30 authored Oct 26, 2022
1 parent 060b9d9 commit f674992
Showing 1 changed file with 47 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.nats.client.Message;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -80,10 +83,12 @@ public class ResearchMessageHandlingServiceImpl implements ResearchMessageHandli
new LinkedBlockingQueue<SmileRequest>();
private static final BlockingQueue<SmileRequest> promotedRequestQueue =
new LinkedBlockingQueue<SmileRequest>();
private static final BlockingQueue<RequestMetadata> requestUpdateQueue =
new LinkedBlockingQueue<RequestMetadata>();
private static final BlockingQueue<SampleMetadata> researchSampleUpdateQueue =
new LinkedBlockingQueue<SampleMetadata>();
// Boolean in requestUpdateQueue Map.Entry refers to arg fromLims
private static final BlockingQueue<Map.Entry<Boolean,RequestMetadata>> requestUpdateQueue =
new LinkedBlockingQueue<Map.Entry<Boolean,RequestMetadata>>();
// Boolean in researchSampleUpdateQueue Map.Entry refers to arg fromLims
private static final BlockingQueue<Map.Entry<Boolean, SampleMetadata>> researchSampleUpdateQueue =
new LinkedBlockingQueue<Map.Entry<Boolean, SampleMetadata>>();

private static CountDownLatch newRequestHandlerShutdownLatch;
private static CountDownLatch promotedRequestHandlerShutdownLatch;
Expand Down Expand Up @@ -124,37 +129,31 @@ public void run() {
if (existingRequest == null) {
LOG.info("Persisting new request: " + request.getIgoRequestId());
requestService.saveRequest(request);
// publish saved request to consistency checker or promoted request topic
String requestJson = mapper.writeValueAsString(
requestService.getPublishedSmileRequestById(request.getIgoRequestId()));
switch (smileRequestDest) {
case NEW_REQUEST_DEST:
LOG.info("Publishing request to: " + CONSISTENCY_CHECK_NEW_REQUEST);
messagingGateway.publish(request.getIgoRequestId(),
CONSISTENCY_CHECK_NEW_REQUEST, requestJson);
break;
case PROMOTED_REQUEST_DEST:
LOG.info("Publishing request to: " + CONSUMERS_PROMOTED_REQUEST_TOPIC);
messagingGateway.publish(request.getIgoRequestId(),
CONSUMERS_PROMOTED_REQUEST_TOPIC, requestJson);
break;
default:
break;
}
} else {
// request-service and sample-service methods will check for updates and persist
// them if applicable (including patient swapping)
// Boolean arg in updateRequestMetadata refers to fromLims
requestService.updateRequestMetadata(request.getLatestRequestMetadata(),
Boolean.TRUE);
requestUpdateQueue.add(new AbstractMap.SimpleImmutableEntry<>(
Boolean.TRUE, request.getLatestRequestMetadata()));
for (SmileSample sample : request.getSmileSampleList()) {
// Boolean arg in updateSampleMetadata refers to fromLims
sampleService.updateSampleMetadata(sample.getLatestSampleMetadata(),
Boolean.TRUE);
sampleService.createSampleRequestRelationship(sample.getSmileSampleId(),
existingRequest.getSmileRequestId());
researchSampleUpdateQueue.add(new AbstractMap.SimpleImmutableEntry<>(
Boolean.TRUE, sample.getLatestSampleMetadata()));
}
}
// publish updated/saved request to consistency checker or promoted request topic
String requestJson = mapper.writeValueAsString(
requestService.getPublishedSmileRequestById(request.getIgoRequestId()));
switch (smileRequestDest) {
case NEW_REQUEST_DEST:
LOG.info("Publishing request to: " + CONSISTENCY_CHECK_NEW_REQUEST);
messagingGateway.publish(request.getIgoRequestId(),
CONSISTENCY_CHECK_NEW_REQUEST, requestJson);
break;
case PROMOTED_REQUEST_DEST:
LOG.info("Publishing request to: " + CONSUMERS_PROMOTED_REQUEST_TOPIC);
messagingGateway.publish(request.getIgoRequestId(),
CONSUMERS_PROMOTED_REQUEST_TOPIC, requestJson);
break;
default:
break;
}
}
if (interrupted && requestQueue.isEmpty()) {
break;
Expand Down Expand Up @@ -182,14 +181,17 @@ public void run() {
phaser.arrive();
while (true) {
try {
RequestMetadata requestMetadata = requestUpdateQueue.poll(100, TimeUnit.MILLISECONDS);
if (requestMetadata != null) {
Entry<Boolean, RequestMetadata> requestMetadataEntry =
requestUpdateQueue.poll(100, TimeUnit.MILLISECONDS);
if (requestMetadataEntry != null) {
// Boolean arg in updateRequestMetadata refers to fromLims
if (requestService.updateRequestMetadata(requestMetadata, Boolean.FALSE)) {
if (requestService.updateRequestMetadata(requestMetadataEntry.getValue(),
requestMetadataEntry.getKey())) {
LOG.info("Publishing Request-level Metadata updates "
+ "to " + CMO_REQUEST_UPDATE_TOPIC);
SmileRequest existingRequest =
requestService.getSmileRequestById(requestMetadata.getIgoRequestId());
requestService.getSmileRequestById(
requestMetadataEntry.getValue().getIgoRequestId());
// publish request-level metadata history to CMO_REQUEST_UPDATE_TOPIC
messagingGateway.publish(existingRequest.getIgoRequestId(),
CMO_REQUEST_UPDATE_TOPIC,
Expand Down Expand Up @@ -224,15 +226,17 @@ public void run() {
phaser.arrive();
while (true) {
try {
SampleMetadata sampleMetadata = researchSampleUpdateQueue.poll(
Entry<Boolean, SampleMetadata> sampleMetadataEntry = researchSampleUpdateQueue.poll(
100, TimeUnit.MILLISECONDS);
if (sampleMetadata != null) {
if (sampleMetadataEntry != null) {
// Boolean arg in updateSampleMetadata refers to fromLims
if (sampleService.updateSampleMetadata(sampleMetadata, Boolean.FALSE)) {
if (sampleService.updateSampleMetadata(sampleMetadataEntry.getValue(),
sampleMetadataEntry.getKey())) {
SmileSample existingSample = sampleService.getResearchSampleByRequestAndIgoId(
sampleMetadata.getIgoRequestId(), sampleMetadata.getPrimaryId());
sampleMetadataEntry.getValue().getIgoRequestId(),
sampleMetadataEntry.getValue().getPrimaryId());
LOG.info("Publishing sample-level metadata history for research sample: "
+ sampleMetadata.getPrimaryId());
+ sampleMetadataEntry.getValue().getPrimaryId());
// publish sample-level metadata history to CMO_REQUEST_UPDATE_TOPIC
messagingGateway.publish(CMO_SAMPLE_UPDATE_TOPIC,
mapper.writeValueAsString(existingSample.getSampleMetadataList()));
Expand Down Expand Up @@ -298,7 +302,7 @@ public void requestUpdateHandler(RequestMetadata requestMetadata) throws Excepti
throw new IllegalStateException("Message Handling Service has not been initialized");
}
if (!shutdownInitiated) {
requestUpdateQueue.put(requestMetadata);
requestUpdateQueue.put(new AbstractMap.SimpleImmutableEntry<>(Boolean.FALSE, requestMetadata));
} else {
LOG.error("Shutdown initiated, not accepting request update: " + requestMetadata);
throw new IllegalStateException("Shutdown initiated, not handling any more requests");
Expand All @@ -311,7 +315,8 @@ public void researchSampleUpdateHandler(SampleMetadata sampleMetadata) throws Ex
throw new IllegalStateException("Message Handling Service has not been initialized");
}
if (!shutdownInitiated) {
researchSampleUpdateQueue.put(sampleMetadata);
researchSampleUpdateQueue.put(new AbstractMap.SimpleImmutableEntry<>(
Boolean.FALSE, sampleMetadata));
} else {
LOG.error("Shutdown initiated, not accepting research sample update: " + sampleMetadata);
throw new IllegalStateException("Shutdown initiated, not handling any more samples");
Expand Down

0 comments on commit f674992

Please sign in to comment.