Skip to content

Commit aa502bd

Browse files
committed
[Storage] Garbage collector CLI
Added a new command to the CCN operator CLI to run a garbage collector on local storage. The new `gc run` command lists all the files that are not linked to any message or permanent pin and deletes them. With the --verbose option, the command prints more details on the files it will preserve and delete. The --dry-run option allows running the GC without actually deleting any file.
1 parent 50744dd commit aa502bd

File tree

4 files changed

+171
-3
lines changed

4 files changed

+171
-3
lines changed

setup.cfg

+1
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ testing =
9595
pytest-aiohttp
9696
pytest-asyncio
9797
pytest-mock
98+
types-pytz
9899
types-pyyaml
99100
types-requests
100101
types-setuptools
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
"""
2+
This command checks all the files stored in local storage (=GridFS) and compares them to the list
3+
of messages and permanent pins on the node. The files that are not linked to any of them are
4+
deleted, unless the --dry-run option is set.
5+
"""
6+
import asyncio
7+
import datetime as dt
8+
from typing import Any, Dict, FrozenSet, List
9+
from typing import cast
10+
11+
import pytz
12+
import typer
13+
from aleph_message.models import MessageType
14+
from configmanager import Config
15+
16+
import aleph.model
17+
from aleph.ccn_cli.cli_config import CliConfig
18+
from aleph.config import get_defaults
19+
from aleph.model import init_db_globals
20+
from aleph.model.filepin import PermanentPin
21+
from aleph.model.hashes import delete_value as delete_gridfs_file
22+
from aleph.model.messages import Message
23+
24+
gc_ns = typer.Typer()
25+
26+
27+
# Get all the messages that potentially store data in local storage:
28+
# * AGGREGATEs with item_type=="storage"
29+
# * POSTs with item_type=="storage"
30+
# * STOREs with content.item_type=="storage"
31+
async def get_hashes(
    msg_type: MessageType, item_type_field: str, item_hash_field: str
) -> FrozenSet[str]:
    """
    Collect the item hashes of the messages of a given type whose item type is "storage".

    :param msg_type: Type of the messages to look up.
    :param item_type_field: Name of the field holding the item type, used in the DB query.
    :param item_hash_field: Dot-separated path of the field holding the item hash.
    :return: The set of item hashes found in the matching messages.
    """
    hash_path = item_hash_field.split(".")

    def extract_hash(document: Any) -> Any:
        # Walk down the nested documents, one path component at a time.
        value = document
        for key in hash_path:
            value = value[key]
        return value

    cursor = Message.collection.find(
        {"type": msg_type, item_type_field: "storage"},
        {item_hash_field: 1},
        batch_size=1000,
    )

    hashes = set()
    async for msg in cursor:
        hashes.add(extract_hash(msg))

    return frozenset(hashes)
50+
51+
52+
def print_files_to_preserve(files_to_preserve: Dict[str, FrozenSet[str]]) -> None:
    """
    Print a summary of the files that will be preserved, one line per category.

    :param files_to_preserve: Files to preserve, grouped by category name.
    """
    summary_lines = [
        f"* {len(files)} {file_type}"
        for file_type, files in files_to_preserve.items()
    ]

    typer.echo("The following files will be preserved:")
    for summary_line in summary_lines:
        typer.echo(summary_line)
56+
57+
58+
async def list_files_to_preserve(
    gridfs_files_dict: Dict[str, Dict],
    temporary_files_ttl: int,
) -> Dict[str, FrozenSet[str]]:
    """
    List the files of local storage that must not be garbage collected.

    :param gridfs_files_dict: GridFS file documents, indexed by filename. Each document
                              must provide the "filename" and "uploadDate" fields.
    :param temporary_files_ttl: Duration, in seconds, during which a freshly uploaded
                                file is preserved.
    :return: The files to preserve, grouped by the reason for preserving them.
    """
    files_to_preserve_dict: Dict[str, FrozenSet[str]] = {}

    # Preserve any file that was uploaded less than `temporary_files_ttl` seconds ago.
    # The cutoff is computed once, rather than once per file.
    # NOTE(review): the comparison below requires timezone-aware upload dates --
    # confirm that the DB client is configured to return them.
    upload_cutoff = dt.datetime.now(tz=dt.timezone.utc) - dt.timedelta(
        seconds=temporary_files_ttl
    )
    files_to_preserve_dict["temporary files"] = frozenset(
        file["filename"]
        for file in gridfs_files_dict.values()
        if file["uploadDate"] > upload_cutoff
    )

    # Get all the messages that potentially store data in local storage:
    # * AGGREGATEs with item_type=="storage"
    # * POSTs with item_type=="storage"
    # * STOREs with content.item_type=="storage"
    files_to_preserve_dict["aggregates"] = await get_hashes(
        MessageType.aggregate, "item_type", "item_hash"
    )
    files_to_preserve_dict["posts"] = await get_hashes(
        MessageType.post, "item_type", "item_hash"
    )
    files_to_preserve_dict["stores"] = await get_hashes(
        MessageType.store, "content.item_type", "content.item_hash"
    )

    # We also keep permanent pins, even if they are also stored on IPFS
    files_to_preserve_dict["file pins"] = frozenset(
        [
            pin["multihash"]
            async for pin in PermanentPin.collection.find({}, {"multihash": 1})
        ]
    )

    return files_to_preserve_dict
