diff --git a/model/src/main/java/org/mskcc/smile/model/tempo/Tempo.java b/model/src/main/java/org/mskcc/smile/model/tempo/Tempo.java index 47925dec..e356487a 100644 --- a/model/src/main/java/org/mskcc/smile/model/tempo/Tempo.java +++ b/model/src/main/java/org/mskcc/smile/model/tempo/Tempo.java @@ -20,6 +20,9 @@ public class Tempo implements Serializable { private Long id; private String custodianInformation; private String accessLevel; + private Boolean billed; + private String billedBy; + private String costCenter; @Relationship(type = "HAS_EVENT", direction = Relationship.OUTGOING) private List bamCompleteEvents; @Relationship(type = "HAS_EVENT", direction = Relationship.OUTGOING) @@ -59,6 +62,30 @@ public void setAccessLevel(String accessLevel) { this.accessLevel = accessLevel; } + public Boolean getBilled() { + return billed; + } + + public void setBilled(Boolean billed) { + this.billed = billed; + } + + public String getBilledBy() { + return billedBy; + } + + public void setBilledBy(String billedBy) { + this.billedBy = billedBy; + } + + public String getCostCenter() { + return costCenter; + } + + public void setCostCenter(String costCenter) { + this.costCenter = costCenter; + } + /** * Returns list of bam complete events. * @return diff --git a/model/src/main/java/org/mskcc/smile/model/tempo/json/SampleBillingJson.java b/model/src/main/java/org/mskcc/smile/model/tempo/json/SampleBillingJson.java new file mode 100644 index 00000000..8913dad5 --- /dev/null +++ b/model/src/main/java/org/mskcc/smile/model/tempo/json/SampleBillingJson.java @@ -0,0 +1,61 @@ +package org.mskcc.smile.model.tempo.json; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.io.Serializable; +import org.apache.commons.lang.builder.ToStringBuilder; + +/** + * + * @author ochoaa + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class SampleBillingJson implements Serializable { + @JsonProperty("primaryId") + private String primaryId; + @JsonProperty("billed") + private Boolean billed; + @JsonProperty("billedBy") + private String billedBy; + @JsonProperty("costCenter") + private String costCenter; + + public SampleBillingJson() {} + + public String getPrimaryId() { + return primaryId; + } + + public void setPrimaryId(String primaryId) { + this.primaryId = primaryId; + } + + public Boolean getBilled() { + return billed; + } + + public void setBilled(Boolean billed) { + this.billed = billed; + } + + public String getBilledBy() { + return billedBy; + } + + public void setBilledBy(String billedBy) { + this.billedBy = billedBy; + } + + public String getCostCenter() { + return costCenter; + } + + public void setCostCenter(String costCenter) { + this.costCenter = costCenter; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } +} diff --git a/persistence/src/main/java/org/mskcc/smile/persistence/neo4j/TempoRepository.java b/persistence/src/main/java/org/mskcc/smile/persistence/neo4j/TempoRepository.java index cd09c01b..c0fd37d2 100644 --- a/persistence/src/main/java/org/mskcc/smile/persistence/neo4j/TempoRepository.java +++ b/persistence/src/main/java/org/mskcc/smile/persistence/neo4j/TempoRepository.java @@ -6,6 +6,7 @@ import org.mskcc.smile.model.tempo.MafComplete; import org.mskcc.smile.model.tempo.QcComplete; import org.mskcc.smile.model.tempo.Tempo; +import org.mskcc.smile.model.tempo.json.SampleBillingJson; import org.springframework.data.neo4j.annotation.Query; import org.springframework.data.neo4j.repository.Neo4jRepository; import org.springframework.data.repository.query.Param; @@ -38,8 +39,8 @@ public interface TempoRepository extends Neo4jRepository { @Query("MATCH (t:Tempo) WHERE ID(t) = $tempoId MATCH (t)-[:HAS_EVENT]->(bc: BamComplete) " + "RETURN bc ORDER BY bc.date DESC LIMIT 1") - BamComplete findLatestBamCompleteEventByTempoId(@Param("tempoId") Long tempoId); - + BamComplete findLatestBamCompleteEventByTempoId(@Param("tempoId") Long tempoId); + @Query("MATCH (t:Tempo) WHERE ID(t) = $tempoId MATCH (t)-[:HAS_EVENT]->(mc: MafComplete) " + "RETURN mc ORDER BY mc.date DESC LIMIT 1") MafComplete findLatestMafCompleteEventByTempoId(@Param("tempoId") Long tempoId); @@ -65,8 +66,14 @@ Tempo mergeQcCompleteEventBySamplePrimaryId(@Param("primaryId") String primaryId @Query("MATCH (s: Sample)-[:HAS_METADATA]->(sm: SampleMetadata {primaryId: $primaryId}) " + "MERGE (s)-[:HAS_TEMPO]->(t: Tempo) WITH s,t " + "MERGE (t)-[:HAS_EVENT]->(mc: MafComplete {date: $mcEvent.date, " - + "normalPrimaryId: $mcEvent.normalPrimaryId, status: $mcEvent.status}) " + + "normalPrimaryId: $mcEvent.normalPrimaryId, status: $mcEvent.status}) " + "WITH s,t,mc RETURN t") Tempo mergeMafCompleteEventBySamplePrimaryId(@Param("primaryId") String primaryId, @Param("mcEvent") MafComplete mcEvent); + + @Query("MATCH (s: Sample)-[:HAS_METADATA]->(sm: SampleMetadata {primaryId: $billing.primaryId}) " + + "MATCH (s)-[:HAS_TEMPO]->(t: Tempo) " + + "SET t.billed = $billing.billed, t.billedBy = $billing.billedBy, " + + "t.costCenter = $billing.costCenter") + void updateSampleBilling(@Param("billing") SampleBillingJson billing); } diff --git a/service/src/main/java/org/mskcc/smile/service/TempoMessageHandlingService.java b/service/src/main/java/org/mskcc/smile/service/TempoMessageHandlingService.java index 167f99ba..6528644e 100644 --- a/service/src/main/java/org/mskcc/smile/service/TempoMessageHandlingService.java +++ b/service/src/main/java/org/mskcc/smile/service/TempoMessageHandlingService.java @@ -6,6 +6,7 @@ import org.mskcc.smile.model.tempo.MafComplete; import org.mskcc.smile.model.tempo.QcComplete; import org.mskcc.smile.model.tempo.json.CohortCompleteJson; +import org.mskcc.smile.model.tempo.json.SampleBillingJson; /** * @@ -17,5 +18,6 @@ public interface TempoMessageHandlingService { void qcCompleteHandler(Map.Entry bcEvent) throws Exception; void mafCompleteHandler(Map.Entry mcEvent) throws Exception; void cohortCompleteHandler(CohortCompleteJson ccEvent) throws Exception; + void sampleBillingHandler(SampleBillingJson billing) throws Exception; void shutdown() throws Exception; } diff --git a/service/src/main/java/org/mskcc/smile/service/TempoService.java b/service/src/main/java/org/mskcc/smile/service/TempoService.java index 60bc7e68..98d3c4e0 100644 --- a/service/src/main/java/org/mskcc/smile/service/TempoService.java +++ b/service/src/main/java/org/mskcc/smile/service/TempoService.java @@ -5,6 +5,7 @@ import org.mskcc.smile.model.tempo.MafComplete; import org.mskcc.smile.model.tempo.QcComplete; import org.mskcc.smile.model.tempo.Tempo; +import org.mskcc.smile.model.tempo.json.SampleBillingJson; /** * @@ -18,4 +19,5 @@ public interface TempoService { Tempo mergeQcCompleteEventBySamplePrimaryId(String primaryId, QcComplete qcComplete) throws Exception; Tempo mergeMafCompleteEventBySamplePrimaryId(String primaryId, MafComplete mafComplete) throws Exception; Tempo initAndSaveDefaultTempoData(String primaryId) throws Exception; + void updateSampleBilling(SampleBillingJson billing) throws Exception; } diff --git a/service/src/main/java/org/mskcc/smile/service/impl/TempoMessageHandlingServiceImpl.java b/service/src/main/java/org/mskcc/smile/service/impl/TempoMessageHandlingServiceImpl.java index 73906378..4dfd52ea 100644 --- a/service/src/main/java/org/mskcc/smile/service/impl/TempoMessageHandlingServiceImpl.java +++ b/service/src/main/java/org/mskcc/smile/service/impl/TempoMessageHandlingServiceImpl.java @@ -28,6 +28,7 @@ import org.mskcc.smile.model.tempo.QcComplete; import org.mskcc.smile.model.tempo.Tempo; import org.mskcc.smile.model.tempo.json.CohortCompleteJson; +import org.mskcc.smile.model.tempo.json.SampleBillingJson; import org.mskcc.smile.service.CohortCompleteService; import org.mskcc.smile.service.SmileSampleService; import org.mskcc.smile.service.TempoMessageHandlingService; @@ -54,6 +55,9 @@ public class TempoMessageHandlingServiceImpl implements TempoMessageHandlingServ @Value("${tempo.wes_cohort_complete_topic}") private String TEMPO_WES_COHORT_COMPLETE_TOPIC; + @Value("${tempo.sample_billing_topic}") + private String TEMPO_SAMPLE_BILLING_TOPIC; + @Value("${num.tempo_msg_handler_threads}") private int NUM_TEMPO_MSG_HANDLERS; @@ -81,11 +85,14 @@ public class TempoMessageHandlingServiceImpl implements TempoMessageHandlingServ new LinkedBlockingQueue>(); private static final BlockingQueue cohortCompleteQueue = new LinkedBlockingQueue(); + private static final BlockingQueue sampleBillingQueue = + new LinkedBlockingQueue(); private static CountDownLatch bamCompleteHandlerShutdownLatch; private static CountDownLatch qcCompleteHandlerShutdownLatch; private static CountDownLatch mafCompleteHandlerShutdownLatch; private static CountDownLatch cohortCompleteHandlerShutdownLatch; + private static CountDownLatch sampleBillingHandlerShutdownLatch; private class BamCompleteHandler implements Runnable { final Phaser phaser; @@ -248,6 +255,43 @@ public void run() { } } + private class SampleBillingHandler implements Runnable { + final Phaser phaser; + boolean interrupted = false; + + SampleBillingHandler(Phaser phaser) { + this.phaser = phaser; + } + + @Override + public void run() { + phaser.arrive(); + while (true) { + try { + SampleBillingJson billing = sampleBillingQueue.poll(100, TimeUnit.MILLISECONDS); + if (billing != null) { + // this message is coming straight from the dashboard and should therefore always + // have data for a valid sample that exists in the database + // however this check is will make extra sure that the primary id received + // in the nats message actually exists in the database before conducting + // further operations in the db + if (sampleService.sampleExistsByPrimaryId(billing.getPrimaryId())) { + LOG.info("Updating billing information for sample: " + billing.getPrimaryId()); + tempoService.updateSampleBilling(billing); + } else { + LOG.error("Cannot update billing information for sample that does not exist: " + + billing.getPrimaryId()); + } + } + } catch (InterruptedException e) { + interrupted = true; + } catch (Exception e) { + LOG.error("Error during handling of sample billing data", e); + } + } + } + } + @Override public void intialize(Gateway gateway) throws Exception { if (!initialized) { @@ -256,6 +300,7 @@ public void intialize(Gateway gateway) throws Exception { setupQcCompleteHandler(messagingGateway, this); setupMafCompleteHandler(messagingGateway, this); setupCohortCompleteHandler(messagingGateway, this); + setupSampleBillingHandler(messagingGateway, this); initializeMessageHandlers(); initialized = true; } else { @@ -315,6 +360,19 @@ public void cohortCompleteHandler(CohortCompleteJson cohortEvent) throws Excepti } } + @Override + public void sampleBillingHandler(SampleBillingJson billing) throws Exception { + if (!initialized) { + throw new IllegalStateException("Message Handling Service has not been initialized"); + } + if (!shutdownInitiated) { + sampleBillingQueue.put(billing); + } else { + LOG.error("Shutdown initiated, not accepting billing event: " + billing); + throw new IllegalStateException("Shutdown initiated, not handling any more TEMPO events"); + } + } + @Override public void shutdown() throws Exception { if (!initialized) { @@ -325,6 +383,7 @@ public void shutdown() throws Exception { qcCompleteHandlerShutdownLatch.await(); mafCompleteHandlerShutdownLatch.await(); cohortCompleteHandlerShutdownLatch.await(); + sampleBillingHandlerShutdownLatch.await(); shutdownInitiated = true; } @@ -368,6 +427,16 @@ private void initializeMessageHandlers() throws Exception { exec.execute(new CohortCompleteHandler(cohortCompletePhaser)); } cohortCompletePhaser.arriveAndAwaitAdvance(); + + // sample billing handler + sampleBillingHandlerShutdownLatch = new CountDownLatch(NUM_TEMPO_MSG_HANDLERS); + final Phaser sampleBillingPhaser = new Phaser(); + sampleBillingPhaser.register(); + for (int lc = 0; lc < NUM_TEMPO_MSG_HANDLERS; lc++) { + sampleBillingPhaser.register(); + exec.execute(new SampleBillingHandler(sampleBillingPhaser)); + } + sampleBillingPhaser.arriveAndAwaitAdvance(); } private void setupBamCompleteHandler(Gateway gateway, @@ -463,4 +532,24 @@ public void onMessage(Message msg, Object message) { } }); } + + private void setupSampleBillingHandler(Gateway gateway, + TempoMessageHandlingService tempoMessageHandlingService) throws Exception { + gateway.subscribe(TEMPO_SAMPLE_BILLING_TOPIC, Object.class, new MessageConsumer() { + @Override + public void onMessage(Message msg, Object message) { + try { + String billingJson = mapper.readValue( + new String(msg.getData(), StandardCharsets.UTF_8), + String.class); + SampleBillingJson billing = mapper.readValue(billingJson, + SampleBillingJson.class); + tempoMessageHandlingService.sampleBillingHandler(billing); + } catch (Exception e) { + LOG.error("Exception occurred during processing of Cohort Complete event: " + + TEMPO_SAMPLE_BILLING_TOPIC, e); + } + } + }); + } } diff --git a/service/src/main/java/org/mskcc/smile/service/impl/TempoServiceImpl.java b/service/src/main/java/org/mskcc/smile/service/impl/TempoServiceImpl.java index 66b44ddd..10cd4c4e 100644 --- a/service/src/main/java/org/mskcc/smile/service/impl/TempoServiceImpl.java +++ b/service/src/main/java/org/mskcc/smile/service/impl/TempoServiceImpl.java @@ -7,6 +7,7 @@ import org.mskcc.smile.model.tempo.MafComplete; import org.mskcc.smile.model.tempo.QcComplete; import org.mskcc.smile.model.tempo.Tempo; +import org.mskcc.smile.model.tempo.json.SampleBillingJson; import org.mskcc.smile.persistence.neo4j.TempoRepository; import org.mskcc.smile.service.SmileRequestService; import org.mskcc.smile.service.SmileSampleService; @@ -51,6 +52,7 @@ public Tempo saveTempoData(Tempo tempo) throws Exception { } @Override + @Transactional(rollbackFor = {Exception.class}) public Tempo getTempoDataBySampleId(SmileSample smileSample) throws Exception { Tempo tempo = tempoRepository.findTempoBySmileSampleId(smileSample.getSmileSampleId()); if (tempo == null) { @@ -60,6 +62,7 @@ public Tempo getTempoDataBySampleId(SmileSample smileSample) throws Exception { } @Override + @Transactional(rollbackFor = {Exception.class}) public Tempo getTempoDataBySamplePrimaryId(String primaryId) throws Exception { Tempo tempo = tempoRepository.findTempoBySamplePrimaryId(primaryId); if (tempo == null) { @@ -69,6 +72,7 @@ public Tempo getTempoDataBySamplePrimaryId(String primaryId) throws Exception { } @Override + @Transactional(rollbackFor = {Exception.class}) public Tempo mergeBamCompleteEventBySamplePrimaryId(String primaryId, BamComplete bamCompleteEvent) throws Exception { if (getTempoDataBySamplePrimaryId(primaryId) == null) { @@ -79,6 +83,7 @@ public Tempo mergeBamCompleteEventBySamplePrimaryId(String primaryId, BamComplet } @Override + @Transactional(rollbackFor = {Exception.class}) public Tempo mergeQcCompleteEventBySamplePrimaryId(String primaryId, QcComplete qcCompleteEvent) throws Exception { if (getTempoDataBySamplePrimaryId(primaryId) == null) { @@ -89,6 +94,7 @@ public Tempo mergeQcCompleteEventBySamplePrimaryId(String primaryId, QcComplete } @Override + @Transactional(rollbackFor = {Exception.class}) public Tempo mergeMafCompleteEventBySamplePrimaryId(String primaryId, MafComplete mafCompleteEvent) throws Exception { if (getTempoDataBySamplePrimaryId(primaryId) == null) { @@ -99,6 +105,7 @@ public Tempo mergeMafCompleteEventBySamplePrimaryId(String primaryId, MafComplet } @Override + @Transactional(rollbackFor = {Exception.class}) public Tempo initAndSaveDefaultTempoData(String primaryId) throws Exception { SmileSample sample = sampleService.getSampleByInputId(primaryId); Tempo tempo = new Tempo(sample); @@ -120,4 +127,10 @@ private Tempo getDetailedTempoData(Tempo tempo) { tempo.setMafCompleteEvents(tempoRepository.findMafCompleteEventsByTempoId(tempo.getId())); return tempo; } + + @Override + @Transactional(rollbackFor = {Exception.class}) + public void updateSampleBilling(SampleBillingJson billing) throws Exception { + tempoRepository.updateSampleBilling(billing); + } } diff --git a/src/main/resources/application.properties.EXAMPLE b/src/main/resources/application.properties.EXAMPLE index 21578112..f13fe8c4 100644 --- a/src/main/resources/application.properties.EXAMPLE +++ b/src/main/resources/application.properties.EXAMPLE @@ -82,3 +82,4 @@ tempo.wes_bam_complete_topic= tempo.wes_qc_complete_topic= tempo.wes_maf_complete_topic= tempo.wes_cohort_complete_topic= +tempo.sample_billing_topic=