Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass Queue Config to Worker #8084

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 197 additions & 0 deletions notebooks/Experimental/Shubham/Testing Queue with Uvicorn.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Test Queue with Uvicorn"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Install Syft & Import packages"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"SYFT_VERSION = \">=0.8.2.b0,<0.9\"\n",
"package_string = f'\"syft{SYFT_VERSION}\"'\n",
"# %pip install {package_string} -q"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import syft as sy\n",
"sy.requires(SYFT_VERSION)\n",
"from syft import autocache\n",
"import pandas as pd"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Launch a Syft Domain Server"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Launch a fresh domain server named \"test-domain-1\" in dev mode on the local machine\n",
"node = sy.orchestra.launch(name=\"test-domain-1\", port=\"auto\", dev_mode=True, reset=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# log into the node with default root credentials\n",
"domain_client = node.login(email=\"[email protected]\", password=\"changethis\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# List the available API\n",
"domain_client.api"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Data Subjects\n",
"\n",
"Think of Data Subjects as individuals/organizations/institutions owning a dataset that you can pool together privately in Syft.\n",
"\n",
"For this notebook, we'll create a sample dataset that includes trade information of various commodities for different countries."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Check for existing Data Subjects\n",
"data_subjects = domain_client.data_subject_registry.get_all(blocking=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"data_subjects"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"data_subjects.resolve.data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"users = domain_client.users.get_all(blocking=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"users "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"users.resolve.data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": false,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": true
}
},
"nbformat": 4,
"nbformat_minor": 4
}
15 changes: 13 additions & 2 deletions packages/syft/src/syft/node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ def init_blob_storage(self, config: Optional[BlobStorageConfig] = None) -> None:

def init_queue_manager(self, queue_config: Optional[QueueConfig]):
queue_config_ = ZMQQueueConfig() if queue_config is None else queue_config
self.queue_config = queue_config_

MessageHandlers = [APICallMessageHandler]

Expand Down Expand Up @@ -694,7 +695,12 @@ def resolve_future(
result = self.queue_stash.pop_on_complete(credentials, uid)

if result.is_ok():
return result.ok()
queue_obj = result.ok()
queue_obj._set_obj_location_(
node_uid=self.id,
credentials=credentials,
)
return queue_obj
return result.err()

def forward_message(
Expand Down Expand Up @@ -812,7 +818,12 @@ def handle_api_call_with_unsigned_result(
)
else:
task_uid = UID()
item = QueueItem(id=task_uid, node_uid=self.id)
item = QueueItem(
id=task_uid,
node_uid=self.id,
syft_client_verify_key=api_call.credentials,
syft_node_location=self.id,
)
# 🟡 TODO 36: Needs distributed lock
self.queue_stash.set_placeholder(self.verify_key, item)

Expand Down
9 changes: 5 additions & 4 deletions packages/syft/src/syft/node/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# third party
from fastapi import APIRouter
from fastapi import FastAPI
import gipc
import requests
from starlette.middleware.cors import CORSMiddleware
import uvicorn
Expand Down Expand Up @@ -96,7 +97,7 @@ async def _run_uvicorn(

worker = worker_class.named(
name=name,
processes=0,
processes=1,
reset=reset,
local_db=True,
node_type=node_type,
Expand All @@ -106,7 +107,7 @@ async def _run_uvicorn(
else:
worker = worker_class(
name=name,
processes=0,
processes=1,
local_db=True,
node_type=node_type,
node_side_type=node_side_type,
Expand All @@ -131,7 +132,7 @@ async def _run_uvicorn(
logging.getLogger("uvicorn").setLevel(logging.CRITICAL)
logging.getLogger("uvicorn.access").setLevel(logging.CRITICAL)
config = uvicorn.Config(
app, host=host, port=port, log_level=log_level, reload=dev_mode
app, host=host, port=port, log_level=log_level, reload=dev_mode, workers=10
)
server = uvicorn.Server(config)

Expand Down Expand Up @@ -165,7 +166,7 @@ def serve_node(
tail: bool = False,
enable_warnings: bool = False,
) -> Tuple[Callable, Callable]:
server_process = multiprocessing.Process(
server_process = gipc.start_process(
target=run_uvicorn,
args=(
name,
Expand Down
3 changes: 3 additions & 0 deletions packages/syft/src/syft/node/worker_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ..abstract_node import NodeType
from ..node.credentials import SyftSigningKey
from ..serde.serializable import serializable
from ..service.queue.base_queue import QueueConfig
from ..store.blob_storage import BlobStorageConfig
from ..store.document_store import StoreConfig
from ..types.syft_object import SYFT_OBJECT_VERSION_1
Expand All @@ -33,6 +34,7 @@ class WorkerSettings(SyftObject):
document_store_config: StoreConfig
action_store_config: StoreConfig
blob_store_config: Optional[BlobStorageConfig]
queue_config: Optional[QueueConfig]

@staticmethod
def from_node(node: AbstractNode) -> Self:
Expand All @@ -45,4 +47,5 @@ def from_node(node: AbstractNode) -> Self:
action_store_config=node.action_store_config,
node_side_type=node.node_side_type.value,
blob_store_config=node.blob_store_config,
queue_config=node.queue_config,
)
2 changes: 1 addition & 1 deletion packages/syft/src/syft/service/queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,4 @@ def handle_message(message: bytes):
status=status,
)

worker.queue_stash.set_result(worker.verify_key, item)
worker.queue_stash.set_result(api_call.credentials, item)