diff --git a/tests/unit/test_application.py b/tests/unit/test_application.py
index 4528808a..0c8e7dd3 100644
--- a/tests/unit/test_application.py
+++ b/tests/unit/test_application.py
@@ -166,6 +166,56 @@ def test_visit(self):
                 "&timeout=200s"
             ) in urls
 
+    def test_visit_slice_id(self):
+        app = Vespa(url="http://localhost", port=8080)
+        with requests_mock.Mocker() as m:
+            m.get(
+                "http://localhost:8080/ApplicationStatus",
+                status_code=200,
+            )
+            m.get(
+                "http://localhost:8080/document/v1/foo/foo/docid/",
+                [
+                    {"json": {"continuation": "AAA"}, "status_code": 200},
+                    {"json": {}, "status_code": 200},
+                ],
+            )
+
+            results = []
+            for slice in app.visit(
+                schema="foo",
+                namespace="foo",
+                content_cluster_name="content",
+                timeout="200s",
+                slices=10,
+                slice_id=2,
+            ):
+                for response in slice:
+                    results.append(response)
+            assert len(results) == 2
+
+            urls = [response.url for response in results]
+            assert (
+                "http://localhost:8080/document/v1/foo/foo/docid/"
+                "?cluster=content"
+                "&selection=true"
+                "&wantedDocumentCount=500"
+                "&slices=10"
+                "&sliceId=2"
+                "&timeout=200s"
+                "&continuation=AAA"
+            ) in urls
+
+            assert (
+                "http://localhost:8080/document/v1/foo/foo/docid/"
+                "?cluster=content"
+                "&selection=true"
+                "&wantedDocumentCount=500"
+                "&slices=10"
+                "&sliceId=2"
+                "&timeout=200s"
+            ) in urls
+
 
 class TestVespa(unittest.TestCase):
     def test_end_point(self):
diff --git a/vespa/application.py b/vespa/application.py
index 89881f1c..80bc1b28 100644
--- a/vespa/application.py
+++ b/vespa/application.py
@@ -857,6 +857,7 @@ def visit(
         slices: int = 1,
         selection: str = "true",
         wanted_document_count: int = 500,
+        slice_id: int = -1,
         **kwargs,
     ) -> Generator[Generator[VespaVisitResponse, None, None], None, None]:
         """
@@ -875,7 +876,9 @@ def visit(
         :param schema: The schema that we are visiting data from.
         :param namespace: The namespace that we are visiting data from.
         :param slices: Number of slices to use for parallel GET.
+        :param selection: Selection expression to filter documents.
         :param wanted_document_count: Best effort number of documents to retrieve for each request. May contain less if there are not enough documents left.
+        :param slice_id: Slice id to use for the visit. If -1, all slices will be used.
         :param kwargs: Additional HTTP request parameters (https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters)
         :return: A generator of slices, each containing a generator of responses.
         :raises HTTPError: if one occurred
@@ -889,6 +892,7 @@ def visit(
                 slices=slices,
                 selection=selection,
                 wanted_document_count=wanted_document_count,
+                slice_id=slice_id,
                 **kwargs,
             )
 
@@ -1410,6 +1414,7 @@ def visit(
         slices: int = 1,
         selection: str = "true",
         wanted_document_count: int = 500,
+        slice_id: int = -1,
         **kwargs,
     ) -> Generator[Generator[VespaVisitResponse, None, None], None, None]:
         """
@@ -1423,6 +1428,9 @@ def visit(
         :param namespace: The namespace that we are visiting data from.
         :param slices: Number of slices to use for parallel GET.
+        :param selection: Selection expression to use. Defaults to "true". See https://docs.vespa.ai/en/reference/document-select-language.html
         :param wanted_document_count: Best effort number of documents to retrieve for each request. May contain less if there are not enough documents left.
+        :param slice_id: Slice id to use. Defaults to -1, which means all slices.
         :param kwargs: Additional HTTP request parameters (https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters)
         :return: A generator of slices, each containing a generator of responses.
+        :raises ValueError: if slice_id is neither -1 nor in range [0, slices)
         :raises HTTPError: if one occurred
@@ -1442,6 +1450,11 @@ def visit(
             self.app.end_point,
             target,
         )
+        # slice_id must be the -1 sentinel (all slices) or an index in [0, slices); other negatives are rejected too
+        if slice_id != -1 and not 0 <= slice_id < slices:
+            raise ValueError(
+                f"slice_id must be in range [0, {slices - 1}] or -1. Got {slice_id} instead."
+            )
 
         @retry(retry=retry_if_exception_type(HTTPError), stop=stop_after_attempt(3))
         def visit_request(end_point: str, params: Dict[str, str]):
@@ -1469,10 +1482,15 @@ def visit_slice(slice_id):
                 else:
                     break
 
-        with ThreadPoolExecutor(max_workers=slices) as executor:
-            futures = [executor.submit(visit_slice, slice) for slice in range(slices)]
-            for future in as_completed(futures):
-                yield future.result()
+        if slice_id == -1:
+            with ThreadPoolExecutor(max_workers=slices) as executor:
+                futures = [
+                    executor.submit(visit_slice, slice) for slice in range(slices)
+                ]
+                for future in as_completed(futures):
+                    yield future.result()
+        else:
+            yield visit_slice(slice_id)
 
     def get_data(
         self,