Skip to content

Commit

Permalink
introduce custom checks for bq
Browse files Browse the repository at this point in the history
  • Loading branch information
karakanb committed Jan 26, 2024
1 parent d8db72b commit 957cf73
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 13 deletions.
6 changes: 6 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,14 @@ func setupExecutors(s *scheduler.Scheduler, config *config.Config, conn *connect
return nil, err
}

bqCustomCheckRunner, err := bigquery.NewCustomCheckOperator(conn)
if err != nil {
return nil, err
}

mainExecutors[pipeline.AssetTypeBigqueryQuery][scheduler.TaskInstanceTypeMain] = bqOperator
mainExecutors[pipeline.AssetTypeBigqueryQuery][scheduler.TaskInstanceTypeColumnCheck] = bqCheckRunner
mainExecutors[pipeline.AssetTypeBigqueryQuery][scheduler.TaskInstanceTypeCustomCheck] = bqCustomCheckRunner

// we set the Python runners to run the checks on BigQuery, assuming that there won't be many use cases where a user has both BQ and Snowflake
mainExecutors[pipeline.AssetTypePython][scheduler.TaskInstanceTypeColumnCheck] = bqCheckRunner
Expand Down
5 changes: 5 additions & 0 deletions examples/simple-pipeline/assets/hello_bq.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ columns:
- name: accepted_values
value: [1, 2]
custom_checks:
- name: This is a custom check name
value: 2
query: select count(*) from dashboard.hello_bq
@bruin */

