36
36
ConnectionType ,
37
37
)
38
38
from fides .api .models .datasetconfig import DatasetConfig
39
- from fides .api .models .policy import Policy
39
+ from fides .api .models .policy import Policy , Rule
40
+ from fides .api .models .privacy_preference import PrivacyPreferenceHistory
40
41
from fides .api .models .privacy_request import ExecutionLog , PrivacyRequest , RequestTask
41
42
from fides .api .schemas .policy import ActionType , CurrentStep
42
43
from fides .api .schemas .privacy_request import ExecutionLogStatus
54
55
make_immutable ,
55
56
make_mutable ,
56
57
)
57
- from fides .api .util .consent_util import add_errored_system_status_for_consent_reporting
58
+ from fides .api .util .consent_util import (
59
+ add_errored_system_status_for_consent_reporting_on_preferences ,
60
+ )
58
61
from fides .api .util .logger import Pii
59
62
from fides .api .util .logger_context_utils import LoggerContextKeys
60
63
from fides .api .util .saas_util import FIDESOPS_GROUPED_INPUTS
@@ -138,13 +141,7 @@ def result(*args: Any, **kwargs: Any) -> Any:
138
141
self .resources .request .id ,
139
142
)
140
143
self .log_skipped (action_type , exc )
141
- for pref in self .resources .request .privacy_preferences :
142
- # For consent reporting, also caching the given system as skipped for all historical privacy preferences.
143
- pref .cache_system_status (
144
- self .resources .session ,
145
- self .connector .configuration .system_key ,
146
- ExecutionLogStatus .skipped ,
147
- )
144
+ self .cache_system_status_for_preferences ()
148
145
return default_return
149
146
except BaseException as ex : # pylint: disable=W0703
150
147
traceback .print_exc ()
@@ -164,11 +161,7 @@ def result(*args: Any, **kwargs: Any) -> Any:
164
161
action_type .value
165
162
] # Convert ActionType into a CurrentStep, no longer coerced with Pydantic V2
166
163
)
167
- add_errored_system_status_for_consent_reporting (
168
- self .resources .session ,
169
- self .resources .request ,
170
- self .connector .configuration ,
171
- )
164
+ self .add_error_status_for_consent_reporting ()
172
165
if not self .request_task .id :
173
166
# TODO Remove when we stop support for DSR 2.0
174
167
# Re-raise to stop privacy request execution on failure for
@@ -730,6 +723,48 @@ def consent_request(self, identity: Dict[str, Any]) -> bool:
730
723
self .log_end (ActionType .consent )
731
724
return output
732
725
726
+ def cache_system_status_for_preferences (self ) -> None :
727
+ """
728
+ Calls cache_system_status for all historical privacy preferences for the given request.
729
+
730
+ Purposely uses a new session.
731
+ """
732
+
733
+ privacy_request_id = self .resources .request .id
734
+
735
+ with get_db () as db :
736
+
737
+ privacy_preferences = db .query (PrivacyPreferenceHistory ).filter (
738
+ PrivacyPreferenceHistory .privacy_request_id == privacy_request_id
739
+ )
740
+ for pref in privacy_preferences :
741
+ # For consent reporting, also caching the given system as skipped for all historical privacy preferences.
742
+ pref .cache_system_status (
743
+ db ,
744
+ self .connector .configuration .system_key , # type: ignore[arg-type]
745
+ ExecutionLogStatus .skipped ,
746
+ )
747
+
748
+ def add_error_status_for_consent_reporting (self ) -> None :
749
+ """
750
+ Adds the errored system status for all historical privacy preferences for the given request that
751
+ are deemed relevant for the connector failure (i.e if they had a "pending" log added to them).
752
+
753
+ Purposely uses a new session.
754
+ """
755
+ privacy_request_id = self .resources .request .id
756
+ with get_db () as db :
757
+ privacy_preferences = (
758
+ db .query (PrivacyPreferenceHistory )
759
+ .filter (
760
+ PrivacyPreferenceHistory .privacy_request_id == privacy_request_id
761
+ )
762
+ .all ()
763
+ )
764
+ add_errored_system_status_for_consent_reporting_on_preferences (
765
+ db , privacy_preferences , self .connector .configuration
766
+ )
767
+
733
768
734
769
def collect_queries (
735
770
traversal : Traversal , resources : TaskResources
@@ -816,39 +851,45 @@ def build_affected_field_logs(
816
851
}]
817
852
"""
818
853
819
- targeted_field_paths : Dict [ FieldAddress , str ] = {}
854
+ policy_id = policy . id
820
855
821
- for rule in policy .rules : # type: ignore[attr-defined]
822
- if rule .action_type != action_type :
823
- continue
824
- rule_categories : List [str ] = rule .get_target_data_categories ()
825
- if not rule_categories :
826
- continue
856
+ with get_db () as db :
827
857
828
- collection_categories : Dict [
829
- str , List [FieldPath ]
830
- ] = node .collection .field_paths_by_category # type: ignore
831
- for rule_cat in rule_categories :
832
- for collection_cat , field_paths in collection_categories .items ():
833
- if collection_cat .startswith (rule_cat ):
834
- targeted_field_paths .update (
835
- {
836
- node .address .field_address (field_path ): collection_cat
837
- for field_path in field_paths
838
- }
839
- )
858
+ rules = db .query (Rule ).filter (Rule .policy_id == policy_id )
840
859
841
- ret : List [Dict [str , Any ]] = []
842
- for field_address , data_categories in targeted_field_paths .items ():
843
- ret .append (
844
- {
845
- "path" : field_address .value ,
846
- "field_name" : field_address .field_path .string_path ,
847
- "data_categories" : [data_categories ],
848
- }
849
- )
860
+ targeted_field_paths : Dict [FieldAddress , str ] = {}
861
+
862
+ for rule in rules : # type: ignore[attr-defined]
863
+ if rule .action_type != action_type :
864
+ continue
865
+ rule_categories : List [str ] = rule .get_target_data_categories ()
866
+ if not rule_categories :
867
+ continue
868
+
869
+ collection_categories : Dict [
870
+ str , List [FieldPath ]
871
+ ] = node .collection .field_paths_by_category # type: ignore
872
+ for rule_cat in rule_categories :
873
+ for collection_cat , field_paths in collection_categories .items ():
874
+ if collection_cat .startswith (rule_cat ):
875
+ targeted_field_paths .update (
876
+ {
877
+ node .address .field_address (field_path ): collection_cat
878
+ for field_path in field_paths
879
+ }
880
+ )
881
+
882
+ ret : List [Dict [str , Any ]] = []
883
+ for field_address , data_categories in targeted_field_paths .items ():
884
+ ret .append (
885
+ {
886
+ "path" : field_address .value ,
887
+ "field_name" : field_address .field_path .string_path ,
888
+ "data_categories" : [data_categories ],
889
+ }
890
+ )
850
891
851
- return ret
892
+ return ret
852
893
853
894
854
895
def build_consent_dataset_graph (datasets : List [DatasetConfig ]) -> DatasetGraph :
0 commit comments