Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update check tool #180

Merged
merged 15 commits into from
Apr 28, 2024
2 changes: 2 additions & 0 deletions cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ def do_command(self):
return False
cmd = '%s %s' % (self.prev_cmd, base)
ROOT_IO.track_limit += 1
if "main.py" in cmd:
telemetry.work_tag=False
telemetry.push_cmd_info("cmd: {0}. args:{1}".format(cmd,args))
return self.commands[base].init(cmd, args).do_command()

Expand Down
14 changes: 13 additions & 1 deletion common/ob_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


class OBConnector(object):
def __init__(self, ip, port, username, password=None, database=None, stdio=None, timeout=10,):
def __init__(self, ip, port, username, password=None, database=None, stdio=None, timeout=30,):
self.ip = str(ip)
self.port = int(port)
self.username = str(username)
Expand Down Expand Up @@ -50,6 +50,18 @@ def _connect_db(self):
self.stdio.verbose("connect databse ...")
except mysql.Error as e:
self.stdio.error("connect OB: {0}:{1} with user {2} failed, error:{3}".format(self.ip, self.port, self.username, e))
return
try:
ob_trx_timeout=self.timeout*1000000
self.execute_sql("SET SESSION ob_trx_timeout={0};".format(ob_trx_timeout))
except Exception as e:
self.stdio.warn("set ob_trx_timeout failed, error:{0}".format(e))
try:
ob_query_timeout=self.timeout*1000000
self.execute_sql("SET SESSION ob_query_timeout={0};".format(ob_query_timeout))
except Exception as e:
self.stdio.warn("set ob_query_timeout failed, error:{0}".format(e))


def execute_sql(self, sql):
if self.conn is None:
Expand Down
12 changes: 9 additions & 3 deletions common/scene.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"""
from common.ssh import SshHelper
from common.tool import StringUtils
from common.command import get_observer_version, get_obproxy_version
from common.command import get_observer_version, get_obproxy_version, get_observer_version_by_sql


def filter_by_version(scene, cluster, stdio=None):
try:
Expand Down Expand Up @@ -59,14 +60,19 @@ def filter_by_version(scene, cluster, stdio=None):
stdio.exception("filter_by_version Exception : {0}".format(e))
raise Exception("filter_by_version Exception : {0}".format(e))

def get_version(nodes, type, stdio=None):
def get_version(nodes, type,cluster, stdio=None):
try:
if len(nodes) < 1:
raise Exception("input nodes is empty, please check your config")
node = nodes[0]
ssh = SshHelper(True, node.get("ip"), node.get("ssh_username"), node.get("ssh_password"), node.get("ssh_port"), node.get("ssh_key_file"), node)
version = ""
if type == "observer":
version = get_observer_version(True, ssh, nodes[0]["home_path"], stdio)
try:
version = get_observer_version_by_sql(cluster,stdio)
except Exception as e:
stdio.warn("get observer version by sql fail, use node ssher to get. Exception:{0}".format(e))
version = get_observer_version(True, ssh, nodes[0]["home_path"], stdio)
elif type == "obproxy":
version = get_obproxy_version(True, ssh, nodes[0]["home_path"], stdio)
return version
Expand Down
1 change: 0 additions & 1 deletion conf/inner_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ check:
report:
report_path: "./check_report/"
export_type: table
package_file: "~/.obdiag/check/check_package.yaml"
tasks_base_path: "~/.obdiag/check/tasks/"
gather:
scenes_base_path: "~/.obdiag/gather/tasks"
Expand Down
79 changes: 76 additions & 3 deletions handler/checker/check_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@
"""

import os
import queue
import time

import yaml

from common.ob_connector import OBConnector
from common.ssh import SshHelper
from handler.checker.check_exception import CheckException
from handler.checker.check_report import TaskReport, CheckReport, CheckrReportException
from handler.checker.check_task import TaskBase
Expand All @@ -27,7 +33,6 @@
from common.tool import YamlUtils
from common.tool import StringUtils


class CheckHandler:

def __init__(self, context, check_target_type="observer"):
Expand Down Expand Up @@ -87,6 +92,35 @@ def __init__(self, context, check_target_type="observer"):
# input_param
self.options=self.context.options