Expand Down
50 changes: 37 additions & 13 deletions pkg/bigquery/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func ensureCountZero(check string, res [][]interface{}) (int64, error) {
func (c *NotNullCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
qq := fmt.Sprintf("SELECT count(*) FROM %s WHERE %s IS NULL", ti.GetAsset().Name, ti.Column.Name)

return (&countZeroCheck{
return (&countableQueryCheck{
conn: c.conn,
queryInstance: &query.Query{Query: qq},
checkName: "not_null",
Expand All @@ -39,7 +39,7 @@ type PositiveCheck struct {

func (c *PositiveCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
qq := fmt.Sprintf("SELECT count(*) FROM %s WHERE %s <= 0", ti.GetAsset().Name, ti.Column.Name)
return (&countZeroCheck{
return (&countableQueryCheck{
conn: c.conn,
queryInstance: &query.Query{Query: qq},
checkName: "positive",
Expand All @@ -55,7 +55,7 @@ type UniqueCheck struct {

func (c *UniqueCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
qq := fmt.Sprintf("SELECT COUNT(%s) - COUNT(DISTINCT %s) FROM %s", ti.Column.Name, ti.Column.Name, ti.GetAsset().Name)
return (&countZeroCheck{
return (&countableQueryCheck{
conn: c.conn,
queryInstance: &query.Query{Query: qq},
checkName: "unique",
Expand Down Expand Up @@ -100,7 +100,7 @@ func (c *AcceptedValuesCheck) Check(ctx context.Context, ti *scheduler.ColumnChe
res = res[1 : sz-1]

qq := fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE CAST(%s as STRING) NOT IN (%s)", ti.GetAsset().Name, ti.Column.Name, res)
return (&countZeroCheck{
return (&countableQueryCheck{
conn: c.conn,
queryInstance: &query.Query{Query: qq},
checkName: "accepted_values",
Expand All @@ -110,15 +110,24 @@ func (c *AcceptedValuesCheck) Check(ctx context.Context, ti *scheduler.ColumnChe
}).Check(ctx, ti)
}

type countZeroCheck struct {
conn connectionFetcher
queryInstance *query.Query
checkName string
customError func(count int64) error
// countableQueryCheck is the shared implementation behind the column checks and
// the custom check: it compares a single numeric query result against
// expectedQueryResult and reports a mismatch through customError.
type countableQueryCheck struct {
// conn resolves the BigQuery connection by connection name.
conn connectionFetcher
// expectedQueryResult is the value the query result must equal for the check
// to pass; the column checks leave it unset, i.e. they expect zero.
expectedQueryResult int64
// queryInstance holds the SQL query of the check.
queryInstance *query.Query
// checkName identifies the check in error messages.
checkName string
// customError builds the error returned when the actual count differs from
// expectedQueryResult; it receives the actual count.
customError func(count int64) error
}

func (c *countZeroCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
q, err := c.conn.GetBqConnection(ti.Pipeline.GetConnectionNameForAsset(ti.GetAsset()))
// Check runs the check for a column check instance, using the connection the
// pipeline resolves for the instance's asset.
func (c *countableQueryCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
	connectionName := ti.Pipeline.GetConnectionNameForAsset(ti.GetAsset())

	return c.check(ctx, connectionName)
}

// CustomCheck runs the check for a custom check instance, using the connection
// the pipeline resolves for the instance's asset.
func (c *countableQueryCheck) CustomCheck(ctx context.Context, ti *scheduler.CustomCheckInstance) error {
	name := ti.Pipeline.GetConnectionNameForAsset(ti.GetAsset())

	return c.check(ctx, name)
}

func (c *countableQueryCheck) check(ctx context.Context, connectionName string) error {
q, err := c.conn.GetBqConnection(connectionName)
if err != nil {
return errors.Wrapf(err, "failed to get connection for '%s' check", c.checkName)
}
Expand All @@ -133,10 +142,25 @@ func (c *countZeroCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckIns
return err
}

if count != 0 {
if count != c.expectedQueryResult {
return c.customError(count)
// return errors.Errorf("column %s has %d positive values", ti.Column.Name, count)
}

return nil
}

// CustomCheck runs user-defined custom checks: the check's query is executed
// and its result is compared against the check's expected value.
type CustomCheck struct {
// conn is handed to the underlying countableQueryCheck to obtain the connection.
conn connectionFetcher
}

// Check evaluates the custom check attached to ti. It delegates to a
// countableQueryCheck configured with the check's query, name and expected
// value, and returns an error naming the check when the values differ.
func (c *CustomCheck) Check(ctx context.Context, ti *scheduler.CustomCheckInstance) error {
	definition := ti.Check

	runner := &countableQueryCheck{
		conn:                c.conn,
		expectedQueryResult: definition.Value,
		queryInstance:       &query.Query{Query: definition.Query},
		checkName:           definition.Name,
		customError: func(actual int64) error {
			return errors.Errorf("custom check '%s' has returned %d instead of the expected %d", definition.Name, actual, definition.Value)
		},
	}

	return runner.CustomCheck(ctx, ti)
}
93 changes: 93 additions & 0 deletions pkg/bigquery/checks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,96 @@ func runTestsFoCountZeroCheck(t *testing.T, instanceBuilder func(q *mockQuerierW
})
}
}

// TestCustomCheck verifies that CustomCheck runs the configured query exactly
// once on the connection resolved for the asset, and that the single returned
// value is compared against the expected value of the check.
func TestCustomCheck(t *testing.T) {
	t.Parallel()

	expectedQueryString := "SELECT 1"
	expectedQuery := &query.Query{Query: expectedQueryString}

	// setupFunc registers one expected Select call for the check's query that
	// returns the given rows and error.
	setupFunc := func(val [][]interface{}, err error) func(q *mockQuerierWithResult) {
		return func(q *mockQuerierWithResult) {
			q.On("Select", mock.Anything, expectedQuery).
				Return(val, err).
				Once()
		}
	}

	// checkError asserts that the returned error carries exactly the given message.
	checkError := func(message string) assert.ErrorAssertionFunc {
		return func(t assert.TestingT, err error, i ...interface{}) bool {
			return assert.EqualError(t, err, message)
		}
	}

	tests := []struct {
		name    string
		setup   func(q *mockQuerierWithResult)
		wantErr assert.ErrorAssertionFunc
	}{
		{
			name:    "failed to run query",
			setup:   setupFunc(nil, assert.AnError),
			wantErr: assert.Error,
		},
		{
			name:    "multiple results are returned",
			setup:   setupFunc([][]interface{}{{1}, {2}}, nil),
			wantErr: assert.Error,
		},
		{
			name:    "null result returned",
			setup:   setupFunc([][]interface{}{{nil}}, nil),
			wantErr: checkError("unexpected result from query during check1 check, result is nil"),
		},
		{
			name:    "wrong result returned",
			setup:   setupFunc([][]interface{}{{int64(10)}}, nil),
			wantErr: checkError("custom check 'check1' has returned 10 instead of the expected 5"),
		},
		{
			name:    "expected result returned, check passed",
			setup:   setupFunc([][]interface{}{{5}}, nil),
			wantErr: assert.NoError,
		},
		{
			name:    "expected result returned as a string, check passed",
			setup:   setupFunc([][]interface{}{{"5"}}, nil),
			wantErr: assert.NoError,
		},
	}
	for _, tt := range tests {
		tt := tt
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()

			q := new(mockQuerierWithResult)
			tt.setup(q)

			conn := new(mockConnectionFetcher)
			conn.On("GetBqConnection", "test").Return(q, nil)
			n := &CustomCheck{conn: conn}

			testInstance := &scheduler.CustomCheckInstance{
				AssetInstance: &scheduler.AssetInstance{
					Asset: &pipeline.Asset{
						Name: "dataset.test_asset",
						Type: pipeline.AssetTypeBigqueryQuery,
					},
					Pipeline: &pipeline.Pipeline{
						Name: "test",
						DefaultConnections: map[string]string{
							"google_cloud_platform": "test",
						},
					},
				},
				Check: &pipeline.CustomCheck{
					Name:  "check1",
					Value: 5,
					Query: expectedQueryString,
				},
			}

			tt.wantErr(t, n.Check(context.Background(), testInstance))

			// Verify the mocks after the call: the query must have run once and
			// the connection must have been fetched under the expected name.
			q.AssertExpectations(t)
			conn.AssertExpectations(t)
		})
	}
}
23 changes: 23 additions & 0 deletions pkg/bigquery/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,26 @@ func (o ColumnCheckOperator) Run(ctx context.Context, ti scheduler.TaskInstance)

return executor.Check(ctx, test)
}

// customCheckRunner is the dependency CustomCheckOperator delegates to: it runs
// a single custom check instance and reports the outcome as an error.
type customCheckRunner interface {
// Check executes the given custom check instance.
Check(ctx context.Context, ti *scheduler.CustomCheckInstance) error
}

// NewCustomCheckOperator builds a CustomCheckOperator whose checks are run by a
// CustomCheck backed by the given connection fetcher. The returned error is
// always nil.
func NewCustomCheckOperator(manager connectionFetcher) (*CustomCheckOperator, error) {
	runner := &CustomCheck{conn: manager}
	operator := &CustomCheckOperator{checkRunner: runner}

	return operator, nil
}

// CustomCheckOperator runs custom check task instances by delegating them to
// its customCheckRunner.
type CustomCheckOperator struct {
// checkRunner performs the actual check for a custom check instance.
checkRunner customCheckRunner
}

// Run executes ti through the operator's check runner. Only
// *scheduler.CustomCheckInstance task instances are accepted; any other task
// instance type results in an error.
func (o *CustomCheckOperator) Run(ctx context.Context, ti scheduler.TaskInstance) error {
	switch instance := ti.(type) {
	case *scheduler.CustomCheckInstance:
		return o.checkRunner.Check(ctx, instance)
	default:
		return errors.New("cannot run a non-custom check instance")
	}
}

0 comments on commit 957cf73

Please sign in to comment.