Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use CAR requests when requesting directory listings and file info #40

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions ipfsspec/async_ipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from multiformats import CID, multicodec
from . import unixfsv1
from .car import read_car

import logging

Expand Down Expand Up @@ -69,20 +70,30 @@ def __str__(self):
return f"GW({self.url})"

async def info(self, path, session):
res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"})
res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.car"}, params={"format": "car", "dag-scope": "block"})
self._raise_not_found_for_status(res, path)
cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1])

roots = res.headers["X-Ipfs-Roots"].split(",")
if len(roots) != len(path.split("/")):
raise FileNotFoundError(path)

cid = CID.decode(roots[-1])
resdata = await res.read()

_, blocks = read_car(resdata) # roots should be ignored by https://specs.ipfs.tech/http-gateways/trustless-gateway/
blocks = {cid: data for cid, data, _ in blocks}
block = blocks[cid]

if cid.codec == RawCodec:
return {
"name": path,
"CID": str(cid),
"type": "file",
"size": len(resdata),
"size": len(block),
}
elif cid.codec == DagPbCodec:
node = unixfsv1.PBNode.loads(resdata)

node = unixfsv1.PBNode.loads(block)
data = unixfsv1.Data.loads(node.Data)
if data.Type == unixfsv1.DataType.Raw:
raise FileNotFoundError(path) # this is not a file, it's only a part of it
Expand Down Expand Up @@ -133,12 +144,20 @@ async def iter_chunked(self, path, session, chunk_size):
yield size, res.content.iter_chunked(chunk_size)

async def ls(self, path, session, detail=False):
res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"})
res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.car"}, params={"format": "car", "dag-scope": "block"})
self._raise_not_found_for_status(res, path)
resdata = await res.read()
cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1])
roots = res.headers["X-Ipfs-Roots"].split(",")
if len(roots) != len(path.split("/")):
raise FileNotFoundError(path)

cid = CID.decode(roots[-1])
assert cid.codec == DagPbCodec, "this is not a directory"
node = unixfsv1.PBNode.loads(resdata)

resdata = await res.read()

_, blocks = read_car(resdata) # roots should be ignored by https://specs.ipfs.tech/http-gateways/trustless-gateway/
blocks = {cid: data for cid, data, _ in blocks}
node = unixfsv1.PBNode.loads(blocks[cid])
data = unixfsv1.Data.loads(node.Data)
if data.Type != unixfsv1.DataType.Directory:
# TODO: we might need support for HAMTShard here (for large directories)
Expand Down Expand Up @@ -180,13 +199,17 @@ def gateway_from_file(gateway_path, protocol="ipfs"):


@lru_cache
def get_gateway(protocol="ipfs"):
def get_gateway(protocol="ipfs", gateway_addr=None):
"""
Get IPFS gateway according to IPIP-280

see: https://github.com/ipfs/specs/pull/280
"""

if gateway_addr:
logger.debug("using IPFS gateway as specified via function argument: %s", gateway_addr)
return AsyncIPFSGateway(gateway_addr, protocol)

# IPFS_GATEWAY environment variable should override everything
ipfs_gateway = os.environ.get("IPFS_GATEWAY", "")
if ipfs_gateway:
Expand Down Expand Up @@ -263,19 +286,20 @@ class AsyncIPFSFileSystem(AsyncFileSystem):
sep = "/"
protocol = "ipfs"

def __init__(self, asynchronous=False, loop=None, client_kwargs=None, **storage_options):
def __init__(self, asynchronous=False, loop=None, client_kwargs=None, gateway_addr=None, **storage_options):
super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
self._session = None

self.client_kwargs = client_kwargs or {}
self.get_client = get_client
self.gateway_addr = gateway_addr

if not asynchronous:
sync(self.loop, self.set_session)

@property
def gateway(self):
return get_gateway(self.protocol)
return get_gateway(self.protocol, gateway_addr=self.gateway_addr)

