diff --git a/lib/dl_core/dl_core/united_storage_client.py b/lib/dl_core/dl_core/united_storage_client.py index 197552256..0eda4b9a6 100644 --- a/lib/dl_core/dl_core/united_storage_client.py +++ b/lib/dl_core/dl_core/united_storage_client.py @@ -461,23 +461,24 @@ def _req_data_move_entry(cls, entry_id: str, destination: str): # type: ignore ) @classmethod - def _req_data_update_entry( # type: ignore # TODO: fix + def _req_data_update_entry( cls, entry_id: str, - data=None, - unversioned_data=None, - meta=None, - mode="publish", - lock=None, - hidden=None, - links=None, - ): + data: Optional[dict[str, Any]] = None, + unversioned_data: Optional[dict[str, Any]] = None, + meta: Optional[dict[str, str]] = None, + mode: str = "publish", + lock: Optional[str] = None, + hidden: Optional[bool] = None, + links: Optional[dict[str, Any]] = None, + update_revision: Optional[bool] = None, + ) -> RequestData: data = data or {} unversioned_data = unversioned_data or {} meta = meta or {} links = links or {} - json_data = { + json_data: dict[str, Any] = { "data": data, "unversionedData": unversioned_data, "meta": meta, @@ -485,9 +486,11 @@ def _req_data_update_entry( # type: ignore # TODO: fix "links": links, } if hidden is not None: - json_data.update(hidden=hidden) + json_data["hidden"] = hidden if lock is not None: - json_data.update(lockToken=lock) + json_data["lockToken"] = lock + if update_revision is not None: + json_data["updateRevision"] = update_revision return cls.RequestData( method="post", relative_url="/entries/{}".format(entry_id), @@ -727,7 +730,7 @@ def get_entry( def move_entry(self, entry_id, destination): # type: ignore # TODO: fix return self._request(self._req_data_move_entry(entry_id, destination=destination)) - def update_entry( # type: ignore # TODO: fix + def update_entry( self, entry_id: str, data: Optional[dict[str, Any]] = None, @@ -736,7 +739,8 @@ def update_entry( # type: ignore # TODO: fix lock: Optional[str] = None, hidden: Optional[bool] = None, links: Optional[dict[str, Any]] = None, - ): + update_revision: Optional[bool] = None, + ) -> dict[str, Any]: return self._request( self._req_data_update_entry( entry_id, @@ -746,6 +750,7 @@ def update_entry( # type: ignore # TODO: fix lock=lock, hidden=hidden, links=links, + update_revision=update_revision, ) ) diff --git a/lib/dl_core/dl_core/united_storage_client_aio.py b/lib/dl_core/dl_core/united_storage_client_aio.py index 97b6070e0..c9998e41a 100644 --- a/lib/dl_core/dl_core/united_storage_client_aio.py +++ b/lib/dl_core/dl_core/united_storage_client_aio.py @@ -5,6 +5,7 @@ import logging import time from typing import ( + Any, AsyncGenerator, Iterable, NoReturn, @@ -202,9 +203,17 @@ async def create_entry( # type: ignore # TODO: fix ) return await self._request(rq_data) - async def update_entry( # type: ignore # TODO: fix - self, entry_id, data=None, unversioned_data=None, meta=None, lock=None, hidden=None, links=None - ): + async def update_entry( + self, + entry_id: str, + data: Optional[dict[str, Any]] = None, + unversioned_data: Optional[dict[str, Any]] = None, + meta: Optional[dict[str, str]] = None, + lock: Optional[str] = None, + hidden: Optional[bool] = None, + links: Optional[dict[str, Any]] = None, + update_revision: Optional[bool] = None, + ) -> dict[str, Any]: return await self._request( self._req_data_update_entry( entry_id, @@ -214,6 +223,7 @@ async def update_entry( # type: ignore # TODO: fix lock=lock, hidden=hidden, links=links, + update_revision=update_revision, ) ) diff --git a/lib/dl_core/dl_core/us_manager/us_manager_async.py b/lib/dl_core/dl_core/us_manager/us_manager_async.py index 039af7965..1323b7fe6 100644 --- a/lib/dl_core/dl_core/us_manager/us_manager_async.py +++ b/lib/dl_core/dl_core/us_manager/us_manager_async.py @@ -131,7 +131,7 @@ async def get_by_id(self, entry_id: str, expected_type: Type[_ENTRY_TV] = None) return obj - async def save(self, entry: USEntry) -> None: + async def save(self, entry: USEntry, update_revision: Optional[bool] = None) -> None: self.get_lifecycle_manager(entry=entry).pre_save_hook() save_params = self._get_entry_save_params(entry) @@ -154,6 +154,8 @@ async def save(self, entry: USEntry) -> None: entry._stored_in_db = True else: # noinspection PyProtectedMember + save_params["update_revision"] = update_revision + assert entry.uuid is not None resp = await self._us_client.update_entry(entry.uuid, lock=entry._lock, **save_params) entry._us_resp = resp diff --git a/lib/dl_core/dl_core/us_manager/us_manager_sync.py b/lib/dl_core/dl_core/us_manager/us_manager_sync.py index c4126eac1..f51b9f519 100644 --- a/lib/dl_core/dl_core/us_manager/us_manager_sync.py +++ b/lib/dl_core/dl_core/us_manager/us_manager_sync.py @@ -119,7 +119,7 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: # CRUD # - def save(self, entry: USEntry) -> None: + def save(self, entry: USEntry, update_revision: Optional[bool] = None) -> None: lifecycle_manager = self.get_lifecycle_manager(entry=entry, service_registry=self._services_registry) lifecycle_manager.pre_save_hook() @@ -142,6 +142,7 @@ def save(self, entry: USEntry) -> None: entry._stored_in_db = True else: # noinspection PyProtectedMember + save_params["update_revision"] = update_revision resp = self._us_client.update_entry( entry.uuid, lock=entry._lock, **save_params # type: ignore # TODO: fix ) diff --git a/lib/dl_core/dl_core/us_manager/us_manager_sync_mock.py b/lib/dl_core/dl_core/us_manager/us_manager_sync_mock.py index 61f773029..949cc6dbd 100644 --- a/lib/dl_core/dl_core/us_manager/us_manager_sync_mock.py +++ b/lib/dl_core/dl_core/us_manager/us_manager_sync_mock.py @@ -108,6 +108,7 @@ def update_entry( lock: Optional[str] = None, hidden: Optional[bool] = None, links: Optional[Dict[str, Any]] = None, + update_revision: Optional[bool] = None, ) -> Dict[str, Any]: previous_resp = self._saved_entries[entry_id] diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/save.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/save.py index 22a9c174a..94eecb3ad 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/save.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/save.py @@ -290,7 +290,11 @@ def _construct_insert_from_select_query(for_debug: bool = False) -> str: **extra_dsrc_params, ) conn.data.component_errors.remove_errors(id=dst_source_id) - await usm.save(conn) + + if self.meta.exec_mode == TaskExecutionMode.UPDATE_AND_SAVE: + await usm.save(conn, update_revision=True) + else: + await usm.save(conn) # sync source id with the connection to enable consistent work with dfile (e.g. polling) src_source.id = dst_source_id diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/utils/connection_error_tracker.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/utils/connection_error_tracker.py index 468c4a924..0c18b5a4a 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/utils/connection_error_tracker.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/utils/connection_error_tracker.py @@ -125,7 +125,7 @@ async def _fail_connection_sources(self, connection_id: str) -> None: details=error.details, ) - await self._usm.save(conn) + await self._usm.save(conn, update_revision=True) for delete_file_task in delete_tasks: await self._task_processor.schedule(delete_file_task)