Skip to content

Commit

Permalink
small fix
Browse files Browse the repository at this point in the history
  • Loading branch information
calmacx committed Feb 2, 2024
1 parent 2121c7d commit fe534c2
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 68 deletions.
33 changes: 16 additions & 17 deletions hdr_schemata/models/GWDM/v1_0/Temporal.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,41 @@
from datetime import date,datetime
from datetime import date, datetime
from typing import Optional, List, Union
from pydantic import BaseModel, Field
from hdr_schemata.definitions.HDRUK import *


class Temporal(BaseModel):
    """Temporal coverage metadata for a dataset (GWDM schema).

    Describes the time period a dataset covers, how stale the data is
    (time lag), and how often new distributions are released.
    """

    class Config:
        # Reject any fields not declared on this model instead of
        # silently accepting them.
        extra = "forbid"

    # Start of the coverage period; required but may be explicitly null.
    startDate: Optional[Union[date, datetime]] = Field(
        ...,
        description="The start of the time period that the dataset provides coverage for",
        example="12/03/2020",
        title="Start Date",
    )
    # End of the coverage period; omitted/None for ongoing datasets.
    endDate: Optional[Union[date, datetime]] = Field(
        None,
        description="The end of the time period that the dataset provides coverage for",
        example="12/03/2020",
        title="End Date",
    )
    # Constrained-vocabulary enum (see hdr_schemata.definitions.HDRUK).
    timeLag: TimeLag = Field(
        ...,
        description="Typical time-lag between an event and the data for that event appearing in the dataset",
        example="LESS 1 WEEK",
        title="Time Lag",
    )

    # Constrained-vocabulary enum (see hdr_schemata.definitions.HDRUK).
    accrualPeriodicity: Periodicity = Field(
        ...,
        description="frequency of distribution release. If a dataset is distributed regularly please choose a distribution release periodicity from the constrained list and indicate the next release date. When the release date becomes historical, a new release date will be calculated based on the publishing periodicity.",
        example="MONTHLY",
        title="Periodicity",
    )

    distributionReleaseDate: Optional[Union[date, datetime]] = Field(
        None,
        description="Date of the latest release of the dataset. If this is a regular release i.e. quarterly, or this is a static dataset please complete this alongside Periodicity.",
        title="Release Date",
    )
156 changes: 109 additions & 47 deletions hdr_schemata/utils/create_markdown.py
Original file line number Diff line number Diff line change
@@ -1,71 +1,135 @@
#from hdr_schemata.models.GWDM.v1_0 import Gwdm10 as Model
#from hdr_schemata.models.HDRUK.base import Observation as Model
from hdr_schemata.models.HDRUK import Hdruk220 as Model
#from hdr_schemata.models.GWDM.v1_1 import Gwdm11 as Model
from pydantic import BaseModel
# from hdr_schemata.models.GWDM.v1_0 import Gwdm10 as Model
# from hdr_schemata.models.HDRUK.base import Observation as Model
# from hdr_schemata.models.HDRUK import Hdruk220 as Model
from hdr_schemata.models.GWDM.v1_1 import Gwdm11 as Model

from pydantic._internal._model_construction import ModelMetaclass
from pydantic import BaseModel, RootModel
import pandas as pd
import json
import typing
import enum

def get_fields(structure,model: type[BaseModel]):
from hdr_schemata.models.HDRUK.v2_1_2.Observations import Observation

_type1 = typing.List[Observation]
_type2 = typing.Optional[Observation]
_type3 = typing.Union[Observation, str]


def extract_type_info(type_hint):
    """Summarise a typing annotation for schema documentation.

    Parameters
    ----------
    type_hint:
        Any typing annotation: ``List[X]``, ``Optional[X]``, ``Union[...]``,
        a plain class, an ``Enum`` subclass, or a pydantic ``RootModel``.

    Returns
    -------
    tuple(is_list, is_optional, type_names):
        * ``is_list`` -- True when the hint is (or optionally wraps) a list.
        * ``is_optional`` -- True when the hint is a Union allowing None.
        * ``type_names`` -- human-readable names of the inner types; a
          ``"null"`` marker is appended for optional hints, Enum members are
          expanded inline, and RootModel JSON schemas are inlined.
    """
    is_list = False
    is_optional = False
    inner_types = None
    origin = getattr(type_hint, "__origin__", None)
    if origin is list:
        is_list = True
        inner_types = type_hint.__args__
    elif origin is typing.Union:
        inner_types = type_hint.__args__
        # Strip NoneType; its presence means the hint was Optional[...].
        inner_types_not_none = [
            _type for _type in inner_types if _type is not type(None)
        ]
        is_optional = len(inner_types_not_none) < len(inner_types)

        inner_types = inner_types_not_none
        if is_optional:
            inner_types += ["null"]

        # Unwrap one level of nesting, e.g. Optional[List[X]] -> X.
        inner_type = inner_types[0]
        is_list = getattr(inner_type, "__origin__", None) is list
        if hasattr(inner_type, "__args__"):
            inner_types = inner_type.__args__
    else:
        # A bare class / annotation with no origin.
        inner_types = [type_hint]

    type_names = []
    for _type in inner_types:
        type_name = getattr(_type, "__name__", str(_type))

        try:
            # pydantic RootModel wrappers: inline their JSON schema so the
            # generated docs show the constrained shape.
            if _type and issubclass(_type, RootModel):
                info = _type.model_json_schema()
                info.pop("title", None)  # title is redundant with the name
                type_name += "[" + json.dumps(info).replace('"', "'") + "]"
        except TypeError:
            # _type is not a class (e.g. the "null" marker string) --
            # issubclass() rejects it; nothing to inline.
            pass

        # Enum classes: list the allowed member values inline.
        # isinstance (not `type(x) == enum.EnumMeta`) also accepts
        # EnumMeta subclasses.
        if isinstance(_type, enum.EnumMeta):
            type_name += (
                "["
                + ",".join(
                    "'" + member.value + "'" if member.value else "null"
                    for member in _type
                )
                + "]"
            )

        type_names.append(type_name)

    return is_list, is_optional, type_names


