Skip to content

Commit

Permalink
Added a new thread to the pipeline
Browse files Browse the repository at this point in the history
Added a new thread that sets the NumCaptured PV
Added a new EndReason
  • Loading branch information
evalott100 committed Jan 8, 2024
1 parent 3638f0d commit 8fbe219
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/pandablocks/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def __init__(self, host: str):

async def connect(self):
"""Connect to the control port, and be ready to handle commands"""
await self._ctrl_stream.connect(self._host, 8888),
await self._ctrl_stream.connect(self._host, 8888)

self._ctrl_task = asyncio.create_task(
self._ctrl_read_forever(self._ctrl_stream.reader)
Expand Down
33 changes: 26 additions & 7 deletions src/pandablocks/hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ class HDFDataOverrunException(Exception):
"""Raised if `DATA_OVERRUN` occurs while receiving data for HDF file"""


class Stop:
    """Sentinel marker placed on a pipeline queue to request shutdown."""

    def __repr__(self) -> str:
        return "<Stop>"

    # str() renders the same marker text as repr()
    __str__ = __repr__


# Singleton sentinel; compared with ``data is STOP`` by pipeline consumers
STOP = Stop()


class Pipeline(threading.Thread):
"""Helper class that runs a pipeline consumer process in its own thread"""

Expand All @@ -44,21 +54,21 @@ def __init__(self):
def run(self):
    """Consume items from ``self.queue`` until the ``STOP`` sentinel arrives.

    Each item is transformed by the handler registered for its type in
    ``self.what_to_do`` (if one exists), then forwarded to the downstream
    pipeline element — unless the handler returned ``None``, in which
    case the item is dropped so ``None`` never propagates downstream.
    """
    while True:
        data = self.queue.get()
        if data is STOP:
            # stop() called below
            break
        else:
            func = self.what_to_do.get(type(data), None)
            if func:
                # If we have a handler, use it to transform the data
                data = func(data)
            if self.downstream and data is not None:
                # Pass the (possibly transformed) data downstream
                self.downstream.queue.put_nowait(data)

def stop(self):
    """Stop the processing after the current queue has been emptied.

    Enqueues the ``STOP`` sentinel rather than ``None`` so that a
    handler legitimately returning/passing ``None`` cannot be mistaken
    for a shutdown request.
    """
    self.queue.put(STOP)


class HDFWriter(Pipeline):
Expand Down Expand Up @@ -122,12 +132,15 @@ def write_frame(self, data: List[np.ndarray]):
dataset[written:] = column
dataset.flush()

# Return the number of samples written
return dataset.shape[0]

def close_file(self, data: EndData):
    """Close the currently open HDF file and log a capture summary.

    Args:
        data: End-of-capture message carrying the number of samples
            received and the reason the capture finished.
    """
    assert self.hdf_file, "File not open yet"
    self.hdf_file.close()
    self.hdf_file = None
    logging.info(
        f"Closed '{self.file_path}' after receiving {data.samples} "
        f"samples. End reason is '{data.reason.value}'"
    )
    self.file_path = ""
Expand Down Expand Up @@ -166,15 +179,21 @@ def scale_data(self, data: FrameData) -> List[np.ndarray]:


def create_default_pipeline(
    file_names: Iterator[str], *additional_downstream_pipelines: Pipeline
) -> List[Pipeline]:
    """Create the default processing pipeline consisting of one `FrameProcessor` and
    one `HDFWriter`. See `create_pipeline` for more details.

    Args:
        file_names: Iterator of file names. Must be full file paths. Will be called once
            per file created. As required by `HDFWriter`.
        additional_downstream_pipelines: Any number of additional pipelines to add
            downstream.
    """

    return create_pipeline(
        FrameProcessor(), HDFWriter(file_names), *additional_downstream_pipelines
    )


def create_pipeline(*elements: Pipeline) -> List[Pipeline]:
Expand Down
2 changes: 2 additions & 0 deletions src/pandablocks/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ class EndReason(Enum):

#: Experiment completed by falling edge of ``PCAP.ENABLE```
OK = "Ok"
#: Experiment manually completed by ``DATA:CAPTURE``
MANUALLY_STOPPED = "Manually stopped"
#: Experiment manually completed by ``*PCAP.DISARM=`` command
DISARMED = "Disarmed"
#: Client disconnect detected
Expand Down

0 comments on commit 8fbe219

Please sign in to comment.