This repository has been archived by the owner on Nov 19, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #229 from snok/fix/208-test-indeterminancy
fix/208: updated handling of anyOf
- Loading branch information
Showing
6 changed files
with
49 additions
and
85 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,41 +1,47 @@ | ||
""" Utils Module - this file contains utility functions used in multiple places """ | ||
from typing import Any, Dict, Iterable, List | ||
from copy import deepcopy | ||
from itertools import chain, combinations | ||
from typing import Any, Dict, Iterator, Sequence | ||
|
||
|
||
def merge_objects(dictionaries: List[Dict[str, Any]]) -> Dict[str, Any]: | ||
def merge_objects(dictionaries: Sequence[Dict[str, Any]]) -> Dict[str, Any]: | ||
""" helper function to deep merge objects """ | ||
output: Dict[str, Any] = {} | ||
for dictionary in dictionaries: | ||
for key, value in dictionary.items(): | ||
if isinstance(value, dict) and "allOf" in value: | ||
all_of = merge_objects(value.pop("allOf")) | ||
value = merge_objects([value, all_of]) | ||
if key not in output: | ||
output[key] = value | ||
continue | ||
current_value = output[key] | ||
if isinstance(current_value, list) and isinstance(value, list): | ||
output[key] = list({*output[key], *value}) | ||
output[key] = list(chain(output[key], value)) | ||
continue | ||
if isinstance(current_value, dict) and isinstance(value, dict): | ||
output[key] = merge_objects([current_value, value]) | ||
continue | ||
return output | ||
|
||
|
||
def combine_object_schemas(schemas: List[dict]) -> Dict[str, Any]: | ||
properties = merge_objects([schema.get("properties", {}) for schema in schemas]) | ||
required_list = [schema.get("required", []) for schema in schemas] | ||
required = list({key for required in required_list for key in required}) | ||
return {"type": "object", "required": required, "properties": properties} | ||
def normalize_schema_section(schema_section: dict) -> dict: | ||
""" helper method to remove allOf and handle edge uses of oneOf""" | ||
output: Dict[str, Any] = deepcopy(schema_section) | ||
if output.get("allOf"): | ||
all_of = output.pop("allOf") | ||
output = {**output, **merge_objects(all_of)} | ||
if output.get("oneOf") and all(item.get("enum") for item in output["oneOf"]): | ||
# handle the way drf-spectacular is doing enums | ||
one_of = output.pop("oneOf") | ||
output = {**output, **merge_objects(one_of)} | ||
for key, value in output.items(): | ||
if isinstance(value, dict): | ||
output[key] = normalize_schema_section(value) | ||
elif isinstance(value, list): | ||
output[key] = [normalize_schema_section(entry) if isinstance(entry, dict) else entry for entry in value] | ||
return output | ||
|
||
|
||
def combine_sub_schemas(schemas: Iterable[Dict[str, Any]]) -> Dict[str, Any]: | ||
array_schemas = [schema for schema in schemas if schema.get("type") == "array"] | ||
object_schemas = [schema for schema in schemas if schema.get("type") == "object" or not schema.get("type")] | ||
if array_schemas: | ||
return { | ||
"type": "array", | ||
"items": combine_sub_schemas([schema.get("items", {}) for schema in array_schemas]), | ||
} | ||
if object_schemas: | ||
return combine_object_schemas(object_schemas) | ||
return merge_objects([schema for schema in schemas if schema.get("type") not in ["object", "array"]]) | ||
def lazy_combinations(options_list: Sequence[Dict[str, Any]]) -> Iterator[dict]: | ||
""" helper to lazy evaluate possible permutations of possible combinations """ | ||
for i in range(2, len(options_list) + 1): | ||
for combination in combinations(options_list, i): | ||
yield merge_objects(combination) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters