Skip to content

Commit

Permalink
added progressbar for repack and pack_all_loose
Browse files Browse the repository at this point in the history
  • Loading branch information
khsrali committed May 21, 2024
1 parent e469ef4 commit 0f16565
Showing 1 changed file with 44 additions and 5 deletions.
49 changes: 44 additions & 5 deletions disk_objectstore/container.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
The main implementation of the ``Container`` class of the object store.
"""

# pylint: disable=too-many-lines
import dataclasses
import io
Expand Down Expand Up @@ -1319,6 +1320,7 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man
compress: Union[bool, CompressMode] = CompressMode.NO,
validate_objects: bool = True,
do_fsync: bool = True,
callback: Optional[Callable] = None,
) -> None:
"""Pack all loose objects.
Expand Down Expand Up @@ -1393,6 +1395,17 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man
# on Mac and on Windows (see issues #37 and #43). Therefore, I do NOT delete them,
# and deletion is deferred to a manual clean-up operation.

if callback:
callback(
action="init",
value={
"total": self.get_total_size()["total_size_loose"],
"description": "Packing loose objects",
},
)
# Ideally the progress would be displayed in human-readable units (MB, GB). In tqdm this is easy:
# just pass unit="B", unit_scale=True and unit_divisor=1024. But would the callback accept such extra **kwargs?

# Outer loop: this is used to continue when a new pack file needs to be created
while loose_objects:
# Store the last pack integer ID, needed to know later if I need to open a new pack
Expand Down Expand Up @@ -1469,6 +1482,11 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man
# Appending for later bulk commit - see comments in add_streamed_objects_to_pack
obj_dicts.append(obj_dict)

if callback:
callback(
action="update",
value=obj_dict["size"],
)
# It's now time to write to the DB, in a single bulk operation (per pack)
if obj_dicts:
# Here I shouldn't need to do `OR IGNORE` as in `add_streamed_objects_to_pack`
Expand Down Expand Up @@ -1504,6 +1522,8 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches,too-man
# HOWEVER, while this would work fine on Linux, there are concurrency issues both
# on Mac and on Windows (see issues #37 and #43). Therefore, I do NOT delete them,
# and deletion is deferred to a manual clean-up operation.
if callback:
callback(action="close", value=None)

def add_streamed_object_to_pack( # pylint: disable=too-many-arguments
self,
Expand Down Expand Up @@ -2578,7 +2598,11 @@ def delete_objects(self, hashkeys: List[str]) -> List[Union[str, Any]]:
# was deleted) should be considered as if the object has *not* been deleted
return list(deleted_loose.union(deleted_packed))

def repack(self, compress_mode: CompressMode = CompressMode.KEEP) -> None:
def repack(
self,
compress_mode: CompressMode = CompressMode.KEEP,
callback: Optional[Callable] = None,
) -> None:
"""Perform a repack of all packed objects.
At the end, it also VACUUMs the DB to reclaim unused space and make
Expand All @@ -2589,11 +2613,14 @@ def repack(self, compress_mode: CompressMode = CompressMode.KEEP) -> None:
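:param compress_mode: see docstring of ``repack_pack``.
:param callback: optional callable, forwarded to ``repack_pack`` for each pack, used to report progress.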
"""
for pack_id in self._list_packs():
self.repack_pack(pack_id, compress_mode=compress_mode)
self.repack_pack(pack_id, compress_mode=compress_mode, callback=callback)
self._vacuum()

def repack_pack( # pylint: disable=too-many-branches,too-many-statements
self, pack_id: str, compress_mode: CompressMode = CompressMode.KEEP
def repack_pack( # pylint: disable=too-many-branches,too-many-statements,too-many-locals
self,
pack_id: str,
compress_mode: CompressMode = CompressMode.KEEP,
callback: Optional[Callable] = None,
) -> None:
"""Perform a repack of a given pack object.
Expand Down Expand Up @@ -2628,6 +2655,14 @@ def repack_pack( # pylint: disable=too-many-branches,too-many-statements
obj_dicts = []
# At least one object. Let's repack. We have checked before that the
# REPACK_PACK_ID did not exist.
if callback:
callback(
action="init",
value={
"total": self.get_total_size()["total_size_packed"],
"description": f"Repack {pack_id}",
},
)
with self.lock_pack(
str(self._REPACK_PACK_ID), allow_repack_pack=True
) as write_pack_handle:
Expand Down Expand Up @@ -2725,13 +2760,17 @@ def repack_pack( # pylint: disable=too-many-branches,too-many-statements
# Appending for later bulk commit
# I will assume that all objects of a single pack fit in memory
obj_dicts.append(obj_dict)
if callback:
callback(action="update", value=obj_dict["size"])
# The safe flush to disk seems to be a time-consuming operation, but there is
# no easy way to account for it in the progress bar.
safe_flush_to_disk(
write_pack_handle,
self._get_pack_path_from_pack_id(
self._REPACK_PACK_ID, allow_repack_pack=True
),
)

if callback:
callback(action="close", value=None)
# We are done with data transfer.
# At this stage we just have a new pack -1 (_REPACK_PACK_ID) but it is never referenced.
# Let us store the information in the DB.
Expand Down

0 comments on commit 0f16565

Please sign in to comment.