From 012e601b84a6c47b281b3170ee700782e04fda91 Mon Sep 17 00:00:00 2001 From: Dirkjan Bussink Date: Mon, 4 Nov 2024 11:59:25 +0100 Subject: [PATCH] Move to native sqlite3 queries (#17124) Signed-off-by: Dirkjan Bussink Co-authored-by: Tim Vaillancourt --- go/vt/external/golib/sqlutils/dialect.go | 53 --- .../external/golib/sqlutils/sqlite_dialect.go | 134 -------- .../golib/sqlutils/sqlite_dialect_test.go | 314 ------------------ go/vt/vtorc/db/db.go | 29 +- go/vt/vtorc/inst/analysis_dao.go | 14 +- go/vt/vtorc/inst/audit_dao.go | 2 +- go/vt/vtorc/inst/instance_dao.go | 61 ++-- go/vt/vtorc/inst/instance_dao_test.go | 65 ++-- go/vt/vtorc/logic/disable_recovery.go | 2 +- go/vt/vtorc/logic/topology_recovery_dao.go | 14 +- .../vtorc/logic/topology_recovery_dao_test.go | 18 +- go/vt/vtorc/process/health.go | 2 +- 12 files changed, 84 insertions(+), 624 deletions(-) delete mode 100644 go/vt/external/golib/sqlutils/dialect.go delete mode 100644 go/vt/external/golib/sqlutils/sqlite_dialect.go delete mode 100644 go/vt/external/golib/sqlutils/sqlite_dialect_test.go diff --git a/go/vt/external/golib/sqlutils/dialect.go b/go/vt/external/golib/sqlutils/dialect.go deleted file mode 100644 index 8dabe57ccaf..00000000000 --- a/go/vt/external/golib/sqlutils/dialect.go +++ /dev/null @@ -1,53 +0,0 @@ -/* - Copyright 2017 GitHub Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -/* - This file has been copied over from VTOrc package -*/ - -package sqlutils - -import ( - "regexp" - "strings" -) - -type regexpMap struct { - r *regexp.Regexp - replacement string -} - -func (this *regexpMap) process(text string) (result string) { - return this.r.ReplaceAllString(text, this.replacement) -} - -func rmap(regexpExpression string, replacement string) regexpMap { - return regexpMap{ - r: regexp.MustCompile(regexpSpaces(regexpExpression)), - replacement: replacement, - } -} - -func regexpSpaces(statement string) string { - return strings.Replace(statement, " ", `[\s]+`, -1) -} - -func applyConversions(statement string, conversions []regexpMap) string { - for _, rmap := range conversions { - statement = rmap.process(statement) - } - return statement -} diff --git a/go/vt/external/golib/sqlutils/sqlite_dialect.go b/go/vt/external/golib/sqlutils/sqlite_dialect.go deleted file mode 100644 index 11f0e531367..00000000000 --- a/go/vt/external/golib/sqlutils/sqlite_dialect.go +++ /dev/null @@ -1,134 +0,0 @@ -/* - Copyright 2017 GitHub Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -/* - This file has been copied over from VTOrc package -*/ - -// What's this about? -// This is a brute-force regular-expression based conversion from MySQL syntax to sqlite3 syntax. -// It is NOT meant to be a general purpose solution and is only expected & confirmed to run on -// queries issued by orchestrator. There are known limitations to this design. -// It's not even pretty. -// In fact... -// Well, it gets the job done at this time. Call it debt. - -package sqlutils - -import ( - "regexp" -) - -var sqlite3CreateTableConversions = []regexpMap{ - rmap(`(?i) (character set|charset) [\S]+`, ``), - rmap(`(?i)int unsigned`, `int`), - rmap(`(?i)int[\s]*[(][\s]*([0-9]+)[\s]*[)] unsigned`, `int`), - rmap(`(?i)engine[\s]*=[\s]*(innodb|myisam|ndb|memory|tokudb)`, ``), - rmap(`(?i)DEFAULT CHARSET[\s]*=[\s]*[\S]+`, ``), - rmap(`(?i)[\S]*int( not null|) auto_increment`, `integer`), - rmap(`(?i)comment '[^']*'`, ``), - rmap(`(?i)after [\S]+`, ``), - rmap(`(?i)alter table ([\S]+) add (index|key) ([\S]+) (.+)`, `create index ${3}_${1} on $1 $4`), - rmap(`(?i)alter table ([\S]+) add unique (index|key) ([\S]+) (.+)`, `create unique index ${3}_${1} on $1 $4`), - rmap(`(?i)([\S]+) enum[\s]*([(].*?[)])`, `$1 text check($1 in $2)`), - rmap(`(?i)([\s\S]+[/][*] sqlite3-skip [*][/][\s\S]+)`, ``), - rmap(`(?i)timestamp default current_timestamp`, `timestamp default ('')`), - rmap(`(?i)timestamp not null default current_timestamp`, `timestamp not null default ('')`), - - rmap(`(?i)add column (.*int) not null[\s]*$`, `add column $1 not null default 0`), - rmap(`(?i)add column (.* text) not null[\s]*$`, `add column $1 not null default ''`), - rmap(`(?i)add column (.* varchar.*) not null[\s]*$`, `add column $1 not null default ''`), -} - -var sqlite3InsertConversions = []regexpMap{ - rmap(`(?i)insert ignore ([\s\S]+) on duplicate key update [\s\S]+`, `insert or ignore $1`), - rmap(`(?i)insert ignore`, `insert or ignore`), - rmap(`(?i)now[(][)]`, `datetime('now')`), - rmap(`(?i)insert into ([\s\S]+) on duplicate key update [\s\S]+`, `replace into $1`), -} - -var sqlite3GeneralConversions = []regexpMap{ - rmap(`(?i)now[(][)][\s]*[-][\s]*interval [?] ([\w]+)`, `datetime('now', printf('-%d $1', ?))`), - rmap(`(?i)now[(][)][\s]*[+][\s]*interval [?] ([\w]+)`, `datetime('now', printf('+%d $1', ?))`), - rmap(`(?i)now[(][)][\s]*[-][\s]*interval ([0-9.]+) ([\w]+)`, `datetime('now', '-${1} $2')`), - rmap(`(?i)now[(][)][\s]*[+][\s]*interval ([0-9.]+) ([\w]+)`, `datetime('now', '+${1} $2')`), - - rmap(`(?i)[=<>\s]([\S]+[.][\S]+)[\s]*[-][\s]*interval [?] ([\w]+)`, ` datetime($1, printf('-%d $2', ?))`), - rmap(`(?i)[=<>\s]([\S]+[.][\S]+)[\s]*[+][\s]*interval [?] ([\w]+)`, ` datetime($1, printf('+%d $2', ?))`), - - rmap(`(?i)unix_timestamp[(][)]`, `strftime('%s', 'now')`), - rmap(`(?i)unix_timestamp[(]([^)]+)[)]`, `strftime('%s', $1)`), - rmap(`(?i)now[(][)]`, `datetime('now')`), - rmap(`(?i)cast[(][\s]*([\S]+) as signed[\s]*[)]`, `cast($1 as integer)`), - - rmap(`(?i)\bconcat[(][\s]*([^,)]+)[\s]*,[\s]*([^,)]+)[\s]*[)]`, `($1 || $2)`), - rmap(`(?i)\bconcat[(][\s]*([^,)]+)[\s]*,[\s]*([^,)]+)[\s]*,[\s]*([^,)]+)[\s]*[)]`, `($1 || $2 || $3)`), - - rmap(`(?i) rlike `, ` like `), - - rmap(`(?i)create index([\s\S]+)[(][\s]*[0-9]+[\s]*[)]([\s\S]+)`, `create index ${1}${2}`), - rmap(`(?i)drop index ([\S]+) on ([\S]+)`, `drop index if exists $1`), -} - -var ( - sqlite3IdentifyCreateTableStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*create table`)) - sqlite3IdentifyCreateIndexStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*create( unique|) index`)) - sqlite3IdentifyDropIndexStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*drop index`)) - sqlite3IdentifyAlterTableStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*alter table`)) - sqlite3IdentifyInsertStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*(insert|replace)`)) -) - -func IsInsert(statement string) bool { - return sqlite3IdentifyInsertStatement.MatchString(statement) -} - -func IsCreateTable(statement string) bool { - return sqlite3IdentifyCreateTableStatement.MatchString(statement) -} - -func IsCreateIndex(statement string) bool { - return sqlite3IdentifyCreateIndexStatement.MatchString(statement) -} - -func IsDropIndex(statement string) bool { - return sqlite3IdentifyDropIndexStatement.MatchString(statement) -} - -func IsAlterTable(statement string) bool { - return sqlite3IdentifyAlterTableStatement.MatchString(statement) -} - -func ToSqlite3CreateTable(statement string) string { - return applyConversions(statement, sqlite3CreateTableConversions) -} - -func ToSqlite3Insert(statement string) string { - return applyConversions(statement, sqlite3InsertConversions) -} - -func ToSqlite3Dialect(statement string) (translated string) { - if IsCreateTable(statement) { - return ToSqlite3CreateTable(statement) - } - if IsAlterTable(statement) { - return ToSqlite3CreateTable(statement) - } - statement = applyConversions(statement, sqlite3GeneralConversions) - if IsInsert(statement) { - return ToSqlite3Insert(statement) - } - return statement -} diff --git a/go/vt/external/golib/sqlutils/sqlite_dialect_test.go b/go/vt/external/golib/sqlutils/sqlite_dialect_test.go deleted file mode 100644 index 1298c379adf..00000000000 --- a/go/vt/external/golib/sqlutils/sqlite_dialect_test.go +++ /dev/null @@ -1,314 +0,0 @@ -/* - Copyright 2017 GitHub Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -/* - This file has been copied over from VTOrc package -*/ - -package sqlutils - -import ( - "regexp" - "strings" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -var spacesRegexp = regexp.MustCompile(`[\s]+`) - -func init() { -} - -func stripSpaces(statement string) string { - statement = strings.TrimSpace(statement) - statement = spacesRegexp.ReplaceAllString(statement, " ") - return statement -} - -func TestIsCreateTable(t *testing.T) { - require.True(t, IsCreateTable("create table t(id int)")) - require.True(t, IsCreateTable(" create table t(id int)")) - require.True(t, IsCreateTable("CREATE TABLE t(id int)")) - require.True(t, IsCreateTable(` - create table t(id int) - `)) - require.False(t, IsCreateTable("where create table t(id int)")) - require.False(t, IsCreateTable("insert")) -} - -func TestToSqlite3CreateTable(t *testing.T) { - { - statement := "create table t(id int)" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, statement) - } - { - statement := "create table t(id int, v varchar(123) CHARACTER SET ascii NOT NULL default '')" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(id int, v varchar(123) NOT NULL default '')") - } - { - statement := "create table t(id int, v varchar ( 123 ) CHARACTER SET ascii NOT NULL default '')" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(id int, v varchar ( 123 ) NOT NULL default '')") - } - { - statement := "create table t(i smallint unsigned)" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(i smallint)") - } - { - statement := "create table t(i smallint(5) unsigned)" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(i smallint)") - } - { - statement := "create table t(i smallint ( 5 ) unsigned)" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(i smallint)") - } -} - -func TestToSqlite3AlterTable(t *testing.T) { - { - statement := ` - ALTER TABLE - database_instance - ADD COLUMN sql_delay INT UNSIGNED NOT NULL AFTER replica_lag_seconds - ` - result := stripSpaces(ToSqlite3Dialect(statement)) - require.Equal(t, result, stripSpaces(` - ALTER TABLE - database_instance - add column sql_delay int not null default 0 - `)) - } - { - statement := ` - ALTER TABLE - database_instance - ADD INDEX source_host_port_idx (source_host, source_port) - ` - result := stripSpaces(ToSqlite3Dialect(statement)) - require.Equal(t, result, stripSpaces(` - create index - source_host_port_idx_database_instance - on database_instance (source_host, source_port) - `)) - } - { - statement := ` - ALTER TABLE - topology_recovery - ADD KEY last_detection_idx (last_detection_id) - ` - result := stripSpaces(ToSqlite3Dialect(statement)) - require.Equal(t, result, stripSpaces(` - create index - last_detection_idx_topology_recovery - on topology_recovery (last_detection_id) - `)) - } - -} - -func TestCreateIndex(t *testing.T) { - { - statement := ` - create index - source_host_port_idx_database_instance - on database_instance (source_host(128), source_port) - ` - result := stripSpaces(ToSqlite3Dialect(statement)) - require.Equal(t, result, stripSpaces(` - create index - source_host_port_idx_database_instance - on database_instance (source_host, source_port) - `)) - } -} - -func TestIsInsert(t *testing.T) { - require.True(t, IsInsert("insert into t")) - require.True(t, IsInsert("insert ignore into t")) - require.True(t, IsInsert(` - insert ignore into t - `)) - require.False(t, IsInsert("where create table t(id int)")) - require.False(t, IsInsert("create table t(id int)")) - require.True(t, IsInsert(` - insert into - cluster_domain_name (cluster_name, domain_name, last_registered) - values - (?, ?, datetime('now')) - on duplicate key update - domain_name=values(domain_name), - last_registered=values(last_registered) - `)) -} - -func TestToSqlite3Insert(t *testing.T) { - { - statement := ` - insert into - cluster_domain_name (cluster_name, domain_name, last_registered) - values - (?, ?, datetime('now')) - on duplicate key update - domain_name=values(domain_name), - last_registered=values(last_registered) - ` - result := stripSpaces(ToSqlite3Dialect(statement)) - require.Equal(t, result, stripSpaces(` - replace into - cluster_domain_name (cluster_name, domain_name, last_registered) - values - (?, ?, datetime('now')) - `)) - } -} - -func TestToSqlite3GeneralConversions(t *testing.T) { - { - statement := "select now()" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select datetime('now')") - } - { - statement := "select now() - interval ? second" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select datetime('now', printf('-%d second', ?))") - } - { - statement := "select now() + interval ? minute" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select datetime('now', printf('+%d minute', ?))") - } - { - statement := "select now() + interval 5 minute" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select datetime('now', '+5 minute')") - } - { - statement := "select some_table.some_column + interval ? minute" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select datetime(some_table.some_column, printf('+%d minute', ?))") - } - { - statement := "AND primary_instance.last_attempted_check <= primary_instance.last_seen + interval ? minute" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "AND primary_instance.last_attempted_check <= datetime(primary_instance.last_seen, printf('+%d minute', ?))") - } - { - statement := "select concat(primary_instance.port, '') as port" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select (primary_instance.port || '') as port") - } - { - statement := "select concat( 'abc' , 'def') as s" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select ('abc' || 'def') as s") - } - { - statement := "select concat( 'abc' , 'def', last.col) as s" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select ('abc' || 'def' || last.col) as s") - } - { - statement := "select concat(myself.only) as s" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select concat(myself.only) as s") - } - { - statement := "select concat(1, '2', 3, '4') as s" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select concat(1, '2', 3, '4') as s") - } - { - statement := "select group_concat( 'abc' , 'def') as s" - result := ToSqlite3Dialect(statement) - require.Equal(t, result, "select group_concat( 'abc' , 'def') as s") - } -} - -func TestIsCreateIndex(t *testing.T) { - tests := []struct { - input string - expected bool - }{ - {"create index my_index on my_table(column);", true}, - {"CREATE INDEX my_index ON my_table(column);", true}, - {"create unique index my_index on my_table(column);", true}, - {"CREATE UNIQUE INDEX my_index ON my_table(column);", true}, - {"create index my_index on my_table(column) where condition;", true}, - {"create unique index my_index on my_table(column) where condition;", true}, - {"create table my_table(column);", false}, - {"drop index my_index on my_table;", false}, - {"alter table my_table add index my_index (column);", false}, - {"", false}, - } - - for _, test := range tests { - t.Run(test.input, func(t *testing.T) { - result := IsCreateIndex(test.input) - assert.Equal(t, test.expected, result) - }) - } -} - -func TestIsDropIndex(t *testing.T) { - tests := []struct { - input string - expected bool - }{ - {"drop index my_index on my_table;", true}, - {"DROP INDEX my_index ON my_table;", true}, - {"drop index if exists my_index on my_table;", true}, - {"DROP INDEX IF EXISTS my_index ON my_table;", true}, - {"drop table my_table;", false}, - {"create index my_index on my_table(column);", false}, - {"alter table my_table add index my_index (column);", false}, - {"", false}, - } - - for _, test := range tests { - t.Run(test.input, func(t *testing.T) { - result := IsDropIndex(test.input) - assert.Equal(t, test.expected, result) - }) - } -} - -func TestToSqlite3Dialect(t *testing.T) { - tests := []struct { - input string - expected string - }{ - {"create table my_table(id int);", "create table my_table(id int);"}, - {"alter table my_table add column new_col int;", "alter table my_table add column new_col int;"}, - {"insert into my_table values (1);", "insert into my_table values (1);"}, - {"", ""}, - } - - for _, test := range tests { - t.Run(test.input, func(t *testing.T) { - result := ToSqlite3Dialect(test.input) - assert.Equal(t, test.expected, result) - }) - } -} diff --git a/go/vt/vtorc/db/db.go b/go/vt/vtorc/db/db.go index 00f5b5b2550..64143477645 100644 --- a/go/vt/vtorc/db/db.go +++ b/go/vt/vtorc/db/db.go @@ -18,7 +18,6 @@ package db import ( "database/sql" - "strings" "vitess.io/vitess/go/vt/external/golib/sqlutils" "vitess.io/vitess/go/vt/log" @@ -57,17 +56,13 @@ func OpenVTOrc() (db *sql.DB, err error) { return db, err } -func translateStatement(statement string) string { - return sqlutils.ToSqlite3Dialect(statement) -} - // registerVTOrcDeployment updates the vtorc_db_deployments table upon successful deployment func registerVTOrcDeployment(db *sql.DB) error { query := ` replace into vtorc_db_deployments ( deployed_version, deployed_timestamp ) values ( - ?, NOW() + ?, datetime('now') ) ` if _, err := execInternal(db, query, ""); err != nil { @@ -82,25 +77,12 @@ func deployStatements(db *sql.DB, queries []string) error { tx, err := db.Begin() if err != nil { log.Fatal(err.Error()) + return err } for _, query := range queries { - query = translateStatement(query) if _, err := tx.Exec(query); err != nil { - if strings.Contains(err.Error(), "syntax error") { - log.Fatalf("Cannot initiate vtorc: %+v; query=%+v", err, query) - return err - } - if !sqlutils.IsAlterTable(query) && !sqlutils.IsCreateIndex(query) && !sqlutils.IsDropIndex(query) { - log.Fatalf("Cannot initiate vtorc: %+v; query=%+v", err, query) - return err - } - if !strings.Contains(err.Error(), "duplicate column name") && - !strings.Contains(err.Error(), "Duplicate column name") && - !strings.Contains(err.Error(), "check that column/key exists") && - !strings.Contains(err.Error(), "already exists") && - !strings.Contains(err.Error(), "Duplicate key name") { - log.Errorf("Error initiating vtorc: %+v; query=%+v", err, query) - } + log.Fatalf("Cannot initiate vtorc: %+v; query=%+v", err, query) + return err } } if err := tx.Commit(); err != nil { @@ -135,7 +117,6 @@ func initVTOrcDB(db *sql.DB) error { // execInternal func execInternal(db *sql.DB, query string, args ...any) (sql.Result, error) { var err error - query = translateStatement(query) res, err := sqlutils.ExecNoPrepare(db, query, args...) return res, err } @@ -151,7 +132,6 @@ func ExecVTOrc(query string, args ...any) (sql.Result, error) { // QueryVTOrcRowsMap func QueryVTOrcRowsMap(query string, onRow func(sqlutils.RowMap) error) error { - query = translateStatement(query) db, err := OpenVTOrc() if err != nil { return err @@ -162,7 +142,6 @@ func QueryVTOrcRowsMap(query string, onRow func(sqlutils.RowMap) error) error { // QueryVTOrc func QueryVTOrc(query string, argsArray []any, onRow func(sqlutils.RowMap) error) error { - query = translateStatement(query) db, err := OpenVTOrc() if err != nil { return err diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index 99df358e330..25d93a6864b 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -91,13 +91,13 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna IFNULL( primary_instance.binary_log_file = database_instance_stale_binlog_coordinates.binary_log_file AND primary_instance.binary_log_pos = database_instance_stale_binlog_coordinates.binary_log_pos - AND database_instance_stale_binlog_coordinates.first_seen < NOW() - interval ? second, + AND database_instance_stale_binlog_coordinates.first_seen < datetime('now', printf('-%d second', ?)), 0 ) ) AS is_stale_binlog_coordinates, MIN( primary_instance.last_checked <= primary_instance.last_seen - and primary_instance.last_attempted_check <= primary_instance.last_seen + interval ? second + and primary_instance.last_attempted_check <= datetime(primary_instance.last_seen, printf('+%d second', ?)) ) = 1 AS is_last_check_valid, /* To be considered a primary, traditional async replication must not be present/valid AND the host should either */ /* not be a replication group member OR be the primary of the replication group */ @@ -658,7 +658,7 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC sqlResult, err := db.ExecVTOrc(` update database_instance_last_analysis set analysis = ?, - analysis_timestamp = now() + analysis_timestamp = datetime('now') where alias = ? and analysis != ? @@ -683,10 +683,10 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC if !lastAnalysisChanged { // The insert only returns more than 1 row changed if this is the first insertion. sqlResult, err := db.ExecVTOrc(` - insert ignore into database_instance_last_analysis ( + insert or ignore into database_instance_last_analysis ( alias, analysis_timestamp, analysis ) values ( - ?, now(), ? + ?, datetime('now'), ? ) `, tabletAlias, string(analysisCode), @@ -712,7 +712,7 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC insert into database_instance_analysis_changelog ( alias, analysis_timestamp, analysis ) values ( - ?, now(), ? + ?, datetime('now'), ? ) `, tabletAlias, string(analysisCode), @@ -731,7 +731,7 @@ func ExpireInstanceAnalysisChangelog() error { delete from database_instance_analysis_changelog where - analysis_timestamp < now() - interval ? hour + analysis_timestamp < datetime('now', printf('-%d hour', ?)) `, config.UnseenInstanceForgetHours, ) diff --git a/go/vt/vtorc/inst/audit_dao.go b/go/vt/vtorc/inst/audit_dao.go index d048f300faf..642fb187509 100644 --- a/go/vt/vtorc/inst/audit_dao.go +++ b/go/vt/vtorc/inst/audit_dao.go @@ -60,7 +60,7 @@ func AuditOperation(auditType string, tabletAlias string, message string) error into audit ( audit_timestamp, audit_type, alias, keyspace, shard, message ) VALUES ( - NOW(), ?, ?, ?, ?, ? + datetime('now'), ?, ?, ?, ?, ? ) `, auditType, diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index ec0288cc423..bd4438dd05f 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -115,7 +115,7 @@ func ExecDBWriteFunc(f func() error) error { func ExpireTableData(tableName string, timestampColumn string) error { writeFunc := func() error { _, err := db.ExecVTOrc( - fmt.Sprintf("delete from %s where %s < NOW() - INTERVAL ? DAY", tableName, timestampColumn), + fmt.Sprintf("delete from %s where %s < datetime('now', printf('-%%d DAY', ?))", tableName, timestampColumn), config.Config.AuditPurgeDays, ) return err @@ -583,9 +583,9 @@ func readInstancesByCondition(condition string, args []any, sort string) ([](*In query := fmt.Sprintf(` select *, - unix_timestamp() - unix_timestamp(last_checked) as seconds_since_last_checked, + strftime('%%s', 'now') - strftime('%%s', last_checked) as seconds_since_last_checked, ifnull(last_checked <= last_seen, 0) as is_last_check_valid, - unix_timestamp() - unix_timestamp(last_seen) as seconds_since_last_seen + strftime('%%s', 'now') - strftime('%%s', last_seen) as seconds_since_last_seen from vitess_tablet left join database_instance using (alias, hostname, port) @@ -637,11 +637,11 @@ func ReadProblemInstances(keyspace string, shard string) ([](*Instance), error) and shard LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) and ( (last_seen < last_checked) - or (unix_timestamp() - unix_timestamp(last_checked) > ?) + or (strftime('%%s', 'now') - strftime('%%s', last_checked) > ?) or (replication_sql_thread_state not in (-1 ,1)) or (replication_io_thread_state not in (-1 ,1)) - or (abs(cast(replication_lag_seconds as signed) - cast(sql_delay as signed)) > ?) - or (abs(cast(replica_lag_seconds as signed) - cast(sql_delay as signed)) > ?) + or (abs(cast(replication_lag_seconds as integer) - cast(sql_delay as integer)) > ?) + or (abs(cast(replica_lag_seconds as integer) - cast(sql_delay as integer)) > ?) or (gtid_errant != '') ) ` @@ -703,8 +703,8 @@ func ReadOutdatedInstanceKeys() ([]string, error) { WHERE CASE WHEN last_attempted_check <= last_checked - THEN last_checked < now() - interval ? second - ELSE last_checked < now() - interval ? second + THEN last_checked < datetime('now', printf('-%d second', ?)) + ELSE last_checked < datetime('now', printf('-%d second', ?)) END UNION SELECT @@ -733,7 +733,7 @@ func ReadOutdatedInstanceKeys() ([]string, error) { return res, err } -func mkInsertOdku(table string, columns []string, values []string, nrRows int, insertIgnore bool) (string, error) { +func mkInsert(table string, columns []string, values []string, nrRows int, insertIgnore bool) (string, error) { if len(columns) == 0 { return "", errors.New("Column list cannot be empty") } @@ -745,9 +745,9 @@ func mkInsertOdku(table string, columns []string, values []string, nrRows int, i } var q strings.Builder - var ignore string + insertStr := "REPLACE INTO" if insertIgnore { - ignore = "ignore" + insertStr = "INSERT OR IGNORE INTO" } valRow := fmt.Sprintf("(%s)", strings.Join(values, ", ")) var val strings.Builder @@ -758,26 +758,17 @@ func mkInsertOdku(table string, columns []string, values []string, nrRows int, i } col := strings.Join(columns, ", ") - var odku strings.Builder - odku.WriteString(fmt.Sprintf("%s=VALUES(%s)", columns[0], columns[0])) - for _, c := range columns[1:] { - odku.WriteString(", ") - odku.WriteString(fmt.Sprintf("%s=VALUES(%s)", c, c)) - } - - q.WriteString(fmt.Sprintf(`INSERT %s INTO %s + q.WriteString(fmt.Sprintf(`%s %s (%s) VALUES %s - ON DUPLICATE KEY UPDATE - %s `, - ignore, table, col, val.String(), odku.String())) + insertStr, table, col, val.String())) return q.String(), nil } -func mkInsertOdkuForInstances(instances []*Instance, instanceWasActuallyFound bool, updateLastSeen bool) (string, []any, error) { +func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool, updateLastSeen bool) (string, []any, error) { if len(instances) == 0 { return "", nil, nil } @@ -858,13 +849,13 @@ func mkInsertOdkuForInstances(instances []*Instance, instanceWasActuallyFound bo for i := range columns { values[i] = "?" } - values[3] = "NOW()" // last_checked - values[4] = "NOW()" // last_attempted_check - values[5] = "1" // last_check_partial_success + values[3] = "datetime('now')" // last_checked + values[4] = "datetime('now')" // last_attempted_check + values[5] = "1" // last_check_partial_success if updateLastSeen { columns = append(columns, "last_seen") - values = append(values, "NOW()") + values = append(values, "datetime('now')") } var args []any @@ -935,7 +926,7 @@ func mkInsertOdkuForInstances(instances []*Instance, instanceWasActuallyFound bo args = append(args, instance.LastDiscoveryLatency.Nanoseconds()) } - sql, err := mkInsertOdku("database_instance", columns, values, len(instances), insertIgnore) + sql, err := mkInsert("database_instance", columns, values, len(instances), insertIgnore) if err != nil { errMsg := fmt.Sprintf("Failed to build query: %v", err) log.Errorf(errMsg) @@ -957,7 +948,7 @@ func writeManyInstances(instances []*Instance, instanceWasActuallyFound bool, up if len(writeInstances) == 0 { return nil // nothing to write } - sql, args, err := mkInsertOdkuForInstances(writeInstances, instanceWasActuallyFound, updateLastSeen) + sql, args, err := mkInsertForInstances(writeInstances, instanceWasActuallyFound, updateLastSeen) if err != nil { return err } @@ -984,7 +975,7 @@ func UpdateInstanceLastChecked(tabletAlias string, partialSuccess bool) error { update database_instance set - last_checked = NOW(), + last_checked = datetime('now'), last_check_partial_success = ? where alias = ?`, @@ -1013,7 +1004,7 @@ func UpdateInstanceLastAttemptedCheck(tabletAlias string) error { update database_instance set - last_attempted_check = NOW() + last_attempted_check = datetime('now') where alias = ?`, tabletAlias, @@ -1091,7 +1082,7 @@ func ForgetLongUnseenInstances() error { delete from database_instance where - last_seen < NOW() - interval ? hour`, + last_seen < datetime('now', printf('-%d hour', ?))`, config.UnseenInstanceForgetHours, ) if err != nil { @@ -1113,11 +1104,11 @@ func ForgetLongUnseenInstances() error { func SnapshotTopologies() error { writeFunc := func() error { _, err := db.ExecVTOrc(` - insert ignore into + insert or ignore into database_instance_topology_history (snapshot_unix_timestamp, alias, hostname, port, source_host, source_port, keyspace, shard, version) select - UNIX_TIMESTAMP(NOW()), + strftime('%s', 'now'), vitess_tablet.alias, vitess_tablet.hostname, vitess_tablet.port, database_instance.source_host, database_instance.source_port, vitess_tablet.keyspace, vitess_tablet.shard, database_instance.version @@ -1143,7 +1134,7 @@ func ExpireStaleInstanceBinlogCoordinates() error { writeFunc := func() error { _, err := db.ExecVTOrc(` delete from database_instance_stale_binlog_coordinates - where first_seen < NOW() - INTERVAL ? SECOND + where first_seen < datetime('now', printf('-%d second', ?)) `, expireSeconds, ) if err != nil { diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go index 741fc48bca9..2416c1abb90 100644 --- a/go/vt/vtorc/inst/instance_dao_test.go +++ b/go/vt/vtorc/inst/instance_dao_test.go @@ -49,57 +49,48 @@ func mkTestInstances() []*Instance { return instances } -func TestMkInsertOdkuSingle(t *testing.T) { +func TestMkInsertSingle(t *testing.T) { instances := mkTestInstances() - sql, args, err := mkInsertOdkuForInstances(nil, true, true) + sql, args, err := mkInsertForInstances(nil, true, true) require.NoError(t, err) require.Equal(t, sql, "") require.Equal(t, len(args), 0) // one instance - s1 := `INSERT ignore INTO database_instance + s1 := `INSERT OR IGNORE INTO database_instance (alias, hostname, port, last_checked, last_attempted_check, last_check_partial_success, server_id, server_uuid, version, major_version, version_comment, binlog_server, read_only, binlog_format, binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_net_timeout, heartbeat_interval, replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant, mariadb_gtid, pseudo_gtid, source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen) VALUES - (?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()) - ON DUPLICATE KEY UPDATE - alias=VALUES(alias), hostname=VALUES(hostname), port=VALUES(port), last_checked=VALUES(last_checked), last_attempted_check=VALUES(last_attempted_check), last_check_partial_success=VALUES(last_check_partial_success), server_id=VALUES(server_id), server_uuid=VALUES(server_uuid), version=VALUES(version), major_version=VALUES(major_version), version_comment=VALUES(version_comment), binlog_server=VALUES(binlog_server), read_only=VALUES(read_only), binlog_format=VALUES(binlog_format), binlog_row_image=VALUES(binlog_row_image), log_bin=VALUES(log_bin), log_replica_updates=VALUES(log_replica_updates), binary_log_file=VALUES(binary_log_file), binary_log_pos=VALUES(binary_log_pos), source_host=VALUES(source_host), source_port=VALUES(source_port), replica_net_timeout=VALUES(replica_net_timeout), heartbeat_interval=VALUES(heartbeat_interval), replica_sql_running=VALUES(replica_sql_running), replica_io_running=VALUES(replica_io_running), replication_sql_thread_state=VALUES(replication_sql_thread_state), replication_io_thread_state=VALUES(replication_io_thread_state), has_replication_filters=VALUES(has_replication_filters), supports_oracle_gtid=VALUES(supports_oracle_gtid), oracle_gtid=VALUES(oracle_gtid), source_uuid=VALUES(source_uuid), ancestry_uuid=VALUES(ancestry_uuid), executed_gtid_set=VALUES(executed_gtid_set), gtid_mode=VALUES(gtid_mode), gtid_purged=VALUES(gtid_purged), gtid_errant=VALUES(gtid_errant), mariadb_gtid=VALUES(mariadb_gtid), pseudo_gtid=VALUES(pseudo_gtid), source_log_file=VALUES(source_log_file), read_source_log_pos=VALUES(read_source_log_pos), relay_source_log_file=VALUES(relay_source_log_file), exec_source_log_pos=VALUES(exec_source_log_pos), relay_log_file=VALUES(relay_log_file), relay_log_pos=VALUES(relay_log_pos), last_sql_error=VALUES(last_sql_error), last_io_error=VALUES(last_io_error), replication_lag_seconds=VALUES(replication_lag_seconds), replica_lag_seconds=VALUES(replica_lag_seconds), sql_delay=VALUES(sql_delay), data_center=VALUES(data_center), region=VALUES(region), physical_environment=VALUES(physical_environment), replication_depth=VALUES(replication_depth), is_co_primary=VALUES(is_co_primary), has_replication_credentials=VALUES(has_replication_credentials), allow_tls=VALUES(allow_tls), - semi_sync_enforced=VALUES(semi_sync_enforced), semi_sync_primary_enabled=VALUES(semi_sync_primary_enabled), semi_sync_primary_timeout=VALUES(semi_sync_primary_timeout), semi_sync_primary_wait_for_replica_count=VALUES(semi_sync_primary_wait_for_replica_count), semi_sync_replica_enabled=VALUES(semi_sync_replica_enabled), semi_sync_primary_status=VALUES(semi_sync_primary_status), semi_sync_primary_clients=VALUES(semi_sync_primary_clients), semi_sync_replica_status=VALUES(semi_sync_replica_status), - last_discovery_latency=VALUES(last_discovery_latency), last_seen=VALUES(last_seen) + (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) ` a1 := `zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0,` - sql1, args1, err := mkInsertOdkuForInstances(instances[:1], false, true) + sql1, args1, err := mkInsertForInstances(instances[:1], false, true) require.NoError(t, err) require.Equal(t, normalizeQuery(sql1), normalizeQuery(s1)) require.Equal(t, stripSpaces(fmtArgs(args1)), stripSpaces(a1)) } -func TestMkInsertOdkuThree(t *testing.T) { +func TestMkInsertThree(t *testing.T) { instances := mkTestInstances() // three instances - s3 := `INSERT INTO database_instance + s3 := `REPLACE INTO database_instance (alias, hostname, port, last_checked, last_attempted_check, last_check_partial_success, server_id, server_uuid, version, major_version, version_comment, binlog_server, read_only, binlog_format, binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_net_timeout, heartbeat_interval, replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant, mariadb_gtid, pseudo_gtid, source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen) VALUES - (?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()), - (?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()), - (?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()) - ON DUPLICATE KEY UPDATE - alias=VALUES(alias), hostname=VALUES(hostname), port=VALUES(port), last_checked=VALUES(last_checked), last_attempted_check=VALUES(last_attempted_check), last_check_partial_success=VALUES(last_check_partial_success), server_id=VALUES(server_id), server_uuid=VALUES(server_uuid), version=VALUES(version), major_version=VALUES(major_version), version_comment=VALUES(version_comment), binlog_server=VALUES(binlog_server), read_only=VALUES(read_only), binlog_format=VALUES(binlog_format), binlog_row_image=VALUES(binlog_row_image), log_bin=VALUES(log_bin), log_replica_updates=VALUES(log_replica_updates), binary_log_file=VALUES(binary_log_file), binary_log_pos=VALUES(binary_log_pos), source_host=VALUES(source_host), source_port=VALUES(source_port), replica_net_timeout=VALUES(replica_net_timeout), heartbeat_interval=VALUES(heartbeat_interval), replica_sql_running=VALUES(replica_sql_running), replica_io_running=VALUES(replica_io_running), replication_sql_thread_state=VALUES(replication_sql_thread_state), replication_io_thread_state=VALUES(replication_io_thread_state), has_replication_filters=VALUES(has_replication_filters), supports_oracle_gtid=VALUES(supports_oracle_gtid), oracle_gtid=VALUES(oracle_gtid), source_uuid=VALUES(source_uuid), ancestry_uuid=VALUES(ancestry_uuid), executed_gtid_set=VALUES(executed_gtid_set), gtid_mode=VALUES(gtid_mode), gtid_purged=VALUES(gtid_purged), gtid_errant=VALUES(gtid_errant), mariadb_gtid=VALUES(mariadb_gtid), pseudo_gtid=VALUES(pseudo_gtid), source_log_file=VALUES(source_log_file), read_source_log_pos=VALUES(read_source_log_pos), relay_source_log_file=VALUES(relay_source_log_file), exec_source_log_pos=VALUES(exec_source_log_pos), relay_log_file=VALUES(relay_log_file), relay_log_pos=VALUES(relay_log_pos), last_sql_error=VALUES(last_sql_error), last_io_error=VALUES(last_io_error), replication_lag_seconds=VALUES(replication_lag_seconds), replica_lag_seconds=VALUES(replica_lag_seconds), sql_delay=VALUES(sql_delay), data_center=VALUES(data_center), region=VALUES(region), - physical_environment=VALUES(physical_environment), replication_depth=VALUES(replication_depth), is_co_primary=VALUES(is_co_primary), has_replication_credentials=VALUES(has_replication_credentials), allow_tls=VALUES(allow_tls), semi_sync_enforced=VALUES(semi_sync_enforced), - semi_sync_primary_enabled=VALUES(semi_sync_primary_enabled), semi_sync_primary_timeout=VALUES(semi_sync_primary_timeout), semi_sync_primary_wait_for_replica_count=VALUES(semi_sync_primary_wait_for_replica_count), semi_sync_replica_enabled=VALUES(semi_sync_replica_enabled), semi_sync_primary_status=VALUES(semi_sync_primary_status), semi_sync_primary_clients=VALUES(semi_sync_primary_clients), semi_sync_replica_status=VALUES(semi_sync_replica_status), - last_discovery_latency=VALUES(last_discovery_latency), last_seen=VALUES(last_seen) + (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')), + (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')), + (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) ` a3 := ` zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, @@ -107,7 +98,7 @@ func TestMkInsertOdkuThree(t *testing.T) { zone1-i730, i730, 3306, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, ` - sql3, args3, err := mkInsertOdkuForInstances(instances[:3], true, true) + sql3, args3, err := mkInsertForInstances(instances[:3], true, true) require.NoError(t, err) require.Equal(t, normalizeQuery(sql3), normalizeQuery(s3)) require.Equal(t, stripSpaces(fmtArgs(args3)), stripSpaces(a3)) @@ -437,27 +428,27 @@ func TestReadOutdatedInstanceKeys(t *testing.T) { }{ { name: "No problems", - sql: []string{"update database_instance set last_checked = now()"}, + sql: []string{"update database_instance set last_checked = datetime('now')"}, instancesRequired: nil, }, { name: "One instance is outdated", sql: []string{ - "update database_instance set last_checked = now()", - "update database_instance set last_checked = datetime(now(), '-1 hour') where alias = 'zone1-0000000100'", + "update database_instance set last_checked = datetime('now')", + "update database_instance set last_checked = datetime('now', '-1 hour') where alias = 'zone1-0000000100'", }, instancesRequired: []string{"zone1-0000000100"}, }, { name: "One instance doesn't have myql data", sql: []string{ - "update database_instance set last_checked = now()", + "update database_instance set last_checked = datetime('now')", `INSERT INTO vitess_tablet VALUES('zone1-0000000103','localhost',7706,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00','');`, }, instancesRequired: []string{"zone1-0000000103"}, }, { name: "One instance doesn't have myql data and one is outdated", sql: []string{ - "update database_instance set last_checked = now()", - "update database_instance set last_checked = datetime(now(), '-1 hour') where alias = 'zone1-0000000100'", + "update database_instance set last_checked = datetime('now')", + "update database_instance set last_checked = datetime('now', '-1 hour') where alias = 'zone1-0000000100'", `INSERT INTO vitess_tablet VALUES('zone1-0000000103','localhost',7706,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00','');`, }, instancesRequired: []string{"zone1-0000000103", "zone1-0000000100"}, @@ -494,10 +485,10 @@ func TestReadOutdatedInstanceKeys(t *testing.T) { errInDataCollection := db.QueryVTOrcRowsMap(`select alias, last_checked, last_attempted_check, -ROUND((JULIANDAY(now()) - JULIANDAY(last_checked)) * 86400) AS difference, +ROUND((JULIANDAY(datetime('now')) - JULIANDAY(last_checked)) * 86400) AS difference, last_attempted_check <= last_checked as use1, -last_checked < now() - interval 1500 second as is_outdated1, -last_checked < now() - interval 3000 second as is_outdated2 +last_checked < datetime('now', '-1500 second') as is_outdated1, +last_checked < datetime('now', '-3000 second') as is_outdated2 from database_instance`, func(rowMap sqlutils.RowMap) error { log.Errorf("Row in database_instance - %+v", rowMap) return nil @@ -521,12 +512,12 @@ func TestUpdateInstanceLastChecked(t *testing.T) { name: "Verify updated last checked", tabletAlias: "zone1-0000000100", partialSuccess: false, - conditionToCheck: "last_checked >= now() - interval 30 second and last_check_partial_success = false", + conditionToCheck: "last_checked >= datetime('now', '-30 second') and last_check_partial_success = false", }, { name: "Verify partial success", tabletAlias: "zone1-0000000100", partialSuccess: true, - conditionToCheck: "last_checked >= now() - interval 30 second and last_check_partial_success = true", + conditionToCheck: "last_checked >= datetime('now', '-30 second') and last_check_partial_success = true", }, { name: "Verify no error on unknown tablet", tabletAlias: "unknown tablet", @@ -572,7 +563,7 @@ func TestUpdateInstanceLastAttemptedCheck(t *testing.T) { { name: "Verify updated last checked", tabletAlias: "zone1-0000000100", - conditionToCheck: "last_attempted_check >= now() - interval 30 second", + conditionToCheck: "last_attempted_check >= datetime('now', '-30 second')", }, { name: "Verify no error on unknown tablet", tabletAlias: "unknown tablet", @@ -746,8 +737,8 @@ func TestExpireTableData(t *testing.T) { timestampColumn: "audit_timestamp", expectedRowCount: 1, insertQuery: `insert into audit (audit_id, audit_timestamp, audit_type, alias, message, keyspace, shard) values -(1, NOW() - INTERVAL 50 DAY, 'a','a','a','a','a'), -(2, NOW() - INTERVAL 5 DAY, 'a','a','a','a','a')`, +(1, datetime('now', '-50 DAY'), 'a','a','a','a','a'), +(2, datetime('now', '-5 DAY'), 'a','a','a','a','a')`, }, { name: "ExpireRecoveryDetectionHistory", @@ -755,9 +746,9 @@ func TestExpireTableData(t *testing.T) { timestampColumn: "detection_timestamp", expectedRowCount: 2, insertQuery: `insert into recovery_detection (detection_id, detection_timestamp, alias, analysis, keyspace, shard) values -(1, NOW() - INTERVAL 3 DAY,'a','a','a','a'), -(2, NOW() - INTERVAL 5 DAY,'a','a','a','a'), -(3, NOW() - INTERVAL 15 DAY,'a','a','a','a')`, +(1, datetime('now', '-3 DAY'),'a','a','a','a'), +(2, datetime('now', '-5 DAY'),'a','a','a','a'), +(3, datetime('now', '-15 DAY'),'a','a','a','a')`, }, } for _, tt := range tests { diff --git a/go/vt/vtorc/logic/disable_recovery.go b/go/vt/vtorc/logic/disable_recovery.go index 74aa291e17a..60650798876 100644 --- a/go/vt/vtorc/logic/disable_recovery.go +++ b/go/vt/vtorc/logic/disable_recovery.go @@ -64,7 +64,7 @@ func IsRecoveryDisabled() (disabled bool, err error) { // DisableRecovery ensures recoveries are disabled globally func DisableRecovery() error { _, err := db.ExecVTOrc(` - INSERT IGNORE INTO global_recovery_disable + INSERT OR IGNORE INTO global_recovery_disable (disable_recovery) VALUES (1) `, diff --git a/go/vt/vtorc/logic/topology_recovery_dao.go b/go/vt/vtorc/logic/topology_recovery_dao.go index 1920da4dcd8..730e6b2a158 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao.go +++ b/go/vt/vtorc/logic/topology_recovery_dao.go @@ -31,7 +31,7 @@ import ( // InsertRecoveryDetection inserts the recovery analysis that has been detected. func InsertRecoveryDetection(analysisEntry *inst.ReplicationAnalysis) error { sqlResult, err := db.ExecVTOrc(` - insert ignore + insert or ignore into recovery_detection ( alias, analysis, @@ -43,7 +43,7 @@ func InsertRecoveryDetection(analysisEntry *inst.ReplicationAnalysis) error { ?, ?, ?, - now() + datetime('now') )`, analysisEntry.AnalyzedInstanceAlias, string(analysisEntry.Analysis), @@ -66,7 +66,7 @@ func InsertRecoveryDetection(analysisEntry *inst.ReplicationAnalysis) error { func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecovery, error) { analysisEntry := topologyRecovery.AnalysisEntry sqlResult, err := db.ExecVTOrc(` - insert ignore + insert or ignore into topology_recovery ( recovery_id, alias, @@ -78,7 +78,7 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover ) values ( ?, ?, - NOW(), + datetime('now'), ?, ?, ?, @@ -143,7 +143,7 @@ func writeResolveRecovery(topologyRecovery *TopologyRecovery) error { is_successful = ?, successor_alias = ?, all_errors = ?, - end_recovery = NOW() + end_recovery = datetime('now') where recovery_id = ? `, topologyRecovery.IsSuccessful, @@ -237,10 +237,10 @@ func ReadRecentRecoveries(page int) ([]*TopologyRecovery, error) { // writeTopologyRecoveryStep writes down a single step in a recovery process func writeTopologyRecoveryStep(topologyRecoveryStep *TopologyRecoveryStep) error { sqlResult, err := db.ExecVTOrc(` - insert ignore + insert or ignore into topology_recovery_steps ( recovery_step_id, recovery_id, audit_at, message - ) values (?, ?, now(), ?) + ) values (?, ?, datetime('now'), ?) `, sqlutils.NilIfZero(topologyRecoveryStep.ID), topologyRecoveryStep.RecoveryID, topologyRecoveryStep.Message, ) if err != nil { diff --git a/go/vt/vtorc/logic/topology_recovery_dao_test.go b/go/vt/vtorc/logic/topology_recovery_dao_test.go index 354af82e2b3..20dfb7e91e2 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao_test.go +++ b/go/vt/vtorc/logic/topology_recovery_dao_test.go @@ -88,9 +88,9 @@ func TestExpireTableData(t *testing.T) { tableName: "recovery_detection", expectedRowCount: 2, insertQuery: `insert into recovery_detection (detection_id, detection_timestamp, alias, analysis, keyspace, shard) values -(1, NOW() - INTERVAL 3 DAY,'a','a','a','a'), -(2, NOW() - INTERVAL 5 DAY,'a','a','a','a'), -(3, NOW() - INTERVAL 15 DAY,'a','a','a','a')`, +(1, datetime('now', '-3 DAY'),'a','a','a','a'), +(2, datetime('now', '-5 DAY'),'a','a','a','a'), +(3, datetime('now', '-15 DAY'),'a','a','a','a')`, expireFunc: ExpireRecoveryDetectionHistory, }, { @@ -98,9 +98,9 @@ func TestExpireTableData(t *testing.T) { tableName: "topology_recovery", expectedRowCount: 1, insertQuery: `insert into topology_recovery (recovery_id, start_recovery, alias, analysis, keyspace, shard) values -(1, NOW() - INTERVAL 13 DAY,'a','a','a','a'), -(2, NOW() - INTERVAL 5 DAY,'a','a','a','a'), -(3, NOW() - INTERVAL 15 DAY,'a','a','a','a')`, +(1, datetime('now', '-13 DAY'),'a','a','a','a'), +(2, datetime('now', '-5 DAY'),'a','a','a','a'), +(3, datetime('now', '-15 DAY'),'a','a','a','a')`, expireFunc: ExpireTopologyRecoveryHistory, }, { @@ -108,9 +108,9 @@ func TestExpireTableData(t *testing.T) { tableName: "topology_recovery_steps", expectedRowCount: 1, insertQuery: `insert into topology_recovery_steps (recovery_step_id, audit_at, recovery_id, message) values -(1, NOW() - INTERVAL 13 DAY, 1, 'a'), -(2, NOW() - INTERVAL 5 DAY, 2, 'a'), -(3, NOW() - INTERVAL 15 DAY, 3, 'a')`, +(1, datetime('now', '-13 DAY'), 1, 'a'), +(2, datetime('now', '-5 DAY'), 2, 'a'), +(3, datetime('now', '-15 DAY'), 3, 'a')`, expireFunc: ExpireTopologyRecoveryStepsHistory, }, } diff --git a/go/vt/vtorc/process/health.go b/go/vt/vtorc/process/health.go index 86101d6c5c0..87a11733f66 100644 --- a/go/vt/vtorc/process/health.go +++ b/go/vt/vtorc/process/health.go @@ -40,7 +40,7 @@ func writeHealthToDatabase() bool { log.Error(err) return false } - sqlResult, err := db.ExecVTOrc(`insert into node_health (last_seen_active) values (now())`) + sqlResult, err := db.ExecVTOrc(`insert into node_health (last_seen_active) values (datetime('now'))`) if err != nil { log.Error(err) return false