@staticmethod
def close_session(loop, session):
Expand Down
116 changes: 116 additions & 0 deletions ipfsspec/car.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""
CAR handling functions.
"""

from typing import List, Optional, Tuple, Union, Iterator, BinaryIO
import dataclasses

import dag_cbor
from multiformats import CID, varint, multicodec, multihash

from .utils import is_cid_list, StreamLike, ensure_stream

# CIDv0 does not encode its codec or hash function; these are the values
# decode_raw_car_block assigns when it detects a CIDv0 block.
DagPbCodec = multicodec.get("dag-pb")
Sha256Hash = multihash.get("sha2-256")

@dataclasses.dataclass
class CARBlockLocation:
    """
    Byte layout of a single block inside a CAR.

    A block is stored as a length varint, followed by the CID, followed by
    the payload. All sizes are in bytes; ``offset`` is the position of the
    length varint and defaults to 0.
    """

    # size of the length varint that precedes the block
    varint_size: int
    # size of the encoded CID
    cid_size: int
    # size of the block payload following the CID
    payload_size: int
    # position of the first byte of the length varint
    offset: int = 0

    @property
    def cid_offset(self) -> int:
        """Position of the first byte of the CID."""
        start = self.offset
        return start + self.varint_size

    @property
    def payload_offset(self) -> int:
        """Position of the first byte of the payload."""
        return self.cid_offset + self.cid_size

    @property
    def size(self) -> int:
        """Total number of bytes occupied by varint, CID and payload."""
        prefix_size = self.varint_size + self.cid_size
        return prefix_size + self.payload_size


def decode_car_header(stream: BinaryIO) -> Tuple[List[CID], int]:
    """
    Decodes a CAR header and returns the list of contained roots.

    Parameters
    ----------
    stream: BinaryIO
        Stream positioned at the first byte of the CAR. The length varint
        and the header are consumed from it.

    Returns
    -------
    roots : List[CID]
        Roots as given by the CAR header
    header_size : int
        Number of bytes consumed: the length varint plus the header itself

    Raises
    ------
    ValueError
        If the header is not a mapping, is not version 1, or its roots are
        missing or contain anything but CIDs.
    """
    header_size, visize, _ = varint.decode_raw(stream)  # type: ignore [call-overload] # varint uses BufferedIOBase
    header = dag_cbor.decode(stream.read(header_size))
    if not isinstance(header, dict):
        raise ValueError("no valid CAR header found")
    # Use .get() so that a header lacking a key is reported as ValueError,
    # like every other malformed header, instead of leaking a KeyError.
    if header.get("version") != 1:
        raise ValueError("CAR is not version 1")
    roots = header.get("roots")
    if not isinstance(roots, list):
        raise ValueError("CAR header doesn't contain roots")
    if not is_cid_list(roots):
        raise ValueError("CAR roots do not only contain CIDs")
    return roots, visize + header_size


def decode_raw_car_block(stream: BinaryIO) -> Optional[Tuple[CID, bytes, CARBlockLocation]]:
    """
    Decodes the next block of a CAR and verifies its payload against its CID.

    Parameters
    ----------
    stream: BinaryIO
        Stream positioned at the length varint of a block

    Returns
    -------
    block : Optional[Tuple[CID, bytes, CARBlockLocation]]
        CID, payload and location of the block (with the location's offset
        left at its default), or None if no further length varint could be
        read from the stream.

    Raises
    ------
    ValueError
        If the block is too short to contain a CID, uses an unsupported CID
        version, or its payload does not match the digest in its CID.
    """
    try:
        block_size, visize, _ = varint.decode_raw(stream)  # type: ignore [call-overload] # varint uses BufferedIOBase
    except ValueError:
        # stream has likely been consumed entirely
        return None

    data = stream.read(block_size)
    if len(data) < 2:
        # an empty or truncated section would otherwise fail with an
        # IndexError on the CID version probe below
        raise ValueError("CAR is corrupted. Block is too short to contain a CID")

    # as the size of the CID is variable but not explicitly given in
    # the CAR format, we need to partially decode each CID to determine
    # its size and the location of the payload data
    if data[0] == 0x12 and data[1] == 0x20:
        # this is CIDv0
        cid_version = 0
        default_base = "base58btc"
        cid_codec: Union[int, multicodec.Multicodec] = DagPbCodec
        hash_codec: Union[int, multihash.Multihash] = Sha256Hash
        cid_digest = data[2:34]
        data = data[34:]
    else:
        # this is CIDv1(+)
        cid_version, _, data = varint.decode_raw(data)
        if cid_version != 1:
            raise ValueError(f"CIDv{cid_version} is currently not supported")
        default_base = "base32"
        cid_codec, _, data = multicodec.unwrap_raw(data)
        hash_codec, _, data = varint.decode_raw(data)
        digest_size, _, data = varint.decode_raw(data)
        cid_digest = data[:digest_size]
        data = data[digest_size:]
    cid = CID(default_base, cid_version, cid_codec, (hash_codec, cid_digest))

    # `data` now holds only the payload; it must hash to the CID's digest
    if cid.hashfun.digest(data) != cid.digest:
        raise ValueError(f"CAR is corrupted. Entry '{cid}' could not be verified")

    # whatever was stripped from the section before the payload is the CID
    return cid, bytes(data), CARBlockLocation(visize, block_size - len(data), len(data))


def read_car(stream_or_bytes: StreamLike) -> Tuple[List[CID], Iterator[Tuple[CID, bytes, CARBlockLocation]]]:
    """
    Reads a CAR.

    The header is decoded immediately; blocks are decoded lazily from the
    same stream as the returned iterator is advanced.

    Parameters
    ----------
    stream_or_bytes: StreamLike
        Stream to read CAR from

    Returns
    -------
    roots : List[CID]
        Roots as given by the CAR header
    blocks : Iterator[Tuple[cid, BytesLike, CARBlockLocation]]
        Iterator over all blocks contained in the CAR
    """
    stream = ensure_stream(stream_or_bytes)
    roots, header_size = decode_car_header(stream)

    def blocks() -> Iterator[Tuple[CID, bytes, CARBlockLocation]]:
        # the first block starts right behind the header
        position = header_size
        while True:
            entry = decode_raw_car_block(stream)
            if entry is None:
                return
            cid, payload, location = entry
            yield cid, payload, dataclasses.replace(location, offset=position)
            position += location.size

    return roots, blocks()
21 changes: 21 additions & 0 deletions ipfsspec/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""
Some utilities.
"""

