-
Notifications
You must be signed in to change notification settings - Fork 362
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid serializing cache for file objects #1753
Conversation
Should we have some kind of test here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, definitely. I'll push something up
Are there any other concerns you have at the moment? Do you know of other AbstractBufferedFile
subclasses our there in the wild that we need to take into consideration?
I think what you have is very reasonable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll push something up
Famous last words...
Alright, so I think a test like this (this actually passes) makes sense:
def test_cache_not_pickled(tmp_path):
fs = fsspec.filesystem("s3")
data = b"abcdefghi"
filepath = "s3://oss-scratch-space/jrbourbeau/test.txt"
length = 3
# Write file and then read it back in
with fs.open(filepath, mode="wb") as f:
f.write(data)
f = fs.open(filepath, mode="rb")
assert not f.cache.cache # No cache initially
assert f.read(length=length) == data[:length]
assert f.cache.cache == data # Cache is populated
# Roundtrip through pickle
import pickle
f2 = pickle.loads(pickle.dumps(f))
assert not f2.cache.cache # No cache initially
assert (
f2.read(length=length) == data[length : 2 * length]
) # Read file from previous seek point
assert f2.cache.cache == data[length:] # Cache is populated
It's be nice to use AbstractFileSystem
more directly (^ that test is using s3fs.S3File
instead) but I'm having some trouble getting file writing and reading to work with AbstractFileSystem
directly. Another closer option would be to use the http filesystem (also inherits from AbstractFileSystem
) but the http tests aren't working for me locally at the moment. @martindurant do you have any suggestions on how to proceed? I'm still learning my way around the test suite here
None of them? I would normally use HTTP as the first proxy for an async (read-only) filesystem, but any FS that use a subclass of AbstractBufferedFile would do here, and indeed ideally one that lives in this repo. |
______________________________________________________________________________________________________________ test_list ______________________________________________________________________________________________________________
self = <aiohttp.connector.TCPConnector object at 0x105923b60>, req = <aiohttp.client_reqrep.ClientRequest object at 0x105b2c1a0>, traces = []
timeout = ClientTimeout(total=300, connect=None, sock_read=None, sock_connect=30, ceil_threshold=5)
async def _create_direct_connection(
self,
req: ClientRequest,
traces: List["Trace"],
timeout: "ClientTimeout",
*,
client_error: Type[Exception] = ClientConnectorError,
) -> Tuple[asyncio.Transport, ResponseHandler]:
sslcontext = self._get_ssl_context(req)
fingerprint = self._get_fingerprint(req)
host = req.url.raw_host
assert host is not None
# Replace multiple trailing dots with a single one.
# A trailing dot is only present for fully-qualified domain names.
# See https://github.com/aio-libs/aiohttp/pull/7364.
if host.endswith(".."):
host = host.rstrip(".") + "."
port = req.port
assert port is not None
try:
# Cancelling this lookup should not cancel the underlying lookup
# or else the cancel event will get broadcast to all the waiters
# across all connections.
> hosts = await self._resolve_host(host, port, traces=traces)
../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/connector.py:1345:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/connector.py:989: in _resolve_host
return await asyncio.shield(resolved_host_task)
../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/connector.py:1020: in _resolve_host_with_throttle
addrs = await self._resolver.resolve(host, port, family=self._family)
../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/resolver.py:36: in resolve
infos = await self._loop.getaddrinfo(
../../../mambaforge/envs/fsspec/lib/python3.13/asyncio/base_events.py:935: in getaddrinfo
return await self.run_in_executor(
../../../mambaforge/envs/fsspec/lib/python3.13/concurrent/futures/thread.py:58: in run
result = self.fn(*self.args, **self.kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
host = '173.1.168.192.in-addr.arpa', port = 50522, family = <AddressFamily.AF_UNSPEC: 0>, type = <SocketKind.SOCK_STREAM: 1>, proto = 0, flags = <AddressInfo.AI_ADDRCONFIG: 1024>
def getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
"""Resolve host and port into list of address info entries.
Translate the host/port argument into a sequence of 5-tuples that contain
all the necessary arguments for creating a socket connected to that service.
host is a domain name, a string representation of an IPv4/v6 address or
None. port is a string service name such as 'http', a numeric port number or
None. By passing None as the value of host and port, you can pass NULL to
the underlying C API.
The family, type and proto arguments can be optionally specified in order to
narrow the list of addresses returned. Passing zero as a value for each of
these arguments selects the full range of results.
"""
# We override this function since we want to translate the numeric family
# and socket type values to enum constants.
addrlist = []
> for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
E socket.gaierror: [Errno 8] nodename nor servname provided, or not known
../../../mambaforge/envs/fsspec/lib/python3.13/socket.py:975: gaierror
The above exception was the direct cause of the following exception:
server = namespace(address='http://173.1.168.192.in-addr.arpa:50522', realfile='http://173.1.168.192.in-addr.arpa:50522/index/realfile')
def test_list(server):
h = fsspec.filesystem("http")
> out = h.glob(server.address + "/index/*")
fsspec/implementations/tests/test_http.py:19:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fsspec/asyn.py:118: in wrapper
return sync(self.loop, func, *args, **kwargs)
fsspec/asyn.py:103: in sync
raise return_result
fsspec/asyn.py:56: in _runner
result[0] = await coro
fsspec/implementations/http.py:490: in _glob
allpaths = await self._find(
fsspec/asyn.py:850: in _find
if withdirs and path != "" and await self._isdir(path):
fsspec/implementations/http.py:517: in _isdir
return bool(await self._ls(path))
fsspec/implementations/http.py:207: in _ls
out = await self._ls_real(url, detail=detail, **kwargs)
fsspec/implementations/http.py:159: in _ls_real
async with session.get(self.encode_url(url), **self.kwargs) as r:
../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/client.py:1418: in __aenter__
self._resp: _RetType = await self._coro
../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/client.py:696: in _request
conn = await self._connector.connect(
../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/connector.py:544: in connect
proto = await self._create_connection(req, traces, timeout)
../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/connector.py:1050: in _create_connection
_, proto = await self._create_direct_connection(req, traces, timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <aiohttp.connector.TCPConnector object at 0x105923b60>, req = <aiohttp.client_reqrep.ClientRequest object at 0x105b2c1a0>, traces = []
timeout = ClientTimeout(total=300, connect=None, sock_read=None, sock_connect=30, ceil_threshold=5)
async def _create_direct_connection(
self,
req: ClientRequest,
traces: List["Trace"],
timeout: "ClientTimeout",
*,
client_error: Type[Exception] = ClientConnectorError,
) -> Tuple[asyncio.Transport, ResponseHandler]:
sslcontext = self._get_ssl_context(req)
fingerprint = self._get_fingerprint(req)
host = req.url.raw_host
assert host is not None
# Replace multiple trailing dots with a single one.
# A trailing dot is only present for fully-qualified domain names.
# See https://github.com/aio-libs/aiohttp/pull/7364.
if host.endswith(".."):
host = host.rstrip(".") + "."
port = req.port
assert port is not None
try:
# Cancelling this lookup should not cancel the underlying lookup
# or else the cancel event will get broadcast to all the waiters
# across all connections.
hosts = await self._resolve_host(host, port, traces=traces)
except OSError as exc:
if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
raise
# in case of proxy it is not ClientProxyConnectionError
# it is problem of resolving proxy ip itself
> raise ClientConnectorDNSError(req.connection_key, exc) from exc
E aiohttp.client_exceptions.ClientConnectorDNSError: Cannot connect to host 173.1.168.192.in-addr.arpa:50522 ssl:default [nodename nor servname provided, or not known]
../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/connector.py:1351: ClientConnectorDNSError Maybe I'm missing some sort of testing setup step? |
The server address passed to the server is |
perfect |
Thanks @martindurant! |
Includes and builds on #1751 from @hendrikmakait
Closes #1747