forked from kspaceKelvin/python-ismrmrd-server
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
executable file
·183 lines (154 loc) · 8.2 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import constants
from connection import Connection
import socket
import logging
import multiprocessing
import ismrmrd.xsd
import importlib
import os
import json
import simplefft
import invertcontrast
import analyzeflow
class Server:
    """
    TCP server that accepts MRD streaming connections and dispatches each one
    to a processing module selected by the config name sent by the client.

    For every accepted socket a ``Connection`` is created and the messages are
    consumed in this order: config, XML metadata, and optionally a JSON text
    message carrying additional parameters. The selected module's
    ``process(connection, config, metadata)`` function is then called.
    """

    def __init__(self, address, port, defaultConfig, savedata, savedataFolder, multiprocessing):
        """
        Create the listening socket and bind it to ``address:port``.

        Args:
            address: Interface address to bind to.
            port: TCP port to bind to.
            defaultConfig: Module name used when the requested config cannot
                be found or imported.
            savedata: When ``True``, passed to ``Connection`` to enable saving
                of incoming data.
            savedataFolder: Folder passed to ``Connection`` for saved data.
            multiprocessing: When ``True``, each connection is handled in a
                separate daemon process. NOTE: inside this method the name
                shadows the ``multiprocessing`` module.
        """
        logging.info("Starting server and listening for data at %s:%d", address, port)
        logging.info("Default config is %s", defaultConfig)
        if savedata is True:
            logging.debug("Saving incoming data is enabled.")

        if multiprocessing is True:
            logging.debug("Multiprocessing is enabled.")

        self.defaultConfig = defaultConfig
        self.multiprocessing = multiprocessing
        self.savedata = savedata
        self.savedataFolder = savedataFolder

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Allow quick restarts of the server on the same address/port
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind((address, port))

    def serve(self):
        """
        Accept connections forever and hand each one to ``handle``.

        With multiprocessing enabled each connection runs in its own daemon
        process; otherwise connections are handled one at a time in this
        process.
        """
        logging.debug("Serving... ")
        self.socket.listen(0)

        while True:
            sock, (remote_addr, remote_port) = self.socket.accept()
            logging.info("Accepting connection from: %s:%d", remote_addr, remote_port)

            if self.multiprocessing is True:
                process = multiprocessing.Process(target=self.handle, args=[sock])
                process.daemon = True
                process.start()
                logging.debug("Spawned process %d to handle connection.", process.pid)
            else:
                self.handle(sock)

    def handle(self, sock):
        """
        Process a single client connection from first message to cleanup.

        Args:
            sock: Connected socket returned by ``accept()``.

        Any exception raised while reading or processing is logged and not
        propagated. Cleanup always runs, even when ``Connection`` could not be
        constructed.
        """
        # Bound before the try so the finally block can tell whether the
        # Connection was ever created.
        connection = None
        try:
            connection = Connection(sock, self.savedata, "", self.savedataFolder, "dataset")

            # First message is the config (file or text)
            config = next(connection)

            # Break out if a connection was established but no data was received
            if config is None and connection.is_exhausted is True:
                logging.info("Connection closed without any data received")
                return

            # Second message is the metadata (text)
            metadata_xml = next(connection)
            logging.debug("XML Metadata: %s", metadata_xml)
            metadata = self._parse_metadata(metadata_xml)

            # Support additional config parameters passed through a JSON text message
            if connection.peek_mrd_message_identifier() == constants.MRD_MESSAGE_TEXT:
                configAdditionalText = next(connection)
                logging.info("Received additional config text: %s", configAdditionalText)
                connection.save_additional_config(configAdditionalText)
                config, configAdditional = self._parse_additional_config(config, configAdditionalText)
            else:
                configAdditional = config

            self._dispatch(config, connection, configAdditional, metadata)

        except Exception as e:
            logging.exception(e)

        finally:
            if connection is None:
                # Connection() itself failed: nothing to finalize, but do not
                # leave the accepted socket open.
                try:
                    sock.close()
                except OSError as e:
                    logging.debug("Failed to close socket: %s", e)
            else:
                self._finalize_connection(connection)

    @staticmethod
    def _parse_metadata(metadata_xml):
        """Return the parsed MRD header, or the raw text if it cannot be parsed."""
        try:
            metadata = ismrmrd.xsd.CreateFromDocument(metadata_xml)
            sysInfo = metadata.acquisitionSystemInformation
            if sysInfo.systemFieldStrength_T is not None:
                logging.info("Data is from a %s %s at %1.1fT", sysInfo.systemVendor, sysInfo.systemModel, sysInfo.systemFieldStrength_T)
            return metadata
        except Exception:
            logging.warning("Metadata is not a valid MRD XML structure. Passing on metadata as text")
            return metadata_xml

    @staticmethod
    def _parse_additional_config(config, configAdditionalText):
        """
        Parse the optional JSON config message.

        Args:
            config: Config name received in the first message.
            configAdditionalText: Raw text of the additional config message.

        Returns:
            ``(config, configAdditional)``. ``config`` is replaced by
            ``parameters.config`` and then by a non-empty
            ``parameters.customconfig`` when present. If the text is not valid
            JSON, ``configAdditional`` falls back to the config name, the same
            value used when no additional message is sent.
        """
        try:
            configAdditional = json.loads(configAdditionalText)
        except (ValueError, TypeError):
            logging.error("Failed to parse as JSON")
            return config, config

        parameters = configAdditional.get('parameters') if isinstance(configAdditional, dict) else None
        if isinstance(parameters, dict):
            if 'config' in parameters:
                logging.info("Changing config to: %s", parameters['config'])
                config = parameters['config']

            # customconfig, when non-empty, takes precedence over config
            if parameters.get('customconfig', "") != "":
                logging.info("Changing config to: %s", parameters['customconfig'])
                config = parameters['customconfig']

        return config, configAdditional

    @staticmethod
    def _drain(connection):
        """Read messages without processing until ``None`` or exhaustion, then send close."""
        try:
            for msg in connection:
                if msg is None:
                    break
        finally:
            connection.send_close()

    def _dispatch(self, config, connection, configAdditional, metadata):
        """Run the processing routine selected by ``config``."""
        # Decide what program to use based on config
        # If not one of these explicit cases, try to load file matching name of config
        if config == "simplefft":
            logging.info("Starting simplefft processing based on config")
            simplefft.process(connection, configAdditional, metadata)
        elif config == "invertcontrast":
            logging.info("Starting invertcontrast processing based on config")
            invertcontrast.process(connection, configAdditional, metadata)
        elif config == "analyzeflow":
            logging.info("Starting analyzeflow processing based on config")
            analyzeflow.process(connection, configAdditional, metadata)
        elif config == "null":
            logging.info("No processing based on config")
            self._drain(connection)
        elif config == "savedataonly":
            # Dummy loop with no processing
            self._drain(connection)
        else:
            self._run_config_module(config, connection, configAdditional, metadata)

    def _run_config_module(self, config, connection, configAdditional, metadata):
        """Import the module named ``config`` (or the default) and call its ``process``."""
        # ``import importlib`` alone does not guarantee the ``util`` submodule
        # is loaded, so import it explicitly.
        import importlib.util

        # NOTE(security): ``config`` is supplied by the remote client and is
        # used as a module name to import. Consider restricting it to an
        # allow-list if the server is reachable by untrusted clients.
        usedConfig = config
        try:
            found = importlib.util.find_spec(config) is not None
        except (ImportError, ValueError):
            # Raised for e.g. an empty name or a dotted name with a missing parent
            found = False

        if not found:
            logging.error("Could not find config module '%s' -- falling back to default config: %s", config, self.defaultConfig)
            usedConfig = self.defaultConfig

        try:
            # Load module from file having exact name as config
            module = importlib.import_module(usedConfig)
            logging.info("Starting config %s", usedConfig)
            module.process(connection, configAdditional, metadata)
        except ImportError as e:
            logging.error("Failed to load config '%s' with error:\n %s", usedConfig, e)

            if usedConfig != self.defaultConfig:
                logging.info("Falling back to default config: '%s'", self.defaultConfig)
                try:
                    module = importlib.import_module(self.defaultConfig)
                    logging.info("Starting config %s", self.defaultConfig)
                    module.process(connection, configAdditional, metadata)
                except ImportError as e:
                    logging.error("Failed to load default config '%s' with error:\n %s", self.defaultConfig, e)

    @staticmethod
    def _finalize_connection(connection):
        """Close the connection and, when saving, close and rename the saved dataset."""
        connection.shutdown_close()

        if connection.savedata is not True:
            return

        # Dataset may not be closed properly if a close message is not received
        try:
            connection.dset.close()
        except Exception as e:
            logging.debug("Saved dataset could not be closed (it may already be closed): %s", e)

        if connection.savedataFile == "":
            try:
                # Rename the saved file to use the protocol name
                protocolName = ""
                dset = ismrmrd.Dataset(connection.mrdFilePath, connection.savedataGroup, False)
                try:
                    if 'xml' in dset.list():
                        xml_header = dset.read_xml_header().decode("utf-8")
                        mrdHead = ismrmrd.xsd.CreateFromDocument(xml_header)
                        protocolName = mrdHead.measurementInformation.protocolName
                finally:
                    # Release the file handle before renaming the file
                    dset.close()

                if protocolName:
                    newFilePath = connection.mrdFilePath.replace("MRD_input_", protocolName + "_")
                    os.rename(connection.mrdFilePath, newFilePath)
                    connection.mrdFilePath = newFilePath
            except Exception as e:
                # Renaming is best-effort; the data stays under its original name
                logging.debug("Could not rename saved data file: %s", e)

        if connection.mrdFilePath is not None:
            logging.info("Incoming data was saved at %s", connection.mrdFilePath)