Skip to content

Commit

Permalink
core - update determine_new_type()
Browse files Browse the repository at this point in the history
  • Loading branch information
cobycloud committed Jan 4, 2025
1 parent 51fd76d commit a1c1f0e
Showing 1 changed file with 39 additions and 19 deletions.
58 changes: 39 additions & 19 deletions pkgs/core/swarmauri_core/ComponentBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,29 +331,49 @@ def determine_new_type(cls, field_annotation, resource_type):
- The updated type annotation incorporating SubclassUnion.
"""
logger.debug(f"Determining new type for field annotation '{field_annotation}' with resource type '{resource_type.__name__}'")
origin = get_origin(field_annotation)
args = get_args(field_annotation)

is_optional = False
try:
origin = get_origin(field_annotation)
args = get_args(field_annotation)

if origin is Union and type(None) in args:
non_none_args = [arg for arg in args if arg is not type(None)]
if len(non_none_args) == 1:
field_annotation = non_none_args[0]
origin = get_origin(field_annotation)
args = get_args(field_annotation)
is_optional = True
is_optional = False

if origin is Annotated:
# Preserve existing metadata while updating resource type
logger.debug(f"Field annotation '{field_annotation}' is Annotated")
field_annotation = Annotated[args[0], *(a for a in args[1:] if not isinstance(a, ResourceType)), ResourceType(resource_type)]
# Handle Optional[...] (Union[..., NoneType])
if origin is Union and type(None) in args:
non_none_args = [arg for arg in args if arg is not type(None)]
if len(non_none_args) == 1:
field_annotation = non_none_args[0]
origin = get_origin(field_annotation)
args = get_args(field_annotation)
is_optional = True
else:
# Multiple non-None types, complex Union
logger.warning(f"Field annotation '{field_annotation}' has multiple non-None Union types; optionality may not be preserved correctly.")

new_type = Annotated[SubclassUnion[resource_type], Field(discriminator='type'), ResourceType(resource_type)]
if is_optional:
new_type = Union[new_type, type(None)]
# Handle Annotated
if origin is Annotated:
base_type = args[0]
metadata = [arg for arg in args[1:] if not isinstance(arg, ResourceType)]
# Append the new ResourceType
metadata.append(ResourceType(resource_type))
logger.debug(f"Preserving existing metadata and adding ResourceType for resource '{resource_type.__name__}'")
field_annotation = Annotated[base_type, *metadata]

# Construct the new type with SubclassUnion and discriminated Union
subclass_union = SubclassUnion[resource_type]
annotated_union = Annotated[Union[tuple(ComponentBase.TYPE_REGISTRY.get(resource_type, {}).values())], Field(discriminator='type'), ResourceType(resource_type)]

# Preserve Optionality if necessary
if is_optional:
new_type = Union[annotated_union, type(None)]
else:
new_type = annotated_union

logger.debug(f"New type for field: {new_type}")
return new_type
except TypeError as e:
logger.error(f"TypeError while determining new type for field annotation '{field_annotation}': {e}")
return field_annotation # Fallback to original type if error occurs

return new_type


@classmethod
Expand Down

0 comments on commit a1c1f0e

Please sign in to comment.