From 69b381fe185b28bf6ed68eba63821f169d64559c Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 26 Nov 2024 12:39:05 +0100 Subject: [PATCH] `vtorc`: make SQL formatting consistent (#17154) Signed-off-by: Tim Vaillancourt Signed-off-by: Manan Gupta Co-authored-by: Manan Gupta --- go/vt/vtorc/db/db.go | 18 +- go/vt/vtorc/inst/analysis_dao.go | 62 ++--- go/vt/vtorc/inst/audit_dao.go | 24 +- go/vt/vtorc/inst/instance_dao.go | 276 +++++++++++---------- go/vt/vtorc/inst/instance_dao_test.go | 46 ++-- go/vt/vtorc/inst/tablet_dao.go | 44 ++-- go/vt/vtorc/logic/disable_recovery.go | 33 ++- go/vt/vtorc/logic/tablet_discovery.go | 2 +- go/vt/vtorc/logic/topology_recovery_dao.go | 157 ++++++------ go/vt/vtorc/process/health.go | 9 +- 10 files changed, 358 insertions(+), 313 deletions(-) diff --git a/go/vt/vtorc/db/db.go b/go/vt/vtorc/db/db.go index 64143477645..4bb22b65b5a 100644 --- a/go/vt/vtorc/db/db.go +++ b/go/vt/vtorc/db/db.go @@ -58,13 +58,13 @@ func OpenVTOrc() (db *sql.DB, err error) { // registerVTOrcDeployment updates the vtorc_db_deployments table upon successful deployment func registerVTOrcDeployment(db *sql.DB) error { - query := ` - replace into vtorc_db_deployments ( - deployed_version, deployed_timestamp - ) values ( - ?, datetime('now') - ) - ` + query := `REPLACE INTO vtorc_db_deployments ( + deployed_version, + deployed_timestamp + ) VALUES ( + ?, + DATETIME('now') + )` if _, err := execInternal(db, query, ""); err != nil { log.Fatalf("Unable to write to vtorc_db_deployments: %+v", err) } @@ -116,9 +116,7 @@ func initVTOrcDB(db *sql.DB) error { // execInternal func execInternal(db *sql.DB, query string, args ...any) (sql.Result, error) { - var err error - res, err := sqlutils.ExecNoPrepare(db, query, args...) - return res, err + return sqlutils.ExecNoPrepare(db, query, args...) } // ExecVTOrc will execute given query on the vtorc backend database. diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index 25d93a6864b..e44538e694c 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -69,8 +69,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna // TODO(sougou); deprecate ReduceReplicationAnalysisCount args := sqlutils.Args(config.Config.ReasonableReplicationLagSeconds, ValidSecondsFromSeenToLastAttemptedCheck(), config.Config.ReasonableReplicationLagSeconds, keyspace, shard) - query := ` - SELECT + query := `SELECT vitess_tablet.info AS tablet_info, vitess_tablet.tablet_type, vitess_tablet.primary_timestamp, @@ -91,13 +90,13 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna IFNULL( primary_instance.binary_log_file = database_instance_stale_binlog_coordinates.binary_log_file AND primary_instance.binary_log_pos = database_instance_stale_binlog_coordinates.binary_log_pos - AND database_instance_stale_binlog_coordinates.first_seen < datetime('now', printf('-%d second', ?)), + AND database_instance_stale_binlog_coordinates.first_seen < DATETIME('now', PRINTF('-%d SECOND', ?)), 0 ) ) AS is_stale_binlog_coordinates, MIN( primary_instance.last_checked <= primary_instance.last_seen - and primary_instance.last_attempted_check <= datetime(primary_instance.last_seen, printf('+%d second', ?)) + and primary_instance.last_attempted_check <= DATETIME(primary_instance.last_seen, PRINTF('+%d SECOND', ?)) ) = 1 AS is_last_check_valid, /* To be considered a primary, traditional async replication must not be present/valid AND the host should either */ /* not be a replication group member OR be the primary of the replication group */ @@ -655,13 +654,13 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC // Find if the lastAnalysisHasChanged or not while updating the row if it has. lastAnalysisChanged := false { - sqlResult, err := db.ExecVTOrc(` - update database_instance_last_analysis set + sqlResult, err := db.ExecVTOrc(`UPDATE database_instance_last_analysis + SET analysis = ?, - analysis_timestamp = datetime('now') - where + analysis_timestamp = DATETIME('now') + WHERE alias = ? - and analysis != ? + AND analysis != ? `, string(analysisCode), tabletAlias, string(analysisCode), ) @@ -682,13 +681,16 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC firstInsertion := false if !lastAnalysisChanged { // The insert only returns more than 1 row changed if this is the first insertion. - sqlResult, err := db.ExecVTOrc(` - insert or ignore into database_instance_last_analysis ( - alias, analysis_timestamp, analysis - ) values ( - ?, datetime('now'), ? - ) - `, + sqlResult, err := db.ExecVTOrc(`INSERT OR IGNORE + INTO database_instance_last_analysis ( + alias, + analysis_timestamp, + analysis + ) VALUES ( + ?, + DATETIME('now'), + ? + )`, tabletAlias, string(analysisCode), ) if err != nil { @@ -708,13 +710,16 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC return nil } - _, err := db.ExecVTOrc(` - insert into database_instance_analysis_changelog ( - alias, analysis_timestamp, analysis - ) values ( - ?, datetime('now'), ? - ) - `, + _, err := db.ExecVTOrc(`INSERT + INTO database_instance_analysis_changelog ( + alias, + analysis_timestamp, + analysis + ) VALUES ( + ?, + DATETIME('now'), + ? + )`, tabletAlias, string(analysisCode), ) if err == nil { @@ -727,12 +732,11 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC // ExpireInstanceAnalysisChangelog removes old-enough analysis entries from the changelog func ExpireInstanceAnalysisChangelog() error { - _, err := db.ExecVTOrc(` - delete - from database_instance_analysis_changelog - where - analysis_timestamp < datetime('now', printf('-%d hour', ?)) - `, + _, err := db.ExecVTOrc(`DELETE + FROM database_instance_analysis_changelog + WHERE + analysis_timestamp < DATETIME('now', PRINTF('-%d HOUR', ?)) + `, config.UnseenInstanceForgetHours, ) if err != nil { diff --git a/go/vt/vtorc/inst/audit_dao.go b/go/vt/vtorc/inst/audit_dao.go index 642fb187509..cbfd771e81c 100644 --- a/go/vt/vtorc/inst/audit_dao.go +++ b/go/vt/vtorc/inst/audit_dao.go @@ -55,14 +55,22 @@ func AuditOperation(auditType string, tabletAlias string, message string) error }() } if config.Config.AuditToBackendDB { - _, err := db.ExecVTOrc(` - insert - into audit ( - audit_timestamp, audit_type, alias, keyspace, shard, message - ) VALUES ( - datetime('now'), ?, ?, ?, ?, ? - ) - `, + _, err := db.ExecVTOrc(`INSERT + INTO audit ( + audit_timestamp, + audit_type, + alias, + keyspace, + shard, + message + ) VALUES ( + DATETIME('now'), + ?, + ?, + ?, + ?, + ? + )`, auditType, tabletAlias, keyspace, diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index bd437d1ddac..c8ff218710f 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -114,10 +114,15 @@ func ExecDBWriteFunc(f func() error) error { func ExpireTableData(tableName string, timestampColumn string) error { writeFunc := func() error { - _, err := db.ExecVTOrc( - fmt.Sprintf("delete from %s where %s < datetime('now', printf('-%%d DAY', ?))", tableName, timestampColumn), - config.Config.AuditPurgeDays, + query := fmt.Sprintf(`DELETE + FROM %s + WHERE + %s < DATETIME('now', PRINTF('-%%d DAY', ?)) + `, + tableName, + timestampColumn, ) + _, err := db.ExecVTOrc(query, config.Config.AuditPurgeDays) return err } return ExecDBWriteFunc(writeFunc) @@ -463,16 +468,16 @@ func ReadInstanceClusterAttributes(instance *Instance) (err error) { var primaryExecutedGtidSet string primaryDataFound := false - query := ` - select - replication_depth, - source_host, - source_port, - ancestry_uuid, - executed_gtid_set - from database_instance - where hostname=? and port=? - ` + query := `SELECT + replication_depth, + source_host, + source_port, + ancestry_uuid, + executed_gtid_set + FROM database_instance + WHERE + hostname = ? + AND port = ?` primaryHostname := instance.SourceHost primaryPort := instance.SourcePort args := sqlutils.Args(primaryHostname, primaryPort) @@ -604,20 +609,22 @@ func readInstancesByCondition(condition string, args []any, sort string) ([](*In if sort == "" { sort = `alias` } - query := fmt.Sprintf(` - select - *, - strftime('%%s', 'now') - strftime('%%s', last_checked) as seconds_since_last_checked, - ifnull(last_checked <= last_seen, 0) as is_last_check_valid, - strftime('%%s', 'now') - strftime('%%s', last_seen) as seconds_since_last_seen - from - vitess_tablet - left join database_instance using (alias, hostname, port) - where - %s - order by - %s - `, condition, sort) + query := fmt.Sprintf(`SELECT + *, + STRFTIME('%%s', 'now') - STRFTIME('%%s', last_checked) AS seconds_since_last_checked, + IFNULL(last_checked <= last_seen, 0) AS is_last_check_valid, + STRFTIME('%%s', 'now') - STRFTIME('%%s', last_seen) AS seconds_since_last_seen + FROM + vitess_tablet + LEFT JOIN database_instance USING (alias, hostname, port) + WHERE + %s + ORDER BY + %s + `, + condition, + sort, + ) err := db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error { instance := readInstanceRow(m) @@ -638,9 +645,7 @@ func readInstancesByCondition(condition string, args []any, sort string) ([](*In // ReadInstance reads an instance from the vtorc backend database func ReadInstance(tabletAlias string) (*Instance, bool, error) { - condition := ` - alias = ? - ` + condition := `alias = ?` instances, err := readInstancesByCondition(condition, sqlutils.Args(tabletAlias), "") // We know there will be at most one (alias is the PK). // And we expect to find one. @@ -657,18 +662,17 @@ func ReadInstance(tabletAlias string) (*Instance, bool, error) { // ReadProblemInstances reads all instances with problems func ReadProblemInstances(keyspace string, shard string) ([](*Instance), error) { condition := ` - keyspace LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) - and shard LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) - and ( - (last_seen < last_checked) - or (strftime('%%s', 'now') - strftime('%%s', last_checked) > ?) - or (replication_sql_thread_state not in (-1 ,1)) - or (replication_io_thread_state not in (-1 ,1)) - or (abs(cast(replication_lag_seconds as integer) - cast(sql_delay as integer)) > ?) - or (abs(cast(replica_lag_seconds as integer) - cast(sql_delay as integer)) > ?) - or (gtid_errant != '') - ) - ` + keyspace LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) + AND shard LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) + AND ( + (last_seen < last_checked) + OR (STRFTIME('%%s', 'now') - STRFTIME('%%s', last_checked) > ?) + OR (replication_sql_thread_state NOT IN (-1 ,1)) + OR (replication_io_thread_state NOT IN (-1 ,1)) + OR (ABS(CAST(replication_lag_seconds AS integer) - CAST(sql_delay AS integer)) > ?) + OR (ABS(CAST(replica_lag_seconds AS integer) - CAST(sql_delay AS integer)) > ?) + OR (gtid_errant != '') + )` args := sqlutils.Args(keyspace, keyspace, shard, shard, config.Config.InstancePollSeconds*5, config.Config.ReasonableReplicationLagSeconds, config.Config.ReasonableReplicationLagSeconds) return readInstancesByCondition(condition, args, "") @@ -677,10 +681,9 @@ func ReadProblemInstances(keyspace string, shard string) ([](*Instance), error) // ReadInstancesWithErrantGTIds reads all instances with errant GTIDs func ReadInstancesWithErrantGTIds(keyspace string, shard string) ([]*Instance, error) { condition := ` - keyspace LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) - and shard LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) - and gtid_errant != '' - ` + keyspace LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) + AND shard LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) + AND gtid_errant != ''` args := sqlutils.Args(keyspace, keyspace, shard, shard) return readInstancesByCondition(condition, args, "") @@ -688,15 +691,14 @@ func ReadInstancesWithErrantGTIds(keyspace string, shard string) ([]*Instance, e // GetKeyspaceShardName gets the keyspace shard name for the given instance key func GetKeyspaceShardName(tabletAlias string) (keyspace string, shard string, err error) { - query := ` - select - keyspace, - shard - from - vitess_tablet - where - alias = ? - ` + query := `SELECT + keyspace, + shard + FROM + vitess_tablet + WHERE + alias = ? + ` err = db.QueryVTOrc(query, sqlutils.Args(tabletAlias), func(m sqlutils.RowMap) error { keyspace = m.GetString("keyspace") shard = m.GetString("shard") @@ -719,27 +721,26 @@ func GetKeyspaceShardName(tabletAlias string) (keyspace string, shard string, er // the instance. func ReadOutdatedInstanceKeys() ([]string, error) { var res []string - query := ` - SELECT - alias - FROM - database_instance - WHERE - CASE - WHEN last_attempted_check <= last_checked - THEN last_checked < datetime('now', printf('-%d second', ?)) - ELSE last_checked < datetime('now', printf('-%d second', ?)) - END - UNION - SELECT - vitess_tablet.alias - FROM - vitess_tablet LEFT JOIN database_instance ON ( - vitess_tablet.alias = database_instance.alias - ) - WHERE - database_instance.alias IS NULL - ` + query := `SELECT + alias + FROM + database_instance + WHERE + CASE + WHEN last_attempted_check <= last_checked + THEN last_checked < DATETIME('now', PRINTF('-%d SECOND', ?)) + ELSE last_checked < DATETIME('now', PRINTF('-%d SECOND', ?)) + END + UNION + SELECT + vitess_tablet.alias + FROM + vitess_tablet LEFT JOIN database_instance ON ( + vitess_tablet.alias = database_instance.alias + ) + WHERE + database_instance.alias IS NULL + ` args := sqlutils.Args(config.Config.InstancePollSeconds, 2*config.Config.InstancePollSeconds) err := db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error { @@ -782,12 +783,17 @@ func mkInsert(table string, columns []string, values []string, nrRows int, inser } col := strings.Join(columns, ", ") - q.WriteString(fmt.Sprintf(`%s %s - (%s) - VALUES - %s - `, - insertStr, table, col, val.String())) + query := fmt.Sprintf(`%s %s + (%s) + VALUES + %s + `, + insertStr, + table, + col, + val.String(), + ) + q.WriteString(query) return q.String(), nil } @@ -873,13 +879,13 @@ func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool, for i := range columns { values[i] = "?" } - values[3] = "datetime('now')" // last_checked - values[4] = "datetime('now')" // last_attempted_check + values[3] = "DATETIME('now')" // last_checked + values[4] = "DATETIME('now')" // last_attempted_check values[5] = "1" // last_check_partial_success if updateLastSeen { columns = append(columns, "last_seen") - values = append(values, "datetime('now')") + values = append(values, "DATETIME('now')") } var args []any @@ -995,14 +1001,13 @@ func WriteInstance(instance *Instance, instanceWasActuallyFound bool, lastError // for a given instance func UpdateInstanceLastChecked(tabletAlias string, partialSuccess bool) error { writeFunc := func() error { - _, err := db.ExecVTOrc(` - update - database_instance - set - last_checked = datetime('now'), - last_check_partial_success = ? - where - alias = ?`, + _, err := db.ExecVTOrc(`UPDATE database_instance + SET + last_checked = DATETIME('now'), + last_check_partial_success = ? + WHERE + alias = ? + `, partialSuccess, tabletAlias, ) @@ -1024,13 +1029,12 @@ func UpdateInstanceLastChecked(tabletAlias string, partialSuccess bool) error { // we have a "hanging" issue. func UpdateInstanceLastAttemptedCheck(tabletAlias string) error { writeFunc := func() error { - _, err := db.ExecVTOrc(` - update - database_instance - set - last_attempted_check = datetime('now') - where - alias = ?`, + _, err := db.ExecVTOrc(`UPDATE database_instance + SET + last_attempted_check = DATETIME('now') + WHERE + alias = ? + `, tabletAlias, ) if err != nil { @@ -1061,11 +1065,11 @@ func ForgetInstance(tabletAlias string) error { currentErrantGTIDCount.Reset(tabletAlias) // Delete from the 'vitess_tablet' table. - _, err := db.ExecVTOrc(` - delete - from vitess_tablet - where - alias = ?`, + _, err := db.ExecVTOrc(`DELETE + FROM vitess_tablet + WHERE + alias = ? + `, tabletAlias, ) if err != nil { @@ -1074,11 +1078,11 @@ func ForgetInstance(tabletAlias string) error { } // Also delete from the 'database_instance' table. - sqlResult, err := db.ExecVTOrc(` - delete - from database_instance - where - alias = ?`, + sqlResult, err := db.ExecVTOrc(`DELETE + FROM database_instance + WHERE + alias = ? + `, tabletAlias, ) if err != nil { @@ -1102,11 +1106,11 @@ func ForgetInstance(tabletAlias string) error { // ForgetLongUnseenInstances will remove entries of all instances that have long since been last seen. func ForgetLongUnseenInstances() error { - sqlResult, err := db.ExecVTOrc(` - delete - from database_instance - where - last_seen < datetime('now', printf('-%d hour', ?))`, + sqlResult, err := db.ExecVTOrc(`DELETE + FROM database_instance + WHERE + last_seen < DATETIME('now', PRINTF('-%d HOUR', ?)) + `, config.UnseenInstanceForgetHours, ) if err != nil { @@ -1127,18 +1131,26 @@ func ForgetLongUnseenInstances() error { // SnapshotTopologies records topology graph for all existing topologies func SnapshotTopologies() error { writeFunc := func() error { - _, err := db.ExecVTOrc(` - insert or ignore into - database_instance_topology_history (snapshot_unix_timestamp, - alias, hostname, port, source_host, source_port, keyspace, shard, version) - select - strftime('%s', 'now'), - vitess_tablet.alias, vitess_tablet.hostname, vitess_tablet.port, - database_instance.source_host, database_instance.source_port, + _, err := db.ExecVTOrc(`INSERT OR IGNORE + INTO database_instance_topology_history ( + snapshot_unix_timestamp, + alias, + hostname, + port, + source_host, + source_port, + keyspace, + shard, + version + ) + SELECT + STRFTIME('%s', 'now'), + vitess_tablet.alias, vitess_tablet.hostname, vitess_tablet.port, + database_instance.source_host, database_instance.source_port, vitess_tablet.keyspace, vitess_tablet.shard, database_instance.version - from - vitess_tablet left join database_instance using (alias, hostname, port) - `, + FROM + vitess_tablet LEFT JOIN database_instance USING (alias, hostname, port) + `, ) if err != nil { log.Error(err) @@ -1156,10 +1168,12 @@ func ExpireStaleInstanceBinlogCoordinates() error { expireSeconds = config.StaleInstanceCoordinatesExpireSeconds } writeFunc := func() error { - _, err := db.ExecVTOrc(` - delete from database_instance_stale_binlog_coordinates - where first_seen < datetime('now', printf('-%d second', ?)) - `, expireSeconds, + _, err := db.ExecVTOrc(`DELETE + FROM database_instance_stale_binlog_coordinates + WHERE + first_seen < DATETIME('now', PRINTF('-%d SECOND', ?)) + `, + expireSeconds, ) if err != nil { log.Error(err) @@ -1181,7 +1195,7 @@ func GetDatabaseState() (string, error) { ts := tableState{ TableName: tableName, } - err := db.QueryVTOrc("select * from "+tableName, nil, func(rowMap sqlutils.RowMap) error { + err := db.QueryVTOrc("SELECT * FROM "+tableName, nil, func(rowMap sqlutils.RowMap) error { ts.Rows = append(ts.Rows, rowMap) return nil }) diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go index cb2064d10c0..f248ded5e2b 100644 --- a/go/vt/vtorc/inst/instance_dao_test.go +++ b/go/vt/vtorc/inst/instance_dao_test.go @@ -66,7 +66,7 @@ func TestMkInsertSingle(t *testing.T) { replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant, mariadb_gtid, pseudo_gtid, source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen) VALUES - (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) + (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')) ` a1 := `zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, @@ -89,9 +89,9 @@ func TestMkInsertThree(t *testing.T) { replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant, mariadb_gtid, pseudo_gtid, source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen) VALUES - (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')), - (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')), - (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) + (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')), + (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')), + (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')) ` a3 := ` zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, @@ -429,27 +429,27 @@ func TestReadOutdatedInstanceKeys(t *testing.T) { }{ { name: "No problems", - sql: []string{"update database_instance set last_checked = datetime('now')"}, + sql: []string{"update database_instance set last_checked = DATETIME('now')"}, instancesRequired: nil, }, { name: "One instance is outdated", sql: []string{ - "update database_instance set last_checked = datetime('now')", - "update database_instance set last_checked = datetime('now', '-1 hour') where alias = 'zone1-0000000100'", + "update database_instance set last_checked = DATETIME('now')", + "update database_instance set last_checked = DATETIME('now', '-1 hour') where alias = 'zone1-0000000100'", }, instancesRequired: []string{"zone1-0000000100"}, }, { name: "One instance doesn't have myql data", sql: []string{ - "update database_instance set last_checked = datetime('now')", + "update database_instance set last_checked = DATETIME('now')", `INSERT INTO vitess_tablet VALUES('zone1-0000000103','localhost',7706,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00','');`, }, instancesRequired: []string{"zone1-0000000103"}, }, { name: "One instance doesn't have myql data and one is outdated", sql: []string{ - "update database_instance set last_checked = datetime('now')", - "update database_instance set last_checked = datetime('now', '-1 hour') where alias = 'zone1-0000000100'", + "update database_instance set last_checked = DATETIME('now')", + "update database_instance set last_checked = DATETIME('now', '-1 hour') where alias = 'zone1-0000000100'", `INSERT INTO vitess_tablet VALUES('zone1-0000000103','localhost',7706,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00','');`, }, instancesRequired: []string{"zone1-0000000103", "zone1-0000000100"}, @@ -486,10 +486,10 @@ func TestReadOutdatedInstanceKeys(t *testing.T) { errInDataCollection := db.QueryVTOrcRowsMap(`select alias, last_checked, last_attempted_check, -ROUND((JULIANDAY(datetime('now')) - JULIANDAY(last_checked)) * 86400) AS difference, +ROUND((JULIANDAY(DATETIME('now')) - JULIANDAY(last_checked)) * 86400) AS difference, last_attempted_check <= last_checked as use1, -last_checked < datetime('now', '-1500 second') as is_outdated1, -last_checked < datetime('now', '-3000 second') as is_outdated2 +last_checked < DATETIME('now', '-1500 second') as is_outdated1, +last_checked < DATETIME('now', '-3000 second') as is_outdated2 from database_instance`, func(rowMap sqlutils.RowMap) error { log.Errorf("Row in database_instance - %+v", rowMap) return nil @@ -513,12 +513,12 @@ func TestUpdateInstanceLastChecked(t *testing.T) { name: "Verify updated last checked", tabletAlias: "zone1-0000000100", partialSuccess: false, - conditionToCheck: "last_checked >= datetime('now', '-30 second') and last_check_partial_success = false", + conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = false", }, { name: "Verify partial success", tabletAlias: "zone1-0000000100", partialSuccess: true, - conditionToCheck: "last_checked >= datetime('now', '-30 second') and last_check_partial_success = true", + conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = true", }, { name: "Verify no error on unknown tablet", tabletAlias: "unknown tablet", @@ -564,7 +564,7 @@ func TestUpdateInstanceLastAttemptedCheck(t *testing.T) { { name: "Verify updated last checked", tabletAlias: "zone1-0000000100", - conditionToCheck: "last_attempted_check >= datetime('now', '-30 second')", + conditionToCheck: "last_attempted_check >= DATETIME('now', '-30 second')", }, { name: "Verify no error on unknown tablet", tabletAlias: "unknown tablet", @@ -737,19 +737,19 @@ func TestExpireTableData(t *testing.T) { tableName: "audit", timestampColumn: "audit_timestamp", expectedRowCount: 1, - insertQuery: `insert into audit (audit_id, audit_timestamp, audit_type, alias, message, keyspace, shard) values -(1, datetime('now', '-50 DAY'), 'a','a','a','a','a'), -(2, datetime('now', '-5 DAY'), 'a','a','a','a','a')`, + insertQuery: `INSERT INTO audit (audit_id, audit_timestamp, audit_type, alias, message, keyspace, shard) VALUES +(1, DATETIME('now', '-50 DAY'), 'a','a','a','a','a'), +(2, DATETIME('now', '-5 DAY'), 'a','a','a','a','a')`, }, { name: "ExpireRecoveryDetectionHistory", tableName: "recovery_detection", timestampColumn: "detection_timestamp", expectedRowCount: 2, - insertQuery: `insert into recovery_detection (detection_id, detection_timestamp, alias, analysis, keyspace, shard) values -(1, datetime('now', '-3 DAY'),'a','a','a','a'), -(2, datetime('now', '-5 DAY'),'a','a','a','a'), -(3, datetime('now', '-15 DAY'),'a','a','a','a')`, + insertQuery: `INSERT INTO recovery_detection (detection_id, detection_timestamp, alias, analysis, keyspace, shard) VALUES +(1, DATETIME('now', '-3 DAY'),'a','a','a','a'), +(2, DATETIME('now', '-5 DAY'),'a','a','a','a'), +(3, DATETIME('now', '-15 DAY'),'a','a','a','a')`, }, } for _, tt := range tests { diff --git a/go/vt/vtorc/inst/tablet_dao.go b/go/vt/vtorc/inst/tablet_dao.go index af304292a70..f48f2b97370 100644 --- a/go/vt/vtorc/inst/tablet_dao.go +++ b/go/vt/vtorc/inst/tablet_dao.go @@ -56,13 +56,13 @@ func fullStatus(tabletAlias string) (*replicationdatapb.FullStatus, error) { // ReadTablet reads the vitess tablet record. func ReadTablet(tabletAlias string) (*topodatapb.Tablet, error) { - query := ` - select - info - from - vitess_tablet - where alias = ? - ` + query := `SELECT + info + FROM + vitess_tablet + WHERE + alias = ? + ` args := sqlutils.Args(tabletAlias) tablet := &topodatapb.Tablet{} opts := prototext.UnmarshalOptions{DiscardUnknown: true} @@ -84,14 +84,28 @@ func SaveTablet(tablet *topodatapb.Tablet) error { if err != nil { return err } - _, err = db.ExecVTOrc(` - replace - into vitess_tablet ( - alias, hostname, port, cell, keyspace, shard, tablet_type, primary_timestamp, info - ) values ( - ?, ?, ?, ?, ?, ?, ?, ?, ? - ) - `, + _, err = db.ExecVTOrc(`REPLACE + INTO vitess_tablet ( + alias, + hostname, + port, + cell, + keyspace, + shard, + tablet_type, + primary_timestamp, + info + ) VALUES ( + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ? + )`, topoproto.TabletAliasString(tablet.Alias), tablet.MysqlHostname, int(tablet.MysqlPort), diff --git a/go/vt/vtorc/logic/disable_recovery.go b/go/vt/vtorc/logic/disable_recovery.go index 60650798876..c5446eeb9ff 100644 --- a/go/vt/vtorc/logic/disable_recovery.go +++ b/go/vt/vtorc/logic/disable_recovery.go @@ -40,14 +40,13 @@ import ( // IsRecoveryDisabled returns true if Recoveries are disabled globally func IsRecoveryDisabled() (disabled bool, err error) { - query := ` - SELECT - COUNT(*) as mycount - FROM - global_recovery_disable - WHERE - disable_recovery=? - ` + query := `SELECT + COUNT(*) AS mycount + FROM + global_recovery_disable + WHERE + disable_recovery = ? + ` err = db.QueryVTOrc(query, sqlutils.Args(1), func(m sqlutils.RowMap) error { mycount := m.GetInt("mycount") disabled = (mycount > 0) @@ -63,21 +62,19 @@ func IsRecoveryDisabled() (disabled bool, err error) { // DisableRecovery ensures recoveries are disabled globally func DisableRecovery() error { - _, err := db.ExecVTOrc(` - INSERT OR IGNORE INTO global_recovery_disable - (disable_recovery) - VALUES (1) - `, - ) + _, err := db.ExecVTOrc(`INSERT OR IGNORE + INTO global_recovery_disable ( + disable_recovery + ) VALUES (1)`) return err } // EnableRecovery ensures recoveries are enabled globally func EnableRecovery() error { // The "WHERE" clause is just to avoid full-scan reports by monitoring tools - _, err := db.ExecVTOrc(` - DELETE FROM global_recovery_disable WHERE disable_recovery >= 0 - `, - ) + _, err := db.ExecVTOrc(`DELETE + FROM global_recovery_disable + WHERE + disable_recovery >= 0`) return err } diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index e9bbcee35cb..1ee9e2cfefa 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -66,7 +66,7 @@ func OpenTabletDiscovery() <-chan time.Time { ts = topo.Open() tmc = inst.InitializeTMC() // Clear existing cache and perform a new refresh. - if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil { + if _, err := db.ExecVTOrc("DELETE FROM vitess_tablet"); err != nil { log.Error(err) } // We refresh all information from the topo once before we start the ticks to do it on a timer. diff --git a/go/vt/vtorc/logic/topology_recovery_dao.go b/go/vt/vtorc/logic/topology_recovery_dao.go index 730e6b2a158..137251c4fc8 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao.go +++ b/go/vt/vtorc/logic/topology_recovery_dao.go @@ -30,21 +30,20 @@ import ( // InsertRecoveryDetection inserts the recovery analysis that has been detected. func InsertRecoveryDetection(analysisEntry *inst.ReplicationAnalysis) error { - sqlResult, err := db.ExecVTOrc(` - insert or ignore - into recovery_detection ( - alias, - analysis, - keyspace, - shard, - detection_timestamp - ) values ( - ?, - ?, - ?, - ?, - datetime('now') - )`, + sqlResult, err := db.ExecVTOrc(`INSERT OR IGNORE + INTO recovery_detection ( + alias, + analysis, + keyspace, + shard, + detection_timestamp + ) VALUES ( + ?, + ?, + ?, + ?, + DATETIME('now') + )`, analysisEntry.AnalyzedInstanceAlias, string(analysisEntry.Analysis), analysisEntry.ClusterDetails.Keyspace, @@ -65,26 +64,24 @@ func InsertRecoveryDetection(analysisEntry *inst.ReplicationAnalysis) error { func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecovery, error) { analysisEntry := topologyRecovery.AnalysisEntry - sqlResult, err := db.ExecVTOrc(` - insert or ignore - into topology_recovery ( - recovery_id, - alias, - start_recovery, - analysis, - keyspace, - shard, - detection_id - ) values ( - ?, - ?, - datetime('now'), - ?, - ?, - ?, - ? - ) - `, + sqlResult, err := db.ExecVTOrc(`INSERT OR IGNORE + INTO topology_recovery ( + recovery_id, + alias, + start_recovery, + analysis, + keyspace, + shard, + detection_id + ) VALUES ( + ?, + ?, + DATETIME('now'), + ?, + ?, + ?, + ? + )`, sqlutils.NilIfZero(topologyRecovery.ID), analysisEntry.AnalyzedInstanceAlias, string(analysisEntry.Analysis), @@ -138,15 +135,16 @@ func AttemptRecoveryRegistration(analysisEntry *inst.ReplicationAnalysis) (*Topo // ResolveRecovery is called on completion of a recovery process and updates the recovery status. // It does not clear the "active period" as this still takes place in order to avoid flapping. func writeResolveRecovery(topologyRecovery *TopologyRecovery) error { - _, err := db.ExecVTOrc(` - update topology_recovery set - is_successful = ?, - successor_alias = ?, - all_errors = ?, - end_recovery = datetime('now') - where - recovery_id = ? - `, topologyRecovery.IsSuccessful, + _, err := db.ExecVTOrc(`UPDATE topology_recovery + SET + is_successful = ?, + successor_alias = ?, + all_errors = ?, + end_recovery = DATETIME('now') + WHERE + recovery_id = ? + `, + topologyRecovery.IsSuccessful, topologyRecovery.SuccessorAlias, strings.Join(topologyRecovery.AllErrors, "\n"), topologyRecovery.ID, @@ -160,26 +158,27 @@ func writeResolveRecovery(topologyRecovery *TopologyRecovery) error { // readRecoveries reads recovery entry/audit entries from topology_recovery func readRecoveries(whereCondition string, limit string, args []any) ([]*TopologyRecovery, error) { res := []*TopologyRecovery{} - query := fmt.Sprintf(` - select - recovery_id, - alias, - start_recovery, - IFNULL(end_recovery, '') AS end_recovery, - is_successful, - ifnull(successor_alias, '') as successor_alias, - analysis, - keyspace, - shard, - all_errors, - detection_id - from + query := fmt.Sprintf(`SELECT + recovery_id, + alias, + start_recovery, + IFNULL(end_recovery, '') AS end_recovery, + is_successful, + IFNULL(successor_alias, '') AS successor_alias, + analysis, + keyspace, + shard, + all_errors, + detection_id + FROM topology_recovery %s - order by - recovery_id desc + ORDER BY recovery_id DESC %s - `, whereCondition, limit) + `, + whereCondition, + limit, + ) err := db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error { topologyRecovery := *NewTopologyRecovery(inst.ReplicationAnalysis{}) topologyRecovery.ID = m.GetInt64("recovery_id") @@ -211,11 +210,10 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog // ReadActiveClusterRecoveries reads recoveries that are ongoing for the given cluster. func ReadActiveClusterRecoveries(keyspace string, shard string) ([]*TopologyRecovery, error) { - whereClause := ` - where - end_recovery IS NULL - and keyspace=? - and shard=?` + whereClause := `WHERE + end_recovery IS NULL + AND keyspace = ? + AND shard = ?` return readRecoveries(whereClause, ``, sqlutils.Args(keyspace, shard)) } @@ -225,23 +223,30 @@ func ReadRecentRecoveries(page int) ([]*TopologyRecovery, error) { whereClause := "" var args []any if len(whereConditions) > 0 { - whereClause = fmt.Sprintf("where %s", strings.Join(whereConditions, " and ")) + whereClause = fmt.Sprintf("WHERE %s", strings.Join(whereConditions, " AND ")) } - limit := ` - limit ? - offset ?` + limit := `LIMIT ? OFFSET ?` args = append(args, config.AuditPageSize, page*config.AuditPageSize) return readRecoveries(whereClause, limit, args) } // writeTopologyRecoveryStep writes down a single step in a recovery process func writeTopologyRecoveryStep(topologyRecoveryStep *TopologyRecoveryStep) error { - sqlResult, err := db.ExecVTOrc(` - insert or ignore - into topology_recovery_steps ( - recovery_step_id, recovery_id, audit_at, message - ) values (?, ?, datetime('now'), ?) - `, sqlutils.NilIfZero(topologyRecoveryStep.ID), topologyRecoveryStep.RecoveryID, topologyRecoveryStep.Message, + sqlResult, err := db.ExecVTOrc(`INSERT OR IGNORE + INTO topology_recovery_steps ( + recovery_step_id, + recovery_id, + audit_at, + message + ) VALUES ( + ?, + ?, + DATETIME('now'), + ? + )`, + sqlutils.NilIfZero(topologyRecoveryStep.ID), + topologyRecoveryStep.RecoveryID, + topologyRecoveryStep.Message, ) if err != nil { log.Error(err) diff --git a/go/vt/vtorc/process/health.go b/go/vt/vtorc/process/health.go index 87a11733f66..f72d7b05210 100644 --- a/go/vt/vtorc/process/health.go +++ b/go/vt/vtorc/process/health.go @@ -35,12 +35,17 @@ var ThisNodeHealth = &NodeHealth{} // writeHealthToDatabase writes to the database and returns if it was successful. func writeHealthToDatabase() bool { - _, err := db.ExecVTOrc("delete from node_health") + _, err := db.ExecVTOrc("DELETE FROM node_health") if err != nil { log.Error(err) return false } - sqlResult, err := db.ExecVTOrc(`insert into node_health (last_seen_active) values (datetime('now'))`) + sqlResult, err := db.ExecVTOrc(`INSERT + INTO node_health ( + last_seen_active + ) VALUES ( + DATETIME('now') + )`) if err != nil { log.Error(err) return false