Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Progress bar for repack and pack_all_loose #171

Merged
merged 5 commits into from
Sep 19, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 44 additions & 5 deletions disk_objectstore/container.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
The main implementation of the ``Container`` class of the object store.
"""

# pylint: disable=too-many-lines
import dataclasses
import io
Expand Down Expand Up @@ -1319,6 +1320,7 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man
compress: Union[bool, CompressMode] = CompressMode.NO,
validate_objects: bool = True,
do_fsync: bool = True,
callback: Optional[Callable] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
callback: Optional[Callable] = None,
callback: Optional[Callable[[str, Any], None]] = None,

Please add some doc that describes how the callback looks like or at least refers to aiida-core aiida.common.progress_reporter.py.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why, when I put your line mypy send useless errors:

disk_objectstore/container.py:1408: error: Unexpected keyword argument "action"  [call-arg]
disk_objectstore/container.py:1408: error: Unexpected keyword argument "value"  [call-arg]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

marking the arguments with mypy_extensions should work. I only load it during TYPE_CHECKING as mypy is only a dev depedency

from collections.abc import Callable
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
    from mypy_extensions import Arg

def foo(callback: Callable[[Arg(str, "action"), Arg(Any, "value")], None]):
    callback(action="hello", value=None)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
callback: Optional[Callable] = None,
callback: Optional[Callable[Arg(str, "action"), Arg(Any, "value")], None]] = None,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and the remaining import somewhere on the top

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @agoscinski , it was really easy to make it work, took only 1 second :)

) -> None:
"""Pack all loose objects.
Expand Down Expand Up @@ -1393,6 +1395,17 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man
# on Mac and on Windows (see issues #37 and #43). Therefore, I do NOT delete them,
# and deletion is deferred to a manual clean-up operation.

if callback:
callback(
action="init",
value={
"total": self.get_total_size()["total_size_loose"],
"description": "Packing loose objects",
},
)
# I wish this would show as MB, GB. In tqdm it's easy:
# just pass unit='B' and unit_scale=1024. But would callback accept **keywords?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't you pass these arguments to your other PR in aiida-core in the function set_progress_bar_tqdm(..., unit=..., unit_scale=...)? You even make 'B', 'MB', 'GB' a user arg. You could also determine a reasonable value from the total size, but then you need to pass set_progress_bar_tqdm as local function in the callback as the total size is only known through the callback

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be even easier, since it is a partial function so one should be able to pass it to progress_reporter. You might need a check if the the progress_reporter is actually tqdm

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @agoscinski , this will be a bit tricky, because in that case we have to change the total value of the bar from items to byte.
And some of the thing has to be done in aiida-core, I feel like this makes disk-objectstore less and less transferable.

Copy link
Contributor

@agoscinski agoscinski Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be done fully in aiida-core in the create_callback function, but I can give it a go after this PR, if the progress bar is really not so readable. Maybe you are right and I just don't see the complexity for now.

Copy link
Contributor Author

@khsrali khsrali Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good suggestion @agoscinski . However that requires some change in aiida-core I opened an issue for future references : aiidateam/aiida-core#6564

I think for now, we can accept the way it is.


# Outer loop: this is used to continue when a new pack file needs to be created
while loose_objects:
# Store the last pack integer ID, needed to know later if I need to open a new pack
Expand Down Expand Up @@ -1469,6 +1482,11 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man
# Appending for later bulk commit - see comments in add_streamed_objects_to_pack
obj_dicts.append(obj_dict)

if callback:
callback(
action="update",
value=obj_dict["size"],
)
# It's now time to write to the DB, in a single bulk operation (per pack)
if obj_dicts:
# Here I shouldn't need to do `OR IGNORE` as in `add_streamed_objects_to_pack`
Expand Down Expand Up @@ -1504,6 +1522,8 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man
# HOWEVER, while this would work fine on Linux, there are concurrency issues both
# on Mac and on Windows (see issues #37 and #43). Therefore, I do NOT delete them,
# and deletion is deferred to a manual clean-up operation.
if callback:
callback(action="close", value=None)

def add_streamed_object_to_pack( # pylint: disable=too-many-arguments
self,
Expand Down Expand Up @@ -2578,7 +2598,11 @@ def delete_objects(self, hashkeys: List[str]) -> List[Union[str, Any]]:
# was deleted) should be considered as if the object has *not* been deleted
return list(deleted_loose.union(deleted_packed))

def repack(self, compress_mode: CompressMode = CompressMode.KEEP) -> None:
def repack(
self,
compress_mode: CompressMode = CompressMode.KEEP,
callback: Optional[Callable] = None,
) -> None:
"""Perform a repack of all packed objects.
At the end, it also VACUUMs the DB to reclaim unused space and make
Expand All @@ -2589,11 +2613,14 @@ def repack(self, compress_mode: CompressMode = CompressMode.KEEP) -> None:
:param compress_mode: see docstring of ``repack_pack``.
"""
for pack_id in self._list_packs():
self.repack_pack(pack_id, compress_mode=compress_mode)
self.repack_pack(pack_id, compress_mode=compress_mode, callback=callback)
self._vacuum()

def repack_pack( # pylint: disable=too-many-branches,too-many-statements
self, pack_id: str, compress_mode: CompressMode = CompressMode.KEEP
def repack_pack( # pylint: disable=too-many-branches,too-many-statements,too-many-locals
self,
pack_id: str,
compress_mode: CompressMode = CompressMode.KEEP,
callback: Optional[Callable] = None,
khsrali marked this conversation as resolved.
Show resolved Hide resolved
) -> None:
"""Perform a repack of a given pack object.
Expand Down Expand Up @@ -2628,6 +2655,14 @@ def repack_pack( # pylint: disable=too-many-branches,too-many-statements
obj_dicts = []
# At least one object. Let's repack. We have checked before that the
# REPACK_PACK_ID did not exist.
if callback:
callback(
action="init",
value={
"total": self.get_total_size()["total_size_packed"],
"description": f"Repack {pack_id}",
},
)
with self.lock_pack(
str(self._REPACK_PACK_ID), allow_repack_pack=True
) as write_pack_handle:
Expand Down Expand Up @@ -2725,13 +2760,17 @@ def repack_pack( # pylint: disable=too-many-branches,too-many-statements
# Appending for later bulk commit
# I will assume that all objects of a single pack fit in memory
obj_dicts.append(obj_dict)
if callback:
callback(action="update", value=obj_dict["size"])
# safe flush to disk seems to be a time consuming operation, but no easy way to include in the progress bar
safe_flush_to_disk(
write_pack_handle,
self._get_pack_path_from_pack_id(
self._REPACK_PACK_ID, allow_repack_pack=True
),
)

if callback:
callback(action="close", value=None)
# We are done with data transfer.
# At this stage we just have a new pack -1 (_REPACK_PACK_ID) but it is never referenced.
# Let us store the information in the DB.
Expand Down
Loading