Merge pull request #2 from bruin-data/sf-sql
add snowflake operators init
karakanb authored Dec 18, 2023
2 parents 283269e + 17db642 commit 4030ea7
Showing 15 changed files with 1,180 additions and 13 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
@@ -60,3 +60,4 @@ linters-settings:
- stdlib
- (or|er)$
- bigquery.DB
- snowflake.SfClient
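This presumably extends the ireturn linter's allow list — alongside stdlib, the (or|er)$ pattern, and bigquery.DB — so that functions may return the new snowflake.SfClient interface without a lint failure.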
16 changes: 16 additions & 0 deletions cmd/lint.go
@@ -90,6 +90,22 @@ func Lint(isDebug *bool) *cli.Command {
logger.Debug("no GCP connections found, skipping BigQuery validation")
}

if len(cm.SelectedEnvironment.Connections.Snowflake) > 0 {
rules = append(rules, &lint.QueryValidatorRule{
Identifier: "snowflake-validator",
TaskType: pipeline.AssetTypeSnowflakeQuery,
Connections: connectionManager,
Extractor: &query.WholeFileExtractor{
Fs: fs,
Renderer: query.DefaultJinjaRenderer,
},
WorkerCount: 32,
Logger: logger,
})
} else {
logger.Debug("no Snowflake connections found, skipping Snowflake validation")
}

linter := lint.NewLinter(path.GetPipelinePaths, builder, rules, logger)

infoPrinter.Printf("Validating pipelines in '%s' for '%s' environment...\n", rootPath, cm.SelectedEnvironmentName)
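The new rule mirrors the BigQuery validator registered just above it: whenever the selected environment defines at least one Snowflake connection, each Snowflake asset's SQL file is rendered through the Jinja renderer as a whole and validated against that connection, presumably with up to 32 queries checked in parallel given the worker count.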
27 changes: 24 additions & 3 deletions cmd/run.go
@@ -26,6 +26,7 @@ import (
"github.com/bruin-data/bruin/pkg/python"
"github.com/bruin-data/bruin/pkg/query"
"github.com/bruin-data/bruin/pkg/scheduler"
"github.com/bruin-data/bruin/pkg/snowflake"
"github.com/fatih/color"
"github.com/pkg/errors"
"github.com/spf13/afero"
@@ -314,16 +315,36 @@ func setupExecutors(s *scheduler.Scheduler, config *config.Config, conn *connect

bqOperator := bigquery.NewBasicOperator(conn, wholeFileExtractor, bigquery.Materializer{})

-bqTestRunner, err := bigquery.NewColumnCheckOperator(conn)
+bqCheckRunner, err := bigquery.NewColumnCheckOperator(conn)
if err != nil {
return nil, err
}

mainExecutors[pipeline.AssetTypeBigqueryQuery][scheduler.TaskInstanceTypeMain] = bqOperator
-mainExecutors[pipeline.AssetTypeBigqueryQuery][scheduler.TaskInstanceTypeColumnCheck] = bqTestRunner
+mainExecutors[pipeline.AssetTypeBigqueryQuery][scheduler.TaskInstanceTypeColumnCheck] = bqCheckRunner

// we set the Python runners to run the checks on BigQuery, assuming there won't be many use cases where a user has both BQ and Snowflake
-mainExecutors[pipeline.AssetTypePython][scheduler.TaskInstanceTypeColumnCheck] = bqTestRunner
+mainExecutors[pipeline.AssetTypePython][scheduler.TaskInstanceTypeColumnCheck] = bqCheckRunner
}

if s.WillRunTaskOfType(pipeline.AssetTypeSnowflakeQuery) {
wholeFileExtractor := &query.WholeFileExtractor{
Fs: fs,
Renderer: jinja.NewRendererWithStartEndDates(&startDate, &endDate),
}

sfOperator := snowflake.NewBasicOperator(conn, wholeFileExtractor, snowflake.Materializer{})

sfCheckRunner, err := snowflake.NewColumnCheckOperator(conn)
if err != nil {
return nil, err
}

mainExecutors[pipeline.AssetTypeSnowflakeQuery][scheduler.TaskInstanceTypeMain] = sfOperator
mainExecutors[pipeline.AssetTypeSnowflakeQuery][scheduler.TaskInstanceTypeColumnCheck] = sfCheckRunner

// we set the Python runners to run the checks on Snowflake, assuming there won't be many use cases where a user has both BQ and Snowflake
mainExecutors[pipeline.AssetTypePython][scheduler.TaskInstanceTypeColumnCheck] = sfCheckRunner
}

return mainExecutors, nil
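For context, the nested map makes dispatch a two-key lookup: asset type first, task-instance type second. A hedged sketch of how the scheduler presumably consumes it — assetType, instanceType, and instance are illustrative names, not taken from this diff:

	// look up the operator registered for this asset type and instance type
	operator, ok := mainExecutors[assetType][instanceType]
	if !ok {
		return errors.Errorf("no executor found for asset type %s", assetType)
	}
	// operators expose Run(ctx, ti), as ColumnCheckOperator.Run shows below
	return operator.Run(ctx, instance)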
28 changes: 28 additions & 0 deletions examples/simple-pipeline/assets/hello_sf.sql
@@ -0,0 +1,28 @@
/* @bruin
name: dashboard.hello_sf
type: sf.sql
depends:
- hello_python
materialization:
type: table
columns:
- name: one
type: integer
description: "Just a number"
checks:
- name: unique
- name: not_null
- name: positive
- name: accepted_values
value: [1, 2]
@bruin */

select 1 as one
union all
select 2 as one
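Each column check on this asset compiles to a standalone aggregate query against the materialized table — the not_null check, for instance, becomes roughly SELECT count(*) FROM dashboard.hello_sf WHERE one IS NULL (see pkg/snowflake/checks.go below) — and the check passes only when that count is zero.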
3 changes: 2 additions & 1 deletion examples/simple-pipeline/pipeline.yml
@@ -3,4 +3,5 @@ schedule: daily
start_date: "2023-03-20"

default_connections:
-google_cloud_platform: "gcp"
+google_cloud_platform: "gcp"
+snowflake: "snowflake"
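The new entry maps the pipeline's Snowflake assets to the connection named "snowflake" in the selected environment — presumably resolved per asset via GetConnectionNameForAsset, as the checks below suggest.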
4 changes: 2 additions & 2 deletions pkg/bigquery/operator.go
@@ -91,12 +91,12 @@ func NewColumnCheckOperator(manager connectionFetcher) (*ColumnCheckOperator, er
func (o ColumnCheckOperator) Run(ctx context.Context, ti scheduler.TaskInstance) error {
test, ok := ti.(*scheduler.ColumnCheckInstance)
if !ok {
return errors.New("cannot run a non-column test instance")
return errors.New("cannot run a non-column check instance")
}

executor, ok := o.testRunners[test.Check.Name]
if !ok {
return errors.New("there is no executor configured for the test type, test cannot be run: " + test.Check.Name)
return errors.New("there is no executor configured for the check type, check cannot be run: " + test.Check.Name)
}

return executor.Check(ctx, test)
47 changes: 46 additions & 1 deletion pkg/connection/connection.go
@@ -5,10 +5,12 @@ import (

"github.com/bruin-data/bruin/pkg/bigquery"
"github.com/bruin-data/bruin/pkg/config"
"github.com/bruin-data/bruin/pkg/snowflake"
)

type Manager struct {
-BigQuery map[string]*bigquery.Client
+BigQuery  map[string]*bigquery.Client
+Snowflake map[string]*snowflake.DB
}

func (m *Manager) GetConnection(name string) (interface{}, error) {
@@ -28,6 +30,19 @@ func (m *Manager) GetBqConnection(name string) (bigquery.DB, error) {
return db, nil
}

func (m *Manager) GetSfConnection(name string) (snowflake.SfClient, error) {
if m.Snowflake == nil {
return nil, errors.New("no snowflake connections found")
}

db, ok := m.Snowflake[name]
if !ok {
return nil, errors.New("snowflake connection not found")
}

return db, nil
}
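A minimal usage sketch, assuming a populated Manager; the connection name and query text are placeholders:

	db, err := m.GetSfConnection("snowflake")
	if err != nil {
		return err
	}
	// Select returns one inner slice per result row, as the checks below rely on
	rows, err := db.Select(ctx, &query.Query{Query: "SELECT 1"})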

func (m *Manager) AddBqConnectionFromConfig(connection *config.GoogleCloudPlatformConnection) error {
if m.BigQuery == nil {
m.BigQuery = make(map[string]*bigquery.Client)
@@ -48,6 +63,29 @@ func (m *Manager) AddBqConnectionFromConfig(connection *config.GoogleCloudPlatfo
return nil
}

func (m *Manager) AddSfConnectionFromConfig(connection *config.SnowflakeConnection) error {
if m.Snowflake == nil {
m.Snowflake = make(map[string]*snowflake.DB)
}

db, err := snowflake.NewDB(&snowflake.Config{
Account: connection.Account,
Username: connection.Username,
Password: connection.Password,
Region: connection.Region,
Role: connection.Role,
Database: connection.Database,
Schema: connection.Schema,
})
if err != nil {
return err
}

m.Snowflake[connection.Name] = db

return nil
}
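For illustration, the same wiring done by hand rather than from a parsed config file — every field value below is a placeholder:

	m := &Manager{}
	err := m.AddSfConnectionFromConfig(&config.SnowflakeConnection{
		Name:     "snowflake",
		Account:  "my-account",
		Username: "bruin",
		Password: "secret",
		Region:   "eu-central-1",
		Role:     "ANALYST",
		Database: "ANALYTICS",
		Schema:   "DASHBOARD",
	})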

func NewManagerFromConfig(cm *config.Config) (*Manager, error) {
connectionManager := &Manager{}
for _, conn := range cm.SelectedEnvironment.Connections.GoogleCloudPlatform {
@@ -57,6 +95,13 @@ func NewManagerFromConfig(cm *config.Config) (*Manager, error) {
return nil, err
}
}
for _, conn := range cm.SelectedEnvironment.Connections.Snowflake {
conn := conn
err := connectionManager.AddSfConnectionFromConfig(&conn)
if err != nil {
return nil, err
}
}

return connectionManager, nil
}
154 changes: 154 additions & 0 deletions pkg/snowflake/checks.go
@@ -0,0 +1,154 @@
package snowflake

import (
"context"
"encoding/json"
"fmt"

"github.com/bruin-data/bruin/pkg/query"
"github.com/bruin-data/bruin/pkg/scheduler"
"github.com/pkg/errors"
)

type NotNullCheck struct {
conn connectionFetcher
}

func ensureCountZero(check string, res [][]interface{}) (int64, error) {
if len(res) != 1 || len(res[0]) != 1 {
return 0, errors.Errorf("unexpected result from query during %s check", check)
}

count, ok := res[0][0].(int64)
if !ok {
countInt, ok := res[0][0].(int)
if !ok {
return 0, errors.Errorf("unexpected result from query during %s check, cannot cast result to integer", check)
}

count = int64(countInt)
}

return count, nil
}
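The helper accepts exactly one row with one column and tolerates the driver handing the count back as either int64 or int. A quick illustration of the contract:

	count, err := ensureCountZero("not_null", [][]interface{}{{int64(3)}})
	// count == 3, err == nil; any other shape, or a non-integer cell, is an error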

func (c *NotNullCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
qq := fmt.Sprintf("SELECT count(*) FROM `%s` WHERE `%s` IS NULL", ti.GetAsset().Name, ti.Column.Name)

return (&countZeroCheck{
conn: c.conn,
queryInstance: &query.Query{Query: qq},
checkName: "not_null",
customError: func(count int64) error {
return errors.Errorf("column `%s` has %d null values", ti.Column.Name, count)
},
}).Check(ctx, ti)
}

type PositiveCheck struct {
conn connectionFetcher
}

func (c *PositiveCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
qq := fmt.Sprintf("SELECT count(*) FROM `%s` WHERE `%s` <= 0", ti.GetAsset().Name, ti.Column.Name)
return (&countZeroCheck{
conn: c.conn,
queryInstance: &query.Query{Query: qq},
checkName: "positive",
customError: func(count int64) error {
return errors.Errorf("column `%s` has %d non-positive values", ti.Column.Name, count)
},
}).Check(ctx, ti)
}

type UniqueCheck struct {
conn connectionFetcher
}

func (c *UniqueCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
qq := fmt.Sprintf("SELECT COUNT(`%s`) - COUNT(DISTINCT `%s`) FROM `%s`", ti.Column.Name, ti.Column.Name, ti.GetAsset().Name)
return (&countZeroCheck{
conn: c.conn,
queryInstance: &query.Query{Query: qq},
checkName: "unique",
customError: func(count int64) error {
return errors.Errorf("column `%s` has %d non-unique values", ti.Column.Name, count)
},
}).Check(ctx, ti)
}

type AcceptedValuesCheck struct {
conn connectionFetcher
}

func (c *AcceptedValuesCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
if ti.Check.Value.StringArray == nil && ti.Check.Value.IntArray == nil {
return errors.Errorf("unexpected value for accepted_values check, the values must to be an array, instead %T", ti.Check.Value)
}

if ti.Check.Value.StringArray != nil && len(*ti.Check.Value.StringArray) == 0 {
return errors.Errorf("no values provided for accepted_values check")
}

if ti.Check.Value.IntArray != nil && len(*ti.Check.Value.IntArray) == 0 {
return errors.Errorf("no values provided for accepted_values check")
}

var val []string
if ti.Check.Value.StringArray != nil {
val = *ti.Check.Value.StringArray
} else {
for _, v := range *ti.Check.Value.IntArray {
val = append(val, fmt.Sprintf("%d", v))
}
}

// Snowflake parses double-quoted tokens as identifiers, so the accepted
// values are rendered as single-quoted, escaped string literals instead.
quoted := make([]string, len(val))
for i, v := range val {
quoted[i] = "'" + strings.ReplaceAll(v, "'", "''") + "'"
}

qq := fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE CAST(%s as STRING) NOT IN (%s)", ti.GetAsset().Name, ti.Column.Name, strings.Join(quoted, ", "))
return (&countZeroCheck{
conn: c.conn,
queryInstance: &query.Query{Query: qq},
checkName: "accepted_values",
customError: func(count int64) error {
return errors.Errorf("column `%s` has %d rows that are not in the accepted values", ti.Column.Name, count)
},
}).Check(ctx, ti)
}
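For the example asset above, accepted_values: [1, 2] on column one therefore compiles to roughly SELECT COUNT(*) FROM dashboard.hello_sf WHERE CAST(one as STRING) NOT IN ('1', '2'), and the check passes only when that count is zero.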

type countZeroCheck struct {
conn connectionFetcher
queryInstance *query.Query
checkName string
customError func(count int64) error
}

func (c *countZeroCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
q, err := c.conn.GetSfConnection(ti.Pipeline.GetConnectionNameForAsset(ti.GetAsset()))
if err != nil {
return errors.Wrapf(err, "failed to get connection for '%s' check", c.checkName)
}

res, err := q.Select(ctx, c.queryInstance)
if err != nil {
return errors.Wrapf(err, "failed '%s' check", c.checkName)
}

count, err := ensureCountZero(c.checkName, res)
if err != nil {
return err
}

if count != 0 {
return c.customError(count)
}

return nil
}
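Because every rule above reduces to "this aggregate must return zero", adding a new check is small. A hypothetical non_negative check — not part of this commit — would follow the same pattern:

	type NonNegativeCheck struct {
		conn connectionFetcher
	}

	func (c *NonNegativeCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
		// count the offending rows; the check passes only when the count is zero
		qq := fmt.Sprintf("SELECT count(*) FROM %s WHERE %s < 0", ti.GetAsset().Name, ti.Column.Name)
		return (&countZeroCheck{
			conn:          c.conn,
			queryInstance: &query.Query{Query: qq},
			checkName:     "non_negative",
			customError: func(count int64) error {
				return errors.Errorf("column `%s` has %d negative values", ti.Column.Name, count)
			},
		}).Check(ctx, ti)
	}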