Rfix/pipeline v2 extract comments #68

Closed · wants to merge 66 commits from rfix/pipeline-v2-extract-2

Conversation

adrianbr (Contributor)

No description provided.

…on configs, snake case field names, recursive binding to values
## default and explicitly configured pipelines
When `dlt` is imported, a default pipeline is automatically created. That pipeline is configured via configuration providers (i.e. `config.toml` or environment variables - see [secrets_and_config.md](secrets_and_config.md)). If no configuration is present, default values are used.

1. the name of the pipeline, the name of the default schema (if not overridden by the source extractor function) and the default dataset (in the destination) are set to the **current module name**, which in 99% of cases is the name of the executing Python script

adrianbr (Contributor Author):
If we default the destination (dataset/schema), we need to make sure we somehow communicate that, to avoid "where did the data go?" confusion.
I would log this happening, and perhaps the default could include our dlt branding in the name, for multiple reasons (easy to find, branding).

rudolfix (Collaborator):
100% agree. I want to add user-readable logs that will tell exactly where the data went. The default name will always start with the `dlt_` prefix (I will update the info above).

2. the working directory of the pipeline will be **OS temporary folder/pipeline name**

adrianbr (Contributor Author):
Do users ever need to interact with this? If so, I would give an option for setting the path.

If the runner does not have permission, can the user configure a new folder (even if it's hacky), or do they need to fix permissions?

rudolfix (Collaborator):
Yes, they can set the working dir. Also, I decided to change `/tmp` to `~/.dlt/pipelines/<name>` so it is easier to implement the CLI. I will put the correct info here.


3. the logging level will be **INFO**

adrianbr (Contributor Author):
can we/should we log schema changes as warnings?

Or should we collect them in the run summary like the errors?

adrianbr (Contributor Author):
I would push them to slack :) as logs are for troubleshooting, not daily reading

rudolfix (Collaborator):
Let's gather ideas for end-user log messages in one place.
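
To make the defaults above concrete, here is a minimal sketch of relying on the implicit pipeline, written against the behaviour described in this document (not a settled API):

```python
# a minimal sketch: nothing beyond the import is needed to get a pipeline with
# the defaults listed in points 1.-3. above
import dlt

# at this point a default pipeline exists: its name, default schema and default
# dataset fall back to the current module name, its working directory to the OS
# temporary folder (or ~/.dlt/pipelines/<name>, as proposed in the thread above),
# and its logging level to INFO; config.toml or environment variables override these
```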

A pipeline can be explicitly created and configured via `dlt.pipeline()`, which returns a `Pipeline` object. All parameters are optional; if no parameter is provided, the default pipeline is returned. Here's the list of options; all of them are configurable (see the sketch after the comments on this list).
1. pipeline_name - default as above
2. working_dir - default as above
3. pipeline_secret - for deterministic hashing - default is a random number

adrianbr (Contributor Author):
Deterministic IDs by default have more value IMO than randomising by default.

I would expect them to stay deterministic between envs, so I can later deduplicate on the hash; that would surely be cheaper than deduplicating on all columns. For example, Stripe might return data from X until the last activity. One would then request from that last activity including its timestamp, because there may be a new activity within the same second that came later.

I would also expect them to be deterministic between local/prod envs, as there are cases where the engineer might run a fix/migration etc. from local.

rudolfix (Collaborator):
Let's have a hardcoded secret value for the deterministic hashing. Rename it to hashing salt or seed.

4. destination - the imported destination module or module name (we accept strings so they can be configured) - default is None
5. import_schema_path - default is None

adrianbr (Contributor Author):
I can imagine the default path would ideally be at the extractor script location, and it would not be exported by default.

So I guess this can be achieved by defaulting to None and accepting some relative path parameter; perhaps this path could be relative to the extractor.
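
A minimal sketch of explicit configuration using the five options listed above; the argument values are illustrative only, and the parameter names are taken from the list (snippets later in this document use shorter names such as `name` and `dataset`, so the exact signature is still in flux):

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="taktile_logs",                  # default: current module name
    working_dir="~/.dlt/pipelines/taktile_logs",   # default: OS temp folder / pipeline name
    pipeline_secret="fixed-hashing-seed",          # seed/salt for deterministic hashing; random by default
    destination="bigquery",                        # imported destination module or its name (strings accepted)
    import_schema_path=None,                       # default: None
)
```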

**Pipeline working directory should be preserved between the runs - if possible**

If the working directory is not preserved:
1. the auto-evolved schema is reset to the initial one. Schema evolution is deterministic, so this should not be a problem - just time wasted comparing schemas on each run

adrianbr (Contributor Author):
I think we should allow storing schemas on s3 or similar, or perhaps we offer schema hosting as a service for money

adrianbr (Contributor Author):
or they could be stored at destination in binary or something

2. the current schemas with all the recent updates are lost
3. the pipeline and source state files are lost.

**Pipeline working directory should be preserved between the runs - if possible**

adrianbr (Contributor Author):
This likely will not happen in any non-basic data engineering setup.

To scale pipelines, most companies with some maturity send their jobs to k8s / Cloud Run / Airflow.


`run` and `load` return information on the loaded packages: which datasets they went to, the list of jobs, etc. Let me think about what the content should be.

> `load` is atomic if the SQL transformations (i.e. in `dbt`) and all the SQL queries take into account only committed `load_ids`. It is certainly possible - we did it for RASA, but it requires some work... Maybe we implement fully atomic staging at some point in the loader.

adrianbr (Contributor Author):
atomic staging is the way to go IMO

adrianbr (Contributor Author):
the "load id" thing for rasa is useful for incremental processing but can be implemented without our help


```python
# the `run` command below will create default pipeline and use it to load data
# I only want logs from the resources present in taktile_data
taktile_data.select("logs").run(destination=bigquery)
```

adrianbr (Contributor Author):
Do we want `select('endpoint')` or do we want `endpoints = ['endpoint']`?

I think the latter is more explicit.

rudolfix (Collaborator):
So you would do it like this:

```python
taktile_data.endpoints = ["logs"]
taktile_data.run()
```

rudolfix (Collaborator):
Hmmm, you are probably right. The current interface is not very intuitive.
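
For reference, the two interfaces under discussion, side by side; `taktile_data` and the `logs` resource come from the example above, and `bigquery` is the imported destination module (a sketch of the proposal, not a settled API):

```python
# option A: current interface - pick resources with select(), then run
taktile_data.select("logs").run(destination=bigquery)

# option B: proposed alternative - set the endpoints attribute explicitly, then run
taktile_data.endpoints = ["logs"]
taktile_data.run(destination=bigquery)  # or rely on a preconfigured destination
```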

```python
dlt.pipeline(name="pipe", destination=bigquery, dataset="extract_1")
# use dlt secrets directly to get api key
# no parameters needed to run - we configured destination and dataset already
data(dlt.secrets["api_key"]).run()
```

adrianbr (Contributor Author):
Interesting, but I think we want to parametrise the changing stuff and freeze the names in the function calls?

rudolfix (Collaborator):
this one I do not understand?

@rudolfix closed this on Dec 8, 2022
@rudolfix deleted the rfix/pipeline-v2-extract-2 branch on February 9, 2023