Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow defining hints for nested tables #1855

Draft
wants to merge 1 commit into
base: devel
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dlt/extract/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> No
# convert to table meta if created table variant so item is assigned to this table
if meta.create_table_variant:
# name in hints meta must be a string, otherwise merge_hints would fail
meta = TableNameMeta(meta.hints["name"]) # type: ignore[arg-type]
meta = TableNameMeta(meta.hints["table_name"]) # type: ignore[arg-type]
self._reset_contracts_cache()

if table_name := self._get_static_table_name(resource, meta):
Expand Down
72 changes: 51 additions & 21 deletions dlt/extract/hints.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TypedDict, cast, Any, Optional, Dict
from typing import Sequence, TypedDict, Union, cast, Any, Optional, Dict
from typing_extensions import Self

from dlt.common import logger
Expand Down Expand Up @@ -42,23 +42,23 @@


class TResourceHintsBase(TypedDict, total=False):
    """Base set of table hints shared by resource hint templates.

    Declared with `total=False`, so every key may be omitted. Values typed as
    `TTableHintTemplate[...]` are hint templates rather than plain values.
    `nested_hints` maps a string key to another `TResourceHintsBase`, which
    allows hints to be declared recursively.
    """

    table_name: Optional[TTableHintTemplate[str]]
    write_disposition: Optional[TTableHintTemplate[TWriteDispositionConfig]]
    parent: Optional[TTableHintTemplate[str]]
    primary_key: Optional[TTableHintTemplate[TColumnNames]]
    columns: Optional[TTableHintTemplate[TAnySchemaColumns]]
    schema_contract: Optional[TTableHintTemplate[TSchemaContract]]
    table_format: Optional[TTableHintTemplate[TTableFormat]]
    # NOTE(review): unlike its siblings this key is not wrapped in `Optional` - confirm that is intended
    file_format: TTableHintTemplate[TFileFormat]
    merge_key: Optional[TTableHintTemplate[TColumnNames]]
    # keys are presumably nested table names - TODO confirm against callers
    nested_hints: Optional[Dict[str, "TResourceHintsBase"]]


class TResourceHints(TResourceHintsBase, total=False):
name: TTableHintTemplate[str]
# description: TTableHintTemplate[str]
# table_sealed: Optional[bool]
columns: TTableHintTemplate[TTableSchemaColumns]
incremental: Incremental[Any]
file_format: TTableHintTemplate[TFileFormat]
incremental: Optional[Incremental[Any]]
validator: ValidateItem
original_columns: TTableHintTemplate[TAnySchemaColumns]
original_columns: Optional[TTableHintTemplate[TAnySchemaColumns]]


