Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LJ-474 Refactor privacy request processing to never re-use sessions #5862

Merged
merged 3 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Changes can also be flagged with a GitHub label for tracking purposes. The URL o
- Bumped supported Python versions to `3.10.16` and `3.9.21` [#5840](https://github.com/ethyca/fides/pull/5840)
- Update the privacy request detail page to a new layout and improved styling [#5824](https://github.com/ethyca/fides/pull/5824)
- Updated privacy request handling to still succeed if not all identities are provided [#5836](https://github.com/ethyca/fides/pull/5836)
- Refactored privacy request processing to never re-use sessions [#5862](https://github.com/ethyca/fides/pull/5862)


### Developer Experience
Expand Down
127 changes: 84 additions & 43 deletions src/fides/api/task/graph_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
ConnectionType,
)
from fides.api.models.datasetconfig import DatasetConfig
from fides.api.models.policy import Policy
from fides.api.models.policy import Policy, Rule
from fides.api.models.privacy_preference import PrivacyPreferenceHistory
from fides.api.models.privacy_request import ExecutionLog, PrivacyRequest, RequestTask
from fides.api.schemas.policy import ActionType, CurrentStep
from fides.api.schemas.privacy_request import ExecutionLogStatus
Expand All @@ -54,7 +55,9 @@
make_immutable,
make_mutable,
)
from fides.api.util.consent_util import add_errored_system_status_for_consent_reporting
from fides.api.util.consent_util import (
add_errored_system_status_for_consent_reporting_on_preferences,
)
from fides.api.util.logger import Pii
from fides.api.util.logger_context_utils import LoggerContextKeys
from fides.api.util.saas_util import FIDESOPS_GROUPED_INPUTS
Expand Down Expand Up @@ -138,13 +141,7 @@ def result(*args: Any, **kwargs: Any) -> Any:
self.resources.request.id,
)
self.log_skipped(action_type, exc)
for pref in self.resources.request.privacy_preferences:
# For consent reporting, also caching the given system as skipped for all historical privacy preferences.
pref.cache_system_status(
self.resources.session,
self.connector.configuration.system_key,
ExecutionLogStatus.skipped,
)
self.cache_system_status_for_preferences()
return default_return
except BaseException as ex: # pylint: disable=W0703
traceback.print_exc()
Expand All @@ -164,11 +161,7 @@ def result(*args: Any, **kwargs: Any) -> Any:
action_type.value
] # Convert ActionType into a CurrentStep, no longer coerced with Pydantic V2
)
add_errored_system_status_for_consent_reporting(
self.resources.session,
self.resources.request,
self.connector.configuration,
)
self.add_error_status_for_consent_reporting()
if not self.request_task.id:
# TODO Remove when we stop support for DSR 2.0
# Re-raise to stop privacy request execution on failure for
Expand Down Expand Up @@ -730,6 +723,48 @@ def consent_request(self, identity: Dict[str, Any]) -> bool:
self.log_end(ActionType.consent)
return output

def cache_system_status_for_preferences(self) -> None:
    """
    Cache this connector's system as "skipped" on every historical privacy
    preference linked to the current privacy request, for consent reporting.

    Purposely uses a new session: the preferences are re-queried by privacy
    request id through ``get_db()`` instead of being read from
    ``self.resources.request`` with the task's own session.
    """
    privacy_request_id = self.resources.request.id
    # The system key is the same for every preference, so look it up once.
    system_key = self.connector.configuration.system_key

    with get_db() as db:
        # Materialize the rows up front, the same way
        # add_error_status_for_consent_reporting does, so the loop below
        # iterates a plain list rather than the Query object while the same
        # session is being handed to cache_system_status.
        privacy_preferences = (
            db.query(PrivacyPreferenceHistory)
            .filter(
                PrivacyPreferenceHistory.privacy_request_id == privacy_request_id
            )
            .all()
        )
        for pref in privacy_preferences:
            pref.cache_system_status(
                db,
                system_key,  # type: ignore[arg-type]
                ExecutionLogStatus.skipped,
            )

