Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add schema tracking support for UDFs #15705

Merged
merged 13 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions changelog/20.0/20.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
- [Update with Multi Target Support](#update-multi-target)
- [Delete with Subquery Support](#delete-subquery)
- [Delete with Multi Target Support](#delete-multi-target)
- [User Defined Functions Support](#udf-support)
- **[Flag changes](#flag-changes)**
- [`pprof-http` default change](#pprof-http-default)
- [New `healthcheck-dial-concurrency` flag](#healthcheck-dial-concurrency-flag)
Expand Down Expand Up @@ -183,6 +184,15 @@ Example: `delete t1, t3 from t1 join t2 on t1.id = t2.id join t3 on t1.col = t3.

More details about how it works is available in [MySQL Docs](https://dev.mysql.com/doc/refman/8.0/en/delete.html)

#### <a id="udf-support"/> User Defined Functions Support

VTGate can track any user defined functions for better planning.
User Defined Functions (UDFs) should be directly loaded in the underlying MySQL.

It should be enabled in VTGate with the `--enable-udfs` flag.

More details about how to load UDFs is available in [MySQL Docs](https://dev.mysql.com/doc/extending-mysql/8.0/en/adding-loadable-function.html)

### <a id="flag-changes"/>Flag Changes

#### <a id="pprof-http-default"/> `pprof-http` Default Change
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ Flags:
--enable-partial-keyspace-migration (Experimental) Follow shard routing rules: enable only while migrating a keyspace shard by shard. See documentation on Partial MoveTables for more. (default false)
--enable-per-workload-table-metrics If true, query counts and query error metrics include a label that identifies the workload
--enable-tx-throttler Synonym to -enable_tx_throttler
--enable-udfs Enable UDFs support in vtgate.
--enable-views Enable views support in vtgate.
--enable_buffer Enable buffering (stalling) of primary traffic during failovers.
--enable_buffer_dry_run Detect and log failover events, but do not actually buffer requests.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ Flags:
--discovery_low_replication_lag duration Threshold below which replication lag is considered low enough to be healthy. (default 30s)
--emit_stats If set, emit stats to push-based monitoring and stats backends
--enable-partial-keyspace-migration (Experimental) Follow shard routing rules: enable only while migrating a keyspace shard by shard. See documentation on Partial MoveTables for more. (default false)
--enable-udfs Enable UDFs support in vtgate.
--enable-views Enable views support in vtgate.
--enable_buffer Enable buffering (stalling) of primary traffic during failovers.
--enable_buffer_dry_run Detect and log failover events, but do not actually buffer requests.
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/vreplication/sidecardb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ var numSidecarDBTables int
var ddls1, ddls2 []string

func init() {
sidecarDBTables = []string{"copy_state", "dt_participant", "dt_state", "heartbeat", "post_copy_action", "redo_state",
"redo_statement", "reparent_journal", "resharding_journal", "schema_migrations", "schema_version", "tables",
"vdiff", "vdiff_log", "vdiff_table", "views", "vreplication", "vreplication_log"}
sidecarDBTables = []string{"copy_state", "dt_participant", "dt_state", "heartbeat", "post_copy_action",
"redo_state", "redo_statement", "reparent_journal", "resharding_journal", "schema_migrations", "schema_version",
"tables", "udfs", "vdiff", "vdiff_log", "vdiff_table", "views", "vreplication", "vreplication_log"}
numSidecarDBTables = len(sidecarDBTables)
ddls1 = []string{
"drop table _vt.vreplication_log",
Expand Down
306 changes: 161 additions & 145 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

34 changes: 34 additions & 0 deletions go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions go/vt/sidecardb/schema/schemaengine/udfs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

CREATE TABLE IF NOT EXISTS udfs
(
FUNCTION_NAME varchar(128) CHARACTER SET `utf8mb3` COLLATE `utf8mb3_bin` NOT NULL,
FUNCTION_RETURN_TYPE varchar(20) CHARACTER SET `utf8mb3` COLLATE `utf8mb3_bin` NOT NULL,
FUNCTION_TYPE varchar(20) CHARACTER SET `utf8mb3` COLLATE `utf8mb3_bin` NOT NULL,
PRIMARY KEY (FUNCTION_NAME)
) engine = InnoDB
72 changes: 65 additions & 7 deletions go/vt/vtgate/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package schema
import (
"context"
"maps"
"slices"
"strings"
"sync"
"time"
Expand All @@ -45,6 +46,7 @@ type (
mu sync.Mutex
tables *tableMap
views *viewMap
udfs map[keyspaceStr][]string
ctx context.Context
signal func() // a function that we'll call whenever we have new schema data

Expand All @@ -60,7 +62,7 @@ type (
const defaultConsumeDelay = 1 * time.Second

// NewTracker creates the tracker object.
func NewTracker(ch chan *discovery.TabletHealth, enableViews bool, parser *sqlparser.Parser) *Tracker {
func NewTracker(ch chan *discovery.TabletHealth, enableViews, enableUDFs bool, parser *sqlparser.Parser) *Tracker {
t := &Tracker{
ctx: context.Background(),
ch: ch,
Expand All @@ -73,6 +75,9 @@ func NewTracker(ch chan *discovery.TabletHealth, enableViews bool, parser *sqlpa
if enableViews {
t.views = &viewMap{m: map[keyspaceStr]map[viewNameStr]sqlparser.SelectStatement{}, parser: parser}
}
if enableUDFs {
t.udfs = map[keyspaceStr][]string{}
}
return t
}

Expand All @@ -86,6 +91,10 @@ func (t *Tracker) LoadKeyspace(conn queryservice.QueryService, target *querypb.T
if err != nil {
return err
}
err = t.loadUDFs(conn, target)
if err != nil {
return err
}

t.tracked[target.Keyspace].setLoaded(true)
return nil
Expand Down Expand Up @@ -146,6 +155,32 @@ func (t *Tracker) loadViews(conn queryservice.QueryService, target *querypb.Targ
return nil
}

func (t *Tracker) loadUDFs(conn queryservice.QueryService, target *querypb.Target) error {
if t.udfs == nil {
// This happens only when UDFs are not enabled.
return nil
}

t.mu.Lock()
defer t.mu.Unlock()

err := conn.GetSchema(t.ctx, target, querypb.SchemaTableType_UDF_AGGREGATE, nil, func(schemaRes *querypb.GetSchemaResponse) error {
var udfs []string
for name := range schemaRes.TableDefinition {
udfs = append(udfs, name)
}

t.udfs[target.Keyspace] = udfs
return nil
})
if err != nil {
log.Errorf("error fetching new UDFs for %v: %w", target.Keyspace, err)
return err
}
log.Infof("finished loading UDFs for keyspace %s", target.Keyspace)
return nil
}

// Start starts the schema tracking.
func (t *Tracker) Start() {
log.Info("Starting schema tracking")
Expand Down Expand Up @@ -208,6 +243,9 @@ func (t *Tracker) GetColumns(ks string, tbl string) []vindexes.Column {
defer t.mu.Unlock()

tblInfo := t.tables.get(ks, tbl)
if tblInfo == nil {
return nil
}
return tblInfo.Columns
}

Expand Down Expand Up @@ -244,27 +282,47 @@ func (t *Tracker) Tables(ks string) map[string]*vindexes.TableInfo {

// Views returns all known views in the keyspace with their definition.
func (t *Tracker) Views(ks string) map[string]sqlparser.SelectStatement {
t.mu.Lock()
defer t.mu.Unlock()

if t.views == nil {
return nil
}

t.mu.Lock()
defer t.mu.Unlock()

m := t.views.m[ks]
return maps.Clone(m)
}

func (t *Tracker) UDFs(ks string) []string {
if t.udfs == nil {
return nil
}

t.mu.Lock()
defer t.mu.Unlock()

return slices.Clone(t.udfs[ks])
}

func (t *Tracker) updateSchema(th *discovery.TabletHealth) bool {
success := true
if th.Stats.TableSchemaChanged != nil {
success = t.updatedTableSchema(th)
}
if !success || th.Stats.ViewSchemaChanged == nil {
return success
if !success {
return false
}

// there is view definition change in the tablet
return t.updatedViewSchema(th)
if th.Stats.ViewSchemaChanged != nil {
success = t.updatedViewSchema(th)
}

if !success || !th.Stats.UdfsChanged {
return success
}

return t.loadUDFs(th.Conn, th.Target) == nil
}

func (t *Tracker) updatedTableSchema(th *discovery.TabletHealth) bool {
Expand Down
Loading
Loading