Skip to content

Commit a0ab072

Browse files
authored
Adding support for BigQuery struct updates (#5849)
1 parent fdb0e48 commit a0ab072

12 files changed

+1108
-54
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ Changes can also be flagged with a GitHub label for tracking purposes. The URL o
2828
- Attachments storage capabilities (S3 or local) [#5812](https://github.com/ethyca/fides/pull/5812) https://github.com/ethyca/fides/labels/db-migration
2929
- DB model support for Comments [#5833](https://github.com/ethyca/fides/pull/5833/files) https://github.com/ethyca/fides/labels/db-migration
3030
- Added UI for configuring website integrations and monitors [#5867](https://github.com/ethyca/fides/pull/5867)
31+
- Adding support for BigQuery struct updates [#5849](https://github.com/ethyca/fides/pull/5849)
3132

3233
### Changed
3334
- Bumped supported Python versions to `3.10.16` and `3.9.21` [#5840](https://github.com/ethyca/fides/pull/5840)

data/dataset/bigquery_example_test_dataset.yml

+20
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ dataset:
6464
fides_meta:
6565
identity: email
6666
data_type: string
67+
read_only: True
6768
- name: custom_id
6869
data_categories: [user.unique_id]
6970
fides_meta:
@@ -76,6 +77,24 @@ dataset:
7677
fides_meta:
7778
data_type: string
7879
length: 40
80+
- name: tags
81+
data_categories: [user]
82+
fides_meta:
83+
data_type: string[]
84+
- name: purchase_history
85+
fides_meta:
86+
data_type: object[]
87+
fields:
88+
- name: item_id
89+
data_categories: [system.operations]
90+
- name: price
91+
data_categories: [user.financial]
92+
- name: purchase_date
93+
data_categories: [system.operations]
94+
- name: item_tags
95+
data_categories: [user]
96+
fides_meta:
97+
data_type: string[]
7998

8099
- name: employee
81100
fides_meta:
@@ -256,6 +275,7 @@ dataset:
256275
- name: email
257276
data_categories: [user.contact.email]
258277
fides_meta:
278+
read_only: true
259279
identity: email
260280
data_type: string
261281
- name: last_visit
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
## Understanding the `BigQueryQueryConfig.generate_update` Method
2+
3+
### Example: Handling Nested Data and Arrays
4+
5+
Consider this original row in BigQuery:
6+
7+
```python
8+
# Original row data
9+
row = {
10+
"user_id": 123,
11+
"profile": {
12+
"name": "John Doe",
13+
        "email": "john.doe@example.com",
14+
"preferences": {
15+
"theme": "dark",
16+
"notifications": True
17+
},
18+
"tags": ["customer", "premium", "active"]
19+
},
20+
"activity": {
21+
"last_login": "2023-05-15",
22+
"login_count": 42
23+
}
24+
}
25+
```
26+
27+
And a masking update:
28+
29+
```python
30+
update_value_map = {
31+
"profile.email": "REDACTED",
32+
"profile.preferences.notifications": None,
33+
"profile.tags.0": None, # Null first tag
34+
"profile.tags.1": None, # Null second tag
35+
"profile.tags.2": None # Null third tag
36+
}
37+
```
38+
39+
### Step-by-Step Process
40+
41+
#### Step 1: Take `update_value_map` as-is
42+
```python
43+
# update_value_map with array indexes
44+
{
45+
"profile.email": "REDACTED",
46+
"profile.preferences.notifications": None,
47+
"profile.tags.0": None,
48+
"profile.tags.1": None,
49+
"profile.tags.2": None
50+
}
51+
```
52+
53+
#### Step 2: Flatten the row with array indexes
54+
```python
55+
flattened_row = {
56+
"user_id": 123,
57+
"profile.name": "John Doe",
58+
    "profile.email": "john.doe@example.com",
59+
"profile.preferences.theme": "dark",
60+
"profile.preferences.notifications": True,
61+
"profile.tags.0": "customer",
62+
"profile.tags.1": "premium",
63+
"profile.tags.2": "active",
64+
"activity.last_login": "2023-05-15",
65+
"activity.login_count": 42
66+
}
67+
```
68+
69+
#### Step 3: Merge `update_value_map` into `flattened_row`
70+
```python
71+
merged_dict = {
72+
"user_id": 123,
73+
"profile.name": "John Doe",
74+
"profile.email": "REDACTED", # From update_value_map
75+
"profile.preferences.theme": "dark",
76+
"profile.preferences.notifications": None, # From update_value_map
77+
"profile.tags.0": None, # From update_value_map
78+
"profile.tags.1": None, # From update_value_map
79+
"profile.tags.2": None, # From update_value_map
80+
"activity.last_login": "2023-05-15",
81+
"activity.login_count": 42
82+
}
83+
```
84+
85+
#### Step 4: Unflatten the merged dictionary
86+
```python
87+
nested_result = {
88+
"user_id": 123,
89+
"profile": {
90+
"name": "John Doe",
91+
"email": "REDACTED",
92+
"preferences": {
93+
"theme": "dark",
94+
"notifications": None
95+
},
96+
"tags": [None, None, None] # Reconstructed from indexed entries
97+
},
98+
"activity": {
99+
"last_login": "2023-05-15",
100+
"login_count": 42
101+
}
102+
}
103+
```
104+
105+
#### Step 5: Replace arrays containing only `None` values with empty arrays
106+
```python
107+
nested_result_with_arrays_fixed = {
108+
"user_id": 123,
109+
"profile": {
110+
"name": "John Doe",
111+
"email": "REDACTED",
112+
"preferences": {
113+
"theme": "dark",
114+
"notifications": None
115+
},
116+
"tags": [] # Converted from [None, None, None] to empty array
117+
},
118+
"activity": {
119+
"last_login": "2023-05-15",
120+
"login_count": 42
121+
}
122+
}
123+
```
124+
125+
#### Step 6: Only keep top-level keys that are in the `update_value_map`
126+
```python
127+
# Top-level keys in update_value_map are "profile" only
128+
top_level_keys = {"profile"}
129+
130+
final_update_map = {
131+
"profile": {
132+
"name": "John Doe",
133+
"email": "REDACTED",
134+
"preferences": {
135+
"theme": "dark",
136+
            "notifications": None
137+
},
138+
"tags": [] # Empty array
139+
}
140+
}
141+
```
142+
143+
#### Step 7: Create SQL Update statements
144+
This generates a SQLAlchemy Update object that would translate to:
145+
146+
```sql
147+
UPDATE `project_id.dataset_id.table_name`
148+
SET
149+
profile = {
150+
"name": "John Doe",
151+
"email": "REDACTED",
152+
"preferences": {
153+
"theme": "dark",
154+
"notifications": null
155+
},
156+
"tags": []
157+
}
158+
WHERE user_id = 123
159+
```
160+
161+
### Array Handling Notes
162+
163+
- Individual array elements can be targeted using indexed keys like `profile.tags.0`
164+
- When all elements in an array become `None`, it's automatically converted to an empty array `[]`
165+
166+
This approach ensures that complex nested JSON structures in BigQuery can be properly updated while maintaining their hierarchical structure.

src/fides/api/service/connectors/query_configs/bigquery_query_config.py

+46-15
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from typing import Any, Dict, List, Optional, Union, cast
22

3+
import pydash
34
from fideslang.models import MaskingStrategies
45
from loguru import logger
56
from sqlalchemy import MetaData, Table, text
@@ -17,7 +18,14 @@
1718
from fides.api.service.connectors.query_configs.query_config import (
1819
QueryStringWithoutTuplesOverrideQueryConfig,
1920
)
20-
from fides.api.util.collection_util import Row, filter_nonempty_values
21+
from fides.api.util.collection_util import (
22+
Row,
23+
filter_nonempty_values,
24+
flatten_dict,
25+
merge_dicts,
26+
replace_none_arrays,
27+
unflatten_dict,
28+
)
2129

2230

2331
class BigQueryQueryConfig(QueryStringWithoutTuplesOverrideQueryConfig):
@@ -120,18 +128,45 @@ def generate_update(
120128
A List of multiple Update objects are returned for partitioned tables; for a non-partitioned table,
121129
a single Update object is returned in a List for consistent typing.
122130
123-
TODO: DRY up this method and `generate_delete` a bit
131+
This implementation handles nested fields by grouping them as JSON objects rather than
132+
individual field updates.
133+
134+
See the README.md in this directory for a detailed example of how nested data is handled.
124135
"""
136+
137+
# 1. Take update_value_map as-is (already flattened)
125138
update_value_map: Dict[str, Any] = self.update_value_map(row, policy, request)
139+
140+
# 2. Flatten the row
141+
flattened_row = flatten_dict(row)
142+
143+
# 3. Merge flattened_row with update_value_map (update_value_map takes precedence)
144+
merged_dict = merge_dicts(flattened_row, update_value_map)
145+
146+
# 4. Unflatten the merged dictionary
147+
nested_result = unflatten_dict(merged_dict)
148+
149+
# 5. Replace any arrays containing only None values with empty arrays
150+
nested_result = replace_none_arrays(nested_result) # type: ignore
151+
152+
# 6. Only keep top-level keys that are in the update_value_map
153+
top_level_keys = {key.split(".")[0] for key in update_value_map}
154+
155+
# Filter the nested result to only include those top-level keys
156+
final_update_map = {
157+
k: v for k, v in nested_result.items() if k in top_level_keys
158+
}
159+
160+
# Use existing non-empty reference fields mechanism for WHERE clause
126161
non_empty_reference_field_keys: Dict[str, Field] = filter_nonempty_values(
127162
{
128-
fpath.string_path: fld.cast(row[fpath.string_path])
163+
fpath.string_path: fld.cast(pydash.get(row, fpath.string_path))
129164
for fpath, fld in self.reference_field_paths.items()
130-
if fpath.string_path in row
165+
if pydash.get(row, fpath.string_path) is not None
131166
}
132167
)
133168

134-
valid = len(non_empty_reference_field_keys) > 0 and update_value_map
169+
valid = len(non_empty_reference_field_keys) > 0 and final_update_map
135170
if not valid:
136171
logger.warning(
137172
"There is not enough data to generate a valid update statement for {}",
@@ -154,12 +189,12 @@ def generate_update(
154189
partitioned_queries.append(
155190
table.update()
156191
.where(*(where_clauses + [text(partition_clause)]))
157-
.values(**update_value_map)
192+
.values(**final_update_map)
158193
)
159194

160195
return partitioned_queries
161196

162-
return [table.update().where(*where_clauses).values(**update_value_map)]
197+
return [table.update().where(*where_clauses).values(**final_update_map)]
163198

164199
def generate_delete(self, row: Row, client: Engine) -> List[Delete]:
165200
"""Returns a List of SQLAlchemy DELETE statements for BigQuery. Does not actually execute the delete statement.
@@ -213,18 +248,14 @@ def format_fields_for_query(
213248
self,
214249
field_paths: List[FieldPath],
215250
) -> List[str]:
216-
"""Returns field paths in a format they can be added into SQL queries.
217-
251+
"""
252+
Returns field paths in a format they can be added into SQL queries.
218253
Only returns non-nested fields (fields with exactly one level).
219-
Nested fields are skipped with a warning log.
220254
"""
255+
221256
formatted_fields = []
222257
for field_path in field_paths:
223-
if len(field_path.levels) > 1:
224-
logger.warning(
225-
f"Skipping nested field '{'.'.join(field_path.levels)}' as nested fields are not supported"
226-
)
227-
else:
258+
if len(field_path.levels) == 1:
228259
formatted_fields.append(field_path.levels[0])
229260
return formatted_fields
230261

0 commit comments

Comments
 (0)