Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support uploading stash/filter from multiple block types #22884

Merged
merged 6 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/olympia/blocklist/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ def get_last_generation_time():


def get_base_generation_time():
    """Return the generation timestamp of the current base filter, or None.

    Reads the hard-block base-id config key (``compat=True`` keeps the legacy
    singular key name for backward compatibility); ``json_value=True``
    deserializes the stored value back into its original type.
    """
    # NOTE: the scraped diff flattened the removed and added lines together,
    # leaving two conflicting return statements; only the post-change call
    # (block-type-aware config key) is kept here.
    return get_config(
        MLBF_BASE_ID_CONFIG_KEY(BlockType.BLOCKED, compat=True), None, json_value=True
    )


def get_blocklist_last_modified_time():
Expand Down Expand Up @@ -130,7 +132,11 @@ def _upload_mlbf_to_remote_settings(*, force_base=False):
else:
mlbf.generate_and_write_stash(previous_filter)

upload_filter.delay(generation_time, is_base=make_base_filter)
upload_filter.delay(
generation_time,
filter_list=[BlockType.BLOCKED.name] if make_base_filter else [],
create_stash=not make_base_filter,
)

if base_filter:
cleanup_old_files.delay(base_filter_id=base_filter.created_at)
Expand Down
7 changes: 3 additions & 4 deletions src/olympia/blocklist/mlbf.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,14 @@ def hash_filter_inputs(cls, input_list):
for (guid, version) in input_list
]

@property
def filter_path(self):
def filter_path(self, _block_type: BlockType = BlockType.BLOCKED):
    """Return the storage path of the bloom-filter file.

    ``_block_type`` is accepted but not yet used (underscore-prefixed):
    the path is the same for every block type for now — presumably kept
    so callers can already pass a block type ahead of per-type filter
    files; confirm against follow-up work.
    """
    filter_filename = 'filter'
    return self.storage.path(filter_filename)

@property
def stash_path(self):
return self.storage.path('stash.json')

def generate_and_write_filter(self):
def generate_and_write_filter(self, block_type: BlockType = BlockType.BLOCKED):
stats = {}

bloomfilter = generate_mlbf(
Expand All @@ -225,7 +224,7 @@ def generate_and_write_filter(self):
)

# write bloomfilter
mlbf_path = self.filter_path
mlbf_path = self.filter_path(block_type)
with self.storage.open(mlbf_path, 'wb') as filter_file:
log.info(f'Writing to file {mlbf_path}')
bloomfilter.tofile(filter_file)
Expand Down
118 changes: 104 additions & 14 deletions src/olympia/blocklist/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
import os
import re
from datetime import datetime, timedelta
from typing import List

from django.conf import settings
from django.contrib.admin.models import CHANGE, LogEntry
from django.contrib.admin.options import get_content_type_for_model
from django.db import transaction
from django.utils.encoding import force_str

import waffle
from django_statsd.clients import statsd

import olympia.core.logger
Expand All @@ -21,10 +23,10 @@
REMOTE_SETTINGS_COLLECTION_MLBF,
)
from olympia.lib.remote_settings import RemoteSettings
from olympia.zadmin.models import set_config
from olympia.zadmin.models import get_config, set_config

from .mlbf import MLBF
from .models import BlocklistSubmission
from .models import BlocklistSubmission, BlockType
from .utils import (
datetime_to_ts,
)
Expand All @@ -35,7 +37,15 @@
bracket_open_regex = re.compile(r'(?<!\\){')
bracket_close_regex = re.compile(r'(?<!\\)}')

BLOCKLIST_RECORD_MLBF_BASE = 'bloomfilter-base'

def BLOCKLIST_RECORD_MLBF_BASE(block_type: BlockType):
    """Return the remote-settings attachment type for a block type's base filter.

    Raises ValueError for any block type without a known attachment type.
    """
    if block_type == BlockType.BLOCKED:
        return 'bloomfilter-base'
    if block_type == BlockType.SOFT_BLOCKED:
        return 'softblocks-bloomfilter-base'
    raise ValueError(f'Unknown block type: {block_type}')


@task
Expand Down Expand Up @@ -88,28 +98,64 @@ def monitor_remote_settings():


