Skip to content
This repository has been archived by the owner on May 4, 2023. It is now read-only.

Commit

Permalink
Merge pull request #28 from nickng/schema-gen-flag
Browse files Browse the repository at this point in the history
Models flag for schema-gen
  • Loading branch information
nickng authored Mar 1, 2021
2 parents 7bb85a9 + 2343ed5 commit 653bad0
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 47 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ ddbt version 0.2.1
- `ddbt watch --skip-run` is the same as watch, but will skip the initial run (preventing you having to wait for all the models to run) before running the tests and starting to watch your file system.
- `ddbt completion zsh` will generate a shell completion script zsh (or bash if you pass that as argument). Detailed steps to set up the completion script can be found in `ddbt completion --help`
- `ddbt isolate-dag` will create a temporary directory and symlink in all files needed for the given _model_filter_ such that Fishtown's DBT could be run against it without having to be run against every model in your data warehouse
- `ddbt schema-gen my_model` will output a new or updated schema yml file for the model provided in the same directory as the dbt model file. (Note: this does not currenly support the `-m` flag for model dependencies)
- `ddbt schema-gen -m my_model` will output a new or updated schema yml file for the model provided in the same directory as the dbt model file.

### Global Arguments
- `--models model_filter` _or_ `-m model_filter`: Instead of running for every model in your project, DDBT will only execute against the requested models. See filters below for what is accepted for `my_model`
Expand Down
12 changes: 8 additions & 4 deletions bigquery/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,7 @@ func NumberRows(query string, target *config.Target) (uint64, error) {
return itr.TotalRows, nil
}

