From e61023266397fa4a6474c4dbaf0f80d435beef29 Mon Sep 17 00:00:00 2001 From: Dominic Black Date: Thu, 21 Jan 2021 16:25:22 +0000 Subject: [PATCH] Adding `isolate-dag` command This command allows us to easily create a slimmer version of a large DBT project, which can then be executed by FishTown's DBT without it needing to parse the whole of your project. i.e. you can get the speed improvement of DDBT, but the compatibility of DBT. This commit also moves the build status updates from Stdout to Stderr --- README.md | 1 + cmd/isolateDAG.go | 174 +++++++++++++++++++++++++++++++++++++++++++ cmd/run.go | 4 +- cmd/showDAG.go | 2 +- config/config.go | 4 +- fs/file.go | 13 ++++ fs/graph.go | 45 +++++++++++ utils/ProgressBar.go | 2 +- utils/version.go | 2 +- 9 files changed, 241 insertions(+), 6 deletions(-) create mode 100644 cmd/isolateDAG.go diff --git a/README.md b/README.md index 8d84ed2..8c1c4b0 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,7 @@ ddbt version 0.2.1 - `ddbt watch` will get act like `run`, followed by `test`. DDBT will then watch your file system for any changes and automatically rerun those parts of the DAG and affected downstream tests or failing tests. - `ddbt watch --skip-run` is the same as watch, but will skip the initial run (preventing you having to wait for all the models to run) before running the tests and starting to watch your file system. - `ddbt completion zsh` will generate a shell completion script zsh (or bash if you pass that as argument). Detailed steps to set up the completion script can be found in `ddbt completion --help` +- `ddbt isolate-dag` will create a temporary directory and symlink in all files needed for the given _model_filter_ such that Fishtown's DBT could be run against it without having to be run against every model in your data warehouse ### Global Arguments - `--models model_filter` _or_ `-m model_filter`: Instead of running for every model in your project, DDBT will only execute against the requested models. 
See filters below for what is accepted for `my_model` diff --git a/cmd/isolateDAG.go b/cmd/isolateDAG.go new file mode 100644 index 0000000..09aa306 --- /dev/null +++ b/cmd/isolateDAG.go @@ -0,0 +1,174 @@ +package cmd + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strings" + + "github.com/spf13/cobra" + + "ddbt/config" + "ddbt/fs" + "ddbt/utils" +) + +func init() { + rootCmd.AddCommand(isolateDAG) + addModelsFlag(isolateDAG) +} + +var isolateDAG = &cobra.Command{ + Use: "isolate-dag", + Short: "Creates a symlinked copy of the selected models, which can be then passed to Fishtown's DBT", + Run: func(cmd *cobra.Command, args []string) { + fileSystem, _ := compileAllModels() + + graph := buildGraph(fileSystem, ModelFilter) // Build the execution graph for the given command + graph.AddReferencingTests() // And then add any tests which reference that graph + + if err := graph.AddAllUsedMacros(); err != nil { + fmt.Printf("❌ Unable to get all used macros: %s\n", err) + os.Exit(1) + } + + isolateGraph(graph) + }, +} + +func isolateGraph(graph *fs.Graph) { + pb := utils.NewProgressBar("đŸ”Ē Isolating DAG", graph.Len()) + defer pb.Stop() + + // Create a temporary directory to stick the isolated models in + isolationDir, err := ioutil.TempDir(os.TempDir(), "isolated-dag-") + if err != nil { + fmt.Printf("❌ Unable to create temporarily directory for DAG isolation: %s\n", err) + os.Exit(1) + } + + // Get the current working directory + cwd, err := os.Getwd() + if err != nil { + fmt.Printf("❌ Unable to get working directory: %s\n", err) + os.Exit(1) + } + + symLink := func(pathInProject string) error { + fullOrgPath := filepath.Join(cwd, pathInProject) + symlinkedPath := filepath.Join(isolationDir, pathInProject) + + // Create the folder in the isolated dir if needed + err := os.MkdirAll(filepath.Dir(symlinkedPath), os.ModePerm) + if err != nil { + return err + } + + // Symlink the file in there + err = os.Symlink(fullOrgPath, symlinkedPath) + if err != nil { + 
return err + } + + return nil + } + + // Create a blank file which DBT can read + touch := func(pathInProject string) error { + symlinkedPath := filepath.Join(isolationDir, pathInProject) + + // Create the folder in the isolated dir if needed + err := os.MkdirAll(filepath.Dir(symlinkedPath), os.ModePerm) + if err != nil { + return err + } + + // If the file doesn't exist create it with no contents + if _, err := os.Stat(symlinkedPath); os.IsNotExist(err) { + file, err := os.Create(symlinkedPath) + if err != nil { + return err + } + return file.Close() + } + + return nil + } + + projectFiles := []string{ + "dbt_project.yml", + "ddbt_config.yml", + "profiles", + "debug", + "docs", + "dbt_modules", + } + + // If we have a model groups file bring that too + if config.GlobalCfg.ModelGroupsFile != "" { + projectFiles = append(projectFiles, config.GlobalCfg.ModelGroupsFile) + } + + for _, file := range projectFiles { + if err := symLink(file); err != nil && !os.IsNotExist(err) { + pb.Stop() + fmt.Printf("❌ Unable to isolate project file `%s`: %s\n", file, err) + os.Exit(1) + } + } + + err = graph.Execute(func(file *fs.File) error { + // Symlink the file from the DAG into the isolated folder + if err := symLink(file.Path); err != nil { + pb.Stop() + fmt.Printf("❌ Unable to isolate %s `%s`: %s\n", file.Type, file.Name, err) + return err + } + + // Symlink the schema if it exists + schemaFile := strings.TrimSuffix(file.Path, filepath.Ext(file.Path)) + ".yml" + if _, err := os.Stat(schemaFile); file.Schema != nil && err == nil { + if err := symLink(schemaFile); err != nil { + pb.Stop() + fmt.Printf("❌ Unable to isolate schema for %s `%s`: %s\n", file.Type, file.Name, err) + return err + } + } + + // Ensure usptream models are handled + for _, upstream := range file.Upstreams() { + if graph.Contains(upstream) { + continue + } + + switch upstream.Type { + case fs.ModelFile: + // Model's outside of the DAG but referenced by it need to exist for DBT to be able to run on this DAG 
+ // even if we run with the upstream command + if err := touch(upstream.Path); err != nil { + pb.Stop() + fmt.Printf("❌ Unable to touch %s `%s`: %s\n", upstream.Type, upstream.Name, err) + return err + } + + default: + // Any other than a model which is being used _should_ already be in the graph + pb.Stop() + fmt.Printf("❌ Unexpected Upstream %s `%s`\n", upstream.Type, upstream.Name) + return err + } + } + + pb.Increment() + return nil + }, config.NumberThreads(), pb) + + if err != nil { + os.Exit(1) + } + + pb.Stop() + + fmt.Print(isolationDir) +} diff --git a/cmd/run.go b/cmd/run.go index 37450ab..f076f10 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -51,10 +51,10 @@ func addModelsFlag(cmd *cobra.Command) { } func compileAllModels() (*fs.FileSystem, *compiler.GlobalContext) { - fmt.Printf("ℹī¸ Building for %s profile\n", config.GlobalCfg.Target.Name) + _, _ = fmt.Fprintf(os.Stderr, "ℹī¸ Building for %s profile\n", config.GlobalCfg.Target.Name) // Read the models on the file system - fileSystem, err := fs.ReadFileSystem(os.Stdout) + fileSystem, err := fs.ReadFileSystem(os.Stderr) if err != nil { fmt.Printf("❌ Unable to read filesystem: %s\n", err) os.Exit(1) diff --git a/cmd/showDAG.go b/cmd/showDAG.go index cefd754..1473369 100644 --- a/cmd/showDAG.go +++ b/cmd/showDAG.go @@ -12,7 +12,7 @@ import ( func init() { rootCmd.AddCommand(showDAG) - showDAG.Flags().StringVarP(&ModelFilter, "models", "m", "", "Select which model(s) to run") + addModelsFlag(showDAG) } var showDAG = &cobra.Command{ diff --git a/config/config.go b/config/config.go index 3423f11..3a3267d 100644 --- a/config/config.go +++ b/config/config.go @@ -15,7 +15,8 @@ type Config struct { Target *Target // Custom behaviour which allows us to override the target information on a per folder basis within `/models/` - ModelGroups map[string]*Target + ModelGroups map[string]*Target + ModelGroupsFile string // seedConfig holds the seed (global) configurations seedConfig map[string]*SeedConfig @@ -119,6 
+120,7 @@ func Read(targetProfile string, upstreamProfile string, threads int, strExecutor } GlobalCfg.ModelGroups = modelGroups + GlobalCfg.ModelGroupsFile = appConfig.ModelGroupsFile } if settings, found := project.Models[project.Name]; found { diff --git a/fs/file.go b/fs/file.go index 101d3cf..f219f3d 100644 --- a/fs/file.go +++ b/fs/file.go @@ -209,6 +209,19 @@ func (f *File) Downstreams() []*File { return downstreams } +// All the upstreams in this file +func (f *File) Upstreams() []*File { + f.Mutex.Lock() + defer f.Mutex.Unlock() + + upstreams := make([]*File, 0, len(f.upstreams)) + for upstream := range f.upstreams { + upstreams = append(upstreams, upstream) + } + + return upstreams +} + func (f *File) MaskAsDynamicSQL() { f.Mutex.Lock() defer f.Mutex.Unlock() diff --git a/fs/graph.go b/fs/graph.go index 6daf98e..618f171 100644 --- a/fs/graph.go +++ b/fs/graph.go @@ -198,6 +198,46 @@ func (g *Graph) addDownstreamModels(file *File, visited map[*File]struct{}) { } } +func (g *Graph) AddAllUsedMacros() error { + visited := make(map[*File]struct{}) + + for file := range g.nodes { + g.addUpstreamMacros(file, visited) + } + + // Check for circular dependencies & all nodes without upstreams + for file := range visited { + node := g.getNodeFor(file) + + if node.upstreamContains(node) { + return errors.New(fmt.Sprintf("%s has a circular dependency on itself", node.file.Name)) + } + } + + return nil +} + +func (g *Graph) addUpstreamMacros(file *File, visited map[*File]struct{}) { + if _, found := visited[file]; found { + return + } + visited[file] = struct{}{} + + thisNode := g.getNodeFor(file) + + file.Mutex.Lock() + defer file.Mutex.Unlock() + for upstream := range file.upstreams { + if upstream.Type == MacroFile { + upstreamNode := g.getNodeFor(upstream) + + g.edge(upstreamNode, thisNode) + + g.addUpstreamMacros(upstream, visited) + } + } +} + // Find all tests which reference the models in the existing graph // and add them to the graph // @@ -404,3 +444,8 @@ 
func (g *Graph) NumberNodesNeedRerunning() int { return count } + +func (g *Graph) Contains(file *File) bool { + _, found := g.nodes[file] + return found +} diff --git a/utils/ProgressBar.go b/utils/ProgressBar.go index 5de0798..aa8c477 100644 --- a/utils/ProgressBar.go +++ b/utils/ProgressBar.go @@ -47,7 +47,7 @@ func NewProgressBar(label string, numberItems int) *ProgressBar { label: label, completedItems: 0, numberItems: uint32(numberItems), - output: os.Stdout, + output: os.Stderr, startTime: time.Now(), lastIncremented: time.Now(), diff --git a/utils/version.go b/utils/version.go index 3c76eb2..f3120f9 100644 --- a/utils/version.go +++ b/utils/version.go @@ -1,3 +1,3 @@ package utils -const DdbtVersion = "0.2.1" +const DdbtVersion = "0.3.0"