Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async filesystem implementation #283

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dev = [
"pre-commit>=3.3.3",
"pytest>=7.4.0",
"pytest-cov>=4.1.0",
"pytest-asyncio",
"pydoclint",
# for integration tests.
"pandas[parquet]",
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pydantic-core==2.18.4
pydoclint==0.5.3
pyproject-hooks==1.1.0
pytest==8.2.2
pytest-asyncio==0.23.7
pytest-cov==5.0.0
python-dateutil==2.9.0.post0
pytz==2024.1
Expand Down
3 changes: 3 additions & 0 deletions src/lakefs_spec/asyn/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .spec import AsyncLakeFSFileSystem

Check warning on line 1 in src/lakefs_spec/asyn/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/__init__.py#L1

Added line #L1 was not covered by tests

__all__ = ["AsyncLakeFSFileSystem"]

Check warning on line 3 in src/lakefs_spec/asyn/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/__init__.py#L3

Added line #L3 was not covered by tests
69 changes: 69 additions & 0 deletions src/lakefs_spec/asyn/spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from typing import Any

Check warning on line 1 in src/lakefs_spec/asyn/spec.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/spec.py#L1

Added line #L1 was not covered by tests

from fsspec.asyn import AsyncFileSystem

Check warning on line 3 in src/lakefs_spec/asyn/spec.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/spec.py#L3

Added line #L3 was not covered by tests

from lakefs_spec import LakeFSFileSystem, LakeFSTransaction
from lakefs_spec.util import async_wrapper

Check warning on line 6 in src/lakefs_spec/asyn/spec.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/spec.py#L5-L6

Added lines #L5 - L6 were not covered by tests


class AsyncLakeFSFileSystem(AsyncFileSystem):

Check warning on line 9 in src/lakefs_spec/asyn/spec.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/spec.py#L9

Added line #L9 was not covered by tests
"""Asynchronous wrapper around a LakeFSFileSystem"""

protocol = "lakefs"
transaction_type = LakeFSTransaction

Check warning on line 13 in src/lakefs_spec/asyn/spec.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/spec.py#L12-L13

Added lines #L12 - L13 were not covered by tests

def __init__(

Check warning on line 15 in src/lakefs_spec/asyn/spec.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/spec.py#L15

Added line #L15 was not covered by tests
self,
host: str | None = None,
username: str | None = None,
password: str | None = None,
api_key: str | None = None,
api_key_prefix: str | None = None,
access_token: str | None = None,
verify_ssl: bool = True,
ssl_ca_cert: str | None = None,
proxy: str | None = None,
create_branch_ok: bool = True,
source_branch: str = "main",
**storage_options: Any,
):
super().__init__(**storage_options)

Check warning on line 30 in src/lakefs_spec/asyn/spec.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/spec.py#L30

Added line #L30 was not covered by tests

self._sync_fs = LakeFSFileSystem(

Check warning on line 32 in src/lakefs_spec/asyn/spec.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/spec.py#L32

Added line #L32 was not covered by tests
host,
username,
password,
api_key,
api_key_prefix,
access_token,
verify_ssl,
ssl_ca_cert,
proxy,
create_branch_ok,
source_branch,
**storage_options,
)

async def _rm_file(self, path, **kwargs):
return async_wrapper(self._sync_fs.rm_file)(path)

Check warning on line 48 in src/lakefs_spec/asyn/spec.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/spec.py#L47-L48

Added lines #L47 - L48 were not covered by tests

async def _cp_file(self, path1, path2, **kwargs):
return async_wrapper(self._sync_fs.cp_file)(path1, path2, **kwargs)

Check warning on line 51 in src/lakefs_spec/asyn/spec.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/spec.py#L50-L51

Added lines #L50 - L51 were not covered by tests

async def _pipe_file(self, path, value, **kwargs):
return async_wrapper(self._sync_fs.pipe_file)(path, value, **kwargs)

Check warning on line 54 in src/lakefs_spec/asyn/spec.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/spec.py#L53-L54

Added lines #L53 - L54 were not covered by tests

async def _cat_file(self, path, start=None, end=None, **kwargs):
return async_wrapper(self._sync_fs.cat_file)(path, start, end, **kwargs)

Check warning on line 57 in src/lakefs_spec/asyn/spec.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/spec.py#L56-L57

Added lines #L56 - L57 were not covered by tests

async def _put_file(self, lpath, rpath, **kwargs):
return async_wrapper(self._sync_fs.put_file)(lpath, rpath, **kwargs)

Check warning on line 60 in src/lakefs_spec/asyn/spec.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/spec.py#L59-L60

Added lines #L59 - L60 were not covered by tests

async def _get_file(self, rpath, lpath, **kwargs):
return async_wrapper(self._sync_fs.get_file)(rpath, lpath, **kwargs)

Check warning on line 63 in src/lakefs_spec/asyn/spec.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/spec.py#L62-L63

Added lines #L62 - L63 were not covered by tests

async def _info(self, path, **kwargs):
return async_wrapper(self._sync_fs.info)(path, **kwargs)

Check warning on line 66 in src/lakefs_spec/asyn/spec.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/spec.py#L65-L66

Added lines #L65 - L66 were not covered by tests

async def _ls(self, path, detail=True, **kwargs):
return async_wrapper(self._sync_fs.ls)(path, detail, **kwargs)

Check warning on line 69 in src/lakefs_spec/asyn/spec.py

View check run for this annotation

Codecov / codecov/patch

src/lakefs_spec/asyn/spec.py#L68-L69

Added lines #L68 - L69 were not covered by tests
20 changes: 19 additions & 1 deletion src/lakefs_spec/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

from __future__ import annotations

import asyncio
import functools
import hashlib
import os
import re
from typing import Any, Callable, Generator, Protocol
from typing import Any, Callable, Coroutine, Generator, Protocol, TypeVar

from lakefs_sdk import Pagination
from lakefs_sdk import __version__ as __lakefs_sdk_version__
from typing_extensions import ParamSpec
nicholasjng marked this conversation as resolved.
Show resolved Hide resolved

lakefs_sdk_version = tuple(int(v) for v in __lakefs_sdk_version__.split("."))
del __lakefs_sdk_version__
Expand Down Expand Up @@ -108,3 +111,18 @@ def parse(path: str) -> tuple[str, str, str]:

repo, ref, resource = results.groups()
return repo, ref, resource


P = ParamSpec("P")
T = TypeVar("T")


def async_wrapper(fn: Callable[P, T]) -> Callable[P, Coroutine[None, None, T]]:
"""Wrap a synchronous function in an asyncio coroutine."""

@functools.wraps(fn)
async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, lambda: fn(*args, **kwargs))

return _wrapper
13 changes: 13 additions & 0 deletions tests/test_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import pytest

from lakefs_spec.util import async_wrapper


@pytest.mark.asyncio
async def test_async_wrapper():
def sync_add(n: int) -> int:
return n + 42

async_add = async_wrapper(sync_add)

assert await async_add(42) == sync_add(42)