func GetRows(query string, target *config.Target) ([][]Value, Schema, error) {
ctx := context.Background()

func GetRows(ctx context.Context, query string, target *config.Target) ([][]Value, Schema, error) {
switch {
case target.ProjectID == "":
return nil, nil, errors.New("no project ID defined to run query against")
Expand Down Expand Up @@ -291,8 +289,14 @@ func GetRows(query string, target *config.Target) ([][]Value, Schema, error) {
return rows, schema, nil
}

// GetColumnsFromTable is a fallback GetColumnsFromTableWithContext
// with a background context.
func GetColumnsFromTable(table string, target *config.Target) (Schema, error) {
_, schema, err := GetRows(fmt.Sprintf("SELECT * FROM %s LIMIT 0", table), target)
return GetColumnsFromTableWithContext(context.Background(), table, target)
}

func GetColumnsFromTableWithContext(ctx context.Context, table string, target *config.Target) (Schema, error) {
_, schema, err := GetRows(ctx, fmt.Sprintf("SELECT * FROM %s LIMIT 0", table), target)
if err != nil {
return nil, err
}
Expand Down
124 changes: 86 additions & 38 deletions cmd/schema_gen.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package cmd

import (
"context"
"ddbt/bigquery"
"ddbt/config"
"ddbt/fs"
"ddbt/properties"
"ddbt/utils"

"fmt"
"os"
Expand All @@ -15,64 +17,110 @@ import (

func init() {
rootCmd.AddCommand(schemaGenCmd)
addModelsFlag(schemaGenCmd)
}

var schemaGenCmd = &cobra.Command{
Use: "schema-gen [model name]",
Short: "Generates the YML schema file for a given model",
Args: cobra.ExactValidArgs(1),
Args: cobra.RangeArgs(0, 1),
ValidArgsFunction: completeModelFn,
Run: func(cmd *cobra.Command, args []string) {
modelName := args[0]
switch {
case len(args) == 0 && len(ModelFilters) == 0:
fmt.Println("Please specify model with schema-gen -m model-name")
os.Exit(1)
case len(args) == 1 && len(ModelFilters) > 0:
fmt.Println("Please specify model with either schema-gen model-name or schema-gen -m model-name but not both")
os.Exit(1)
case len(args) == 1:
// This will actually allow something weird like
// ddbt schema-gen +model+
ModelFilters = append(ModelFilters, args[0])
}

// get filesystem, model and target
// Build a graph from the given filter.
fileSystem, _ := compileAllModels()
model := fileSystem.Model(modelName)
graph := buildGraph(fileSystem, ModelFilters)

target, err := model.GetTarget()
if err != nil {
fmt.Println("could not get target for schema")
// Generate schema for every file in the graph concurrently.
if err := generateSchemaForGraph(graph); err != nil {
fmt.Printf("❌ %s\n", err)
os.Exit(1)
}
fmt.Println("\n🎯 Target for retrieving schema:", target.ProjectID+"."+target.DataSet)
},
}

// retrieve columns from BigQuery
bqColumns, err := getColumnsForModel(modelName, target)
if err != nil {
fmt.Println("Could not retrieve schema")
os.Exit(1)
}
fmt.Println("✅ BQ Schema retrieved. Number of columns in BQ table:", len(bqColumns))
func generateSchemaForGraph(graph *fs.Graph) error {
pb := utils.NewProgressBar("🖨️ Generating schemas", graph.Len())
defer pb.Stop()

// create schema file
ymlPath, schemaFile := generateEmptySchemaFile(model)
var schemaModel *properties.Model
ctx, cancel := context.WithCancel(context.Background())

if model.Schema == nil {
fmt.Println("\n🔍 " + modelName + " schema file not found.. 🌱 Generating new schema file")
schemaModel = generateNewSchemaModel(modelName, bqColumns)
return graph.Execute(func(file *fs.File) error {
if file.Type == fs.ModelFile {
if err := generateSchemaForModel(ctx, file); err != nil {
pb.Stop()

} else {
fmt.Println("\n🔍 " + modelName + " schema file found.. 🛠 Updating schema file")
// set working schema model to current schema model
schemaModel = model.Schema
// add and remove columns in-place
addMissingColumnsToSchema(schemaModel, bqColumns)
removeOutdatedColumnsFromSchema(schemaModel, bqColumns)
}
if err != context.Canceled {
fmt.Printf("❌ %s\n", err)
}

schemaFile.Models = properties.Models{schemaModel}
err = schemaFile.WriteToFile(ymlPath)
if err != nil {
fmt.Println("Error writing YML to file in path")
os.Exit(1)
cancel()
return err
}
}
fmt.Println("\n✅ " + modelName + "schema successfully updated at path: " + ymlPath)
},

pb.Increment()
return nil
}, config.NumberThreads(), pb)
}

// generateSchemaForModel generates a schema and writes yml for modelName.
func generateSchemaForModel(ctx context.Context, model *fs.File) error {
target, err := model.GetTarget()
if err != nil {
fmt.Println("could not get target for schema")
return err
}
fmt.Println("\n🎯 Target for retrieving schema:", target.ProjectID+"."+target.DataSet)

// retrieve columns from BigQuery
bqColumns, err := getColumnsForModel(ctx, model.Name, target)
if err != nil {
fmt.Println("Could not retrieve schema")
return err
}
fmt.Println("✅ BQ Schema retrieved. Number of columns in BQ table:", len(bqColumns))

// create schema file
ymlPath, schemaFile := generateEmptySchemaFile(model)
var schemaModel *properties.Model

if model.Schema == nil {
fmt.Println("\n🔍 " + model.Name + " schema file not found.. 🌱 Generating new schema file")
schemaModel = generateNewSchemaModel(model.Name, bqColumns)
} else {
fmt.Println("\n🔍 " + model.Name + " schema file found.. 🛠 Updating schema file")
// set working schema model to current schema model
schemaModel = model.Schema
// add and remove columns in-place
addMissingColumnsToSchema(schemaModel, bqColumns)
removeOutdatedColumnsFromSchema(schemaModel, bqColumns)
}

schemaFile.Models = properties.Models{schemaModel}
err = schemaFile.WriteToFile(ymlPath)
if err != nil {
fmt.Println("Error writing YML to file in path")
return err
}
fmt.Println("\n✅ " + model.Name + "schema successfully updated at path: " + ymlPath)
return nil
}

func getColumnsForModel(modelName string, target *config.Target) ([]string, error) {
schema, err := bigquery.GetColumnsFromTable(modelName, target)
func getColumnsForModel(ctx context.Context, modelName string, target *config.Target) ([]string, error) {
schema, err := bigquery.GetColumnsFromTableWithContext(ctx, modelName, target)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var testCmd = &cobra.Command{
func executeTests(tests []*fs.File, globalContext *compiler.GlobalContext, graph *fs.Graph) bool {
pb := utils.NewProgressBar("🔬 Running Tests", len(tests))

_, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())

var m sync.Mutex
widestTestName := 0
Expand Down Expand Up @@ -88,7 +88,7 @@ func executeTests(tests []*fs.File, globalContext *compiler.GlobalContext, graph
// schema tests: applied in YAML, returns the number of records that do not pass an assertion —
// when this number is 0, all records pass, therefore, your test passes
var results [][]bigquery.Value
results, _, err = bigquery.GetRows(query, target)
results, _, err = bigquery.GetRows(ctx, query, target)

if err == nil {
if len(results) != 1 {
Expand Down
9 changes: 8 additions & 1 deletion compiler/dbtUtils/queryMacros.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package dbtUtils

import (
"context"
"ddbt/bigquery"
"ddbt/compilerInterface"
"fmt"
"strconv"
"strings"
)

// GetColumnValues is a fallback GetColumnValuesWithContext
// with a background context.
func GetColumnValues(ec compilerInterface.ExecutionContext, caller compilerInterface.AST, arguments compilerInterface.Arguments) (*compilerInterface.Value, error) {
return GetColumnValuesWithContext(context.Background(), ec, caller, arguments)
}

func GetColumnValuesWithContext(ctx context.Context, ec compilerInterface.ExecutionContext, caller compilerInterface.AST, arguments compilerInterface.Arguments) (*compilerInterface.Value, error) {
if isOnlyCompilingSQL(ec) {
return ec.MarkAsDynamicSQL()
}
Expand Down Expand Up @@ -39,7 +46,7 @@ func GetColumnValues(ec compilerInterface.ExecutionContext, caller compilerInter
return nil, ec.ErrorAt(caller, fmt.Sprintf("%s", err))
}

rows, _, err := bigquery.GetRows(query, target)
rows, _, err := bigquery.GetRows(ctx, query, target)
if err != nil {
return nil, ec.ErrorAt(caller, fmt.Sprintf("get_column_values query returned an error: %s", err))
}
Expand Down
2 changes: 1 addition & 1 deletion utils/version.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package utils

const DdbtVersion = "0.4.5"
const DdbtVersion = "0.4.6"

0 comments on commit 653bad0

Please sign in to comment.