
Commit

feat: BI-5711 switch to the new inter-tenant handler for entries iteration (#583)

* feat: BI-5711 switch to the new inter-tenant handler for entries iteration

* Revert typing, add dataset publicity checker test

* fix response processing & test

* mypy

* 2-in-1 pagination

* mypy + refactor
KonstantAnxiety authored Nov 12, 2024
1 parent 7b8e8c2 commit 7b603c2
Showing 6 changed files with 147 additions and 39 deletions.
11 changes: 11 additions & 0 deletions lib/dl_api_lib/dl_api_lib_tests/db/control_api/test_info.py
@@ -176,3 +176,14 @@ def test_datasets_publicity_checker_connection_not_allowed(
 
         assert response.status_code == 200
         assert response.json["result"] == expected_resp
+
+    def test_dataset_publicity_checker(self, client, dataset_id):
+        resp = client.post(
+            "/api/v1/info/datasets_publicity_checker",
+            content_type="application/json",
+            data=json.dumps(dict(datasets=[dataset_id])),
+        )
+        assert resp.status_code == 200
+        resp_data = resp.json
+        assert "result" in resp_data
+        assert len(resp_data["result"]) == 1 and resp_data["result"][0]["allowed"] == True, resp_data
88 changes: 70 additions & 18 deletions lib/dl_core/dl_core/united_storage_client.py
@@ -2,6 +2,11 @@
 
 import abc
 from contextlib import contextmanager
+from datetime import (
+    datetime,
+    timedelta,
+    timezone,
+)
 from json.decoder import JSONDecodeError
 import logging
 import re
@@ -317,6 +322,13 @@ def set_tenant_override(self, tenant: TenantDef) -> None:
                 f"US client folder ID is immutable with auth context {type(self._auth_ctx).__qualname__}"
             )
 
+    @staticmethod
+    def parse_datetime(dt: str) -> datetime:
+        # TODO: remove after migrating to python 3.11 or above
+        if dt[-1] == "Z":
+            return datetime.fromisoformat(dt.removesuffix("Z")).replace(tzinfo=timezone.utc)
+        return datetime.fromisoformat(dt)
+
     @staticmethod
     def _auth_ctx_to_default_headers(ctx: USAuthContextBase) -> dict[str, str]:
         headers: dict[DLHeaders, str] = {
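Aside: datetime.fromisoformat accepts the trailing "Z" (UTC) designator only from Python 3.11 onward, which is what the parse_datetime helper above works around for the createdAt values the US returns. A minimal illustration (the timestamp is invented for the example):

    from datetime import datetime, timezone

    raw = "2024-11-12T10:00:00.000Z"
    # On Python <= 3.10, datetime.fromisoformat(raw) raises ValueError.
    # The helper strips the "Z" and re-attaches UTC explicitly:
    parsed = datetime.fromisoformat(raw.removesuffix("Z")).replace(tzinfo=timezone.utc)
    print(parsed)  # 2024-11-12 10:00:00+00:00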
@@ -511,14 +523,16 @@ def _req_data_iter_entries( # type: ignore # TODO: fix
         cls,
         scope: str,
         entry_type: Optional[str] = None,
-        meta: Optional[dict] = None,
+        meta: Optional[dict[str, str]] = None,
         all_tenants: bool = False,
         include_data: bool = False,
         ids: Optional[Iterable[str]] = None,
-        page: int = 0,
         creation_time: Optional[dict[str, Union[str, int, None]]] = None,
+        page: int = 0,
+        created_at_from: float = 0,
+        limit: Optional[int] = None,
     ) -> RequestData:
-        req_params = dict(scope=scope, includeData=int(include_data))
+        req_params: dict[Any, Any] = dict(scope=scope, includeData=int(include_data))
         if entry_type:
             req_params.update(type=entry_type)
         meta = meta or {}
@@ -528,18 +542,22 @@ def _req_data_iter_entries( # type: ignore # TODO: fix
             req_params.update({"creationTime[{}]".format(k): v for k, v in creation_time.items()})
         if ids:
             req_params["ids"] = ids
+        if limit:
+            req_params["limit"] = limit
 
         if all_tenants:
             assert include_data is False  # not supported for this endpoint
-            endpoint = "/interTenant/entries"
+            assert creation_time is None  # not supported for this endpoint
+            req_params["createdAtFrom"] = created_at_from
+            endpoint = "/inter-tenant/entries"
         else:
+            req_params["page"] = page
             endpoint = "/entries"
 
-        params: dict[Any, Any] = dict(page=page, **req_params)
         return cls.RequestData(
             method="get",
             relative_url=endpoint,
-            params=params,
+            params=req_params,
             json=None,
         )
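Net effect of this hunk: the builder now targets two differently paginated endpoints. A sketch of the resulting requests (query values are illustrative, not from the diff):

    # all_tenants=True  -> GET /inter-tenant/entries?scope=...&includeData=0&createdAtFrom=<unix-ts>&limit=<n>
    # all_tenants=False -> GET /entries?scope=...&includeData=0&page=<n>&limit=<n>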

@@ -764,8 +782,13 @@ def entries_iterator(
         include_data: bool = False,
         ids: Optional[Iterable[str]] = None,
         creation_time: Optional[dict[str, Union[str, int, None]]] = None,
+        limit: Optional[int] = None,
     ) -> Generator[dict, None, None]:
         """
+        implements 2-in-1 pagination:
+        - by page number (in this case entries are returned from the US along with a nextPageToken)
+        - by creation time (entries are returned as a list ordered by creation time)
+
         :param scope:
         :param entry_type:
         :param meta: Filter entries by "meta" section values.
@@ -775,8 +798,15 @@ def entries_iterator(
         :param creation_time: Filter entries by creation_time. Available filters: eq, ne, gt, gte, lt, lte
         :return:
         """
-        page = 0
-        while True:
+        created_at_from: datetime = datetime(1970, 1, 1)  # for creation time pagination
+        previous_page_entry_ids = set()  # for deduplication
+        page: int = 0  # for page number pagination
+
+        done = False
+        while not done:
+            # 1. Prepare and make request
+            created_at_from_ts = created_at_from.timestamp()
+            unseen_entry_ids = set()
             resp = self._request(
                 self._req_data_iter_entries(
                     scope,
@@ -785,22 +815,44 @@ def entries_iterator(
                     all_tenants=all_tenants,
                     include_data=include_data,
                     ids=ids,
-                    page=page,
                     creation_time=creation_time,
+                    page=page,
+                    created_at_from=created_at_from_ts,
+                    limit=limit,
                 )
             )
-            if resp.get("entries"):
-                page_entries = resp["entries"]
+
+            # 2. Deal with pagination
+            page_entries: list
+            if isinstance(resp, list):
+                page_entries = resp
+                if page_entries:
+                    created_at_from = self.parse_datetime(page_entries[-1]["createdAt"]) - timedelta(milliseconds=1)
+                    # minus 1 ms to account for cases where entries, created during a single millisecond, happen to
+                    # return on the border of two batches (one in batch 1 and the other in batch 2),
+                    # hence the deduplication
+                else:
+                    LOGGER.info("Got an empty entries list in the US response, the listing is completed")
+                    done = True
             else:
-                break
+                page_entries = resp.get("entries", [])
+                if resp.get("nextPageToken"):
+                    page = resp["nextPageToken"]
+                else:
+                    LOGGER.info("Got no nextPageToken in the US response, the listing is completed")
+                    done = True
 
+            # 3. Yield results
             for entr in page_entries:
-                yield entr
-
-            if resp.get("nextPageToken"):
-                page = resp["nextPageToken"]
-            else:
-                break
+                if entr["entryId"] not in previous_page_entry_ids:
+                    unseen_entry_ids.add(entr["entryId"])
+                    yield entr
 
+            # 4. Stop if got no nextPageToken or unseen entries
+            previous_page_entry_ids = unseen_entry_ids.copy()
+            if not unseen_entry_ids:
+                LOGGER.info("US response is not empty, but we got no unseen entries, assuming the listing is completed")
+                done = True
+
     def delete_entry(self, entry_id, lock=None):  # type: ignore # TODO: fix
         self._request(self._req_data_delete_entry(entry_id, lock=lock))
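Why the one-millisecond rewind needs the deduplication set: an entry whose createdAt sits exactly on the rewound boundary can come back twice, at the tail of one batch and at the head of the next, and previous_page_entry_ids filters out the repeat. A self-contained sketch of just that mechanism (batch contents and entry ids are invented):

    # Hypothetical batches, as if returned by consecutive creation-time-paginated calls:
    batches = [
        [{"entryId": "A", "createdAt": "2024-11-12T10:00:00.100Z"},
         {"entryId": "B", "createdAt": "2024-11-12T10:00:00.500Z"}],
        [{"entryId": "B", "createdAt": "2024-11-12T10:00:00.500Z"},  # border repeat
         {"entryId": "C", "createdAt": "2024-11-12T10:00:00.900Z"}],
        [],  # an empty batch terminates the listing
    ]

    previous_page_entry_ids: set = set()
    for batch in batches:
        unseen_entry_ids = set()
        for entr in batch:
            if entr["entryId"] not in previous_page_entry_ids:
                unseen_entry_ids.add(entr["entryId"])
                print("yield", entr["entryId"])  # prints A, B, C exactly once each
        previous_page_entry_ids = unseen_entry_ids.copy()
        if not unseen_entry_ids:
            break  # mirrors done = True in the real iterator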
81 changes: 60 additions & 21 deletions lib/dl_core/dl_core/united_storage_client_aio.py
@@ -1,6 +1,10 @@
 from __future__ import annotations
 
 import asyncio
+from datetime import (
+    datetime,
+    timedelta,
+)
 import json
 import logging
 import time
@@ -239,42 +243,77 @@ async def entries_iterator(
         include_data: bool = False,
         ids: Optional[Iterable[str]] = None,
         creation_time: Optional[dict[str, Union[str, int, None]]] = None,
+        limit: Optional[int] = None,
     ) -> AsyncGenerator[dict, None]:
-        page = 0
-        while True:
+        """
+        implements 2-in-1 pagination:
+        - by page number (in this case entries are returned from the US along with a nextPageToken)
+        - by creation time (entries are returned as a list ordered by creation time)
+
+        :param scope:
+        :param entry_type:
+        :param meta: Filter entries by "meta" section values.
+        :param all_tenants: Look up across all tenants. False by default.
+        :param include_data: Return full US entry data. False by default.
+        :param ids: Filter entries by uuid.
+        :param creation_time: Filter entries by creation_time. Available filters: eq, ne, gt, gte, lt, lte
+        :return:
+        """
+        created_at_from: datetime = datetime(1970, 1, 1)  # for creation time pagination
+        previous_page_entry_ids = set()  # for deduplication
+        page: int = 0  # for page number pagination
+
+        done = False
+        while not done:
+            # 1. Prepare and make request
+            created_at_from_ts = created_at_from.timestamp()
+            unseen_entry_ids = set()
             resp = await self._request(
                 self._req_data_iter_entries(
                     scope,
                     entry_type=entry_type,
                     meta=meta,
                     all_tenants=all_tenants,
-                    page=page,
                     include_data=include_data,
                     ids=ids,
                     creation_time=creation_time,
+                    page=page,
+                    created_at_from=created_at_from_ts,
+                    limit=limit,
                 )
             )
-            if resp.get("entries"):
-                page_entries = resp["entries"]
+
+            # 2. Deal with pagination
+            page_entries: list
+            if isinstance(resp, list):
+                page_entries = resp
+                if page_entries:
+                    created_at_from = self.parse_datetime(page_entries[-1]["createdAt"]) - timedelta(milliseconds=1)
+                    # minus 1 ms to account for cases where entries, created during a single millisecond, happen to
+                    # return on the border of two batches (one in batch 1 and the other in batch 2),
+                    # hence the deduplication
+                else:
+                    LOGGER.info("Got an empty entries list in the US response, the listing is completed")
+                    done = True
             else:
-                break
+                page_entries = resp.get("entries", [])
+                if resp.get("nextPageToken"):
+                    page = resp["nextPageToken"]
+                else:
+                    LOGGER.info("Got no nextPageToken in the US response, the listing is completed")
+                    done = True
 
+            # 3. Yield results
             for entr in page_entries:
-                yield entr
-
-            if resp.get("nextPageToken"):
-                page = resp["nextPageToken"]
-            else:
-                break
-
-    async def list_all_entries(
-        self, scope: str, entry_type: Optional[str] = None, meta: Optional[dict] = None, all_tenants: bool = False
-    ) -> list:
-        ret = []
-        async for e in self.entries_iterator(scope, entry_type, meta, all_tenants, include_data=False):
-            ret.append(e)
-
-        return ret
+                if entr["entryId"] not in previous_page_entry_ids:
+                    unseen_entry_ids.add(entr["entryId"])
+                    yield entr
+
+            # 4. Stop if got no nextPageToken or unseen entries
+            previous_page_entry_ids = unseen_entry_ids.copy()
+            if not unseen_entry_ids:
+                LOGGER.info("US response is not empty, but we got no unseen entries, assuming the listing is completed")
+                done = True
 
     async def acquire_lock(
         self,
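The async variant mirrors the sync one line for line; the separate list_all_entries helper is dropped in favor of iterating directly. A minimal consumption sketch (the client instance, scope value, and page size are assumptions for illustration, not part of the diff):

    # Hypothetical usage; constructing the async US client is out of scope here.
    async def collect_entries(us_client) -> list[dict]:
        ret = []
        async for entry in us_client.entries_iterator(
            scope="dataset",
            all_tenants=True,  # served by /inter-tenant/entries with creation-time pagination
            limit=1500,        # page size; the US managers pass ITER_ENTRIES_PAGE_SIZE
        ):
            ret.append(entry)
        return ret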
2 changes: 2 additions & 0 deletions lib/dl_core/dl_core/us_manager/us_manager.py
@@ -83,6 +83,8 @@ class CryptoKeyInfo:
 
 
 class USManagerBase:
+    ITER_ENTRIES_PAGE_SIZE: ClassVar[int] = 1500
+
     _MAP_TYPE_TO_SCHEMA: ClassVar[ChainMapGeneric[Type[BaseAttrsDataModel], Type[marshmallow.Schema]]] = ChainMap(
         MAP_TYPE_TO_SCHEMA,  # type: ignore # TODO: fix
         {
2 changes: 2 additions & 0 deletions lib/dl_core/dl_core/us_manager/us_manager_async.py
@@ -292,6 +292,7 @@ def get_raw_collection(
             all_tenants=all_tenants,
             creation_time=creation_time,
             include_data=False,
+            limit=self.ITER_ENTRIES_PAGE_SIZE,
         )
 
     async def get_collection(
@@ -330,6 +331,7 @@ async def get_collection(
             include_data=include_data,
             ids=ids,
             creation_time=creation_time,
+            limit=self.ITER_ENTRIES_PAGE_SIZE,
         )
 
         async for us_resp in us_entry_iterator:
2 changes: 2 additions & 0 deletions lib/dl_core/dl_core/us_manager/us_manager_sync.py
@@ -226,6 +226,7 @@ def get_collection(
             include_data=include_data,
             ids=ids,
             creation_time=creation_time,
+            limit=self.ITER_ENTRIES_PAGE_SIZE,
         )
 
         for us_resp in us_entry_iterator:
@@ -251,6 +252,7 @@ def get_raw_collection(
             entry_type=entry_type,
             all_tenants=all_tenants,
             include_data=False,
+            limit=self.ITER_ENTRIES_PAGE_SIZE,
         )
 
     def move(self, entry: USEntry, destination: str) -> None:
