Skip to content

Commit

Permalink
feat: split bucket from path + construct store
Browse files Browse the repository at this point in the history
constructe store with from_url using protocol and bucket name
  • Loading branch information
machichima committed Feb 3, 2025
1 parent 1bf47c6 commit 909b5b0
Showing 1 changed file with 54 additions and 3 deletions.
57 changes: 54 additions & 3 deletions obstore/python/obstore/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

import asyncio
from collections import defaultdict
from functools import lru_cache
from typing import Any, Coroutine, Dict, List, Tuple

import fsspec.asyn
import fsspec.spec

import obstore as obs
from obstore.store import S3Store, GCSStore, AzureStore


class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
Expand All @@ -45,6 +47,9 @@ def __init__(
self,
store: obs.store.ObjectStore,
*args,
config: dict[str, Any] = {},
client_options: dict[str, Any] = {},
retry_config: dict[str, Any] = {},
asynchronous: bool = False,
loop=None,
batch_size: int | None = None,
Expand Down Expand Up @@ -76,10 +81,53 @@ def __init__(
"""

self.store = store

self.config = config
self.client_options = client_options
self.retry_config = retry_config

super().__init__(
*args, asynchronous=asynchronous, loop=loop, batch_size=batch_size
)

def _split_path(self, path: str) -> Tuple[str, str]:
"""
Split bucket and file path
Args:
path (str): Input path, like `s3://mybucket/path/to/file`
Examples:
>>> split_path("s3://mybucket/path/to/file")
['mybucket', 'path/to/file']
"""

store_with_bucket = (S3Store, GCSStore, AzureStore)

if (
not isinstance(self.store, store_with_bucket) # instance
and not self.store in store_with_bucket # not instantiation
):
# no bucket name in path
return "", path

if "/" not in path:
return path, ""
else:
path_li = path.split("/")
bucket = path_li[0]
file_path = "/".join(path_li[1:])
return (bucket, file_path)

@lru_cache(maxsize=10)
def _construct_store(self, bucket: str):
return self.store.from_url(
f"{self.protocol}://{bucket}",
**self.config,
client_options=self.client_options,
retry_config=self.retry_config if self.retry_config else None,
)

async def _rm_file(self, path, **kwargs):
return await obs.delete_async(self.store, path)

Expand All @@ -90,11 +138,14 @@ async def _pipe_file(self, path, value, **kwargs):
return await obs.put_async(self.store, path, value)

async def _cat_file(self, path, start=None, end=None, **kwargs):
bucket, path = self._split_path(path)
store = self._construct_store(bucket)

if start is None and end is None:
resp = await obs.get_async(self.store, path)
return await resp.bytes_async()
resp = await obs.get_async(store, path)
return (await resp.bytes_async()).to_bytes()

range_bytes = await obs.get_range_async(self.store, path, start=start, end=end)
range_bytes = await obs.get_range_async(store, path, start=start, end=end)
return range_bytes.to_bytes()

async def _cat_ranges(
Expand Down

0 comments on commit 909b5b0

Please sign in to comment.