From e0282b6b86b4c7e6af50daa169d004c9ccd405a2 Mon Sep 17 00:00:00 2001 From: Koen van der Veen Date: Mon, 2 Oct 2023 21:25:26 +0200 Subject: [PATCH] clean up code --- notebooks/helm/helm_syft.ipynb | 213 +++++++++--------- .../src/syft/service/queue/queue_stash.py | 69 ------ .../syft/src/syft/service/queue/zmq_queue.py | 124 ---------- packages/syft/src/syft/store/locks.py | 34 --- 4 files changed, 112 insertions(+), 328 deletions(-) diff --git a/notebooks/helm/helm_syft.ipynb b/notebooks/helm/helm_syft.ipynb index 25f44a2e76e..8928d58a208 100644 --- a/notebooks/helm/helm_syft.ipynb +++ b/notebooks/helm/helm_syft.ipynb @@ -2,9 +2,17 @@ "cells": [ { "cell_type": "code", - "execution_count": 23, + "execution_count": 1, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "kj/filesystem-disk-unix.c++:1703: warning: PWD environment variable doesn't match current directory; pwd = /Users/koen/workspace/pysyft\n" + ] + } + ], "source": [ "import syft as sy\n", "from syft.store.blob_storage import BlobStorageConfig, BlobStorageClientConfig\n", @@ -17,21 +25,21 @@ }, { "cell_type": "code", - "execution_count": 26, + "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "CREATING A PRODUCER ON 55945\n", - "CREATING A CONSUMER ON tcp://localhost:55945\n", + "CREATING A PRODUCER ON 57231\n", + "CREATING A CONSUMER ON tcp://localhost:57231\n", "spawning thread\n", - "CREATING A CONSUMER ON tcp://localhost:55945\n", + "CREATING A CONSUMER ON tcp://localhost:57231\n", "spawning thread\n", - "CREATING A CONSUMER ON tcp://localhost:55945\n", + "CREATING A CONSUMER ON tcp://localhost:57231\n", "spawning thread\n", - "CREATING A CONSUMER ON tcp://localhost:55945\n", + "CREATING A CONSUMER ON tcp://localhost:57231\n", "spawning thread\n", "Logged into as \n" ] @@ -67,7 +75,7 @@ }, { "cell_type": "code", - "execution_count": 27, + "execution_count": 3, "metadata": {}, "outputs": [], "source": [ @@ -83,7 +91,7 @@ }, { "cell_type": "code", - "execution_count": 28, + "execution_count": 4, "metadata": {}, "outputs": [], "source": [ @@ -99,7 +107,7 @@ }, { "cell_type": "code", - "execution_count": 29, + "execution_count": 5, "metadata": {}, "outputs": [], "source": [ @@ -111,7 +119,7 @@ }, { "cell_type": "code", - "execution_count": 30, + "execution_count": 6, "metadata": {}, "outputs": [], "source": [ @@ -123,7 +131,7 @@ }, { "cell_type": "code", - "execution_count": 31, + "execution_count": 7, "metadata": {}, "outputs": [], "source": [ @@ -140,7 +148,7 @@ }, { "cell_type": "code", - "execution_count": 32, + "execution_count": 8, "metadata": {}, "outputs": [ { @@ -241,7 +249,7 @@ }, { "cell_type": "code", - "execution_count": 33, + "execution_count": 9, "metadata": {}, "outputs": [ { @@ -253,7 +261,7 @@ "SyftSuccess: User Code Submitted" ] }, - "execution_count": 33, + "execution_count": 9, "metadata": {}, "output_type": "execute_result" } @@ -264,7 +272,7 @@ }, { "cell_type": "code", - "execution_count": 34, + "execution_count": 10, "metadata": {}, "outputs": [ { @@ -300,7 +308,7 @@ }, { "cell_type": "code", - "execution_count": 35, + "execution_count": 11, "metadata": {}, "outputs": [ { @@ -313,13 +321,13 @@ { "data": { "text/html": [ - "
- SyftSuccess: Request 52b36cb4b34e4d918ad6f0a5ce264f2a changes applied
+ SyftSuccess: Request a362a0fc05e04a678f7323ad638a5375 changes applied
" ], "text/plain": [ - "SyftSuccess: Request 52b36cb4b34e4d918ad6f0a5ce264f2a changes applied" + "SyftSuccess: Request a362a0fc05e04a678f7323ad638a5375 changes applied" ] }, - "execution_count": 35, + "execution_count": 11, "metadata": {}, "output_type": "execute_result" } @@ -331,7 +339,7 @@ }, { "cell_type": "code", - "execution_count": 36, + "execution_count": 12, "metadata": {}, "outputs": [], "source": [ @@ -347,7 +355,7 @@ }, { "cell_type": "code", - "execution_count": 37, + "execution_count": 13, "metadata": {}, "outputs": [ { @@ -355,7 +363,7 @@ "text/markdown": [ "```python\n", "class Job:\n", - " id: UID = 76f0f66c1cbd44e5b1157595ca908ff9\n", + " id: UID = 9ac59d21db1d424d8c75e8182a564a17\n", " status: created\n", " has_parent: False\n", " result: None\n", @@ -369,7 +377,7 @@ "syft.service.job.job_stash.Job" ] }, - "execution_count": 37, + "execution_count": 13, "metadata": {}, "output_type": "execute_result" } @@ -380,21 +388,28 @@ }, { "cell_type": "code", - "execution_count": 38, + "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ - "FUNCTION LOG (30d02d4caf5245ee94ab1c9e6cd2a926): starting overlap computation\n", - "FUNCTION LOG (30d02d4caf5245ee94ab1c9e6cd2a926): preparing scenarios and creating indexes\n", - "FUNCTION LOG (ec687a7d68ec4e02a8d6ba9ba84b6239): starting overlap computation\n", - "FUNCTION LOG (ec687a7d68ec4e02a8d6ba9ba84b6239): preparing scenarios and creating indexes\n", - "FUNCTION LOG (30d02d4caf5245ee94ab1c9e6cd2a926): computing overlap\n", - "FUNCTION LOG (ec687a7d68ec4e02a8d6ba9ba84b6239): done\n", - "FUNCTION LOG (ec687a7d68ec4e02a8d6ba9ba84b6239): computing overlap\n", - "FUNCTION LOG (ec687a7d68ec4e02a8d6ba9ba84b6239): done\n" + "FUNCTION LOG (357baa07ea3646a394a90693dd191071): starting overlap computation\n", + "FUNCTION LOG (2157ddd1507a40fdba292bbf005d456c): starting overlap computation\n", + "FUNCTION LOG (2157ddd1507a40fdba292bbf005d456c): preparing scenarios and creating indexes\n", + "FUNCTION LOG (2157ddd1507a40fdba292bbf005d456c): preparing scenarios and creating indexes\n", + "FUNCTION LOG (2157ddd1507a40fdba292bbf005d456c): computing overlap\n", + "FUNCTION LOG (2157ddd1507a40fdba292bbf005d456c): computing overlap\n", + "FUNCTION LOG (2157ddd1507a40fdba292bbf005d456c): done\n", + "FUNCTION LOG (2157ddd1507a40fdba292bbf005d456c): done\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "E: Invalid message from worker: [b'']\n" ] }, { @@ -404,7 +419,7 @@ "None" ] }, - "execution_count": 38, + "execution_count": 14, "metadata": {}, "output_type": "execute_result" } @@ -415,7 +430,7 @@ }, { "cell_type": "code", - "execution_count": 39, + "execution_count": 15, "metadata": {}, "outputs": [ { @@ -622,7 +637,7 @@ " flex-grow: 0;\n", " }\n", "\n", - " .grid-table98c67862061d446395eb05f1d2b04fbf {\n", + " .grid-table155a96a951b04e8b8fb79dc8218f6046 {\n", " display:grid;\n", " grid-template-columns: 1fr repeat(24, 1fr);\n", " grid-template-rows: repeat(2, 1fr);\n", @@ -794,25 +809,25 @@ "
\n", "
\n", "
\n", - "
\n", - "
\n", + "
\n", " \n", "
\n", - " \n", + " \n", "
\n", - " \n", "
\n", "\n", - "

0

\n", + "

0

\n", "
\n", - "
\n", + "
\n", " \n", "
\n", - "
\n", + "
\n", " \n", "
\n", "
\n", @@ -1037,7 +1052,7 @@ "[syft.service.job.job_stash.Job, syft.service.job.job_stash.Job]" ] }, - "execution_count": 39, + "execution_count": 15, "metadata": {}, "output_type": "execute_result" } @@ -1048,7 +1063,7 @@ }, { "cell_type": "code", - "execution_count": 40, + "execution_count": 16, "metadata": {}, "outputs": [ { @@ -1056,10 +1071,6 @@ "output_type": "stream", "text": [ "starting overlap computation\n", - "preparing scenarios and creating indexes\n", - "computing overlap\n", - "done\n", - "done\n", "\n" ] } @@ -1070,7 +1081,7 @@ }, { "cell_type": "code", - "execution_count": 41, + "execution_count": 17, "metadata": {}, "outputs": [], "source": [ @@ -1079,7 +1090,7 @@ }, { "cell_type": "code", - "execution_count": 42, + "execution_count": 18, "metadata": {}, "outputs": [ { @@ -1112,7 +1123,7 @@ " 'anatomy_test_5': 135}))]" ] }, - "execution_count": 42, + "execution_count": 18, "metadata": {}, "output_type": "execute_result" } @@ -1131,7 +1142,7 @@ }, { "cell_type": "code", - "execution_count": 43, + "execution_count": 19, "metadata": {}, "outputs": [], "source": [ @@ -1182,7 +1193,7 @@ }, { "cell_type": "code", - "execution_count": 44, + "execution_count": 20, "metadata": {}, "outputs": [ { diff --git a/packages/syft/src/syft/service/queue/queue_stash.py b/packages/syft/src/syft/service/queue/queue_stash.py index 8640dfba4b2..067069c3c7e 100644 --- a/packages/syft/src/syft/service/queue/queue_stash.py +++ b/packages/syft/src/syft/service/queue/queue_stash.py @@ -51,74 +51,12 @@ class QueueItem(SyftObject): api_call: Union[SyftAPICall, SignedSyftAPICall] worker_settings: WorkerSettings - # def fetch(self) -> None: - # api = APIRegistry.api_for( - # node_uid=self.node_uid, - # user_verify_key=self.syft_client_verify_key, - # ) - # call = SyftAPICall( - # node_uid=self.node_uid, - # path="queue", - # args=[], - # kwargs={"uid": self.id}, - # blocking=True, - # ) - # result = api.make_call(call) - # if isinstance(result, QueueItem): - # self.resolved = result.resolved - # if result.resolved: - # self.result = result.result - # self.status = result.status - - # @property - # def subjobs(self): - # api = APIRegistry.api_for( - # node_uid=self.node_uid, - # user_verify_key=self.syft_client_verify_key, - # ) - # return api.services.queue.get_subjobs(self.id) - - # def logs(self, _print=True): - # api = APIRegistry.api_for( - # node_uid=self.node_uid, - # user_verify_key=self.syft_client_verify_key, - # ) - # log_item = api.services.log.get(self.log_id) - # res = log_item.stdout - # if _print: - # print(res) - # else: - # return res - def __repr__(self) -> str: return f": {self.status}" def _repr_markdown_(self) -> str: return f": {self.status}" - # def wait(self): - # # stdlib - # from time import sleep - - # # todo: timeout - # if self.resolved: - # return self.resolve - # while True: - # self.fetch() - # sleep(0.1) - # if self.resolved: - # break - # return self.resolve - - # @property - # def resolve(self) -> Union[Any, SyftNotReady]: - # if not self.resolved: - # self.fetch() - - # if self.resolved: - # return self.result.message.data - # return SyftNotReady(message=f"{self.id} not ready yet.") - @instrument @serializable() @@ -167,13 +105,6 @@ def get_by_uid( item = self.query_one(credentials=credentials, qks=qks) return item - # def get_by_parent_id( - # self, credentials: SyftVerifyKey, uid: UID - # ) -> Result[Optional[QueueItem], str]: - # qks = QueryKeys(qks=[PartitionKey(key="parent_queue_item_id", type_=UID).with_obj(uid)]) - # item = self.query_all(credentials=credentials, 
qks=qks) - # return item - def pop( self, credentials: SyftVerifyKey, uid: UID ) -> Result[Optional[QueueItem], str]: diff --git a/packages/syft/src/syft/service/queue/zmq_queue.py b/packages/syft/src/syft/service/queue/zmq_queue.py index 3c473c642e7..c248f12ce0c 100644 --- a/packages/syft/src/syft/service/queue/zmq_queue.py +++ b/packages/syft/src/syft/service/queue/zmq_queue.py @@ -180,22 +180,6 @@ def _run(self): except Exception as e: print(f"Error in producer {e}") - # def send(self, message: bytes) -> None: - # try: - # message_list = [message] - # # TODO: Enable zero copy - # self._producer.send_multipart(message_list) - # _ = self._producer.recv() - # # print("Message Queued Successfully !", flush=True) - # except zmq.Again as e: - # # TODO: Add retry mechanism if this error occurs - # raise e - # except zmq.ZMQError as e: - # if e.errno == zmq.ETERM: - # print("Connection Interrupted....") - # else: - # raise e - def close(self): self._producer.close() @@ -233,22 +217,6 @@ def post_init(self): # self._consumer = ctx.socket(zmq.REP) self.thread = None - # def receive(self): - # try: - # print(f"Starting receival ({self.id})") - # message_list = self._consumer.recv_multipart() - # print(f"Received stuff ({self.id})") - # self._consumer.send(b"") - # print(f"sent back confirmation ({self.id})") - # message = message_list[0] - # print("Message Received Successfully !", flush=True) - # except zmq.ZMQError as e: - # if e.errno == zmq.ETERM: - # print("Subscriber connection Terminated") - # else: - # raise e - # self.message_handler.handle_message(message=message) - def _run(self): liveness = HEARTBEAT_LIVENESS interval = INTERVAL_INIT @@ -339,55 +307,6 @@ class ZMQClientConfig(SyftObject, QueueClientConfig): create_producer: bool = False -# class MessageQueueConfig(): - -# @staticmethod -# def _get_free_tcp_port(host: str): -# with socketserver.TCPServer((host, 0), None) as s: -# free_port = s.server_address[1] -# return free_port - -# def __init__(self, producer_port: Optional[int]=None, consumer_port: Optional[int] = None): -# self.producer_port = producer_port if producer_port is not None else self._get_free_tcp_port(self.host) -# self.consumer_port = consumer_port - - -# class MessageQueue: -# def __init__(self, consumer_port, producer_port): -# self.consumer_port = consumer_port -# self.producer_port = producer_port -# self.post_init() - -# def post_init(self): -# self.thread = None -# self.ctx = zmq.Context.instance() - -# # Socket facing clients -# self._frontend = self.ctx.socket(zmq.ROUTER) -# self._frontend.bind(f"tcp://*:{self.producer_port}") - -# # Socket facing services -# self._backend = self.ctx.socket(zmq.DEALER) -# self._backend.bind(f"tcp://*:{self.consumer_port}") -# # poller = zmq.Poller() -# # poller.register(frontend, zmq.POLLIN) -# # poller.register(backend, zmq.POLLIN) - -# def _run(self): -# zmq.proxy(self._frontend, self._backend) -# # we never get here -# self._frontend.close() -# self._backend.close() -# self.ctx.term() - -# def run(self): -# # stdlib -# import threading - -# self.thread = threading.Thread(target=self._run) -# self.thread.start() - - @serializable(attrs=["host"]) class ZMQClient(QueueClient): """ZMQ Client for creating producers and consumers.""" @@ -408,16 +327,6 @@ def _get_free_tcp_port(host: str): free_port = s.server_address[1] return free_port - # def add_message_queue(self, queue_name: str): - # if self.config.consumer_port is None: - # self.config.consumer_port = self._get_free_tcp_port(self.host) - # if self.config.producer_port is 
None: - # self.config.producer_port = self._get_free_tcp_port(self.host) - # self.message_queue = MessageQueue( - # self.config.consumer_port, self.config.producer_port - # ) - # return self.message_queue - def add_producer( self, queue_name: str, port: Optional[int] = None, queue_stash=None ) -> ZMQProducer: @@ -433,24 +342,6 @@ def add_producer( else: port = self.config.producer_port - # if self.config.consumer_port is None: - # self.config.consumer_port = self._get_free_tcp_port(self.host) - - # self.backend.bind("tcp://*:5556") # For workers - # address = f"tcp://:{self.config.producer_port}" - - # if queue_name in self.producers: - # producer = self.producers[queue_name] - # if producer.alive: - # return producer - # address = producer.address - - # if not address: - # if self.config.producer_port is None: - # self.config.producer_port = port - # address = f"tcp://{self.host}:{self.config.producer_port}" - - print(f"CREATING A PRODUCER ON {port}") producer = ZMQProducer( queue_name=queue_name, queue_stash=queue_stash, port=port ) @@ -472,27 +363,12 @@ def add_consumer( if address is None: # address = f"tcp://{self.host}:{self.config.consumer_port}" address = f"tcp://*:{self.config.consumer_port}" - # if queue_name in self.producers: - # address = self.producers[queue_name].address - # elif queue_name in self.consumers: - # consumers = self.consumers[queue_name] - # consumer = consumers[0] if len(consumers) > 0 else None - # address = consumer.address if consumer else None - - # address = ( - # self._get_free_tcp_addr( - # self.host, - # ) - # if address is None - # else address - # ) consumer = ZMQConsumer( queue_name=queue_name, message_handler=message_handler, address=address, ) - print(f"CREATING A CONSUMER ON {address}") self.consumers[queue_name].append(consumer) return consumer diff --git a/packages/syft/src/syft/store/locks.py b/packages/syft/src/syft/store/locks.py index 071f757985d..c790967db61 100644 --- a/packages/syft/src/syft/store/locks.py +++ b/packages/syft/src/syft/store/locks.py @@ -247,20 +247,17 @@ def _acquire_file_lock(self) -> bool: time.sleep(0.1) if _retry == 9: pass - # print("(1) COULD NOT READ LOCK FILE", self._data_file) now = self._now() has_expired = self._has_expired(data, now) if owner != data["owner"]: if not has_expired: - # print("(2) COULD NOT GET FILE LOCK FOR ", self._data_file) # Someone else holds the lock. return False else: # Lock is available for us to take. data = {"owner": owner, "expiry_time": self._expiry_time()} else: - # print("(3) COULD NOT GET FILE LOCK FOR ", self._data_file) # Same owner so do not set or modify Lease. return False else: @@ -273,22 +270,6 @@ def _acquire_file_lock(self) -> bool: # We succeeded in writing to the file so we now hold the lock. self._owner = owner - # # increment lock count - # thread_id = threading.current_thread().ident - # current_dict = THREAD_FILE_LOCKS[thread_id] - # path = str(self._lock_file.lock_file) - # if path not in current_dict: - # current_dict[path] = 0 - - # total_files = 0 - # for k,v in THREAD_FILE_LOCKS.items(): - # for j, i in v.items(): - # total_files += i - - # current_dict[path] += 1 - # THREAD_FILE_LOCKS[thread_id] = current_dict - # print(f"Acquiring. 
Open Files: {total_files} Thread Lock State:", json.dumps(THREAD_FILE_LOCKS, indent=2)) - return True @property @@ -323,7 +304,6 @@ def _locked(self): return True def _release_file_lock(self) -> None: - # print(f"CALLING RELEASE FOR {self._lock_file.lock_file}") if not self._lock_file_enabled: return @@ -346,21 +326,7 @@ def _release_file_lock(self) -> None: return if self._owner == data["owner"]: - # print("> PatchedFileLock unlink") self._data_file.unlink() - # # decrement lock count - # thread_id = threading.current_thread().ident - # current_dict = THREAD_FILE_LOCKS[thread_id] - # path = str(self._lock_file.lock_file) - # if path not in current_dict: - # current_dict[path] = 0 - # current_dict[path] -= 1 - # THREAD_FILE_LOCKS[thread_id] = current_dict - # total_files = 0 - # for k,v in THREAD_FILE_LOCKS.items(): - # for j, i in v.items(): - # total_files += i - self._owner = None
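
The locks.py hunk above keeps the lease-style file lock while dropping its commented-out bookkeeping: acquiring writes an `{owner, expiry_time}` record to the data file, a competing owner only wins once the lease has expired, and release unlinks the file only when the releasing owner still matches. Below is a minimal self-contained sketch of that pattern; the `LeaseFileLock` class name and the JSON-on-disk lease format are illustrative assumptions, not Syft's actual `PatchedFileLock` API, and the retry loop and thread-level locking of the real code are omitted.

```python
# Sketch of the expiry-based ("lease") file lock logic retained in locks.py.
# Hypothetical class and on-disk format; for illustration only.
import json
import time
import uuid
from pathlib import Path


class LeaseFileLock:
    def __init__(self, path: str, expiry_seconds: float = 10.0) -> None:
        self._data_file = Path(path)
        self._expiry_seconds = expiry_seconds
        self._owner = uuid.uuid4().hex  # unique id for this lock holder

    def acquire(self) -> bool:
        if self._data_file.exists():
            try:
                data = json.loads(self._data_file.read_text())
            except (OSError, json.JSONDecodeError):
                return False  # unreadable lock file: treat as contended
            has_expired = time.time() > data["expiry_time"]
            if data["owner"] != self._owner:
                if not has_expired:
                    return False  # someone else holds a live lease
                # expired lease: fall through and take it over
            else:
                return False  # same owner: do not refresh the lease
        # Lock file absent, expired, or being taken over: write our lease.
        lease = {
            "owner": self._owner,
            "expiry_time": time.time() + self._expiry_seconds,
        }
        self._data_file.write_text(json.dumps(lease))
        return True

    def release(self) -> None:
        if not self._data_file.exists():
            return
        try:
            data = json.loads(self._data_file.read_text())
        except (OSError, json.JSONDecodeError):
            return
        if data["owner"] == self._owner:
            self._data_file.unlink()  # only the owner may drop the lease
```

As in the patched code, acquisition is non-reentrant: a second `acquire()` by the same owner returns `False` rather than refreshing the lease, and a stale lease is silently taken over by the next caller once its expiry time has passed.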