# add ssher
new_node=[]
for node in self.nodes:
# add ssher
ssher = None
try:
ssher = SshHelper(True, node.get("ip"),
node.get("ssh_username"),
node.get("ssh_password"),
node.get("ssh_port"),
node.get("ssh_key_file"),
node)
except Exception as e:
self.stdio.warn("StepBase get SshHelper fail on{0} ,Exception: {1}".format(node.get("ip"), e))
node["ssher"] = ssher
new_node.append(node)
self.nodes=new_node
self.version=get_version(self.nodes, self.check_target_type,self.cluster, self.stdio)

# add OBConnectorPool
try:
obConnectorPool=checkOBConnectorPool(context,3,self.cluster)

except Exception as e:
self.stdio.warn("obConnector init error. Error info is {0}".format(e))
finally:
self.context.set_variable('check_obConnector_pool', obConnectorPool)


def handle(self):
try:
package_name = None
Expand Down Expand Up @@ -173,7 +207,7 @@ def execute_one(self, task_name):
# Verify if the version is within a reasonable range
report = TaskReport(self.context,task_name)
if not self.ignore_version:
version = get_version(self.nodes, self.check_target_type, self.stdio)
version = self.version
if version:
self.cluster["version"] = version
self.stdio.verbose("cluster.version is {0}".format(self.cluster["version"]))
Expand All @@ -191,6 +225,7 @@ def execute_one(self, task_name):
raise CheckException("execute_one Exception : {0}".format(e))

