From aee364c13a2e4718364e8e6763881292ab6fe7e1 Mon Sep 17 00:00:00 2001 From: Brian Hannafious <32105697+xbrianh@users.noreply.github.com> Date: Mon, 11 Dec 2017 16:42:40 -0800 Subject: [PATCH] opaquely paged blob listing (#9) - Provide a blobstore iterator over blob keys that can be continued using an opaque page token and marker (blob key). - Token handing is backgrounded unless explicit continuation is needed. - Iterator will visit every key (as filtered by prefix/delimiter). --- cloud_blobstore/__init__.py | 84 +++++++++++++++++++++ cloud_blobstore/gs.py | 63 +++++++++++++++- cloud_blobstore/s3.py | 73 ++++++++++++++++++ tests/blobstore_common_tests.py | 128 ++++++++++++++++++++++++++------ 4 files changed, 324 insertions(+), 24 deletions(-) diff --git a/cloud_blobstore/__init__.py b/cloud_blobstore/__init__.py index 5e65a64..9269076 100644 --- a/cloud_blobstore/__init__.py +++ b/cloud_blobstore/__init__.py @@ -1,4 +1,70 @@ import typing +import types + + +class PagedIter(typing.Iterable[str]): + """ + Provide an iterator that will iterate over every object, filtered by prefix and delimiter. Alternately continue + iteration with token and key (start_after_key). + """ + + def get_api_response(self, next_token): + """ + Make blobstore-specific list api request. + """ + raise NotImplementedError() + + def get_listing_from_response(self, resp) -> typing.Iterable[str]: + """ + Retrieve blob key listing from blobstore response. + """ + raise NotImplementedError() + + def get_next_token_from_response(self, resp) -> str: + """ + Retrieve opaque continuation token from blobstore response. + """ + raise NotImplementedError() + + def __iter__(self): + """ + Iterate over the blobs, saving page tokens and blob key start_after_keys as needed in order to continue + listing where one left off. + + If start_after_key is not None, iteration will begin on the next key if start_after_key is found on the + first page of results. If it is not found on the first page of results, BlobPagingError will be raised. + """ + next_token = self.token + + while True: + self.token = next_token + + resp = self.get_api_response(next_token) + listing = self.get_listing_from_response(resp) + + if self.start_after_key: + while True: + try: + key = next(listing) + except StopIteration: + raise BlobPagingError('Marker not found in this page') + + if key == self.start_after_key: + break + + while True: + try: + self.start_after_key = next(listing) + yield self.start_after_key + except StopIteration: + break + + self.start_after_key = None + + next_token = self.get_next_token_from_response(resp) + + if not next_token: + break class BlobStore: @@ -18,6 +84,21 @@ def list( """ raise NotImplementedError() + def list_v2( + self, + bucket: str, + prefix: str=None, + delimiter: str=None, + start_after_key: str=None, + token: str=None, + k_page_max: int=None + ) -> typing.Iterable[str]: + """ + Returns an iterator of all blob entries in a bucket that match a given prefix. Do not return any keys that + contain the delimiter past the prefix. + """ + raise NotImplementedError() + def generate_presigned_GET_url( self, bucket: str, @@ -158,3 +239,6 @@ class BlobNotFoundError(BlobStoreError): class BlobAlreadyExistsError(BlobStoreError): pass + +class BlobPagingError(BlobStoreError): + pass diff --git a/cloud_blobstore/gs.py b/cloud_blobstore/gs.py index e439473..bc1a326 100644 --- a/cloud_blobstore/gs.py +++ b/cloud_blobstore/gs.py @@ -6,7 +6,50 @@ from google.cloud.storage import Client from google.cloud.storage.bucket import Bucket -from . import BlobNotFoundError, BlobStore +from . import BlobNotFoundError, BlobStore, PagedIter + + +class GSPagedIter(PagedIter): + def __init__( + self, + bucket_obj: Bucket, + *, + prefix: str=None, + delimiter: str=None, + start_after_key: str=None, + token: str=None, + k_page_max: int=None + ) -> None: + self.bucket_obj = bucket_obj + self.start_after_key = start_after_key + self.token = token + + self.kwargs = dict() # type: dict + + if prefix is not None: + self.kwargs['prefix'] = prefix + + if delimiter is not None: + self.kwargs['delimiter'] = delimiter + + if k_page_max is not None: + self.kwargs['max_results'] = k_page_max + + def get_api_response(self, next_token=None): + kwargs = self.kwargs.copy() + + if next_token is not None: + kwargs['page_token'] = next_token + + resp = self.bucket_obj.list_blobs(**kwargs) + + return resp + + def get_listing_from_response(self, resp): + return (b.name for b in resp) + + def get_next_token_from_response(self, resp): + return resp.next_page_token class GSBlobStore(BlobStore): @@ -44,6 +87,24 @@ def list( for blob_obj in bucket_obj.list_blobs(**kwargs): yield blob_obj.name + def list_v2( + self, + bucket: str, + prefix: str=None, + delimiter: str=None, + start_after_key: str=None, + token: str=None, + k_page_max: int=None + ) -> typing.Iterable[str]: + return GSPagedIter( + self._ensure_bucket_loaded(bucket), + prefix=prefix, + delimiter=delimiter, + start_after_key=start_after_key, + token=token, + k_page_max=k_page_max + ) + def generate_presigned_GET_url( self, bucket: str, diff --git a/cloud_blobstore/s3.py b/cloud_blobstore/s3.py index 9274db6..d5ef37f 100644 --- a/cloud_blobstore/s3.py +++ b/cloud_blobstore/s3.py @@ -10,9 +10,64 @@ BlobStore, BlobStoreCredentialError, BlobStoreUnknownError, + PagedIter, ) +class S3PagedIter(PagedIter): + def __init__( + self, + bucket: str, + *, + prefix: str=None, + delimiter: str=None, + start_after_key: str=None, + token: str=None, + k_page_max: int=None + ) -> None: + self.start_after_key = start_after_key + self.token = token + + self.kwargs = dict() # type: dict + + self.kwargs['Bucket'] = bucket + + if prefix is not None: + self.kwargs['Prefix'] = prefix + + if delimiter is not None: + self.kwargs['Delimiter'] = delimiter + + if k_page_max is not None: + self.kwargs['MaxKeys'] = k_page_max + + def get_api_response(self, next_token): + kwargs = self.kwargs.copy() + + if next_token is not None: + kwargs['ContinuationToken'] = next_token + + resp = boto3.client('s3').list_objects_v2(**kwargs) + + return resp + + def get_listing_from_response(self, resp): + if resp.get('Contents', None): + contents = resp['Contents'] + else: + contents = list() + + return (b['Key'] for b in contents) + + def get_next_token_from_response(self, resp): + if resp['IsTruncated']: + token = resp['NextContinuationToken'] + else: + token = None + + return token + + class S3BlobStore(BlobStore): def __init__(self) -> None: super(S3BlobStore, self).__init__() @@ -47,6 +102,24 @@ def list( filter(**kwargs)): yield item.key + def list_v2( + self, + bucket: str, + prefix: str=None, + delimiter: str=None, + start_after_key: str=None, + token: str=None, + k_page_max: int=None + ) -> typing.Iterable[str]: + return S3PagedIter( + bucket, + prefix=prefix, + delimiter=delimiter, + start_after_key=start_after_key, + token=token, + k_page_max=k_page_max + ) + def generate_presigned_GET_url( self, bucket: str, diff --git a/tests/blobstore_common_tests.py b/tests/blobstore_common_tests.py index 2ea8048..86fd840 100644 --- a/tests/blobstore_common_tests.py +++ b/tests/blobstore_common_tests.py @@ -2,7 +2,7 @@ import requests -from cloud_blobstore import BlobNotFoundError, BlobStore +from cloud_blobstore import BlobNotFoundError, BlobStore, BlobPagingError from tests import infra @@ -52,12 +52,48 @@ def testList(self): """ Ensure that the ```list``` method returns sane data. """ - items = [item for item in - self.handle.list( - self.test_fixtures_bucket, - "test_good_source_data/0", - )] - self.assertTrue(len(items) > 0) + items = list(self.handle.list( + self.test_fixtures_bucket, + "test_good_source_data/0", + )) + self.assertIn("test_good_source_data/0", items) + for item in items: + if item == "test_good_source_data/0": + break + else: + self.fail("did not find the requisite key") + + # fetch a bunch of items all at once. + items = list(self.handle.list( + self.test_fixtures_bucket, + "testList/prefix", + )) + self.assertEqual(len(items), 10) + + # this should fetch both testList/delimiter and testList/delimiter/test + items = list(self.handle.list( + self.test_fixtures_bucket, + "testList/delimiter", + )) + self.assertEqual(len(items), 2) + + # this should fetch only testList/delimiter + items = list(self.handle.list( + self.test_fixtures_bucket, + "testList/delimiter", + delimiter="/" + )) + self.assertEqual(len(items), 1) + + def testListV2(self): + """ + Ensure that the ```list_v2``` method returns sane data. + """ + items = list(self.handle.list_v2( + self.test_fixtures_bucket, + "test_good_source_data/0", + )) + self.assertIn("test_good_source_data/0", items) for item in items: if item == "test_good_source_data/0": break @@ -65,30 +101,76 @@ def testList(self): self.fail("did not find the requisite key") # fetch a bunch of items all at once. - items = [item for item in - self.handle.list( - self.test_fixtures_bucket, - "testList/prefix", - )] + items = list(self.handle.list_v2( + self.test_fixtures_bucket, + "testList/prefix", + )) + self.assertEqual(len(items), 10) + + # fetch a bunch of items all at once with small page size + items = list(self.handle.list_v2( + self.test_fixtures_bucket, + "testList/prefix", + k_page_max=3 + )) self.assertEqual(len(items), 10) # this should fetch both testList/delimiter and testList/delimiter/test - items = [item for item in - self.handle.list( - self.test_fixtures_bucket, - "testList/delimiter", - )] + items = list(self.handle.list_v2( + self.test_fixtures_bucket, + "testList/delimiter", + )) self.assertEqual(len(items), 2) # this should fetch only testList/delimiter - items = [item for item in - self.handle.list( - self.test_fixtures_bucket, - "testList/delimiter", - delimiter="/" - )] + items = list(self.handle.list_v2( + self.test_fixtures_bucket, + "testList/delimiter", + delimiter="/" + )) self.assertEqual(len(items), 1) + def testListV2Continuation(self): + self._testListV2Continuation(2, 3) + self._testListV2Continuation(2, 4) + + def _testListV2Continuation(self, page_size, break_size): + blobiter = self.handle.list_v2( + self.test_fixtures_bucket, + "testList/prefix", + k_page_max=page_size, + ) + + items1 = list() + items2 = list() + + for i, item in enumerate(blobiter): + items1.append( + item + ) + if i >= break_size - 1: + break + + blobiter = self.handle.list_v2( + self.test_fixtures_bucket, + "testList/prefix", + token=blobiter.token, + start_after_key=items1[-1], + k_page_max=page_size, + ) + + items2 = [item for item in blobiter] + + self.assertEqual(len(items1) + len(items2), 10) + + # starting from an unfound start_after_key should raise an error + with self.assertRaises(BlobPagingError): + [item for item in + self.handle.list_v2( + self.test_fixtures_bucket, + start_after_key="nonsensicalnonsene" + )] + def testGetPresignedUrl(self): presigned_url = self.handle.generate_presigned_GET_url( self.test_fixtures_bucket,