From 38d3cdbc7a532995344a1cd3f8877aeef00908fb Mon Sep 17 00:00:00 2001 From: Nathan Zimmerman Date: Tue, 5 Nov 2024 12:54:11 -0600 Subject: [PATCH 1/5] Add async wrapper for sync FS --- fsspec/implementations/asyn_wrapper.py | 80 +++++++++++ .../tests/test_asyn_wrapper.py | 131 ++++++++++++++++++ 2 files changed, 211 insertions(+) create mode 100644 fsspec/implementations/asyn_wrapper.py create mode 100644 fsspec/implementations/tests/test_asyn_wrapper.py diff --git a/fsspec/implementations/asyn_wrapper.py b/fsspec/implementations/asyn_wrapper.py new file mode 100644 index 000000000..a7e40606d --- /dev/null +++ b/fsspec/implementations/asyn_wrapper.py @@ -0,0 +1,80 @@ +import asyncio +import functools +from fsspec.asyn import AsyncFileSystem + + +def async_wrapper(func, obj=None): + """ + Wraps a synchronous function to make it awaitable. + + Parameters + ---------- + func : callable + The synchronous function to wrap. + obj : object, optional + The instance to bind the function to, if applicable. + + Returns + ------- + coroutine + An awaitable version of the function. + """ + @functools.wraps(func) + async def wrapper(*args, **kwargs): + self = obj or args[0] + return await asyncio.to_thread(func, *args, **kwargs) + return wrapper + + +class AsyncFileSystemWrapper(AsyncFileSystem): + """ + A wrapper class to convert a synchronous filesystem into an asynchronous one. + + This class takes an existing synchronous filesystem implementation and wraps all + its methods to provide an asynchronous interface. + + Parameters + ---------- + sync_fs : AbstractFileSystem + The synchronous filesystem instance to wrap. + """ + def __init__(self, sync_fs, *args, **kwargs): + super().__init__(*args, **kwargs) + self.fs = sync_fs + self._wrap_all_sync_methods() + + def _wrap_all_sync_methods(self): + """ + Wrap all synchronous methods of the underlying filesystem with asynchronous versions. + """ + for method_name in dir(self.fs): + if method_name.startswith("_"): + continue + method = getattr(self.fs, method_name) + if callable(method) and not asyncio.iscoroutinefunction(method): + async_method = async_wrapper(method, obj=self) + setattr(self, f"_{method_name}", async_method) + + @classmethod + def wrap_class(cls, sync_fs_class): + """ + Create a new class that can be used to instantiate an AsyncFileSystemWrapper + with lazy instantiation of the underlying synchronous filesystem. + + Parameters + ---------- + sync_fs_class : type + The class of the synchronous filesystem to wrap. + + Returns + ------- + type + A new class that wraps the provided synchronous filesystem class. + """ + class GeneratedAsyncFileSystemWrapper(cls): + def __init__(self, *args, **kwargs): + sync_fs = sync_fs_class(*args, **kwargs) + super().__init__(sync_fs) + + GeneratedAsyncFileSystemWrapper.__name__ = f"Async{sync_fs_class.__name__}Wrapper" + return GeneratedAsyncFileSystemWrapper \ No newline at end of file diff --git a/fsspec/implementations/tests/test_asyn_wrapper.py b/fsspec/implementations/tests/test_asyn_wrapper.py new file mode 100644 index 000000000..6305c574e --- /dev/null +++ b/fsspec/implementations/tests/test_asyn_wrapper.py @@ -0,0 +1,131 @@ +import asyncio +import pytest +import os + +import fsspec +from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper +from fsspec.implementations.local import LocalFileSystem +from .test_local import csv_files, filetexts + + +def test_is_async(): + fs = fsspec.filesystem("file") + async_fs = AsyncFileSystemWrapper(fs) + assert async_fs.async_impl == True + + +def test_class_wrapper(): + fs_cls = LocalFileSystem + async_fs_cls = AsyncFileSystemWrapper.wrap_class(fs_cls) + assert async_fs_cls.__name__ == "AsyncLocalFileSystemWrapper" + async_fs = async_fs_cls() + assert async_fs.async_impl == True + + +@pytest.mark.asyncio +async def test_cats(): + with filetexts(csv_files, mode="b"): + fs = fsspec.filesystem("file") + async_fs = AsyncFileSystemWrapper(fs) + + result = await async_fs._cat(".test.fakedata.1.csv") + assert result == b"a,b\n1,2\n" + + out = set((await async_fs._cat([".test.fakedata.1.csv", ".test.fakedata.2.csv"])).values()) + assert out == {b"a,b\n1,2\n", b"a,b\n3,4\n"} + + result = await async_fs._cat(".test.fakedata.1.csv", None, None) + assert result == b"a,b\n1,2\n" + + result = await async_fs._cat(".test.fakedata.1.csv", start=1, end=6) + assert result == b"a,b\n1,2\n"[1:6] + + result = await async_fs._cat(".test.fakedata.1.csv", start=-1) + assert result == b"a,b\n1,2\n"[-1:] + + result = await async_fs._cat(".test.fakedata.1.csv", start=1, end=-2) + assert result == b"a,b\n1,2\n"[1:-2] + + # test synchronous API is available as expected + result = async_fs.cat(".test.fakedata.1.csv", start=1, end=-2) + assert result == b"a,b\n1,2\n"[1:-2] + + out = set( + (await async_fs._cat( + [".test.fakedata.1.csv", ".test.fakedata.2.csv"], start=1, end=-1 + )).values() + ) + assert out == {b"a,b\n1,2\n"[1:-1], b"a,b\n3,4\n"[1:-1]} + +@pytest.mark.asyncio +async def test_basic_crud_operations(): + with filetexts(csv_files, mode="b"): + fs = fsspec.filesystem("file") + async_fs = AsyncFileSystemWrapper(fs) + + await async_fs._touch(".test.fakedata.3.csv") + assert await async_fs._exists(".test.fakedata.3.csv") + + data = await async_fs._cat(".test.fakedata.1.csv") + assert data == b"a,b\n1,2\n" + + await async_fs._pipe(".test.fakedata.1.csv", b"a,b\n5,6\n") + data = await async_fs._cat(".test.fakedata.1.csv") + assert data == b"a,b\n5,6\n" + + await async_fs._rm(".test.fakedata.1.csv") + assert not await async_fs._exists(".test.fakedata.1.csv") + +@pytest.mark.asyncio +async def test_error_handling(): + fs = fsspec.filesystem("file") + async_fs = AsyncFileSystemWrapper(fs) + + with pytest.raises(FileNotFoundError): + await async_fs._cat(".test.non_existent.csv") + + with pytest.raises(FileNotFoundError): + await async_fs._rm(".test.non_existent.csv") + +@pytest.mark.asyncio +async def test_concurrent_operations(): + with filetexts(csv_files, mode="b"): + fs = fsspec.filesystem("file") + async_fs = AsyncFileSystemWrapper(fs) + + async def read_file(file_path): + return await async_fs._cat(file_path) + + results = await asyncio.gather( + read_file(".test.fakedata.1.csv"), + read_file(".test.fakedata.2.csv"), + read_file(".test.fakedata.1.csv") + ) + + assert results == [b"a,b\n1,2\n", b"a,b\n3,4\n", b"a,b\n1,2\n"] + +@pytest.mark.asyncio +async def test_directory_operations(): + with filetexts(csv_files, mode="b"): + fs = fsspec.filesystem("file") + async_fs = AsyncFileSystemWrapper(fs) + + await async_fs._makedirs("new_directory") + assert await async_fs._isdir("new_directory") + + files = await async_fs._ls(".") + filenames = [os.path.basename(file) for file in files] + + assert ".test.fakedata.1.csv" in filenames + assert ".test.fakedata.2.csv" in filenames + assert "new_directory" in filenames + +@pytest.mark.asyncio +async def test_batch_operations(): + with filetexts(csv_files, mode="b"): + fs = fsspec.filesystem("file") + async_fs = AsyncFileSystemWrapper(fs) + + await async_fs._rm([".test.fakedata.1.csv", ".test.fakedata.2.csv"]) + assert not await async_fs._exists(".test.fakedata.1.csv") + assert not await async_fs._exists(".test.fakedata.2.csv") \ No newline at end of file From 394e03289bda73a942f8ff73889e45067800eac4 Mon Sep 17 00:00:00 2001 From: Nathan Zimmerman Date: Tue, 5 Nov 2024 13:23:29 -0600 Subject: [PATCH 2/5] Avoid trying to async-wrap properties --- fsspec/implementations/asyn_wrapper.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/fsspec/implementations/asyn_wrapper.py b/fsspec/implementations/asyn_wrapper.py index a7e40606d..af38ba1a7 100644 --- a/fsspec/implementations/asyn_wrapper.py +++ b/fsspec/implementations/asyn_wrapper.py @@ -1,4 +1,5 @@ import asyncio +import inspect import functools from fsspec.asyn import AsyncFileSystem @@ -43,6 +44,10 @@ def __init__(self, sync_fs, *args, **kwargs): self.fs = sync_fs self._wrap_all_sync_methods() + @property + def fsid(self): + return f"async_{self.fs.fsid}" + def _wrap_all_sync_methods(self): """ Wrap all synchronous methods of the underlying filesystem with asynchronous versions. @@ -50,6 +55,11 @@ def _wrap_all_sync_methods(self): for method_name in dir(self.fs): if method_name.startswith("_"): continue + + attr = inspect.getattr_static(self.fs, method_name) + if isinstance(attr, property): + continue + method = getattr(self.fs, method_name) if callable(method) and not asyncio.iscoroutinefunction(method): async_method = async_wrapper(method, obj=self) From 8197465afc708846b79b2a32f5311045d38cb2ac Mon Sep 17 00:00:00 2001 From: Nathan Zimmerman Date: Wed, 6 Nov 2024 12:59:32 -0600 Subject: [PATCH 3/5] Set asynchronous to true on asyncwrapper --- fsspec/implementations/asyn_wrapper.py | 1 + 1 file changed, 1 insertion(+) diff --git a/fsspec/implementations/asyn_wrapper.py b/fsspec/implementations/asyn_wrapper.py index af38ba1a7..7258be690 100644 --- a/fsspec/implementations/asyn_wrapper.py +++ b/fsspec/implementations/asyn_wrapper.py @@ -41,6 +41,7 @@ class AsyncFileSystemWrapper(AsyncFileSystem): """ def __init__(self, sync_fs, *args, **kwargs): super().__init__(*args, **kwargs) + self.asynchronous = True self.fs = sync_fs self._wrap_all_sync_methods() From 6b6b0e6ee842d101e80423259e472814cbaf5dad Mon Sep 17 00:00:00 2001 From: Nathan Zimmerman Date: Tue, 12 Nov 2024 11:44:53 -0600 Subject: [PATCH 4/5] Add docs; remove py3.8 from ci --- .github/workflows/main.yaml | 1 - docs/source/async.rst | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 653eb8084..f9e5ed98c 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -14,7 +14,6 @@ jobs: fail-fast: false matrix: PY: - - "3.8" - "3.9" - "3.10" - "3.11" diff --git a/docs/source/async.rst b/docs/source/async.rst index 58334b333..dc44df381 100644 --- a/docs/source/async.rst +++ b/docs/source/async.rst @@ -152,3 +152,37 @@ available as the attribute ``.loop``. + +AsyncFileSystemWrapper +---------------------- + +The `AsyncFileSystemWrapper` class is an experimental feature that allows you to convert +a synchronous filesystem into an asynchronous one. This is useful for quickly integrating +synchronous filesystems into workflows that may expect `AsyncFileSystem` instances. + +Basic Usage +~~~~~~~~~~~ + +To use `AsyncFileSystemWrapper`, wrap any synchronous filesystem to work in an asynchronous context. +In this example, the synchronous `LocalFileSystem` is wrapped, creating an `AsyncFileSystem` instance +backed by the normal, synchronous methods of `LocalFileSystem`: + +.. code-block:: python + + import asyncio + import fsspec + from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper + + async def async_copy_file(): + sync_fs = fsspec.filesystem('file') # by-default synchronous, local filesystem + async_fs = AsyncFileSystemWrapper(sync_fs) + return await async_fs._copy('/source/file.txt', '/destination/file.txt') + + asyncio.run(async_copy_file()) + +Limitations +----------- + +This is experimental. Users should not expect this wrapper to magically make things faster. +It is primarily provided to allow usage of synchronous filesystems with interfaces that expect +`AsyncFileSystem` instances. From 34074c17c677d66f49157c097b1c14a7786ecd13 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 12 Nov 2024 13:33:34 -0500 Subject: [PATCH 5/5] lint --- fsspec/implementations/asyn_wrapper.py | 11 +++++-- .../tests/test_asyn_wrapper.py | 29 +++++++++++++------ 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/fsspec/implementations/asyn_wrapper.py b/fsspec/implementations/asyn_wrapper.py index 7258be690..9ba7811ce 100644 --- a/fsspec/implementations/asyn_wrapper.py +++ b/fsspec/implementations/asyn_wrapper.py @@ -20,10 +20,11 @@ def async_wrapper(func, obj=None): coroutine An awaitable version of the function. """ + @functools.wraps(func) async def wrapper(*args, **kwargs): - self = obj or args[0] return await asyncio.to_thread(func, *args, **kwargs) + return wrapper @@ -39,6 +40,7 @@ class AsyncFileSystemWrapper(AsyncFileSystem): sync_fs : AbstractFileSystem The synchronous filesystem instance to wrap. """ + def __init__(self, sync_fs, *args, **kwargs): super().__init__(*args, **kwargs) self.asynchronous = True @@ -82,10 +84,13 @@ def wrap_class(cls, sync_fs_class): type A new class that wraps the provided synchronous filesystem class. """ + class GeneratedAsyncFileSystemWrapper(cls): def __init__(self, *args, **kwargs): sync_fs = sync_fs_class(*args, **kwargs) super().__init__(sync_fs) - GeneratedAsyncFileSystemWrapper.__name__ = f"Async{sync_fs_class.__name__}Wrapper" - return GeneratedAsyncFileSystemWrapper \ No newline at end of file + GeneratedAsyncFileSystemWrapper.__name__ = ( + f"Async{sync_fs_class.__name__}Wrapper" + ) + return GeneratedAsyncFileSystemWrapper diff --git a/fsspec/implementations/tests/test_asyn_wrapper.py b/fsspec/implementations/tests/test_asyn_wrapper.py index 6305c574e..d29202003 100644 --- a/fsspec/implementations/tests/test_asyn_wrapper.py +++ b/fsspec/implementations/tests/test_asyn_wrapper.py @@ -11,7 +11,7 @@ def test_is_async(): fs = fsspec.filesystem("file") async_fs = AsyncFileSystemWrapper(fs) - assert async_fs.async_impl == True + assert async_fs.async_impl def test_class_wrapper(): @@ -19,7 +19,7 @@ def test_class_wrapper(): async_fs_cls = AsyncFileSystemWrapper.wrap_class(fs_cls) assert async_fs_cls.__name__ == "AsyncLocalFileSystemWrapper" async_fs = async_fs_cls() - assert async_fs.async_impl == True + assert async_fs.async_impl @pytest.mark.asyncio @@ -27,11 +27,15 @@ async def test_cats(): with filetexts(csv_files, mode="b"): fs = fsspec.filesystem("file") async_fs = AsyncFileSystemWrapper(fs) - + result = await async_fs._cat(".test.fakedata.1.csv") assert result == b"a,b\n1,2\n" - out = set((await async_fs._cat([".test.fakedata.1.csv", ".test.fakedata.2.csv"])).values()) + out = set( + ( + await async_fs._cat([".test.fakedata.1.csv", ".test.fakedata.2.csv"]) + ).values() + ) assert out == {b"a,b\n1,2\n", b"a,b\n3,4\n"} result = await async_fs._cat(".test.fakedata.1.csv", None, None) @@ -51,12 +55,15 @@ async def test_cats(): assert result == b"a,b\n1,2\n"[1:-2] out = set( - (await async_fs._cat( - [".test.fakedata.1.csv", ".test.fakedata.2.csv"], start=1, end=-1 - )).values() + ( + await async_fs._cat( + [".test.fakedata.1.csv", ".test.fakedata.2.csv"], start=1, end=-1 + ) + ).values() ) assert out == {b"a,b\n1,2\n"[1:-1], b"a,b\n3,4\n"[1:-1]} + @pytest.mark.asyncio async def test_basic_crud_operations(): with filetexts(csv_files, mode="b"): @@ -76,6 +83,7 @@ async def test_basic_crud_operations(): await async_fs._rm(".test.fakedata.1.csv") assert not await async_fs._exists(".test.fakedata.1.csv") + @pytest.mark.asyncio async def test_error_handling(): fs = fsspec.filesystem("file") @@ -87,6 +95,7 @@ async def test_error_handling(): with pytest.raises(FileNotFoundError): await async_fs._rm(".test.non_existent.csv") + @pytest.mark.asyncio async def test_concurrent_operations(): with filetexts(csv_files, mode="b"): @@ -99,11 +108,12 @@ async def read_file(file_path): results = await asyncio.gather( read_file(".test.fakedata.1.csv"), read_file(".test.fakedata.2.csv"), - read_file(".test.fakedata.1.csv") + read_file(".test.fakedata.1.csv"), ) assert results == [b"a,b\n1,2\n", b"a,b\n3,4\n", b"a,b\n1,2\n"] + @pytest.mark.asyncio async def test_directory_operations(): with filetexts(csv_files, mode="b"): @@ -120,6 +130,7 @@ async def test_directory_operations(): assert ".test.fakedata.2.csv" in filenames assert "new_directory" in filenames + @pytest.mark.asyncio async def test_batch_operations(): with filetexts(csv_files, mode="b"): @@ -128,4 +139,4 @@ async def test_batch_operations(): await async_fs._rm([".test.fakedata.1.csv", ".test.fakedata.2.csv"]) assert not await async_fs._exists(".test.fakedata.1.csv") - assert not await async_fs._exists(".test.fakedata.2.csv") \ No newline at end of file + assert not await async_fs._exists(".test.fakedata.2.csv")