Skip to content

Commit

Permalink
Imahara edition ready for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
michael-weinstein committed Jul 17, 2020
1 parent 074f95b commit d2bdf20
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 118 deletions.
12 changes: 7 additions & 5 deletions zymoTransmit.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

class CheckArgs(object):

__slots__ = ["input", "noTransmit", "hl7Directory"]
__slots__ = ["input", "noTransmit", "hl7Directory", "cdph"]

def __init__(self):
parser = argparse.ArgumentParser()
Expand All @@ -40,6 +40,7 @@ def __init__(self):
parser.add_argument("-e", "--editConfig", help = "Edit configuration file", action = 'store_true')
parser.add_argument("-n", "--noTransmit", help = "Do not attempt to transmit data", action = 'store_true')
parser.add_argument("-d", "--hl7Directory", help = "Upload a directory of HL7 files", action = 'store_true')
parser.add_argument("--cdph", help = "Read CDPH format with header line", action = 'store_true')
parser.add_argument("input", help = "Input file", type = str, nargs='?')
rawArgs = parser.parse_args()
testConnection = rawArgs.testConnection
Expand All @@ -50,6 +51,7 @@ def __init__(self):
inputValue = rawArgs.input
self.noTransmit = rawArgs.noTransmit
self.hl7Directory = rawArgs.hl7Directory
self.cdph = rawArgs.cdph
if self.hl7Directory and convertCertificate:
raise RuntimeError("Error: Program cannot be set to both process a certificate AND take in a directory for upload")
if testConnection or loinc or snomed:
Expand Down Expand Up @@ -108,8 +110,8 @@ def convertPFX(pfxPath:str, pfxPassword:str=None):
return pemPath


def getTestResults(testResultPath:str="results.txt"):
resultList = zymoTransmitSupport.inputOutput.resultReader.loadRawDataTable(testResultPath)
def getTestResults(testResultPath:str="results.txt", cdphCSV:bool=False):
resultList = zymoTransmitSupport.inputOutput.resultReader.loadRawDataTable(testResultPath, cdphCSV)
return resultList


