Skip to content

Commit

Permalink
feat: remove store + add protocol + apply to all methods
Browse files Browse the repository at this point in the history
  • Loading branch information
machichima committed Feb 4, 2025
1 parent 909b5b0 commit 29464a7
Showing 1 changed file with 37 additions and 20 deletions.
57 changes: 37 additions & 20 deletions obstore/python/obstore/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import fsspec.spec

import obstore as obs
from obstore.store import S3Store, GCSStore, AzureStore
from obstore.store import S3Store, GCSStore, AzureStore, from_url


class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
Expand All @@ -45,7 +45,6 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):

def __init__(
self,
store: obs.store.ObjectStore,
*args,
config: dict[str, Any] = {},
client_options: dict[str, Any] = {},
Expand Down Expand Up @@ -80,8 +79,6 @@ def __init__(
```
"""

self.store = store

self.config = config
self.client_options = client_options
self.retry_config = retry_config
Expand All @@ -102,12 +99,9 @@ def _split_path(self, path: str) -> Tuple[str, str]:
['mybucket', 'path/to/file']
"""

store_with_bucket = (S3Store, GCSStore, AzureStore)
protocol_with_bucket = ["s3", "s3a", "gcs", "gs", "abfs"]

if (
not isinstance(self.store, store_with_bucket) # instance
and not self.store in store_with_bucket # not instantiation
):
if not self.protocol in protocol_with_bucket:
# no bucket name in path
return "", path

Expand All @@ -121,21 +115,28 @@ def _split_path(self, path: str) -> Tuple[str, str]:

@lru_cache(maxsize=10)
def _construct_store(self, bucket: str):
return self.store.from_url(
f"{self.protocol}://{bucket}",
**self.config,
# from obstore.store import from_url
return from_url(
url=f"{self.protocol}://{bucket}",
config=self.config,
client_options=self.client_options,
retry_config=self.retry_config if self.retry_config else None,
)

async def _rm_file(self, path, **kwargs):
return await obs.delete_async(self.store, path)
bucket, path = self._split_path(path)
store = self._construct_store(bucket)
return await obs.delete_async(store, path)

async def _cp_file(self, path1, path2, **kwargs):
return await obs.copy_async(self.store, path1, path2)
bucket, path = self._split_path(path)
store = self._construct_store(bucket)
return await obs.copy_async(store, path1, path2)

async def _pipe_file(self, path, value, **kwargs):
return await obs.put_async(self.store, path, value)
bucket, path = self._split_path(path)
store = self._construct_store(bucket)
return await obs.put_async(store, path, value)

async def _cat_file(self, path, start=None, end=None, **kwargs):
bucket, path = self._split_path(path)
Expand All @@ -158,6 +159,9 @@ async def _cat_ranges(
on_error="return",
**kwargs,
):
bucket, path = self._split_path(path)
store = self._construct_store(bucket)

if isinstance(starts, int):
starts = [starts] * len(paths)
if isinstance(ends, int):
Expand All @@ -173,7 +177,7 @@ async def _cat_ranges(
for path, ranges in per_file_requests.items():
offsets = [r[0] for r in ranges]
ends = [r[1] for r in ranges]
fut = obs.get_ranges_async(self.store, path, starts=offsets, ends=ends)
fut = obs.get_ranges_async(store, path, starts=offsets, ends=ends)
futs.append(fut)

result = await asyncio.gather(*futs)
Expand All @@ -188,17 +192,24 @@ async def _cat_ranges(
return output_buffers

async def _put_file(self, lpath, rpath, **kwargs):
bucket, path = self._split_path(path)
store = self._construct_store(bucket)
with open(lpath, "rb") as f:
await obs.put_async(self.store, rpath, f)
await obs.put_async(store, rpath, f)

async def _get_file(self, rpath, lpath, **kwargs):
bucket, path = self._split_path(path)
store = self._construct_store(bucket)
with open(lpath, "wb") as f:
resp = await obs.get_async(self.store, rpath)
resp = await obs.get_async(store, rpath)
async for buffer in resp.stream():
f.write(buffer)

async def _info(self, path, **kwargs):
head = await obs.head_async(self.store, path)
bucket, path = self._split_path(path)
store = self._construct_store(bucket)

head = await obs.head_async(store, path)
return {
# Required of `info`: (?)
"name": head["path"],
Expand All @@ -211,7 +222,10 @@ async def _info(self, path, **kwargs):
}

async def _ls(self, path, detail=True, **kwargs):
result = await obs.list_with_delimiter_async(self.store, path)
bucket, path = self._split_path(path)
store = self._construct_store(bucket)

result = await obs.list_with_delimiter_async(store, path)
objects = result["objects"]
prefs = result["common_prefixes"]
if detail:
Expand All @@ -229,6 +243,9 @@ async def _ls(self, path, detail=True, **kwargs):

def _open(self, path, mode="rb", **kwargs):
"""Return raw bytes-mode file-like from the file-system"""
bucket, path = self._split_path(path)
store = self._construct_store(bucket)

return BufferedFileSimple(self, path, mode, **kwargs)


Expand Down

0 comments on commit 29464a7

Please sign in to comment.