@task
def upload_filter(generation_time, is_base=True):
def upload_filter(generation_time, filter_list=None, create_stash=False):
filters_to_upload: List[BlockType] = []
base_filter_ids = dict()
bucket = settings.REMOTE_SETTINGS_WRITER_BUCKET
server = RemoteSettings(
bucket, REMOTE_SETTINGS_COLLECTION_MLBF, sign_off_needed=False
)
mlbf = MLBF.load_from_storage(generation_time, error_on_missing=True)
if is_base:
# clear the collection for the base - we want to be the only filter
server.delete_all_records()
statsd.incr('blocklist.tasks.upload_filter.reset_collection')
# Then the bloomfilter
# Download old records before uploading new ones.
# This ensures we do not delete any records we just uploaded.
old_records = server.records()
attachment_types_to_delete = []

for block_type in BlockType:
# Skip soft blocked filters if the switch is not active.
if block_type == BlockType.SOFT_BLOCKED and not waffle.switch_is_active(
'enable-soft-blocking'
):
continue

# Only upload filters that are in the filter_list arg.
# We cannot send enum values to tasks so we serialize
# them in the filter_list arg as the name of the enum.
if filter_list and block_type.name in filter_list:
filters_to_upload.append(block_type)

base_filter_id = get_config(
MLBF_BASE_ID_CONFIG_KEY(block_type, compat=True),
json_value=True,
)

# If there is an existing base filter id, we need to keep track of it
# so we can potentially delete stashes older than this timestamp.
if base_filter_id is not None:
base_filter_ids[block_type] = base_filter_id

for block_type in filters_to_upload:
attachment_type = BLOCKLIST_RECORD_MLBF_BASE(block_type)
data = {
'key_format': MLBF.KEY_FORMAT,
'generation_time': generation_time,
'attachment_type': BLOCKLIST_RECORD_MLBF_BASE,
'attachment_type': attachment_type,
}
with mlbf.storage.open(mlbf.filter_path, 'rb') as filter_file:
with mlbf.storage.open(mlbf.filter_path(block_type), 'rb') as filter_file:
attachment = ('filter.bin', filter_file, 'application/octet-stream')
server.publish_attachment(data, attachment)
statsd.incr('blocklist.tasks.upload_filter.upload_mlbf')
# After we have successfully uploaded the new filter
# we can safely delete others of that type.
attachment_types_to_delete.append(attachment_type)

statsd.incr('blocklist.tasks.upload_filter.upload_mlbf.base')
else:
# Update the base filter id for this block type to the generation time
# so we can delete stashes older than this new filter.
base_filter_ids[block_type] = generation_time

# It is possible to upload a stash and a filter in the same task.
if create_stash:
with mlbf.storage.open(mlbf.stash_path, 'r') as stash_file:
stash_data = json.load(stash_file)
# If we have a stash, write that
Expand All @@ -121,10 +167,54 @@ def upload_filter(generation_time, is_base=True):
server.publish_record(stash_upload_data)
statsd.incr('blocklist.tasks.upload_filter.upload_stash')

# Get the oldest base filter id so we can delete only stashes
# that are definitely not needed anymore.
oldest_base_filter_id = min(base_filter_ids.values()) if base_filter_ids else None

for record in old_records:
# Delete attachment records that match the
# attachment types of filters we just uploaded.
# This ensures we only have one filter attachment
# per block_type.
if 'attachment' in record:
attachment_type = record['attachment_type']
if attachment_type in attachment_types_to_delete:
server.delete_record(record['id'])

# Delete stash records that are older than the oldest
# pre-existing filter attachment records. These records
# cannot apply to any existing filter since we uploaded.
elif 'stash' in record and oldest_base_filter_id is not None:
record_time = record['stash_time']
if record_time < oldest_base_filter_id:
server.delete_record(record['id'])

# Commit the changes to remote settings for review + signing.
# Only after any changes to records (attachments and stashes)
# and including deletions can we commit the session and update
# the config with the new timestamps.
server.complete_session()
set_config(MLBF_TIME_CONFIG_KEY, generation_time, json_value=True)
if is_base:
set_config(MLBF_BASE_ID_CONFIG_KEY, generation_time, json_value=True)

# Update the base_filter_id for uploaded filters.
for block_type in filters_to_upload:
# We currently write to the old singular config key for hard blocks
# to preserve backward compatibility.
# In https://github.com/mozilla/addons/issues/15193
# we can remove this and start writing to the new plural key.
if block_type == BlockType.BLOCKED:
set_config(
MLBF_BASE_ID_CONFIG_KEY(block_type, compat=True),
generation_time,
json_value=True,
)

set_config(
MLBF_BASE_ID_CONFIG_KEY(block_type), generation_time, json_value=True
)

