Rfix/pipeline v2 extract comments #68
Conversation
## default and explicitly configured pipelines
When `dlt` is imported, a default pipeline is automatically created. That pipeline is configured via configuration providers (i.e. `config.toml` or env variables - see [secrets_and_config.md](secrets_and_config.md)). If no configuration is present, default values will be used.

1. the name of the pipeline, the name of the default schema (if not overridden by the source extractor function) and the default dataset (in destination) are set to the **current module name**, which in 99% of cases is the name of the executing Python script
If we default the destination (dataset/schema), we need to make sure we communicate that somehow, to avoid "where did the data go?" moments.
I would log when this happens, and perhaps the default could include our dlt branding in the name for multiple reasons (easy to find, branding).
100% agree. I want to add user-readable logs that tell exactly where the data went. The default name will always start with the dlt_ prefix (I will update the info above).
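To make the module-name defaulting and the dlt_ prefix concrete, here is a minimal sketch; the source and destination imports are placeholders borrowed from the examples further down, not final API:

```python
# quickstart.py -- illustrative only
import dlt
from dlt.destinations import bigquery  # hypothetical import path for the destination module
from taktile_source import taktile_data  # hypothetical source extractor

# No explicit dlt.pipeline() call: the default pipeline created when `dlt` was
# imported is used. The pipeline name, default schema name and destination
# dataset would all derive from this module name ("quickstart"), with the
# dataset carrying the dlt_ prefix discussed above so it is easy to find.
taktile_data.run(destination=bigquery)
```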
2. the working directory of the pipeline will be **OS temporary folder/pipeline name**
Do users ever need to interact with this? If so, I would give an option for setting the path.
If the runner does not have permission, can the user configure a new folder (can be hacky), or do they need to fix permissions?
Yes, they can set the working dir. Also, I decided to change /tmp
to ~/.dlt/pipelines/<name>
so it is easier to implement the CLI. I will put the correct info here.
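For reference, a sketch of overriding the working directory explicitly, e.g. to point at a mounted volume the runner is allowed to write to; the parameter names follow the option list below and are a proposal, not final API:

```python
import dlt

# Override the default ~/.dlt/pipelines/<name> location (illustrative values).
dlt.pipeline(
    pipeline_name="taktile",
    working_dir="/mnt/state/dlt/taktile",
)
```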
3. the logging level will be **INFO**
can we/should we log schema changes as warnings?
Or should we collect them in the run summary like the errors?
I would push them to Slack :) as logs are for troubleshooting, not daily reading.
Let's gather ideas for end-user log messages in one place.
A pipeline can be explicitly created and configured via `dlt.pipeline()`, which returns a `Pipeline` object. All parameters are optional. If no parameter is provided, the default pipeline is returned. Here's the list of options; all of them are configurable.
1. pipeline_name - default as above
2. working_dir - default as above
3. pipeline_secret - for deterministic hashing - default is a random number
Deterministic ids by default have more value IMO than randomizing by default.
I would expect them to stay deterministic between runs, so I can later de-duplicate on the hash, which would surely be cheaper than de-duplicating on all columns. For example, Stripe might return data from X until the last activity. One would then request from that last activity including its timestamp, because there may be a new activity within the same second that came later.
I would also expect them to be deterministic between local/prod envs, as there are cases where the engineer might run a fix/migration etc. from local.
Let's have a hardcoded secret value for the deterministic hashing, and rename it to hashing salt or seed.
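Independent of dlt internals, the deterministic-id idea boils down to salted content hashing; a rough sketch of what "same input, same id, in every environment" means (the salt name and hashing scheme are illustrative, not dlt's actual implementation):

```python
import hashlib
import json

HASHING_SALT = "dlt-fixed-salt"  # hardcoded seed as suggested above (illustrative)

def row_id(row: dict) -> str:
    # Same row content always produces the same id, locally and in prod,
    # so later de-duplication can compare the hash instead of all columns.
    payload = HASHING_SALT + json.dumps(row, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]

assert row_id({"a": 1}) == row_id({"a": 1})
```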
4. destination - the imported destination module or module name (we accept strings so they can be configured) - default is None
5. import_schema_path - default is None
I can imagine the default path would ideally be at the extractor script location, and it would not be exported by default.
So I guess this can be achieved by defaulting to None and accepting a relative path parameter; perhaps this relative path can be relative to the extractor.
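A sketch of the "relative to the extractor script" idea, assuming `import_schema_path` stays a plain path parameter as in the option list above (the folder name is made up):

```python
import os
import dlt

# Resolve the schema folder relative to this extractor script, then pass it
# explicitly; nothing is exported unless the user opts in.
schema_dir = os.path.join(os.path.dirname(__file__), "schemas")

dlt.pipeline(
    pipeline_name="taktile",
    import_schema_path=schema_dir,
)
```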
**Pipeline working directory should be preserved between the runs - if possible**

If the working directory is not preserved:
1. the auto-evolved schema is reset to the initial one. Schema evolution is deterministic, so it should not be a problem - just time wasted comparing schemas on each run
I think we should allow storing schemas on S3 or similar, or perhaps we offer schema hosting as a paid service.
Or they could be stored at the destination in binary form or something similar.
2. the current schemas with all the recent updates
3. the pipeline and source state files.

**Pipeline working directory should be preserved between the runs - if possible**
This likely will not happen in any non-basic data engineering setup.
To scale pipelines, most companies with some maturity send their jobs to k8s/Cloud Run/Airflow.
the `run` and `load` return information on the loaded packages: to which datasets, the list of jobs etc. Let me think about what the content should be.

> `load` is atomic if SQL transformations, i.e. in `dbt`, and all the SQL queries take into account only committed `load_ids`. It is certainly possible - we did it for RASA but it requires some work... Maybe we implement a fully atomic staging at some point in the loader.
atomic staging is the way to go IMO
The "load id" thing for RASA is useful for incremental processing but can be implemented without our help.
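A sketch of the "only committed load_ids" pattern from the note above; the `_dlt_loads` table and its columns are assumptions here, not a confirmed schema:

```python
# Downstream SQL (e.g. a dbt model) reads only rows whose load_id has been
# marked as committed, which is what makes `load` look atomic to consumers.
COMMITTED_ONLY = """
SELECT e.*
FROM events AS e
JOIN _dlt_loads AS l
  ON l.load_id = e._dlt_load_id
WHERE l.status = 0  -- 0 = fully committed load (assumed convention)
"""
```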
# the `run` command below will create the default pipeline and use it to load data
# I only want logs from the resources present in taktile_data
taktile_data.select("logs").run(destination=bigquery)
Do we want `select('endpoint')` or do we want `endpoints = ['endpoint']`?
I think the latter is more explicit.
So you would do it like this:
`taktile_data.endpoints = ["logs"]`
`taktile_data.run()`
Hmm, you are probably right. The current interface is not very intuitive.
dlt.pipeline(name="pipe", destination=bigquery, dataset="extract_1")
# use dlt secrets directly to get api key
# no parameters needed to run - we configured destination and dataset already
data(dlt.secrets["api_key"]).run()
Interesting, but I think we want to parametrize the changing stuff and freeze names in the function calls?
this one I do not understand?