
Commit 455394e

[Storage] Garbage collector
Added a new process in charge of deleting files from local storage. Files can now be marked for deletion by listing them in the scheduled_deletions collection. The garbage collector process periodically looks up this collection and deletes every file whose `delete_by` datetime field is in the past. Files are now automatically marked for deletion when a user posts them through the /storage/add_json and /storage/add_file endpoints; the deletion is cancelled if the user creates a message referencing this content within a given period of time (one hour by default). Also added a migration script that goes through all the files currently stored on a CCN and schedules every file not related to an Aleph message for deletion.
1 parent 1c379e3 commit 455394e
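For context, a minimal sketch of how an uploaded file could end up in the scheduled_deletions collection. This is not the actual endpoint code from this commit (the /storage/add_* handlers are part of the changeset but not shown on this page); the field names filename and delete_by and the use of asdict() match the migration script and garbage collector job below, while the helper name and the hard-coded one-hour lifetime are illustrative assumptions.

import datetime as dt
from dataclasses import asdict

from aleph.model.scheduled_deletions import ScheduledDeletion, ScheduledDeletionInfo


async def schedule_for_deletion(item_hash: str, lifetime_seconds: int = 3600) -> None:
    """Hypothetical helper: give a freshly uploaded file a limited lifetime."""
    delete_by = dt.datetime.utcnow() + dt.timedelta(seconds=lifetime_seconds)
    await ScheduledDeletion.collection.insert_one(
        asdict(ScheduledDeletionInfo(filename=item_hash, delete_by=delete_by))
    )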

16 files changed: +474 -31 lines


deployment/migrations/config_updater.py

+2 -2

@@ -159,6 +159,6 @@ async def main(args: argparse.Namespace):
 if __name__ == "__main__":
     try:
         asyncio.run(main(cli_parse()))
-    except Exception as e:
-        LOGGER.error("%s", str(e))
+    except Exception:
+        LOGGER.exception("Failed to run migration scripts")
         sys.exit(1)
@@ -0,0 +1,99 @@
"""
This migration checks all the files stored in local storage (=GridFS) and compares them to the list
of messages already on the node. The files that are not linked to any message are scheduled for
deletion.
"""

import asyncio
import datetime as dt
import logging
from dataclasses import asdict
from typing import Optional, FrozenSet, Any, List

from aleph_message.models import MessageType
from configmanager import Config

import aleph.model
from aleph.config import get_defaults
from aleph.model import init_db_globals
from aleph.model.messages import Message
from aleph.model.scheduled_deletions import ScheduledDeletion, ScheduledDeletionInfo

logger = logging.getLogger()


async def async_upgrade(config_file: Optional[str], **kwargs):
    config = Config(schema=get_defaults())
    if config_file is not None:
        config.yaml.load(config_file)

    init_db_globals(config=config)
    collections = await aleph.model.db.list_collection_names()
    if ScheduledDeletion.COLLECTION in collections:
        logging.info(
            "%s collection is already present. Skipping migration.",
            ScheduledDeletion.COLLECTION,
        )
        return

    # Get a set of all the files currently in GridFS
    gridfs_files = frozenset(
        [
            file["filename"]
            async for file in aleph.model.db["fs.files"].find(
                projection={"filename": 1}, batch_size=1000
            )
        ]
    )

    print(len(gridfs_files))

    # Get all the messages that potentially store data in local storage:
    # * AGGREGATEs with item_type=="storage"
    # * POSTs with item_type=="storage"
    # * STOREs with content.item_type=="storage"
    async def get_hashes(
        msg_type: MessageType, item_type_field: str, item_hash_field: str
    ) -> FrozenSet[str]:
        def rgetitem(dictionary: Any, fields: List[str]) -> Any:
            value = dictionary[fields[0]]
            if len(fields) > 1:
                return rgetitem(value, fields[1:])
            return value

        return frozenset(
            [
                rgetitem(msg, item_hash_field.split("."))
                async for msg in Message.collection.find(
                    {"type": msg_type, item_type_field: "storage"},
                    {item_hash_field: 1},
                    batch_size=1000,
                )
            ]
        )

    aggregates = await get_hashes(MessageType.aggregate, "item_type", "item_hash")
    posts = await get_hashes(MessageType.post, "item_type", "item_hash")
    stores = await get_hashes(
        MessageType.store, "content.item_type", "content.item_hash"
    )

    files_to_preserve = aggregates | posts | stores
    files_to_delete = gridfs_files - files_to_preserve
    delete_by = dt.datetime.utcnow()

    await ScheduledDeletion.collection.insert_many(
        [
            asdict(ScheduledDeletionInfo(filename=file_to_delete, delete_by=delete_by))
            for file_to_delete in files_to_delete
        ]
    )


def upgrade(config_file: str, **kwargs):
    asyncio.run(async_upgrade(config_file=config_file, **kwargs))


def downgrade(**kwargs):
    # Nothing to do, processing the chain data multiple times only adds some load on the node.
    pass
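The nested rgetitem helper above walks dotted field paths such as "content.item_hash" one key at a time, which is what lets the same query code handle both top-level and nested hash fields. A quick illustration on a hypothetical message document (with rgetitem lifted out of get_hashes purely for the sake of the example):

# Illustration only: resolving dotted paths against a hypothetical STORE message document.
msg = {"content": {"item_type": "storage", "item_hash": "abc123"}}
assert rgetitem(msg, "content.item_hash".split(".")) == "abc123"
assert rgetitem(msg, ["content", "item_type"]) == "storage"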

docs/architecture.rst

-6

This file was deleted.

docs/architecture/garbage_collector.rst

+33

@@ -0,0 +1,33 @@
*****************
Garbage collector
*****************

Core Channel Nodes dispose of unneeded files through a process called garbage collection.
Two kinds of garbage collection are in place, one for the local storage system and
one for the IPFS service.

Local storage
=============

CCNs have a dedicated process to dispose of files, the garbage collector.
This process monitors the files on the local storage and deletes them once
they are scheduled for deletion.

Files can be scheduled for deletion for a number of reasons:
- They were temporary files that ended up being unused by the user that pushed them
- The user decided to delete them
- The payment plan of the user no longer covers them.

In any of these situations, a date and time of deletion is assigned to the file.
The garbage collector runs periodically and simply deletes the files for which
this date and time has passed.

By default, the garbage collector runs once every hour. Temporary files uploaded
using the /storage/add_[json|file] endpoints are given a lifetime of one hour
before deletion.

IPFS
====

The IPFS daemon has its own garbage collector process. You can read more about it
in their `official documentation <https://docs.ipfs.io/concepts/persistence/#garbage-collection>`_.
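The one-hour figures above come from the new defaults in src/aleph/config.py (storage.garbage_collector.period and storage.delete_interval, both 3600 seconds). A minimal sketch of how these values surface through configmanager, following the loading pattern used by the migration script and the attribute access used by the garbage collector job; treating delete_interval as the lifetime of temporary uploads, and the idea that an operator's YAML file overrides these defaults, are assumptions:

from configmanager import Config

from aleph.config import get_defaults

config = Config(schema=get_defaults())
# config.yaml.load("/path/to/node-config.yml")  # an operator's file would override the defaults

gc_period = config.storage.garbage_collector.period.value  # seconds between two GC runs (default: 3600)
upload_ttl = config.storage.delete_interval.value          # assumed lifetime of temporary uploads (default: 3600)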

docs/architecture/index.rst

+11

@@ -0,0 +1,11 @@
============
Architecture
============

.. image:: ../figures/architecture-stack.*
    :width: 100%

.. toctree::
    :maxdepth: 2

    garbage_collector

docs/index.rst

+1 -1

@@ -16,7 +16,7 @@ Contents
 .. toctree::
     :maxdepth: 2

-    architecture
+    architecture/index
     guides/index
     node-synchronisation
     protocol/index

src/aleph/config.py

+8 -2

@@ -30,7 +30,13 @@ def get_defaults():
                "/ip4/62.210.93.220/tcp/4025/p2p/QmXdci5feFmA2pxTg8p3FCyWmSKnWYAAmr7Uys1YCTFD8U",
            ],
        },
-        "storage": {"folder": "./data/", "store_files": False, "engine": "mongodb"},
+        "storage": {
+            "folder": "./data/",
+            "store_files": False,
+            "engine": "mongodb",
+            "delete_interval": 3600,
+            "garbage_collector": {"period": 3600},
+        },
        "nuls": {
            "chain_id": 8964,
            "enabled": False,
@@ -80,7 +86,7 @@ def get_defaults():
            "peers": [
                "/dnsaddr/api1.aleph.im/ipfs/12D3KooWNgogVS6o8fVsPdzh2FJpCdJJLVSgJT38XGE1BJoCerHx",
                "/ip4/51.159.57.71/tcp/4001/p2p/12D3KooWBH3JVSBwHLNzxv7EzniBP3tDmjJaoa3EJBF9wyhZtHt2",
-                "/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF"
+                "/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF",
            ],
        },
        "sentry": {

src/aleph/handlers/storage.py

+11 -3

@@ -14,13 +14,15 @@

 import aioipfs
 from aioipfs import InvalidCIDError
+from aleph_message.models import StoreMessage
+from pydantic import ValidationError
+
 from aleph.config import get_config
 from aleph.exceptions import AlephStorageException, UnknownHashError
+from aleph.model.scheduled_deletions import ScheduledDeletion
 from aleph.services.ipfs.common import get_ipfs_api
-from aleph.storage import get_hash_content
+from aleph.storage import get_hash_content, ContentSource
 from aleph.types import ItemType
-from aleph_message.models import StoreMessage
-from pydantic import ValidationError

 LOGGER = logging.getLogger("HANDLERS.STORAGE")

@@ -111,6 +113,12 @@ async def handle_new_storage(message: Dict, content: Dict):
     except AlephStorageException:
         return None

+    # If the file was found locally, it might be marked for deletion.
+    # Ensure that we keep the content in the DB by removing the scheduled
+    # deletion entry.
+    if file_content.source == ContentSource.DB:
+        ScheduledDeletion.collection.delete_one({"filename": item_hash})
+
     size = len(file_content)

     content["size"] = size
src/aleph/jobs/__init__.py

+14 -5

@@ -2,7 +2,11 @@
 from multiprocessing import Process
 from typing import Dict, List, Coroutine

-from aleph.jobs.process_pending_messages import pending_messages_subprocess, retry_messages_task
+from aleph.jobs.garbage_collector import garbage_collector_subprocess
+from aleph.jobs.process_pending_messages import (
+    pending_messages_subprocess,
+    retry_messages_task,
+)
 from aleph.jobs.process_pending_txs import pending_txs_subprocess, handle_txs_task
 from aleph.jobs.reconnect_ipfs import reconnect_ipfs_job

@@ -20,20 +24,25 @@ def start_jobs(

     if use_processes:
         config_values = config.dump_values()
-        p1 = Process(
+        pending_messages_job = Process(
             target=pending_messages_subprocess,
             args=(
                 config_values,
                 shared_stats,
                 api_servers,
             ),
         )
-        p2 = Process(
+        pending_txs_job = Process(
             target=pending_txs_subprocess,
             args=(config_values, api_servers),
         )
-        p1.start()
-        p2.start()
+
+        garbage_collector_job = Process(
+            target=garbage_collector_subprocess, args=(config_values,)
+        )
+        pending_messages_job.start()
+        pending_txs_job.start()
+        garbage_collector_job.start()
     else:
         tasks.append(retry_messages_task(shared_stats=shared_stats))
         tasks.append(handle_txs_task())
src/aleph/jobs/garbage_collector.py

+64

@@ -0,0 +1,64 @@
"""
Job in charge of deleting files from IPFS and local storage when they are scheduled for deletion.
"""

import asyncio
import datetime as dt
import logging
from typing import Dict

import sentry_sdk
from setproctitle import setproctitle

from aleph.logging import setup_logging
from aleph.model.hashes import delete_value as delete_gridfs_file
from aleph.model.scheduled_deletions import ScheduledDeletion, ScheduledDeletionInfo
from .job_utils import prepare_loop

LOGGER = logging.getLogger("jobs.garbage_collector")


async def delete_file(file_to_delete: ScheduledDeletionInfo) -> None:
    await delete_gridfs_file(key=file_to_delete.filename)
    LOGGER.info("Deleted '%s' from local storage", file_to_delete.filename)


async def garbage_collector_task(job_period: int):
    while True:
        try:
            async for file_to_delete in ScheduledDeletion.files_to_delete(
                delete_by=dt.datetime.utcnow()
            ):
                try:
                    await delete_file(file_to_delete)
                finally:
                    ScheduledDeletion.collection.delete_one(
                        {"_id": file_to_delete.object_id}
                    )

        except Exception:
            LOGGER.exception("Error in garbage collector job")
            # Sleep to avoid overloading the logs in case of a repeating error
            await asyncio.sleep(5)

        await asyncio.sleep(job_period)


def garbage_collector_subprocess(config_values: Dict):
    setproctitle("aleph.jobs.garbage_collector")
    loop, config = prepare_loop(config_values)

    sentry_sdk.init(
        dsn=config.sentry.dsn.value,
        traces_sample_rate=config.sentry.traces_sample_rate.value,
        ignore_errors=[KeyboardInterrupt],
    )
    setup_logging(
        loglevel=config.logging.level.value,
        filename="/tmp/aleph_ccn_garbage_collector.log",
        max_log_file_size=config.logging.max_log_file_size.value,
    )

    loop.run_until_complete(
        garbage_collector_task(job_period=config.storage.garbage_collector.period.value)
    )
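src/aleph/model/scheduled_deletions.py is part of this changeset but not shown on this page. A rough sketch of what it might contain, inferred only from how ScheduledDeletion and ScheduledDeletionInfo are used in the migration, the storage handler, and the job above; the actual implementation (base class, index definitions) may differ:

import datetime as dt
from dataclasses import dataclass
from typing import AsyncIterator, Optional

from bson import ObjectId


@dataclass
class ScheduledDeletionInfo:
    filename: str
    delete_by: dt.datetime
    object_id: Optional[ObjectId] = None


class ScheduledDeletion:
    # In the real module this class presumably plugs into aleph.model's common
    # base class, which is what provides the `collection` attribute used here.
    COLLECTION = "scheduled_deletions"
    collection = None  # set to a Motor collection when the DB globals are initialised

    @classmethod
    def ensure_indexes(cls, sync_db) -> None:
        # Assumption: an index on delete_by keeps the periodic lookup cheap.
        sync_db[cls.COLLECTION].create_index("delete_by")

    @classmethod
    async def files_to_delete(
        cls, delete_by: dt.datetime
    ) -> AsyncIterator[ScheduledDeletionInfo]:
        # Yield every entry whose deletion deadline has passed.
        async for doc in cls.collection.find({"delete_by": {"$lte": delete_by}}):
            yield ScheduledDeletionInfo(
                filename=doc["filename"],
                delete_by=doc["delete_by"],
                object_id=doc["_id"],
            )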

src/aleph/model/__init__.py

+2 -3

@@ -1,9 +1,9 @@
-import asyncio
 from logging import getLogger

 from configmanager import Config

 from aleph.model.filepin import PermanentPin
+from aleph.model.scheduled_deletions import ScheduledDeletion

 try:
     from pymongo import MongoClient

@@ -56,8 +56,7 @@ def init_db(config: Config, ensure_indexes: bool = True):
     Peer.ensure_indexes(sync_db)

     PermanentPin.ensure_indexes(sync_db)
-    # from aleph.model.hashes import Hash
-    # Hash.ensure_indexes(sync_db)
+    ScheduledDeletion.ensure_indexes(sync_db)

     from aleph.model.messages import Message