class HintsMeta:
Expand Down Expand Up @@ -89,6 +89,7 @@ def make_hints(
"""
validator, schema_contract = create_item_validator(columns, schema_contract)
# create a table schema template where hints can be functions taking TDataItem
# TODO: do not use new_table here and get rid of typing ignores
new_template: TResourceHints = new_table(
table_name, # type: ignore
parent_table_name, # type: ignore
Expand All @@ -97,8 +98,9 @@ def make_hints(
table_format=table_format, # type: ignore
file_format=file_format, # type: ignore
)
new_template["table_name"] = new_template.pop("name") # type: ignore
if not table_name:
new_template.pop("name")
del new_template["table_name"]
if not write_disposition and "write_disposition" in new_template:
new_template.pop("write_disposition")
# remember original columns and set template columns
Expand All @@ -117,12 +119,34 @@ def make_hints(
return new_template


class DltResourceHintsDict(Dict[str, "DltResourceHints"]):
    """Dictionary of resource hints that can also be addressed by a path of keys.

    Reading with a string key behaves like a regular dictionary lookup. Reading
    with a sequence of strings looks up the first element in this dictionary and
    each following element in the `nested_hints` of the item found so far.
    Values assigned with `d[key] = value` that are not `DltResourceHints`
    instances are wrapped in `DltResourceHints` before being stored.
    """

    # def __init__(self, initial_value: TResourceHintsBase)

    def __getitem__(self, key: Union[str, Sequence[str]]) -> "DltResourceHints":
        """Return the item stored under `key`.

        Args:
            key: A single string key, or a sequence of string keys that is
                resolved recursively through `nested_hints`.

        Returns:
            The item found at the end of the path.

        Raises:
            KeyError: If any element of the path is missing, if the path is
                empty, or if an item on the path has no nested hints.
        """
        if isinstance(key, str):
            return super().__getitem__(key)
        if not key:
            # an empty path addresses nothing; report it like any other missing
            # key instead of leaking an IndexError from `key[0]`
            raise KeyError(key)
        item = super().__getitem__(key[0])
        for k_ in key[1:]:
            nested = item.nested_hints
            if nested is None:
                # the item has no nested hints, so the rest of the path cannot exist
                raise KeyError(key)
            item = nested[k_]
        return item

    def __setitem__(
        self, key: str, value: Union["DltResourceHints", "TResourceHintsBase"]
    ) -> None:
        """Store `value` under `key`, creating a `DltResourceHints` from a table template if needed.

        Args:
            key: Key to store the hints under.
            value: Either a `DltResourceHints` instance, stored as is, or a
                hints template that is passed to `DltResourceHints(...)` first.
        """
        if not isinstance(value, DltResourceHints):
            value = DltResourceHints(value)  # type: ignore
        super().__setitem__(key, value)


class DltResourceHints:
def __init__(self, table_schema_template: TResourceHints = None):
self.__qualname__ = self.__name__ = self.name
self._table_name_hint_fun: TFunHintTemplate[str] = None
self._table_has_other_dynamic_hints: bool = False
self._hints: TResourceHints = None
self._nested_hints: DltResourceHintsDict = None
"""Hints for the resource"""
self._hints_variants: Dict[str, TResourceHints] = {}
"""Hints for tables emitted from resources"""
Expand All @@ -139,7 +163,7 @@ def table_name(self) -> TTableHintTemplate[str]:
if self._table_name_hint_fun:
return self._table_name_hint_fun
# get table name or default name
return self._hints.get("name") or self.name if self._hints else self.name
return self._hints.get("table_name") or self.name if self._hints else self.name

@table_name.setter
def table_name(self, value: TTableHintTemplate[str]) -> None:
Expand All @@ -158,7 +182,11 @@ def write_disposition(self, value: TTableHintTemplate[TWriteDispositionConfig])
@property
def columns(self) -> TTableHintTemplate[TTableSchemaColumns]:
"""Gets columns' schema that can be modified in place"""
return None if self._hints is None else self._hints.get("columns")
return None if self._hints is None else self._hints.get("columns") # type: ignore[return-value]

@property
def nested_hints(self) -> DltResourceHintsDict:
pass

@property
def schema_contract(self) -> TTableHintTemplate[TSchemaContract]:
Expand All @@ -179,24 +207,24 @@ def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTab
"""
if isinstance(meta, TableNameMeta):
# look for variant
table_template = self._hints_variants.get(meta.table_name, self._hints)
root_table_template = self._hints_variants.get(meta.table_name, self._hints)
else:
table_template = self._hints
if not table_template:
root_table_template = self._hints
if not root_table_template:
return new_table(self.name, resource=self.name)

# resolve a copy of a held template
table_template = self._clone_hints(table_template)
if "name" not in table_template:
table_template["name"] = self.name
root_table_template = self._clone_hints(root_table_template)
if "table_name" not in root_table_template:
root_table_template["table_name"] = self.name

# if table template present and has dynamic hints, the data item must be provided.
if self._table_name_hint_fun and item is None:
raise DataItemRequiredForDynamicTableHints(self.name)
# resolve
resolved_template: TResourceHints = {
k: self._resolve_hint(item, v)
for k, v in table_template.items()
for k, v in root_table_template.items()
if k not in NATURAL_CALLABLES
} # type: ignore
table_schema = self._create_table_schema(resolved_template, self.name)
Expand Down Expand Up @@ -276,9 +304,9 @@ def apply_hints(
t = self._clone_hints(t)
if table_name is not None:
if table_name:
t["name"] = table_name
t["table_name"] = table_name
else:
t.pop("name", None)
t.pop("table_name", None)
if parent_table_name is not None:
if parent_table_name:
t["parent"] = parent_table_name
Expand All @@ -296,6 +324,7 @@ def apply_hints(
# normalize columns
columns = ensure_table_schema_columns(columns)
# this updates all columns with defaults
assert isinstance(t["columns"], dict)
t["columns"] = merge_columns(t["columns"], columns, merge_columns=True)
else:
# set to empty columns
Expand Down Expand Up @@ -354,7 +383,8 @@ def _set_hints(
DltResourceHints.validate_dynamic_hints(hints_template)
DltResourceHints.validate_write_disposition_hint(hints_template.get("write_disposition"))
if create_table_variant:
table_name: str = hints_template["name"] # type: ignore[assignment]
# for table variants, table name must be a str
table_name: str = hints_template["table_name"] # type: ignore[assignment]
# incremental cannot be specified in variant
if hints_template.get("incremental"):
raise InconsistentTableTemplate(
Expand Down Expand Up @@ -388,7 +418,7 @@ def merge_hints(
self, hints_template: TResourceHints, create_table_variant: bool = False
) -> None:
self.apply_hints(
table_name=hints_template.get("name"),
table_name=hints_template.get("table_name"),
parent_table_name=hints_template.get("parent"),
write_disposition=hints_template.get("write_disposition"),
columns=hints_template.get("original_columns"),
Expand Down
2 changes: 0 additions & 2 deletions dlt/sources/rest_api/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,7 @@ class ProcessingSteps(TypedDict):
class ResourceBase(TResourceHintsBase, total=False):
"""Defines hints that may be passed to `dlt.resource` decorator"""

table_name: Optional[TTableHintTemplate[str]]
max_table_nesting: Optional[int]
columns: Optional[TTableHintTemplate[TAnySchemaColumns]]
selected: Optional[bool]
parallelized: Optional[bool]
processing_steps: Optional[List[ProcessingSteps]]
Expand Down
Loading