Expand All @@ -120,7 +122,7 @@ def makeHL7Codes(resultList:typing.List[zymoTransmitSupport.inputOutput.resultRe
specimenID = result.specimenID
hl7Sets[(patientID, specimenID)] = []
currentSet = hl7Sets[(patientID, specimenID)]
currentSet.append(zymoTransmitSupport.hl7Encoder.encoders.makeMSHLine())
currentSet.append(zymoTransmitSupport.hl7Encoder.encoders.makeMSHLine(result))
currentSet.append(zymoTransmitSupport.hl7Encoder.encoders.makeSFTLine())
currentSet.append(zymoTransmitSupport.hl7Encoder.encoders.makePIDLine(result))
currentSet.append(zymoTransmitSupport.hl7Encoder.encoders.makeORCLine(result))
Expand Down Expand Up @@ -176,7 +178,7 @@ def prepareAndSendResults(args:CheckArgs):
print("Using raw HL7 from file %s" %args.input)
hl7TextBlocks = zymoTransmitSupport.inputOutput.rawHL7.textBlocksFromRawHL7(args.input)
else:
resultList = getTestResults(args.input)
resultList = getTestResults(args.input, args.cdph)
hl7Sets = makeHL7Codes(resultList)
hl7TextBlocks = makeHL7Blocks(hl7Sets)
hl7TextRecord = makeHL7TextRecord(hl7TextBlocks)
Expand Down
170 changes: 123 additions & 47 deletions zymoTransmitSupport/hl7Encoder/encoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@

config = defaultConfig

def makeMSHLine():
def makeMSHLine(result:inputOutput.resultReader.TestResult):
uniqueID = header.UniqueID(config.LabInfo.name, prependTime=True)
return header.MSHFromObject(config.Configuration.MSH, uniqueID, config.Configuration.productionReady)
return header.MSHFromObject(config.Configuration.MSH, uniqueID, config.Configuration.productionReady, result.auxiliaryData)


def makeSFTLine():
return software.SoftwareLine()
def makeSFTLine(passThruMode:bool=False):
return software.SoftwareLine(passThruMode)


def makePIDLine(result:inputOutput.resultReader.TestResult):
Expand Down Expand Up @@ -61,12 +61,22 @@ def makePIDLine(result:inputOutput.resultReader.TestResult):


def makeORCLine(result:inputOutput.resultReader.TestResult):
fillerOrderNumber = orderHeader.FillerOrderNumber(
"%s.%s" %(result.patientID, result.specimenID),
config.LabInfo.name,
config.LabInfo.clia,
"CLIA"
)
if "labName" in result.auxiliaryData or "labCLIA" in result.auxiliaryData:
if not ("labName" in result.auxiliaryData and "labCLIA" in result.auxiliaryData):
raise ValueError("Lab name and lab CLIA must both be either present or absent in auxiliary data")
fillerOrderNumber = orderHeader.FillerOrderNumber(
"%s.%s" % (result.patientID, result.specimenID),
result.auxiliaryData["labName"],
result.auxiliaryData["labCLIA"],
"CLIA"
)
else:
fillerOrderNumber = orderHeader.FillerOrderNumber(
"%s.%s" % (result.patientID, result.specimenID),
config.LabInfo.name,
config.LabInfo.clia,
"CLIA"
)
orderingProvider = orderHeader.OrderingProvider(
result.providerLastName,
result.providerFirstName,
Expand All @@ -76,25 +86,49 @@ def makeORCLine(result:inputOutput.resultReader.TestResult):
orderingFacilityAssigningAuthority = orderHeader.OrderingFacilityAssigningAuthority(
"CLIA"
)
orderingFacilityName = orderHeader.OrderingFacilityName(
config.LabInfo.name,
orderingFacilityAssigningAuthority,
config.LabInfo.clia
)
orderingFacilityAddress = orderHeader.OrderingFacilityAddress(
config.LabInfo.street,
config.LabInfo.city,
config.LabInfo.state,
config.LabInfo.zip
)
orderingFacilityPhone = orderHeader.OrderingFacilityPhone(config.LabInfo.phone)
if "labName" in result.auxiliaryData or "labCLIA" in result.auxiliaryData:
if not ("labName" in result.auxiliaryData and "labCLIA" in result.auxiliaryData):
raise ValueError("Lab name and lab CLIA must both be either present or absent in auxiliary data")
orderingFacilityName = orderHeader.OrderingFacilityName(
result.auxiliaryData["labName"],
orderingFacilityAssigningAuthority,
result.auxiliaryData["labCLIA"]
)
else:
orderingFacilityName = orderHeader.OrderingFacilityName(
config.LabInfo.name,
orderingFacilityAssigningAuthority,
config.LabInfo.clia,
)
if "labStreet" in result.auxiliaryData or "labCity" in result.auxiliaryData or "labState" in result.auxiliaryData or "labZip" in result.auxiliaryData:
if not (
"labStreet" in result.auxiliaryData and "labCity" in result.auxiliaryData and "labState" and result.auxiliaryData or "labZip" in result.auxiliaryData):
raise ValueError(
"All or no elements of lab address (street, city, state, zip) must be defined in auxiliary data")
orderingFacilityAddress = orderHeader.OrderingFacilityAddress(
result.auxiliaryData["labStreet"],
result.auxiliaryData["labCity"],
result.auxiliaryData["labState"],
result.auxiliaryData["labZip"]
)
else:
orderingFacilityAddress = orderHeader.OrderingFacilityAddress(
config.LabInfo.street,
config.LabInfo.city,
config.LabInfo.state,
config.LabInfo.zip
)
if "labPhone" in result.auxiliaryData:
orderingFacilityPhone = orderHeader.OrderingFacilityPhone(result.auxiliaryData["labPhone"])
else:
orderingFacilityPhone = orderHeader.OrderingFacilityPhone(config.LabInfo.phone)

orderingProviderAddress = orderHeader.ProviderAddress(
result.providerStreet,
result.providerCity,
result.providerState,
result.providerZip
)

return orderHeader.OrderHeaderLine(
fillerOrderNumber,
orderingProvider,
Expand All @@ -107,12 +141,22 @@ def makeORCLine(result:inputOutput.resultReader.TestResult):


def makeOBRLine(result:inputOutput.resultReader.TestResult):
fillerOrderNumber = orderRequest.FillerOrderNumber(
"%s.%s" %(result.patientID, result.specimenID),
config.LabInfo.name,
config.LabInfo.clia,
"CLIA"
)
if "labName" in result.auxiliaryData or "labCLIA" in result.auxiliaryData:
if not ("labName" in result.auxiliaryData and "labCLIA" in result.auxiliaryData):
raise ValueError("Lab name and lab CLIA must both be either present or absent in auxiliary data")
fillerOrderNumber = orderRequest.FillerOrderNumber(
"%s.%s" % (result.patientID, result.specimenID),
result.auxiliaryData["labName"],
result.auxiliaryData["labCLIA"],
"CLIA"
)
else:
fillerOrderNumber = orderRequest.FillerOrderNumber(
"%s.%s" % (result.patientID, result.specimenID),
config.LabInfo.name,
config.LabInfo.clia,
"CLIA"
)
serviceData = loinc.loincTable[result.testLOINC]
universalServiceIdentifier = orderRequest.UniversalServiceIdentifier(
result.testLOINC,
Expand Down Expand Up @@ -192,16 +236,34 @@ def makeObservationValueAndAbnormalityObjects(resultString:str):
result.analysisDateTime.minute,
result.analysisDateTime.second
)
performingOrganization = observedResults.PerformingOrganization(
config.LabInfo.name,
config.LabInfo.clia
)
labAddress = observedResults.PerformingOrganizationLabAddress(
config.LabInfo.street,
config.LabInfo.city,
config.LabInfo.state,
config.LabInfo.zip
)
if "labName" in result.auxiliaryData or "labCLIA" in result.auxiliaryData:
if not ("labName" in result.auxiliaryData and "labCLIA" in result.auxiliaryData):
raise ValueError("Lab name and lab CLIA must both be either present or absent in auxiliary data")
performingOrganization = observedResults.PerformingOrganization(
result.auxiliaryData["labName"],
result.auxiliaryData["labCLIA"]
)
else:
performingOrganization = observedResults.PerformingOrganization(
config.LabInfo.name,
config.LabInfo.clia
)
if "labStreet" in result.auxiliaryData or "labCity" in result.auxiliaryData or "labState" in result.auxiliaryData or "labZip" in result.auxiliaryData:
if not ("labStreet" in result.auxiliaryData and "labCity" in result.auxiliaryData and "labState" and result.auxiliaryData or "labZip" in result.auxiliaryData):
raise ValueError("All or no elements of lab address (street, city, state, zip) must be defined in auxiliary data")
labAddress = observedResults.PerformingOrganizationLabAddress(
result.auxiliaryData["labStreet"],
result.auxiliaryData["labCity"],
result.auxiliaryData["labState"],
result.auxiliaryData["labZip"]
)
else:
labAddress = observedResults.PerformingOrganizationLabAddress(
config.LabInfo.street,
config.LabInfo.city,
config.LabInfo.state,
config.LabInfo.zip
)
if config.LabDirectorInfo.assigningAuthority:
medicalDirectorAssignmentAuthority = observedResults.MedicalDirectorNumberAssignmentAuthority(
config.LabDirectorInfo.assigningAuthority
Expand Down Expand Up @@ -232,16 +294,30 @@ def makeObservationValueAndAbnormalityObjects(resultString:str):

def makeSPMLine(result:inputOutput.resultReader.TestResult):
specimenData = snomed.snomedTable[result.specimenSNOMED]
if "labName" in result.auxiliaryData:
assignerName = result.auxiliaryData["labName"]
else:
assignerName = config.LabInfo.name
specimenIDNumber = specimen.SpecimenIDNumber(
result.specimenID,
config.LabInfo.name
)
fillerAssignedIdentifier = specimen.FillerAssignedIdentifier(
"%s.%s" %(result.patientID, result.specimenID),
config.LabInfo.name,
config.LabInfo.clia,
"CLIA"
)
assignerName
)
if "labName" in result.auxiliaryData or "labCLIA" in result.auxiliaryData:
if not ("labName" in result.auxiliaryData and "labCLIA" in result.auxiliaryData):
raise ValueError("Lab name and lab CLIA must both be either present or absent in auxiliary data")
fillerAssignedIdentifier = specimen.FillerAssignedIdentifier(
"%s.%s" % (result.patientID, result.specimenID),
result.auxiliaryData["labName"],
result.auxiliaryData["labCLIA"],
"CLIA"
)
else:
fillerAssignedIdentifier = specimen.FillerAssignedIdentifier(
"%s.%s" % (result.patientID, result.specimenID),
config.LabInfo.name,
config.LabInfo.clia,
"CLIA"
)
specimenID = specimen.SpecimenID(
specimenIDNumber,
fillerAssignedIdentifier
Expand Down
12 changes: 11 additions & 1 deletion zymoTransmitSupport/hl7Encoder/header.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from .generics import Hl7Field
from . import generics
from .. import config


fieldSeparator = "|"

encodingCharacters = r"^~\&"
Expand Down Expand Up @@ -324,9 +326,17 @@ def __init__(self,
mshObject:config.Configuration.MSH,
uniqueID: UniqueID,
production:bool,
resultAuxiliary:dict = None
):
if not resultAuxiliary:
resultAuxiliary = {}
self.sendingApplication = SendingApplication()
self.sendingFacility = SendingFacility.fromObject(mshObject.SendingFacility)
if "labName" in resultAuxiliary or "labCLIA" in resultAuxiliary:
if not ("labName" in resultAuxiliary and "labCLIA" in resultAuxiliary):
raise ValueError("Both lab name and lab CLIA must be present or absent from the resultAuxiliary")
self.sendingFacility = SendingFacility(resultAuxiliary["labName"], resultAuxiliary["labCLIA"])
else:
self.sendingFacility = SendingFacility.fromObject(mshObject.SendingFacility)
self.receivingAppliation = ReceivingApplication.fromObject(mshObject.ReceivingApplication)
self.receivingFacility = ReceivingFacility.fromObject(mshObject.ReceivingFacility)
self.messageDateTime = TimeStamp()
Expand Down
10 changes: 6 additions & 4 deletions zymoTransmitSupport/hl7Encoder/software.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class VendorOrg(Hl7Field):

def __init__(self):
self.orgName = "Zymo Research"
self.orgType = "L" #TODO: Make sure JC is good with this being listed as our legal name
self.orgType = "L"
self.idNumber = ""
self.checkDigit = ""
self.checkDigitScheme = ""
Expand Down Expand Up @@ -50,8 +50,10 @@ def __init__(self):

class Name(Hl7Field):

def __init__(self):
def __init__(self, passThruMode:bool=False):
self.softwareName = "Zymo Transmission"
if passThruMode:
self.softwareName += " - Pass Thru"
self.subfields = [self.softwareName]


Expand Down Expand Up @@ -80,10 +82,10 @@ def __init__(self):

class SoftwareLine(generics.Hl7Line):

def __init__(self):
def __init__(self, passThruMode:bool=False):
self.vendorOrg = VendorOrg()
self.version = Version()
self.name = Name()
self.name = Name(passThruMode=passThruMode)
self.binaryID = BinaryID()
self.productInformation = ProductInformation()
self.installDate = InstallDate()
Expand Down
Loading

0 comments on commit d2bdf20

Please sign in to comment.