Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix trio future deprecation warnings #805

Merged
merged 6 commits into from
Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parsec/backend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ async def handle_client(self, stream):

if hasattr(client_ctx, "event_bus_ctx"):
with self.event_bus.connection_context() as client_ctx.event_bus_ctx:
with trio.open_cancel_scope() as cancel_scope:
with trio.CancelScope() as cancel_scope:

def _on_revoked(event, organization_id, user_id):
if (
Expand Down
4 changes: 2 additions & 2 deletions parsec/backend/s3_blockstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ async def read(self, organization_id: OrganizationID, id: UUID) -> bytes:
async def create(self, organization_id: OrganizationID, id: UUID, block: bytes) -> None:
slug = f"{organization_id}/{id}"
try:
await trio.run_sync_in_worker_thread(
await trio.to_thread.run_sync(
partial(self._s3.head_object, Bucket=self._s3_bucket, Key=slug)
)
except S3ClientError as exc:
if exc.response["Error"]["Code"] == "404":
try:
await trio.run_sync_in_worker_thread(
await trio.to_thread.run_sync(
partial(self._s3.put_object, Bucket=self._s3_bucket, Key=slug, Body=block)
)
except (S3ClientError, S3EndpointConnectionError) as exc:
Expand Down
6 changes: 3 additions & 3 deletions parsec/backend/swift_blockstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, auth_url, tenant, container, user, password):
async def read(self, organization_id: OrganizationID, id: UUID) -> bytes:
slug = f"{organization_id}/{id}"
try:
headers, obj = await trio.run_sync_in_worker_thread(
headers, obj = await trio.to_thread.run_sync(
self.swift_client.get_object, self._container, slug
)

Expand All @@ -55,13 +55,13 @@ async def read(self, organization_id: OrganizationID, id: UUID) -> bytes:
async def create(self, organization_id: OrganizationID, id: UUID, block: bytes) -> None:
slug = f"{organization_id}/{id}"
try:
_, obj = await trio.run_sync_in_worker_thread(
_, obj = await trio.to_thread.run_sync(
self.swift_client.get_object, self._container, slug
)

except ClientException as exc:
if exc.http_status == 404:
await trio.run_sync_in_worker_thread(
await trio.to_thread.run_sync(
partial(self.swift_client.put_object, self._container, slug, block)
)
else:
Expand Down
4 changes: 2 additions & 2 deletions parsec/core/backend_connection/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ async def wait_offline(nursery):
if connection_states[event_bus] != BackendState.INCOMPATIBLE_VERSION:
event_bus.send("backend.connection.lost")
connection_states[event_bus] = BackendState.LOST
for e in events.values():
e.clear()
for name in events:
events[name] = trio.Event()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May cay dé fous !!!! python-trio/trio#637 ^^

nursery.cancel_scope.cancel()

async def wait_incompatible_version():
Expand Down
6 changes: 3 additions & 3 deletions parsec/core/fs/storage/local_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ async def thread_pool_runner(max_workers=None):
This should be removed if trio decides to add support for thread pools:
https://github.com/python-trio/trio/blob/c5497c5ac4/trio/_threads.py#L32-L128
"""
portal = trio.BlockingTrioPortal()
executor = ThreadPoolExecutor(max_workers=max_workers)
trio_token = trio.hazmat.current_trio_token()

async def run_in_thread(fn, *args):
send_channel, receive_channel = trio.open_memory_channel(1)

def target():
result = outcome.capture(fn, *args)
portal.run_sync(send_channel.send_nowait, result)
trio.from_thread.run_sync(send_channel.send_nowait, result, trio_token=trio_token)

executor.submit(target)
result = await receive_channel.receive()
Expand All @@ -38,7 +38,7 @@ def target():
yield run_in_thread
finally:
with trio.CancelScope(shield=True):
await trio.run_sync_in_worker_thread(executor.shutdown)
await trio.to_thread.run_sync(executor.shutdown)


def protect_with_lock(fn):
Expand Down
21 changes: 16 additions & 5 deletions parsec/core/gui/custom_dialogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
QListView,
)

import trio

from parsec.core.gui.lang import translate as _
from parsec.core.gui import desktop

Expand Down Expand Up @@ -62,11 +64,11 @@ def text(self):

# TODO: If this ever gets used again, it needs to transition to the new job system
class UserInputDialog(QDialog, Ui_InputDialog):
def __init__(self, portal, core, title, message, exclude=None, *args, **kwargs):
def __init__(self, trio_token, core, title, message, exclude=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setupUi(self)
self.core = core
self.portal = portal
self._trio_token = trio_token
self.label_title.setText(title)
self.label_message.setText(message)
self.line_edit_text.setPlaceholderText(_("LABEL_USER_NAME"))
Expand All @@ -87,7 +89,11 @@ def text_changed(self, text):
def show_auto_complete(self):
self.timer.stop()
if len(self.line_edit_text.text()):
users = self.portal.run(self.core.backend_cmds.user_find, self.line_edit_text.text())
users = trio.from_thread.run(
self.core.backend_cmds.user_find,
self.line_edit_text.text(),
trio_token=self._trio_token,
)
if self.exclude:
users = [u for u in users if u not in self.exclude]
completer = QCompleter(users)
Expand All @@ -101,9 +107,14 @@ def text(self):
return self.line_edit_text.text()


def get_user_name(parent, portal, core, title, message, exclude=None):
def get_user_name(parent, trio_token, core, title, message, exclude=None):
m = UserInputDialog(
core=core, portal=portal, title=title, message=message, exclude=exclude, parent=parent
core=core,
trio_token=trio_token,
title=title,
message=message,
exclude=exclude,
parent=parent,
)
status = m.exec_()
if status == QDialog.Accepted:
Expand Down
2 changes: 1 addition & 1 deletion parsec/core/gui/new_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _extract_version_tuple(raw):

async def _do_check_new_version(url):
# urlopen automatically follows redirections
resolved_url = await trio.run_sync_in_worker_thread(
resolved_url = await trio.to_thread.run_sync(
lambda: urlopen(Request(url, method="HEAD")).geturl()
)

Expand Down
32 changes: 9 additions & 23 deletions parsec/core/gui/trio_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def emit(self, *args):


class QtToTrioJob:
def __init__(self, portal, fn, args, kwargs, qt_on_success, qt_on_error):
self._portal = portal
def __init__(self, trio_token, fn, args, kwargs, qt_on_success, qt_on_error):
self._trio_token = trio_token
assert isinstance(qt_on_success, ThreadSafeQtSignal)
assert qt_on_success.args_types in ((), (QtToTrioJob,))
self._qt_on_success = qt_on_success
Expand Down Expand Up @@ -147,22 +147,22 @@ def _set_done(self):
def cancel_and_join(self):
assert self.cancel_scope
try:
self._portal.run_sync(self.cancel_scope.cancel)
trio.from_thread.run_sync(self.cancel_scope.cancel, trio_token=self._trio_token)
except trio.RunFinishedError:
pass
self._done.wait()


class QtToTrioJobScheduler:
def __init__(self):
self._portal = None
self._trio_token = None
self._cancel_scope = None
self.started = threading.Event()
self._stopped = trio.Event()

async def _start(self, *, task_status=trio.TASK_STATUS_IGNORED):
assert not self.started.is_set()
self._portal = trio.BlockingTrioPortal()
self._trio_token = trio.hazmat.current_trio_token()
self._send_job_channel, recv_job_channel = trio.open_memory_channel(1)
try:
async with trio.open_service_nursery() as nursery, recv_job_channel:
Expand All @@ -184,16 +184,16 @@ async def _stop(self):

def stop(self):
try:
self._portal.run(self._stop)
trio.from_thread.run(self._stop, trio_token=self._trio_token)
except trio.RunFinishedError:
pass

def _run_job(self, job, *args, sync=False):
try:
if sync:
return self._portal.run_sync(job, *args)
return trio.from_thread.run_sync(job, *args, trio_token=self._trio_token)
else:
return self._portal.run(job, *args)
return trio.from_thread.run(job, *args, trio_token=self._trio_token)

except trio.BrokenResourceError:
logger.info(f"The submitted job `{job}` won't run as the scheduler is stopped")
Expand All @@ -207,7 +207,7 @@ def submit_job(self, qt_on_success, qt_on_error, fn, *args, **kwargs):
# Fool-proof sanity check, signals must be wrapped in `ThreadSafeQtSignal`
assert not [x for x in args if isinstance(x, pyqtBoundSignal)]
assert not [v for v in kwargs.values() if isinstance(v, pyqtBoundSignal)]
job = QtToTrioJob(self._portal, fn, args, kwargs, qt_on_success, qt_on_error)
job = QtToTrioJob(self._trio_token, fn, args, kwargs, qt_on_success, qt_on_error)

async def _submit_job():
# While inside this async function we are blocking the Qt thread
Expand Down Expand Up @@ -242,25 +242,12 @@ def run_sync(self, fn, *args):
return self._run_job(fn, *args, sync=True)


# TODO: Running the trio loop in a QThread shouldn't be needed
# make sure it's the case, then remove this dead code
# class QtToTrioJobSchedulerThread(QThread):
# def __init__(self, job_scheduler):
# super().__init__()
# self.job_scheduler = job_scheduler

# def run(self):
# trio_run(self.job_scheduler._start)


@contextmanager
def run_trio_thread():
job_scheduler = QtToTrioJobScheduler()
thread = threading.Thread(target=trio_run, args=[job_scheduler._start])
thread.setName("TrioLoop")
thread.start()
# thread = QtToTrioJobSchedulerThread(job_scheduler)
# thread.start()
job_scheduler.started.wait()

try:
Expand All @@ -269,4 +256,3 @@ def run_trio_thread():
finally:
job_scheduler.stop()
thread.join()
# thread.wait()
9 changes: 5 additions & 4 deletions parsec/core/messages_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def _on_backend_online(event):
backend_online_event.set()

def _on_backend_offline(event):
backend_online_event.clear()
nonlocal backend_online_event
backend_online_event = trio.Event()
if process_message_cancel_scope:
process_message_cancel_scope.cancel()

Expand Down Expand Up @@ -62,11 +63,11 @@ async def _process_last_messages():

while True:
await msg_arrived.wait()
msg_arrived.clear()
msg_arrived = trio.Event()
await freeze_messages_monitor_mockpoint()
await _process_last_messages()

except FSBackendOfflineError:
backend_online_event.clear()
backend_online_event = trio.Event()
process_message_cancel_scope = None
msg_arrived.clear()
msg_arrived = trio.Event()
8 changes: 4 additions & 4 deletions parsec/core/mountpoint/fuse_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ async def fuse_mountpoint_runner(
"""
fuse_thread_started = threading.Event()
fuse_thread_stopped = threading.Event()
portal = trio.BlockingTrioPortal()
fs_access = ThreadFSAccess(portal, workspace_fs)
trio_token = trio.hazmat.current_trio_token()
fs_access = ThreadFSAccess(trio_token, workspace_fs)
fuse_operations = FuseOperations(event_bus, fs_access)

mountpoint_path, initial_st_dev = await _bootstrap_mountpoint(
Expand Down Expand Up @@ -141,7 +141,7 @@ def _run_fuse_thread():
# restore the signals to their previous state once the fuse instance is started.
with _reset_signals():
nursery.start_soon(
lambda: trio.run_sync_in_worker_thread(_run_fuse_thread, cancellable=True)
lambda: trio.to_thread.run_sync(_run_fuse_thread, cancellable=True)
)
await _wait_for_fuse_ready(mountpoint_path, fuse_thread_started, initial_st_dev)

Expand Down Expand Up @@ -185,5 +185,5 @@ async def _stop_fuse_thread(mountpoint_path, fuse_operations, fuse_thread_stoppe
await trio.Path(mountpoint_path / "__shutdown_fuse__").exists()
except OSError:
pass
await trio.run_sync_in_worker_thread(fuse_thread_stopped.wait)
await trio.to_thread.run_sync(fuse_thread_stopped.wait)
logger.info("Fuse thread stopped", mountpoint=mountpoint_path)
44 changes: 26 additions & 18 deletions parsec/core/mountpoint/thread_fs_access.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,75 @@
# Parsec Cloud (https://parsec.cloud) Copyright (c) AGPLv3 2019 Scille SAS

import trio


class ThreadFSAccess:
def __init__(self, portal, workspace_fs):
def __init__(self, trio_token, workspace_fs):
self.workspace_fs = workspace_fs
self._portal = portal
self._trio_token = trio_token

def _run(self, fn, *args):
return trio.from_thread.run(fn, *args, trio_token=self._trio_token)

def _run_sync(self, fn, *args):
return trio.from_thread.run_sync(fn, *args, trio_token=self._trio_token)

# Rights check

def check_read_rights(self, path):
return self._portal.run_sync(self.workspace_fs.transactions.check_read_rights, path)
return self._run_sync(self.workspace_fs.transactions.check_read_rights, path)

def check_write_rights(self, path):
return self._portal.run_sync(self.workspace_fs.transactions.check_write_rights, path)
return self._run_sync(self.workspace_fs.transactions.check_write_rights, path)

# Entry transactions

def entry_info(self, path):
return self._portal.run(self.workspace_fs.transactions.entry_info, path)
return self._run(self.workspace_fs.transactions.entry_info, path)

def entry_rename(self, source, destination, *, overwrite):
return self._portal.run(
return self._run(
self.workspace_fs.transactions.entry_rename, source, destination, overwrite
)

# Folder transactions

def folder_create(self, path):
return self._portal.run(self.workspace_fs.transactions.folder_create, path)
return self._run(self.workspace_fs.transactions.folder_create, path)

def folder_delete(self, path):
return self._portal.run(self.workspace_fs.transactions.folder_delete, path)
return self._run(self.workspace_fs.transactions.folder_delete, path)

# File transactions

def file_create(self, path, *, open):
return self._portal.run(self.workspace_fs.transactions.file_create, path, open)
return self._run(self.workspace_fs.transactions.file_create, path, open)

def file_open(self, path, *, mode):
return self._portal.run(self.workspace_fs.transactions.file_open, path, mode)
return self._run(self.workspace_fs.transactions.file_open, path, mode)

def file_delete(self, path):
return self._portal.run(self.workspace_fs.transactions.file_delete, path)
return self._run(self.workspace_fs.transactions.file_delete, path)

def file_resize(self, path, length):
return self._portal.run(self.workspace_fs.transactions.file_resize, path, length)
return self._run(self.workspace_fs.transactions.file_resize, path, length)

# File descriptor transactions

def fd_close(self, fh):
return self._portal.run(self.workspace_fs.transactions.fd_close, fh)
return self._run(self.workspace_fs.transactions.fd_close, fh)

def fd_seek(self, fh, offset):
return self._portal.run(self.workspace_fs.transactions.fd_seek, fh, offset)
return self._run(self.workspace_fs.transactions.fd_seek, fh, offset)

def fd_read(self, fh, size, offset):
return self._portal.run(self.workspace_fs.transactions.fd_read, fh, size, offset)
return self._run(self.workspace_fs.transactions.fd_read, fh, size, offset)

def fd_write(self, fh, data, offset):
return self._portal.run(self.workspace_fs.transactions.fd_write, fh, data, offset)
return self._run(self.workspace_fs.transactions.fd_write, fh, data, offset)

def fd_resize(self, fh, length):
return self._portal.run(self.workspace_fs.transactions.fd_resize, fh, length)
return self._run(self.workspace_fs.transactions.fd_resize, fh, length)

def fd_flush(self, fh):
return self._portal.run(self.workspace_fs.transactions.fd_flush, fh)
return self._run(self.workspace_fs.transactions.fd_flush, fh)
Loading