def execute(self):
start_time = time.time()
try:
self.stdio.verbose(
"execute_all_tasks. the number of tasks is {0} ,tasks is {1}".format(len(self.tasks.keys()),
Expand All @@ -206,4 +241,42 @@ def execute(self):
except CheckrReportException as e:
self.stdio.error("Report error :{0}".format(e))
except Exception as e:
self.stdio.error("Internal error :{0}".format(e))
self.stdio.error("Internal error :{0}".format(e))
end_time = time.time()
Teingi marked this conversation as resolved.
Show resolved Hide resolved
print("Total cost time is {0} s".format((end_time - start_time)))
class checkOBConnectorPool:
    """Fixed-size pool of OBConnector objects shared by the check steps.

    The pool is backed by a ``queue.Queue``: ``get_connection`` hands a
    connector out and ``release_connection`` puts it back.

    Args:
        context: handler context; only ``context.stdio`` is used, for logging.
        max_size: number of connectors to create and the capacity of the queue.
        cluster: cluster config mapping; ``db_host``, ``db_port`` and
            ``tenant_sys`` (``user`` / ``password``) are read from it.

    Construction never raises: any failure while creating the connectors is
    logged through ``stdio.error`` and the pool is left with whatever
    connectors were created before the failure.
    """

    def __init__(self, context, max_size, cluster):
        self.max_size = max_size
        self.cluster = cluster
        self.connections = queue.Queue(maxsize=max_size)
        self.stdio = context.stdio
        try:
            for _ in range(max_size):
                conn = OBConnector(
                    ip=self.cluster.get("db_host"),
                    port=self.cluster.get("db_port"),
                    username=self.cluster.get("tenant_sys").get("user"),
                    password=self.cluster.get("tenant_sys").get("password"),
                    stdio=self.stdio,
                    timeout=10000
                )
                self.connections.put(conn)
            # Report success only after every connector has been created.
            self.stdio.verbose("obConnectorPool init success!")
        except Exception as e:
            # The placeholder is required, otherwise the error text is dropped.
            self.stdio.error("obConnectorPool init fail! err:{0}".format(e))

    def get_connection(self):
        """Take a connector from the pool.

        Blocks until a connector is available (``queue.Queue.get`` with its
        default arguments). Returns ``None`` if fetching raises.
        """
        try:
            return self.connections.get()
        except Exception as e:
            self.stdio.error("get connection fail! err:{0}".format(e))
            return None

    def release_connection(self, conn):
        """Return ``conn`` to the pool; ``None`` is ignored."""
        if conn is not None:
            self.connections.put(conn)
        return

20 changes: 18 additions & 2 deletions handler/checker/check_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
@file: check_task.py
@desc:
"""
import threading

from common.ob_connector import OBConnector
from handler.checker.check_exception import StepResultFailException, \
StepExecuteFailException, StepResultFalseException, TaskException
from handler.checker.step.stepbase import StepBase
Expand Down Expand Up @@ -46,7 +48,18 @@ def execute(self):
self.stdio.verbose("filter_by_version is return {0}".format(steps_nu))
if len(self.nodes) == 0:
raise Exception("node is not exist")
# TODO: the logic here needs to be optimized: if one node fails, the subsequent steps are not executed.
work_threads = []
for node in self.nodes:
t = threading.Thread(target=self.execute_one_node, args=(steps_nu,node))
work_threads.append(t)
t.start()
for t in work_threads:
t.join()

self.stdio.verbose("task execute end")
def execute_one_node(self,steps_nu,node):
try:
self.stdio.verbose("run task in node: {0}".format(StringUtils.node_cut_passwd_for_log(node)))
steps = self.task[steps_nu]
nu = 1
Expand All @@ -58,7 +71,6 @@ def execute(self):
step_run = StepBase(self.context, step, node, self.cluster, self.task_variable_dict)
self.stdio.verbose("step nu: {0} initted, to execute".format(nu))
step_run.execute(self.report)
self.task_variable_dict = step_run.update_task_variable_dict()
if "report_type" in step["result"] and step["result"]["report_type"] == "execution":
self.stdio.verbose("report_type stop this step")
return
Expand All @@ -77,4 +89,8 @@ def execute(self):

self.stdio.verbose("step nu: {0} execute end ".format(nu))
nu = nu + 1
self.stdio.verbose("task execute end")
except Exception as e:
self.stdio.error("TaskBase execute Exception: {0}".format(e))
raise e


14 changes: 5 additions & 9 deletions handler/checker/step/data_size.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,14 @@ def __init__(self,context, step, node, task_variable_dict):
self.task_variable_dict = task_variable_dict

try:
is_ssh = True
self.ssh_helper = SshHelper(is_ssh, node.get("ip"),
node.get("ssh_username"),
node.get("ssh_password"),
node.get("ssh_port"),
node.get("ssh_key_file"),
node)
self.ssh_helper=self.node["ssher"]
if self.ssh_helper is None:
raise Exception("self.ssh_helper is None.")
except Exception as e:
self.stdio.error(
"GetSystemParameterHandler ssh init fail . Please check the NODES conf Exception : {0} .".format(e))
"DataSizeHandler ssh init fail . Please check the NODES conf Exception : {0} .".format(e))
raise Exception(
"GetSystemParameterHandler ssh init fail . Please check the NODES conf Exception : {0} .".format(e))
"DataSizeHandler ssh init fail . Please check the NODES conf Exception : {0} .".format(e))

# step report
self.parameter = []
Expand Down
10 changes: 3 additions & 7 deletions handler/checker/step/get_system_parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,9 @@ def __init__(self,context, step, node, task_variable_dict):
self.task_variable_dict = task_variable_dict

try:
is_ssh = True
self.ssh_helper = SshHelper(is_ssh, node.get("ip"),
node.get("ssh_username"),
node.get("ssh_password"),
node.get("ssh_port"),
node.get("ssh_key_file"),
node)
self.ssh_helper=self.node["ssher"]
if self.ssh_helper is None:
raise Exception("self.ssh_helper is None.")
except Exception as e:
self.stdio.error(
"GetSystemParameterHandler ssh init fail . Please check the NODES conf Exception : {0} .".format(e))
Expand Down
30 changes: 16 additions & 14 deletions handler/checker/step/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,23 @@


class StepSQLHandler:
def __init__(self,context, step, ob_cluster, task_variable_dict):
def __init__(self,context, step, task_variable_dict):
try:
self.context = context
self.stdio = context.stdio
self.ob_cluster = ob_cluster
self.ob_cluster_name = ob_cluster.get("cluster_name")
self.ob_cluster = self.context.cluster_config
self.ob_cluster_name = self.ob_cluster.get("cluster_name")
self.tenant_mode = None
self.sys_database = None
self.database = None
self.ob_connector = OBConnector(ip=ob_cluster.get("db_host"),
port=ob_cluster.get("db_port"),
username=ob_cluster.get("tenant_sys").get("user"),
password=ob_cluster.get("tenant_sys").get("password"),
stdio=self.stdio,
timeout=10000)
self.ob_connector_pool=self.context.get_variable('check_obConnector_pool',None)
if self.ob_connector_pool is not None:
self.ob_connector=self.ob_connector_pool.get_connection()
if self.ob_connector is None:
raise Exception("self.ob_connector is None.")
except Exception as e:
self.stdio.error("StepSQLHandler init fail. Please check the OBCLUSTER conf. OBCLUSTER: {0} Exception : {1} .".format(ob_cluster,e))
raise Exception("StepSQLHandler init fail. Please check the OBCLUSTER conf. OBCLUSTER: {0} Exception : {1} .".format(ob_cluster,e))
self.stdio.error("StepSQLHandler init fail. Please check the OBCLUSTER conf. Exception : {0} .".format(e))
raise Exception("StepSQLHandler init fail. Please check the OBCLUSTER conf. Exception : {0} .".format(e))
self.task_variable_dict = task_variable_dict
self.enable_dump_db = False
self.trace_id = None
Expand All @@ -62,8 +61,9 @@ def execute(self):
if data is None:
self.stdio.warn("sql result is None: {0}".format(self.step["sql"]))
self.stdio.verbose("execute_sql result:{0}".format(data))
if len(data) == 0:
if len(data) == 0 or data is None:
self.stdio.warn("sql result is None: {0}".format(self.step["sql"]))
data=""
else:
data = data[0][0]
if data is None:
Expand All @@ -73,8 +73,10 @@ def execute(self):
self.stdio.verbose("sql execute update task_variable_dict: {0} = {1}".format(self.step["result"]["set_value"], Util.convert_to_number(data)))
self.task_variable_dict[self.step["result"]["set_value"]] = Util.convert_to_number(data)
except Exception as e:
self.stdio.error("StepSQLHandler execute Exception: {0}".format(e).strip())
raise StepExecuteFailException("StepSQLHandler execute Exception: {0}".format(e).strip())
self.stdio.error("StepSQLHandler execute Exception: {0}".format(e))
raise StepExecuteFailException("StepSQLHandler execute Exception: {0}".format(e))
finally:
self.ob_connector_pool.release_connection(self.ob_connector)

def update_step_variable_dict(self):
return self.task_variable_dict
11 changes: 3 additions & 8 deletions handler/checker/step/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

from handler.checker.check_exception import StepExecuteFailException
from handler.checker.check_report import TaskReport
from common.ssh import SshHelper
from common.tool import StringUtils
from common.tool import Util

Expand All @@ -32,13 +31,9 @@ def __init__(self,context, step, node, task_variable_dict):
self.step = step
self.node = node
try:
is_ssh = True
self.ssh_helper = SshHelper(is_ssh, node.get("ip"),
node.get("ssh_username"),
node.get("ssh_password"),
node.get("ssh_port"),
node.get("ssh_key_file"),
node)
self.ssh_helper=self.node["ssher"]
if self.ssh_helper is None:
raise Exception("self.ssh_helper is None.")
except Exception as e:
self.stdio.error(
"SshHandler init fail. Please check the NODES conf. node: {0}. Exception : {1} .".format(node, e))
Expand Down
7 changes: 3 additions & 4 deletions handler/checker/step/stepbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,16 @@ def execute(self, report):
self.task_variable_dict["remote_ip"] = \
docker.from_env().containers.get(self.node["container_name"]).attrs['NetworkSettings']['Networks'][
'bridge']["IPAddress"]
for key in self.node:
self.task_variable_dict["remote_{0}".format(key)] = self.node[key]

for node in self.node:
self.task_variable_dict["remote_{0}".format(node)] = self.node[node]
if "type" not in self.step:
raise StepExecuteFailException("Missing field :type")
if self.step["type"] == "get_system_parameter":
handler = GetSystemParameterHandler(self.context, self.step, self.node, self.task_variable_dict)
elif self.step["type"] == "ssh":
handler = SshHandler(self.context, self.step, self.node, self.task_variable_dict)
elif self.step["type"] == "sql":
handler = StepSQLHandler(self.context, self.step, self.cluster, self.task_variable_dict)
handler = StepSQLHandler(self.context, self.step, task_variable_dict=self.task_variable_dict)
elif self.step["type"] == "data_size":
handler = DataSizeHandler(self.context, self.step, self.cluster, self.task_variable_dict)
else:
Expand Down
Loading
Loading