98+
99+
100+
async def run(ctx: typer.Context, dry_run: bool):
    """
    Run the garbage collector on local storage (GridFS).

    Lists the files present in GridFS, determines which ones must be preserved and
    deletes all the others, unless `dry_run` is set.

    :param ctx: Typer context. `ctx.obj` is expected to hold the CLI configuration.
    :param dry_run: If set, report the files to delete without deleting them.
    :raises ValueError: If the DB global is still unset after initialization.
    """
    config = Config(schema=get_defaults())
    cli_config = cast(CliConfig, ctx.obj)
    config.yaml.load(str(cli_config.config_file_path))

    init_db_globals(config=config)
    if aleph.model.db is None:  # for mypy
        raise ValueError("DB not initialized as expected.")

    # Get a set of all the files currently in GridFS
    gridfs_files_dict = {
        file["filename"]: file
        async for file in aleph.model.db["fs.files"].find(
            projection={"_id": 0, "filename": 1, "length": 1, "uploadDate": 1},
            batch_size=1000,
        )
    }
    gridfs_files = frozenset(gridfs_files_dict.keys())

    typer.echo(f"Found {len(gridfs_files_dict)} files in local storage.")

    files_to_preserve_dict = await list_files_to_preserve(
        gridfs_files_dict=gridfs_files_dict,
        temporary_files_ttl=config.storage.temporary_files_ttl.value,
    )
    # Anything in GridFS that no category claims is garbage.
    files_to_preserve = frozenset().union(*files_to_preserve_dict.values())
    files_to_delete = gridfs_files - files_to_preserve

    if cli_config.verbose:
        print_files_to_preserve(files_to_preserve_dict)

    restored_memory = sum(
        gridfs_files_dict[filename]["length"] for filename in files_to_delete
    )
    # The original message omitted the noun ("N will be deleted").
    typer.echo(
        f"{len(files_to_delete)} files will be deleted, totaling {restored_memory} bytes."
    )

    if dry_run:
        # Only list the candidates for deletion; nothing is removed in this mode.
        if cli_config.verbose:
            typer.echo("The following files will be deleted:")
            for file_to_delete in files_to_delete:
                typer.echo(f"* {file_to_delete}")

    else:
        for file_to_delete in files_to_delete:
            typer.echo(f"Deleting {file_to_delete}...")
            await delete_gridfs_file(file_to_delete)

    typer.echo("Done.")
150+
151+
152+
@gc_ns.command(name="run")
def run_gc(
    ctx: typer.Context,
    dry_run: bool = typer.Option(
        False, help="If set, display files to delete without deleting them."
    ),
):
    # Synchronous CLI entry point: drives the async garbage collector to completion.
    # Documented with a comment rather than a docstring so the command's help output
    # stays unchanged.
    gc_coroutine = run(ctx, dry_run)
    asyncio.run(gc_coroutine)

src/aleph/ccn_cli/main.py

+3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import typer
55

66
from .cli_config import CliConfig
7+
from .commands.garbage_collector import gc_ns
78
from .commands.keys import keys_ns
89
from .commands.migrations import migrations_ns
910

@@ -17,6 +18,7 @@ def validate_config_file_path(config: Optional[Path]) -> Optional[Path]:
1718

1819
return config
1920

21+
2022
def validate_key_dir(key_dir: Optional[Path]) -> Optional[Path]:
2123
if key_dir is not None:
2224
if key_dir.exists and not key_dir.is_dir():
@@ -63,6 +65,7 @@ def main(
6365
ctx.obj = cli_config
6466

6567

68+
app.add_typer(gc_ns, name="gc", help="Invoke the garbage collector.")
6669
app.add_typer(keys_ns, name="keys", help="Operations on private keys.")
6770
app.add_typer(migrations_ns, name="migrations", help="Run DB migrations.")
6871

src/aleph/config.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ def get_defaults():
77
return {
88
"logging": {
99
"level": logging.WARNING,
10-
"max_log_file_size": 1_000_000_000 # 1GB,
10+
"max_log_file_size": 1_000_000_000, # 1GB,
1111
},
1212
"aleph": {
1313
"queue_topic": "ALEPH-QUEUE",
@@ -30,7 +30,12 @@ def get_defaults():
3030
"/ip4/62.210.93.220/tcp/4025/p2p/QmXdci5feFmA2pxTg8p3FCyWmSKnWYAAmr7Uys1YCTFD8U",
3131
],
3232
},
33-
"storage": {"folder": "./data/", "store_files": False, "engine": "mongodb"},
33+
"storage": {
34+
"folder": "./data/",
35+
"store_files": False,
36+
"engine": "mongodb",
37+
"temporary_files_ttl": 3600,
38+
},
3439
"nuls": {
3540
"chain_id": 8964,
3641
"enabled": False,
@@ -80,7 +85,7 @@ def get_defaults():
8085
"peers": [
8186
"/dnsaddr/api1.aleph.im/ipfs/12D3KooWNgogVS6o8fVsPdzh2FJpCdJJLVSgJT38XGE1BJoCerHx",
8287
"/ip4/51.159.57.71/tcp/4001/p2p/12D3KooWBH3JVSBwHLNzxv7EzniBP3tDmjJaoa3EJBF9wyhZtHt2",
83-
"/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF"
88+
"/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF",
8489
],
8590
},
8691
"sentry": {

0 commit comments

Comments
 (0)