Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Dec 2, 2024
1 parent 0726ea6 commit 9e6c5e3
Showing 1 changed file with 102 additions and 45 deletions.
147 changes: 102 additions & 45 deletions go/vt/vtorc/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,111 @@
package db

import (
"context"
"database/sql"

"vitess.io/vitess/go/vt/external/golib/sqlutils"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vtorc/config"
"vitess.io/vitess/go/vt/vtorc/inst"
)

var (
Db DB = (*vtorcDB)(nil)
)
type AcknowledgeRecoveriesOpts struct {
Owner string
Comment string
MarkEndRecovery bool
}

type DB interface {
QueryVTOrc(query string, argsArray []any, onRow func(sqlutils.RowMap) error) error
type AuditOperationOpts struct {
// TODO: define
}

type vtorcDB struct {
type ReadRecoveriesOpts struct {
// TODO: define
}

var _ DB = (*vtorcDB)(nil)
type ReadReplicaInstancesOpts struct {
PrimaryHost string
PrimaryPort int
IncludeBinlogServerSubReplicas bool
}

func (m *vtorcDB) QueryVTOrc(query string, argsArray []any, onRow func(sqlutils.RowMap) error) error {
return QueryVTOrc(query, argsArray, onRow)
type DB interface {
// Discovery
DeleteDiscoveredTablets(ctx context.Context) error
GetShardPrimary(ctx context.Context, keyspace string, shard string) (*topodatapb.Tablet, error)
GetTabletAliasesByCell(ctx context.Context) ([]*topodatapb.TabletAlias, error)
GetTabletAliasesByKeyspaceShard(ctx context.Context) ([]*topodatapb.TabletAlias, error)

// Detection
AttemptFailureDetectionRegistration(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (bool, error)
ClearActiveFailureDetections(ctx context.Context) error

// Analysis
AuditInstanceAnalysisInChangelog(ctx context.Context, tabletAlias *topodatapb.TabletAlias, analysisCode AnalysisCode) error
ExpireAuditData(ctx context.Context) error
ExpireInstanceAnalysisChangelog(ctx context.Context) error
GetReplicationAnalysis(ctx context.Context, keyspace string, shard string, hints *inst.ReplicationAnalysisHints) ([]*inst.ReplicationAnalysis, error)

// Audit
AuditOperation(ctx context.Context, opts *AuditOperationOpts) error

// Instance
ExpireStaleInstanceBinlogCoordinates(ctx context.Context) error
ForgetInstance(ctx context.Context, tabletAlias *topodatapb.TabletAlias) error
ForgetLongUnseenInstances(ctx context.Context) error
GetKeyspaceShardName(ctx context.Context, tabletAlias *topodatapb.TabletAlias) (string, string, error)
LegacyReadInstanceClusterAttributes(ctx context.Context, primaryHost string, primaryPort int) (*inst.Instance, error)
ReadInstanceClusterAttributes(ctx context.Context, primaryAlias *topodatapb.TabletAlias) (*inst.Instance, error)
ReadInstance(ctx context.Context, tabletAlias *topodatapb.TabletAlias) (*inst.Instance, bool, error)
ReadReplicaInstances(ctx context.Context, opts *ReadReplicaInstancesOpts) ([]*inst.Instance, error)
ReadProblemInstances(ctx context.Context, keyspace string, shard string) ([]*inst.Instance, error)
ReadOutdatedInstanceKeys(ctx context.Context) ([]*topodatapb.TabletAlias, error)
ReadInstancesWithErrantGTIDs(ctx context.Context, keyspace string, shard string) ([]*inst.Instance, error)
RecordStaleInstanceBinlogCoordinates(ctx context.Context, tabletAlias *topodatapb.TabletAlias, binlogCoordinates *inst.BinlogCoordinates) error
SnapshotTopologies(ctx context.Context) error
UpdateInstanceLastAttemptedCheck(ctx context.Context, tabletAlias *topodatapb.TabletAlias)
UpdateInstanceLastChecked(ctx context.Context, tabletAlias *topodatapb.TabletAlias, partialSuccess bool) error
WriteInstances(ctx context.Context, instances []*inst.Instance, instanceWasActuallyFound, updateLastSeen bool) error

// Keyspace
ReadKeyspace(ctx context.Context, keyspace string) (*topo.KeyspaceInfo, error)
SaveKeyspace(ctx context.Context, keyspace *topo.KeyspaceInfo) error
GetDurabilityPolicy(ctx context.Context, keyspace string) (reparentutil.Durabler, error)

// Shard
ReadShardPrimaryInformation(ctx context.Context, keyspaceName, shardName string) (*topodatapb.TabletAlias, string, error)
SaveShard(ctx context.Context, shard *topo.ShardInfo) error

// Tablet
ReadTablet(ctx context.Context, tabletAlias *topodatapb.TabletAlias) (*topodatapb.Tablet, error)
SaveTablet(ctx context.Context, tablet *topodatapb.Tablet) error

// Recovery
AcknowledgeRecoveries(ctx context.Context, opts *AcknowledgeRecoveriesOpts) (int64, error)
ClearActiveRecoveries(ctx context.Context) error
DisableRecovery(ctx context.Context) error
EnableRecovery(ctx context.Context) error
ExpireBlockedRecoveries(ctx context.Context) error
ExpireRecoveries(ctx context.Context) error
ExpireRecoverySteps(ctx context.Context) error
IsRecoveryDisabled(ctx context.Context) (bool, error)
ReadRecoveries(ctx context.Context, opts *ReadRecoveriesOpts) ([]*logic.TopologyRecovery, error)
RegisterBlockedRecoveries(ctx context.Context, analysisEntry *inst.ReplicationAnalysis, blockingRecoveries []*logic.TopologyRecovery) error
WriteResolveRecovery(ctx context.Context, topologyRecovery *logic.TopologyRecovery) error
WriteTopologyRecovery(ctx context.Context, topologyRecovery *logic.TopologyRecovery) (*logic.TopologyRecovery, error)
WriteTopologyRecoveryStep(ctx context.Context, topologyRecoveryStep *logic.TopologyRecoveryStep) error
}

type vtorcDB struct {
db *sql.DB
}

// OpenTopology returns the DB instance for the vtorc backed database
func OpenVTOrc() (db *sql.DB, err error) {
func OpenVTOrc() (*vtorcDB, error) {
var fromCache bool
db, fromCache, err = sqlutils.GetSQLiteDB(config.Config.SQLite3DataFile)
db, fromCache, err := sqlutils.GetSQLiteDB(config.Config.SQLite3DataFile)
if err == nil && !fromCache {
log.Infof("Connected to vtorc backend: sqlite on %v", config.Config.SQLite3DataFile)
if err := initVTOrcDB(db); err != nil {
Expand All @@ -55,28 +132,28 @@ func OpenVTOrc() (db *sql.DB, err error) {
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
}
return db, err
return &vtorcDB{db}, err
}

// registerVTOrcDeployment updates the vtorc_db_deployments table upon successful deployment
func registerVTOrcDeployment(db *sql.DB) error {
func (vdb *vtorcDB) registerVTOrcDeployment() error {
query := `REPLACE INTO vtorc_db_deployments (
deployed_version,
deployed_timestamp
) VALUES (
?,
DATETIME('now')
)`
if _, err := execInternal(db, query, ""); err != nil {
if _, err := ExecVTOrc(query, ""); err != nil {
log.Fatalf("Unable to write to vtorc_db_deployments: %+v", err)
}
return nil
}

// deployStatements will issue given sql queries that are not already known to be deployed.
// This iterates both lists (to-run and already-deployed) and also verifies no contradictions.
func deployStatements(db *sql.DB, queries []string) error {
tx, err := db.Begin()
func (vdb *vtorcDB) deployStatements(db *sql.DB, queries []string) error {
tx, err := vdb.db.Begin()
if err != nil {
return err
}
Expand All @@ -101,58 +178,38 @@ func ClearVTOrcDatabase() {

// initVTOrcDB attempts to create/upgrade the vtorc backend database. It is created once in the
// application's lifetime.
func initVTOrcDB(db *sql.DB) error {
func (vdb *vtorcDB) initVTOrcDB() error {
log.Info("Initializing vtorc")
log.Info("Migrating database schema")
if err := deployStatements(db, vtorcBackend); err != nil {
if err := vdb.deployStatements(); err != nil {
return err
}
if err := registerVTOrcDeployment(db); err != nil {
if err := vdb.registerVTOrcDeployment(); err != nil {
return err
}
if _, err := ExecVTOrc(`PRAGMA journal_mode = WAL`); err != nil {
if _, err := vdb.ExecVTOrc(`PRAGMA journal_mode = WAL`); err != nil {
return err
}
if _, err := ExecVTOrc(`PRAGMA synchronous = NORMAL`); err != nil {
if _, err := vdb.ExecVTOrc(`PRAGMA synchronous = NORMAL`); err != nil {
return err
}
return nil
}

// execInternal
func execInternal(db *sql.DB, query string, args ...any) (sql.Result, error) {
return sqlutils.ExecNoPrepare(db, query, args...)
}

// ExecVTOrc will execute given query on the vtorc backend database.
func ExecVTOrc(query string, args ...any) (sql.Result, error) {
db, err := OpenVTOrc()
if err != nil {
return nil, err
}
return execInternal(db, query, args...)
func (vdb *vtorcDB) ExecVTOrc(query string, args ...any) (sql.Result, error) {
return sqlutils.ExecNoPrepare(vdb.db, query, args...)
}

// QueryVTOrcRowsMap
func QueryVTOrcRowsMap(query string, onRow func(sqlutils.RowMap) error) error {
db, err := OpenVTOrc()
if err != nil {
return err
}

func (vdb *vtorcDB) QueryVTOrcRowsMap(query string, onRow func(sqlutils.RowMap) error) error {
return sqlutils.QueryRowsMap(db, query, onRow)
}

// QueryVTOrc
func QueryVTOrc(query string, argsArray []any, onRow func(sqlutils.RowMap) error) error {
db, err := OpenVTOrc()
if err != nil {
return err
}

func (vdb *vtorcDB) QueryVTOrc(query string, argsArray []any, onRow func(sqlutils.RowMap) error) error {
if err = sqlutils.QueryRowsMap(db, query, onRow, argsArray...); err != nil {
log.Warning(err.Error())
}

return err
}

0 comments on commit 9e6c5e3

Please sign in to comment.