Skip to content

Commit

Permalink
feat: BI-5706 update_revision US param for file connections (#546)
Browse files Browse the repository at this point in the history
  • Loading branch information
ForrestGump authored Jul 19, 2024
1 parent 0d9c323 commit 4f65c3a
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 21 deletions.
33 changes: 19 additions & 14 deletions lib/dl_core/dl_core/united_storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,33 +461,36 @@ def _req_data_move_entry(cls, entry_id: str, destination: str): # type: ignore
)

@classmethod
def _req_data_update_entry( # type: ignore # TODO: fix
def _req_data_update_entry(
cls,
entry_id: str,
data=None,
unversioned_data=None,
meta=None,
mode="publish",
lock=None,
hidden=None,
links=None,
):
data: Optional[dict[str, Any]] = None,
unversioned_data: Optional[dict[str, Any]] = None,
meta: Optional[dict[str, str]] = None,
mode: str = "publish",
lock: Optional[str] = None,
hidden: Optional[bool] = None,
links: Optional[dict[str, Any]] = None,
update_revision: Optional[bool] = None,
) -> RequestData:
data = data or {}
unversioned_data = unversioned_data or {}
meta = meta or {}
links = links or {}

json_data = {
json_data: dict[str, Any] = {
"data": data,
"unversionedData": unversioned_data,
"meta": meta,
"mode": mode,
"links": links,
}
if hidden is not None:
json_data.update(hidden=hidden)
json_data["hidden"] = hidden
if lock is not None:
json_data.update(lockToken=lock)
json_data["lockToken"] = lock
if update_revision is not None:
json_data["updateRevision"] = update_revision
return cls.RequestData(
method="post",
relative_url="/entries/{}".format(entry_id),
Expand Down Expand Up @@ -727,7 +730,7 @@ def get_entry(
def move_entry(self, entry_id, destination): # type: ignore # TODO: fix
return self._request(self._req_data_move_entry(entry_id, destination=destination))

def update_entry( # type: ignore # TODO: fix
def update_entry(
self,
entry_id: str,
data: Optional[dict[str, Any]] = None,
Expand All @@ -736,7 +739,8 @@ def update_entry( # type: ignore # TODO: fix
lock: Optional[str] = None,
hidden: Optional[bool] = None,
links: Optional[dict[str, Any]] = None,
):
update_revision: Optional[bool] = None,
) -> dict[str, Any]:
return self._request(
self._req_data_update_entry(
entry_id,
Expand All @@ -746,6 +750,7 @@ def update_entry( # type: ignore # TODO: fix
lock=lock,
hidden=hidden,
links=links,
update_revision=update_revision,
)
)

Expand Down
16 changes: 13 additions & 3 deletions lib/dl_core/dl_core/united_storage_client_aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import time
from typing import (
Any,
AsyncGenerator,
Iterable,
NoReturn,
Expand Down Expand Up @@ -202,9 +203,17 @@ async def create_entry( # type: ignore # TODO: fix
)
return await self._request(rq_data)

async def update_entry( # type: ignore # TODO: fix
self, entry_id, data=None, unversioned_data=None, meta=None, lock=None, hidden=None, links=None
):
async def update_entry(
self,
entry_id: str,
data: Optional[dict[str, Any]] = None,
unversioned_data: Optional[dict[str, Any]] = None,
meta: Optional[dict[str, str]] = None,
lock: Optional[str] = None,
hidden: Optional[bool] = None,
links: Optional[dict[str, Any]] = None,
update_revision: Optional[bool] = None,
) -> dict[str, Any]:
return await self._request(
self._req_data_update_entry(
entry_id,
Expand All @@ -214,6 +223,7 @@ async def update_entry( # type: ignore # TODO: fix
lock=lock,
hidden=hidden,
links=links,
update_revision=update_revision,
)
)

Expand Down
4 changes: 3 additions & 1 deletion lib/dl_core/dl_core/us_manager/us_manager_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ async def get_by_id(self, entry_id: str, expected_type: Type[_ENTRY_TV] = None)

return obj

async def save(self, entry: USEntry) -> None:
async def save(self, entry: USEntry, update_revision: Optional[bool] = None) -> None:
self.get_lifecycle_manager(entry=entry).pre_save_hook()

save_params = self._get_entry_save_params(entry)
Expand All @@ -154,6 +154,8 @@ async def save(self, entry: USEntry) -> None:
entry._stored_in_db = True
else:
# noinspection PyProtectedMember
save_params["update_revision"] = update_revision
assert entry.uuid is not None
resp = await self._us_client.update_entry(entry.uuid, lock=entry._lock, **save_params)

entry._us_resp = resp
Expand Down
3 changes: 2 additions & 1 deletion lib/dl_core/dl_core/us_manager/us_manager_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:

# CRUD
#
def save(self, entry: USEntry) -> None:
def save(self, entry: USEntry, update_revision: Optional[bool] = None) -> None:
lifecycle_manager = self.get_lifecycle_manager(entry=entry, service_registry=self._services_registry)
lifecycle_manager.pre_save_hook()

Expand All @@ -142,6 +142,7 @@ def save(self, entry: USEntry) -> None:
entry._stored_in_db = True
else:
# noinspection PyProtectedMember
save_params["update_revision"] = update_revision
resp = self._us_client.update_entry(
entry.uuid, lock=entry._lock, **save_params # type: ignore # TODO: fix
)
Expand Down
1 change: 1 addition & 0 deletions lib/dl_core/dl_core/us_manager/us_manager_sync_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def update_entry(
lock: Optional[str] = None,
hidden: Optional[bool] = None,
links: Optional[Dict[str, Any]] = None,
update_revision: Optional[bool] = None,
) -> Dict[str, Any]:
previous_resp = self._saved_entries[entry_id]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,11 @@ def _construct_insert_from_select_query(for_debug: bool = False) -> str:
**extra_dsrc_params,
)
conn.data.component_errors.remove_errors(id=dst_source_id)
await usm.save(conn)

if self.meta.exec_mode == TaskExecutionMode.UPDATE_AND_SAVE:
await usm.save(conn, update_revision=True)
else:
await usm.save(conn)

# sync source id with the connection to enable consistent work with dfile (e.g. polling)
src_source.id = dst_source_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ async def _fail_connection_sources(self, connection_id: str) -> None:
details=error.details,
)

await self._usm.save(conn)
await self._usm.save(conn, update_revision=True)

for delete_file_task in delete_tasks:
await self._task_processor.schedule(delete_file_task)
Expand Down

0 comments on commit 4f65c3a

Please sign in to comment.