Skip to content

Commit 1531201

Browse files
author
Thiago G. Martins
authored
Merge pull request #189 from vespa-engine/tgm/delete-batch-async
Implement delete_batch with async mode as default
2 parents 6acc7b5 + 2efd888 commit 1531201

File tree

2 files changed

+85
-10
lines changed

2 files changed

+85
-10
lines changed

vespa/application.py

+48-7
Original file line numberDiff line numberDiff line change
@@ -270,14 +270,45 @@ def delete_data(self, schema: str, data_id: str) -> VespaResponse:
270270
with VespaSync(self) as sync_app:
271271
return sync_app.delete_data(schema=schema, data_id=data_id)
272272

273-
def delete_batch(self, batch: List):
273+
def _delete_batch_sync(self, schema: str, batch: List[Dict]):
274+
return [self.delete_data(schema, data_point["id"]) for data_point in batch]
275+
276+
async def _delete_batch_async(
277+
self, schema: str, batch: List[Dict], connections, total_timeout
278+
):
279+
async with VespaAsync(
280+
app=self, connections=connections, total_timeout=total_timeout
281+
) as async_app:
282+
return await async_app.delete_batch(schema=schema, batch=batch)
283+
284+
def delete_batch(
285+
self,
286+
schema: str,
287+
batch: List[Dict],
288+
asynchronous=True,
289+
connections: Optional[int] = 100,
290+
total_timeout: int = 100,
291+
):
274292
"""
275-
Async delete a batch of data from a Vespa app.
293+
Delete a batch of data from a Vespa app.
276294
277-
:param batch: A list of tuples with 'schema' and 'id'
278-
:return:
295+
:param schema: The schema that we are deleting data from.
296+
:param batch: A list of dicts, each containing the key 'id'.
297+
:param asynchronous: Set True to delete data in async mode. Defaults to True.
298+
:param connections: Number of allowed concurrent connections, valid only if `asynchronous=True`.
299+
:param total_timeout: Total timeout in secs for each of the concurrent requests when using `asynchronous=True`.
300+
:return: List of responses, one for each delete operation in the batch.
279301
"""
280-
return [self.delete_data(schema, id) for schema, id in batch]
302+
if asynchronous:
303+
coro = self._delete_batch_async(
304+
schema=schema,
305+
batch=batch,
306+
connections=connections,
307+
total_timeout=total_timeout,
308+
)
309+
return self._check_for_running_loop_and_run_coroutine(coro=coro)
310+
else:
311+
return self._delete_batch_sync(schema=schema, batch=batch)
281312

282313
def delete_all_docs(self, content_cluster_name: str, schema: str) -> Response:
283314
"""
@@ -977,8 +1008,18 @@ async def delete_data(self, schema: str, data_id: str) -> VespaResponse:
9771008
operation_type="delete",
9781009
)
9791010

980-
async def delete_batch(self, batch):
981-
return await self._wait(self.delete_data, batch)
1011+
async def _delete_data_semaphore(
1012+
self, schema: str, data_id: str, semaphore: asyncio.Semaphore
1013+
):
1014+
async with semaphore:
1015+
return await self.delete_data(schema=schema, data_id=data_id)
1016+
1017+
async def delete_batch(self, schema: str, batch: List[Dict]):
1018+
sem = asyncio.Semaphore(self.connections)
1019+
return await self._wait(
1020+
self._delete_data_semaphore,
1021+
[(schema, data_point["id"], sem) for data_point in batch],
1022+
)
9821023

9831024
async def get_data(self, schema: str, data_id: str):
9841025
end_point = "{}/document/v1/{}/{}/docid/{}".format(

vespa/test_integration_docker.py

+37-3
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,23 @@ def batch_operations_synchronous_mode(
648648
expected_updated_fields.update(fields_to_update[idx])
649649
self.assertDictEqual(response.json["fields"], expected_updated_fields)
650650

651+
#
652+
# Delete data
653+
#
654+
result = app.delete_batch(schema=schema, batch=docs, asynchronous=False)
655+
for idx, response in enumerate(result):
656+
self.assertEqual(
657+
response.json["id"],
658+
"id:{}:{}::{}".format(schema, schema, docs[idx]["id"]),
659+
)
660+
661+
#
662+
# get batch deleted data
663+
#
664+
result = app.get_batch(schema=schema, batch=docs, asynchronous=False)
665+
for idx, response in enumerate(result):
666+
self.assertEqual(response.status_code, 404)
667+
651668
def batch_operations_asynchronous_mode(
652669
self,
653670
app,
@@ -724,6 +741,23 @@ def batch_operations_asynchronous_mode(
724741
expected_updated_fields.update(fields_to_update[idx])
725742
self.assertDictEqual(response.json["fields"], expected_updated_fields)
726743

744+
#
745+
# Delete data
746+
#
747+
result = app.delete_batch(schema=schema, batch=docs, asynchronous=True)
748+
for idx, response in enumerate(result):
749+
self.assertEqual(
750+
response.json["id"],
751+
"id:{}:{}::{}".format(schema, schema, docs[idx]["id"]),
752+
)
753+
754+
#
755+
# get batch deleted data
756+
#
757+
result = app.get_batch(schema=schema, batch=docs, asynchronous=True)
758+
for idx, response in enumerate(result):
759+
self.assertEqual(response.status_code, 404)
760+
727761
@staticmethod
728762
def _parse_vespa_tensor(hit, feature):
729763
return [x["value"] for x in hit["fields"]["summaryfeatures"][feature]["cells"]]
@@ -911,7 +945,7 @@ def test_batch_operations_synchronous_mode(self):
911945
schema_name=self.app_package.name,
912946
fields_to_send=self.fields_to_send,
913947
expected_fields_from_get_operation=self.fields_to_send,
914-
fields_to_update=self.fields_to_update
948+
fields_to_update=self.fields_to_update,
915949
)
916950

917951
def test_batch_operations_asynchronous_mode(self):
@@ -1004,7 +1038,7 @@ def test_batch_operations_synchronous_mode(self):
10041038
schema_name=self.app_package.name,
10051039
fields_to_send=self.fields_to_send,
10061040
expected_fields_from_get_operation=self.expected_fields_from_get_operation,
1007-
fields_to_update=self.fields_to_update
1041+
fields_to_update=self.fields_to_update,
10081042
)
10091043

10101044
def test_batch_operations_asynchronous_mode(self):
@@ -1113,7 +1147,7 @@ def test_batch_operations_synchronous_mode(self):
11131147
schema_name="sentence",
11141148
fields_to_send=self.fields_to_send_sentence,
11151149
expected_fields_from_get_operation=self.expected_fields_from_sentence_get_operation,
1116-
fields_to_update=self.fields_to_update
1150+
fields_to_update=self.fields_to_update,
11171151
)
11181152

11191153
def test_batch_operations_asynchronous_mode(self):

0 commit comments

Comments
 (0)