Skip to content

Commit

Permalink
Fix trio future deprecation warnings (PR #805)
Browse files Browse the repository at this point in the history
  • Loading branch information
vxgmichel authored Nov 19, 2019
2 parents 64ca0ed + 3dfcf48 commit 071f7bf
Show file tree
Hide file tree
Showing 19 changed files with 110 additions and 111 deletions.
2 changes: 1 addition & 1 deletion parsec/backend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ async def handle_client(self, stream):

if hasattr(client_ctx, "event_bus_ctx"):
with self.event_bus.connection_context() as client_ctx.event_bus_ctx:
with trio.open_cancel_scope() as cancel_scope:
with trio.CancelScope() as cancel_scope:

def _on_revoked(event, organization_id, user_id):
if (
Expand Down
4 changes: 2 additions & 2 deletions parsec/backend/s3_blockstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ async def read(self, organization_id: OrganizationID, id: UUID) -> bytes:
async def create(self, organization_id: OrganizationID, id: UUID, block: bytes) -> None:
slug = f"{organization_id}/{id}"
try:
await trio.run_sync_in_worker_thread(
await trio.to_thread.run_sync(
partial(self._s3.head_object, Bucket=self._s3_bucket, Key=slug)
)
except S3ClientError as exc:
if exc.response["Error"]["Code"] == "404":
try:
await trio.run_sync_in_worker_thread(
await trio.to_thread.run_sync(
partial(self._s3.put_object, Bucket=self._s3_bucket, Key=slug, Body=block)
)
except (S3ClientError, S3EndpointConnectionError) as exc:
Expand Down
6 changes: 3 additions & 3 deletions parsec/backend/swift_blockstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, auth_url, tenant, container, user, password):
async def read(self, organization_id: OrganizationID, id: UUID) -> bytes:
slug = f"{organization_id}/{id}"
try:
headers, obj = await trio.run_sync_in_worker_thread(
headers, obj = await trio.to_thread.run_sync(
self.swift_client.get_object, self._container, slug
)

Expand All @@ -55,13 +55,13 @@ async def read(self, organization_id: OrganizationID, id: UUID) -> bytes:
async def create(self, organization_id: OrganizationID, id: UUID, block: bytes) -> None:
slug = f"{organization_id}/{id}"
try:
_, obj = await trio.run_sync_in_worker_thread(
_, obj = await trio.to_thread.run_sync(
self.swift_client.get_object, self._container, slug
)

except ClientException as exc:
if exc.http_status == 404:
await trio.run_sync_in_worker_thread(
await trio.to_thread.run_sync(
partial(self.swift_client.put_object, self._container, slug, block)
)
else:
Expand Down
4 changes: 2 additions & 2 deletions parsec/core/backend_connection/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ async def wait_offline(nursery):
if connection_states[event_bus] != BackendState.INCOMPATIBLE_VERSION:
event_bus.send("backend.connection.lost")
connection_states[event_bus] = BackendState.LOST
for e in events.values():
e.clear()
for name in events:
events[name] = trio.Event()
nursery.cancel_scope.cancel()

async def wait_incompatible_version():
Expand Down
6 changes: 3 additions & 3 deletions parsec/core/fs/storage/local_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ async def thread_pool_runner(max_workers=None):
This should be removed if trio decides to add support for thread pools:
https://github.com/python-trio/trio/blob/c5497c5ac4/trio/_threads.py#L32-L128
"""
portal = trio.BlockingTrioPortal()
executor = ThreadPoolExecutor(max_workers=max_workers)
trio_token = trio.hazmat.current_trio_token()

async def run_in_thread(fn, *args):
send_channel, receive_channel = trio.open_memory_channel(1)

def target():
result = outcome.capture(fn, *args)
portal.run_sync(send_channel.send_nowait, result)
trio.from_thread.run_sync(send_channel.send_nowait, result, trio_token=trio_token)

executor.submit(target)
result = await receive_channel.receive()
Expand All @@ -38,7 +38,7 @@ def target():
yield run_in_thread
finally:
with trio.CancelScope(shield=True):
await trio.run_sync_in_worker_thread(executor.shutdown)
await trio.to_thread.run_sync(executor.shutdown)


def protect_with_lock(fn):
Expand Down
21 changes: 16 additions & 5 deletions parsec/core/gui/custom_dialogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
QListView,
)

import trio

from parsec.core.gui.lang import translate as _
from parsec.core.gui import desktop

Expand Down Expand Up @@ -62,11 +64,11 @@ def text(self):

# TODO: If this ever gets used again, it needs to transition to the new job system
class UserInputDialog(QDialog, Ui_InputDialog):
def __init__(self, portal, core, title, message, exclude=None, *args, **kwargs):
def __init__(self, trio_token, core, title, message, exclude=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setupUi(self)
self.core = core
self.portal = portal
self._trio_token = trio_token
self.label_title.setText(title)
self.label_message.setText(message)
self.line_edit_text.setPlaceholderText(_("LABEL_USER_NAME"))
Expand All @@ -87,7 +89,11 @@ def text_changed(self, text):
def show_auto_complete(self):
self.timer.stop()
if len(self.line_edit_text.text()):
users = self.portal.run(self.core.backend_cmds.user_find, self.line_edit_text.text())
users = trio.from_thread.run(
self.core.backend_cmds.user_find,
self.line_edit_text.text(),
trio_token=self._trio_token,
)
if self.exclude:
users = [u for u in users if u not in self.exclude]
completer = QCompleter(users)
Expand All @@ -101,9 +107,14 @@ def text(self):
return self.line_edit_text.text()


def get_user_name(parent, portal, core, title, message, exclude=None):
def get_user_name(parent, trio_token, core, title, message, exclude=None):
m = UserInputDialog(
core=core, portal=portal, title=title, message=message, exclude=exclude, parent=parent
core=core,
trio_token=trio_token,
title=title,
message=message,
exclude=exclude,
parent=parent,
)
status = m.exec_()
if status == QDialog.Accepted:
Expand Down
2 changes: 1 addition & 1 deletion parsec/core/gui/new_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _extract_version_tuple(raw):

async def _do_check_new_version(url):
# urlopen automatically follows redirections
resolved_url = await trio.run_sync_in_worker_thread(
resolved_url = await trio.to_thread.run_sync(
lambda: urlopen(Request(url, method="HEAD")).geturl()
)

Expand Down
32 changes: 9 additions & 23 deletions parsec/core/gui/trio_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def emit(self, *args):


class QtToTrioJob:
def __init__(self, portal, fn, args, kwargs, qt_on_success, qt_on_error):
self._portal = portal
def __init__(self, trio_token, fn, args, kwargs, qt_on_success, qt_on_error):
self._trio_token = trio_token
assert isinstance(qt_on_success, ThreadSafeQtSignal)
assert qt_on_success.args_types in ((), (QtToTrioJob,))
self._qt_on_success = qt_on_success
Expand Down Expand Up @@ -147,22 +147,22 @@ def _set_done(self):
def cancel_and_join(self):
assert self.cancel_scope
try:
self._portal.run_sync(self.cancel_scope.cancel)
trio.from_thread.run_sync(self.cancel_scope.cancel, trio_token=self._trio_token)
except trio.RunFinishedError:
pass
self._done.wait()


class QtToTrioJobScheduler:
def __init__(self):
self._portal = None
self._trio_token = None
self._cancel_scope = None
self.started = threading.Event()
self._stopped = trio.Event()

async def _start(self, *, task_status=trio.TASK_STATUS_IGNORED):
assert not self.started.is_set()
self._portal = trio.BlockingTrioPortal()
self._trio_token = trio.hazmat.current_trio_token()
self._send_job_channel, recv_job_channel = trio.open_memory_channel(1)
try:
async with trio.open_service_nursery() as nursery, recv_job_channel:
Expand All @@ -184,16 +184,16 @@ async def _stop(self):

def stop(self):
try:
self._portal.run(self._stop)
trio.from_thread.run(self._stop, trio_token=self._trio_token)
except trio.RunFinishedError:
pass

def _run_job(self, job, *args, sync=False):
try:
if sync:
return self._portal.run_sync(job, *args)
return trio.from_thread.run_sync(job, *args, trio_token=self._trio_token)
else:
return self._portal.run(job, *args)
return trio.from_thread.run(job, *args, trio_token=self._trio_token)

except trio.BrokenResourceError:
logger.info(f"The submitted job `{job}` won't run as the scheduler is stopped")
Expand All @@ -207,7 +207,7 @@ def submit_job(self, qt_on_success, qt_on_error, fn, *args, **kwargs):
# Fool-proof sanity check, signals must be wrapped in `ThreadSafeQtSignal`
assert not [x for x in args if isinstance(x, pyqtBoundSignal)]
assert not [v for v in kwargs.values() if isinstance(v, pyqtBoundSignal)]
job = QtToTrioJob(self._portal, fn, args, kwargs, qt_on_success, qt_on_error)
job = QtToTrioJob(self._trio_token, fn, args, kwargs, qt_on_success, qt_on_error)

async def _submit_job():
# While inside this async function we are blocking the Qt thread
Expand Down Expand Up @@ -242,25 +242,12 @@ def run_sync(self, fn, *args):
return self._run_job(fn, *args, sync=True)


# TODO: Running the trio loop in a QThread shouldn't be needed
# make sure it's the case, then remove this dead code
# class QtToTrioJobSchedulerThread(QThread):
# def __init__(self, job_scheduler):
# super().__init__()
# self.job_scheduler = job_scheduler

# def run(self):
# trio_run(self.job_scheduler._start)


@contextmanager
def run_trio_thread():
job_scheduler = QtToTrioJobScheduler()
thread = threading.Thread(target=trio_run, args=[job_scheduler._start])
thread.setName("TrioLoop")
thread.start()
# thread = QtToTrioJobSchedulerThread(job_scheduler)
# thread.start()
job_scheduler.started.wait()

try:
Expand All @@ -269,4 +256,3 @@ def run_trio_thread():
finally:
job_scheduler.stop()
thread.join()
# thread.wait()
9 changes: 5 additions & 4 deletions parsec/core/messages_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def _on_backend_online(event):
backend_online_event.set()

def _on_backend_offline(event):
backend_online_event.clear()
nonlocal backend_online_event
backend_online_event = trio.Event()
if process_message_cancel_scope:
process_message_cancel_scope.cancel()

Expand Down Expand Up @@ -62,11 +63,11 @@ async def _process_last_messages():

while True:
await msg_arrived.wait()
msg_arrived.clear()
msg_arrived = trio.Event()
await freeze_messages_monitor_mockpoint()
await _process_last_messages()

except FSBackendOfflineError:
backend_online_event.clear()
backend_online_event = trio.Event()
process_message_cancel_scope = None
msg_arrived.clear()
msg_arrived = trio.Event()
8 changes: 4 additions & 4 deletions parsec/core/mountpoint/fuse_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ async def fuse_mountpoint_runner(
"""
fuse_thread_started = threading.Event()
fuse_thread_stopped = threading.Event()
portal = trio.BlockingTrioPortal()
fs_access = ThreadFSAccess(portal, workspace_fs)
trio_token = trio.hazmat.current_trio_token()
fs_access = ThreadFSAccess(trio_token, workspace_fs)
fuse_operations = FuseOperations(event_bus, fs_access)

mountpoint_path, initial_st_dev = await _bootstrap_mountpoint(
Expand Down Expand Up @@ -141,7 +141,7 @@ def _run_fuse_thread():
# restore the signals to their previous state once the fuse instance is started.
with _reset_signals():
nursery.start_soon(
lambda: trio.run_sync_in_worker_thread(_run_fuse_thread, cancellable=True)
lambda: trio.to_thread.run_sync(_run_fuse_thread, cancellable=True)
)
await _wait_for_fuse_ready(mountpoint_path, fuse_thread_started, initial_st_dev)

Expand Down Expand Up @@ -185,5 +185,5 @@ async def _stop_fuse_thread(mountpoint_path, fuse_operations, fuse_thread_stoppe
await trio.Path(mountpoint_path / "__shutdown_fuse__").exists()
except OSError:
pass
await trio.run_sync_in_worker_thread(fuse_thread_stopped.wait)
await trio.to_thread.run_sync(fuse_thread_stopped.wait)
logger.info("Fuse thread stopped", mountpoint=mountpoint_path)
44 changes: 26 additions & 18 deletions parsec/core/mountpoint/thread_fs_access.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,75 @@
# Parsec Cloud (https://parsec.cloud) Copyright (c) AGPLv3 2019 Scille SAS

import trio


class ThreadFSAccess:
def __init__(self, portal, workspace_fs):
def __init__(self, trio_token, workspace_fs):
self.workspace_fs = workspace_fs
self._portal = portal
self._trio_token = trio_token

def _run(self, fn, *args):
return trio.from_thread.run(fn, *args, trio_token=self._trio_token)

def _run_sync(self, fn, *args):
return trio.from_thread.run_sync(fn, *args, trio_token=self._trio_token)

# Rights check

def check_read_rights(self, path):
return self._portal.run_sync(self.workspace_fs.transactions.check_read_rights, path)
return self._run_sync(self.workspace_fs.transactions.check_read_rights, path)

def check_write_rights(self, path):
return self._portal.run_sync(self.workspace_fs.transactions.check_write_rights, path)
return self._run_sync(self.workspace_fs.transactions.check_write_rights, path)

# Entry transactions

def entry_info(self, path):
return self._portal.run(self.workspace_fs.transactions.entry_info, path)
return self._run(self.workspace_fs.transactions.entry_info, path)

def entry_rename(self, source, destination, *, overwrite):
return self._portal.run(
return self._run(
self.workspace_fs.transactions.entry_rename, source, destination, overwrite
)

# Folder transactions

def folder_create(self, path):
return self._portal.run(self.workspace_fs.transactions.folder_create, path)
return self._run(self.workspace_fs.transactions.folder_create, path)

def folder_delete(self, path):
return self._portal.run(self.workspace_fs.transactions.folder_delete, path)
return self._run(self.workspace_fs.transactions.folder_delete, path)

# File transactions

def file_create(self, path, *, open):
return self._portal.run(self.workspace_fs.transactions.file_create, path, open)
return self._run(self.workspace_fs.transactions.file_create, path, open)

def file_open(self, path, *, mode):
return self._portal.run(self.workspace_fs.transactions.file_open, path, mode)
return self._run(self.workspace_fs.transactions.file_open, path, mode)

def file_delete(self, path):
return self._portal.run(self.workspace_fs.transactions.file_delete, path)
return self._run(self.workspace_fs.transactions.file_delete, path)

def file_resize(self, path, length):
return self._portal.run(self.workspace_fs.transactions.file_resize, path, length)
return self._run(self.workspace_fs.transactions.file_resize, path, length)

# File descriptor transactions

def fd_close(self, fh):
return self._portal.run(self.workspace_fs.transactions.fd_close, fh)
return self._run(self.workspace_fs.transactions.fd_close, fh)

def fd_seek(self, fh, offset):
return self._portal.run(self.workspace_fs.transactions.fd_seek, fh, offset)
return self._run(self.workspace_fs.transactions.fd_seek, fh, offset)

def fd_read(self, fh, size, offset):
return self._portal.run(self.workspace_fs.transactions.fd_read, fh, size, offset)
return self._run(self.workspace_fs.transactions.fd_read, fh, size, offset)

def fd_write(self, fh, data, offset):
return self._portal.run(self.workspace_fs.transactions.fd_write, fh, data, offset)
return self._run(self.workspace_fs.transactions.fd_write, fh, data, offset)

def fd_resize(self, fh, length):
return self._portal.run(self.workspace_fs.transactions.fd_resize, fh, length)
return self._run(self.workspace_fs.transactions.fd_resize, fh, length)

def fd_flush(self, fh):
return self._portal.run(self.workspace_fs.transactions.fd_flush, fh)
return self._run(self.workspace_fs.transactions.fd_flush, fh)
Loading

0 comments on commit 071f7bf

Please sign in to comment.