Skip to content

Commit

Permalink
feat: support oceanbase failover in oracle mode (#7144)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuriwuyun authored Apr 24, 2024
1 parent 8194d22 commit e2b10cb
Showing 1 changed file with 53 additions and 10 deletions.
63 changes: 53 additions & 10 deletions pkg/lorry/engines/oceanbase/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"context"
"database/sql"
"fmt"
"os"
"strconv"
"strings"
"time"

Expand All @@ -33,6 +35,7 @@ import (
"github.com/apecloud/kubeblocks/pkg/lorry/dcs"
"github.com/apecloud/kubeblocks/pkg/lorry/engines"
"github.com/apecloud/kubeblocks/pkg/lorry/engines/mysql"
"github.com/apecloud/kubeblocks/pkg/lorry/util"
)

const (
Expand Down Expand Up @@ -233,13 +236,41 @@ COMMIT;`, engines.CheckStatusType)
}

func (mgr *Manager) HealthyCheckForOracleMode(ctx context.Context, cluster *dcs.Cluster, member *dcs.Member) error {
// there is no golang driver for oceanbase oracle mode, so we just check the root connection for healthy.
isLeader, err := mgr.IsLeader(ctx, cluster)
// there is no golang driver for oceanbase oracle mode, so use mysql client to check
isLeader, err := mgr.IsLeaderMember(ctx, cluster, member)
if err != nil {
return err
}
if isLeader {
return nil
cmd := []string{"mysql", "-h", member.PodIP, "-P", member.DBPort, "-u", "SYS@" + mgr.ReplicaTenant, "-e", "SELECT t.table_name tablename FROM user_tables t WHERE table_name = 'KB_HEALTH_CHECK'"}
output, err := util.ExecCommand(ctx, cmd, os.Environ())
if err != nil {
return errors.Wrap(err, "check table failed")
}
if !strings.Contains(output, "KB_HEALTH_CHECK") {
sql := "create table kb_health_check (type int primary key, check_ts NUMBER);"
sql += fmt.Sprintf("INSERT INTO kb_health_check (type, check_ts) VALUES (1, %d);", time.Now().Unix())
sql += "commit;"
cmd = []string{"mysql", "-h", member.PodIP, "-P", member.DBPort, "-u", "SYS@" + mgr.ReplicaTenant, "-e", sql}
_, err = util.ExecCommand(ctx, cmd, os.Environ())
if err != nil {
return errors.Wrap(err, "create table failed")
}
}
sql := fmt.Sprintf("UPDATE kb_health_check SET check_ts = %d WHERE type=1;", time.Now().Unix())
sql += "commit;"
cmd = []string{"mysql", "-h", member.PodIP, "-P", member.DBPort, "-u", "SYS@" + mgr.ReplicaTenant, "-e", sql}
_, err = util.ExecCommand(ctx, cmd, os.Environ())
if err != nil {
return errors.Wrap(err, "create table failed")
}
}

sql := "SELECT check_ts from kb_health_check WHERE type=1;"
cmd := []string{"mysql", "-h", member.PodIP, "-P", member.DBPort, "-u", "SYS@" + mgr.ReplicaTenant, "-e", sql}
_, err = util.ExecCommand(ctx, cmd, os.Environ())
if err != nil {
return errors.Wrap(err, "create table failed")
}
return nil
}
Expand Down Expand Up @@ -279,10 +310,6 @@ func (mgr *Manager) IsMemberLagging(ctx context.Context, cluster *dcs.Cluster, m
}

func (mgr *Manager) GetDBState(ctx context.Context, cluster *dcs.Cluster) *dcs.DBState {
if mgr.CompatibilityMode == ORACLE {
return nil
}

mgr.DBState = nil
member := cluster.GetMemberWithName(mgr.CurrentMemberName)
opTimestamp, err := mgr.GetMemberOpTimestamp(ctx, cluster, member)
Expand All @@ -297,9 +324,22 @@ func (mgr *Manager) GetDBState(ctx context.Context, cluster *dcs.Cluster) *dcs.D
}

func (mgr *Manager) GetMemberOpTimestamp(ctx context.Context, cluster *dcs.Cluster, member *dcs.Member) (int64, error) {
if mgr.CompatibilityMode == ORACLE {
// oracle mode does not support op timestamp yet!
return 0, nil
compatibilityMode, err := mgr.GetCompatibilityMode(ctx)
if err != nil {
return 0, errors.Wrap(err, "compatibility mode unknown")
}
if compatibilityMode == ORACLE {
sql := "SELECT check_ts from kb_health_check WHERE type=1;"
cmd := []string{"mysql", "-h", member.PodIP, "-P", member.DBPort, "-u", "SYS@" + mgr.ReplicaTenant, "-e", sql}
output, err := util.ExecCommand(ctx, cmd, os.Environ())
if err != nil {
return 0, errors.Wrap(err, "get timestamp failed")
}
stimeStamp := strings.Split(output, "\n")
if len(stimeStamp) < 2 {
return 0, nil
}
return strconv.ParseInt(stimeStamp[1], 10, 64)
}
addr := cluster.GetMemberAddrWithPort(*member)
db, err := mgr.GetMySQLDBConnWithAddr(addr)
Expand All @@ -323,7 +363,10 @@ func (mgr *Manager) Promote(ctx context.Context, cluster *dcs.Cluster) error {
primaryTenant := "ALTER SYSTEM ACTIVATE STANDBY TENANT = " + mgr.ReplicaTenant
if cluster.Switchover != nil {
// it's a manual switchover
mgr.Logger.Info("manual switchover")
primaryTenant = "ALTER SYSTEM SWITCHOVER TO PRIMARY TENANT = " + mgr.ReplicaTenant
} else {
mgr.Logger.Info("unexpected switchover, promote to primary directly")
}

_, err = db.Exec(primaryTenant)
Expand Down

0 comments on commit e2b10cb

Please sign in to comment.