from io import BytesIO
from typing import List, Union, BinaryIO

from multiformats import CID
from typing_extensions import TypeGuard

# Anything a CAR can be read from: an open binary stream or an in-memory
# bytes-like buffer.
StreamLike = Union[BinaryIO, bytes, bytearray, memoryview]


def ensure_stream(stream_or_bytes: StreamLike) -> BinaryIO:
    """
    Return a readable binary stream for the given input.

    In-memory buffers (``bytes``, ``bytearray``, ``memoryview``) are wrapped
    in a ``BytesIO``; anything else is assumed to already be a stream and is
    returned unchanged.
    """
    if isinstance(stream_or_bytes, (bytes, bytearray, memoryview)):
        return BytesIO(stream_or_bytes)
    return stream_or_bytes


def is_cid_list(os: List[object]) -> TypeGuard[List[CID]]:
    """Tell whether every element of ``os`` is a CID (true for an empty list)."""
    for candidate in os:
        if not isinstance(candidate, CID):
            return False
    return True
22 changes: 20 additions & 2 deletions test/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ async def get_client(**kwargs):


@pytest_asyncio.fixture
async def fs(get_client):
async def fs(request, get_client):
AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop
return AsyncIPFSFileSystem(asynchronous=True, loop=asyncio.get_running_loop(), get_client=get_client)
gateway_addr = getattr(request, "param", None)
return AsyncIPFSFileSystem(asynchronous=True, loop=asyncio.get_running_loop(), get_client=get_client, gateway_addr=gateway_addr)


@pytest.mark.parametrize("gw_host", ["http://127.0.0.1:8080"])
Expand Down Expand Up @@ -100,3 +101,20 @@ async def test_isfile(fs):
assert res is True
res = await fs._isfile(TEST_ROOT)
assert res is False

@pytest.mark.parametrize("detail", [False, True])
@pytest.mark.parametrize("fs", ["http://127.0.0.1:8080", "https://ipfs.io"], indirect=True)
@pytest.mark.asyncio
async def test_ls_multi_gw(fs, detail):
    """
    Check that listing a directory works against each parametrized gateway.

    See also: https://github.com/fsspec/ipfsspec/issues/39
    """
    expected = "bafybeicn7i3soqdgr7dwnrwytgq4zxy7a5jpkizrvhm5mv6bgjd32wm3q4/welcome-to-IPFS.jpg"
    listing = await fs._ls("bafybeicn7i3soqdgr7dwnrwytgq4zxy7a5jpkizrvhm5mv6bgjd32wm3q4", detail=detail)
    if not detail:
        # without detail, ls yields plain path names
        assert listing == [expected]
        return
    # with detail, each entry is a mapping carrying the path under "name"
    assert len(listing) == 1
    assert listing[0]["name"] == expected
Loading