From 43190a0fb5ca8e13d2f96cbfd07563531b9edc0c Mon Sep 17 00:00:00 2001 From: Koen van der Veen Date: Thu, 28 Sep 2023 11:40:30 +0200 Subject: [PATCH] - --- notebooks/helm/helm-syft.ipynb | 302 ++++++++++++++---- notebooks/helm/test.py | 1 + .../src/syft/service/action/action_graph.py | 3 + .../syft/src/syft/service/queue/zmq_queue.py | 76 ++--- .../syft/src/syft/store/document_store.py | 3 + packages/syft/src/syft/store/locks.py | 66 +++- 6 files changed, 352 insertions(+), 99 deletions(-) diff --git a/notebooks/helm/helm-syft.ipynb b/notebooks/helm/helm-syft.ipynb index 787e543709f..a8eae9e5739 100644 --- a/notebooks/helm/helm-syft.ipynb +++ b/notebooks/helm/helm-syft.ipynb @@ -10,7 +10,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "kj/filesystem-disk-unix.c++:1703: warning: PWD environment variable doesn't match current directory; pwd = /Users/koen/workspace/pysyft\n" + "kj/filesystem-disk-unix.c++:1703: warning: PWD environment variable doesn't match current directory; pwd = /Users/koen/workspace/pysyft/notebooks\n" ] } ], @@ -24,11 +24,54 @@ { "cell_type": "code", "execution_count": 2, - "id": "2b24b2ee", + "id": "95e55a0a", "metadata": {}, "outputs": [], "source": [ - "# node = sy.orchestra.launch(name=\"test-domain-helm\", port=8800, dev_mode=True, reset=True, tail=True)" + "# subprocess.check_output(\n", + "# [lsof_path, \"-w\", \"-Fd\", \"-p\", str(os.getpid())]\n", + "# ).decode().split(os.linesep)" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "dcd8f67c", + "metadata": {}, + "outputs": [], + "source": [ + "import os" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "151a3517", + "metadata": {}, + "outputs": [], + "source": [ + "from collections import Counter\n", + "def print_open_files():\n", + " import psutil\n", + "\n", + " files = psutil.Process(pid=os.getpid()).open_files()\n", + " print('len(files):', len(files))\n", + " print(Counter([f.path for f in files]))\n", + "# for f in files:\n", + "# print('FD:', f.fd, f.path)\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "ff6b7f51", + "metadata": {}, + "outputs": [], + "source": [ + "import psutil\n", + "\n", + "import inspect" ] }, { @@ -41,7 +84,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 6, "id": "4cbee0b1", "metadata": {}, "outputs": [], @@ -59,7 +102,27 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 7, + "id": "46b2c271", + "metadata": {}, + "outputs": [], + "source": [ + "# get_open_fds()" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "0e8e3591", + "metadata": {}, + "outputs": [], + "source": [ + "# node.python_node" + ] + }, + { + "cell_type": "code", + "execution_count": 9, "id": "9b31c627", "metadata": {}, "outputs": [ @@ -67,7 +130,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "CREATING A PRODUCER ON tcp://127.0.0.1:56761\n", + "CREATING A PRODUCER ON tcp://127.0.0.1:56528\n", "Logged into as \n" ] }, @@ -91,17 +154,83 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 10, + "id": "6cd7e297", + "metadata": {}, + "outputs": [], + "source": [ + "# node.python_node.document_store.partitions[\"QueueItem\"].data._db" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "5357973f", + "metadata": {}, + "outputs": [], + "source": [ + "# print_open_files()" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "9f12c716", + "metadata": {}, + "outputs": [], + "source": [ + "# 
client.register(name=\"a\", email=\"A@b.org\", password=\"AVC\")" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "61d4752a", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "[1, 2]" + ], + "text/plain": [ + "[1, 2]" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "x = ActionObject.from_obj([1,2])\n", + "x_ptr = x.send(client)\n", + "x_ptr.get()" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "1bea15f7", + "metadata": {}, + "outputs": [], + "source": [ + "# print_open_files()" + ] + }, + { + "cell_type": "code", + "execution_count": 15, "id": "97ae5ebd", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "" + "" ] }, - "execution_count": 5, + "execution_count": 15, "metadata": {}, "output_type": "execute_result" } @@ -112,7 +241,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 16, "id": "2b382f5a", "metadata": {}, "outputs": [], @@ -122,7 +251,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 17, "id": "830cb5cf", "metadata": {}, "outputs": [ @@ -147,7 +276,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 18, "id": "1a612f09", "metadata": {}, "outputs": [], @@ -157,7 +286,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 19, "id": "67a1ee69", "metadata": {}, "outputs": [], @@ -167,7 +296,7 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": 20, "id": "475f6a75", "metadata": {}, "outputs": [], @@ -177,7 +306,7 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": 21, "id": "11728060", "metadata": {}, "outputs": [], @@ -211,7 +340,7 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 22, "id": "83307a2f", "metadata": {}, "outputs": [], @@ -230,7 +359,7 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 23, "id": "5d2fd248", "metadata": {}, "outputs": [ @@ -260,7 +389,7 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 24, "id": "9ba22655", "metadata": {}, "outputs": [ @@ -273,7 +402,7 @@ "SyftSuccess: User Code Submitted" ] }, - "execution_count": 14, + "execution_count": 24, "metadata": {}, "output_type": "execute_result" } @@ -292,7 +421,7 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": 25, "id": "ca1b95ee", "metadata": {}, "outputs": [ @@ -316,13 +445,15 @@ " print(\"Launching jobs\")\n", " for elem in x:\n", " # We inject a domain object in the scope\n", - " batch_job = domain.launch_job(process_batch, batch=elem)\n", - " jobs += [batch_job]\n", + " from time import sleep\n", + " sleep(1)\n", + "# batch_job = domain.launch_job(process_batch, batch=elem)\n", + "# jobs += [batch_job]\n", " print(\"starting aggregation\")\n", " print(\"Done\")\n", - " results = [x.wait().get() for x in jobs]\n", - "# return 1\n", - " return sum(results)" + "# results = [x.wait().get() for x in jobs]\n", + " return 1\n", + "# return sum(results)" ] }, { @@ -335,7 +466,7 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": 26, "id": "0ab572f9", "metadata": { "scrolled": false @@ -351,13 +482,13 @@ { "data": { "text/html": [ - "
SyftSuccess: Request b1d451b802dd4983a60cf701faef5339 changes applied
" + "
SyftSuccess: Request 5484ed5429be4f97806318e87d49a11e changes applied
" ], "text/plain": [ - "SyftSuccess: Request b1d451b802dd4983a60cf701faef5339 changes applied" + "SyftSuccess: Request 5484ed5429be4f97806318e87d49a11e changes applied" ] }, - "execution_count": 16, + "execution_count": 26, "metadata": {}, "output_type": "execute_result" } @@ -369,7 +500,7 @@ }, { "cell_type": "code", - "execution_count": 17, + "execution_count": 27, "id": "375ed965", "metadata": { "scrolled": true @@ -382,67 +513,118 @@ "FUNCTION LOG: Launching jobs\n", "FUNCTION LOG: starting aggregation\n", "FUNCTION LOG: Done\n", - "FUNCTION LOG: starting batch 1\n", - "FUNCTION LOG: starting batch 2\n", - "FUNCTION LOG: done\n", - "result 2\n", - "FUNCTION LOG: done\n", - "result 3\n" + "result 1\n" ] } ], "source": [ "job = client.code.process_all(x=x_ptr, blocking=False)\n", - "sleep(1)" + "sleep(5)" ] }, { "cell_type": "code", - "execution_count": 18, + "execution_count": null, + "id": "a1afd4c5", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "52163cc4", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "728406d4", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7f5ace83", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 29, + "id": "e7a94e1c", + "metadata": {}, + "outputs": [], + "source": [ + "# print_open_files()" + ] + }, + { + "cell_type": "code", + "execution_count": 30, "id": "7b8b2a2e", "metadata": {}, + "outputs": [], + "source": [ + "res = job.wait().get()" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "id": "3cba7a2a", + "metadata": {}, "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "result 5\n" - ] - }, { "data": { "text/plain": [ - "5" + "1" ] }, - "execution_count": 18, + "execution_count": 31, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "job.wait().get()" + "res" ] }, { "cell_type": "code", - "execution_count": 19, + "execution_count": null, "id": "9cb2e2bf", "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "starting batch 1\n", - "\n" - ] - } - ], + "outputs": [], "source": [ "job.subjobs[0].logs()" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "7fb84ccd", + "metadata": {}, + "outputs": [], + "source": [ + "job.subjobs[1].logs()" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "id": "7c892dcc", + "metadata": {}, + "outputs": [], + "source": [ + "# get_open_fds()" + ] + }, { "cell_type": "markdown", "id": "3d141e3b", @@ -495,7 +677,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.16" + "version": "3.9.18" }, "toc": { "base_numbering": 1, diff --git a/notebooks/helm/test.py b/notebooks/helm/test.py index 56acec1d259..c4c08a7d2a0 100644 --- a/notebooks/helm/test.py +++ b/notebooks/helm/test.py @@ -93,3 +93,4 @@ def process_all(domain, x): sleep(1) job.wait().get() +print("done") diff --git a/packages/syft/src/syft/service/action/action_graph.py b/packages/syft/src/syft/service/action/action_graph.py index 489e51e91e5..d5dcca8df8a 100644 --- a/packages/syft/src/syft/service/action/action_graph.py +++ b/packages/syft/src/syft/service/action/action_graph.py @@ -231,6 +231,8 @@ def db(self) -> nx.Graph: def _thread_safe_cbk(self, cbk: Callable, *args, **kwargs): # TODO copied method from document_store, have it 
in one place and reuse? + print(f"CALLING LOCK ON ACTION STORE LEVEL FOR {self.lock._lock._lock_file}") + locked = self.lock.acquire(blocking=True) if not locked: return Err("Failed to acquire lock for the operation") @@ -238,6 +240,7 @@ def _thread_safe_cbk(self, cbk: Callable, *args, **kwargs): result = cbk(*args, **kwargs) except BaseException as e: result = Err(str(e)) + print("CALLING RELEASE ON ACTION STORE LEVEL FOR", self.lock._lock._lock_file) self.lock.release() return result diff --git a/packages/syft/src/syft/service/queue/zmq_queue.py b/packages/syft/src/syft/service/queue/zmq_queue.py index 78b683970b2..9e9a01547e0 100644 --- a/packages/syft/src/syft/service/queue/zmq_queue.py +++ b/packages/syft/src/syft/service/queue/zmq_queue.py @@ -350,40 +350,40 @@ class ZMQClientConfig(SyftObject, QueueClientConfig): # self.consumer_port = consumer_port -class MessageQueue: - def __init__(self, consumer_port, producer_port): - self.consumer_port = consumer_port - self.producer_port = producer_port - self.post_init() +# class MessageQueue: +# def __init__(self, consumer_port, producer_port): +# self.consumer_port = consumer_port +# self.producer_port = producer_port +# self.post_init() - def post_init(self): - self.thread = None - self.ctx = zmq.Context.instance() +# def post_init(self): +# self.thread = None +# self.ctx = zmq.Context.instance() - # Socket facing clients - self._frontend = self.ctx.socket(zmq.ROUTER) - self._frontend.bind(f"tcp://*:{self.producer_port}") +# # Socket facing clients +# self._frontend = self.ctx.socket(zmq.ROUTER) +# self._frontend.bind(f"tcp://*:{self.producer_port}") - # Socket facing services - self._backend = self.ctx.socket(zmq.DEALER) - self._backend.bind(f"tcp://*:{self.consumer_port}") - # poller = zmq.Poller() - # poller.register(frontend, zmq.POLLIN) - # poller.register(backend, zmq.POLLIN) +# # Socket facing services +# self._backend = self.ctx.socket(zmq.DEALER) +# self._backend.bind(f"tcp://*:{self.consumer_port}") +# # poller = zmq.Poller() +# # poller.register(frontend, zmq.POLLIN) +# # poller.register(backend, zmq.POLLIN) - def _run(self): - zmq.proxy(self._frontend, self._backend) - # we never get here - self._frontend.close() - self._backend.close() - self.ctx.term() +# def _run(self): +# zmq.proxy(self._frontend, self._backend) +# # we never get here +# self._frontend.close() +# self._backend.close() +# self.ctx.term() - def run(self): - # stdlib - import threading +# def run(self): +# # stdlib +# import threading - self.thread = threading.Thread(target=self._run) - self.thread.start() +# self.thread = threading.Thread(target=self._run) +# self.thread.start() @serializable(attrs=["host"]) @@ -397,7 +397,7 @@ def __init__(self, config: ZMQClientConfig) -> None: self.host = config.hostname self.producers = {} self.consumers = defaultdict(list) - self.message_queue: List[MessageQueue] = None + # self.message_queue: List[MessageQueue] = None self.config = config @staticmethod @@ -406,15 +406,15 @@ def _get_free_tcp_port(host: str): free_port = s.server_address[1] return free_port - def add_message_queue(self, queue_name: str): - if self.config.consumer_port is None: - self.config.consumer_port = self._get_free_tcp_port(self.host) - if self.config.producer_port is None: - self.config.producer_port = self._get_free_tcp_port(self.host) - self.message_queue = MessageQueue( - self.config.consumer_port, self.config.producer_port - ) - return self.message_queue + # def add_message_queue(self, queue_name: str): + # if self.config.consumer_port is 
None: + # self.config.consumer_port = self._get_free_tcp_port(self.host) + # if self.config.producer_port is None: + # self.config.producer_port = self._get_free_tcp_port(self.host) + # self.message_queue = MessageQueue( + # self.config.consumer_port, self.config.producer_port + # ) + # return self.message_queue def add_producer( self, queue_name: str, address: Optional[str] = None, queue_stash=None diff --git a/packages/syft/src/syft/store/document_store.py b/packages/syft/src/syft/store/document_store.py index 98ca622c5bb..c0e41273a31 100644 --- a/packages/syft/src/syft/store/document_store.py +++ b/packages/syft/src/syft/store/document_store.py @@ -352,14 +352,17 @@ def store_query_keys(self, objs: Any) -> QueryKeys: # Thread-safe methods def _thread_safe_cbk(self, cbk: Callable, *args, **kwargs): + # print(f"CALLING LOCK ON DOCUMENT LEVEL FOR {self.lock._lock._lock_file._context.lock_file}") locked = self.lock.acquire(blocking=True) if not locked: + print("FAILED TO LOCK") return Err("Failed to acquire lock for the operation") try: result = cbk(*args, **kwargs) except BaseException as e: result = Err(str(e)) + # print("CALLING RELEASE ON DOCUMENT STORE LEVEL FOR", self.lock._lock._lock_file) self.lock.release() return result diff --git a/packages/syft/src/syft/store/locks.py b/packages/syft/src/syft/store/locks.py index 95ce447467b..afdb1127711 100644 --- a/packages/syft/src/syft/store/locks.py +++ b/packages/syft/src/syft/store/locks.py @@ -18,6 +18,32 @@ # relative from ..serde.serializable import serializable +from typing import Dict + +from collections import defaultdict + + +THREAD_FILE_LOCKS: Dict[int, Dict[str, int]] = defaultdict(dict) + +import os +import shutil +import subprocess + + +def get_open_fds() -> int: + """Get the number of open file descriptors for the current process.""" + lsof_path = shutil.which("lsof") + if lsof_path is None: + raise NotImplementedError("Didn't handle unavailable lsof.") + raw_procs = subprocess.check_output( + [lsof_path, "-w", "-Ff", "-p", str(os.getpid())] + ) + + def filter_fds(lsof_entry: str) -> bool: + return lsof_entry.startswith("f") and lsof_entry[1:].isdigit() + + fds = list(filter(filter_fds, raw_procs.decode().split(os.linesep))) + return len(fds) @serializable() class LockingConfig(BaseModel): @@ -189,6 +215,9 @@ def _thread_safe_cbk(self, cbk: Callable) -> bool: try: result = cbk() + # import ipdb + # ipdb.set_trace() + except BaseException as e: print(e) result = False @@ -200,7 +229,9 @@ def _acquire(self) -> bool: return self._thread_safe_cbk(self._acquire_file_lock) def _release(self) -> None: - return self._thread_safe_cbk(self._release_file_lock) + res = self._thread_safe_cbk(self._release_file_lock) + # print(get_open_fds()) + return res def _acquire_file_lock(self) -> bool: if not self._lock_file_enabled: @@ -245,6 +276,23 @@ def _acquire_file_lock(self) -> bool: # We succeeded in writing to the file so we now hold the lock. self._owner = owner + # # increment lock count + # thread_id = threading.current_thread().ident + # current_dict = THREAD_FILE_LOCKS[thread_id] + # path = str(self._lock_file.lock_file) + # if path not in current_dict: + # current_dict[path] = 0 + + # total_files = 0 + # for k,v in THREAD_FILE_LOCKS.items(): + # for j, i in v.items(): + # total_files += i + + + # current_dict[path] += 1 + # THREAD_FILE_LOCKS[thread_id] = current_dict + # print(f"Acquiring. 
Open Files: {total_files} Thread Lock State:", json.dumps(THREAD_FILE_LOCKS, indent=2)) + return True @property @@ -279,6 +327,7 @@ def _locked(self): return True def _release_file_lock(self) -> None: + # print(f"CALLING RELEASE FOR {self._lock_file.lock_file}") if not self._lock_file_enabled: return @@ -301,7 +350,22 @@ def _release_file_lock(self) -> None: return if self._owner == data["owner"]: + # print("> PatchedFileLock unlink") self._data_file.unlink() + # # decrement lock count + # thread_id = threading.current_thread().ident + # current_dict = THREAD_FILE_LOCKS[thread_id] + # path = str(self._lock_file.lock_file) + # if path not in current_dict: + # current_dict[path] = 0 + # current_dict[path] -= 1 + # THREAD_FILE_LOCKS[thread_id] = current_dict + # total_files = 0 + # for k,v in THREAD_FILE_LOCKS.items(): + # for j, i in v.items(): + # total_files += i + + # print(f"Releasing {self._lock_file.lock_file} \nOpen Files: {total_files} \nThread Lock State:", json.dumps(THREAD_FILE_LOCKS, indent=2)) self._owner = None
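
A note on the get_open_fds() helper introduced in locks.py above: it gives a quick way to verify that a lock acquire/release cycle (or any other operation) does not leak file descriptors, by diffing the lsof-reported count before and after. Below is a minimal usage sketch; leak_demo and its temp-file handling are hypothetical, added purely for illustration, and the import path assumes this patch has been applied. Only get_open_fds itself comes from the patch.

    # stdlib
    import os
    import tempfile

    # helper added by this patch in packages/syft/src/syft/store/locks.py
    from syft.store.locks import get_open_fds

    def leak_demo() -> None:
        # Create a temp file, closing the creating descriptor so it
        # does not pollute the measurement.
        fd, path = tempfile.mkstemp()
        os.close(fd)

        before = get_open_fds()

        # A handle that is opened and never closed leaks one descriptor.
        leaked = open(path)
        after = get_open_fds()
        print(f"before={before} after={after} delta={after - before}")  # delta == 1

        # Closing the handle returns the count to the baseline.
        leaked.close()
        print(get_open_fds() == before)  # True
        os.unlink(path)

    leak_demo()

The same before/after pattern is what the commented-out THREAD_FILE_LOCKS bookkeeping above approximates per thread: if the count keeps climbing across acquire/release cycles, a lock file (or its underlying descriptor) is not being released.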