diff --git a/pyproject.toml b/pyproject.toml index 14612f91..3dbfab9a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ dev = [ "pre-commit>=3.3.3", "pytest>=7.4.0", "pytest-cov>=4.1.0", + "pytest-asyncio", "pydoclint", # for integration tests. "pandas[parquet]", diff --git a/requirements-dev.txt b/requirements-dev.txt index b799e7fa..e3051f64 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -29,6 +29,7 @@ pydantic-core==2.18.4 pydoclint==0.5.3 pyproject-hooks==1.1.0 pytest==8.2.2 +pytest-asyncio==0.23.7 pytest-cov==5.0.0 python-dateutil==2.9.0.post0 pytz==2024.1 diff --git a/src/lakefs_spec/asyn/__init__.py b/src/lakefs_spec/asyn/__init__.py new file mode 100644 index 00000000..080de97d --- /dev/null +++ b/src/lakefs_spec/asyn/__init__.py @@ -0,0 +1,3 @@ +from .spec import AsyncLakeFSFileSystem + +__all__ = ["AsyncLakeFSFileSystem"] diff --git a/src/lakefs_spec/asyn/spec.py b/src/lakefs_spec/asyn/spec.py new file mode 100644 index 00000000..e1cac610 --- /dev/null +++ b/src/lakefs_spec/asyn/spec.py @@ -0,0 +1,69 @@ +from typing import Any + +from fsspec.asyn import AsyncFileSystem + +from lakefs_spec import LakeFSFileSystem, LakeFSTransaction +from lakefs_spec.util import async_wrapper + + +class AsyncLakeFSFileSystem(AsyncFileSystem): + """Asynchronous wrapper around a LakeFSFileSystem""" + + protocol = "lakefs" + transaction_type = LakeFSTransaction + + def __init__( + self, + host: str | None = None, + username: str | None = None, + password: str | None = None, + api_key: str | None = None, + api_key_prefix: str | None = None, + access_token: str | None = None, + verify_ssl: bool = True, + ssl_ca_cert: str | None = None, + proxy: str | None = None, + create_branch_ok: bool = True, + source_branch: str = "main", + **storage_options: Any, + ): + super().__init__(**storage_options) + + self._sync_fs = LakeFSFileSystem( + host, + username, + password, + api_key, + api_key_prefix, + access_token, + verify_ssl, + ssl_ca_cert, + proxy, + create_branch_ok, + source_branch, + **storage_options, + ) + + async def _rm_file(self, path, **kwargs): + return async_wrapper(self._sync_fs.rm_file)(path) + + async def _cp_file(self, path1, path2, **kwargs): + return async_wrapper(self._sync_fs.cp_file)(path1, path2, **kwargs) + + async def _pipe_file(self, path, value, **kwargs): + return async_wrapper(self._sync_fs.pipe_file)(path, value, **kwargs) + + async def _cat_file(self, path, start=None, end=None, **kwargs): + return async_wrapper(self._sync_fs.cat_file)(path, start, end, **kwargs) + + async def _put_file(self, lpath, rpath, **kwargs): + return async_wrapper(self._sync_fs.put_file)(lpath, rpath, **kwargs) + + async def _get_file(self, rpath, lpath, **kwargs): + return async_wrapper(self._sync_fs.get_file)(rpath, lpath, **kwargs) + + async def _info(self, path, **kwargs): + return async_wrapper(self._sync_fs.info)(path, **kwargs) + + async def _ls(self, path, detail=True, **kwargs): + return async_wrapper(self._sync_fs.ls)(path, detail, **kwargs) diff --git a/src/lakefs_spec/util.py b/src/lakefs_spec/util.py index 83422328..bd11628e 100644 --- a/src/lakefs_spec/util.py +++ b/src/lakefs_spec/util.py @@ -4,10 +4,12 @@ from __future__ import annotations +import asyncio +import functools import hashlib import os import re -from typing import Any, Callable, Generator, Protocol +from typing import Any, Callable, Coroutine, Generator, ParamSpec, Protocol, TypeVar from lakefs_sdk import Pagination from lakefs_sdk import __version__ as __lakefs_sdk_version__ @@ -108,3 +110,18 @@ def parse(path: str) -> tuple[str, str, str]: repo, ref, resource = results.groups() return repo, ref, resource + + +P = ParamSpec("P") +T = TypeVar("T") + + +def async_wrapper(fn: Callable[P, T]) -> Callable[P, Coroutine[None, None, T]]: + """Wrap a synchronous function in an asyncio coroutine.""" + + @functools.wraps(fn) + async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> T: + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, lambda: fn(*args, **kwargs)) + + return _wrapper diff --git a/tests/test_util.py b/tests/test_util.py new file mode 100644 index 00000000..623a13db --- /dev/null +++ b/tests/test_util.py @@ -0,0 +1,13 @@ +import pytest + +from lakefs_spec.util import async_wrapper + + +@pytest.mark.asyncio +async def test_async_wrapper(): + def sync_add(n: int) -> int: + return n + 42 + + async_add = async_wrapper(sync_add) + + assert await async_add(42) == sync_add(42)