Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support slice_idin visit request #1036

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions tests/unit/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,56 @@ def test_visit(self):
"&timeout=200s"
) in urls

def test_visit_slice_id(self):
app = Vespa(url="http://localhost", port=8080)
with requests_mock.Mocker() as m:
m.get(
"http://localhost:8080/ApplicationStatus",
status_code=200,
)
m.get(
"http://localhost:8080/document/v1/foo/foo/docid/",
[
{"json": {"continuation": "AAA"}, "status_code": 200},
{"json": {}, "status_code": 200},
],
)

results = []
for slice in app.visit(
schema="foo",
namespace="foo",
content_cluster_name="content",
timeout="200s",
slices=10,
slice_id=2,
):
for response in slice:
results.append(response)
assert len(results) == 2

urls = [response.url for response in results]
assert (
"http://localhost:8080/document/v1/foo/foo/docid/"
"?cluster=content"
"&selection=true"
"&wantedDocumentCount=500"
"&slices=10"
"&sliceId=2"
"&timeout=200s"
"&continuation=AAA"
) in urls

assert (
"http://localhost:8080/document/v1/foo/foo/docid/"
"?cluster=content"
"&selection=true"
"&wantedDocumentCount=500"
"&slices=10"
"&sliceId=2"
"&timeout=200s"
) in urls


class TestVespa(unittest.TestCase):
def test_end_point(self):
Expand Down
25 changes: 21 additions & 4 deletions vespa/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,7 @@ def visit(
slices: int = 1,
selection: str = "true",
wanted_document_count: int = 500,
slice_id: int = -1,
**kwargs,
) -> Generator[Generator[VespaVisitResponse, None, None], None, None]:
"""
Expand All @@ -875,7 +876,9 @@ def visit(
:param schema: The schema that we are visiting data from.
:param namespace: The namespace that we are visiting data from.
:param slices: Number of slices to use for parallel GET.
:param selection: Selection expression to filter documents.
:param wanted_document_count: Best effort number of documents to retrieve for each request. May contain less if there are not enough documents left.
:param slice_id: Slice id to use for the visit. If -1, all slices will be used.
:param kwargs: Additional HTTP request parameters (https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters)
:return: A generator of slices, each containing a generator of responses.
:raises HTTPError: if one occurred
Expand All @@ -889,6 +892,7 @@ def visit(
slices=slices,
selection=selection,
wanted_document_count=wanted_document_count,
slice_id=slice_id,
**kwargs,
)

Expand Down Expand Up @@ -1410,6 +1414,7 @@ def visit(
slices: int = 1,
selection: str = "true",
wanted_document_count: int = 500,
slice_id: int = -1,
**kwargs,
) -> Generator[Generator[VespaVisitResponse, None, None], None, None]:
"""
Expand All @@ -1423,6 +1428,8 @@ def visit(
:param namespace: The namespace that we are visiting data from.
:param slices: Number of slices to use for parallel GET.
:param wanted_document_count: Best effort number of documents to retrieve for each request. May contain less if there are not enough documents left.
:param selection: Selection expression to use. Defaults to "true". See https://docs.vespa.ai/en/reference/document-select-language.html
:param slice_id: Slice id to use. Defaults to -1, which means all slices.
:param kwargs: Additional HTTP request parameters (https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters)
:return: A generator of slices, each containing a generator of responses.
:raises HTTPError: if one occurred
Expand All @@ -1442,6 +1449,11 @@ def visit(
self.app.end_point,
target,
)
# Validate that if slice_id is set, slice_id is in range [0, slices)
if slice_id >= 0 and slice_id >= slices:
raise ValueError(
f"slice_id must be in range [0, {slices - 1}] or -1. Got {slice_id} instead."
)

@retry(retry=retry_if_exception_type(HTTPError), stop=stop_after_attempt(3))
def visit_request(end_point: str, params: Dict[str, str]):
Expand Down Expand Up @@ -1469,10 +1481,15 @@ def visit_slice(slice_id):
else:
break

with ThreadPoolExecutor(max_workers=slices) as executor:
futures = [executor.submit(visit_slice, slice) for slice in range(slices)]
for future in as_completed(futures):
yield future.result()
if slice_id == -1:
with ThreadPoolExecutor(max_workers=slices) as executor:
futures = [
executor.submit(visit_slice, slice) for slice in range(slices)
]
for future in as_completed(futures):
yield future.result()
else:
yield visit_slice(slice_id)

def get_data(
self,
Expand Down