diff --git a/pkg/lorry/engines/oceanbase/manager.go b/pkg/lorry/engines/oceanbase/manager.go
index 884564407df..7c3df4a26f1 100644
--- a/pkg/lorry/engines/oceanbase/manager.go
+++ b/pkg/lorry/engines/oceanbase/manager.go
@@ -23,6 +23,8 @@ import (
 	"context"
 	"database/sql"
 	"fmt"
+	"os"
+	"strconv"
 	"strings"
 	"time"
 
@@ -33,6 +35,7 @@ import (
 	"github.com/apecloud/kubeblocks/pkg/lorry/dcs"
 	"github.com/apecloud/kubeblocks/pkg/lorry/engines"
 	"github.com/apecloud/kubeblocks/pkg/lorry/engines/mysql"
+	"github.com/apecloud/kubeblocks/pkg/lorry/util"
 )
 
 const (
@@ -233,13 +236,45 @@ COMMIT;`, engines.CheckStatusType)
 }
 
+// HealthyCheckForOracleMode checks an Oracle-mode tenant through the mysql
+// command-line client. On the leader it creates the kb_health_check table when
+// the lookup does not report it and then writes the current Unix time to it; on
+// every member it finally reads the row back. The first failing step is returned.
 func (mgr *Manager) HealthyCheckForOracleMode(ctx context.Context, cluster *dcs.Cluster, member *dcs.Member) error {
-	// there is no golang driver for oceanbase oracle mode, so we just check the root connection for healthy.
-	isLeader, err := mgr.IsLeader(ctx, cluster)
+	// there is no golang driver for oceanbase oracle mode, so use mysql client to check
+	isLeader, err := mgr.IsLeaderMember(ctx, cluster, member)
 	if err != nil {
 		return err
 	}
 	if isLeader {
-		return nil
+		cmd := []string{"mysql", "-h", member.PodIP, "-P", member.DBPort, "-u", "SYS@" + mgr.ReplicaTenant, "-e", "SELECT t.table_name tablename FROM user_tables t WHERE table_name = 'KB_HEALTH_CHECK'"}
+		output, err := util.ExecCommand(ctx, cmd, os.Environ())
+		if err != nil {
+			return errors.Wrap(err, "check table failed")
+		}
+		if !strings.Contains(output, "KB_HEALTH_CHECK") {
+			query := "create table kb_health_check (type int primary key, check_ts NUMBER);"
+			query += fmt.Sprintf("INSERT INTO kb_health_check (type, check_ts) VALUES (1, %d);", time.Now().Unix())
+			query += "commit;"
+			cmd = []string{"mysql", "-h", member.PodIP, "-P", member.DBPort, "-u", "SYS@" + mgr.ReplicaTenant, "-e", query}
+			_, err = util.ExecCommand(ctx, cmd, os.Environ())
+			if err != nil {
+				return errors.Wrap(err, "create table failed")
+			}
+		}
+		query := fmt.Sprintf("UPDATE kb_health_check SET check_ts = %d WHERE type=1;", time.Now().Unix())
+		query += "commit;"
+		cmd = []string{"mysql", "-h", member.PodIP, "-P", member.DBPort, "-u", "SYS@" + mgr.ReplicaTenant, "-e", query}
+		_, err = util.ExecCommand(ctx, cmd, os.Environ())
+		if err != nil {
+			return errors.Wrap(err, "update check timestamp failed")
+		}
+	}
+
+	query := "SELECT check_ts from kb_health_check WHERE type=1;"
+	cmd := []string{"mysql", "-h", member.PodIP, "-P", member.DBPort, "-u", "SYS@" + mgr.ReplicaTenant, "-e", query}
+	_, err = util.ExecCommand(ctx, cmd, os.Environ())
+	if err != nil {
+		return errors.Wrap(err, "query check timestamp failed")
 	}
 	return nil
 }
@@ -279,10 +314,6 @@ func (mgr *Manager) IsMemberLagging(ctx context.Context, cluster *dcs.Cluster, m
 }
 
 func (mgr *Manager) GetDBState(ctx context.Context, cluster *dcs.Cluster) *dcs.DBState {
-	if mgr.CompatibilityMode == ORACLE {
-		return nil
-	}
-
 	mgr.DBState = nil
 	member := cluster.GetMemberWithName(mgr.CurrentMemberName)
 	opTimestamp, err := mgr.GetMemberOpTimestamp(ctx, cluster, member)
@@ -297,9 +328,23 @@ func (mgr *Manager) GetDBState(ctx context.Context, cluster *dcs.Cluster) *dcs.D
 }
 
 func (mgr *Manager) GetMemberOpTimestamp(ctx context.Context, cluster *dcs.Cluster, member *dcs.Member) (int64, error) {
-	if mgr.CompatibilityMode == ORACLE {
-		// oracle mode does not support op timestamp yet!
-		return 0, nil
+	compatibilityMode, err := mgr.GetCompatibilityMode(ctx)
+	if err != nil {
+		return 0, errors.Wrap(err, "compatibility mode unknown")
+	}
+	if compatibilityMode == ORACLE {
+		query := "SELECT check_ts from kb_health_check WHERE type=1;"
+		cmd := []string{"mysql", "-h", member.PodIP, "-P", member.DBPort, "-u", "SYS@" + mgr.ReplicaTenant, "-e", query}
+		output, err := util.ExecCommand(ctx, cmd, os.Environ())
+		if err != nil {
+			return 0, errors.Wrap(err, "get timestamp failed")
+		}
+		// NOTE(review): assumes the client prints a header line first, so the value is on the second line - confirm
+		lines := strings.Split(output, "\n")
+		if len(lines) < 2 {
+			return 0, nil
+		}
+		return strconv.ParseInt(strings.TrimSpace(lines[1]), 10, 64)
 	}
 	addr := cluster.GetMemberAddrWithPort(*member)
 	db, err := mgr.GetMySQLDBConnWithAddr(addr)
@@ -323,7 +368,10 @@ func (mgr *Manager) Promote(ctx context.Context, cluster *dcs.Cluster) error {
 	primaryTenant := "ALTER SYSTEM ACTIVATE STANDBY TENANT = " + mgr.ReplicaTenant
 	if cluster.Switchover != nil {
 		// it's a manual switchover
+		mgr.Logger.Info("manual switchover")
 		primaryTenant = "ALTER SYSTEM SWITCHOVER TO PRIMARY TENANT = " + mgr.ReplicaTenant
+	} else {
+		mgr.Logger.Info("unexpected switchover, promote to primary directly")
 	}
 
 	_, err = db.Exec(primaryTenant)