def add_error_status_for_consent_reporting(self) -> None:
    """
    Record an errored system status on the historical privacy preferences of
    the current privacy request that are relevant to this connector's failure
    (i.e. those that already had a "pending" log added to them).

    Purposely uses a new session: the preferences are looked up by privacy
    request id through ``get_db()`` and that same session is passed on to the
    consent-reporting helper.
    """
    request_id = self.resources.request.id

    with get_db() as session:
        belongs_to_request = (
            PrivacyPreferenceHistory.privacy_request_id == request_id
        )
        preference_history = (
            session.query(PrivacyPreferenceHistory).filter(belongs_to_request).all()
        )
        add_errored_system_status_for_consent_reporting_on_preferences(
            session,
            preference_history,
            self.connector.configuration,
        )


def collect_queries(
traversal: Traversal, resources: TaskResources
Expand Down Expand Up @@ -816,39 +851,45 @@ def build_affected_field_logs(
}]
"""

targeted_field_paths: Dict[FieldAddress, str] = {}
policy_id = policy.id

for rule in policy.rules: # type: ignore[attr-defined]
if rule.action_type != action_type:
continue
rule_categories: List[str] = rule.get_target_data_categories()
if not rule_categories:
continue
with get_db() as db:

collection_categories: Dict[
str, List[FieldPath]
] = node.collection.field_paths_by_category # type: ignore
for rule_cat in rule_categories:
for collection_cat, field_paths in collection_categories.items():
if collection_cat.startswith(rule_cat):
targeted_field_paths.update(
{
node.address.field_address(field_path): collection_cat
for field_path in field_paths
}
)
rules = db.query(Rule).filter(Rule.policy_id == policy_id)

ret: List[Dict[str, Any]] = []
for field_address, data_categories in targeted_field_paths.items():
ret.append(
{
"path": field_address.value,
"field_name": field_address.field_path.string_path,
"data_categories": [data_categories],
}
)
targeted_field_paths: Dict[FieldAddress, str] = {}

for rule in rules: # type: ignore[attr-defined]
if rule.action_type != action_type:
continue
rule_categories: List[str] = rule.get_target_data_categories()
if not rule_categories:
continue

collection_categories: Dict[
str, List[FieldPath]
] = node.collection.field_paths_by_category # type: ignore
for rule_cat in rule_categories:
for collection_cat, field_paths in collection_categories.items():
if collection_cat.startswith(rule_cat):
targeted_field_paths.update(
{
node.address.field_address(field_path): collection_cat
for field_path in field_paths
}
)

ret: List[Dict[str, Any]] = []
for field_address, data_categories in targeted_field_paths.items():
ret.append(
{
"path": field_address.value,
"field_name": field_address.field_path.string_path,
"data_categories": [data_categories],
}
)

return ret
return ret


def build_consent_dataset_graph(datasets: List[DatasetConfig]) -> DatasetGraph:
Expand Down
25 changes: 20 additions & 5 deletions src/fides/api/util/consent_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,15 +214,30 @@ def add_errored_system_status_for_consent_reporting(

Deeming them relevant if they already had a "pending" log added to them.
"""
for pref in privacy_request.privacy_preferences: # type: ignore[attr-defined]
add_errored_system_status_for_consent_reporting_on_preferences(db, privacy_request.privacy_preferences, connection_config) # type: ignore[attr-defined]


def add_errored_system_status_for_consent_reporting_on_preferences(
db: Session,
privacy_preferences: List[PrivacyPreferenceHistory],
connection_config: ConnectionConfig,
) -> None:
"""
Cache an errored system status for consent reporting on just the subset
of preferences that were deemed relevant for the connector on failure,
from the provided list of preferences.

Deeming them relevant if they already had a "pending" log added to them.
"""
for preference in privacy_preferences:
if (
pref.affected_system_status
and pref.affected_system_status.get(connection_config.system_key)
preference.affected_system_status
and preference.affected_system_status.get(connection_config.system_key)
== ExecutionLogStatus.pending.value
):
pref.cache_system_status(
preference.cache_system_status(
db,
connection_config.system_key,
connection_config.system_key, # type: ignore[arg-type]
ExecutionLogStatus.error,
)

Expand Down