Skip to content

Commit

Permalink
opaquely paged blob listing (#9)
Browse files Browse the repository at this point in the history
- Provide a blobstore iterator over blob keys that can be continued using an opaque page token and marker (blob key).
- Token handling is backgrounded unless explicit continuation is needed.
- Iterator will visit every key (as filtered by prefix/delimiter).
  • Loading branch information
xbrianh authored and ttung committed Dec 12, 2017
1 parent fafd4d7 commit aee364c
Show file tree
Hide file tree
Showing 4 changed files with 324 additions and 24 deletions.
84 changes: 84 additions & 0 deletions cloud_blobstore/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,70 @@
import typing
import types


class PagedIter(typing.Iterable[str]):
    """
    Iterator over every blob key in a listing, filtered by prefix and delimiter.  Iteration can
    alternately be resumed from a previous listing by supplying the saved opaque page token and
    the last key seen (start_after_key).
    """

    def get_api_response(self, next_token):
        """Issue the blobstore-specific list API request for a single page of results."""
        raise NotImplementedError()

    def get_listing_from_response(self, resp) -> typing.Iterable[str]:
        """Extract the blob key listing from one blobstore list response."""
        raise NotImplementedError()

    def get_next_token_from_response(self, resp) -> str:
        """Extract the opaque continuation token from one blobstore list response."""
        raise NotImplementedError()

    def __iter__(self):
        """
        Yield blob keys page by page, recording the current page token and the most recently
        yielded key (start_after_key) so that a caller can continue listing where it left off.

        If start_after_key is not None, iteration begins on the key after it; the marker must
        appear on the first page of results, otherwise BlobPagingError is raised.
        """
        page_token = self.token

        while True:
            # Record which page we are on so a caller can resume from this point.
            self.token = page_token

            response = self.get_api_response(page_token)
            keys = self.get_listing_from_response(response)

            if self.start_after_key:
                # Consume keys until the resume marker is found on this page.
                for candidate in keys:
                    if candidate == self.start_after_key:
                        break
                else:
                    raise BlobPagingError('Marker not found in this page')

            for candidate in keys:
                # Track the last key handed out so iteration can resume after it.
                self.start_after_key = candidate
                yield candidate

            self.start_after_key = None

            page_token = self.get_next_token_from_response(response)

            if not page_token:
                break


class BlobStore:
Expand All @@ -18,6 +84,21 @@ def list(
"""
raise NotImplementedError()

    def list_v2(
            self,
            bucket: str,
            prefix: typing.Optional[str]=None,
            delimiter: typing.Optional[str]=None,
            start_after_key: typing.Optional[str]=None,
            token: typing.Optional[str]=None,
            k_page_max: typing.Optional[int]=None
    ) -> typing.Iterable[str]:
        """
        Returns an iterator of all blob entries in a bucket that match a given prefix. Do not return any keys that
        contain the delimiter past the prefix.

        A listing may be continued where a previous one stopped by supplying the opaque ``token``
        together with the last key seen (``start_after_key``); see PagedIter for the exact
        continuation semantics.  ``k_page_max`` bounds how many keys are fetched per underlying
        API request.
        """
        raise NotImplementedError()

def generate_presigned_GET_url(
self,
bucket: str,
Expand Down Expand Up @@ -158,3 +239,6 @@ class BlobNotFoundError(BlobStoreError):

class BlobAlreadyExistsError(BlobStoreError):
    """Raised when an operation requires that a blob not exist, but it already does."""
    pass

class BlobPagingError(BlobStoreError):
    """Raised when a paged listing cannot be continued, e.g. the resume marker
    (start_after_key) is not found on the first page of results."""
    pass
63 changes: 62 additions & 1 deletion cloud_blobstore/gs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,50 @@
from google.cloud.storage import Client
from google.cloud.storage.bucket import Bucket

from . import BlobNotFoundError, BlobStore
from . import BlobNotFoundError, BlobStore, PagedIter


class GSPagedIter(PagedIter):
    """Google Storage implementation of PagedIter, backed by ``Bucket.list_blobs``."""

    def __init__(
            self,
            bucket_obj: Bucket,
            *,
            prefix: str=None,
            delimiter: str=None,
            start_after_key: str=None,
            token: str=None,
            k_page_max: int=None
    ) -> None:
        self.bucket_obj = bucket_obj
        self.start_after_key = start_after_key
        self.token = token

        # Only forward the listing filters the caller actually supplied.
        self.kwargs = {
            name: value
            for name, value in (
                ('prefix', prefix),
                ('delimiter', delimiter),
                ('max_results', k_page_max),
            )
            if value is not None
        }  # type: dict

    def get_api_response(self, next_token=None):
        """Fetch one page of blobs, continuing from ``next_token`` when provided."""
        params = dict(self.kwargs)

        if next_token is not None:
            params['page_token'] = next_token

        return self.bucket_obj.list_blobs(**params)

    def get_listing_from_response(self, resp):
        """Yield the name of each blob on this page."""
        return (blob.name for blob in resp)

    def get_next_token_from_response(self, resp):
        """Return the token for the next page, or None when the listing is exhausted."""
        return resp.next_page_token


class GSBlobStore(BlobStore):
Expand Down Expand Up @@ -44,6 +87,24 @@ def list(
for blob_obj in bucket_obj.list_blobs(**kwargs):
yield blob_obj.name

    def list_v2(
            self,
            bucket: str,
            prefix: typing.Optional[str]=None,
            delimiter: typing.Optional[str]=None,
            start_after_key: typing.Optional[str]=None,
            token: typing.Optional[str]=None,
            k_page_max: typing.Optional[int]=None
    ) -> typing.Iterable[str]:
        """
        Returns an iterator of all blob entries in a bucket that match a given prefix. Do not
        return any keys that contain the delimiter past the prefix.  Iteration may be continued
        with ``token`` and ``start_after_key`` saved from a previous listing (see PagedIter).
        """
        return GSPagedIter(
            self._ensure_bucket_loaded(bucket),
            prefix=prefix,
            delimiter=delimiter,
            start_after_key=start_after_key,
            token=token,
            k_page_max=k_page_max
        )

def generate_presigned_GET_url(
self,
bucket: str,
Expand Down
73 changes: 73 additions & 0 deletions cloud_blobstore/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,64 @@
BlobStore,
BlobStoreCredentialError,
BlobStoreUnknownError,
PagedIter,
)


class S3PagedIter(PagedIter):
    """S3 implementation of PagedIter, backed by the ``list_objects_v2`` API."""

    def __init__(
            self,
            bucket: str,
            *,
            prefix: str=None,
            delimiter: str=None,
            start_after_key: str=None,
            token: str=None,
            k_page_max: int=None
    ) -> None:
        self.start_after_key = start_after_key
        self.token = token

        # Create the client once here rather than once per page request in get_api_response --
        # boto3 client construction is comparatively expensive and the same client can serve
        # every page of the listing.
        self.s3_client = boto3.client('s3')

        self.kwargs = dict()  # type: dict

        self.kwargs['Bucket'] = bucket

        if prefix is not None:
            self.kwargs['Prefix'] = prefix

        if delimiter is not None:
            self.kwargs['Delimiter'] = delimiter

        if k_page_max is not None:
            self.kwargs['MaxKeys'] = k_page_max

    def get_api_response(self, next_token):
        """Fetch one page of the listing, continuing from ``next_token`` when provided."""
        kwargs = self.kwargs.copy()

        if next_token is not None:
            kwargs['ContinuationToken'] = next_token

        return self.s3_client.list_objects_v2(**kwargs)

    def get_listing_from_response(self, resp):
        """Yield the blob keys on this page; an absent or empty 'Contents' yields nothing."""
        return (b['Key'] for b in resp.get('Contents') or [])

    def get_next_token_from_response(self, resp):
        """Return the continuation token for the next page, or None on the final page."""
        # S3 only includes NextContinuationToken when the listing is truncated.
        return resp['NextContinuationToken'] if resp['IsTruncated'] else None


class S3BlobStore(BlobStore):
def __init__(self) -> None:
super(S3BlobStore, self).__init__()
Expand Down Expand Up @@ -47,6 +102,24 @@ def list(
filter(**kwargs)):
yield item.key

    def list_v2(
            self,
            bucket: str,
            prefix: typing.Optional[str]=None,
            delimiter: typing.Optional[str]=None,
            start_after_key: typing.Optional[str]=None,
            token: typing.Optional[str]=None,
            k_page_max: typing.Optional[int]=None
    ) -> typing.Iterable[str]:
        """
        Returns an iterator of all blob entries in a bucket that match a given prefix. Do not
        return any keys that contain the delimiter past the prefix.  Iteration may be continued
        with ``token`` and ``start_after_key`` saved from a previous listing (see PagedIter).
        """
        return S3PagedIter(
            bucket,
            prefix=prefix,
            delimiter=delimiter,
            start_after_key=start_after_key,
            token=token,
            k_page_max=k_page_max
        )

def generate_presigned_GET_url(
self,
bucket: str,
Expand Down
Loading

0 comments on commit aee364c

Please sign in to comment.