cleanup_old_files.delay(base_filter_id=oldest_base_filter_id)
statsd.incr('blocklist.tasks.upload_filter.reset_collection')


@task
Expand Down
2 changes: 1 addition & 1 deletion src/olympia/blocklist/tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ def test_command(self):

call_command('export_blocklist', '1')
mlbf = MLBF.load_from_storage(1)
assert mlbf.storage.exists(mlbf.filter_path)
assert mlbf.storage.exists(mlbf.filter_path())
32 changes: 19 additions & 13 deletions src/olympia/blocklist/tests/test_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def test_skip_update_unless_force_base(self):

# Check that a filter was created on the second attempt
mlbf = MLBF.load_from_storage(self.current_time)
assert mlbf.storage.exists(mlbf.filter_path)
assert mlbf.storage.exists(mlbf.filter_path())
assert not mlbf.storage.exists(mlbf.stash_path)

def test_skip_update_unless_no_base_mlbf(self):
Expand Down Expand Up @@ -220,11 +220,12 @@ def test_upload_stash_unless_force_base(self):
].call_args_list == [
mock.call(
self.current_time,
is_base=force_base,
filter_list=[],
create_stash=True,
)
]
mlbf = MLBF.load_from_storage(self.current_time)
assert mlbf.storage.exists(mlbf.filter_path) == force_base
assert mlbf.storage.exists(mlbf.filter_path()) == force_base
assert mlbf.storage.exists(mlbf.stash_path) != force_base

def test_upload_stash_unless_missing_base_filter(self):
Expand All @@ -238,11 +239,12 @@ def test_upload_stash_unless_missing_base_filter(self):
].call_args_list == [
mock.call(
self.current_time,
is_base=False,
filter_list=[],
create_stash=True,
)
]
mlbf = MLBF.load_from_storage(self.current_time)
assert not mlbf.storage.exists(mlbf.filter_path)
assert not mlbf.storage.exists(mlbf.filter_path())
assert mlbf.storage.exists(mlbf.stash_path)

self.mocks[
Expand All @@ -252,11 +254,12 @@ def test_upload_stash_unless_missing_base_filter(self):
assert (
mock.call(
self.current_time,
is_base=True,
filter_list=[BlockType.BLOCKED.name],
create_stash=False,
)
in self.mocks['olympia.blocklist.cron.upload_filter.delay'].call_args_list
)
assert mlbf.storage.exists(mlbf.filter_path)
assert mlbf.storage.exists(mlbf.filter_path())

@mock.patch('olympia.blocklist.cron.BASE_REPLACE_THRESHOLD', 1)
def test_upload_stash_unless_enough_changes(self):
Expand All @@ -271,11 +274,12 @@ def test_upload_stash_unless_enough_changes(self):
].call_args_list == [
mock.call(
self.current_time,
is_base=False,
filter_list=[],
create_stash=True,
)
]
mlbf = MLBF.load_from_storage(self.current_time)
assert not mlbf.storage.exists(mlbf.filter_path)
assert not mlbf.storage.exists(mlbf.filter_path())
assert mlbf.storage.exists(mlbf.stash_path)

self._block_version(is_signed=True)
Expand All @@ -288,12 +292,13 @@ def test_upload_stash_unless_enough_changes(self):
assert (
mock.call(
self.current_time,
is_base=True,
filter_list=[BlockType.BLOCKED.name],
create_stash=False,
)
in self.mocks['olympia.blocklist.cron.upload_filter.delay'].call_args_list
)
new_mlbf = MLBF.load_from_storage(self.current_time)
assert new_mlbf.storage.exists(new_mlbf.filter_path)
assert new_mlbf.storage.exists(new_mlbf.filter_path())
assert not new_mlbf.storage.exists(new_mlbf.stash_path)

def test_cleanup_old_files(self):
Expand Down Expand Up @@ -374,7 +379,8 @@ def test_invalid_cache_results_in_diff(self):
assert (
mock.call(
self.current_time,
is_base=False,
filter_list=[],
create_stash=True,
)
in self.mocks['olympia.blocklist.cron.upload_filter.delay'].call_args_list
)
Expand All @@ -393,7 +399,7 @@ def test_get_last_generation_time(self):

def test_get_base_generation_time(self):
assert get_base_generation_time() is None
set_config(MLBF_BASE_ID_CONFIG_KEY, 1)
set_config(MLBF_BASE_ID_CONFIG_KEY(BlockType.BLOCKED, compat=True), 1)
assert get_base_generation_time() == 1


Expand Down
Loading
Loading