Skip to content

Commit

Permalink
Merge pull request #22 from bruin-data/feature/full-refresh
Browse files Browse the repository at this point in the history
Feature/full refresh
  • Loading branch information
albertobruin authored Mar 14, 2024
2 parents 7e2f41c + 538ffed commit 6147b09
Show file tree
Hide file tree
Showing 15 changed files with 259 additions and 87 deletions.
21 changes: 15 additions & 6 deletions cmd/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/bruin-data/bruin/pkg/postgres"
"github.com/bruin-data/bruin/pkg/query"
"github.com/bruin-data/bruin/pkg/snowflake"
"github.com/bruin-data/bruin/pkg/synapse"
"github.com/urfave/cli/v2"
)

Expand All @@ -22,19 +23,27 @@ func Render() *cli.Command {
Name: "render",
Usage: "render a single Bruin SQL asset",
ArgsUsage: "[path to the asset definition]",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "full-refresh",
Aliases: []string{"r"},
Usage: "truncate the table before running",
},
},
Action: func(c *cli.Context) error {
fullRefresh := c.Bool("full-refresh")
r := RenderCommand{
extractor: &query.WholeFileExtractor{
Fs: fs,
Renderer: jinja.NewRendererWithYesterday(),
},
materializers: map[pipeline.AssetType]queryMaterializer{
pipeline.AssetTypeBigqueryQuery: bigquery.NewMaterializer(),
pipeline.AssetTypeSnowflakeQuery: snowflake.NewMaterializer(),
pipeline.AssetTypeRedshiftQuery: postgres.NewMaterializer(),
pipeline.AssetTypePostgresQuery: postgres.NewMaterializer(),
pipeline.AssetTypeMsSQLQuery: mssql.NewMaterializer(),
pipeline.AssetTypeSynapseQuery: mssql.NewMaterializer(),
pipeline.AssetTypeBigqueryQuery: bigquery.NewMaterializer(fullRefresh),
pipeline.AssetTypeSnowflakeQuery: snowflake.NewMaterializer(fullRefresh),
pipeline.AssetTypeRedshiftQuery: postgres.NewMaterializer(fullRefresh),
pipeline.AssetTypePostgresQuery: postgres.NewMaterializer(fullRefresh),
pipeline.AssetTypeMsSQLQuery: mssql.NewMaterializer(fullRefresh),
pipeline.AssetTypeSynapseQuery: synapse.NewRenderer(fullRefresh),
},
builder: DefaultPipelineBuilder,
writer: os.Stdout,
Expand Down
19 changes: 12 additions & 7 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ func Run(isDebug *bool) *cli.Command {
Name: "no-log-file",
Usage: "do not create a log file for this run",
},
&cli.BoolFlag{
Name: "full-refresh",
Aliases: []string{"r"},
Usage: "truncate the table before running",
},
},
Action: func(c *cli.Context) error {
defer func() {
Expand Down Expand Up @@ -242,7 +247,7 @@ func Run(isDebug *bool) *cli.Command {
s.MarkTask(task, scheduler.Pending, runDownstreamTasks)
}

mainExecutors, err := setupExecutors(s, cm, connectionManager, startDate, endDate, runID)
mainExecutors, err := setupExecutors(s, cm, connectionManager, startDate, endDate, runID, c.Bool("full-refresh"))
if err != nil {
errorPrinter.Printf(err.Error())
return cli.Exit("", 1)
Expand Down Expand Up @@ -311,7 +316,7 @@ func printErrorsInResults(errorsInTaskResults []*scheduler.TaskExecutionResult,
}
}

func setupExecutors(s *scheduler.Scheduler, config *config.Config, conn *connection.Manager, startDate, endDate time.Time, runID string) (map[pipeline.AssetType]executor.Config, error) {
func setupExecutors(s *scheduler.Scheduler, config *config.Config, conn *connection.Manager, startDate, endDate time.Time, runID string, fullRefresh bool) (map[pipeline.AssetType]executor.Config, error) {
mainExecutors := executor.DefaultExecutorsV2

// this is a heuristic we apply to find what might be the most common type of custom check in the pipeline
Expand All @@ -336,7 +341,7 @@ func setupExecutors(s *scheduler.Scheduler, config *config.Config, conn *connect
}

if s.WillRunTaskOfType(pipeline.AssetTypeBigqueryQuery) || estimateCustomCheckType == pipeline.AssetTypeBigqueryQuery {
bqOperator := bigquery.NewBasicOperator(conn, wholeFileExtractor, bigquery.NewMaterializer())
bqOperator := bigquery.NewBasicOperator(conn, wholeFileExtractor, bigquery.NewMaterializer(fullRefresh))

bqCheckRunner, err := bigquery.NewColumnCheckOperator(conn)
if err != nil {
Expand All @@ -363,7 +368,7 @@ func setupExecutors(s *scheduler.Scheduler, config *config.Config, conn *connect
s.WillRunTaskOfType(pipeline.AssetTypeRedshiftQuery) || estimateCustomCheckType == pipeline.AssetTypeRedshiftQuery {
pgCustomCheckRunner := ansisql.NewCustomCheckOperator(conn)
pgCheckRunner := postgres.NewColumnCheckOperator(conn)
pgOperator := postgres.NewBasicOperator(conn, wholeFileExtractor, postgres.NewMaterializer())
pgOperator := postgres.NewBasicOperator(conn, wholeFileExtractor, postgres.NewMaterializer(fullRefresh))

mainExecutors[pipeline.AssetTypeRedshiftQuery][scheduler.TaskInstanceTypeMain] = pgOperator
mainExecutors[pipeline.AssetTypeRedshiftQuery][scheduler.TaskInstanceTypeColumnCheck] = pgCheckRunner
Expand All @@ -382,7 +387,7 @@ func setupExecutors(s *scheduler.Scheduler, config *config.Config, conn *connect

shouldInitiateSnowflake := s.WillRunTaskOfType(pipeline.AssetTypeSnowflakeQuery) || s.WillRunTaskOfType(pipeline.AssetTypeSnowflakeQuerySensor) || estimateCustomCheckType == pipeline.AssetTypeSnowflakeQuery
if shouldInitiateSnowflake {
sfOperator := snowflake.NewBasicOperator(conn, wholeFileExtractor, snowflake.NewMaterializer())
sfOperator := snowflake.NewBasicOperator(conn, wholeFileExtractor, snowflake.NewMaterializer(fullRefresh))

sfCheckRunner := snowflake.NewColumnCheckOperator(conn)
sfCustomCheckRunner := ansisql.NewCustomCheckOperator(conn)
Expand All @@ -405,8 +410,8 @@ func setupExecutors(s *scheduler.Scheduler, config *config.Config, conn *connect

if s.WillRunTaskOfType(pipeline.AssetTypeMsSQLQuery) || estimateCustomCheckType == pipeline.AssetTypeMsSQLQuery ||
s.WillRunTaskOfType(pipeline.AssetTypeSynapseQuery) || estimateCustomCheckType == pipeline.AssetTypeSynapseQuery {
msOperator := mssql.NewBasicOperator(conn, wholeFileExtractor, mssql.NewMaterializer())
synapseOperator := synapse.NewBasicOperator(conn, wholeFileExtractor, synapse.NewMaterializer())
msOperator := mssql.NewBasicOperator(conn, wholeFileExtractor, mssql.NewMaterializer(fullRefresh))
synapseOperator := synapse.NewBasicOperator(conn, wholeFileExtractor, synapse.NewMaterializer(fullRefresh))

msCheckRunner := mssql.NewColumnCheckOperator(conn)

Expand Down
3 changes: 2 additions & 1 deletion pkg/bigquery/materialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ var matMap = pipeline.AssetMaterializationMap{
},
}

func NewMaterializer() *pipeline.Materializer {
// NewMaterializer builds a pipeline.Materializer backed by this package's
// matMap. The fullRefresh argument is stored on the returned value's
// FullRefresh field unchanged.
func NewMaterializer(fullRefresh bool) *pipeline.Materializer {
	materializer := new(pipeline.Materializer)
	materializer.MaterializationMap = matMap
	materializer.FullRefresh = fullRefresh
	return materializer
}

Expand Down
26 changes: 20 additions & 6 deletions pkg/bigquery/materialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import (
func TestMaterializer_Render(t *testing.T) {
t.Parallel()
tests := []struct {
name string
task *pipeline.Asset
query string
want string
wantErr bool
name string
task *pipeline.Asset
query string
want string
wantErr bool
fullRefresh bool
}{
{
name: "no materialization, return raw query",
Expand Down Expand Up @@ -45,6 +46,19 @@ func TestMaterializer_Render(t *testing.T) {
query: "SELECT 1",
want: "CREATE OR REPLACE TABLE `my.asset` AS\nSELECT 1",
},
{
name: "materialize to a table, no partition or cluster, full refresh results in create+replace",
task: &pipeline.Asset{
Name: "my.asset",
Materialization: pipeline.Materialization{
Type: pipeline.MaterializationTypeTable,
Strategy: pipeline.MaterializationStrategyMerge,
},
},
fullRefresh: true,
query: "SELECT 1",
want: "CREATE OR REPLACE TABLE `my.asset` AS\nSELECT 1",
},
{
name: "materialize to a table with partition, no cluster",
task: &pipeline.Asset{
Expand Down Expand Up @@ -218,7 +232,7 @@ func TestMaterializer_Render(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

m := NewMaterializer()
m := NewMaterializer(tt.fullRefresh)
render, err := m.Render(tt.task, tt.query)

if tt.wantErr {
Expand Down
3 changes: 2 additions & 1 deletion pkg/mssql/materialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ var matMap = pipeline.AssetMaterializationMap{
},
}

func NewMaterializer() *pipeline.Materializer {
// NewMaterializer builds a pipeline.Materializer backed by this package's
// matMap. The fullRefresh argument is stored on the returned value's
// FullRefresh field unchanged.
func NewMaterializer(fullRefresh bool) *pipeline.Materializer {
	materializer := new(pipeline.Materializer)
	materializer.MaterializationMap = matMap
	materializer.FullRefresh = fullRefresh
	return materializer
}

Expand Down
29 changes: 23 additions & 6 deletions pkg/mssql/materialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import (
func TestMaterializer_Render(t *testing.T) {
t.Parallel()
tests := []struct {
name string
task *pipeline.Asset
query string
want string
wantErr bool
name string
task *pipeline.Asset
query string
want string
wantErr bool
fullRefresh bool
}{
{
name: "no materialization, return raw query",
Expand Down Expand Up @@ -48,6 +49,22 @@ func TestMaterializer_Render(t *testing.T) {
"SELECT tmp\\.\\* INTO my.asset FROM \\(SELECT 1\\) AS tmp;\n" +
"COMMIT;",
},
{
name: "materialize to a table, no partition or cluster, full refresh defaults to create+replace",
task: &pipeline.Asset{
Name: "my.asset",
Materialization: pipeline.Materialization{
Type: pipeline.MaterializationTypeTable,
Strategy: pipeline.MaterializationStrategyMerge,
},
},
fullRefresh: true,
query: "SELECT 1",
want: "BEGIN TRANSACTION;\n" +
"DROP TABLE IF EXISTS my\\.asset;\n" +
"SELECT tmp\\.\\* INTO my.asset FROM \\(SELECT 1\\) AS tmp;\n" +
"COMMIT;",
},
{
name: "materialize to a table with cluster, single field to cluster",
task: &pipeline.Asset{
Expand Down Expand Up @@ -181,7 +198,7 @@ func TestMaterializer_Render(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

m := NewMaterializer()
m := NewMaterializer(tt.fullRefresh)
render, err := m.Render(tt.task, tt.query)

if tt.wantErr {
Expand Down
10 changes: 8 additions & 2 deletions pkg/pipeline/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type (

// Materializer turns an asset's raw query into its materialized form by
// dispatching to the function registered for the asset's materialization
// type and strategy.
type Materializer struct {
// MaterializationMap holds the render functions, indexed first by
// materialization type and then by strategy.
MaterializationMap AssetMaterializationMap
// FullRefresh, when true, makes Render look up
// MaterializationStrategyCreateReplace instead of the asset's own strategy
// for table materializations whose strategy is not MaterializationStrategyNone.
FullRefresh bool
}

func (m *Materializer) Render(asset *Asset, query string) (string, error) {
Expand All @@ -20,7 +21,12 @@ func (m *Materializer) Render(asset *Asset, query string) (string, error) {
return removeComments(query), nil
}

if matFunc, ok := m.MaterializationMap[mat.Type][mat.Strategy]; ok {
strategy := mat.Strategy
if m.FullRefresh && mat.Type == MaterializationTypeTable && mat.Strategy != MaterializationStrategyNone {
strategy = MaterializationStrategyCreateReplace
}

if matFunc, ok := m.MaterializationMap[mat.Type][strategy]; ok {
materializedQuery, err := matFunc(asset, query)
if err != nil {
return "", err
Expand All @@ -34,7 +40,7 @@ func (m *Materializer) Render(asset *Asset, query string) (string, error) {

// bruinCommentPattern matches a single embedded Bruin definition block of the
// form `/* @bruin ... @bruin */`. It is compiled once at package scope instead
// of on every call. The quantifier is lazy so the match ends at the nearest
// closing marker; a greedy match would span from the first block to the last
// one and delete any SQL sitting between them.
var bruinCommentPattern = regexp.MustCompile(`/\* *@bruin[\s\S]*?@bruin *\*/`)

// removeComments strips every `/* @bruin ... @bruin */` block from query and
// returns the remaining text untouched. Ordinary SQL comments that do not carry
// the @bruin markers are preserved. An empty query yields an empty string.
func removeComments(query string) string {
	return bruinCommentPattern.ReplaceAllString(query, "")
}
66 changes: 54 additions & 12 deletions pkg/pipeline/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,62 @@ import (
func TestMaterializer_Render(t *testing.T) {
t.Parallel()

materializer := Materializer{
MaterializationMap: make(map[MaterializationType]map[MaterializationStrategy]MaterializerFunc),
}

asset := &Asset{
Materialization: Materialization{
Type: MaterializationTypeNone,
tests := []struct {
name string
matMap AssetMaterializationMap
fullRefresh bool
query string
expected string
}{
{
name: "no full refresh, remove comments",
matMap: AssetMaterializationMap{
MaterializationTypeTable: {
MaterializationStrategyMerge: func(task *Asset, query string) (string, error) {
return query, nil
},
},
},
fullRefresh: false,
query: "/* @bruin some yaml @bruin*/SELECT * FROM table",
expected: "SELECT * FROM table",
},
{
name: "full refresh",
matMap: AssetMaterializationMap{
MaterializationTypeTable: {
MaterializationStrategyCreateReplace: func(task *Asset, query string) (string, error) {
return "SELECT 1;" + query, nil
},
},
},
fullRefresh: true,
query: "SELECT * FROM table",
expected: "SELECT 1;SELECT * FROM table",
},
}

query := "/* @bruin some yaml @bruin*/SELECT * FROM table"
expected := "SELECT * FROM table"
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

materializer := Materializer{
MaterializationMap: tt.matMap,
FullRefresh: tt.fullRefresh,
}

result, err := materializer.Render(asset, query)
require.NoError(t, err)
assert.Equal(t, expected, result)
asset := &Asset{
Materialization: Materialization{
Type: MaterializationTypeTable,
Strategy: MaterializationStrategyMerge,
},
}

result, err := materializer.Render(asset, tt.query)

require.NoError(t, err)
assert.Equal(t, tt.expected, result)
})
}
}
3 changes: 2 additions & 1 deletion pkg/postgres/materialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
"github.com/bruin-data/bruin/pkg/pipeline"
)

func NewMaterializer() *pipeline.Materializer {
// NewMaterializer builds a pipeline.Materializer backed by this package's
// matMap. The fullRefresh argument is stored on the returned value's
// FullRefresh field unchanged.
func NewMaterializer(fullRefresh bool) *pipeline.Materializer {
	materializer := new(pipeline.Materializer)
	materializer.MaterializationMap = matMap
	materializer.FullRefresh = fullRefresh
	return materializer
}

Expand Down
29 changes: 23 additions & 6 deletions pkg/postgres/materialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import (
func TestMaterializer_Render(t *testing.T) {
t.Parallel()
tests := []struct {
name string
task *pipeline.Asset
query string
want string
wantErr bool
name string
task *pipeline.Asset
query string
want string
wantErr bool
fullRefresh bool
}{
{
name: "no materialization, return raw query",
Expand Down Expand Up @@ -46,6 +47,22 @@ func TestMaterializer_Render(t *testing.T) {
want: `BEGIN TRANSACTION;
DROP TABLE IF EXISTS my.asset;
CREATE TABLE my.asset AS SELECT 1;
COMMIT;`,
},
{
name: "materialize to a table, full refresh defaults to create+replace",
task: &pipeline.Asset{
Name: "my.asset",
Materialization: pipeline.Materialization{
Type: pipeline.MaterializationTypeTable,
Strategy: pipeline.MaterializationStrategyMerge,
},
},
fullRefresh: true,
query: "SELECT 1",
want: `BEGIN TRANSACTION;
DROP TABLE IF EXISTS my.asset;
CREATE TABLE my.asset AS SELECT 1;
COMMIT;`,
},
{
Expand Down Expand Up @@ -155,7 +172,7 @@ COMMIT;`,
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

m := NewMaterializer()
m := NewMaterializer(tt.fullRefresh)
render, err := m.Render(tt.task, tt.query)

if tt.wantErr {
Expand Down
Loading

0 comments on commit 6147b09

Please sign in to comment.