From e6b2e58db903cc97e3a65cced342dec20fc8b154 Mon Sep 17 00:00:00 2001 From: Koen van der Veen Date: Mon, 2 Oct 2023 21:15:02 +0200 Subject: [PATCH] fix testing setup --- notebooks/helm/helm_syft.ipynb | 218 +++++++++--------- packages/hagrid/hagrid/orchestra.py | 2 +- packages/syft/src/syft/node/node.py | 18 +- .../src/syft/service/action/action_graph.py | 2 - .../syft/src/syft/service/queue/zmq_queue.py | 35 +-- 5 files changed, 143 insertions(+), 132 deletions(-) diff --git a/notebooks/helm/helm_syft.ipynb b/notebooks/helm/helm_syft.ipynb index 2329f29a8da..25f44a2e76e 100644 --- a/notebooks/helm/helm_syft.ipynb +++ b/notebooks/helm/helm_syft.ipynb @@ -2,43 +2,36 @@ "cells": [ { "cell_type": "code", - "execution_count": 1, + "execution_count": 23, "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "kj/filesystem-disk-unix.c++:1703: warning: PWD environment variable doesn't match current directory; pwd = /Users/koen/workspace/pysyft\n" - ] - } - ], + "outputs": [], "source": [ "import syft as sy\n", "from syft.store.blob_storage import BlobStorageConfig, BlobStorageClientConfig\n", "from syft.store.blob_storage.seaweedfs import SeaweedFSClient, SeaweedFSClientConfig\n", "from syft import ActionObject\n", "from syft.service.action.action_data_empty import ActionFileData\n", + "from syft.service.queue.zmq_queue import ZMQQueueConfig, ZMQClientConfig\n", "from collections import defaultdict" ] }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 26, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "CREATING A PRODUCER ON tcp://127.0.0.1:60612\n", - "CREATING A CONSUMER ON tcp://127.0.0.1:60613\n", + "CREATING A PRODUCER ON 55945\n", + "CREATING A CONSUMER ON tcp://localhost:55945\n", "spawning thread\n", - "CREATING A CONSUMER ON tcp://127.0.0.1:60613\n", + "CREATING A CONSUMER ON tcp://localhost:55945\n", "spawning thread\n", - "CREATING A CONSUMER ON tcp://127.0.0.1:60613\n", + "CREATING A CONSUMER ON tcp://localhost:55945\n", "spawning thread\n", - "CREATING A CONSUMER ON tcp://127.0.0.1:60613\n", + "CREATING A CONSUMER ON tcp://localhost:55945\n", "spawning thread\n", "Logged into as \n" ] @@ -57,7 +50,9 @@ } ], "source": [ - "node = sy.orchestra.launch(name=\"test-domain-helm2\", dev_mode=True, reset=True, n_consumers=4)\n", + "node = sy.orchestra.launch(name=\"test-domain-helm2\", dev_mode=True, reset=True,\n", + " n_consumers=4,\n", + " queue_config=ZMQQueueConfig(client_config=ZMQClientConfig(create_producer=True)))\n", "client = node.login(email=\"info@openmined.org\", password=\"changethis\")" ] }, @@ -72,7 +67,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 27, "metadata": {}, "outputs": [], "source": [ @@ -88,7 +83,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 28, "metadata": {}, "outputs": [], "source": [ @@ -104,7 +99,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 29, "metadata": {}, "outputs": [], "source": [ @@ -116,7 +111,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 30, "metadata": {}, "outputs": [], "source": [ @@ -128,7 +123,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 31, "metadata": {}, "outputs": [], "source": [ @@ -145,7 +140,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 32, "metadata": {}, "outputs": [ { @@ -246,7 +241,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 33, "metadata": {}, "outputs": [ { @@ -258,7 +253,7 @@ "SyftSuccess: User Code Submitted" ] }, - "execution_count": 9, + "execution_count": 33, "metadata": {}, "output_type": "execute_result" } @@ -269,7 +264,7 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": 34, "metadata": {}, "outputs": [ { @@ -305,7 +300,7 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": 35, "metadata": {}, "outputs": [ { @@ -318,13 +313,13 @@ { "data": { "text/html": [ - "
SyftSuccess: Request 723cf7b4dc534adeab085d8147081ac9 changes applied

" + "
SyftSuccess: Request 52b36cb4b34e4d918ad6f0a5ce264f2a changes applied

" ], "text/plain": [ - "SyftSuccess: Request 723cf7b4dc534adeab085d8147081ac9 changes applied" + "SyftSuccess: Request 52b36cb4b34e4d918ad6f0a5ce264f2a changes applied" ] }, - "execution_count": 11, + "execution_count": 35, "metadata": {}, "output_type": "execute_result" } @@ -336,7 +331,7 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 36, "metadata": {}, "outputs": [], "source": [ @@ -352,7 +347,7 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 37, "metadata": {}, "outputs": [ { @@ -360,7 +355,7 @@ "text/markdown": [ "```python\n", "class Job:\n", - " id: UID = d47a3ae9c6924e3fa30cc2553c703fed\n", + " id: UID = 76f0f66c1cbd44e5b1157595ca908ff9\n", " status: created\n", " has_parent: False\n", " result: None\n", @@ -374,7 +369,7 @@ "syft.service.job.job_stash.Job" ] }, - "execution_count": 13, + "execution_count": 37, "metadata": {}, "output_type": "execute_result" } @@ -385,28 +380,21 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 38, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ - "FUNCTION LOG (2f5fe069e78143398728dbe955b7cf1c): starting overlap computation\n", - "FUNCTION LOG (df7bb1153b2443d8b9ba43cf381d2b35): starting overlap computation\n", - "FUNCTION LOG (df7bb1153b2443d8b9ba43cf381d2b35): preparing scenarios and creating indexes\n", - "FUNCTION LOG (df7bb1153b2443d8b9ba43cf381d2b35): preparing scenarios and creating indexes\n", - "FUNCTION LOG (df7bb1153b2443d8b9ba43cf381d2b35): computing overlap\n", - "FUNCTION LOG (df7bb1153b2443d8b9ba43cf381d2b35): computing overlap\n", - "FUNCTION LOG (df7bb1153b2443d8b9ba43cf381d2b35): done\n", - "FUNCTION LOG (df7bb1153b2443d8b9ba43cf381d2b35): done\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "E: Invalid message from worker: [b'']\n" + "FUNCTION LOG (30d02d4caf5245ee94ab1c9e6cd2a926): starting overlap computation\n", + "FUNCTION LOG (30d02d4caf5245ee94ab1c9e6cd2a926): preparing scenarios and creating indexes\n", + "FUNCTION LOG (ec687a7d68ec4e02a8d6ba9ba84b6239): starting overlap computation\n", + "FUNCTION LOG (ec687a7d68ec4e02a8d6ba9ba84b6239): preparing scenarios and creating indexes\n", + "FUNCTION LOG (30d02d4caf5245ee94ab1c9e6cd2a926): computing overlap\n", + "FUNCTION LOG (ec687a7d68ec4e02a8d6ba9ba84b6239): done\n", + "FUNCTION LOG (ec687a7d68ec4e02a8d6ba9ba84b6239): computing overlap\n", + "FUNCTION LOG (ec687a7d68ec4e02a8d6ba9ba84b6239): done\n" ] }, { @@ -416,7 +404,7 @@ "None" ] }, - "execution_count": 14, + "execution_count": 38, "metadata": {}, "output_type": "execute_result" } @@ -427,7 +415,7 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": 39, "metadata": {}, "outputs": [ { @@ -634,7 +622,7 @@ " flex-grow: 0;\n", " }\n", "\n", - " .grid-table85b831a25ce64be5b77bea19677fdaf4 {\n", + " .grid-table98c67862061d446395eb05f1d2b04fbf {\n", " display:grid;\n", " grid-template-columns: 1fr repeat(24, 1fr);\n", " grid-template-rows: repeat(2, 1fr);\n", @@ -806,25 +794,25 @@ "
\n", "
\n", "
\n", - "
\n", - "
\n", + "
\n", " \n", "
\n", - " \n", + " \n", "
\n", - " \n", "
\n", "\n", - "

0

\n", + "

0

\n", "
\n", - "
\n", + "
\n", " \n", "
\n", - "
\n", + "
\n", " \n", "
\n", "
\n", @@ -1049,7 +1037,7 @@ "[syft.service.job.job_stash.Job, syft.service.job.job_stash.Job]" ] }, - "execution_count": 15, + "execution_count": 39, "metadata": {}, "output_type": "execute_result" } @@ -1060,7 +1048,7 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": 40, "metadata": {}, "outputs": [ { @@ -1068,6 +1056,10 @@ "output_type": "stream", "text": [ "starting overlap computation\n", + "preparing scenarios and creating indexes\n", + "computing overlap\n", + "done\n", + "done\n", "\n" ] } @@ -1078,7 +1070,7 @@ }, { "cell_type": "code", - "execution_count": 17, + "execution_count": 41, "metadata": {}, "outputs": [], "source": [ @@ -1087,7 +1079,7 @@ }, { "cell_type": "code", - "execution_count": 18, + "execution_count": 42, "metadata": {}, "outputs": [ { @@ -1120,7 +1112,7 @@ " 'anatomy_test_5': 135}))]" ] }, - "execution_count": 18, + "execution_count": 42, "metadata": {}, "output_type": "execute_result" } @@ -1139,7 +1131,7 @@ }, { "cell_type": "code", - "execution_count": 19, + "execution_count": 43, "metadata": {}, "outputs": [], "source": [ @@ -1190,7 +1182,7 @@ }, { "cell_type": "code", - "execution_count": 20, + "execution_count": 44, "metadata": {}, "outputs": [ { diff --git a/packages/hagrid/hagrid/orchestra.py b/packages/hagrid/hagrid/orchestra.py index 03a129283c9..6580e6d6142 100644 --- a/packages/hagrid/hagrid/orchestra.py +++ b/packages/hagrid/hagrid/orchestra.py @@ -464,7 +464,7 @@ def launch( # type: ignore # worker related inputs port: Optional[Union[int, str]] = None, processes: int = 1, # temporary work around for jax in subprocess - n_consumers: int = 1, + n_consumers: int = 0, local_db: bool = False, dev_mode: bool = False, cmd: bool = False, diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index 7b6c5fcf5de..a5910bab326 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -236,7 +236,7 @@ def __init__( root_email: str = default_root_email, root_password: str = default_root_password, processes: int = 0, - n_consumers: int = 1, + n_consumers: int = 0, is_subprocess: bool = False, node_type: Union[str, NodeType] = NodeType.DOMAIN, local_db: bool = False, @@ -374,15 +374,27 @@ def init_queue_manager(self, queue_config: Optional[QueueConfig], n_consumers): # message_queue.run() # TODO: Remove this once create_producer property is consistently in + # client config if getattr(queue_config_.client_config, "create_producer", True): producer = self.queue_manager.create_producer( queue_name=queue_name, queue_stash=self.queue_stash ) producer.run() + address = producer.address + else: + port = queue_config_.client_config.producer_port + if port is not None: + address = f"tcp://localhost:{port}" + else: + address = None for _ in range(n_consumers): - consumer = self.queue_manager.create_consumer(message_handler) + if address is None: + raise ValueError("address unknown for consumers") + consumer = self.queue_manager.create_consumer( + message_handler, address=address + ) consumer.run() # consumer = self.queue_manager.create_consumer( @@ -397,7 +409,7 @@ def named( *, # Trasterisk name: str, processes: int = 0, - n_consumers: int = 1, + n_consumers: int = 0, reset: bool = False, local_db: bool = False, sqlite_path: Optional[str] = None, diff --git a/packages/syft/src/syft/service/action/action_graph.py b/packages/syft/src/syft/service/action/action_graph.py index d5dcca8df8a..903a1157afe 100644 --- a/packages/syft/src/syft/service/action/action_graph.py +++ b/packages/syft/src/syft/service/action/action_graph.py @@ -231,7 +231,6 @@ def db(self) -> nx.Graph: def _thread_safe_cbk(self, cbk: Callable, *args, **kwargs): # TODO copied method from document_store, have it in one place and reuse? - print(f"CALLING LOCK ON ACTION STORE LEVEL FOR {self.lock._lock._lock_file}") locked = self.lock.acquire(blocking=True) if not locked: @@ -240,7 +239,6 @@ def _thread_safe_cbk(self, cbk: Callable, *args, **kwargs): result = cbk(*args, **kwargs) except BaseException as e: result = Err(str(e)) - print("CALLING RELEASE ON ACTION STORE LEVEL FOR", self.lock._lock._lock_file) self.lock.release() return result diff --git a/packages/syft/src/syft/service/queue/zmq_queue.py b/packages/syft/src/syft/service/queue/zmq_queue.py index 4573104755f..3c473c642e7 100644 --- a/packages/syft/src/syft/service/queue/zmq_queue.py +++ b/packages/syft/src/syft/service/queue/zmq_queue.py @@ -72,20 +72,24 @@ def is_empty(self): @serializable() class ZMQProducer(QueueProducer): - def __init__(self, address: str, queue_name: str, queue_stash) -> None: + def __init__(self, queue_name: str, queue_stash, port: int) -> None: # ctx = zmq.Context.instance() - self.address = address + self.port = port # self._producer = ctx.socket(zmq.REQ) # self._producer.connect(address) self.queue_name = queue_name self.queue_stash = queue_stash self.post_init() + @property + def address(self): + return f"tcp://localhost:{self.port}" + def post_init(self): self.identity = b"%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000)) self.context = zmq.Context(1) self.backend = self.context.socket(zmq.ROUTER) # ROUTER - self.backend.bind("tcp://*:5556") # For workers + self.backend.bind(f"tcp://*:{self.port}") self.poll_workers = zmq.Poller() self.poll_workers.register(self.backend, zmq.POLLIN) self.workers = WorkerQueue() @@ -219,7 +223,7 @@ def create_socket(self): self.identity = b"%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000)) self.worker.setsockopt(zmq.IDENTITY, self.identity) self.poller.register(self.worker, zmq.POLLIN) - self.worker.connect("tcp://localhost:5556") + self.worker.connect(self.address) self.worker.send(PPP_READY) def post_init(self): @@ -332,7 +336,7 @@ class ZMQClientConfig(SyftObject, QueueClientConfig): producer_port: Optional[int] = None # TODO: setting this to false until we can fix the ZMQ # port issue causing tests to randomly fail - create_producer: bool = True + create_producer: bool = False # class MessageQueueConfig(): @@ -415,21 +419,25 @@ def _get_free_tcp_port(host: str): # return self.message_queue def add_producer( - self, queue_name: str, address: Optional[str] = None, queue_stash=None + self, queue_name: str, port: Optional[int] = None, queue_stash=None ) -> ZMQProducer: """Add a producer of a queue. A queue can have at most one producer attached to it. """ - if address is None: + if port is None: if self.config.producer_port is None: self.config.producer_port = self._get_free_tcp_port(self.host) + port = self.config.producer_port + else: + port = self.config.producer_port - if self.config.consumer_port is None: - self.config.consumer_port = self._get_free_tcp_port(self.host) + # if self.config.consumer_port is None: + # self.config.consumer_port = self._get_free_tcp_port(self.host) - address = f"tcp://{self.host}:{self.config.producer_port}" + # self.backend.bind("tcp://*:5556") # For workers + # address = f"tcp://:{self.config.producer_port}" # if queue_name in self.producers: # producer = self.producers[queue_name] @@ -442,9 +450,9 @@ def add_producer( # self.config.producer_port = port # address = f"tcp://{self.host}:{self.config.producer_port}" - print(f"CREATING A PRODUCER ON {address}") + print(f"CREATING A PRODUCER ON {port}") producer = ZMQProducer( - address=address, queue_name=queue_name, queue_stash=queue_stash + queue_name=queue_name, queue_stash=queue_stash, port=port ) self.producers[queue_name] = producer return producer @@ -462,7 +470,8 @@ def add_consumer( """ if address is None: - address = f"tcp://{self.host}:{self.config.consumer_port}" + # address = f"tcp://{self.host}:{self.config.consumer_port}" + address = f"tcp://*:{self.config.consumer_port}" # if queue_name in self.producers: # address = self.producers[queue_name].address # elif queue_name in self.consumers: