From 9e6c5e3a3d4f2831b363ceea5266e6c74ab5a7b6 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 2 Dec 2024 21:02:13 +0100 Subject: [PATCH] WIP Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/db/db.go | 147 ++++++++++++++++++++++++++++++------------- 1 file changed, 102 insertions(+), 45 deletions(-) diff --git a/go/vt/vtorc/db/db.go b/go/vt/vtorc/db/db.go index 470e5364680..ee0c7efe14f 100644 --- a/go/vt/vtorc/db/db.go +++ b/go/vt/vtorc/db/db.go @@ -17,34 +17,111 @@ package db import ( + "context" "database/sql" "vitess.io/vitess/go/vt/external/golib/sqlutils" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/vtorc/config" + "vitess.io/vitess/go/vt/vtorc/inst" ) -var ( - Db DB = (*vtorcDB)(nil) -) +type AcknowledgeRecoveriesOpts struct { + Owner string + Comment string + MarkEndRecovery bool +} -type DB interface { - QueryVTOrc(query string, argsArray []any, onRow func(sqlutils.RowMap) error) error +type AuditOperationOpts struct { + // TODO: define } -type vtorcDB struct { +type ReadRecoveriesOpts struct { + // TODO: define } -var _ DB = (*vtorcDB)(nil) +type ReadReplicaInstancesOpts struct { + PrimaryHost string + PrimaryPort int + IncludeBinlogServerSubReplicas bool +} -func (m *vtorcDB) QueryVTOrc(query string, argsArray []any, onRow func(sqlutils.RowMap) error) error { - return QueryVTOrc(query, argsArray, onRow) +type DB interface { + // Discovery + DeleteDiscoveredTablets(ctx context.Context) error + GetShardPrimary(ctx context.Context, keyspace string, shard string) (*topodatapb.Tablet, error) + GetTabletAliasesByCell(ctx context.Context) ([]*topodatapb.TabletAlias, error) + GetTabletAliasesByKeyspaceShard(ctx context.Context) ([]*topodatapb.TabletAlias, error) + + // Detection + AttemptFailureDetectionRegistration(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (bool, error) + ClearActiveFailureDetections(ctx context.Context) error + + // Analysis + AuditInstanceAnalysisInChangelog(ctx context.Context, tabletAlias *topodatapb.TabletAlias, analysisCode AnalysisCode) error + ExpireAuditData(ctx context.Context) error + ExpireInstanceAnalysisChangelog(ctx context.Context) error + GetReplicationAnalysis(ctx context.Context, keyspace string, shard string, hints *inst.ReplicationAnalysisHints) ([]*inst.ReplicationAnalysis, error) + + // Audit + AuditOperation(ctx context.Context, opts *AuditOperationOpts) error + + // Instance + ExpireStaleInstanceBinlogCoordinates(ctx context.Context) error + ForgetInstance(ctx context.Context, tabletAlias *topodatapb.TabletAlias) error + ForgetLongUnseenInstances(ctx context.Context) error + GetKeyspaceShardName(ctx context.Context, tabletAlias *topodatapb.TabletAlias) (string, string, error) + LegacyReadInstanceClusterAttributes(ctx context.Context, primaryHost string, primaryPort int) (*inst.Instance, error) + ReadInstanceClusterAttributes(ctx context.Context, primaryAlias *topodatapb.TabletAlias) (*inst.Instance, error) + ReadInstance(ctx context.Context, tabletAlias *topodatapb.TabletAlias) (*inst.Instance, bool, error) + ReadReplicaInstances(ctx context.Context, opts *ReadReplicaInstancesOpts) ([]*inst.Instance, error) + ReadProblemInstances(ctx context.Context, keyspace string, shard string) ([]*inst.Instance, error) + ReadOutdatedInstanceKeys(ctx context.Context) ([]*topodatapb.TabletAlias, error) + ReadInstancesWithErrantGTIDs(ctx context.Context, keyspace string, shard string) ([]*inst.Instance, error) + RecordStaleInstanceBinlogCoordinates(ctx context.Context, tabletAlias *topodatapb.TabletAlias, binlogCoordinates *inst.BinlogCoordinates) error + SnapshotTopologies(ctx context.Context) error + UpdateInstanceLastAttemptedCheck(ctx context.Context, tabletAlias *topodatapb.TabletAlias) + UpdateInstanceLastChecked(ctx context.Context, tabletAlias *topodatapb.TabletAlias, partialSuccess bool) error + WriteInstances(ctx context.Context, instances []*inst.Instance, instanceWasActuallyFound, updateLastSeen bool) error + + // Keyspace + ReadKeyspace(ctx context.Context, keyspace string) (*topo.KeyspaceInfo, error) + SaveKeyspace(ctx context.Context, keyspace *topo.KeyspaceInfo) error + GetDurabilityPolicy(ctx context.Context, keyspace string) (reparentutil.Durabler, error) + + // Shard + ReadShardPrimaryInformation(ctx context.Context, keyspaceName, shardName string) (*topodatapb.TabletAlias, string, error) + SaveShard(ctx context.Context, shard *topo.ShardInfo) error + + // Tablet + ReadTablet(ctx context.Context, tabletAlias *topodatapb.TabletAlias) (*topodatapb.Tablet, error) + SaveTablet(ctx context.Context, tablet *topodatapb.Tablet) error + + // Recovery + AcknowledgeRecoveries(ctx context.Context, opts *AcknowledgeRecoveriesOpts) (int64, error) + ClearActiveRecoveries(ctx context.Context) error + DisableRecovery(ctx context.Context) error + EnableRecovery(ctx context.Context) error + ExpireBlockedRecoveries(ctx context.Context) error + ExpireRecoveries(ctx context.Context) error + ExpireRecoverySteps(ctx context.Context) error + IsRecoveryDisabled(ctx context.Context) (bool, error) + ReadRecoveries(ctx context.Context, opts *ReadRecoveriesOpts) ([]*logic.TopologyRecovery, error) + RegisterBlockedRecoveries(ctx context.Context, analysisEntry *inst.ReplicationAnalysis, blockingRecoveries []*logic.TopologyRecovery) error + WriteResolveRecovery(ctx context.Context, topologyRecovery *logic.TopologyRecovery) error + WriteTopologyRecovery(ctx context.Context, topologyRecovery *logic.TopologyRecovery) (*logic.TopologyRecovery, error) + WriteTopologyRecoveryStep(ctx context.Context, topologyRecoveryStep *logic.TopologyRecoveryStep) error +} + +type vtorcDB struct { + db *sql.DB } // OpenTopology returns the DB instance for the vtorc backed database -func OpenVTOrc() (db *sql.DB, err error) { +func OpenVTOrc() (*vtorcDB, error) { var fromCache bool - db, fromCache, err = sqlutils.GetSQLiteDB(config.Config.SQLite3DataFile) + db, fromCache, err := sqlutils.GetSQLiteDB(config.Config.SQLite3DataFile) if err == nil && !fromCache { log.Infof("Connected to vtorc backend: sqlite on %v", config.Config.SQLite3DataFile) if err := initVTOrcDB(db); err != nil { @@ -55,11 +132,11 @@ func OpenVTOrc() (db *sql.DB, err error) { db.SetMaxOpenConns(1) db.SetMaxIdleConns(1) } - return db, err + return &vtorcDB{db}, err } // registerVTOrcDeployment updates the vtorc_db_deployments table upon successful deployment -func registerVTOrcDeployment(db *sql.DB) error { +func (vdb *vtorcDB) registerVTOrcDeployment() error { query := `REPLACE INTO vtorc_db_deployments ( deployed_version, deployed_timestamp @@ -67,7 +144,7 @@ func registerVTOrcDeployment(db *sql.DB) error { ?, DATETIME('now') )` - if _, err := execInternal(db, query, ""); err != nil { + if _, err := ExecVTOrc(query, ""); err != nil { log.Fatalf("Unable to write to vtorc_db_deployments: %+v", err) } return nil @@ -75,8 +152,8 @@ func registerVTOrcDeployment(db *sql.DB) error { // deployStatements will issue given sql queries that are not already known to be deployed. // This iterates both lists (to-run and already-deployed) and also verifies no contradictions. -func deployStatements(db *sql.DB, queries []string) error { - tx, err := db.Begin() +func (vdb *vtorcDB) deployStatements(db *sql.DB, queries []string) error { + tx, err := vdb.db.Begin() if err != nil { return err } @@ -101,58 +178,38 @@ func ClearVTOrcDatabase() { // initVTOrcDB attempts to create/upgrade the vtorc backend database. It is created once in the // application's lifetime. -func initVTOrcDB(db *sql.DB) error { +func (vdb *vtorcDB) initVTOrcDB() error { log.Info("Initializing vtorc") log.Info("Migrating database schema") - if err := deployStatements(db, vtorcBackend); err != nil { + if err := vdb.deployStatements(); err != nil { return err } - if err := registerVTOrcDeployment(db); err != nil { + if err := vdb.registerVTOrcDeployment(); err != nil { return err } - if _, err := ExecVTOrc(`PRAGMA journal_mode = WAL`); err != nil { + if _, err := vdb.ExecVTOrc(`PRAGMA journal_mode = WAL`); err != nil { return err } - if _, err := ExecVTOrc(`PRAGMA synchronous = NORMAL`); err != nil { + if _, err := vdb.ExecVTOrc(`PRAGMA synchronous = NORMAL`); err != nil { return err } return nil } -// execInternal -func execInternal(db *sql.DB, query string, args ...any) (sql.Result, error) { - return sqlutils.ExecNoPrepare(db, query, args...) -} - // ExecVTOrc will execute given query on the vtorc backend database. -func ExecVTOrc(query string, args ...any) (sql.Result, error) { - db, err := OpenVTOrc() - if err != nil { - return nil, err - } - return execInternal(db, query, args...) +func (vdb *vtorcDB) ExecVTOrc(query string, args ...any) (sql.Result, error) { + return sqlutils.ExecNoPrepare(vdb.db, query, args...) } // QueryVTOrcRowsMap -func QueryVTOrcRowsMap(query string, onRow func(sqlutils.RowMap) error) error { - db, err := OpenVTOrc() - if err != nil { - return err - } - +func (vdb *vtorcDB) QueryVTOrcRowsMap(query string, onRow func(sqlutils.RowMap) error) error { return sqlutils.QueryRowsMap(db, query, onRow) } // QueryVTOrc -func QueryVTOrc(query string, argsArray []any, onRow func(sqlutils.RowMap) error) error { - db, err := OpenVTOrc() - if err != nil { - return err - } - +func (vdb *vtorcDB) QueryVTOrc(query string, argsArray []any, onRow func(sqlutils.RowMap) error) error { if err = sqlutils.QueryRowsMap(db, query, onRow, argsArray...); err != nil { log.Warning(err.Error()) } - return err }