Rfix/pipeline v2 extract comments #68

Closed
wants to merge 66 commits into from

Changes from all commits (66 commits)
6eba24d
initial implementation of pluggable data writers and buffered writer
rudolfix Sep 19, 2022
47e4766
experimental implementation of extraction pipe, source, resources and…
rudolfix Sep 19, 2022
20f6180
adds schema utils to diff tables
rudolfix Sep 20, 2022
ddd06a9
adds hard linking and path validation to file storage
rudolfix Sep 20, 2022
15d29b5
implements pipe, resource, source and extractor without tests
rudolfix Sep 20, 2022
533f576
adds pathvalidate to deps
rudolfix Sep 20, 2022
3db4695
adds additional caps to loader clients, fixes tests to use new writers
rudolfix Sep 20, 2022
be15172
refactoring extractor and normalizer code to introduce new data write…
rudolfix Sep 20, 2022
059e77d
makes schema name validation part of name normalization, function to …
rudolfix Sep 20, 2022
e9b70dd
several name cleanups
rudolfix Sep 20, 2022
fcaaef6
adds close data writer operation and written items count
rudolfix Sep 24, 2022
40c5acc
adds explicit table name to json normalizer func signature
rudolfix Sep 24, 2022
b07fb54
adds performance improvements to Schema, 3x normalization speed gaine…
rudolfix Sep 24, 2022
d8d8563
removes runtime protocol check from schema altogether
rudolfix Sep 24, 2022
f98ddbb
implements data item storage and adds to normalize and load storages …
rudolfix Sep 24, 2022
4d3861b
rewrites Normalize to work with new extracted jsonl files and use buf…
rudolfix Sep 24, 2022
52eebb9
adds decorator for Pool workers to pass configuration types across pr…
rudolfix Sep 24, 2022
693f89d
simplifies ExtractorStorage by moving data items storage out
rudolfix Sep 24, 2022
1ce4735
refactors configurations: uses dataclass based instances, no producti…
rudolfix Sep 28, 2022
277f8e4
adds init values and embedded config tests
rudolfix Sep 28, 2022
400ca1a
reorganizes configuration module
rudolfix Sep 30, 2022
b53ed23
moves configurations into specs folder
rudolfix Oct 1, 2022
c0cc1d6
moves file_storage to storages, writes lists of items into jsonl file…
rudolfix Oct 1, 2022
80233c0
data item may be of any type, not dict loaded from json
rudolfix Oct 1, 2022
f7b5434
adds resolving configs via providers with variable namespaces, provid…
rudolfix Oct 5, 2022
f16285b
moves config tests
rudolfix Oct 5, 2022
a80b2f5
adds config inject decorator, basic tests, applies decorator to buffe…
rudolfix Oct 12, 2022
1e0695f
adds injection container with tests
rudolfix Oct 12, 2022
81eb730
adds config providers framework, injectable config and basic tests
rudolfix Oct 12, 2022
f08b8d5
adds namespaces, pipeline name, injectable namespace config to config…
rudolfix Oct 12, 2022
0d126d5
removes legacy extractor
rudolfix Oct 13, 2022
47e5029
moves pipe, extract and sources code into extract
rudolfix Oct 13, 2022
5c76bcb
adds config injection to normalize, passes instantiated configs to wo…
rudolfix Oct 14, 2022
afcabae
removes per component version, uses package version in logging
rudolfix Oct 14, 2022
4e4645c
applies config injection to storages, adds dictionary config provider…
rudolfix Oct 14, 2022
c476254
adds create pipeline examples
rudolfix Oct 16, 2022
ec20591
improves config resolution: last exception is preserved, resolved con…
rudolfix Oct 18, 2022
71fcddc
implements new instantiation mechanics for destinations
rudolfix Oct 18, 2022
3f40da6
implements first version of pipeline v2 and applies config injection …
rudolfix Oct 18, 2022
47277d6
adding pseudo code samples for api v2: create pipeline, working with …
rudolfix Oct 20, 2022
6b65451
adds more code samples
rudolfix Oct 23, 2022
b17e513
adds general usage samples
rudolfix Oct 23, 2022
f20871e
adds example to general usage
rudolfix Oct 23, 2022
0d5a894
general usage doc completed + README
rudolfix Oct 24, 2022
20a9186
adds project structure example
rudolfix Oct 25, 2022
150da6d
adds working with schemas doc
rudolfix Oct 25, 2022
8282f30
first implementation of source and resource decorators, adds pipeline…
rudolfix Oct 26, 2022
8ca3b9b
extracts destination and pipeline common code
rudolfix Oct 26, 2022
ce9c3b9
adds ignored example secrets.toml
rudolfix Oct 26, 2022
c4f6c8c
implements toml config provider, changes how embedded and config name…
rudolfix Oct 27, 2022
8b4d6fd
ports pipeline v1 util methods
rudolfix Oct 28, 2022
3fa7d21
renames modules holding client implementations in load
rudolfix Oct 28, 2022
08c10db
removes pipeline v1
rudolfix Oct 28, 2022
0290aa3
moves pipeline v2 in
rudolfix Oct 28, 2022
d16b430
fixes pipeline imports
rudolfix Oct 28, 2022
b2a9bde
fixes typing errors, adds overloads
rudolfix Oct 28, 2022
3ce749e
moves source decorators to extract
rudolfix Oct 28, 2022
daba606
adds restore pipeline and better state management, with cli support
rudolfix Oct 29, 2022
dab58bf
fixes dependent resources handling, partially adds missing exceptions
rudolfix Oct 29, 2022
0a9691a
adds self importing via name or module name for destination reference
rudolfix Oct 29, 2022
1f7bf39
fixes config exceptions names
rudolfix Oct 29, 2022
371a058
adds optional embedded namespaces extension in config resolve + tests
rudolfix Oct 31, 2022
bc61466
refactors interface to select resources in source, adds missing excep…
rudolfix Oct 31, 2022
161b8d6
changes the schema file pattern _schema. -> .schema.
rudolfix Oct 31, 2022
32d769b
adds flag to wipe out loader storage before initializing
rudolfix Oct 31, 2022
11a80d8
various typing improvements
rudolfix Oct 31, 2022
12 changes: 4 additions & 8 deletions Makefile
@@ -13,15 +13,11 @@ VERSION := ${AUTV}${VERSION_SUFFIX}
VERSION_MM := ${AUTVMINMAJ}${VERSION_SUFFIX}


# dbt runner version info
DBT_AUTV=$(shell python3 -c "from dlt.dbt_runner._version import __version__;print(__version__)")
DBT_AUTVMINMAJ=$(shell python3 -c "from dlt.dbt_runner._version import __version__;print('.'.join(__version__.split('.')[:-1]))")

DBT_NAME := scalevector/dlt-dbt-runner
DBT_IMG := ${DBT_NAME}:${TAG}
DBT_LATEST := ${DBT_NAME}:latest${VERSION_SUFFIX}
DBT_VERSION := ${DBT_AUTV}${VERSION_SUFFIX}
DBT_VERSION_MM := ${DBT_AUTVMINMAJ}${VERSION_SUFFIX}
DBT_VERSION := ${AUTV}${VERSION_SUFFIX}
DBT_VERSION_MM := ${AUTVMINMAJ}${VERSION_SUFFIX}

install-poetry:
ifneq ($(VIRTUAL_ENV),)
@@ -38,7 +34,7 @@ dev: has-poetry

lint:
./check-package.sh
poetry run mypy --config-file mypy.ini dlt examples
poetry run mypy --config-file mypy.ini dlt
poetry run flake8 --max-line-length=200 examples dlt
poetry run flake8 --max-line-length=200 tests
# dlt/pipeline dlt/common/schema dlt/common/normalizers
@@ -50,7 +46,7 @@ lint-security:
reset-test-storage:
-rm -r _storage
mkdir _storage
python3 test/tools/create_storages.py
python3 tests/tools/create_storages.py

recreate-compiled-deps:
poetry export -f requirements.txt --output _gen_requirements.txt --without-hashes --extras gcp --extras redshift
2 changes: 1 addition & 1 deletion dlt/__init__.py
@@ -1 +1 @@
from dlt._version import common_version as __version__
__version__ = "0.1.0"
3 changes: 0 additions & 3 deletions dlt/_version.py

This file was deleted.

57 changes: 33 additions & 24 deletions dlt/cli/dlt.py
@@ -7,9 +7,8 @@
from dlt.cli import TRunnerArgs
from dlt.common.schema import Schema
from dlt.common.typing import DictStrAny
from dlt.common.utils import str2bool

from dlt.pipeline import Pipeline, PostgresPipelineCredentials
from dlt.pipeline import pipeline, restore


def add_pool_cli_arguments(parser: argparse.ArgumentParser) -> None:
@@ -27,33 +26,35 @@ def add_pool_cli_arguments(parser: argparse.ArgumentParser) -> None:
def main() -> None:
parser = argparse.ArgumentParser(description="Runs various DLT modules", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
subparsers = parser.add_subparsers(dest="command")
normalize = subparsers.add_parser("normalize", help="Runs normalize")
add_pool_cli_arguments(normalize)
load = subparsers.add_parser("load", help="Runs loader")
add_pool_cli_arguments(load)

# normalize = subparsers.add_parser("normalize", help="Runs normalize")
# add_pool_cli_arguments(normalize)
# load = subparsers.add_parser("load", help="Runs loader")
# add_pool_cli_arguments(load)

dbt = subparsers.add_parser("dbt", help="Executes dbt package")
add_pool_cli_arguments(dbt)
schema = subparsers.add_parser("schema", help="Shows, converts and upgrades schemas")
schema.add_argument("file", help="Schema file name, in yaml or json format, will autodetect based on extension")
schema.add_argument("--format", choices=["json", "yaml"], default="yaml", help="Display schema in this format")
schema.add_argument("--remove-defaults", action="store_true", help="Does not show default hint values")
pipeline = subparsers.add_parser("pipeline", help="Operations on the pipelines")
pipeline.add_argument("name", help="Pipeline name")
pipeline.add_argument("workdir", help="Pipeline working directory")
pipeline.add_argument("operation", choices=["failed_loads"], default="failed_loads", help="Show failed loads for a pipeline")
pipe_cmd = subparsers.add_parser("pipeline", help="Operations on the pipelines")
pipe_cmd.add_argument("name", help="Pipeline name")
pipe_cmd.add_argument("operation", choices=["failed_loads", "drop"], default="failed_loads", help="Show failed loads for a pipeline")
pipe_cmd.add_argument("--workdir", help="Pipeline working directory", default=None)

# TODO: consider using fire: https://github.com/google/python-fire
# TODO: this also looks better https://click.palletsprojects.com/en/8.1.x/complex/#complex-guide
args = parser.parse_args()
run_f: Callable[[TRunnerArgs], None] = None

if args.command == "normalize":
from dlt.normalize.normalize import run_main as normalize_run
run_f = normalize_run
elif args.command == "load":
from dlt.load.load import run_main as loader_run
run_f = loader_run
elif args.command == "dbt":
# if args.command == "normalize":
# from dlt.normalize.normalize import run_main as normalize_run
# run_f = normalize_run
# elif args.command == "load":
# from dlt.load.load import run_main as loader_run
# run_f = loader_run
if args.command == "dbt":
from dlt.dbt_runner.runner import run_main as dbt_run
run_f = dbt_run
elif args.command == "schema":
@@ -70,13 +71,21 @@ def main() -> None:
print(schema_str)
exit(0)
elif args.command == "pipeline":
p = Pipeline(args.name)
p.restore_pipeline(PostgresPipelineCredentials("dummy"), args.workdir)
completed_loads = p.list_completed_loads()
for load_id in completed_loads:
print(f"Checking failed jobs in {load_id}")
for job, failed_message in p.list_failed_jobs(load_id):
print(f"JOB: {job}\nMSG: {failed_message}")
# from dlt.load import dummy

p = restore(pipeline_name=args.name, working_dir=args.workdir)
print(f"Found pipeline {p.pipeline_name} ({args.name}) in {p.working_dir} ({args.workdir}) with state {p._get_state()}")

if args.operation == "failed_loads":
completed_loads = p.list_completed_load_packages()
for load_id in completed_loads:
print(f"Checking failed jobs in load id '{load_id}'")
for job, failed_message in p.list_failed_jobs_in_package(load_id):
print(f"JOB: {os.path.abspath(job)}\nMSG: {failed_message}")

if args.operation == "drop":
p.drop()

exit(0)
else:
parser.print_help()
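A quick way to see the effect of the CLI rework above: `workdir` moved from a positional argument to an optional `--workdir` flag, and `drop` joined the operation choices. A minimal, self-contained sketch of just the `pipeline` subcommand wiring (the dbt/schema subcommands and the dlt-specific imports are omitted; the pipeline name is illustrative):

```python
import argparse

# mirrors the reworked "pipeline" subparser from the diff above
parser = argparse.ArgumentParser(description="Runs various DLT modules")
subparsers = parser.add_subparsers(dest="command")
pipe_cmd = subparsers.add_parser("pipeline", help="Operations on the pipelines")
pipe_cmd.add_argument("name", help="Pipeline name")
pipe_cmd.add_argument("operation", choices=["failed_loads", "drop"], default="failed_loads", help="Show failed loads for a pipeline")
pipe_cmd.add_argument("--workdir", help="Pipeline working directory", default=None)

# the workdir is now optional, so a minimal invocation needs only
# the pipeline name and the operation
args = parser.parse_args(["pipeline", "my_pipeline", "failed_loads"])
print(args.command, args.name, args.operation, args.workdir)
```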
1 change: 0 additions & 1 deletion dlt/common/__init__.py
@@ -3,4 +3,3 @@
from .pendulum import pendulum # noqa: F401
from .json import json # noqa: F401, I251
from .time import sleep # noqa: F401
from dlt._version import common_version as __version__
13 changes: 4 additions & 9 deletions dlt/common/configuration/__init__.py
@@ -1,11 +1,6 @@
from .run_configuration import RunConfiguration, BaseConfiguration, CredentialsConfiguration # noqa: F401
from .normalize_volume_configuration import NormalizeVolumeConfiguration, ProductionNormalizeVolumeConfiguration # noqa: F401
from .load_volume_configuration import LoadVolumeConfiguration, ProductionLoadVolumeConfiguration # noqa: F401
from .schema_volume_configuration import SchemaVolumeConfiguration, ProductionSchemaVolumeConfiguration # noqa: F401
from .pool_runner_configuration import PoolRunnerConfiguration, TPoolType # noqa: F401
from .gcp_client_credentials import GcpClientCredentials # noqa: F401
from .postgres_credentials import PostgresCredentials # noqa: F401
from .utils import make_configuration # noqa: F401
from .specs.base_configuration import configspec, is_valid_hint # noqa: F401
from .resolve import resolve_configuration, inject_namespace # noqa: F401
from .inject import with_config, last_config, get_fun_spec

from .exceptions import ( # noqa: F401
ConfigEntryMissingException, ConfigEnvValueCannotBeCoercedException, ConfigIntegrityException, ConfigFileNotFoundException)
ConfigFieldMissingException, ConfigValueCannotBeCoercedException, ConfigIntegrityException, ConfigFileNotFoundException)
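The new public surface above (`configspec`, `resolve_configuration`, `with_config`) replaces the per-module configuration classes with dataclass-based specs resolved through providers. The sketch below shows only the general idea, with a single provider (`os.environ`) and made-up field names; the actual resolution machinery in this PR additionally handles namespaces, multiple providers, and lookup traces:

```python
import os
from dataclasses import dataclass, fields

# hypothetical spec; field names are invented for the example
@dataclass
class BufferedWriterConfiguration:
    buffer_max_items: int = 5000
    file_format: str = "jsonl"

def resolve_from_environ(config):
    # each field is looked up by its upper-cased name, then coerced
    # to the hinted type; missing keys keep the dataclass default
    for f in fields(config):
        raw = os.environ.get(f.name.upper())
        if raw is not None:
            setattr(config, f.name, f.type(raw))
    return config

os.environ["BUFFER_MAX_ITEMS"] = "100"
cfg = resolve_from_environ(BufferedWriterConfiguration())
print(cfg.buffer_max_items, cfg.file_format)  # → 100 jsonl
```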
66 changes: 66 additions & 0 deletions dlt/common/configuration/container.py
@@ -0,0 +1,66 @@
from contextlib import contextmanager
from typing import Dict, Iterator, Type, TypeVar

from dlt.common.configuration.specs.base_configuration import ContainerInjectableContext
from dlt.common.configuration.exceptions import ContainerInjectableContextMangled, ContextDefaultCannotBeCreated

TConfiguration = TypeVar("TConfiguration", bound=ContainerInjectableContext)


class Container:

_INSTANCE: "Container" = None

contexts: Dict[Type[ContainerInjectableContext], ContainerInjectableContext]

def __new__(cls: Type["Container"]) -> "Container":
if not cls._INSTANCE:
cls._INSTANCE = super().__new__(cls)
cls._INSTANCE.contexts = {}
return cls._INSTANCE

def __init__(self) -> None:
pass

def __getitem__(self, spec: Type[TConfiguration]) -> TConfiguration:
# return existing config object or create it from spec
if not issubclass(spec, ContainerInjectableContext):
raise KeyError(f"{spec.__name__} is not a context")

item = self.contexts.get(spec)
if item is None:
if spec.can_create_default:
item = spec()
self.contexts[spec] = item
else:
raise ContextDefaultCannotBeCreated(spec)

return item # type: ignore

def __setitem__(self, spec: Type[TConfiguration], value: TConfiguration) -> None:
self.contexts[spec] = value

def __contains__(self, spec: Type[TConfiguration]) -> bool:
return spec in self.contexts

@contextmanager
def injectable_context(self, config: TConfiguration) -> Iterator[TConfiguration]:
spec = type(config)
previous_config: ContainerInjectableContext = None
if spec in self.contexts:
previous_config = self.contexts[spec]
# set new config and yield context
try:
self.contexts[spec] = config
yield config
finally:
# before setting the previous config for given spec, check if there was no overlapping modification
if self.contexts[spec] is config:
# config is injected for spec so restore previous
if previous_config is None:
del self.contexts[spec]
else:
self.contexts[spec] = previous_config
else:
# value was modified in the meantime and not restored
raise ContainerInjectableContextMangled(spec, self.contexts[spec], config)
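The restore-on-exit semantics of `injectable_context` are the crux of this file. A self-contained miniature of the container (dlt imports dropped, `Context` standing in for `ContainerInjectableContext`, and the mangled-context check omitted) demonstrates the scoped injection:

```python
from contextlib import contextmanager

class Context:
    can_create_default = True

class Container:
    _INSTANCE = None

    def __new__(cls):
        # process-wide singleton: the contexts dict is created exactly once
        if not cls._INSTANCE:
            cls._INSTANCE = super().__new__(cls)
            cls._INSTANCE.contexts = {}
        return cls._INSTANCE

    def __getitem__(self, spec):
        # return the existing context instance or create a default one
        item = self.contexts.get(spec)
        if item is None:
            item = spec()
            self.contexts[spec] = item
        return item

    @contextmanager
    def injectable_context(self, config):
        spec = type(config)
        previous = self.contexts.get(spec)
        self.contexts[spec] = config
        try:
            yield config
        finally:
            # restore whatever was present before the scope was entered
            if previous is None:
                del self.contexts[spec]
            else:
                self.contexts[spec] = previous

default = Container()[Context]  # created lazily on first access
override = Context()
with Container().injectable_context(override):
    assert Container()[Context] is override  # visible inside the scope
assert Container()[Context] is default  # previous instance restored
```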
107 changes: 92 additions & 15 deletions dlt/common/configuration/exceptions.py
@@ -1,34 +1,59 @@
from typing import Iterable, Union
from typing import Any, Mapping, Type, Union, NamedTuple, Sequence

from dlt.common.exceptions import DltException


class LookupTrace(NamedTuple):
provider: str
namespaces: Sequence[str]
key: str
value: Any


class ConfigurationException(DltException):
def __init__(self, msg: str) -> None:
super().__init__(msg)


class ConfigEntryMissingException(ConfigurationException):
"""thrown when not all required config elements are present"""

def __init__(self, missing_set: Iterable[str], namespace: str = None) -> None:
self.missing_set = missing_set
self.namespace = namespace
class ContainerException(ConfigurationException):
"""base exception for all exceptions related to injectable container"""
pass


class ConfigProviderException(ConfigurationException):
"""base exception for all exceptions raised by config providers"""
pass


class ConfigurationWrongTypeException(ConfigurationException):
def __init__(self, _typ: type) -> None:
super().__init__(f"Invalid configuration instance type {_typ}. Configuration instances must derive from BaseConfiguration.")


msg = 'Missing config keys: ' + str(missing_set)
if namespace:
msg += ". Note that required namespace for that keys is " + namespace + " and namespace separator is two underscores"
class ConfigFieldMissingException(ConfigurationException):
"""thrown when not all required config fields are present"""

def __init__(self, spec_name: str, traces: Mapping[str, Sequence[LookupTrace]]) -> None:
self.traces = traces
self.spec_name = spec_name

msg = f"Following fields are missing: {str(list(traces.keys()))} in configuration with spec {spec_name}\n"
for f, field_traces in traces.items():
msg += f'\tfor field "{f}" config providers and keys were tried in following order:\n'
for tr in field_traces:
msg += f'\t\tIn {tr.provider} key {tr.key} was not found.\n'
super().__init__(msg)


class ConfigEnvValueCannotBeCoercedException(ConfigurationException):
"""thrown when value from ENV cannot be coerced to hinted type"""
class ConfigValueCannotBeCoercedException(ConfigurationException):
"""thrown when value returned by config provider cannot be coerced to hinted type"""

def __init__(self, attr_name: str, env_value: str, hint: type) -> None:
self.attr_name = attr_name
self.env_value = env_value
def __init__(self, field_name: str, field_value: Any, hint: type) -> None:
self.field_name = field_name
self.field_value = field_value
self.hint = hint
super().__init__('env value %s cannot be coerced into type %s in attr %s' % (env_value, str(hint), attr_name))
super().__init__('configuration value %s cannot be coerced into type %s in field %s' % (field_value, str(hint), field_name))


class ConfigIntegrityException(ConfigurationException):
@@ -46,3 +71,55 @@ class ConfigFileNotFoundException(ConfigurationException):

def __init__(self, path: str) -> None:
super().__init__(f"Missing config file in {path}")


class ConfigFieldMissingTypeHintException(ConfigurationException):
"""thrown when configuration specification does not have type hint"""

def __init__(self, field_name: str, spec: Type[Any]) -> None:
self.field_name = field_name
self.typ_ = spec
super().__init__(f"Field {field_name} on configspec {spec} does not provide required type hint")


class ConfigFieldTypeHintNotSupported(ConfigurationException):
"""thrown when configuration specification uses an unsupported type in a hint"""

def __init__(self, field_name: str, spec: Type[Any], typ_: Type[Any]) -> None:
self.field_name = field_name
self.typ_ = typ_
super().__init__(f"Field {field_name} on configspec {spec} has hint with unsupported type {typ_}")


class ValueNotSecretException(ConfigurationException):
def __init__(self, provider_name: str, key: str) -> None:
self.provider_name = provider_name
self.key = key
super().__init__(f"Provider {provider_name} cannot hold secret values but key {key} with secret value is present")


class InvalidInitialValue(ConfigurationException):
def __init__(self, spec: Type[Any], initial_value_type: Type[Any]) -> None:
self.spec = spec
self.initial_value_type = initial_value_type
super().__init__(f"Initial value of type {initial_value_type} is not valid for {spec.__name__}")


class ContainerInjectableContextMangled(ContainerException):
def __init__(self, spec: Type[Any], existing_config: Any, expected_config: Any) -> None:
self.spec = spec
self.existing_config = existing_config
self.expected_config = expected_config
super().__init__(f"When restoring context {spec.__name__}, instance {expected_config} was expected, instead instance {existing_config} was found.")


class ContextDefaultCannotBeCreated(ContainerException, KeyError):
def __init__(self, spec: Type[Any]) -> None:
self.spec = spec
super().__init__(f"Container cannot create the default value of context {spec.__name__}.")


class DuplicateConfigProviderException(ConfigProviderException):
def __init__(self, provider_name: str) -> None:
self.provider_name = provider_name
super().__init__(f"Provider with name {provider_name} already present in ConfigProvidersContext")
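The `LookupTrace` records introduced in this file make the missing-field error actionable: each trace names the provider, namespace path, and key that was tried, and `ConfigFieldMissingException` folds them into the message. A sketch of that assembly (the field name, providers, keys, and spec name below are invented for illustration):

```python
from typing import Any, NamedTuple, Sequence

class LookupTrace(NamedTuple):
    provider: str
    namespaces: Sequence[str]
    key: str
    value: Any

# hypothetical traces for one missing field of a hypothetical spec
traces = {
    "password": [
        LookupTrace("environ", ("credentials",), "CREDENTIALS__PASSWORD", None),
        LookupTrace("secrets.toml", ("credentials",), "credentials.password", None),
    ]
}

# message assembly mirrors ConfigFieldMissingException above
msg = f"Following fields are missing: {str(list(traces.keys()))} in configuration with spec PostgresCredentials\n"
for f, field_traces in traces.items():
    msg += f'\tfor field "{f}" config providers and keys were tried in following order:\n'
    for tr in field_traces:
        msg += f'\t\tIn {tr.provider} key {tr.key} was not found.\n'
print(msg)
```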
33 changes: 0 additions & 33 deletions dlt/common/configuration/gcp_client_credentials.py

This file was deleted.
