-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FEAT] Create obstore store in fsspec on demand #198
Open
machichima
wants to merge
20
commits into
developmentseed:main
Choose a base branch
from
machichima:obstore-instance-in-fsspec
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+367
−60
Open
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
909b5b0
feat: split bucket from path + construct store
machichima 29464a7
feat: remove store + add protocol + apply to all methods
machichima a0d9e1d
feat: inherit from AsyncFsspecStore to specify protocol
machichima 6614906
fix: correctly split protocol if exists in path
machichima 75c738e
feat: use urlparse to extract protocol
machichima 2209839
Merge branch 'main' into obstore-instance-in-fsspec
kylebarron 46c6b59
update typing
kylebarron 9ab35e1
fix: unbounded error
machichima cb80495
fix: remove redundant import
machichima b6a3d3a
feat: add register() to register AsyncFsspecStore for provided protocol
machichima 68cdff9
feat: add validation for protocol in register()
machichima fa5b539
test: for register()
machichima b704779
feat: add async parameter for register()
machichima 61deac4
test: test async store created by register()
machichima 4bc1599
feat: add http(s) into protocol_with_bucket list
machichima 4dc9143
feat: ls return path with bucket name
machichima fb607d0
feat: enable re-register same protocol
machichima b74948a
test: update pytest fixture to use register()
machichima f6ba27c
test: update test with new path format
machichima 30250cf
fix: mkdocs build error
machichima File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,12 +24,14 @@ | |
|
||
import asyncio | ||
from collections import defaultdict | ||
from functools import lru_cache | ||
from typing import Any, Coroutine, Dict, List, Tuple | ||
|
||
import fsspec.asyn | ||
import fsspec.spec | ||
|
||
import obstore as obs | ||
from obstore.store import S3Store, GCSStore, AzureStore | ||
|
||
|
||
class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): | ||
|
@@ -45,6 +47,9 @@ def __init__( | |
self, | ||
store: obs.store.ObjectStore, | ||
*args, | ||
config: dict[str, Any] = {}, | ||
client_options: dict[str, Any] = {}, | ||
retry_config: dict[str, Any] = {}, | ||
asynchronous: bool = False, | ||
loop=None, | ||
batch_size: int | None = None, | ||
|
@@ -76,10 +81,53 @@ def __init__( | |
""" | ||
|
||
self.store = store | ||
|
||
self.config = config | ||
self.client_options = client_options | ||
self.retry_config = retry_config | ||
|
||
super().__init__( | ||
*args, asynchronous=asynchronous, loop=loop, batch_size=batch_size | ||
) | ||
|
||
def _split_path(self, path: str) -> Tuple[str, str]: | ||
""" | ||
Split bucket and file path | ||
|
||
Args: | ||
path (str): Input path, like `s3://mybucket/path/to/file` | ||
|
||
Examples: | ||
>>> split_path("s3://mybucket/path/to/file") | ||
['mybucket', 'path/to/file'] | ||
""" | ||
|
||
store_with_bucket = (S3Store, GCSStore, AzureStore) | ||
|
||
if ( | ||
not isinstance(self.store, store_with_bucket) # instance | ||
and not self.store in store_with_bucket # not instantiation | ||
): | ||
# no bucket name in path | ||
return "", path | ||
|
||
if "/" not in path: | ||
return path, "" | ||
else: | ||
path_li = path.split("/") | ||
bucket = path_li[0] | ||
file_path = "/".join(path_li[1:]) | ||
return (bucket, file_path) | ||
|
||
@lru_cache(maxsize=10) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice if this cache size could be user specified but we can come back to it |
||
def _construct_store(self, bucket: str): | ||
return self.store.from_url( | ||
f"{self.protocol}://{bucket}", | ||
**self.config, | ||
client_options=self.client_options, | ||
retry_config=self.retry_config if self.retry_config else None, | ||
) | ||
|
||
async def _rm_file(self, path, **kwargs): | ||
return await obs.delete_async(self.store, path) | ||
|
||
|
@@ -90,11 +138,14 @@ async def _pipe_file(self, path, value, **kwargs): | |
return await obs.put_async(self.store, path, value) | ||
|
||
async def _cat_file(self, path, start=None, end=None, **kwargs): | ||
bucket, path = self._split_path(path) | ||
store = self._construct_store(bucket) | ||
|
||
if start is None and end is None: | ||
resp = await obs.get_async(self.store, path) | ||
return await resp.bytes_async() | ||
resp = await obs.get_async(store, path) | ||
return (await resp.bytes_async()).to_bytes() | ||
|
||
range_bytes = await obs.get_range_async(self.store, path, start=start, end=end) | ||
range_bytes = await obs.get_range_async(store, path, start=start, end=end) | ||
return range_bytes.to_bytes() | ||
|
||
async def _cat_ranges( | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we allow these, store should be optional?
And before merge we should enable typing overloads for better typing. You can see how from_url is implemented
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I use store here for deciding the store Interface (whether it is S3Store, GCSStore, ...), so that in AsyncFsspecStore we don't need to decide the interface based on the protocol.
Maybe there's a better way of deciding the store interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll have a look at the typing later on
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh that's confusing because store is the type of the class and not an instance.
We should be able to use the from_url top level function directly here?