From b8968304b5d6d4db52dd89f3d66e56739131e27c Mon Sep 17 00:00:00 2001 From: Burak Karakan Date: Mon, 6 May 2024 10:21:16 +0100 Subject: [PATCH] validate bigquery query sensor fields --- pkg/executor/defaults.go | 2 +- pkg/lint/rules.go | 25 +++++++++++++ pkg/lint/rules_test.go | 78 ++++++++++++++++++++++++++++++++++++++++ pkg/pipeline/pipeline.go | 1 + 4 files changed, 105 insertions(+), 1 deletion(-) diff --git a/pkg/executor/defaults.go b/pkg/executor/defaults.go index d46b0e16..1f7129ab 100644 --- a/pkg/executor/defaults.go +++ b/pkg/executor/defaults.go @@ -15,7 +15,7 @@ var DefaultExecutorsV2 = map[pipeline.AssetType]Config{ pipeline.AssetTypeBigqueryTableSensor: { scheduler.TaskInstanceTypeMain: NoOpOperator{}, }, - "bq.sensor.query": { + pipeline.AssetTypeBigqueryQuerySensor: { scheduler.TaskInstanceTypeMain: NoOpOperator{}, }, "bq.cost_tracker": { diff --git a/pkg/lint/rules.go b/pkg/lint/rules.go index 178e2080..283ffc0f 100644 --- a/pkg/lint/rules.go +++ b/pkg/lint/rules.go @@ -569,3 +569,28 @@ func EnsureBigQueryTableSensorHasTableParameterForASingleAsset(ctx context.Conte return issues, nil } + +func EnsureBigQueryQuerySensorHasTableParameterForASingleAsset(ctx context.Context, p *pipeline.Pipeline, asset *pipeline.Asset) ([]*Issue, error) { + issues := make([]*Issue, 0) + if asset.Type != pipeline.AssetTypeBigqueryQuerySensor { + return issues, nil + } + + query, ok := asset.Parameters["query"] + if !ok { + issues = append(issues, &Issue{ + Task: asset, + Description: "BigQuery query sensor requires a `query` parameter", + }) + return issues, nil + } + + if query == "" { + issues = append(issues, &Issue{ + Task: asset, + Description: "BigQuery query sensor requires a `query` parameter that is not empty", + }) + } + + return issues, nil +} diff --git a/pkg/lint/rules_test.go b/pkg/lint/rules_test.go index dc880ca2..9d22c8e2 100644 --- a/pkg/lint/rules_test.go +++ b/pkg/lint/rules_test.go @@ -1418,6 +1418,84 @@ func TestEnsureSnowflakeSensorHasQueryParameter(t *testing.T) { } } +func TestEnsureBigqueryQuerySensorHasQueryParameter(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + p *pipeline.Pipeline + want []string + wantErr assert.ErrorAssertionFunc + }{ + { + name: "no query param", + p: &pipeline.Pipeline{ + Assets: []*pipeline.Asset{ + { + Name: "task1", + Type: pipeline.AssetTypeBigqueryQuerySensor, + }, + }, + }, + wantErr: assert.NoError, + want: []string{"BigQuery query sensor requires a `query` parameter"}, + }, + { + name: "query param exists but empty", + p: &pipeline.Pipeline{ + Assets: []*pipeline.Asset{ + { + Name: "task1", + Type: pipeline.AssetTypeBigqueryQuerySensor, + Parameters: map[string]string{ + "query": "", + }, + }, + }, + }, + wantErr: assert.NoError, + want: []string{"BigQuery query sensor requires a `query` parameter that is not empty"}, + }, + { + name: "no issues", + p: &pipeline.Pipeline{ + Assets: []*pipeline.Asset{ + { + Name: "task1", + Type: pipeline.AssetTypeBigqueryQuerySensor, + Parameters: map[string]string{ + "query": "SELECT 1", + }, + }, + }, + }, + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + got, err := CallFuncForEveryAsset(EnsureBigQueryQuerySensorHasTableParameterForASingleAsset)(tt.p) + if !tt.wantErr(t, err) { + return + } + + // I am doing this because I don't care if I get a nil or empty slice + if tt.want != nil { + gotMessages := make([]string, len(got)) + for i, issue := range got { + gotMessages[i] = issue.Description + } + + assert.Equal(t, tt.want, gotMessages) + } else { + assert.Equal(t, []*Issue{}, got) + } + }) + } +} + func TestEnsureBigQueryTableSensorHasTableParameter(t *testing.T) { t.Parallel() diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 14b43550..c6b84b96 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -21,6 +21,7 @@ const ( AssetTypeSnowflakeQuerySensor = AssetType("sf.sensor.query") AssetTypeBigqueryQuery = AssetType("bq.sql") AssetTypeBigqueryTableSensor = AssetType("bq.sensor.table") + AssetTypeBigqueryQuerySensor = AssetType("bq.sensor.query") AssetTypeEmpty = AssetType("empty") AssetTypePostgresQuery = AssetType("pg.sql") AssetTypeRedshiftQuery = AssetType("rs.sql")