def get_fields(structure, model: type[BaseModel]):
    """Recursively append field descriptions of a pydantic model to `structure`.

    Parameters
    ----------
    structure : list
        Mutated in place; one dict per field, with nested fields under
        the "subItems" key.
    model : type[BaseModel]
        The pydantic model class to introspect.
    """
    model_hints = typing.get_type_hints(model)
    for name, field in model.model_fields.items():
        # RootModel wrapper fields are an implementation detail, skip them.
        if name == "root":
            continue

        t = field.annotation

        _type = model_hints[name]

        is_list, is_optional, type_names = extract_type_info(_type)

        value = {
            "name": name,
            "required": field.is_required(),
            # NOTE: the original dict literal repeated the "title" key,
            # so the first entry was silently discarded; keep it once.
            "title": field.title,
            "description": field.description,
            "examples": field.examples,
            "type": type_names,
            "is_list": is_list,
            "is_optional": is_optional,
        }

        # Unwrap e.g. Optional[X]/List[X] to see whether the inner type
        # is itself a model that needs recursive expansion.
        if hasattr(t, "__args__"):
            t = t.__args__[0]

        if isinstance(t, type) and issubclass(t, BaseModel):
            subItems = []
            get_fields(subItems, t)
            value["subItems"] = subItems

        structure.append(value)

def json_to_markdown(structure,level=2):

def json_to_markdown(structure, level=2):
md = ""
for field in structure:
name = field.pop('name')
subItems = field.pop('subItems',None)
description = field.pop('description')
examples = field.pop('examples')
name = field.pop("name")
subItems = field.pop("subItems", None)
description = field.pop("description")
examples = field.pop("examples")
if examples:
examples = "\n".join([' * '+str(x) for x in examples])
examples = "\n".join([" * " + str(x) for x in examples])
examples = "Examples: \n\n " + examples
else:
examples = ""

table = ""
if not subItems:
table = pd.Series(field).sort_index().to_frame().T.set_index('title')
table = pd.Series(field).sort_index().to_frame().T.set_index("title")
table = table.to_markdown()
heading = "#"*level
md += rf'''

heading = "#" * level
md += rf"""
{heading} {name}
{description}
Expand All @@ -74,25 +138,23 @@ def json_to_markdown(structure,level=2):
{examples}
'''
"""

if subItems:
md += json_to_markdown(subItems,level=level+1)
md += json_to_markdown(subItems, level=level + 1)


return md


# Build the documentation structure for the selected Model, then write it
# out both as JSON (temp.json) and as rendered markdown (temp.md).
structure = []
get_fields(structure, Model)

with open("temp.json", "w") as f:
    # (debug print of the full JSON dump removed -- the file is the output)
    json.dump(structure, f, indent=6)

md = json_to_markdown(structure)

with open("temp.md", "w") as f:
    f.write(md)
9 changes: 5 additions & 4 deletions hdr_schemata/utils/print_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
import json

# Map each model name to the list of schema versions found on disk,
# expecting paths shaped: hdr_schemata/models/<MODEL>/<VERSION>/schema.json
structure = {}
for schema in glob.glob("hdr_schemata/models/**/schema.json", recursive=True):
    items = schema.split("/")
    # Anything not exactly five path components deep is not a
    # model/version schema file.
    if len(items) != 5:
        continue
    model = items[2]
    version = items[3]
    structure.setdefault(model, []).append(version)

# Use a context manager so the output file is flushed and closed
# (the original passed an inline open() handle that was never closed).
with open("available.json", "w") as f:
    json.dump(structure, f, indent=6)

0 comments on commit fe534c2

Please sign in to comment.