[FEAT] Create obstore store in fsspec on demand #198

Open · wants to merge 20 commits into base: main
57 changes: 54 additions & 3 deletions obstore/python/obstore/fsspec.py
@@ -24,12 +24,14 @@

import asyncio
from collections import defaultdict
from functools import lru_cache
from typing import Any, Coroutine, Dict, List, Tuple

import fsspec.asyn
import fsspec.spec

import obstore as obs
from obstore.store import S3Store, GCSStore, AzureStore


class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
@@ -45,6 +47,9 @@ def __init__(
        self,
        store: obs.store.ObjectStore,
        *args,
        config: dict[str, Any] = {},
Member:

If we allow these, store should be optional?

And before merging we should enable typing overloads for better typing; you can see how from_url is implemented.

Author:

I use store here to decide the store interface (whether it is S3Store, GCSStore, ...), so that AsyncFsspecStore doesn't need to infer the interface from the protocol.

Maybe there's a better way of deciding the store interface?

# assumes AsyncFsspecStore has been registered with fsspec under the "s3" protocol
from datetime import timedelta

import fsspec

from obstore.fsspec import AsyncFsspecStore
from obstore.store import S3Store

obstore_fs: AsyncFsspecStore = fsspec.filesystem(
    "s3",
    store=S3Store,
    config={
        "endpoint": "http://localhost:30002",
        "access_key_id": "minio",
        "secret_access_key": "miniostorage",
        "virtual_hosted_style_request": True,  # path contains bucket name
    },
    client_options={"timeout": "99999s", "allow_http": "true"},
    retry_config={
        "max_retries": 2,
        "backoff": {
            "base": 2,
            "init_backoff": timedelta(seconds=2),
            "max_backoff": timedelta(seconds=16),
        },
        "retry_timeout": timedelta(minutes=3),
    },
)

Author:

I'll have a look at the typing later on.
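A rough sketch of what such overloads might look like, assuming store stays accepted either as a ready ObjectStore instance (no config needed) or as a store class plus a config; the parameter shapes here are illustrative, not the final API:

from typing import Any, overload

import fsspec.asyn

import obstore as obs


class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
    @overload
    def __init__(self, store: obs.store.ObjectStore, *args: Any) -> None: ...
    @overload
    def __init__(
        self, store: type[obs.store.ObjectStore], *args: Any, config: dict[str, Any]
    ) -> None: ...
    def __init__(self, store, *args, config=None, **kwargs):
        # an already-built store needs no config; a store class does
        self.store = store
        self.config = config or {}
        super().__init__(*args, **kwargs)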

Member:

Oh, that's confusing, because store is the class itself and not an instance.

We should be able to use the top-level from_url function directly here?
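A minimal sketch of that suggestion, assuming the top-level obstore.store.from_url infers the store class from the URL scheme and accepts the same config, client_options, and retry_config keywords as the per-class from_url constructors:

import obstore.store


def construct_store(protocol, bucket, config, client_options, retry_config):
    # e.g. protocol "s3" and bucket "mybucket" resolve to an S3Store
    # for "s3://mybucket", with no store class passed in by the user
    return obstore.store.from_url(
        f"{protocol}://{bucket}",
        **config,
        client_options=client_options or None,
        retry_config=retry_config or None,
    )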

        client_options: dict[str, Any] = {},
        retry_config: dict[str, Any] = {},
        asynchronous: bool = False,
        loop=None,
        batch_size: int | None = None,
@@ -76,10 +81,53 @@ def __init__(
"""

self.store = store

self.config = config
self.client_options = client_options
self.retry_config = retry_config

super().__init__(
*args, asynchronous=asynchronous, loop=loop, batch_size=batch_size
)

    def _split_path(self, path: str) -> Tuple[str, str]:
        """
        Split a path into bucket and file path.

        Args:
            path (str): Input path, like `mybucket/path/to/file`
                (fsspec strips the `s3://` protocol prefix before calling this).

        Examples:
            >>> fs._split_path("mybucket/path/to/file")
            ('mybucket', 'path/to/file')
        """

        store_with_bucket = (S3Store, GCSStore, AzureStore)

        if (
            not isinstance(self.store, store_with_bucket)  # not an instance
            and self.store not in store_with_bucket  # nor one of the classes
        ):
            # this store type has no bucket name in the path
            return "", path

        if "/" not in path:
            return path, ""
        bucket, _, file_path = path.partition("/")
        return bucket, file_path

    @lru_cache(maxsize=10)
Member:

It would be nice if this cache size could be user-specified, but we can come back to it.
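One way to make it user-specified, sketched here with a hypothetical cache_size parameter: apply lru_cache per instance in __init__ rather than at class definition time, which also gives each filesystem its own cache:

from functools import lru_cache


class Example:
    def __init__(self, cache_size: int = 10):
        # wrap per instance so each filesystem gets its own cache
        # of the requested size, instead of one class-wide cache
        self._construct_store = lru_cache(maxsize=cache_size)(
            self._construct_store_uncached
        )

    def _construct_store_uncached(self, bucket: str):
        print(f"constructing store for bucket {bucket!r}")  # stand-in for from_url(...)
        return object()


fs = Example(cache_size=2)
fs._construct_store("a")  # constructs
fs._construct_store("a")  # served from cache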

    def _construct_store(self, bucket: str):
        return self.store.from_url(
            f"{self.protocol}://{bucket}",
            **self.config,
            client_options=self.client_options,
            retry_config=self.retry_config if self.retry_config else None,
        )

    async def _rm_file(self, path, **kwargs):
        return await obs.delete_async(self.store, path)

@@ -90,11 +138,14 @@ async def _pipe_file(self, path, value, **kwargs):
         return await obs.put_async(self.store, path, value)

     async def _cat_file(self, path, start=None, end=None, **kwargs):
+        bucket, path = self._split_path(path)
+        store = self._construct_store(bucket)
+
         if start is None and end is None:
-            resp = await obs.get_async(self.store, path)
-            return await resp.bytes_async()
+            resp = await obs.get_async(store, path)
+            return (await resp.bytes_async()).to_bytes()

-        range_bytes = await obs.get_range_async(self.store, path, start=start, end=end)
+        range_bytes = await obs.get_range_async(store, path, start=start, end=end)
         return range_bytes.to_bytes()

    async def _cat_ranges(