[FEAT] Create obstore store in fsspec on demand #198

Open · wants to merge 20 commits into main

Conversation

machichima

Construct the obstore store instance on demand in fsspec when calling methods. This allows automatic store creation for reads/writes across different buckets, aligning usage with fsspec conventions.

construct store with from_url using protocol and bucket name
@@ -45,6 +47,9 @@ def __init__(
        self,
        store: obs.store.ObjectStore,
        *args,
        config: dict[str, Any] = {},

Member

If we allow these, store should be optional?

And before merge we should enable typing overloads for better typing. You can see how from_url is implemented.

Author

I use store here for deciding the store interface (whether it is S3Store, GCSStore, ...), so that in AsyncFsspecStore we don't need to decide the interface based on the protocol.

Maybe there's a better way of deciding the store interface?

obstore_fs: AsyncFsspecStore = fsspec.filesystem(
    "s3",
    store=S3Store,
    config={
        "endpoint": "http://localhost:30002",
        "access_key_id": "minio",
        "secret_access_key": "miniostorage",
        "virtual_hosted_style_request": True,  # path contain bucket name
    },
    client_options={"timeout": "99999s", "allow_http": "true"},
    retry_config={
        "max_retries": 2,
        "backoff": {
            "base": 2,
            "init_backoff": timedelta(seconds=2),
            "max_backoff": timedelta(seconds=16),
        },
        "retry_timeout": timedelta(minutes=3),
    },
)

Author

I'll have a look at the typing later on.

Member

Oh that's confusing because store is the type of the class and not an instance.

We should be able to use the from_url top-level function directly here?

file_path = "/".join(path_li[1:])
return (bucket, file_path)

@lru_cache(maxsize=10)

Member

It would be nice if this cache size could be user-specified, but we can come back to it.
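
A minimal sketch of one way to make this configurable, assuming a hypothetical cache_size constructor argument (a per-instance lru_cache instead of the class-level decorator):

from functools import lru_cache

from fsspec.asyn import AsyncFileSystem

class AsyncFsspecStore(AsyncFileSystem):
    def __init__(self, *args, cache_size: int = 10, **kwargs):
        super().__init__(*args, **kwargs)
        # Wrap the uncached builder per instance, so each filesystem
        # can carry its own cache size
        self._construct_store = lru_cache(maxsize=cache_size)(
            self._construct_store_uncached
        )

    def _construct_store_uncached(self, bucket: str):
        ...  # build the obstore store for this bucket, as in the PR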

@kylebarron
Member

Would there be one fsspec instance per cloud provider? So if you wanted to use s3 and gcs you'd make two separate instances?

@machichima
Author

machichima commented Feb 3, 2025

Would there be one fsspec instance per cloud provider? So if you wanted to use s3 and gcs you'd make two separate instances?

Based on what I know, to use fsspec, we will do:

fsspec.register_implementation("s3", AsyncFsspecStore)
fsspec.register_implementation("gs", AsyncFsspecStore)

Each will have its own AsyncFsspecStore instance already. To configure them, we can use (based on my current implementation):

s3_fs: AsyncFsspecStore = fsspec.filesystem(
    "s3",
    store=S3Store,
    config={...}
)

gcs_fs: AsyncFsspecStore = fsspec.filesystem(
    "gs",
    store=GCSStore,
    config={...}
)

@kylebarron
Member

It would be nice to take out the store arg and use from_url directly. from_url will automatically construct the correct store based on the URL protocol.
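
For reference, a minimal sketch of that pattern, assuming obstore's top-level from_url accepts the same config keywords as the store constructors:

from obstore.store import from_url

# from_url infers the store implementation (S3Store, GCSStore, ...) from
# the URL scheme, so no explicit store class is needed
store = from_url(
    url="s3://mybucket",
    config={"access_key_id": "minio", "secret_access_key": "miniostorage"},
)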

@machichima force-pushed the obstore-instance-in-fsspec branch from 34f79f0 to 29464a7 on February 4, 2025 14:06
@machichima
Author

I use from_url and removed store in the newest commit. However, by doing this, we need to specify the protocol by inheriting the AsyncFsspecStore class for each store instance. I added this here:

class S3FsspecStore(AsyncFsspecStore):
    protocol = "s3"

class GCSFsspecStore(AsyncFsspecStore):
    protocol = "gs"

class AzureFsspecStore(AsyncFsspecStore):
    protocol = "abfs"

@kylebarron
Member

Is it true that a single fsspec class can't be associated with more than one protocol? E.g. Azure has three different protocols abfs, adlfs and az, but it looks like adlfs exports three separate classes.

@kylebarron
Member

The latest PRs allow you to access the config back out of a store, which may be useful to you? You can validate that you already have a store in your cache for a specific bucket.

@machichima
Author

machichima commented Feb 6, 2025

Is it true that a single fsspec class can't be associated with more than one protocol? E.g. Azure has three different protocols abfs, adlfs and az, but it looks like adlfs exports three separate classes.

I think we can if those protocols refer to the same object instance. s3fs does have two protocols ("s3", "s3a"); see: https://github.com/fsspec/s3fs/blob/023aecf00b5c6243ff5f8a016dac8b6af3913c6b/s3fs/core.py#L277

I think abfs, adlfs, and az have different implementations, so they export different classes. If we use them in obstore, I think we can define one class with the protocols (abfs, adlfs, az), but we need to test if they all work.

@@ -104,6 +104,12 @@ def _split_path(self, path: str) -> Tuple[str, str]:
            # no bucket name in path
            return "", path

        if path.startswith(self.protocol + "://"):

Member

Assuming that this function will always receive a URL like s3://mybucket/path/to/file, I'm inclined for this function to use urlparse instead of manually handling the parts of the URL.

Author

It will not always be s3://mybucket/path/to/file; it may come without a protocol, like mybucket/path/to/file.

Author

machichima commented Feb 7, 2025

I use urlparse like this here, which works for both s3://mybucket/path/to/file and mybucket/path/to/file:

res = urlparse(path)
if res.scheme:
    if res.scheme != self.protocol:
        raise ValueError(f"Expect protocol to be {self.protocol}. Got {res.scheme}")
    path = res.netloc + res.path
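
For context, this relies on standard-library urlparse behavior: when there is no scheme, everything lands in path, so res.netloc + res.path yields the bucket-prefixed path in both cases:

>>> from urllib.parse import urlparse
>>> urlparse("s3://mybucket/path/to/file")
ParseResult(scheme='s3', netloc='mybucket', path='/path/to/file', params='', query='', fragment='')
>>> urlparse("mybucket/path/to/file")
ParseResult(scheme='', netloc='', path='mybucket/path/to/file', params='', query='', fragment='')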

@kylebarron
Member

I think we can if those protocols refer to the same object instance. s3fs does have two protocols ("s3", "s3a"); see: fsspec/s3fs@023aecf/s3fs/core.py#L277

Oh cool! That seems to indicate that we could have a single class that defines supported protocols as:

    protocol = ("s3", "s3a", "gs", "az", "abfs", "adlfs")

Because the fsspec class used for each is the same? It's just custom kwargs that would need to be passed down for each?

@machichima
Author

Oh cool! That seems to indicate that we could have a single class that defines supported protocols as:

    protocol = ("s3", "s3a", "gs", "az", "abfs", "adlfs")

Because the fsspec class used for each is the same? It's just custom kwargs that would need to be passed down for each?

I don't think we can put all the protocols together into one class: when using fsspec.register_implementation("s3", AsyncFsspecStore), fsspec doesn't tell AsyncFsspecStore which protocol it was registered under, so when constructing the store instance we cannot get the protocol:

def _construct_store(self, bucket: str):
    return from_url(
        url=f"{self.protocol}://{bucket}",

I think a better way is to create obstore.fsspec.register("protocol"), which wraps around fsspec's register_implementation and directly sets the protocol for AsyncFsspecStore (as mentioned in this comment); then we do not need more classes. Let me have a try.
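
A minimal sketch of such a helper (the register name and the dynamic-subclass trick are hypothetical):

import fsspec

def register(protocol: str) -> None:
    """Hypothetical obstore.fsspec.register: bind AsyncFsspecStore to one protocol."""
    fsspec.register_implementation(
        protocol,
        # Create a subclass on the fly whose class-level `protocol` is set,
        # so _construct_store can build the right URL
        type(f"AsyncFsspecStore_{protocol}", (AsyncFsspecStore,), {"protocol": protocol}),
        clobber=False,
    )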

@kylebarron
Member

I did a quick look through your PR; it's really good progress but a few thoughts:

  • There are a bunch of cases where bucket, path = self._split_path(path) doesn't work because path is not in scope, e.g. in _cp_file, where path1 and path2 are in scope
  • In _cp_file we need to validate that the buckets of the source and destination paths are the same (see the sketch after this list)
  • We need some tests for the edits that happen in this PR
  • It's not clear how BufferedFileSimple works, because that subclasses from an upstream fsspec.spec.AbstractBufferedFile but doesn't touch obstore APIs at all
  • If you don't already, I'd highly suggest using a linter like https://docs.astral.sh/ruff/ in your editor, so that you can catch some of these issues before hitting CI
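
On the second bullet, a hedged sketch of the cross-bucket check, assuming obstore's copy_async plus the PR's _split_path and _construct_store helpers:

import obstore as obs

async def _cp_file(self, path1, path2, **kwargs):
    bucket1, p1 = self._split_path(path1)
    bucket2, p2 = self._split_path(path2)
    if bucket1 != bucket2:
        # A store instance is scoped to a single bucket, so a cross-bucket
        # copy cannot go through a single obs.copy_async call
        raise ValueError(f"Bucket mismatch: {bucket1!r} != {bucket2!r}")
    store = self._construct_store(bucket1)
    await obs.copy_async(store, p1, p2)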

@machichima
Author

  • If you don't already, I'd highly suggest using a linter like https://docs.astral.sh/ruff/ in your editor, so that you can catch some of these issues before hitting CI

Thanks for the suggestion! I just added the ruff linter and removed the path scoping errors. I also added the check validating that the bucket names from the two paths are the same.

  • It's not clear how BufferedFileSimple works, because that subclasses from an upstream fsspec.spec.AbstractBufferedFile but doesn't touch obstore APIs at all

For BufferedFileSimple, when self.fs.cat_file() is called, it dispatches to the _cat_file() method of AsyncFsspecStore.

  • We need some tests for the edits that happen in this PR

Yes! I will update the tests in the next few days.

@machichima
Author

machichima commented Feb 8, 2025

Hi @kylebarron,

I would like to confirm: is it fine to change the output of _ls to paths that include the bucket name?

The problem I faced is that when I call find(), it goes into fsspec's _find(), which then calls _walk() (see here). In _walk(), _ls is called (see here). Because _walk() runs recursively when the path is a directory (see here), if _ls returns paths without the bucket name, the recursive call receives a path without the bucket name, which causes an error.

For example, if _walk("bucket/") is called first and its _ls returns "dir/", then walk is called again as _walk("dir/"), which raises an error.

UPDATE: this is how I do it:

async def _ls(self, path, detail=True, **kwargs):
    bucket, path = self._split_path(path)
    store = self._construct_store(bucket)

    result = await obs.list_with_delimiter_async(store, path)
    objects = result["objects"]
    prefs = result["common_prefixes"]
    if detail:
        return [
            {
                "name": self._fill_bucket_name(object["path"], bucket),
                "size": object["size"],
                "type": "file",
                "e_tag": object["e_tag"],
            }
            for object in objects
        ] + [
            {
                "name": self._fill_bucket_name(pref, bucket),
                "size": 0,
                "type": "directory",
            }
            for pref in prefs
        ]
    else:
        return sorted(
            [self._fill_bucket_name(object["path"], bucket) for object in objects]
            + [self._fill_bucket_name(pref, bucket) for pref in prefs]
        )

@machichima changed the title from "[WIP] [FEAT] Create obstore store in fsspec on demand" to "[FEAT] Create obstore store in fsspec on demand" on Feb 9, 2025
@kylebarron
Member

kylebarron commented Feb 11, 2025

I would like to confirm: is it fine to change the output of _ls to paths that include the bucket name?

I think that's a question for @martindurant. I don't know what's standard for fsspec.

It appears that based on s3fs behavior, the bucket name is always returned in list results:

import s3fs

fs = s3fs.S3FileSystem(anon=True)
fs.ls("sentinel-cogs")
# ['sentinel-cogs/sentinel-s2-l2a-cogs']

@kylebarron
Member

@machichima I merged a PR that updates our use of ruff, a fast Python linter. As part of this, there were some minor updates to the existing fsspec code. Would you be able to merge in main?

I also wanted to have ruff merged so that we could run the lints on this PR's changes.

@kylebarron
Member

It appears that, based on s3fs behavior, the bucket name is always returned in list results.

It would be great if we could have integration tests with s3fs. We could reuse the existing test setup we have in test_fsspec.py, but then validate that we get the same results for all methods across both the obstore-backed store and the s3fs implementation.

@martindurant
Contributor

Yes, fsspec expects that the paths returned by ls() are in the same form that you can then use in further open/get/put etc. operations. Since the input paths contain the bucket (and optional protocol), the output of ls() should too.

The alternative would be to have the filesystem instance not expect the bucket in the input. You could still get this to work by extracting the bucket once (as it is passed in fsspec.open/url_to_fs). This is essentially what the "prefix" filesystem does:

In [10]: fs, path = fsspec.url_to_fs("dir::file:///Users")

In [11]: path
Out[11]: ''

In [12]: fs.ls("", detail=False) # contents of /Users
Out[12]: ['moxie', '.localized', 'Shared', 'mdurant']
