diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 8d3c198..ed0aca5 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -31,7 +31,7 @@ import fsspec.spec import obstore as obs -from obstore.store import S3Store, GCSStore, AzureStore +from obstore.store import S3Store, GCSStore, AzureStore, from_url class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): @@ -45,7 +45,6 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): def __init__( self, - store: obs.store.ObjectStore, *args, config: dict[str, Any] = {}, client_options: dict[str, Any] = {}, @@ -80,8 +79,6 @@ def __init__( ``` """ - self.store = store - self.config = config self.client_options = client_options self.retry_config = retry_config @@ -102,12 +99,9 @@ def _split_path(self, path: str) -> Tuple[str, str]: ['mybucket', 'path/to/file'] """ - store_with_bucket = (S3Store, GCSStore, AzureStore) + protocol_with_bucket = ["s3", "s3a", "gcs", "gs", "abfs"] - if ( - not isinstance(self.store, store_with_bucket) # instance - and not self.store in store_with_bucket # not instantiation - ): + if not self.protocol in protocol_with_bucket: # no bucket name in path return "", path @@ -121,21 +115,28 @@ def _split_path(self, path: str) -> Tuple[str, str]: @lru_cache(maxsize=10) def _construct_store(self, bucket: str): - return self.store.from_url( - f"{self.protocol}://{bucket}", - **self.config, + # from obstore.store import from_url + return from_url( + url=f"{self.protocol}://{bucket}", + config=self.config, client_options=self.client_options, retry_config=self.retry_config if self.retry_config else None, ) async def _rm_file(self, path, **kwargs): - return await obs.delete_async(self.store, path) + bucket, path = self._split_path(path) + store = self._construct_store(bucket) + return await obs.delete_async(store, path) async def _cp_file(self, path1, path2, **kwargs): - return await obs.copy_async(self.store, path1, path2) + bucket, path = self._split_path(path) + store = self._construct_store(bucket) + return await obs.copy_async(store, path1, path2) async def _pipe_file(self, path, value, **kwargs): - return await obs.put_async(self.store, path, value) + bucket, path = self._split_path(path) + store = self._construct_store(bucket) + return await obs.put_async(store, path, value) async def _cat_file(self, path, start=None, end=None, **kwargs): bucket, path = self._split_path(path) @@ -158,6 +159,9 @@ async def _cat_ranges( on_error="return", **kwargs, ): + bucket, path = self._split_path(path) + store = self._construct_store(bucket) + if isinstance(starts, int): starts = [starts] * len(paths) if isinstance(ends, int): @@ -173,7 +177,7 @@ async def _cat_ranges( for path, ranges in per_file_requests.items(): offsets = [r[0] for r in ranges] ends = [r[1] for r in ranges] - fut = obs.get_ranges_async(self.store, path, starts=offsets, ends=ends) + fut = obs.get_ranges_async(store, path, starts=offsets, ends=ends) futs.append(fut) result = await asyncio.gather(*futs) @@ -188,17 +192,24 @@ async def _cat_ranges( return output_buffers async def _put_file(self, lpath, rpath, **kwargs): + bucket, path = self._split_path(path) + store = self._construct_store(bucket) with open(lpath, "rb") as f: - await obs.put_async(self.store, rpath, f) + await obs.put_async(store, rpath, f) async def _get_file(self, rpath, lpath, **kwargs): + bucket, path = self._split_path(path) + store = self._construct_store(bucket) with open(lpath, "wb") as f: - resp = await obs.get_async(self.store, rpath) + resp = await obs.get_async(store, rpath) async for buffer in resp.stream(): f.write(buffer) async def _info(self, path, **kwargs): - head = await obs.head_async(self.store, path) + bucket, path = self._split_path(path) + store = self._construct_store(bucket) + + head = await obs.head_async(store, path) return { # Required of `info`: (?) "name": head["path"], @@ -211,7 +222,10 @@ async def _info(self, path, **kwargs): } async def _ls(self, path, detail=True, **kwargs): - result = await obs.list_with_delimiter_async(self.store, path) + bucket, path = self._split_path(path) + store = self._construct_store(bucket) + + result = await obs.list_with_delimiter_async(store, path) objects = result["objects"] prefs = result["common_prefixes"] if detail: @@ -229,6 +243,9 @@ async def _ls(self, path, detail=True, **kwargs): def _open(self, path, mode="rb", **kwargs): """Return raw bytes-mode file-like from the file-system""" + bucket, path = self._split_path(path) + store = self._construct_store(bucket) + return BufferedFileSimple(self, path, mode, **kwargs)