Skip to content

Commit

Permalink
Merge pull request #172 from simonsobs/koopman/seq-cleanup-error-hand…
Browse files Browse the repository at this point in the history
…ling

Add error handling to SMuRF stream shutdown
  • Loading branch information
BrianJKoopman authored Oct 7, 2024
2 parents a367d58 + 54401a4 commit 7f92c96
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
15 changes: 14 additions & 1 deletion src/sorunlib/smurf.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

import time

from ocs.client_http import ControlClientError

import sorunlib as run
from sorunlib._internal import check_response, check_started

Expand Down Expand Up @@ -366,7 +368,18 @@ def stream(state, tag=None, subtype=None):
else:
print('Stopping SMuRF streams.')
for smurf in run.CLIENTS['smurf']:
smurf.stream.stop()
try:
smurf.stream.stop()
# Handles case where agent becomes unreachable
except ControlClientError as e:
print(f"Failed to stop stream on {smurf}, removing from targets list.")
print(e)
clients_to_remove.append(smurf)

# Remove failed SMuRF clients
for client in clients_to_remove:
run.CLIENTS['smurf'].remove(client)
clients_to_remove = []

for smurf in run.CLIENTS['smurf']:
print(f'Waiting for stream from {smurf.instance_id} to stop.')
Expand Down
32 changes: 32 additions & 0 deletions tests/test_smurf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

import pytest

from ocs.client_http import ControlClientError
from ocs.ocs_client import OCSReply

import sorunlib as run
from sorunlib import smurf
from util import create_patch_clients

Expand All @@ -17,6 +19,30 @@
patch_clients = create_patch_clients('satp', autouse=True)


# Used to mock errors communicating with an OCS Client
class ErrorProcess:
def __init__(self, *args, **kwargs):
pass

def start(self, **kwargs):
raise ControlClientError("no callee registered for procedure <observatory.controller.ops")

def wait(self, timeout=None):
raise ControlClientError("no callee registered for procedure <observatory.controller.ops")

def status(self):
raise ControlClientError("no callee registered for procedure <observatory.controller.ops")

def stop(self):
raise ControlClientError("no callee registered for procedure <observatory.controller.ops")


class ErrorClient:
def __init__(self, instance_id, *args, **kwargs):
self.stream = ErrorProcess()
self.instance_id = instance_id


def test_set_targets():
smurf.set_targets(['smurf1'])
assert len(smurf.run.CLIENTS['smurf']) == 1
Expand Down Expand Up @@ -160,3 +186,9 @@ def test_stream_single_failure(state):
client.stream.start.assert_called_once()
else:
client.stream.stop.assert_called_once()


def test_stream_agent_unavailable_on_stop():
# Replace 'smurf1' client with one that will error on stream.stop()
run.CLIENTS['smurf'][0] = ErrorClient('smurf1')
smurf.stream(state='off')

0 comments on commit 7f92c96

Please sign in to comment.