Support uploading stash/filter from multiple block types (#22884)
* Support uploading stash/filter from multiple block types

* Apply suggestions from code review

Co-authored-by: William Durand <[email protected]>

* Fix ordering of complete_session: previously we set the config keys before completing the session, so if any error was raised we would have updated the config keys without updating the files in remote settings, which could be a critical bug

* Better, more explicit tests and then make them all pass

* Apply suggestions from code review

Co-authored-by: William Durand <[email protected]>

* Apply suggestions from code review

Co-authored-by: William Durand <[email protected]>

---------

Co-authored-by: William Durand <[email protected]>
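
The complete_session ordering fix described above boils down to committing the Remote Settings session before persisting any timestamps, so that a failure while publishing leaves the config keys untouched and AMO and Remote Settings cannot drift apart. A condensed outline of the corrected order, mirroring the upload_filter changes in tasks.py further down:

    # 1. Publish/delete filter attachments and stash records.
    # 2. Commit the session for review + signing.
    server.complete_session()
    # 3. Only now record the new generation time and base filter ids.
    set_config(MLBF_TIME_CONFIG_KEY, generation_time, json_value=True)
    for block_type in filters_to_upload:
        set_config(MLBF_BASE_ID_CONFIG_KEY(block_type), generation_time, json_value=True)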
2 people authored and chrstinalin committed Jan 10, 2025
1 parent 0f7f7db commit 293f80c
Showing 9 changed files with 529 additions and 85 deletions.
10 changes: 8 additions & 2 deletions src/olympia/blocklist/cron.py
@@ -29,7 +29,9 @@ def get_last_generation_time():


def get_base_generation_time():
return get_config(MLBF_BASE_ID_CONFIG_KEY, None, json_value=True)
return get_config(
MLBF_BASE_ID_CONFIG_KEY(BlockType.BLOCKED, compat=True), None, json_value=True
)


def get_blocklist_last_modified_time():
@@ -130,7 +132,11 @@ def _upload_mlbf_to_remote_settings(*, force_base=False):
else:
mlbf.generate_and_write_stash(previous_filter)

upload_filter.delay(generation_time, is_base=make_base_filter)
upload_filter.delay(
generation_time,
filter_list=[BlockType.BLOCKED.name] if make_base_filter else [],
create_stash=not make_base_filter,
)

if base_filter:
cleanup_old_files.delay(base_filter_id=base_filter.created_at)
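The cron change above calls MLBF_BASE_ID_CONFIG_KEY with a BlockType and a compat flag, but the constant's new definition lives in a file not shown in this view. A minimal sketch of what such a helper could look like, assuming a legacy singular key for hard blocks and a per-block-type suffix otherwise (both key names are assumptions, not taken from this commit):

    from olympia.blocklist.models import BlockType

    def MLBF_BASE_ID_CONFIG_KEY(block_type: BlockType, compat: bool = False) -> str:
        # With compat=True, hard blocks keep the legacy singular key so existing
        # config rows still resolve; every other block type gets its own key.
        # Key names here are illustrative assumptions.
        if compat and block_type == BlockType.BLOCKED:
            return 'blocklist_mlbf_base_id'
        return f'blocklist_mlbf_base_id_{block_type.name.lower()}'

This matches how tasks.py below reads the key with compat=True and, for hard blocks, writes both the legacy and the new per-type keys.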
7 changes: 3 additions & 4 deletions src/olympia/blocklist/mlbf.py
@@ -207,15 +207,14 @@ def hash_filter_inputs(cls, input_list):
for (guid, version) in input_list
]

@property
def filter_path(self):
def filter_path(self, _block_type: BlockType = BlockType.BLOCKED):
return self.storage.path('filter')

@property
def stash_path(self):
return self.storage.path('stash.json')

def generate_and_write_filter(self):
def generate_and_write_filter(self, block_type: BlockType = BlockType.BLOCKED):
stats = {}

bloomfilter = generate_mlbf(
@@ -225,7 +224,7 @@ def generate_and_write_filter(self):
)

# write bloomfilter
mlbf_path = self.filter_path
mlbf_path = self.filter_path(block_type)
with self.storage.open(mlbf_path, 'wb') as filter_file:
log.info(f'Writing to file {mlbf_path}')
bloomfilter.tofile(filter_file)
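filter_path changes from a property to a method so call sites can pass a block type, which is why the tests below now call mlbf.filter_path(). In this commit the argument is accepted but ignored (hence the leading underscore), so every block type still resolves to the same attachment file; a short illustration, assuming an MLBF was previously written for the given timestamp:

    from olympia.blocklist.mlbf import MLBF
    from olympia.blocklist.models import BlockType

    mlbf = MLBF.load_from_storage(1)          # 1 = generation time used in test_commands.py
    mlbf.filter_path()                        # path to the 'filter' file
    mlbf.filter_path(BlockType.SOFT_BLOCKED)  # same path for now; the argument is unused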
118 changes: 104 additions & 14 deletions src/olympia/blocklist/tasks.py
@@ -2,13 +2,15 @@
import os
import re
from datetime import datetime, timedelta
from typing import List

from django.conf import settings
from django.contrib.admin.models import CHANGE, LogEntry
from django.contrib.admin.options import get_content_type_for_model
from django.db import transaction
from django.utils.encoding import force_str

import waffle
from django_statsd.clients import statsd

import olympia.core.logger
@@ -21,10 +23,10 @@
REMOTE_SETTINGS_COLLECTION_MLBF,
)
from olympia.lib.remote_settings import RemoteSettings
from olympia.zadmin.models import set_config
from olympia.zadmin.models import get_config, set_config

from .mlbf import MLBF
from .models import BlocklistSubmission
from .models import BlocklistSubmission, BlockType
from .utils import (
datetime_to_ts,
)
@@ -35,7 +37,15 @@
bracket_open_regex = re.compile(r'(?<!\\){')
bracket_close_regex = re.compile(r'(?<!\\)}')

BLOCKLIST_RECORD_MLBF_BASE = 'bloomfilter-base'

def BLOCKLIST_RECORD_MLBF_BASE(block_type: BlockType):
match block_type:
case BlockType.SOFT_BLOCKED:
return 'softblocks-bloomfilter-base'
case BlockType.BLOCKED:
return 'bloomfilter-base'
case _:
raise ValueError(f'Unknown block type: {block_type}')
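
BLOCKLIST_RECORD_MLBF_BASE turns from a constant into a function so the Remote Settings attachment type can vary per block type; the mapping is exactly what the match statement above returns:

    BLOCKLIST_RECORD_MLBF_BASE(BlockType.BLOCKED)       # 'bloomfilter-base' (legacy value, unchanged)
    BLOCKLIST_RECORD_MLBF_BASE(BlockType.SOFT_BLOCKED)  # 'softblocks-bloomfilter-base'
    BLOCKLIST_RECORD_MLBF_BASE(None)                    # raises ValueError('Unknown block type: None')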


@task
@@ -88,28 +98,64 @@ def monitor_remote_settings():


@task
def upload_filter(generation_time, is_base=True):
def upload_filter(generation_time, filter_list=None, create_stash=False):
filters_to_upload: List[BlockType] = []
base_filter_ids = dict()
bucket = settings.REMOTE_SETTINGS_WRITER_BUCKET
server = RemoteSettings(
bucket, REMOTE_SETTINGS_COLLECTION_MLBF, sign_off_needed=False
)
mlbf = MLBF.load_from_storage(generation_time, error_on_missing=True)
if is_base:
# clear the collection for the base - we want to be the only filter
server.delete_all_records()
statsd.incr('blocklist.tasks.upload_filter.reset_collection')
# Then the bloomfilter
# Download old records before uploading new ones.
# This ensures we do not delete any records we just uploaded.
old_records = server.records()
attachment_types_to_delete = []

for block_type in BlockType:
# Skip soft blocked filters if the switch is not active.
if block_type == BlockType.SOFT_BLOCKED and not waffle.switch_is_active(
'enable-soft-blocking'
):
continue

# Only upload filters that are in the filter_list arg.
# We cannot send enum values to tasks so we serialize
# them in the filter_list arg as the name of the enum.
if filter_list and block_type.name in filter_list:
filters_to_upload.append(block_type)

base_filter_id = get_config(
MLBF_BASE_ID_CONFIG_KEY(block_type, compat=True),
json_value=True,
)

# If there is an existing base filter id, we need to keep track of it
# so we can potentially delete stashes older than this timestamp.
if base_filter_id is not None:
base_filter_ids[block_type] = base_filter_id

for block_type in filters_to_upload:
attachment_type = BLOCKLIST_RECORD_MLBF_BASE(block_type)
data = {
'key_format': MLBF.KEY_FORMAT,
'generation_time': generation_time,
'attachment_type': BLOCKLIST_RECORD_MLBF_BASE,
'attachment_type': attachment_type,
}
with mlbf.storage.open(mlbf.filter_path, 'rb') as filter_file:
with mlbf.storage.open(mlbf.filter_path(block_type), 'rb') as filter_file:
attachment = ('filter.bin', filter_file, 'application/octet-stream')
server.publish_attachment(data, attachment)
statsd.incr('blocklist.tasks.upload_filter.upload_mlbf')
# After we have successfully uploaded the new filter
# we can safely delete others of that type.
attachment_types_to_delete.append(attachment_type)

statsd.incr('blocklist.tasks.upload_filter.upload_mlbf.base')
else:
# Update the base filter id for this block type to the generation time
# so we can delete stashes older than this new filter.
base_filter_ids[block_type] = generation_time

# It is possible to upload a stash and a filter in the same task.
if create_stash:
with mlbf.storage.open(mlbf.stash_path, 'r') as stash_file:
stash_data = json.load(stash_file)
# If we have a stash, write that
@@ -121,10 +167,54 @@ def upload_filter(generation_time, is_base=True):
server.publish_record(stash_upload_data)
statsd.incr('blocklist.tasks.upload_filter.upload_stash')

# Get the oldest base filter id so we can delete only stashes
# that are definitely not needed anymore.
oldest_base_filter_id = min(base_filter_ids.values()) if base_filter_ids else None

for record in old_records:
# Delete attachment records that match the
# attachment types of filters we just uploaded.
# This ensures we only have one filter attachment
# per block_type.
if 'attachment' in record:
attachment_type = record['attachment_type']
if attachment_type in attachment_types_to_delete:
server.delete_record(record['id'])

# Delete stash records that are older than the oldest
# base filter attachment. Such stashes cannot apply to
# any filter that remains after this upload.
elif 'stash' in record and oldest_base_filter_id is not None:
record_time = record['stash_time']
if record_time < oldest_base_filter_id:
server.delete_record(record['id'])

# Commit the changes to remote settings for review + signing.
# Only after any changes to records (attachments and stashes)
# and including deletions can we commit the session and update
# the config with the new timestamps.
server.complete_session()
set_config(MLBF_TIME_CONFIG_KEY, generation_time, json_value=True)
if is_base:
set_config(MLBF_BASE_ID_CONFIG_KEY, generation_time, json_value=True)

# Update the base_filter_id for uploaded filters.
for block_type in filters_to_upload:
# We currently write to the old singular config key for hard blocks
# to preserve backward compatibility.
# In https://github.com/mozilla/addons/issues/15193
# we can remove this and start writing to the new plural key.
if block_type == BlockType.BLOCKED:
set_config(
MLBF_BASE_ID_CONFIG_KEY(block_type, compat=True),
generation_time,
json_value=True,
)

set_config(
MLBF_BASE_ID_CONFIG_KEY(block_type), generation_time, json_value=True
)

cleanup_old_files.delay(base_filter_id=oldest_base_filter_id)
statsd.incr('blocklist.tasks.upload_filter.reset_collection')


@task
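With the new signature, upload_filter receives the block types to upload as serialized enum names (enum members cannot be passed to tasks directly) plus a create_stash flag, and the two are no longer mutually exclusive. The cron job above only passes one or the other today; the call below is a hypothetical invocation showing the full shape of the arguments, not something this commit schedules:

    from olympia.blocklist.models import BlockType
    from olympia.blocklist.tasks import upload_filter

    generation_time = 1  # must match an MLBF already written to storage (illustrative)
    upload_filter.delay(
        generation_time,
        # Serialized enum names; SOFT_BLOCKED is skipped by the task unless the
        # 'enable-soft-blocking' waffle switch is active.
        filter_list=[BlockType.BLOCKED.name, BlockType.SOFT_BLOCKED.name],
        create_stash=True,  # also publish the stash written for this generation
    )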
2 changes: 1 addition & 1 deletion src/olympia/blocklist/tests/test_commands.py
@@ -38,4 +38,4 @@ def test_command(self):

call_command('export_blocklist', '1')
mlbf = MLBF.load_from_storage(1)
assert mlbf.storage.exists(mlbf.filter_path)
assert mlbf.storage.exists(mlbf.filter_path())
32 changes: 19 additions & 13 deletions src/olympia/blocklist/tests/test_cron.py
@@ -100,7 +100,7 @@ def test_skip_update_unless_force_base(self):

# Check that a filter was created on the second attempt
mlbf = MLBF.load_from_storage(self.current_time)
assert mlbf.storage.exists(mlbf.filter_path)
assert mlbf.storage.exists(mlbf.filter_path())
assert not mlbf.storage.exists(mlbf.stash_path)

def test_skip_update_unless_no_base_mlbf(self):
@@ -220,11 +220,12 @@ def test_upload_stash_unless_force_base(self):
].call_args_list == [
mock.call(
self.current_time,
is_base=force_base,
filter_list=[],
create_stash=True,
)
]
mlbf = MLBF.load_from_storage(self.current_time)
assert mlbf.storage.exists(mlbf.filter_path) == force_base
assert mlbf.storage.exists(mlbf.filter_path()) == force_base
assert mlbf.storage.exists(mlbf.stash_path) != force_base

def test_upload_stash_unless_missing_base_filter(self):
@@ -238,11 +239,12 @@ def test_upload_stash_unless_enough_changes(self):
].call_args_list == [
mock.call(
self.current_time,
is_base=False,
filter_list=[],
create_stash=True,
)
]
mlbf = MLBF.load_from_storage(self.current_time)
assert not mlbf.storage.exists(mlbf.filter_path)
assert not mlbf.storage.exists(mlbf.filter_path())
assert mlbf.storage.exists(mlbf.stash_path)

self.mocks[
@@ -252,11 +254,12 @@ def test_upload_stash_unless_enough_changes(self):
assert (
mock.call(
self.current_time,
is_base=True,
filter_list=[BlockType.BLOCKED.name],
create_stash=False,
)
in self.mocks['olympia.blocklist.cron.upload_filter.delay'].call_args_list
)
assert mlbf.storage.exists(mlbf.filter_path)
assert mlbf.storage.exists(mlbf.filter_path())

@mock.patch('olympia.blocklist.cron.BASE_REPLACE_THRESHOLD', 1)
def test_upload_stash_unless_enough_changes(self):
@@ -271,11 +274,12 @@ def test_upload_stash_unless_enough_changes(self):
].call_args_list == [
mock.call(
self.current_time,
is_base=False,
filter_list=[],
create_stash=True,
)
]
mlbf = MLBF.load_from_storage(self.current_time)
assert not mlbf.storage.exists(mlbf.filter_path)
assert not mlbf.storage.exists(mlbf.filter_path())
assert mlbf.storage.exists(mlbf.stash_path)

self._block_version(is_signed=True)
Expand All @@ -288,12 +292,13 @@ def test_upload_stash_unless_enough_changes(self):
assert (
mock.call(
self.current_time,
is_base=True,
filter_list=[BlockType.BLOCKED.name],
create_stash=False,
)
in self.mocks['olympia.blocklist.cron.upload_filter.delay'].call_args_list
)
new_mlbf = MLBF.load_from_storage(self.current_time)
assert new_mlbf.storage.exists(new_mlbf.filter_path)
assert new_mlbf.storage.exists(new_mlbf.filter_path())
assert not new_mlbf.storage.exists(new_mlbf.stash_path)

def test_cleanup_old_files(self):
@@ -374,7 +379,8 @@ def test_invalid_cache_results_in_diff(self):
assert (
mock.call(
self.current_time,
is_base=False,
filter_list=[],
create_stash=True,
)
in self.mocks['olympia.blocklist.cron.upload_filter.delay'].call_args_list
)
@@ -393,7 +399,7 @@ def test_get_last_generation_time(self):

def test_get_base_generation_time(self):
assert get_base_generation_time() is None
set_config(MLBF_BASE_ID_CONFIG_KEY, 1)
set_config(MLBF_BASE_ID_CONFIG_KEY(BlockType.BLOCKED, compat=True), 1)
assert get_base_generation_time() == 1


(diffs for the remaining changed files are not shown here)
