Skip to content

Commit

Permalink
Merge branch 'dev' into aziz/error_reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
koenvanderveen authored Apr 25, 2024
2 parents ecb327a + 837e2c7 commit 01965be
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 16 deletions.
5 changes: 4 additions & 1 deletion packages/syft/src/syft/custom_worker/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,7 @@ def test_image_build(self, tag: str, **kwargs: Any) -> SyftSuccess | SyftError:
)
return SyftSuccess(message=iterator_to_string(iterator=logs))
except Exception as e:
return SyftError(message=f"Failed to build: {e}")
# stdlib
import traceback

return SyftError(message=f"Failed to build: {e} {traceback.format_exc()}")
16 changes: 16 additions & 0 deletions packages/syft/src/syft/node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

# stdlib
from collections import OrderedDict
from collections import defaultdict
from collections.abc import Callable
from datetime import datetime
from functools import partial
Expand Down Expand Up @@ -463,6 +464,9 @@ def stop(self) -> None:
for p in self.queue_manager.producers.values():
p.close()

self.queue_manager.producers.clear()
self.queue_manager.consumers.clear()

NodeRegistry.remove_node(self.id)

def close(self) -> None:
Expand Down Expand Up @@ -567,6 +571,18 @@ def add_consumer_for_service(
)
consumer.run()

def remove_consumer_with_id(self, syft_worker_id: UID) -> None:
for _, consumers in self.queue_manager.consumers.items():
# Grab the list of consumers for the given queue
consumer_to_pop = None
for consumer_idx, consumer in enumerate(consumers):
if consumer.syft_worker_id == syft_worker_id:
consumer.close()
consumer_to_pop = consumer_idx
break
if consumer_to_pop is not None:
consumers.pop(consumer_to_pop)

@classmethod
def named(
cls,
Expand Down
21 changes: 11 additions & 10 deletions packages/syft/src/syft/service/job/job_stash.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,16 +609,17 @@ def _repr_html_(self) -> str:
worker_attr = ""
if self.job_worker_id:
worker = self.worker
worker_pool_id_button = CopyIDButton(
copy_text=str(worker.worker_pool_name), max_width=60
)
worker_attr = f"""
<div style="margin-top: 6px; margin-bottom: 6px;">
<span style="font-weight: 700; line-weight: 19.6px; font-size: 14px; font: 'Open Sans'">
Worker Pool:</span>
{worker.name} on worker {worker_pool_id_button.to_html()}
</div>
"""
if not isinstance(worker, SyftError):
worker_pool_id_button = CopyIDButton(
copy_text=str(worker.worker_pool_name), max_width=60
)
worker_attr = f"""
<div style="margin-top: 6px; margin-bottom: 6px;">
<span style="font-weight: 700; line-weight: 19.6px; font-size: 14px; font: 'Open Sans'">
Worker Pool:</span>
{worker.name} on worker {worker_pool_id_button.to_html()}
</div>
"""

logs = self.logs(_print=False)
logs_lines = logs.split("\n") if logs else []
Expand Down
24 changes: 19 additions & 5 deletions packages/syft/src/syft/service/queue/zmq_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,20 +388,29 @@ def update_consumer_state_for_worker(
return

try:
# Check if worker is present in the database
worker = self.worker_stash.get_by_uid(
credentials=self.worker_stash.partition.root_verify_key,
uid=syft_worker_id,
)
if worker.is_ok() and worker.ok() is None:
return

res = self.worker_stash.update_consumer_state(
credentials=self.worker_stash.partition.root_verify_key,
worker_uid=syft_worker_id,
consumer_state=consumer_state,
)
if res.is_err():
logger.error(
"Failed to update consumer state for worker id={} error={}",
"Failed to update consumer state for worker id={} to state: {} error={}",
syft_worker_id,
consumer_state,
res.err(),
)
except Exception as e:
logger.error(
f"Failed to update consumer state for worker id: {syft_worker_id}. Error: {e}"
f"Failed to update consumer state for worker id: {syft_worker_id} to state {consumer_state}. Error: {e}"
)

def worker_waiting(self, worker: Worker) -> None:
Expand Down Expand Up @@ -572,9 +581,10 @@ def delete_worker(self, worker: Worker, disconnect: bool) -> None:

self.workers.pop(worker.identity, None)

self.update_consumer_state_for_worker(
worker.syft_worker_id, ConsumerState.DETACHED
)
if worker.syft_worker_id is not None:
self.update_consumer_state_for_worker(
worker.syft_worker_id, ConsumerState.DETACHED
)

@property
def alive(self) -> bool:
Expand Down Expand Up @@ -633,7 +643,11 @@ def post_init(self) -> None:
self.producer_ping_t = Timeout(PRODUCER_TIMEOUT_SEC)
self.reconnect_to_producer()

def disconnect_from_producer(self) -> None:
self.send_to_producer(QueueMsgProtocol.W_DISCONNECT)

def close(self) -> None:
self.disconnect_from_producer()
self._stop.set()
try:
self.poller.unregister(self.socket)
Expand Down
3 changes: 3 additions & 0 deletions packages/syft/src/syft/service/worker/worker_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ def delete(
stopped = _stop_worker_container(worker, docker_container, force)
if stopped is not None:
return stopped
else:
# kill the in memory worker thread
context.node.remove_consumer_with_id(syft_worker_id=worker.id)

# remove the worker from the pool
try:
Expand Down

0 comments on commit 01965be

Please sign in to comment.