From 4d73aab0cc404516cc15bc09072bd9862a16dc1c Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 31 Oct 2024 01:27:05 +0100 Subject: [PATCH] `vtorc`: use sqlite3 SQL dialect only Signed-off-by: Tim Vaillancourt --- .../external/golib/sqlutils/sqlite_dialect.go | 134 -------- .../golib/sqlutils/sqlite_dialect_test.go | 314 ------------------ .../sqlutils/{dialect.go => statement.go} | 31 +- .../external/golib/sqlutils/statement_test.go | 71 ++++ go/vt/vtorc/db/db.go | 24 +- go/vt/vtorc/db/generate_base.go | 94 +++--- go/vt/vtorc/inst/analysis_dao.go | 302 +++++++++-------- go/vt/vtorc/inst/analysis_dao_test.go | 20 +- go/vt/vtorc/inst/audit_dao.go | 12 +- go/vt/vtorc/inst/instance_dao.go | 161 +++++---- go/vt/vtorc/inst/instance_dao_test.go | 68 ++-- go/vt/vtorc/inst/keyspace_dao.go | 12 +- go/vt/vtorc/inst/shard_dao.go | 12 +- go/vt/vtorc/inst/tablet_dao.go | 12 +- go/vt/vtorc/logic/disable_recovery.go | 22 +- go/vt/vtorc/logic/tablet_discovery.go | 17 +- go/vt/vtorc/logic/topology_recovery_dao.go | 93 +++--- .../vtorc/logic/topology_recovery_dao_test.go | 18 +- go/vt/vtorc/process/health.go | 2 +- 19 files changed, 501 insertions(+), 918 deletions(-) delete mode 100644 go/vt/external/golib/sqlutils/sqlite_dialect.go delete mode 100644 go/vt/external/golib/sqlutils/sqlite_dialect_test.go rename go/vt/external/golib/sqlutils/{dialect.go => statement.go} (56%) create mode 100644 go/vt/external/golib/sqlutils/statement_test.go diff --git a/go/vt/external/golib/sqlutils/sqlite_dialect.go b/go/vt/external/golib/sqlutils/sqlite_dialect.go deleted file mode 100644 index 11f0e531367..00000000000 --- a/go/vt/external/golib/sqlutils/sqlite_dialect.go +++ /dev/null @@ -1,134 +0,0 @@ -/* - Copyright 2017 GitHub Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -/* - This file has been copied over from VTOrc package -*/ - -// What's this about? -// This is a brute-force regular-expression based conversion from MySQL syntax to sqlite3 syntax. -// It is NOT meant to be a general purpose solution and is only expected & confirmed to run on -// queries issued by orchestrator. There are known limitations to this design. -// It's not even pretty. -// In fact... -// Well, it gets the job done at this time. Call it debt. - -package sqlutils - -import ( - "regexp" -) - -var sqlite3CreateTableConversions = []regexpMap{ - rmap(`(?i) (character set|charset) [\S]+`, ``), - rmap(`(?i)int unsigned`, `int`), - rmap(`(?i)int[\s]*[(][\s]*([0-9]+)[\s]*[)] unsigned`, `int`), - rmap(`(?i)engine[\s]*=[\s]*(innodb|myisam|ndb|memory|tokudb)`, ``), - rmap(`(?i)DEFAULT CHARSET[\s]*=[\s]*[\S]+`, ``), - rmap(`(?i)[\S]*int( not null|) auto_increment`, `integer`), - rmap(`(?i)comment '[^']*'`, ``), - rmap(`(?i)after [\S]+`, ``), - rmap(`(?i)alter table ([\S]+) add (index|key) ([\S]+) (.+)`, `create index ${3}_${1} on $1 $4`), - rmap(`(?i)alter table ([\S]+) add unique (index|key) ([\S]+) (.+)`, `create unique index ${3}_${1} on $1 $4`), - rmap(`(?i)([\S]+) enum[\s]*([(].*?[)])`, `$1 text check($1 in $2)`), - rmap(`(?i)([\s\S]+[/][*] sqlite3-skip [*][/][\s\S]+)`, ``), - rmap(`(?i)timestamp default current_timestamp`, `timestamp default ('')`), - rmap(`(?i)timestamp not null default current_timestamp`, `timestamp not null default ('')`), - - rmap(`(?i)add column (.*int) not null[\s]*$`, `add column $1 not null default 0`), - rmap(`(?i)add column (.* text) not null[\s]*$`, `add column $1 not null default ''`), - rmap(`(?i)add column (.* varchar.*) not null[\s]*$`, `add column $1 not null default ''`), -} - -var sqlite3InsertConversions = []regexpMap{ - rmap(`(?i)insert ignore ([\s\S]+) on duplicate key update [\s\S]+`, `insert or ignore $1`), - rmap(`(?i)insert ignore`, `insert or ignore`), - rmap(`(?i)now[(][)]`, `datetime('now')`), - rmap(`(?i)insert into ([\s\S]+) on duplicate key update [\s\S]+`, `replace into $1`), -} - -var sqlite3GeneralConversions = []regexpMap{ - rmap(`(?i)now[(][)][\s]*[-][\s]*interval [?] ([\w]+)`, `datetime('now', printf('-%d $1', ?))`), - rmap(`(?i)now[(][)][\s]*[+][\s]*interval [?] ([\w]+)`, `datetime('now', printf('+%d $1', ?))`), - rmap(`(?i)now[(][)][\s]*[-][\s]*interval ([0-9.]+) ([\w]+)`, `datetime('now', '-${1} $2')`), - rmap(`(?i)now[(][)][\s]*[+][\s]*interval ([0-9.]+) ([\w]+)`, `datetime('now', '+${1} $2')`), - - rmap(`(?i)[=<>\s]([\S]+[.][\S]+)[\s]*[-][\s]*interval [?] ([\w]+)`, ` datetime($1, printf('-%d $2', ?))`), - rmap(`(?i)[=<>\s]([\S]+[.][\S]+)[\s]*[+][\s]*interval [?] ([\w]+)`, ` datetime($1, printf('+%d $2', ?))`), - - rmap(`(?i)unix_timestamp[(][)]`, `strftime('%s', 'now')`), - rmap(`(?i)unix_timestamp[(]([^)]+)[)]`, `strftime('%s', $1)`), - rmap(`(?i)now[(][)]`, `datetime('now')`), - rmap(`(?i)cast[(][\s]*([\S]+) as signed[\s]*[)]`, `cast($1 as integer)`), - - rmap(`(?i)\bconcat[(][\s]*([^,)]+)[\s]*,[\s]*([^,)]+)[\s]*[)]`, `($1 || $2)`), - rmap(`(?i)\bconcat[(][\s]*([^,)]+)[\s]*,[\s]*([^,)]+)[\s]*,[\s]*([^,)]+)[\s]*[)]`, `($1 || $2 || $3)`), - - rmap(`(?i) rlike `, ` like `), - - rmap(`(?i)create index([\s\S]+)[(][\s]*[0-9]+[\s]*[)]([\s\S]+)`, `create index ${1}${2}`), - rmap(`(?i)drop index ([\S]+) on ([\S]+)`, `drop index if exists $1`), -} - -var ( - sqlite3IdentifyCreateTableStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*create table`)) - sqlite3IdentifyCreateIndexStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*create( unique|) index`)) - sqlite3IdentifyDropIndexStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*drop index`)) - sqlite3IdentifyAlterTableStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*alter table`)) - sqlite3IdentifyInsertStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*(insert|replace)`)) -) - -func IsInsert(statement string) bool { - return sqlite3IdentifyInsertStatement.MatchString(statement) -} - -func IsCreateTable(statement string) bool { - return sqlite3IdentifyCreateTableStatement.MatchString(statement) -} - -func IsCreateIndex(statement string) bool { - return sqlite3IdentifyCreateIndexStatement.MatchString(statement) -} - -func IsDropIndex(statement string) bool { - return sqlite3IdentifyDropIndexStatement.MatchString(statement) -} - -func IsAlterTable(statement string) bool { - return sqlite3IdentifyAlterTableStatement.MatchString(statement) -} - -func ToSqlite3CreateTable(statement string) string { - return applyConversions(statement, sqlite3CreateTableConversions) -} - -func ToSqlite3Insert(statement string) string { - return applyConversions(statement, sqlite3InsertConversions) -} - -func ToSqlite3Dialect(statement string) (translated string) { - if IsCreateTable(statement) { - return ToSqlite3CreateTable(statement) - } - if IsAlterTable(statement) { - return ToSqlite3CreateTable(statement) - } - statement = applyConversions(statement, sqlite3GeneralConversions) - if IsInsert(statement) { - return ToSqlite3Insert(statement) - } - return statement -} diff --git a/go/vt/external/golib/sqlutils/sqlite_dialect_test.go b/go/vt/external/golib/sqlutils/sqlite_dialect_test.go deleted file mode 100644 index 1298c379adf..00000000000 --- a/go/vt/external/golib/sqlutils/sqlite_dialect_test.go +++ /dev/null @@ -1,314 +0,0 @@ -/* - Copyright 2017 GitHub Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -/* - This file has been copied over from VTOrc package -*/ - -package sqlutils - -import ( - "regexp" - "strings" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -var spacesRegexp = regexp.MustCompile(`[\s]+`) - -func init() { -} - -func stripSpaces(statement string) string { - statement = strings.TrimSpace(statement) - statement = spacesRegexp.ReplaceAllString(statement, " ") - return statement -} - -func TestIsCreateTable(t *testing.T) { - require.True(t, IsCreateTable("create table t(id int)")) - require.True(t, IsCreateTable(" create table t(id int)")) - require.True(t, IsCreateTable("CREATE TABLE t(id int)")) - require.True(t, IsCreateTable(` - create table t(id int) - `)) - require.False(t, IsCreateTable("where create table t(id int)")) - require.False(t, IsCreateTable("insert")) -} - -func TestToSqlite3CreateTable(t *testing.T) { - { - statement := "create table t(id int)" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, statement) - } - { - statement := "create table t(id int, v varchar(123) CHARACTER SET ascii NOT NULL default '')" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(id int, v varchar(123) NOT NULL default '')") - } - { - statement := "create table t(id int, v varchar ( 123 ) CHARACTER SET ascii NOT NULL default '')" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(id int, v varchar ( 123 ) NOT NULL default '')") - } - { - statement := "create table t(i smallint unsigned)" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(i smallint)") - } - { - statement := "create table t(i smallint(5) unsigned)" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(i smallint)") - } - { - statement := "create table t(i smallint ( 5 ) unsigned)" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(i smallint)") - } -} - -func TestToSqlite3AlterTable(t *testing.T) { - { - statement := ` - ALTER TABLE - database_instance - ADD COLUMN sql_delay INT UNSIGNED NOT NULL AFTER replica_lag_seconds - ` - result := stripSpaces(ToSqlite3Dialect(statement)) - require.Equal(t, result, stripSpaces(` - ALTER TABLE - database_instance - add column sql_delay int not null default 0 - `)) - } - { - statement := ` - ALTER TABLE - database_instance - ADD INDEX source_host_port_idx (source_host, source_port) - ` - result := stripSpaces(ToSqlite3Dialect(statement)) - require.Equal(t, result, stripSpaces(` - create index - source_host_port_idx_database_instance - on database_instance (source_host, source_port) - `)) - } - { - statement := ` - ALTER TABLE - topology_recovery - ADD KEY last_detection_idx (last_detection_id) - ` - result := stripSpaces(ToSqlite3Dialect(statement)) - require.Equal(t, result, stripSpaces(` - create index - last_detection_idx_topology_recovery - on topology_recovery (last_detection_id) - `)) - } - -} - -func TestCreateIndex(t *testing.T) { - { - statement := ` - create index - source_host_port_idx_database_instance - on database_instance (source_host(128), source_port) - ` - result := stripSpaces(ToSqlite3Dialect(statement)) - require.Equal(t, result, stripSpaces(` - create index - source_host_port_idx_database_instance - on database_instance (source_host, source_port) - `)) - } -} - -func TestIsInsert(t *testing.T) { - require.True(t, IsInsert("insert into t")) - require.True(t, IsInsert("insert ignore into t")) - require.True(t, IsInsert(` - insert ignore into t - `)) - require.False(t, IsInsert("where create table t(id int)")) - require.False(t, IsInsert("create table t(id int)")) - require.True(t, IsInsert(` - insert into - cluster_domain_name (cluster_name, domain_name, last_registered) - values - (?, ?, datetime('now')) - on duplicate key update - domain_name=values(domain_name), - last_registered=values(last_registered) - `)) -} - -func TestToSqlite3Insert(t *testing.T) { - { - statement := ` - insert into - cluster_domain_name (cluster_name, domain_name, last_registered) - values - (?, ?, datetime('now')) - on duplicate key update - domain_name=values(domain_name), - last_registered=values(last_registered) - ` - result := stripSpaces(ToSqlite3Dialect(statement)) - require.Equal(t, result, stripSpaces(` - replace into - cluster_domain_name (cluster_name, domain_name, last_registered) - values - (?, ?, datetime('now')) - `)) - } -} - -func TestToSqlite3GeneralConversions(t *testing.T) { - { - statement := "select now()" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select datetime('now')") - } - { - statement := "select now() - interval ? second" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select datetime('now', printf('-%d second', ?))") - } - { - statement := "select now() + interval ? minute" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select datetime('now', printf('+%d minute', ?))") - } - { - statement := "select now() + interval 5 minute" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select datetime('now', '+5 minute')") - } - { - statement := "select some_table.some_column + interval ? minute" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select datetime(some_table.some_column, printf('+%d minute', ?))") - } - { - statement := "AND primary_instance.last_attempted_check <= primary_instance.last_seen + interval ? minute" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "AND primary_instance.last_attempted_check <= datetime(primary_instance.last_seen, printf('+%d minute', ?))") - } - { - statement := "select concat(primary_instance.port, '') as port" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select (primary_instance.port || '') as port") - } - { - statement := "select concat( 'abc' , 'def') as s" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select ('abc' || 'def') as s") - } - { - statement := "select concat( 'abc' , 'def', last.col) as s" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select ('abc' || 'def' || last.col) as s") - } - { - statement := "select concat(myself.only) as s" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select concat(myself.only) as s") - } - { - statement := "select concat(1, '2', 3, '4') as s" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select concat(1, '2', 3, '4') as s") - } - { - statement := "select group_concat( 'abc' , 'def') as s" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select group_concat( 'abc' , 'def') as s") - } -} - -func TestIsCreateIndex(t *testing.T) { - tests := []struct { - input string - expected bool - }{ - {"create index my_index on my_table(column);", true}, - {"CREATE INDEX my_index ON my_table(column);", true}, - {"create unique index my_index on my_table(column);", true}, - {"CREATE UNIQUE INDEX my_index ON my_table(column);", true}, - {"create index my_index on my_table(column) where condition;", true}, - {"create unique index my_index on my_table(column) where condition;", true}, - {"create table my_table(column);", false}, - {"drop index my_index on my_table;", false}, - {"alter table my_table add index my_index (column);", false}, - {"", false}, - } - - for _, test := range tests { - t.Run(test.input, func(t *testing.T) { - result := IsCreateIndex(test.input) - assert.Equal(t, test.expected, result) - }) - } -} - -func TestIsDropIndex(t *testing.T) { - tests := []struct { - input string - expected bool - }{ - {"drop index my_index on my_table;", true}, - {"DROP INDEX my_index ON my_table;", true}, - {"drop index if exists my_index on my_table;", true}, - {"DROP INDEX IF EXISTS my_index ON my_table;", true}, - {"drop table my_table;", false}, - {"create index my_index on my_table(column);", false}, - {"alter table my_table add index my_index (column);", false}, - {"", false}, - } - - for _, test := range tests { - t.Run(test.input, func(t *testing.T) { - result := IsDropIndex(test.input) - assert.Equal(t, test.expected, result) - }) - } -} - -func TestToSqlite3Dialect(t *testing.T) { - tests := []struct { - input string - expected string - }{ - {"create table my_table(id int);", "create table my_table(id int);"}, - {"alter table my_table add column new_col int;", "alter table my_table add column new_col int;"}, - {"insert into my_table values (1);", "insert into my_table values (1);"}, - {"", ""}, - } - - for _, test := range tests { - t.Run(test.input, func(t *testing.T) { - result := ToSqlite3Dialect(test.input) - assert.Equal(t, test.expected, result) - }) - } -} diff --git a/go/vt/external/golib/sqlutils/dialect.go b/go/vt/external/golib/sqlutils/statement.go similarity index 56% rename from go/vt/external/golib/sqlutils/dialect.go rename to go/vt/external/golib/sqlutils/statement.go index 8dabe57ccaf..078bbb60cb4 100644 --- a/go/vt/external/golib/sqlutils/dialect.go +++ b/go/vt/external/golib/sqlutils/statement.go @@ -25,29 +25,24 @@ import ( "strings" ) -type regexpMap struct { - r *regexp.Regexp - replacement string +func regexpSpaces(statement string) string { + return strings.Replace(statement, " ", `[\s]+`, -1) } -func (this *regexpMap) process(text string) (result string) { - return this.r.ReplaceAllString(text, this.replacement) -} +var ( + sqlite3IdentifyCreateIndexStatement = regexp.MustCompile(regexpSpaces(`^[\s]*create( unique|) index`)) + sqlite3IdentifyDropIndexStatement = regexp.MustCompile(regexpSpaces(`^[\s]*drop index`)) + sqlite3IdentifyAlterTableStatement = regexp.MustCompile(regexpSpaces(`^[\s]*alter table`)) +) -func rmap(regexpExpression string, replacement string) regexpMap { - return regexpMap{ - r: regexp.MustCompile(regexpSpaces(regexpExpression)), - replacement: replacement, - } +func IsCreateIndex(statement string) bool { + return sqlite3IdentifyCreateIndexStatement.MatchString(statement) } -func regexpSpaces(statement string) string { - return strings.Replace(statement, " ", `[\s]+`, -1) +func IsDropIndex(statement string) bool { + return sqlite3IdentifyDropIndexStatement.MatchString(statement) } -func applyConversions(statement string, conversions []regexpMap) string { - for _, rmap := range conversions { - statement = rmap.process(statement) - } - return statement +func IsAlterTable(statement string) bool { + return sqlite3IdentifyAlterTableStatement.MatchString(statement) } diff --git a/go/vt/external/golib/sqlutils/statement_test.go b/go/vt/external/golib/sqlutils/statement_test.go new file mode 100644 index 00000000000..38ff1125e2f --- /dev/null +++ b/go/vt/external/golib/sqlutils/statement_test.go @@ -0,0 +1,71 @@ +/* + Copyright 2017 GitHub Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/* + This file has been copied over from VTOrc package +*/ + +package sqlutils + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIsCreateIndex(t *testing.T) { + tests := []struct { + input string + expected bool + }{ + {"create index my_index on my_table(column);", true}, + {"create unique index my_index on my_table(column);", true}, + {"create index my_index on my_table(column) where condition;", true}, + {"create unique index my_index on my_table(column) where condition;", true}, + {"create table my_table(column);", false}, + {"drop index my_index on my_table;", false}, + {"alter table my_table add index my_index (column);", false}, + {"", false}, + } + + for _, test := range tests { + t.Run(test.input, func(t *testing.T) { + result := IsCreateIndex(test.input) + assert.Equal(t, test.expected, result) + }) + } +} + +func TestIsDropIndex(t *testing.T) { + tests := []struct { + input string + expected bool + }{ + {"drop index my_index on my_table;", true}, + {"drop index if exists my_index on my_table;", true}, + {"drop table my_table;", false}, + {"create index my_index on my_table(column);", false}, + {"alter table my_table add index my_index (column);", false}, + {"", false}, + } + + for _, test := range tests { + t.Run(test.input, func(t *testing.T) { + result := IsDropIndex(test.input) + assert.Equal(t, test.expected, result) + }) + } +} diff --git a/go/vt/vtorc/db/db.go b/go/vt/vtorc/db/db.go index 00f5b5b2550..92d6a0c6d58 100644 --- a/go/vt/vtorc/db/db.go +++ b/go/vt/vtorc/db/db.go @@ -57,19 +57,13 @@ func OpenVTOrc() (db *sql.DB, err error) { return db, err } -func translateStatement(statement string) string { - return sqlutils.ToSqlite3Dialect(statement) -} - // registerVTOrcDeployment updates the vtorc_db_deployments table upon successful deployment func registerVTOrcDeployment(db *sql.DB) error { - query := ` - replace into vtorc_db_deployments ( - deployed_version, deployed_timestamp - ) values ( - ?, NOW() - ) - ` + query := `replace into vtorc_db_deployments ( + deployed_version, deployed_timestamp + ) values ( + ?, datetime('now') + )` if _, err := execInternal(db, query, ""); err != nil { log.Fatalf("Unable to write to vtorc_db_deployments: %+v", err) } @@ -84,7 +78,6 @@ func deployStatements(db *sql.DB, queries []string) error { log.Fatal(err.Error()) } for _, query := range queries { - query = translateStatement(query) if _, err := tx.Exec(query); err != nil { if strings.Contains(err.Error(), "syntax error") { log.Fatalf("Cannot initiate vtorc: %+v; query=%+v", err, query) @@ -134,10 +127,7 @@ func initVTOrcDB(db *sql.DB) error { // execInternal func execInternal(db *sql.DB, query string, args ...any) (sql.Result, error) { - var err error - query = translateStatement(query) - res, err := sqlutils.ExecNoPrepare(db, query, args...) - return res, err + return sqlutils.ExecNoPrepare(db, query, args...) } // ExecVTOrc will execute given query on the vtorc backend database. @@ -151,7 +141,6 @@ func ExecVTOrc(query string, args ...any) (sql.Result, error) { // QueryVTOrcRowsMap func QueryVTOrcRowsMap(query string, onRow func(sqlutils.RowMap) error) error { - query = translateStatement(query) db, err := OpenVTOrc() if err != nil { return err @@ -162,7 +151,6 @@ func QueryVTOrcRowsMap(query string, onRow func(sqlutils.RowMap) error) error { // QueryVTOrc func QueryVTOrc(query string, argsArray []any, onRow func(sqlutils.RowMap) error) error { - query = translateStatement(query) db, err := OpenVTOrc() if err != nil { return err diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go index f997dc6ac0a..08faff973aa 100644 --- a/go/vt/vtorc/db/generate_base.go +++ b/go/vt/vtorc/db/generate_base.go @@ -37,10 +37,10 @@ var TableNames = []string{ // vtorcBackend is a list of SQL statements required to build the vtorc backend var vtorcBackend = []string{ ` -DROP TABLE IF EXISTS database_instance +drop table if exists database_instance `, ` -CREATE TABLE database_instance ( +create table database_instance ( alias varchar(256) NOT NULL, hostname varchar(128) NOT NULL, port smallint NOT NULL, @@ -110,16 +110,16 @@ CREATE TABLE database_instance ( PRIMARY KEY (alias) )`, ` -CREATE INDEX last_checked_idx_database_instance ON database_instance(last_checked) +create index last_checked_idx_database_instance ON database_instance(last_checked) `, ` -CREATE INDEX last_seen_idx_database_instance ON database_instance(last_seen) +create index last_seen_idx_database_instance ON database_instance(last_seen) `, ` -DROP TABLE IF EXISTS audit +drop table if exists audit `, ` -CREATE TABLE audit ( +create table audit ( audit_id integer, audit_timestamp timestamp not null default (''), audit_type varchar(128) NOT NULL, @@ -130,23 +130,23 @@ CREATE TABLE audit ( PRIMARY KEY (audit_id) )`, ` -CREATE INDEX audit_timestamp_idx_audit ON audit (audit_timestamp) +create index audit_timestamp_idx_audit ON audit (audit_timestamp) `, ` -CREATE INDEX alias_idx_audit ON audit (alias, audit_timestamp) +create index alias_idx_audit ON audit (alias, audit_timestamp) `, ` -DROP TABLE IF EXISTS node_health +drop table if exists node_health `, ` -CREATE TABLE node_health ( +create table node_health ( last_seen_active timestamp not null default ('') )`, ` -DROP TABLE IF EXISTS topology_recovery +drop table if exists topology_recovery `, ` -CREATE TABLE topology_recovery ( +create table topology_recovery ( recovery_id integer, alias varchar(256) NOT NULL, start_recovery timestamp NOT NULL DEFAULT (''), @@ -161,13 +161,13 @@ CREATE TABLE topology_recovery ( PRIMARY KEY (recovery_id) )`, ` -CREATE INDEX start_recovery_idx_topology_recovery ON topology_recovery (start_recovery) +create index start_recovery_idx_topology_recovery ON topology_recovery (start_recovery) `, ` -DROP TABLE IF EXISTS database_instance_topology_history +drop table if exists database_instance_topology_history `, ` -CREATE TABLE database_instance_topology_history ( +create table database_instance_topology_history ( snapshot_unix_timestamp int NOT NULL, alias varchar(256) NOT NULL, hostname varchar(128) NOT NULL, @@ -180,13 +180,13 @@ CREATE TABLE database_instance_topology_history ( PRIMARY KEY (snapshot_unix_timestamp, alias) )`, ` -CREATE INDEX keyspace_shard_idx_database_instance_topology_history ON database_instance_topology_history (snapshot_unix_timestamp, keyspace, shard) +create index keyspace_shard_idx_database_instance_topology_history ON database_instance_topology_history (snapshot_unix_timestamp, keyspace, shard) `, ` -DROP TABLE IF EXISTS recovery_detection +drop table if exists recovery_detection `, ` -CREATE TABLE recovery_detection ( +create table recovery_detection ( detection_id integer, alias varchar(256) NOT NULL, analysis varchar(128) NOT NULL, @@ -196,23 +196,23 @@ CREATE TABLE recovery_detection ( PRIMARY KEY (detection_id) )`, ` -DROP TABLE IF EXISTS database_instance_last_analysis +drop table if exists database_instance_last_analysis `, ` -CREATE TABLE database_instance_last_analysis ( +create table database_instance_last_analysis ( alias varchar(256) NOT NULL, analysis_timestamp timestamp not null default (''), analysis varchar(128) NOT NULL, PRIMARY KEY (alias) )`, ` -CREATE INDEX analysis_timestamp_idx_database_instance_last_analysis ON database_instance_last_analysis (analysis_timestamp) +create index analysis_timestamp_idx_database_instance_last_analysis ON database_instance_last_analysis (analysis_timestamp) `, ` -DROP TABLE IF EXISTS database_instance_analysis_changelog +drop table if exists database_instance_analysis_changelog `, ` -CREATE TABLE database_instance_analysis_changelog ( +create table database_instance_analysis_changelog ( changelog_id integer, alias varchar(256) NOT NULL, analysis_timestamp timestamp not null default (''), @@ -220,30 +220,30 @@ CREATE TABLE database_instance_analysis_changelog ( PRIMARY KEY (changelog_id) )`, ` -CREATE INDEX analysis_timestamp_idx_database_instance_analysis_changelog ON database_instance_analysis_changelog (analysis_timestamp) +create index analysis_timestamp_idx_database_instance_analysis_changelog ON database_instance_analysis_changelog (analysis_timestamp) `, ` -DROP TABLE IF EXISTS vtorc_db_deployments +drop table if exists vtorc_db_deployments `, ` -CREATE TABLE vtorc_db_deployments ( +create table vtorc_db_deployments ( deployed_version varchar(128) NOT NULL, deployed_timestamp timestamp NOT NULL, PRIMARY KEY (deployed_version) )`, ` -DROP TABLE IF EXISTS global_recovery_disable +drop table if exists global_recovery_disable `, ` -CREATE TABLE global_recovery_disable ( +create table global_recovery_disable ( disable_recovery tinyint NOT NULL , PRIMARY KEY (disable_recovery) )`, ` -DROP TABLE IF EXISTS topology_recovery_steps +drop table if exists topology_recovery_steps `, ` -CREATE TABLE topology_recovery_steps ( +create table topology_recovery_steps ( recovery_step_id integer, recovery_id integer NOT NULL, audit_at timestamp not null default (''), @@ -251,10 +251,10 @@ CREATE TABLE topology_recovery_steps ( PRIMARY KEY (recovery_step_id) )`, ` -DROP TABLE IF EXISTS database_instance_stale_binlog_coordinates +drop table if exists database_instance_stale_binlog_coordinates `, ` -CREATE TABLE database_instance_stale_binlog_coordinates ( +create table database_instance_stale_binlog_coordinates ( alias varchar(256) NOT NULL, binary_log_file varchar(128) NOT NULL, binary_log_pos bigint NOT NULL, @@ -262,13 +262,13 @@ CREATE TABLE database_instance_stale_binlog_coordinates ( PRIMARY KEY (alias) )`, ` -CREATE INDEX first_seen_idx_database_instance_stale_binlog_coordinates ON database_instance_stale_binlog_coordinates (first_seen) +create index first_seen_idx_database_instance_stale_binlog_coordinates ON database_instance_stale_binlog_coordinates (first_seen) `, ` -DROP TABLE IF EXISTS vitess_tablet +drop table if exists vitess_tablet `, ` -CREATE TABLE vitess_tablet ( +create table vitess_tablet ( alias varchar(256) NOT NULL, hostname varchar(128) NOT NULL, port smallint NOT NULL, @@ -281,26 +281,26 @@ CREATE TABLE vitess_tablet ( PRIMARY KEY (alias) )`, ` -CREATE INDEX cell_idx_vitess_tablet ON vitess_tablet (cell) +create index cell_idx_vitess_tablet ON vitess_tablet (cell) `, ` -CREATE INDEX ks_idx_vitess_tablet ON vitess_tablet (keyspace, shard) +create index ks_idx_vitess_tablet ON vitess_tablet (keyspace, shard) `, ` -DROP TABLE IF EXISTS vitess_keyspace +drop table if exists vitess_keyspace `, ` -CREATE TABLE vitess_keyspace ( +create table vitess_keyspace ( keyspace varchar(128) NOT NULL, keyspace_type smallint(5) NOT NULL, durability_policy varchar(512) NOT NULL, PRIMARY KEY (keyspace) )`, ` -DROP TABLE IF EXISTS vitess_shard +drop table if exists vitess_shard `, ` -CREATE TABLE vitess_shard ( +create table vitess_shard ( keyspace varchar(128) NOT NULL, shard varchar(128) NOT NULL, primary_alias varchar(512) NOT NULL, @@ -308,21 +308,21 @@ CREATE TABLE vitess_shard ( PRIMARY KEY (keyspace, shard) )`, ` -CREATE INDEX source_host_port_idx_database_instance_database_instance on database_instance (source_host, source_port) +create index source_host_port_idx_database_instance_database_instance on database_instance (source_host, source_port) `, ` -CREATE INDEX keyspace_shard_idx_topology_recovery on topology_recovery (keyspace, shard) +create index keyspace_shard_idx_topology_recovery on topology_recovery (keyspace, shard) `, ` -CREATE INDEX end_recovery_idx_topology_recovery on topology_recovery (end_recovery) +create index end_recovery_idx_topology_recovery on topology_recovery (end_recovery) `, ` -CREATE INDEX instance_timestamp_idx_database_instance_analysis_changelog on database_instance_analysis_changelog (alias, analysis_timestamp) +create index instance_timestamp_idx_database_instance_analysis_changelog on database_instance_analysis_changelog (alias, analysis_timestamp) `, ` -CREATE INDEX detection_idx_topology_recovery on topology_recovery (detection_id) +create index detection_idx_topology_recovery on topology_recovery (detection_id) `, ` -CREATE INDEX recovery_id_idx_topology_recovery_steps ON topology_recovery_steps(recovery_id) +create index recovery_id_idx_topology_recovery_steps ON topology_recovery_steps(recovery_id) `, } diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index 99df358e330..42fc3f71345 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -70,215 +70,215 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna // TODO(sougou); deprecate ReduceReplicationAnalysisCount args := sqlutils.Args(config.Config.ReasonableReplicationLagSeconds, ValidSecondsFromSeenToLastAttemptedCheck(), config.Config.ReasonableReplicationLagSeconds, keyspace, shard) query := ` - SELECT - vitess_tablet.info AS tablet_info, + select + vitess_tablet.info as tablet_info, vitess_tablet.tablet_type, vitess_tablet.primary_timestamp, - vitess_tablet.shard AS shard, - vitess_keyspace.keyspace AS keyspace, - vitess_keyspace.keyspace_type AS keyspace_type, - vitess_keyspace.durability_policy AS durability_policy, - vitess_shard.primary_timestamp AS shard_primary_term_timestamp, - primary_instance.read_only AS read_only, - MIN(primary_instance.gtid_errant) AS gtid_errant, - MIN(primary_instance.alias) IS NULL AS is_invalid, - MIN(primary_instance.binary_log_file) AS binary_log_file, - MIN(primary_instance.binary_log_pos) AS binary_log_pos, - MIN(primary_instance.replica_net_timeout) AS replica_net_timeout, - MIN(primary_instance.heartbeat_interval) AS heartbeat_interval, - MIN(primary_tablet.info) AS primary_tablet_info, - MIN( - IFNULL( + vitess_tablet.shard as shard, + vitess_keyspace.keyspace as keyspace, + vitess_keyspace.keyspace_type as keyspace_type, + vitess_keyspace.durability_policy as durability_policy, + vitess_shard.primary_timestamp as shard_primary_term_timestamp, + primary_instance.read_only as read_only, + min(primary_instance.gtid_errant) as gtid_errant, + min(primary_instance.alias) IS NULL as is_invalid, + min(primary_instance.binary_log_file) as binary_log_file, + min(primary_instance.binary_log_pos) as binary_log_pos, + min(primary_instance.replica_net_timeout) as replica_net_timeout, + min(primary_instance.heartbeat_interval) as heartbeat_interval, + min(primary_tablet.info) as primary_tablet_info, + min( + ifnull( primary_instance.binary_log_file = database_instance_stale_binlog_coordinates.binary_log_file - AND primary_instance.binary_log_pos = database_instance_stale_binlog_coordinates.binary_log_pos - AND database_instance_stale_binlog_coordinates.first_seen < NOW() - interval ? second, + and primary_instance.binary_log_pos = database_instance_stale_binlog_coordinates.binary_log_pos + and database_instance_stale_binlog_coordinates.first_seen < datetime('now', printf('-%d second', ?)), 0 ) - ) AS is_stale_binlog_coordinates, - MIN( + ) as is_stale_binlog_coordinates, + min( primary_instance.last_checked <= primary_instance.last_seen - and primary_instance.last_attempted_check <= primary_instance.last_seen + interval ? second - ) = 1 AS is_last_check_valid, - /* To be considered a primary, traditional async replication must not be present/valid AND the host should either */ + and primary_instance.last_attempted_check <= datetime(primary_instance.last_seen, printf('+%d second', ?)) + ) = 1 as is_last_check_valid, + /* To be considered a primary, traditional async replication must not be present/valid and the host should either */ /* not be a replication group member OR be the primary of the replication group */ - MIN(primary_instance.last_check_partial_success) as last_check_partial_success, - MIN( + min(primary_instance.last_check_partial_success) as last_check_partial_success, + min( ( - primary_instance.source_host IN ('', '_') + primary_instance.source_host in ('', '_') OR primary_instance.source_port = 0 OR substr(primary_instance.source_host, 1, 2) = '//' ) - ) AS is_primary, - MIN(primary_instance.gtid_mode) AS gtid_mode, - COUNT(replica_instance.server_id) AS count_replicas, - IFNULL( - SUM( + ) as is_primary, + min(primary_instance.gtid_mode) as gtid_mode, + count(replica_instance.server_id) as count_replicas, + ifnull( + sum( replica_instance.last_checked <= replica_instance.last_seen ), 0 - ) AS count_valid_replicas, - IFNULL( - SUM( + ) as count_valid_replicas, + ifnull( + sum( replica_instance.last_checked <= replica_instance.last_seen - AND replica_instance.replica_io_running != 0 - AND replica_instance.replica_sql_running != 0 + and replica_instance.replica_io_running != 0 + and replica_instance.replica_sql_running != 0 ), 0 - ) AS count_valid_replicating_replicas, - IFNULL( - SUM( + ) as count_valid_replicating_replicas, + ifnull( + sum( replica_instance.last_checked <= replica_instance.last_seen - AND replica_instance.replica_io_running = 0 - AND replica_instance.last_io_error like '%%error %%connecting to master%%' - AND replica_instance.replica_sql_running = 1 + and replica_instance.replica_io_running = 0 + and replica_instance.last_io_error like '%%error %%connecting to master%%' + and replica_instance.replica_sql_running = 1 ), 0 - ) AS count_replicas_failing_to_connect_to_primary, - MIN( + ) as count_replicas_failing_to_connect_to_primary, + min( primary_instance.replica_sql_running = 0 OR primary_instance.replica_io_running = 0 - ) AS replication_stopped, - MIN( + ) as replication_stopped, + min( primary_instance.supports_oracle_gtid - ) AS supports_oracle_gtid, - MIN( + ) as supports_oracle_gtid, + min( primary_instance.semi_sync_primary_enabled - ) AS semi_sync_primary_enabled, - MIN( + ) as semi_sync_primary_enabled, + min( primary_instance.semi_sync_primary_wait_for_replica_count - ) AS semi_sync_primary_wait_for_replica_count, - MIN( + ) as semi_sync_primary_wait_for_replica_count, + min( primary_instance.semi_sync_primary_clients - ) AS semi_sync_primary_clients, - MIN( + ) as semi_sync_primary_clients, + min( primary_instance.semi_sync_primary_status - ) AS semi_sync_primary_status, - MIN( + ) as semi_sync_primary_status, + min( primary_instance.semi_sync_replica_enabled - ) AS semi_sync_replica_enabled, - SUM(replica_instance.oracle_gtid) AS count_oracle_gtid_replicas, - IFNULL( - SUM( + ) as semi_sync_replica_enabled, + sum(replica_instance.oracle_gtid) as count_oracle_gtid_replicas, + ifnull( + sum( replica_instance.last_checked <= replica_instance.last_seen - AND replica_instance.oracle_gtid != 0 + and replica_instance.oracle_gtid != 0 ), 0 - ) AS count_valid_oracle_gtid_replicas, - SUM( + ) as count_valid_oracle_gtid_replicas, + sum( replica_instance.binlog_server - ) AS count_binlog_server_replicas, - IFNULL( - SUM( + ) as count_binlog_server_replicas, + ifnull( + sum( replica_instance.last_checked <= replica_instance.last_seen - AND replica_instance.binlog_server != 0 + and replica_instance.binlog_server != 0 ), 0 - ) AS count_valid_binlog_server_replicas, - SUM( + ) as count_valid_binlog_server_replicas, + sum( replica_instance.semi_sync_replica_enabled - ) AS count_semi_sync_replicas, - IFNULL( - SUM( + ) as count_semi_sync_replicas, + ifnull( + sum( replica_instance.last_checked <= replica_instance.last_seen - AND replica_instance.semi_sync_replica_enabled != 0 + and replica_instance.semi_sync_replica_enabled != 0 ), 0 - ) AS count_valid_semi_sync_replicas, - MIN( + ) as count_valid_semi_sync_replicas, + min( primary_instance.mariadb_gtid - ) AS is_mariadb_gtid, - SUM(replica_instance.mariadb_gtid) AS count_mariadb_gtid_replicas, - IFNULL( - SUM( + ) as is_mariadb_gtid, + sum(replica_instance.mariadb_gtid) as count_mariadb_gtid_replicas, + ifnull( + sum( replica_instance.last_checked <= replica_instance.last_seen - AND replica_instance.mariadb_gtid != 0 + and replica_instance.mariadb_gtid != 0 ), 0 - ) AS count_valid_mariadb_gtid_replicas, - IFNULL( - SUM( + ) as count_valid_mariadb_gtid_replicas, + ifnull( + sum( replica_instance.log_bin - AND replica_instance.log_replica_updates + and replica_instance.log_replica_updates ), 0 - ) AS count_logging_replicas, - IFNULL( - SUM( + ) as count_logging_replicas, + ifnull( + sum( replica_instance.log_bin - AND replica_instance.log_replica_updates - AND replica_instance.binlog_format = 'STATEMENT' + and replica_instance.log_replica_updates + and replica_instance.binlog_format = 'STATEMENT' ), 0 - ) AS count_statement_based_logging_replicas, - IFNULL( - SUM( + ) as count_statement_based_logging_replicas, + ifnull( + sum( replica_instance.log_bin - AND replica_instance.log_replica_updates - AND replica_instance.binlog_format = 'MIXED' + and replica_instance.log_replica_updates + and replica_instance.binlog_format = 'MIXED' ), 0 - ) AS count_mixed_based_logging_replicas, - IFNULL( - SUM( + ) as count_mixed_based_logging_replicas, + ifnull( + sum( replica_instance.log_bin - AND replica_instance.log_replica_updates - AND replica_instance.binlog_format = 'ROW' + and replica_instance.log_replica_updates + and replica_instance.binlog_format = 'ROW' ), 0 - ) AS count_row_based_logging_replicas, - IFNULL( - SUM(replica_instance.sql_delay > 0), + ) as count_row_based_logging_replicas, + ifnull( + sum(replica_instance.sql_delay > 0), 0 - ) AS count_delayed_replicas, - IFNULL( - SUM(replica_instance.replica_lag_seconds > ?), + ) as count_delayed_replicas, + ifnull( + sum(replica_instance.replica_lag_seconds > ?), 0 - ) AS count_lagging_replicas, - IFNULL(MIN(replica_instance.gtid_mode), '') AS min_replica_gtid_mode, - IFNULL(MAX(replica_instance.gtid_mode), '') AS max_replica_gtid_mode, - IFNULL( - MAX( + ) as count_lagging_replicas, + ifnull(min(replica_instance.gtid_mode), '') as min_replica_gtid_mode, + ifnull(max(replica_instance.gtid_mode), '') as max_replica_gtid_mode, + ifnull( + max( replica_instance.gtid_errant ), '' - ) AS max_replica_gtid_errant, - COUNT( - DISTINCT case when replica_instance.log_bin - AND replica_instance.log_replica_updates then replica_instance.major_version else NULL end - ) AS count_distinct_logging_major_versions + ) as max_replica_gtid_errant, + count( + distinct case when replica_instance.log_bin + and replica_instance.log_replica_updates then replica_instance.major_version else NULL end + ) as count_distinct_logging_major_versions FROM vitess_tablet - JOIN vitess_keyspace ON ( + join vitess_keyspace on ( vitess_tablet.keyspace = vitess_keyspace.keyspace ) - JOIN vitess_shard ON ( + join vitess_shard on ( vitess_tablet.keyspace = vitess_shard.keyspace - AND vitess_tablet.shard = vitess_shard.shard + and vitess_tablet.shard = vitess_shard.shard ) - LEFT JOIN database_instance primary_instance ON ( + left join database_instance primary_instance on ( vitess_tablet.alias = primary_instance.alias - AND vitess_tablet.hostname = primary_instance.hostname - AND vitess_tablet.port = primary_instance.port + and vitess_tablet.hostname = primary_instance.hostname + and vitess_tablet.port = primary_instance.port ) - LEFT JOIN vitess_tablet primary_tablet ON ( + left join vitess_tablet primary_tablet on ( primary_tablet.hostname = primary_instance.source_host - AND primary_tablet.port = primary_instance.source_port + and primary_tablet.port = primary_instance.source_port ) - LEFT JOIN database_instance replica_instance ON ( + left join database_instance replica_instance on ( primary_instance.hostname = replica_instance.source_host - AND primary_instance.port = replica_instance.source_port + and primary_instance.port = replica_instance.source_port ) - LEFT JOIN database_instance_stale_binlog_coordinates ON ( + left join database_instance_stale_binlog_coordinates on ( vitess_tablet.alias = database_instance_stale_binlog_coordinates.alias ) - WHERE - ? IN ('', vitess_keyspace.keyspace) - AND ? IN ('', vitess_tablet.shard) - GROUP BY + where + ? in ('', vitess_keyspace.keyspace) + and ? in ('', vitess_tablet.shard) + group by vitess_tablet.alias - ORDER BY - vitess_tablet.tablet_type ASC, - vitess_tablet.primary_timestamp DESC + order by + vitess_tablet.tablet_type asC, + vitess_tablet.primary_timestamp desc ` clusters := make(map[string]*clusterAnalysis) @@ -658,7 +658,7 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC sqlResult, err := db.ExecVTOrc(` update database_instance_last_analysis set analysis = ?, - analysis_timestamp = now() + analysis_timestamp = datetime('now') where alias = ? and analysis != ? @@ -683,12 +683,11 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC if !lastAnalysisChanged { // The insert only returns more than 1 row changed if this is the first insertion. sqlResult, err := db.ExecVTOrc(` - insert ignore into database_instance_last_analysis ( - alias, analysis_timestamp, analysis - ) values ( - ?, now(), ? - ) - `, + insert or ignore into database_instance_last_analysis ( + alias, analysis_timestamp, analysis + ) values ( + ?, datetime('now'), ? + )`, tabletAlias, string(analysisCode), ) if err != nil { @@ -709,12 +708,11 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC } _, err := db.ExecVTOrc(` - insert into database_instance_analysis_changelog ( - alias, analysis_timestamp, analysis - ) values ( - ?, now(), ? - ) - `, + insert into database_instance_analysis_changelog ( + alias, analysis_timestamp, analysis + ) values ( + ?, datetime('now'), ? + )`, tabletAlias, string(analysisCode), ) if err == nil { @@ -728,11 +726,11 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC // ExpireInstanceAnalysisChangelog removes old-enough analysis entries from the changelog func ExpireInstanceAnalysisChangelog() error { _, err := db.ExecVTOrc(` - delete - from database_instance_analysis_changelog - where - analysis_timestamp < now() - interval ? hour - `, + delete + from database_instance_analysis_changelog + where + analysis_timestamp < datetime('now', printf('-%d hour', ?)) + `, config.UnseenInstanceForgetHours, ) if err != nil { diff --git a/go/vt/vtorc/inst/analysis_dao_test.go b/go/vt/vtorc/inst/analysis_dao_test.go index c061d54ebb3..7dc66522002 100644 --- a/go/vt/vtorc/inst/analysis_dao_test.go +++ b/go/vt/vtorc/inst/analysis_dao_test.go @@ -33,16 +33,16 @@ var ( // The initialSQL is a set of insert commands copied from a dump of an actual running VTOrc instances. The relevant insert commands are here. // This is a dump taken from a test running 4 tablets, zone1-101 is the primary, zone1-100 is a replica, zone1-112 is a rdonly and zone2-200 is a cross-cell replica. initialSQL = []string{ - `INSERT INTO database_instance VALUES('zone1-0000000112','localhost',6747,'2022-12-28 07:26:04','2022-12-28 07:26:04',213696377,'8.0.31','ROW',1,1,'vt-0000000112-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,0,'vt-0000000112-relay-bin.000002',15815,0,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-9240-92a06c3be3c2','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10816929,0,0,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-9240-92a06c3be3c2',1,1,'',1000000000000000000,1,0,0,0);`, - `INSERT INTO database_instance VALUES('zone1-0000000100','localhost',6711,'2022-12-28 07:26:04','2022-12-28 07:26:04',1094500338,'8.0.31','ROW',1,1,'vt-0000000100-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,0,'vt-0000000100-relay-bin.000002',15815,0,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-acf8-d6b0ef9f4eaa','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10103920,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-acf8-d6b0ef9f4eaa',1,1,'',1000000000000000000,1,0,1,0);`, - `INSERT INTO database_instance VALUES('zone1-0000000101','localhost',6714,'2022-12-28 07:26:04','2022-12-28 07:26:04',390954723,'8.0.31','ROW',1,1,'vt-0000000101-bin.000001',15583,'',0,0,0,0,0,'',0,'',0,NULL,NULL,0,'','',0,0,'',0,0,0,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a4cc4-8680-11ed-a104-47706090afbd','2022-12-28 07:26:04','',0,0,0,'Homebrew','8.0','FULL',11366095,1,1,'ON',1,'','','729a4cc4-8680-11ed-a104-47706090afbd',-1,-1,'',1000000000000000000,1,1,0,2);`, - `INSERT INTO database_instance VALUES('zone2-0000000200','localhost',6756,'2022-12-28 07:26:05','2022-12-28 07:26:05',444286571,'8.0.31','ROW',1,1,'vt-0000000200-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,0,'vt-0000000200-relay-bin.000002',15815,0,1,0,'zone2','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a497c-8680-11ed-8ad4-3f51d747db75','2022-12-28 07:26:05','',1,0,0,'Homebrew','8.0','FULL',10443112,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a497c-8680-11ed-8ad4-3f51d747db75',1,1,'',1000000000000000000,1,0,1,0);`, - `INSERT INTO vitess_tablet VALUES('zone1-0000000100','localhost',6711,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130307d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731307d20706f72745f6d61703a7b6b65793a227674222076616c75653a363730397d206b657973706163653a226b73222073686172643a22302220747970653a5245504c494341206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363731312064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, - `INSERT INTO vitess_tablet VALUES('zone1-0000000101','localhost',6714,'ks','0','zone1',1,'2022-12-28 07:23:25.129898+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130317d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731337d20706f72745f6d61703a7b6b65793a227674222076616c75653a363731327d206b657973706163653a226b73222073686172643a22302220747970653a5052494d415259206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a36373134207072696d6172795f7465726d5f73746172745f74696d653a7b7365636f6e64733a31363732323132323035206e616e6f7365636f6e64733a3132393839383030307d2064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, - `INSERT INTO vitess_tablet VALUES('zone1-0000000112','localhost',6747,'ks','0','zone1',3,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3131327d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363734367d20706f72745f6d61703a7b6b65793a227674222076616c75653a363734357d206b657973706163653a226b73222073686172643a22302220747970653a52444f4e4c59206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363734372064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, - `INSERT INTO vitess_tablet VALUES('zone2-0000000200','localhost',6756,'ks','0','zone2',2,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653222207569643a3230307d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363735357d20706f72745f6d61703a7b6b65793a227674222076616c75653a363735347d206b657973706163653a226b73222073686172643a22302220747970653a5245504c494341206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363735362064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, - `INSERT INTO vitess_shard VALUES('ks','0','zone1-0000000101','2022-12-28 07:23:25.129898+00:00');`, - `INSERT INTO vitess_keyspace VALUES('ks',0,'semi_sync');`, + `insert into database_instance values('zone1-0000000112','localhost',6747,'2022-12-28 07:26:04','2022-12-28 07:26:04',213696377,'8.0.31','ROW',1,1,'vt-0000000112-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,0,'vt-0000000112-relay-bin.000002',15815,0,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-9240-92a06c3be3c2','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10816929,0,0,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-9240-92a06c3be3c2',1,1,'',1000000000000000000,1,0,0,0);`, + `insert into database_instance values('zone1-0000000100','localhost',6711,'2022-12-28 07:26:04','2022-12-28 07:26:04',1094500338,'8.0.31','ROW',1,1,'vt-0000000100-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,0,'vt-0000000100-relay-bin.000002',15815,0,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-acf8-d6b0ef9f4eaa','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10103920,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-acf8-d6b0ef9f4eaa',1,1,'',1000000000000000000,1,0,1,0);`, + `insert into database_instance values('zone1-0000000101','localhost',6714,'2022-12-28 07:26:04','2022-12-28 07:26:04',390954723,'8.0.31','ROW',1,1,'vt-0000000101-bin.000001',15583,'',0,0,0,0,0,'',0,'',0,NULL,NULL,0,'','',0,0,'',0,0,0,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a4cc4-8680-11ed-a104-47706090afbd','2022-12-28 07:26:04','',0,0,0,'Homebrew','8.0','FULL',11366095,1,1,'ON',1,'','','729a4cc4-8680-11ed-a104-47706090afbd',-1,-1,'',1000000000000000000,1,1,0,2);`, + `insert into database_instance values('zone2-0000000200','localhost',6756,'2022-12-28 07:26:05','2022-12-28 07:26:05',444286571,'8.0.31','ROW',1,1,'vt-0000000200-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,0,'vt-0000000200-relay-bin.000002',15815,0,1,0,'zone2','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a497c-8680-11ed-8ad4-3f51d747db75','2022-12-28 07:26:05','',1,0,0,'Homebrew','8.0','FULL',10443112,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a497c-8680-11ed-8ad4-3f51d747db75',1,1,'',1000000000000000000,1,0,1,0);`, + `insert into vitess_tablet values('zone1-0000000100','localhost',6711,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130307d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731307d20706f72745f6d61703a7b6b65793a227674222076616c75653a363730397d206b657973706163653a226b73222073686172643a22302220747970653a5245504c494341206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363731312064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, + `insert into vitess_tablet values('zone1-0000000101','localhost',6714,'ks','0','zone1',1,'2022-12-28 07:23:25.129898+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130317d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731337d20706f72745f6d61703a7b6b65793a227674222076616c75653a363731327d206b657973706163653a226b73222073686172643a22302220747970653a5052494d415259206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a36373134207072696d6172795f7465726d5f73746172745f74696d653a7b7365636f6e64733a31363732323132323035206e616e6f7365636f6e64733a3132393839383030307d2064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, + `insert into vitess_tablet values('zone1-0000000112','localhost',6747,'ks','0','zone1',3,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3131327d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363734367d20706f72745f6d61703a7b6b65793a227674222076616c75653a363734357d206b657973706163653a226b73222073686172643a22302220747970653a52444f4e4c59206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363734372064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, + `insert into vitess_tablet values('zone2-0000000200','localhost',6756,'ks','0','zone2',2,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653222207569643a3230307d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363735357d20706f72745f6d61703a7b6b65793a227674222076616c75653a363735347d206b657973706163653a226b73222073686172643a22302220747970653a5245504c494341206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363735362064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, + `insert into vitess_shard values('ks','0','zone1-0000000101','2022-12-28 07:23:25.129898+00:00');`, + `insert into vitess_keyspace values('ks',0,'semi_sync');`, } ) diff --git a/go/vt/vtorc/inst/audit_dao.go b/go/vt/vtorc/inst/audit_dao.go index d048f300faf..77da520597b 100644 --- a/go/vt/vtorc/inst/audit_dao.go +++ b/go/vt/vtorc/inst/audit_dao.go @@ -56,13 +56,11 @@ func AuditOperation(auditType string, tabletAlias string, message string) error } if config.Config.AuditToBackendDB { _, err := db.ExecVTOrc(` - insert - into audit ( - audit_timestamp, audit_type, alias, keyspace, shard, message - ) VALUES ( - NOW(), ?, ?, ?, ?, ? - ) - `, + insert into audit ( + audit_timestamp, audit_type, alias, keyspace, shard, message + ) values ( + datetime('now'), ?, ?, ?, ?, ? + )`, auditType, tabletAlias, keyspace, diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index ec0288cc423..5bf8e5c9a07 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -114,10 +114,9 @@ func ExecDBWriteFunc(f func() error) error { func ExpireTableData(tableName string, timestampColumn string) error { writeFunc := func() error { - _, err := db.ExecVTOrc( - fmt.Sprintf("delete from %s where %s < NOW() - INTERVAL ? DAY", tableName, timestampColumn), - config.Config.AuditPurgeDays, - ) + query := fmt.Sprintf("delete from %s where %s < datetime('now', printf('-%%d day', ?))", + tableName, timestampColumn) + _, err := db.ExecVTOrc(query, config.Config.AuditPurgeDays) return err } return ExecDBWriteFunc(writeFunc) @@ -581,20 +580,22 @@ func readInstancesByCondition(condition string, args []any, sort string) ([](*In sort = `alias` } query := fmt.Sprintf(` - select - *, - unix_timestamp() - unix_timestamp(last_checked) as seconds_since_last_checked, - ifnull(last_checked <= last_seen, 0) as is_last_check_valid, - unix_timestamp() - unix_timestamp(last_seen) as seconds_since_last_seen - from - vitess_tablet - left join database_instance using (alias, hostname, port) - where - %s - order by - %s - `, condition, sort) - + select + *, + strftime('%%s', 'now') - strftime('%%s', last_checked) as seconds_since_last_checked, + ifnull(last_checked <= last_seen, 0) as is_last_check_valid, + strftime('%%s', 'now') - strftime('%%s', last_seen) as seconds_since_last_seen + from + vitess_tablet + left join database_instance using (alias, hostname, port) + where + %s + order by + %s + `, + condition, + sort, + ) err := db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error { instance := readInstanceRow(m) instances = append(instances, instance) @@ -637,7 +638,7 @@ func ReadProblemInstances(keyspace string, shard string) ([](*Instance), error) and shard LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) and ( (last_seen < last_checked) - or (unix_timestamp() - unix_timestamp(last_checked) > ?) + or (strftime('%s', 'now') - strftime('%s', last_checked) > ?) or (replication_sql_thread_state not in (-1 ,1)) or (replication_io_thread_state not in (-1 ,1)) or (abs(cast(replication_lag_seconds as signed) - cast(sql_delay as signed)) > ?) @@ -696,26 +697,25 @@ func GetKeyspaceShardName(tabletAlias string) (keyspace string, shard string, er func ReadOutdatedInstanceKeys() ([]string, error) { var res []string query := ` - SELECT + select alias - FROM + from database_instance - WHERE - CASE - WHEN last_attempted_check <= last_checked - THEN last_checked < now() - interval ? second - ELSE last_checked < now() - interval ? second - END - UNION - SELECT + where + case + when last_attempted_check <= last_checked + then last_checked < datetime('now', printf('-%d second', ?)) + else last_checked < datetime('now', printf('-%d second', ?)) + end + union + select vitess_tablet.alias - FROM - vitess_tablet LEFT JOIN database_instance ON ( + from + vitess_tablet left join database_instance on ( vitess_tablet.alias = database_instance.alias ) - WHERE - database_instance.alias IS NULL - ` + where + database_instance.alias is null` args := sqlutils.Args(config.Config.InstancePollSeconds, 2*config.Config.InstancePollSeconds) err := db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error { @@ -745,34 +745,34 @@ func mkInsertOdku(table string, columns []string, values []string, nrRows int, i } var q strings.Builder - var ignore string + prefix := "replace" if insertIgnore { - ignore = "ignore" + prefix = "insert or ignore" } valRow := fmt.Sprintf("(%s)", strings.Join(values, ", ")) var val strings.Builder val.WriteString(valRow) for i := 1; i < nrRows; i++ { - val.WriteString(",\n ") // indent VALUES, see below + val.WriteString(",\n ") // indent values(), see below val.WriteString(valRow) } col := strings.Join(columns, ", ") var odku strings.Builder - odku.WriteString(fmt.Sprintf("%s=VALUES(%s)", columns[0], columns[0])) + odku.WriteString(fmt.Sprintf("%s=values(%s)", columns[0], columns[0])) for _, c := range columns[1:] { odku.WriteString(", ") - odku.WriteString(fmt.Sprintf("%s=VALUES(%s)", c, c)) + odku.WriteString(fmt.Sprintf("%s=values(%s)", c, c)) } - q.WriteString(fmt.Sprintf(`INSERT %s INTO %s + q.WriteString(fmt.Sprintf(`%s into %s (%s) - VALUES + values %s - ON DUPLICATE KEY UPDATE + on duplicate key update %s `, - ignore, table, col, val.String(), odku.String())) + prefix, table, col, val.String(), odku.String())) return q.String(), nil } @@ -858,19 +858,19 @@ func mkInsertOdkuForInstances(instances []*Instance, instanceWasActuallyFound bo for i := range columns { values[i] = "?" } - values[3] = "NOW()" // last_checked - values[4] = "NOW()" // last_attempted_check - values[5] = "1" // last_check_partial_success + values[3] = "datetime('now')" // last_checked + values[4] = "datetime('now')" // last_attempted_check + values[5] = "1" // last_check_partial_success if updateLastSeen { columns = append(columns, "last_seen") - values = append(values, "NOW()") + values = append(values, "datetime('now')") } var args []any for _, instance := range instances { // number of columns minus 2 as last_checked and last_attempted_check - // updated with NOW() + // updated with datetime('now') args = append(args, instance.InstanceAlias) args = append(args, instance.Hostname) args = append(args, instance.Port) @@ -981,13 +981,14 @@ func WriteInstance(instance *Instance, instanceWasActuallyFound bool, lastError func UpdateInstanceLastChecked(tabletAlias string, partialSuccess bool) error { writeFunc := func() error { _, err := db.ExecVTOrc(` - update - database_instance - set - last_checked = NOW(), - last_check_partial_success = ? + update + database_instance + set + last_checked = datetime('now'), + last_check_partial_success = ? where - alias = ?`, + alias = ? + `, partialSuccess, tabletAlias, ) @@ -1010,12 +1011,11 @@ func UpdateInstanceLastChecked(tabletAlias string, partialSuccess bool) error { func UpdateInstanceLastAttemptedCheck(tabletAlias string) error { writeFunc := func() error { _, err := db.ExecVTOrc(` - update - database_instance - set - last_attempted_check = NOW() + update database_instance set + last_attempted_check = datetime('now') where - alias = ?`, + alias = ? + `, tabletAlias, ) if err != nil { @@ -1046,26 +1046,14 @@ func ForgetInstance(tabletAlias string) error { currentErrantGTIDCount.Reset(tabletAlias) // Delete from the 'vitess_tablet' table. - _, err := db.ExecVTOrc(` - delete - from vitess_tablet - where - alias = ?`, - tabletAlias, - ) + _, err := db.ExecVTOrc(`delete from vitess_tablet where alias = ?`, tabletAlias) if err != nil { log.Error(err) return err } // Also delete from the 'database_instance' table. - sqlResult, err := db.ExecVTOrc(` - delete - from database_instance - where - alias = ?`, - tabletAlias, - ) + sqlResult, err := db.ExecVTOrc(`delete from database_instance where alias = ?`, tabletAlias) if err != nil { log.Error(err) return err @@ -1088,10 +1076,9 @@ func ForgetInstance(tabletAlias string) error { // ForgetLongUnseenInstances will remove entries of all instances that have long since been last seen. func ForgetLongUnseenInstances() error { sqlResult, err := db.ExecVTOrc(` - delete - from database_instance - where - last_seen < NOW() - interval ? hour`, + delete from database_instance where + last_seen < datetime('now', printf('-%d hour', ?)) + `, config.UnseenInstanceForgetHours, ) if err != nil { @@ -1113,17 +1100,17 @@ func ForgetLongUnseenInstances() error { func SnapshotTopologies() error { writeFunc := func() error { _, err := db.ExecVTOrc(` - insert ignore into - database_instance_topology_history (snapshot_unix_timestamp, - alias, hostname, port, source_host, source_port, keyspace, shard, version) - select - UNIX_TIMESTAMP(NOW()), + insert or ignore into database_instance_topology_history ( + snapshot_unix_timestamp, alias, hostname, port, source_host, source_port, keyspace, shard, version + ) + select + strftime('%s', 'now'), vitess_tablet.alias, vitess_tablet.hostname, vitess_tablet.port, database_instance.source_host, database_instance.source_port, vitess_tablet.keyspace, vitess_tablet.shard, database_instance.version from vitess_tablet left join database_instance using (alias, hostname, port) - `, + `, ) if err != nil { log.Error(err) @@ -1142,9 +1129,11 @@ func ExpireStaleInstanceBinlogCoordinates() error { } writeFunc := func() error { _, err := db.ExecVTOrc(` - delete from database_instance_stale_binlog_coordinates - where first_seen < NOW() - INTERVAL ? SECOND - `, expireSeconds, + delete from database_instance_stale_binlog_coordinates + where + first_seen < datetime('now', printf('-%d second', ?)) + `, + expireSeconds, ) if err != nil { log.Error(err) diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go index 741fc48bca9..6916f354a1a 100644 --- a/go/vt/vtorc/inst/instance_dao_test.go +++ b/go/vt/vtorc/inst/instance_dao_test.go @@ -58,18 +58,18 @@ func TestMkInsertOdkuSingle(t *testing.T) { require.Equal(t, len(args), 0) // one instance - s1 := `INSERT ignore INTO database_instance + s1 := `insert or ignore into database_instance (alias, hostname, port, last_checked, last_attempted_check, last_check_partial_success, server_id, server_uuid, version, major_version, version_comment, binlog_server, read_only, binlog_format, binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_net_timeout, heartbeat_interval, replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant, mariadb_gtid, pseudo_gtid, source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen) - VALUES - (?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()) - ON DUPLICATE KEY UPDATE - alias=VALUES(alias), hostname=VALUES(hostname), port=VALUES(port), last_checked=VALUES(last_checked), last_attempted_check=VALUES(last_attempted_check), last_check_partial_success=VALUES(last_check_partial_success), server_id=VALUES(server_id), server_uuid=VALUES(server_uuid), version=VALUES(version), major_version=VALUES(major_version), version_comment=VALUES(version_comment), binlog_server=VALUES(binlog_server), read_only=VALUES(read_only), binlog_format=VALUES(binlog_format), binlog_row_image=VALUES(binlog_row_image), log_bin=VALUES(log_bin), log_replica_updates=VALUES(log_replica_updates), binary_log_file=VALUES(binary_log_file), binary_log_pos=VALUES(binary_log_pos), source_host=VALUES(source_host), source_port=VALUES(source_port), replica_net_timeout=VALUES(replica_net_timeout), heartbeat_interval=VALUES(heartbeat_interval), replica_sql_running=VALUES(replica_sql_running), replica_io_running=VALUES(replica_io_running), replication_sql_thread_state=VALUES(replication_sql_thread_state), replication_io_thread_state=VALUES(replication_io_thread_state), has_replication_filters=VALUES(has_replication_filters), supports_oracle_gtid=VALUES(supports_oracle_gtid), oracle_gtid=VALUES(oracle_gtid), source_uuid=VALUES(source_uuid), ancestry_uuid=VALUES(ancestry_uuid), executed_gtid_set=VALUES(executed_gtid_set), gtid_mode=VALUES(gtid_mode), gtid_purged=VALUES(gtid_purged), gtid_errant=VALUES(gtid_errant), mariadb_gtid=VALUES(mariadb_gtid), pseudo_gtid=VALUES(pseudo_gtid), source_log_file=VALUES(source_log_file), read_source_log_pos=VALUES(read_source_log_pos), relay_source_log_file=VALUES(relay_source_log_file), exec_source_log_pos=VALUES(exec_source_log_pos), relay_log_file=VALUES(relay_log_file), relay_log_pos=VALUES(relay_log_pos), last_sql_error=VALUES(last_sql_error), last_io_error=VALUES(last_io_error), replication_lag_seconds=VALUES(replication_lag_seconds), replica_lag_seconds=VALUES(replica_lag_seconds), sql_delay=VALUES(sql_delay), data_center=VALUES(data_center), region=VALUES(region), physical_environment=VALUES(physical_environment), replication_depth=VALUES(replication_depth), is_co_primary=VALUES(is_co_primary), has_replication_credentials=VALUES(has_replication_credentials), allow_tls=VALUES(allow_tls), - semi_sync_enforced=VALUES(semi_sync_enforced), semi_sync_primary_enabled=VALUES(semi_sync_primary_enabled), semi_sync_primary_timeout=VALUES(semi_sync_primary_timeout), semi_sync_primary_wait_for_replica_count=VALUES(semi_sync_primary_wait_for_replica_count), semi_sync_replica_enabled=VALUES(semi_sync_replica_enabled), semi_sync_primary_status=VALUES(semi_sync_primary_status), semi_sync_primary_clients=VALUES(semi_sync_primary_clients), semi_sync_replica_status=VALUES(semi_sync_replica_status), - last_discovery_latency=VALUES(last_discovery_latency), last_seen=VALUES(last_seen) + values + (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) + on duplicate key update + alias=values(alias), hostname=values(hostname), port=values(port), last_checked=values(last_checked), last_attempted_check=values(last_attempted_check), last_check_partial_success=values(last_check_partial_success), server_id=values(server_id), server_uuid=values(server_uuid), version=values(version), major_version=values(major_version), version_comment=values(version_comment), binlog_server=values(binlog_server), read_only=values(read_only), binlog_format=values(binlog_format), binlog_row_image=values(binlog_row_image), log_bin=values(log_bin), log_replica_updates=values(log_replica_updates), binary_log_file=values(binary_log_file), binary_log_pos=values(binary_log_pos), source_host=values(source_host), source_port=values(source_port), replica_net_timeout=values(replica_net_timeout), heartbeat_interval=values(heartbeat_interval), replica_sql_running=values(replica_sql_running), replica_io_running=values(replica_io_running), replication_sql_thread_state=values(replication_sql_thread_state), replication_io_thread_state=values(replication_io_thread_state), has_replication_filters=values(has_replication_filters), supports_oracle_gtid=values(supports_oracle_gtid), oracle_gtid=values(oracle_gtid), source_uuid=values(source_uuid), ancestry_uuid=values(ancestry_uuid), executed_gtid_set=values(executed_gtid_set), gtid_mode=values(gtid_mode), gtid_purged=values(gtid_purged), gtid_errant=values(gtid_errant), mariadb_gtid=values(mariadb_gtid), pseudo_gtid=values(pseudo_gtid), source_log_file=values(source_log_file), read_source_log_pos=values(read_source_log_pos), relay_source_log_file=values(relay_source_log_file), exec_source_log_pos=values(exec_source_log_pos), relay_log_file=values(relay_log_file), relay_log_pos=values(relay_log_pos), last_sql_error=values(last_sql_error), last_io_error=values(last_io_error), replication_lag_seconds=values(replication_lag_seconds), replica_lag_seconds=values(replica_lag_seconds), sql_delay=values(sql_delay), data_center=values(data_center), region=values(region), physical_environment=values(physical_environment), replication_depth=values(replication_depth), is_co_primary=values(is_co_primary), has_replication_credentials=values(has_replication_credentials), allow_tls=values(allow_tls), + semi_sync_enforced=values(semi_sync_enforced), semi_sync_primary_enabled=values(semi_sync_primary_enabled), semi_sync_primary_timeout=values(semi_sync_primary_timeout), semi_sync_primary_wait_for_replica_count=values(semi_sync_primary_wait_for_replica_count), semi_sync_replica_enabled=values(semi_sync_replica_enabled), semi_sync_primary_status=values(semi_sync_primary_status), semi_sync_primary_clients=values(semi_sync_primary_clients), semi_sync_replica_status=values(semi_sync_replica_status), + last_discovery_latency=values(last_discovery_latency), last_seen=values(last_seen) ` a1 := `zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, @@ -85,21 +85,21 @@ func TestMkInsertOdkuThree(t *testing.T) { instances := mkTestInstances() // three instances - s3 := `INSERT INTO database_instance + s3 := `replace into database_instance (alias, hostname, port, last_checked, last_attempted_check, last_check_partial_success, server_id, server_uuid, version, major_version, version_comment, binlog_server, read_only, binlog_format, binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_net_timeout, heartbeat_interval, replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant, mariadb_gtid, pseudo_gtid, source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen) - VALUES - (?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()), - (?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()), - (?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()) - ON DUPLICATE KEY UPDATE - alias=VALUES(alias), hostname=VALUES(hostname), port=VALUES(port), last_checked=VALUES(last_checked), last_attempted_check=VALUES(last_attempted_check), last_check_partial_success=VALUES(last_check_partial_success), server_id=VALUES(server_id), server_uuid=VALUES(server_uuid), version=VALUES(version), major_version=VALUES(major_version), version_comment=VALUES(version_comment), binlog_server=VALUES(binlog_server), read_only=VALUES(read_only), binlog_format=VALUES(binlog_format), binlog_row_image=VALUES(binlog_row_image), log_bin=VALUES(log_bin), log_replica_updates=VALUES(log_replica_updates), binary_log_file=VALUES(binary_log_file), binary_log_pos=VALUES(binary_log_pos), source_host=VALUES(source_host), source_port=VALUES(source_port), replica_net_timeout=VALUES(replica_net_timeout), heartbeat_interval=VALUES(heartbeat_interval), replica_sql_running=VALUES(replica_sql_running), replica_io_running=VALUES(replica_io_running), replication_sql_thread_state=VALUES(replication_sql_thread_state), replication_io_thread_state=VALUES(replication_io_thread_state), has_replication_filters=VALUES(has_replication_filters), supports_oracle_gtid=VALUES(supports_oracle_gtid), oracle_gtid=VALUES(oracle_gtid), source_uuid=VALUES(source_uuid), ancestry_uuid=VALUES(ancestry_uuid), executed_gtid_set=VALUES(executed_gtid_set), gtid_mode=VALUES(gtid_mode), gtid_purged=VALUES(gtid_purged), gtid_errant=VALUES(gtid_errant), mariadb_gtid=VALUES(mariadb_gtid), pseudo_gtid=VALUES(pseudo_gtid), source_log_file=VALUES(source_log_file), read_source_log_pos=VALUES(read_source_log_pos), relay_source_log_file=VALUES(relay_source_log_file), exec_source_log_pos=VALUES(exec_source_log_pos), relay_log_file=VALUES(relay_log_file), relay_log_pos=VALUES(relay_log_pos), last_sql_error=VALUES(last_sql_error), last_io_error=VALUES(last_io_error), replication_lag_seconds=VALUES(replication_lag_seconds), replica_lag_seconds=VALUES(replica_lag_seconds), sql_delay=VALUES(sql_delay), data_center=VALUES(data_center), region=VALUES(region), - physical_environment=VALUES(physical_environment), replication_depth=VALUES(replication_depth), is_co_primary=VALUES(is_co_primary), has_replication_credentials=VALUES(has_replication_credentials), allow_tls=VALUES(allow_tls), semi_sync_enforced=VALUES(semi_sync_enforced), - semi_sync_primary_enabled=VALUES(semi_sync_primary_enabled), semi_sync_primary_timeout=VALUES(semi_sync_primary_timeout), semi_sync_primary_wait_for_replica_count=VALUES(semi_sync_primary_wait_for_replica_count), semi_sync_replica_enabled=VALUES(semi_sync_replica_enabled), semi_sync_primary_status=VALUES(semi_sync_primary_status), semi_sync_primary_clients=VALUES(semi_sync_primary_clients), semi_sync_replica_status=VALUES(semi_sync_replica_status), - last_discovery_latency=VALUES(last_discovery_latency), last_seen=VALUES(last_seen) + values + (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')), + (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')), + (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) + on duplicate key update + alias=values(alias), hostname=values(hostname), port=values(port), last_checked=values(last_checked), last_attempted_check=values(last_attempted_check), last_check_partial_success=values(last_check_partial_success), server_id=values(server_id), server_uuid=values(server_uuid), version=values(version), major_version=values(major_version), version_comment=values(version_comment), binlog_server=values(binlog_server), read_only=values(read_only), binlog_format=values(binlog_format), binlog_row_image=values(binlog_row_image), log_bin=values(log_bin), log_replica_updates=values(log_replica_updates), binary_log_file=values(binary_log_file), binary_log_pos=values(binary_log_pos), source_host=values(source_host), source_port=values(source_port), replica_net_timeout=values(replica_net_timeout), heartbeat_interval=values(heartbeat_interval), replica_sql_running=values(replica_sql_running), replica_io_running=values(replica_io_running), replication_sql_thread_state=values(replication_sql_thread_state), replication_io_thread_state=values(replication_io_thread_state), has_replication_filters=values(has_replication_filters), supports_oracle_gtid=values(supports_oracle_gtid), oracle_gtid=values(oracle_gtid), source_uuid=values(source_uuid), ancestry_uuid=values(ancestry_uuid), executed_gtid_set=values(executed_gtid_set), gtid_mode=values(gtid_mode), gtid_purged=values(gtid_purged), gtid_errant=values(gtid_errant), mariadb_gtid=values(mariadb_gtid), pseudo_gtid=values(pseudo_gtid), source_log_file=values(source_log_file), read_source_log_pos=values(read_source_log_pos), relay_source_log_file=values(relay_source_log_file), exec_source_log_pos=values(exec_source_log_pos), relay_log_file=values(relay_log_file), relay_log_pos=values(relay_log_pos), last_sql_error=values(last_sql_error), last_io_error=values(last_io_error), replication_lag_seconds=values(replication_lag_seconds), replica_lag_seconds=values(replica_lag_seconds), sql_delay=values(sql_delay), data_center=values(data_center), region=values(region), + physical_environment=values(physical_environment), replication_depth=values(replication_depth), is_co_primary=values(is_co_primary), has_replication_credentials=values(has_replication_credentials), allow_tls=values(allow_tls), semi_sync_enforced=values(semi_sync_enforced), + semi_sync_primary_enabled=values(semi_sync_primary_enabled), semi_sync_primary_timeout=values(semi_sync_primary_timeout), semi_sync_primary_wait_for_replica_count=values(semi_sync_primary_wait_for_replica_count), semi_sync_replica_enabled=values(semi_sync_replica_enabled), semi_sync_primary_status=values(semi_sync_primary_status), semi_sync_primary_clients=values(semi_sync_primary_clients), semi_sync_replica_status=values(semi_sync_replica_status), + last_discovery_latency=values(last_discovery_latency), last_seen=values(last_seen) ` a3 := ` zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, @@ -437,27 +437,27 @@ func TestReadOutdatedInstanceKeys(t *testing.T) { }{ { name: "No problems", - sql: []string{"update database_instance set last_checked = now()"}, + sql: []string{"update database_instance set last_checked = datetime('now')"}, instancesRequired: nil, }, { name: "One instance is outdated", sql: []string{ - "update database_instance set last_checked = now()", - "update database_instance set last_checked = datetime(now(), '-1 hour') where alias = 'zone1-0000000100'", + "update database_instance set last_checked = datetime('now')", + "update database_instance set last_checked = datetime('now', '-1 hour') where alias = 'zone1-0000000100'", }, instancesRequired: []string{"zone1-0000000100"}, }, { name: "One instance doesn't have myql data", sql: []string{ - "update database_instance set last_checked = now()", + "update database_instance set last_checked = datetime('now')", `INSERT INTO vitess_tablet VALUES('zone1-0000000103','localhost',7706,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00','');`, }, instancesRequired: []string{"zone1-0000000103"}, }, { name: "One instance doesn't have myql data and one is outdated", sql: []string{ - "update database_instance set last_checked = now()", - "update database_instance set last_checked = datetime(now(), '-1 hour') where alias = 'zone1-0000000100'", + "update database_instance set last_checked = datetime('now')", + "update database_instance set last_checked = datetime('now', '-1 hour') where alias = 'zone1-0000000100'", `INSERT INTO vitess_tablet VALUES('zone1-0000000103','localhost',7706,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00','');`, }, instancesRequired: []string{"zone1-0000000103", "zone1-0000000100"}, @@ -494,10 +494,10 @@ func TestReadOutdatedInstanceKeys(t *testing.T) { errInDataCollection := db.QueryVTOrcRowsMap(`select alias, last_checked, last_attempted_check, -ROUND((JULIANDAY(now()) - JULIANDAY(last_checked)) * 86400) AS difference, +ROUND((JULIANDAY(datetime('now')) - JULIANDAY(last_checked)) * 86400) AS difference, last_attempted_check <= last_checked as use1, -last_checked < now() - interval 1500 second as is_outdated1, -last_checked < now() - interval 3000 second as is_outdated2 +last_checked < datetime('now', '-1500 second') as is_outdated1, +last_checked < datetime('now', '-3000 second') as is_outdated2 from database_instance`, func(rowMap sqlutils.RowMap) error { log.Errorf("Row in database_instance - %+v", rowMap) return nil @@ -521,12 +521,12 @@ func TestUpdateInstanceLastChecked(t *testing.T) { name: "Verify updated last checked", tabletAlias: "zone1-0000000100", partialSuccess: false, - conditionToCheck: "last_checked >= now() - interval 30 second and last_check_partial_success = false", + conditionToCheck: "last_checked >= datetime('now', '-30 second') and last_check_partial_success = false", }, { name: "Verify partial success", tabletAlias: "zone1-0000000100", partialSuccess: true, - conditionToCheck: "last_checked >= now() - interval 30 second and last_check_partial_success = true", + conditionToCheck: "last_checked >= datetime('now', '-30 second') and last_check_partial_success = true", }, { name: "Verify no error on unknown tablet", tabletAlias: "unknown tablet", @@ -572,7 +572,7 @@ func TestUpdateInstanceLastAttemptedCheck(t *testing.T) { { name: "Verify updated last checked", tabletAlias: "zone1-0000000100", - conditionToCheck: "last_attempted_check >= now() - interval 30 second", + conditionToCheck: "last_attempted_check >= datetime('now', '-30 second')", }, { name: "Verify no error on unknown tablet", tabletAlias: "unknown tablet", @@ -746,8 +746,8 @@ func TestExpireTableData(t *testing.T) { timestampColumn: "audit_timestamp", expectedRowCount: 1, insertQuery: `insert into audit (audit_id, audit_timestamp, audit_type, alias, message, keyspace, shard) values -(1, NOW() - INTERVAL 50 DAY, 'a','a','a','a','a'), -(2, NOW() - INTERVAL 5 DAY, 'a','a','a','a','a')`, +(1, datetime('now', '-50 day'), 'a','a','a','a','a'), +(2, datetime('now', '-5 day'), 'a','a','a','a','a')`, }, { name: "ExpireRecoveryDetectionHistory", @@ -755,9 +755,9 @@ func TestExpireTableData(t *testing.T) { timestampColumn: "detection_timestamp", expectedRowCount: 2, insertQuery: `insert into recovery_detection (detection_id, detection_timestamp, alias, analysis, keyspace, shard) values -(1, NOW() - INTERVAL 3 DAY,'a','a','a','a'), -(2, NOW() - INTERVAL 5 DAY,'a','a','a','a'), -(3, NOW() - INTERVAL 15 DAY,'a','a','a','a')`, +(1, datetime('now', '-3 day'),'a','a','a','a'), +(2, datetime('now', '-5 day'),'a','a','a','a'), +(3, datetime('now', '-15 day'),'a','a','a','a')`, }, } for _, tt := range tests { diff --git a/go/vt/vtorc/inst/keyspace_dao.go b/go/vt/vtorc/inst/keyspace_dao.go index d764e3fc56a..306a8976b67 100644 --- a/go/vt/vtorc/inst/keyspace_dao.go +++ b/go/vt/vtorc/inst/keyspace_dao.go @@ -65,13 +65,11 @@ func ReadKeyspace(keyspaceName string) (*topo.KeyspaceInfo, error) { // SaveKeyspace saves the keyspace record against the keyspace name. func SaveKeyspace(keyspace *topo.KeyspaceInfo) error { _, err := db.ExecVTOrc(` - replace - into vitess_keyspace ( - keyspace, keyspace_type, durability_policy - ) values ( - ?, ?, ? - ) - `, + replace into vitess_keyspace ( + keyspace, keyspace_type, durability_policy + ) values ( + ?, ?, ? + )`, keyspace.KeyspaceName(), int(keyspace.KeyspaceType), keyspace.GetDurabilityPolicy(), diff --git a/go/vt/vtorc/inst/shard_dao.go b/go/vt/vtorc/inst/shard_dao.go index a90eed0f509..88795f2de27 100644 --- a/go/vt/vtorc/inst/shard_dao.go +++ b/go/vt/vtorc/inst/shard_dao.go @@ -65,13 +65,11 @@ func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias s // SaveShard saves the shard record against the shard name. func SaveShard(shard *topo.ShardInfo) error { _, err := db.ExecVTOrc(` - replace - into vitess_shard ( - keyspace, shard, primary_alias, primary_timestamp - ) values ( - ?, ?, ?, ? - ) - `, + replace into vitess_shard ( + keyspace, shard, primary_alias, primary_timestamp + ) values ( + ?, ?, ?, ? + )`, shard.Keyspace(), shard.ShardName(), getShardPrimaryAliasString(shard), diff --git a/go/vt/vtorc/inst/tablet_dao.go b/go/vt/vtorc/inst/tablet_dao.go index af304292a70..edcd9fa4a8e 100644 --- a/go/vt/vtorc/inst/tablet_dao.go +++ b/go/vt/vtorc/inst/tablet_dao.go @@ -85,13 +85,11 @@ func SaveTablet(tablet *topodatapb.Tablet) error { return err } _, err = db.ExecVTOrc(` - replace - into vitess_tablet ( - alias, hostname, port, cell, keyspace, shard, tablet_type, primary_timestamp, info - ) values ( - ?, ?, ?, ?, ?, ?, ?, ?, ? - ) - `, + replace into vitess_tablet ( + alias, hostname, port, cell, keyspace, shard, tablet_type, primary_timestamp, info + ) values ( + ?, ?, ?, ?, ?, ?, ?, ?, ? + )`, topoproto.TabletAliasString(tablet.Alias), tablet.MysqlHostname, int(tablet.MysqlPort), diff --git a/go/vt/vtorc/logic/disable_recovery.go b/go/vt/vtorc/logic/disable_recovery.go index 74aa291e17a..38926b60424 100644 --- a/go/vt/vtorc/logic/disable_recovery.go +++ b/go/vt/vtorc/logic/disable_recovery.go @@ -41,12 +41,12 @@ import ( // IsRecoveryDisabled returns true if Recoveries are disabled globally func IsRecoveryDisabled() (disabled bool, err error) { query := ` - SELECT - COUNT(*) as mycount - FROM + select + count(*) as mycount + from global_recovery_disable - WHERE - disable_recovery=? + where + disable_recovery = ? ` err = db.QueryVTOrc(query, sqlutils.Args(1), func(m sqlutils.RowMap) error { mycount := m.GetInt("mycount") @@ -64,11 +64,10 @@ func IsRecoveryDisabled() (disabled bool, err error) { // DisableRecovery ensures recoveries are disabled globally func DisableRecovery() error { _, err := db.ExecVTOrc(` - INSERT IGNORE INTO global_recovery_disable + insert or ignore into global_recovery_disable (disable_recovery) - VALUES (1) - `, - ) + values(1) + `) return err } @@ -76,8 +75,7 @@ func DisableRecovery() error { func EnableRecovery() error { // The "WHERE" clause is just to avoid full-scan reports by monitoring tools _, err := db.ExecVTOrc(` - DELETE FROM global_recovery_disable WHERE disable_recovery >= 0 - `, - ) + delete from global_recovery_disable where disable_recovery >= 0 + `) return err } diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 25820bc5184..c167b54acfc 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -317,17 +317,16 @@ func setReplicationSource(ctx context.Context, replica *topodatapb.Tablet, prima // shardPrimary finds the primary of the given keyspace-shard by reading the vtorc backend func shardPrimary(keyspace string, shard string) (primary *topodatapb.Tablet, err error) { - query := `SELECT + query := `select info - FROM + from vitess_tablet - WHERE - keyspace = ? AND shard = ? - AND tablet_type = ? - ORDER BY - primary_timestamp DESC - LIMIT 1 -` + where + keyspace = ? and shard = ? + and tablet_type = ? + order by + primary_timestamp desc + limit 1` err = db.Db.QueryVTOrc(query, sqlutils.Args(keyspace, shard, topodatapb.TabletType_PRIMARY), func(m sqlutils.RowMap) error { if primary == nil { primary = &topodatapb.Tablet{} diff --git a/go/vt/vtorc/logic/topology_recovery_dao.go b/go/vt/vtorc/logic/topology_recovery_dao.go index 1920da4dcd8..859e082a3e8 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao.go +++ b/go/vt/vtorc/logic/topology_recovery_dao.go @@ -31,20 +31,19 @@ import ( // InsertRecoveryDetection inserts the recovery analysis that has been detected. func InsertRecoveryDetection(analysisEntry *inst.ReplicationAnalysis) error { sqlResult, err := db.ExecVTOrc(` - insert ignore - into recovery_detection ( - alias, - analysis, - keyspace, - shard, - detection_timestamp - ) values ( - ?, - ?, - ?, - ?, - now() - )`, + insert or ignore into recovery_detection ( + alias, + analysis, + keyspace, + shard, + detection_timestamp + ) values ( + ?, + ?, + ?, + ?, + datetime('now') + )`, analysisEntry.AnalyzedInstanceAlias, string(analysisEntry.Analysis), analysisEntry.ClusterDetails.Keyspace, @@ -66,25 +65,23 @@ func InsertRecoveryDetection(analysisEntry *inst.ReplicationAnalysis) error { func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecovery, error) { analysisEntry := topologyRecovery.AnalysisEntry sqlResult, err := db.ExecVTOrc(` - insert ignore - into topology_recovery ( - recovery_id, - alias, - start_recovery, - analysis, - keyspace, - shard, - detection_id - ) values ( - ?, - ?, - NOW(), - ?, - ?, - ?, - ? - ) - `, + insert or ignore into topology_recovery ( + recovery_id, + alias, + start_recovery, + analysis, + keyspace, + shard, + detection_id + ) values ( + ?, + ?, + datetime('now'), + ?, + ?, + ?, + ? + )`, sqlutils.NilIfZero(topologyRecovery.ID), analysisEntry.AnalyzedInstanceAlias, string(analysisEntry.Analysis), @@ -139,14 +136,15 @@ func AttemptRecoveryRegistration(analysisEntry *inst.ReplicationAnalysis) (*Topo // It does not clear the "active period" as this still takes place in order to avoid flapping. func writeResolveRecovery(topologyRecovery *TopologyRecovery) error { _, err := db.ExecVTOrc(` - update topology_recovery set - is_successful = ?, - successor_alias = ?, - all_errors = ?, - end_recovery = NOW() - where - recovery_id = ? - `, topologyRecovery.IsSuccessful, + update topology_recovery set + is_successful = ?, + successor_alias = ?, + all_errors = ?, + end_recovery = datetime('now') + where + recovery_id = ? + `, + topologyRecovery.IsSuccessful, topologyRecovery.SuccessorAlias, strings.Join(topologyRecovery.AllErrors, "\n"), topologyRecovery.ID, @@ -237,11 +235,14 @@ func ReadRecentRecoveries(page int) ([]*TopologyRecovery, error) { // writeTopologyRecoveryStep writes down a single step in a recovery process func writeTopologyRecoveryStep(topologyRecoveryStep *TopologyRecoveryStep) error { sqlResult, err := db.ExecVTOrc(` - insert ignore - into topology_recovery_steps ( - recovery_step_id, recovery_id, audit_at, message - ) values (?, ?, now(), ?) - `, sqlutils.NilIfZero(topologyRecoveryStep.ID), topologyRecoveryStep.RecoveryID, topologyRecoveryStep.Message, + insert or ignore into topology_recovery_steps ( + recovery_step_id, recovery_id, audit_at, message + ) values ( + ?, ?, datetime('now'), ?, + )`, + sqlutils.NilIfZero(topologyRecoveryStep.ID), + topologyRecoveryStep.RecoveryID, + topologyRecoveryStep.Message, ) if err != nil { log.Error(err) diff --git a/go/vt/vtorc/logic/topology_recovery_dao_test.go b/go/vt/vtorc/logic/topology_recovery_dao_test.go index 354af82e2b3..1e5bb8e86cf 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao_test.go +++ b/go/vt/vtorc/logic/topology_recovery_dao_test.go @@ -88,9 +88,9 @@ func TestExpireTableData(t *testing.T) { tableName: "recovery_detection", expectedRowCount: 2, insertQuery: `insert into recovery_detection (detection_id, detection_timestamp, alias, analysis, keyspace, shard) values -(1, NOW() - INTERVAL 3 DAY,'a','a','a','a'), -(2, NOW() - INTERVAL 5 DAY,'a','a','a','a'), -(3, NOW() - INTERVAL 15 DAY,'a','a','a','a')`, +(1, datetime('now', '-3 day'),'a','a','a','a'), +(2, datetime('now', '-5 day'),'a','a','a','a'), +(3, datetime('now', '-15 day'),'a','a','a','a')`, expireFunc: ExpireRecoveryDetectionHistory, }, { @@ -98,9 +98,9 @@ func TestExpireTableData(t *testing.T) { tableName: "topology_recovery", expectedRowCount: 1, insertQuery: `insert into topology_recovery (recovery_id, start_recovery, alias, analysis, keyspace, shard) values -(1, NOW() - INTERVAL 13 DAY,'a','a','a','a'), -(2, NOW() - INTERVAL 5 DAY,'a','a','a','a'), -(3, NOW() - INTERVAL 15 DAY,'a','a','a','a')`, +(1, datetime('now', '-13 day'),'a','a','a','a'), +(2, datetime('now', '-5 day'),'a','a','a','a'), +(3, datetime('now', '-15 day'),'a','a','a','a')`, expireFunc: ExpireTopologyRecoveryHistory, }, { @@ -108,9 +108,9 @@ func TestExpireTableData(t *testing.T) { tableName: "topology_recovery_steps", expectedRowCount: 1, insertQuery: `insert into topology_recovery_steps (recovery_step_id, audit_at, recovery_id, message) values -(1, NOW() - INTERVAL 13 DAY, 1, 'a'), -(2, NOW() - INTERVAL 5 DAY, 2, 'a'), -(3, NOW() - INTERVAL 15 DAY, 3, 'a')`, +(1, datetime('now', '-13 day'), 1, 'a'), +(2, datetime('now', '-5 day'), 2, 'a'), +(3, datetime('now', '-15 day'), 3, 'a')`, expireFunc: ExpireTopologyRecoveryStepsHistory, }, } diff --git a/go/vt/vtorc/process/health.go b/go/vt/vtorc/process/health.go index 86101d6c5c0..87a11733f66 100644 --- a/go/vt/vtorc/process/health.go +++ b/go/vt/vtorc/process/health.go @@ -40,7 +40,7 @@ func writeHealthToDatabase() bool { log.Error(err) return false } - sqlResult, err := db.ExecVTOrc(`insert into node_health (last_seen_active) values (now())`) + sqlResult, err := db.ExecVTOrc(`insert into node_health (last_seen_active) values (datetime('now'))`) if err != nil { log.Error(err) return false