Skip to content
This repository has been archived by the owner on Nov 19, 2023. It is now read-only.

Commit

Permalink
fix/208: resolved indeterminism
Browse files Browse the repository at this point in the history
  • Loading branch information
Na'aman Hirschfeld authored and Na'aman Hirschfeld committed Feb 28, 2021
1 parent e89a5cc commit c281ffc
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 32 deletions.
12 changes: 5 additions & 7 deletions openapi_tester/schema_tester.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
""" Schema Tester """
from itertools import combinations
from itertools import chain
from typing import Any, Callable, Dict, List, Optional, Union, cast

from django.conf import settings
Expand All @@ -19,7 +19,7 @@
)
from openapi_tester.exceptions import DocumentationError, UndocumentedSchemaSectionError
from openapi_tester.loaders import DrfSpectacularSchemaLoader, DrfYasgSchemaLoader, StaticSchemaLoader
from openapi_tester.utils import merge_objects, normalize_schema_section
from openapi_tester.utils import lazy_combinations, normalize_schema_section
from openapi_tester.validators import (
validate_enum,
validate_format,
Expand Down Expand Up @@ -154,7 +154,7 @@ def get_response_schema_section(self, response: td.Response) -> Dict[str, Any]:

def handle_one_of(self, schema_section: dict, data: Any, reference: str, **kwargs: Any):
matches = 0
for option in [normalize_schema_section(entry) for entry in schema_section["oneOf"]]:
for option in schema_section["oneOf"]:
try:
self.test_schema_section(schema_section=option, data=data, reference=f"{reference}.oneOf", **kwargs)
matches += 1
Expand All @@ -164,10 +164,8 @@ def handle_one_of(self, schema_section: dict, data: Any, reference: str, **kwarg
raise DocumentationError(f"{VALIDATE_ONE_OF_ERROR.format(matches=matches)}\n\nReference: {reference}.oneOf")

def handle_any_of(self, schema_section: dict, data: Any, reference: str, **kwargs: Any):
any_of: List[Dict[str, Any]] = [normalize_schema_section(entry) for entry in schema_section.get("anyOf", [])]
for i in range(2, len(any_of) + 1):
any_of.extend([merge_objects(combination) for combination in combinations(any_of, i)])
for schema in any_of:
any_of: List[Dict[str, Any]] = schema_section.get("anyOf", [])
for schema in chain(any_of, lazy_combinations(any_of)):
try:
self.test_schema_section(schema_section=schema, data=data, reference=f"{reference}.anyOf", **kwargs)
return
Expand Down
34 changes: 21 additions & 13 deletions openapi_tester/utils.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,47 @@
""" Utils Module - this file contains utility functions used in multiple places """
from typing import Any, Dict, Sequence
from copy import deepcopy
from itertools import chain, combinations
from typing import Any, Dict, Iterator, Sequence


def merge_objects(dictionaries: Sequence[Dict[str, Any]]) -> Dict[str, Any]:
""" helper function to deep merge objects """
output: Dict[str, Any] = {}
for dictionary in dictionaries:
for key, value in dictionary.items():
if isinstance(value, dict) and "allOf" in value:
all_of = merge_objects(value.pop("allOf"))
value = merge_objects([value, all_of])
if key not in output:
output[key] = value
continue
current_value = output[key]
if isinstance(current_value, list) and isinstance(value, list):
output[key] = [*output[key], *value]
output[key] = list(chain(output[key], value))
continue
if isinstance(current_value, dict) and isinstance(value, dict):
output[key] = merge_objects([current_value, value])
continue
return output


def normalize_schema_section(schema_section: dict) -> dict:
""" helper method to remove allOf and handle edge uses of oneOf"""
output: Dict[str, Any] = {**schema_section}
if "allOf" in schema_section:
all_of = schema_section.pop("allOf")
schema_section = {**schema_section, **merge_objects(all_of)}
if schema_section.get("oneOf") and all(item.get("enum") for item in schema_section["oneOf"]):
output: Dict[str, Any] = deepcopy(schema_section)
if output.get("allOf"):
all_of = output.pop("allOf")
output = {**output, **merge_objects(all_of)}
if output.get("oneOf") and all(item.get("enum") for item in output["oneOf"]):
# handle the way drf-spectacular is doing enums
one_of = schema_section.pop("oneOf")
schema_section = {**schema_section, **merge_objects(one_of)}
one_of = output.pop("oneOf")
output = {**output, **merge_objects(one_of)}
for key, value in output.items():
if isinstance(value, dict):
output[key] = normalize_schema_section(value)
elif isinstance(value, list):
output[key] = [normalize_schema_section(entry) if isinstance(entry, dict) else entry for entry in value]
return schema_section
return output


def lazy_combinations(options_list: Sequence[Dict[str, Any]]) -> Iterator[dict]:
""" helper to lazy evaluate possible permutations of possible combinations """
for i in range(2, len(options_list) + 1):
for combination in combinations(options_list, i):
yield merge_objects(combination)
16 changes: 6 additions & 10 deletions tests/schema_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,15 @@ def __init__(self, schema: dict):

def convert_schema(self, schema: Dict[str, Any]) -> Any:
schema_type = schema.get("type", "object")
sample: List[Dict[str, Any]] = []
schema = normalize_schema_section(schema)
if "oneOf" in schema:
one_of = schema.pop("oneOf")
while not sample:
sample = random.sample(one_of, 1)
return self.convert_schema({**schema, **sample[0]})
return self.convert_schema({**schema, **random.sample(one_of, 1)[0]})
if "anyOf" in schema:
any_of = schema.pop("anyOf")
while not sample:
sample = random.sample(any_of, random.randint(1, len(any_of)))
sample = [normalize_schema_section(item) for item in sample]
return self.convert_schema({**schema, **merge_objects(sample)})
return self.convert_schema(
{**schema, **merge_objects(random.sample(any_of, random.randint(1, len(any_of))))}
)
if schema_type == "array":
return self.convert_schema_array_to_list(schema)
if schema_type == "object":
Expand Down Expand Up @@ -77,9 +73,9 @@ def schema_type_to_mock_value(self, schema_object: Dict[str, Any]) -> Any:
return random.sample(enum, 1)[0]
if schema_type in ["integer", "number"] and (minimum is not None or maximum is not None):
if minimum is not None:
minimum += 1 if schema_object.get("excludeMinimum") else 0
minimum += 1 if schema_object.get("exclusiveMinimum") else 0
if maximum is not None:
maximum -= 1 if schema_object.get("excludeMaximum") else 0
maximum -= 1 if schema_object.get("exclusiveMaximum") else 0
if minimum is not None or maximum is not None:
minimum = minimum or 0
maximum = maximum or minimum * 2
Expand Down
3 changes: 1 addition & 2 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ def test_merge_objects():
test_schemas = [
object_1,
object_2,
{"type": "object", "properties": {"key3": {"allOf": [object_1, object_2]}}},
]
expected = {
"type": "object",
"required": ["key1", "key2"],
"properties": {"key1": {"type": "string"}, "key2": {"type": "string"}, "key3": merged_object},
"properties": {"key1": {"type": "string"}, "key2": {"type": "string"}},
}
assert sort_object(merge_objects(test_schemas)) == sort_object(expected)

0 comments on commit c281ffc

Please sign in to comment.