From 6d222e7953e72478bbfdbd60880a73bffb273808 Mon Sep 17 00:00:00 2001
From: Teingi
Date: Wed, 13 Nov 2024 20:17:19 +0800
Subject: [PATCH 1/4] update top sql display

---
 diag_cmd.py                                 |   5 +-
 .../sql/schedule/schedule_slow_sql.py       | 182 ++++++++++++++++++
 handler/display/tasks/observer/topsql.yaml  | 146 +++++++-------
 3 files changed, 266 insertions(+), 67 deletions(-)
 create mode 100644 handler/analyzer/sql/schedule/schedule_slow_sql.py

diff --git a/diag_cmd.py b/diag_cmd.py
index 544ae539..72751c42 100644
--- a/diag_cmd.py
+++ b/diag_cmd.py
@@ -955,6 +955,7 @@ def __init__(self):
         self.parser.add_option('--limit', type='string', help="The limit on the number of data rows returned by sql_audit for the tenant.", default=2000)
         self.parser.add_option('--store_dir', type='string', help='the dir to store result, current dir by default.', default='./obdiag_analyze/')
         self.parser.add_option('--elapsed_time', type='string', help='The minimum threshold for filtering execution time, measured in microseconds.', default=100000)
+        self.parser.add_option('--run_mode', type='string', help='run mode, choices=[single, continuous]', default='single')
         self.parser.add_option('-c', type='string', help='obdiag custom config', default=os.path.expanduser('~/.obdiag/config.yml'))
         self.parser.add_option('--config', action="append", type="string", help='config options Format: --config key=value')
 
@@ -1194,8 +1195,8 @@ def __init__(self):
         self.register_command(ObdiagAnalyzeVariableCommand())
         self.register_command(ObdiagAnalyzeQueueCommand())
         self.register_command(ObdiagAnalyzeIndexSpaceCommand())
-        # self.register_command(ObdiagAnalyzeSQLCommand())
-        # self.register_command(ObdiagAnalyzeSQLReviewCommand())
+        self.register_command(ObdiagAnalyzeSQLCommand())
+        self.register_command(ObdiagAnalyzeSQLReviewCommand())
 
 
 class ObdiagRCACommand(MajorCommand):
diff --git a/handler/analyzer/sql/schedule/schedule_slow_sql.py b/handler/analyzer/sql/schedule/schedule_slow_sql.py
new file mode 100644
index 00000000..cab9c4e4
--- /dev/null
+++ b/handler/analyzer/sql/schedule/schedule_slow_sql.py
@@ -0,0 +1,182 @@
+#!/usr/bin/env python
+# -*- coding: UTF-8 -*-
+# Copyright (c) 2022 OceanBase
+# OceanBase Diagnostic Tool is licensed under Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+"""
+@time: 2024/11/04
+@file: schedule_slow_sql.py
+@desc: prototype that periodically pulls slow-SQL rows from sql_audit into a local SQLite cache
+"""
+import sqlite3
+import time
+
+import pymysql
+import schedule
+
+# NOTE: SQLITE_DB_PATH, OCEANBASE_CONFIG and QUERY_GV_SQL_AUDIT are assumed to be
+# supplied by configuration elsewhere; they are not defined in this prototype module.
+
+slowSqlSelect = '''select
+    sql_id,
+    trace_id,
+    event,
+    client_ip,
+    table_scan as is_table_scan,
+    wait_time_micro as wait_time,
+    total_wait_time_micro as total_wait_time,
+    execute_time - total_wait_time_micro + get_plan_time cpu_time,
+    tenant_id,
+    user_id,
+    db_id,
+    request_id,
+    request_time,
+    plan_id,
+    client_port,
+    affected_rows,
+    return_rows,
+    partition_cnt,
+    ret_code,
+    total_waits,
+    rpc_count,
+    plan_type,
+    is_inner_sql,
+    is_executor_rpc,
+    is_hit_plan,
+    elapsed_time,
+    net_time,
+    net_wait_time,
+    queue_time,
+    decode_time,
+    get_plan_time,
+    execute_time,
+    application_wait_time,
+    concurrency_wait_time,
+    user_io_wait_time,
+    schedule_time,
+    row_cache_hit,
+    bloom_filter_cache_hit,
+    block_cache_hit,
+    block_index_cache_hit,
+    disk_reads,
+    retry_cnt,
+    consistency_level,
+    memstore_read_row_count,
+    ssstore_read_row_count
+    from `v$sql_audit`
+    where tenant_id = ?
+    and request_id >= ?
+    and request_id <= ?
+    and (elapsed_time > ? or (plan_type != 1 and elapsed_time > ? and length(query_sql) = 0))'''
+
+
+class SlowSQLAuditSchedule(object):
+    def __init__(self, context):
+        super(SlowSQLAuditSchedule, self).__init__()
+        self.context = context
+        self.stdio = context.stdio
+
+    def get_last_request_time(self, tenant_id):
+        conn = sqlite3.connect(SQLITE_DB_PATH)
+        cursor = conn.cursor()
+        cursor.execute("SELECT last_request_time FROM tenant_last_request_time WHERE tenant_id = ?", (tenant_id,))
+        result = cursor.fetchone()
+        conn.close()
+        return result[0] if result else None
+
+    def update_last_request_time(self, tenant_id, last_request_time):
+        conn = sqlite3.connect(SQLITE_DB_PATH)
+        cursor = conn.cursor()
+        cursor.execute("""
+            INSERT INTO tenant_last_request_time (tenant_id, last_request_time) VALUES (?, ?)
+            ON CONFLICT(tenant_id) DO UPDATE SET last_request_time=excluded.last_request_time;
+        """, (tenant_id, last_request_time))
+        conn.commit()
+        conn.close()
+
+    def fetch_and_store_data(self, tenant_id):
+        last_request_time = self.get_last_request_time(tenant_id)
+
+        # connect to OceanBase
+        connection_ob = pymysql.connect(**OCEANBASE_CONFIG)
+        cursor_ob = connection_ob.cursor()
+
+        # run the audit query
+        cursor_ob.execute(QUERY_GV_SQL_AUDIT, (tenant_id, last_request_time or '1970-01-01 00:00:00'))
+        results = cursor_ob.fetchall()
+
+        # close the OceanBase connection
+        cursor_ob.close()
+        connection_ob.close()
+
+        # nothing new to persist
+        if not results:
+            return
+
+        # newest request_time seen in this batch
+        new_last_request_time = max(row['request_time'] for row in results)
+
+        # connect to SQLite
+        connection_sqlite = sqlite3.connect(SQLITE_DB_PATH)
+        cursor_sqlite = connection_sqlite.cursor()
+
+        # create the audit table if it does not exist
+        cursor_sqlite.execute('''
+            CREATE TABLE IF NOT EXISTS ob_sql_audit (
+                -- define the schema according to the actual columns of gv$ob_sql_audit
+                id INTEGER PRIMARY KEY,
+                svr_ip TEXT,
+                svr_port INTEGER,
+                request_id INTEGER,
+                tenant_id INTEGER,
+                request_time DATETIME,
+                ...
+            )
+        ''')
+
+        # create the per-tenant high-water-mark table if it does not exist
+        cursor_sqlite.execute('''
+            CREATE TABLE IF NOT EXISTS tenant_last_request_time (
+                tenant_id INTEGER PRIMARY KEY,
+                last_request_time DATETIME
+            )
+        ''')
+
+        # insert the fetched rows
+        for row in results:
+            cursor_sqlite.execute('''
+                INSERT INTO ob_sql_audit (id, svr_ip, svr_port, request_id, tenant_id, request_time, ...)
+                VALUES (?, ?, ?, ?, ?, ?, ...);
+            ''', (row['id'], row['svr_ip'], row['svr_port'], row['request_id'], row['tenant_id'], row['request_time'], ...))
+
+        # commit the transaction and close the connection
+        connection_sqlite.commit()
+        cursor_sqlite.close()
+        connection_sqlite.close()
+
+        # advance the per-tenant high-water mark
+        self.update_last_request_time(tenant_id, new_last_request_time)
+
+    def scheduled_task(self):
+        while True:
+            print("Executing scheduled task...")
+            # do the periodic work here
+            time.sleep(60)  # repeat every 60 seconds
+
+    def job(self):
+        # example tenant id list
+        tenant_ids = [1001, 1002, 1003]
+        for tenant_id in tenant_ids:
+            self.fetch_and_store_data(tenant_id)
+
+    def run(self):
+        # run the collection job once per hour
+        schedule.every().hour.do(self.job)
+
+        # main loop
+        while True:
+            schedule.run_pending()
+            time.sleep(1)
\ No newline at end of file
diff --git a/handler/display/tasks/observer/topsql.yaml b/handler/display/tasks/observer/topsql.yaml
index 9fd6e181..34a3e859 100644
--- a/handler/display/tasks/observer/topsql.yaml
+++ b/handler/display/tasks/observer/topsql.yaml
@@ -7,98 +7,114 @@ task:
     steps:
       - type: sql
         tittle: Top SQL time consumption in the last {mtime} minutes
-        sql: "select /*+read_consistency(weak),query_timeout(100000000)*/ tenant_name,SQL_ID,QUERY_SQL,count(1),avg(ELAPSED_TIME),avg(EXECUTE_TIME),avg(QUEUE_TIME),avg(AFFECTED_ROWS),avg(GET_PLAN_TIME)
-          from oceanbase.gv$ob_sql_audit
-          where time_to_usec(now(6))-request_time <{mtime}*60*1000000
-          and tenant_name='{tenant_name}'
-          group by SQL_ID order by avg(ELAPSED_TIME)*count(1) desc limit 20;"
+        sql:
+          "select /*+read_consistency(weak),query_timeout(100000000)*/ tenant_name,SQL_ID,substr(query_sql, 1, 20) as query_sql,count(1),avg(ELAPSED_TIME),avg(EXECUTE_TIME),avg(QUEUE_TIME),avg(AFFECTED_ROWS),avg(GET_PLAN_TIME)
+          from oceanbase.gv$ob_sql_audit
+          where time_to_usec(now(6))-request_time <{mtime}*60*1000000
+          and tenant_name='{tenant_name}'
+          group by SQL_ID order by avg(ELAPSED_TIME)*count(1) desc limit 20;"
         global: true
       - type: sql
         tittle: Top-N SQL queries ranked by request count in the last {mtime} minutes
-        sql: "select /*+read_consistency(weak),query_timeout(100000000)*/ tenant_name,SQL_ID, QUERY_SQL,count(*) as QPS, avg(t1.elapsed_time) RT
-          from oceanbase.gv$ob_sql_audit t1
-          where tenant_name='{tenant_name}' and IS_EXECUTOR_RPC = 0
-          and request_time > (time_to_usec(now()) - {mtime}*60*1000000)
-          and request_time < time_to_usec(now())
-          group by t1.sql_id order by QPS desc limit 20;"
+        sql:
+          "select /*+read_consistency(weak),query_timeout(100000000)*/ tenant_name,SQL_ID, substr(query_sql, 1, 20) as query_sql,count(*) as QPS, avg(t1.elapsed_time) RT
+          from oceanbase.gv$ob_sql_audit t1
+          where tenant_name='{tenant_name}' and IS_EXECUTOR_RPC = 0
+          and request_time > (time_to_usec(now()) - {mtime}*60*1000000)
+          and request_time < time_to_usec(now())
+          group by t1.sql_id order by QPS desc limit 20;"
         global: true
      - type: sql
         tittle: The SQL that consumes the most CPU among all SQLs in the last {mtime} minutes
-        sql: "select /*+read_consistency(weak),query_timeout(100000000)*/ tenant_name,sql_id, substr(query_sql, 1, 20) as query_sql,
-          sum(elapsed_time - queue_time) sum_t, count(*) cnt,
-          avg(get_plan_time), avg(execute_time)
-          from oceanbase.gv$ob_sql_audit
-          where tenant_name ='{tenant_name}'
-          and request_time > (time_to_usec(now()) - {mtime}*60*1000000)
-          and request_time < time_to_usec(now())
-          group by sql_id order by sum_t desc limit 20;"
+        sql:
+          "select /*+read_consistency(weak),query_timeout(100000000)*/ tenant_name,sql_id, substr(query_sql, 1, 20) as query_sql,
+          sum(elapsed_time - queue_time) as cpu_time, count(*) cnt,
+          avg(get_plan_time), avg(execute_time)
+          from oceanbase.gv$ob_sql_audit
+          where tenant_name ='{tenant_name}'
+          and request_time > (time_to_usec(now()) - {mtime}*60*1000000)
+          and request_time < time_to_usec(now())
+          group by sql_id order by cpu_time desc limit 20;"
         global: true
       - type: sql
         tittle: Check whether there have been a large number of unreasonable remote execution requests for SQL executions in the past {mtime} minutes
-        sql: "select /*+read_consistency(weak),query_timeout(100000000)*/ count(*), plan_type
-          from oceanbase.gv$ob_sql_audit
-          where tenant_name ='{tenant_name}'
-          and IS_EXECUTOR_RPC = 0
-          and request_time > (time_to_usec(now()) - {mtime}*60*1000000)
-          and request_time < time_to_usec(now())
-          group by plan_type limit 20;"
+        sql:
+          "select /*+read_consistency(weak),query_timeout(100000000)*/ count(*), plan_type
+          from oceanbase.gv$ob_sql_audit
+          where tenant_name ='{tenant_name}'
+          and IS_EXECUTOR_RPC = 0
+          and is_inner_sql = 0
+          and request_time > (time_to_usec(now()) - {mtime}*60*1000000)
+          and request_time < time_to_usec(now())
+          group by plan_type limit 20;"
         global: true
       - type: sql
         tittle: SQL for querying a full table scan
-        sql: "select /*+read_consistency(weak),query_timeout(100000000)*/ query_sql
-          from oceanbase.gv$ob_sql_audit
-          where table_scan = 1 and tenant_name = '{tenant_name}'
-          and request_time > (time_to_usec(now()) - {mtime}*60*1000000)
-          and request_time < time_to_usec(now())
-          group by sql_id limit 20;"
+        sql:
+          "select /*+read_consistency(weak),query_timeout(100000000)*/ tenant_name, SQL_ID, substr(query_sql, 1, 20) as query_sql, elapsed_time
+          from oceanbase.gv$ob_sql_audit
+          where table_scan = 1 and tenant_name = '{tenant_name}'
+          and request_time > (time_to_usec(now()) - {mtime}*60*1000000)
+          and request_time < time_to_usec(now())
+          and is_inner_sql = 0
+          order by elapsed_time desc limit 20;"
         global: true
   - version: "[3.0.0.0, 4.0.0.0]"
     steps:
       - type: sql
         tittle: Top SQL time consumption in the last {mtime} minutes
-        sql: "select /*+read_consistency(weak),query_timeout(100000000)*/ tenant_name,SQL_ID,QUERY_SQL,count(1),avg(ELAPSED_TIME),avg(EXECUTE_TIME),avg(QUEUE_TIME),avg(AFFECTED_ROWS),avg(GET_PLAN_TIME)
-          from oceanbase.gv$sql_audit
-          where time_to_usec(now(6))-request_time <{mtime}*60*1000000
-          and tenant_name='{tenant_name}'
-          group by SQL_ID order by avg(ELAPSED_TIME)*count(1) desc limit 20 ;"
+        sql:
+          "select /*+read_consistency(weak),query_timeout(100000000)*/ tenant_name,SQL_ID,substr(query_sql, 1, 20) as query_sql,count(1),avg(ELAPSED_TIME),avg(EXECUTE_TIME),avg(QUEUE_TIME),avg(AFFECTED_ROWS),avg(GET_PLAN_TIME)
+          from oceanbase.gv$sql_audit
+          where time_to_usec(now(6))-request_time <{mtime}*60*1000000
+          and tenant_name='{tenant_name}'
+          and is_inner_sql = 0
+          group by SQL_ID order by avg(ELAPSED_TIME)*count(1) desc limit 20;"
         global: true
      - type: sql
         tittle: Top-N SQL queries ranked by request count in the last {mtime} minutes
-        sql: "select /*+read_consistency(weak),query_timeout(100000000)*/ tenant_name, SQL_ID,QUERY_SQL, count(*) as QPS, avg(t1.elapsed_time) RT
-          from oceanbase.gv$sql_audit t1
-          where tenant_name='{tenant_name}'
-          and IS_EXECUTOR_RPC = 0
-          and request_time > (time_to_usec(now()) - {mtime}*60*1000000)
-          and request_time < time_to_usec(now())
-          group by t1.sql_id order by QPS desc limit 20;"
+        sql:
+          "select /*+read_consistency(weak),query_timeout(100000000)*/ tenant_name, SQL_ID,substr(query_sql, 1, 20) as query_sql, count(*) as QPS, avg(t1.elapsed_time) RT
+          from oceanbase.gv$sql_audit t1
+          where tenant_name='{tenant_name}'
+          and IS_EXECUTOR_RPC = 0
+          and is_inner_sql = 0
+          and request_time > (time_to_usec(now()) - {mtime}*60*1000000)
+          and request_time < time_to_usec(now())
+          group by t1.sql_id order by QPS desc limit 20;"
         global: true
       - type: sql
         tittle: The SQL that consumes the most CPU among all SQLs in the last {mtime} minutes
-        sql: "select /*+read_consistency(weak),query_timeout(100000000)*/ tenant_name,sql_id, substr(query_sql, 1, 20) as query_sql,
-          sum(elapsed_time - queue_time) sum_t, count(*) cnt,
-          avg(get_plan_time), avg(execute_time)
-          from oceanbase.gv$sql_audit
-          where tenant_name='{tenant_name}'
-          and request_time > (time_to_usec(now()) - {mtime}*60*1000000)
-          and request_time < time_to_usec(now())
-          group by sql_id order by sum_t desc limit 20;"
+        sql:
+          "select /*+read_consistency(weak),query_timeout(100000000)*/ tenant_name,sql_id, substr(query_sql, 1, 20) as query_sql,
+          sum(elapsed_time - queue_time) as cpu_time, count(*) cnt,
+          avg(get_plan_time), avg(execute_time)
+          from oceanbase.gv$sql_audit
+          where tenant_name='{tenant_name}'
+          and request_time > (time_to_usec(now()) - {mtime}*60*1000000)
+          and request_time < time_to_usec(now())
+          and is_inner_sql = 0
+          group by sql_id order by cpu_time desc limit 20;"
         global: true
       - type: sql
         tittle: Check whether there have been a large number of unreasonable remote execution requests for SQL executions in the past {mtime} minutes
-        sql: "select /*+read_consistency(weak),query_timeout(100000000)*/ count(*), plan_type
-          from oceanbase.gv$sql_audit
-          where tenant_name='{tenant_name}'
-          and IS_EXECUTOR_RPC = 0
-          and request_time > (time_to_usec(now()) - {mtime}*60*1000000)
-          and request_time < time_to_usec(now())
-          group by plan_type limit 20;"
+        sql:
+          "select /*+read_consistency(weak),query_timeout(100000000)*/ count(*), plan_type
+          from oceanbase.gv$sql_audit
+          where tenant_name='{tenant_name}'
+          and IS_EXECUTOR_RPC = 0
+          and request_time > (time_to_usec(now()) - {mtime}*60*1000000)
+          and request_time < time_to_usec(now())
+          group by plan_type limit 20;"
         global: true
       - type: sql
         tittle: SQL for querying a full table scan
-        sql: "select /*+read_consistency(weak),query_timeout(100000000)*/ query_sql
-          from oceanbase.gv$sql_audit
-          where table_scan = 1 and tenant_name = '{tenant_name}'
-          and request_time > (time_to_usec(now()) - {mtime}*60*10000)
-          and request_time < time_to_usec(now())
-          group by sql_id limit 20;"
+        sql:
+          "select /*+read_consistency(weak),query_timeout(100000000)*/ tenant_name, SQL_ID, substr(query_sql, 1, 20) as query_sql
+          from oceanbase.gv$sql_audit
+          where table_scan = 1 and tenant_name = '{tenant_name}'
+          and request_time > (time_to_usec(now()) - {mtime}*60*1000000)
+          and request_time < time_to_usec(now())
+          and is_inner_sql = 0
+          order by elapsed_time desc limit 20;"
         global: true

From 3bc964a0cd6b002956df4a799f36b5d82747483b Mon Sep 17 00:00:00 2001
From: Teingi
Date: Wed, 13 Nov 2024 20:18:09 +0800
Subject: [PATCH 2/4] fix

---
 .../sql/schedule/schedule_slow_sql.py | 182 ------------------
 1 file changed, 182 deletions(-)
 delete mode 100644 handler/analyzer/sql/schedule/schedule_slow_sql.py

diff --git a/handler/analyzer/sql/schedule/schedule_slow_sql.py b/handler/analyzer/sql/schedule/schedule_slow_sql.py
deleted file mode 100644
index cab9c4e4..00000000
--- a/handler/analyzer/sql/schedule/schedule_slow_sql.py
+++ /dev/null
@@ -1,182 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: UTF-8 -*-
-# Copyright (c) 2022 OceanBase
-# OceanBase Diagnostic Tool is licensed under Mulan PSL v2.
-# You can use this software according to the terms and conditions of the Mulan PSL v2.
-# You may obtain a copy of Mulan PSL v2 at:
-# http://license.coscl.org.cn/MulanPSL2
-# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
-# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
-# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
-# See the Mulan PSL v2 for more details.
-
-"""
-@time: 2024/11/04
-@file: schedule_slow_sql.py
-@desc: prototype that periodically pulls slow-SQL rows from sql_audit into a local SQLite cache
-"""
-import sqlite3
-import time
-
-import pymysql
-import schedule
-
-# NOTE: SQLITE_DB_PATH, OCEANBASE_CONFIG and QUERY_GV_SQL_AUDIT are assumed to be
-# supplied by configuration elsewhere; they are not defined in this prototype module.
-
-slowSqlSelect = '''select
-    sql_id,
-    trace_id,
-    event,
-    client_ip,
-    table_scan as is_table_scan,
-    wait_time_micro as wait_time,
-    total_wait_time_micro as total_wait_time,
-    execute_time - total_wait_time_micro + get_plan_time cpu_time,
-    tenant_id,
-    user_id,
-    db_id,
-    request_id,
-    request_time,
-    plan_id,
-    client_port,
-    affected_rows,
-    return_rows,
-    partition_cnt,
-    ret_code,
-    total_waits,
-    rpc_count,
-    plan_type,
-    is_inner_sql,
-    is_executor_rpc,
-    is_hit_plan,
-    elapsed_time,
-    net_time,
-    net_wait_time,
-    queue_time,
-    decode_time,
-    get_plan_time,
-    execute_time,
-    application_wait_time,
-    concurrency_wait_time,
-    user_io_wait_time,
-    schedule_time,
-    row_cache_hit,
-    bloom_filter_cache_hit,
-    block_cache_hit,
-    block_index_cache_hit,
-    disk_reads,
-    retry_cnt,
-    consistency_level,
-    memstore_read_row_count,
-    ssstore_read_row_count
-    from `v$sql_audit`
-    where tenant_id = ?
-    and request_id >= ?
-    and request_id <= ?
-    and (elapsed_time > ? or (plan_type != 1 and elapsed_time > ? and length(query_sql) = 0))'''
-
-
-class SlowSQLAuditSchedule(object):
-    def __init__(self, context):
-        super(SlowSQLAuditSchedule, self).__init__()
-        self.context = context
-        self.stdio = context.stdio
-
-    def get_last_request_time(self, tenant_id):
-        conn = sqlite3.connect(SQLITE_DB_PATH)
-        cursor = conn.cursor()
-        cursor.execute("SELECT last_request_time FROM tenant_last_request_time WHERE tenant_id = ?", (tenant_id,))
-        result = cursor.fetchone()
-        conn.close()
-        return result[0] if result else None
-
-    def update_last_request_time(self, tenant_id, last_request_time):
-        conn = sqlite3.connect(SQLITE_DB_PATH)
-        cursor = conn.cursor()
-        cursor.execute("""
-            INSERT INTO tenant_last_request_time (tenant_id, last_request_time) VALUES (?, ?)
-            ON CONFLICT(tenant_id) DO UPDATE SET last_request_time=excluded.last_request_time;
-        """, (tenant_id, last_request_time))
-        conn.commit()
-        conn.close()
-
-    def fetch_and_store_data(self, tenant_id):
-        last_request_time = self.get_last_request_time(tenant_id)
-
-        # connect to OceanBase
-        connection_ob = pymysql.connect(**OCEANBASE_CONFIG)
-        cursor_ob = connection_ob.cursor()
-
-        # run the audit query
-        cursor_ob.execute(QUERY_GV_SQL_AUDIT, (tenant_id, last_request_time or '1970-01-01 00:00:00'))
-        results = cursor_ob.fetchall()
-
-        # close the OceanBase connection
-        cursor_ob.close()
-        connection_ob.close()
-
-        # nothing new to persist
-        if not results:
-            return
-
-        # newest request_time seen in this batch
-        new_last_request_time = max(row['request_time'] for row in results)
-
-        # connect to SQLite
-        connection_sqlite = sqlite3.connect(SQLITE_DB_PATH)
-        cursor_sqlite = connection_sqlite.cursor()
-
-        # create the audit table if it does not exist
-        cursor_sqlite.execute('''
-            CREATE TABLE IF NOT EXISTS ob_sql_audit (
-                -- define the schema according to the actual columns of gv$ob_sql_audit
-                id INTEGER PRIMARY KEY,
-                svr_ip TEXT,
-                svr_port INTEGER,
-                request_id INTEGER,
-                tenant_id INTEGER,
-                request_time DATETIME,
-                ...
-            )
-        ''')
-
-        # create the per-tenant high-water-mark table if it does not exist
-        cursor_sqlite.execute('''
-            CREATE TABLE IF NOT EXISTS tenant_last_request_time (
-                tenant_id INTEGER PRIMARY KEY,
-                last_request_time DATETIME
-            )
-        ''')
-
-        # insert the fetched rows
-        for row in results:
-            cursor_sqlite.execute('''
-                INSERT INTO ob_sql_audit (id, svr_ip, svr_port, request_id, tenant_id, request_time, ...)
-                VALUES (?, ?, ?, ?, ?, ?, ...);
-            ''', (row['id'], row['svr_ip'], row['svr_port'], row['request_id'], row['tenant_id'], row['request_time'], ...))
-
-        # commit the transaction and close the connection
-        connection_sqlite.commit()
-        cursor_sqlite.close()
-        connection_sqlite.close()
-
-        # advance the per-tenant high-water mark
-        self.update_last_request_time(tenant_id, new_last_request_time)
-
-    def scheduled_task(self):
-        while True:
-            print("Executing scheduled task...")
-            # do the periodic work here
-            time.sleep(60)  # repeat every 60 seconds
-
-    def job(self):
-        # example tenant id list
-        tenant_ids = [1001, 1002, 1003]
-        for tenant_id in tenant_ids:
-            self.fetch_and_store_data(tenant_id)
-
-    def run(self):
-        # run the collection job once per hour
-        schedule.every().hour.do(self.job)
-
-        # main loop
-        while True:
-            schedule.run_pending()
-            time.sleep(1)
\ No newline at end of file

From 04d0fe11b9168fae8e6eb9f987bd222ad234100b Mon Sep 17 00:00:00 2001
From: Teingi
Date: Wed, 13 Nov 2024 20:22:47 +0800
Subject: [PATCH 3/4] fix

---
 diag_cmd.py                           |   5 +-
 .../sql/schedule/schedule_slow_sql.py | 182 ++++++++++++++++++
 2 files changed, 184 insertions(+), 3 deletions(-)
 create mode 100644 handler/analyzer/sql/schedule/schedule_slow_sql.py

diff --git a/diag_cmd.py b/diag_cmd.py
index 72751c42..544ae539 100644
--- a/diag_cmd.py
+++ b/diag_cmd.py
@@ -955,7 +955,6 @@ def __init__(self):
         self.parser.add_option('--limit', type='string', help="The limit on the number of data rows returned by sql_audit for the tenant.", default=2000)
         self.parser.add_option('--store_dir', type='string', help='the dir to store result, current dir by default.', default='./obdiag_analyze/')
         self.parser.add_option('--elapsed_time', type='string', help='The minimum threshold for filtering execution time, measured in microseconds.', default=100000)
-        self.parser.add_option('--run_mode', type='string', help='run mode, choices=[single, continuous]', default='single')
         self.parser.add_option('-c', type='string', help='obdiag custom config', default=os.path.expanduser('~/.obdiag/config.yml'))
         self.parser.add_option('--config', action="append", type="string", help='config options Format: --config key=value')
 
@@ -1195,8 +1194,8 @@ def __init__(self):
         self.register_command(ObdiagAnalyzeVariableCommand())
         self.register_command(ObdiagAnalyzeQueueCommand())
         self.register_command(ObdiagAnalyzeIndexSpaceCommand())
-        self.register_command(ObdiagAnalyzeSQLCommand())
-        self.register_command(ObdiagAnalyzeSQLReviewCommand())
+        # self.register_command(ObdiagAnalyzeSQLCommand())
+        # self.register_command(ObdiagAnalyzeSQLReviewCommand())
 
 
 class ObdiagRCACommand(MajorCommand):
diff --git a/handler/analyzer/sql/schedule/schedule_slow_sql.py b/handler/analyzer/sql/schedule/schedule_slow_sql.py
new file mode 100644
index 00000000..cab9c4e4
--- /dev/null
+++ b/handler/analyzer/sql/schedule/schedule_slow_sql.py
@@ -0,0 +1,182 @@
+#!/usr/bin/env python
+# -*- coding: UTF-8 -*-
+# Copyright (c) 2022 OceanBase
+# OceanBase Diagnostic Tool is licensed under Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+"""
+@time: 2024/11/04
+@file: schedule_slow_sql.py
+@desc: prototype that periodically pulls slow-SQL rows from sql_audit into a local SQLite cache
+"""
+import sqlite3
+import time
+
+import pymysql
+import schedule
+
+# NOTE: SQLITE_DB_PATH, OCEANBASE_CONFIG and QUERY_GV_SQL_AUDIT are assumed to be
+# supplied by configuration elsewhere; they are not defined in this prototype module.
+
+slowSqlSelect = '''select
+    sql_id,
+    trace_id,
+    event,
+    client_ip,
+    table_scan as is_table_scan,
+    wait_time_micro as wait_time,
+    total_wait_time_micro as total_wait_time,
+    execute_time - total_wait_time_micro + get_plan_time cpu_time,
+    tenant_id,
+    user_id,
+    db_id,
+    request_id,
+    request_time,
+    plan_id,
+    client_port,
+    affected_rows,
+    return_rows,
+    partition_cnt,
+    ret_code,
+    total_waits,
+    rpc_count,
+    plan_type,
+    is_inner_sql,
+    is_executor_rpc,
+    is_hit_plan,
+    elapsed_time,
+    net_time,
+    net_wait_time,
+    queue_time,
+    decode_time,
+    get_plan_time,
+    execute_time,
+    application_wait_time,
+    concurrency_wait_time,
+    user_io_wait_time,
+    schedule_time,
+    row_cache_hit,
+    bloom_filter_cache_hit,
+    block_cache_hit,
+    block_index_cache_hit,
+    disk_reads,
+    retry_cnt,
+    consistency_level,
+    memstore_read_row_count,
+    ssstore_read_row_count
+    from `v$sql_audit`
+    where tenant_id = ?
+    and request_id >= ?
+    and request_id <= ?
+    and (elapsed_time > ? or (plan_type != 1 and elapsed_time > ? and length(query_sql) = 0))'''
+
+
+class SlowSQLAuditSchedule(object):
+    def __init__(self, context):
+        super(SlowSQLAuditSchedule, self).__init__()
+        self.context = context
+        self.stdio = context.stdio
+
+    def get_last_request_time(self, tenant_id):
+        conn = sqlite3.connect(SQLITE_DB_PATH)
+        cursor = conn.cursor()
+        cursor.execute("SELECT last_request_time FROM tenant_last_request_time WHERE tenant_id = ?", (tenant_id,))
+        result = cursor.fetchone()
+        conn.close()
+        return result[0] if result else None
+
+    def update_last_request_time(self, tenant_id, last_request_time):
+        conn = sqlite3.connect(SQLITE_DB_PATH)
+        cursor = conn.cursor()
+        cursor.execute("""
+            INSERT INTO tenant_last_request_time (tenant_id, last_request_time) VALUES (?, ?)
+            ON CONFLICT(tenant_id) DO UPDATE SET last_request_time=excluded.last_request_time;
+        """, (tenant_id, last_request_time))
+        conn.commit()
+        conn.close()
+
+    def fetch_and_store_data(self, tenant_id):
+        last_request_time = self.get_last_request_time(tenant_id)
+
+        # connect to OceanBase
+        connection_ob = pymysql.connect(**OCEANBASE_CONFIG)
+        cursor_ob = connection_ob.cursor()
+
+        # run the audit query
+        cursor_ob.execute(QUERY_GV_SQL_AUDIT, (tenant_id, last_request_time or '1970-01-01 00:00:00'))
+        results = cursor_ob.fetchall()
+
+        # close the OceanBase connection
+        cursor_ob.close()
+        connection_ob.close()
+
+        # nothing new to persist
+        if not results:
+            return
+
+        # newest request_time seen in this batch
+        new_last_request_time = max(row['request_time'] for row in results)
+
+        # connect to SQLite
+        connection_sqlite = sqlite3.connect(SQLITE_DB_PATH)
+        cursor_sqlite = connection_sqlite.cursor()
+
+        # create the audit table if it does not exist
+        cursor_sqlite.execute('''
+            CREATE TABLE IF NOT EXISTS ob_sql_audit (
+                -- define the schema according to the actual columns of gv$ob_sql_audit
+                id INTEGER PRIMARY KEY,
+                svr_ip TEXT,
+                svr_port INTEGER,
+                request_id INTEGER,
+                tenant_id INTEGER,
+                request_time DATETIME,
+                ...
+            )
+        ''')
+
+        # create the per-tenant high-water-mark table if it does not exist
+        cursor_sqlite.execute('''
+            CREATE TABLE IF NOT EXISTS tenant_last_request_time (
+                tenant_id INTEGER PRIMARY KEY,
+                last_request_time DATETIME
+            )
+        ''')
+
+        # insert the fetched rows
+        for row in results:
+            cursor_sqlite.execute('''
+                INSERT INTO ob_sql_audit (id, svr_ip, svr_port, request_id, tenant_id, request_time, ...)
+                VALUES (?, ?, ?, ?, ?, ?, ...);
+            ''', (row['id'], row['svr_ip'], row['svr_port'], row['request_id'], row['tenant_id'], row['request_time'], ...))
+
+        # commit the transaction and close the connection
+        connection_sqlite.commit()
+        cursor_sqlite.close()
+        connection_sqlite.close()
+
+        # advance the per-tenant high-water mark
+        self.update_last_request_time(tenant_id, new_last_request_time)
+
+    def scheduled_task(self):
+        while True:
+            print("Executing scheduled task...")
+            # do the periodic work here
+            time.sleep(60)  # repeat every 60 seconds
+
+    def job(self):
+        # example tenant id list
+        tenant_ids = [1001, 1002, 1003]
+        for tenant_id in tenant_ids:
+            self.fetch_and_store_data(tenant_id)
+
+    def run(self):
+        # run the collection job once per hour
+        schedule.every().hour.do(self.job)
+
+        # main loop
+        while True:
+            schedule.run_pending()
+            time.sleep(1)
\ No newline at end of file

From 0e24928f2f59adbf08a9237276ac3c7bea8c67f4 Mon Sep 17 00:00:00 2001
From: Teingi
Date: Wed, 13 Nov 2024 20:23:12 +0800
Subject: [PATCH 4/4] fix

---
 .../sql/schedule/schedule_slow_sql.py | 182 ------------------
 1 file changed, 182 deletions(-)
 delete mode 100644 handler/analyzer/sql/schedule/schedule_slow_sql.py

diff --git a/handler/analyzer/sql/schedule/schedule_slow_sql.py b/handler/analyzer/sql/schedule/schedule_slow_sql.py
deleted file mode 100644
index cab9c4e4..00000000
--- a/handler/analyzer/sql/schedule/schedule_slow_sql.py
+++ /dev/null
@@ -1,182 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: UTF-8 -*-
-# Copyright (c) 2022 OceanBase
-# OceanBase Diagnostic Tool is licensed under Mulan PSL v2.
-# You can use this software according to the terms and conditions of the Mulan PSL v2.
-# You may obtain a copy of Mulan PSL v2 at:
-# http://license.coscl.org.cn/MulanPSL2
-# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
-# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
-# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
-# See the Mulan PSL v2 for more details.
-
-"""
-@time: 2024/11/04
-@file: schedule_slow_sql.py
-@desc: prototype that periodically pulls slow-SQL rows from sql_audit into a local SQLite cache
-"""
-import sqlite3
-import time
-
-import pymysql
-import schedule
-
-# NOTE: SQLITE_DB_PATH, OCEANBASE_CONFIG and QUERY_GV_SQL_AUDIT are assumed to be
-# supplied by configuration elsewhere; they are not defined in this prototype module.
-
-slowSqlSelect = '''select
-    sql_id,
-    trace_id,
-    event,
-    client_ip,
-    table_scan as is_table_scan,
-    wait_time_micro as wait_time,
-    total_wait_time_micro as total_wait_time,
-    execute_time - total_wait_time_micro + get_plan_time cpu_time,
-    tenant_id,
-    user_id,
-    db_id,
-    request_id,
-    request_time,
-    plan_id,
-    client_port,
-    affected_rows,
-    return_rows,
-    partition_cnt,
-    ret_code,
-    total_waits,
-    rpc_count,
-    plan_type,
-    is_inner_sql,
-    is_executor_rpc,
-    is_hit_plan,
-    elapsed_time,
-    net_time,
-    net_wait_time,
-    queue_time,
-    decode_time,
-    get_plan_time,
-    execute_time,
-    application_wait_time,
-    concurrency_wait_time,
-    user_io_wait_time,
-    schedule_time,
-    row_cache_hit,
-    bloom_filter_cache_hit,
-    block_cache_hit,
-    block_index_cache_hit,
-    disk_reads,
-    retry_cnt,
-    consistency_level,
-    memstore_read_row_count,
-    ssstore_read_row_count
-    from `v$sql_audit`
-    where tenant_id = ?
-    and request_id >= ?
-    and request_id <= ?
-    and (elapsed_time > ? or (plan_type != 1 and elapsed_time > ? and length(query_sql) = 0))'''
-
-
-class SlowSQLAuditSchedule(object):
-    def __init__(self, context):
-        super(SlowSQLAuditSchedule, self).__init__()
-        self.context = context
-        self.stdio = context.stdio
-
-    def get_last_request_time(self, tenant_id):
-        conn = sqlite3.connect(SQLITE_DB_PATH)
-        cursor = conn.cursor()
-        cursor.execute("SELECT last_request_time FROM tenant_last_request_time WHERE tenant_id = ?", (tenant_id,))
-        result = cursor.fetchone()
-        conn.close()
-        return result[0] if result else None
-
-    def update_last_request_time(self, tenant_id, last_request_time):
-        conn = sqlite3.connect(SQLITE_DB_PATH)
-        cursor = conn.cursor()
-        cursor.execute("""
-            INSERT INTO tenant_last_request_time (tenant_id, last_request_time) VALUES (?, ?)
-            ON CONFLICT(tenant_id) DO UPDATE SET last_request_time=excluded.last_request_time;
-        """, (tenant_id, last_request_time))
-        conn.commit()
-        conn.close()
-
-    def fetch_and_store_data(self, tenant_id):
-        last_request_time = self.get_last_request_time(tenant_id)
-
-        # connect to OceanBase
-        connection_ob = pymysql.connect(**OCEANBASE_CONFIG)
-        cursor_ob = connection_ob.cursor()
-
-        # run the audit query
-        cursor_ob.execute(QUERY_GV_SQL_AUDIT, (tenant_id, last_request_time or '1970-01-01 00:00:00'))
-        results = cursor_ob.fetchall()
-
-        # close the OceanBase connection
-        cursor_ob.close()
-        connection_ob.close()
-
-        # nothing new to persist
-        if not results:
-            return
-
-        # newest request_time seen in this batch
-        new_last_request_time = max(row['request_time'] for row in results)
-
-        # connect to SQLite
-        connection_sqlite = sqlite3.connect(SQLITE_DB_PATH)
-        cursor_sqlite = connection_sqlite.cursor()
-
-        # create the audit table if it does not exist
-        cursor_sqlite.execute('''
-            CREATE TABLE IF NOT EXISTS ob_sql_audit (
-                -- define the schema according to the actual columns of gv$ob_sql_audit
-                id INTEGER PRIMARY KEY,
-                svr_ip TEXT,
-                svr_port INTEGER,
-                request_id INTEGER,
-                tenant_id INTEGER,
-                request_time DATETIME,
-                ...
-            )
-        ''')
-
-        # create the per-tenant high-water-mark table if it does not exist
-        cursor_sqlite.execute('''
-            CREATE TABLE IF NOT EXISTS tenant_last_request_time (
-                tenant_id INTEGER PRIMARY KEY,
-                last_request_time DATETIME
-            )
-        ''')
-
-        # insert the fetched rows
-        for row in results:
-            cursor_sqlite.execute('''
-                INSERT INTO ob_sql_audit (id, svr_ip, svr_port, request_id, tenant_id, request_time, ...)
-                VALUES (?, ?, ?, ?, ?, ?, ...);
-            ''', (row['id'], row['svr_ip'], row['svr_port'], row['request_id'], row['tenant_id'], row['request_time'], ...))
-
-        # commit the transaction and close the connection
-        connection_sqlite.commit()
-        cursor_sqlite.close()
-        connection_sqlite.close()
-
-        # advance the per-tenant high-water mark
-        self.update_last_request_time(tenant_id, new_last_request_time)
-
-    def scheduled_task(self):
-        while True:
-            print("Executing scheduled task...")
-            # do the periodic work here
-            time.sleep(60)  # repeat every 60 seconds
-
-    def job(self):
-        # example tenant id list
-        tenant_ids = [1001, 1002, 1003]
-        for tenant_id in tenant_ids:
-            self.fetch_and_store_data(tenant_id)
-
-    def run(self):
-        # run the collection job once per hour
-        schedule.every().hour.do(self.job)
-
-        # main loop
-        while True:
-            schedule.run_pending()
-            time.sleep(1)
\ No newline at end of file
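
Note: the schedule_slow_sql.py prototype that patches 1-4 add and then remove builds its polling loop on the third-party `schedule` package. A minimal self-contained sketch of that pattern is shown below for reference; `collect_slow_sql` is an illustrative stand-in for the prototype's per-tenant fetch_and_store_data calls, not a function from this repository.

    import time

    import schedule


    def collect_slow_sql():
        # placeholder for pulling new sql_audit rows for each tenant
        print("collecting slow SQL ...")


    # register the job, then dispatch pending jobs forever
    schedule.every().hour.do(collect_slow_sql)
    while True:
        schedule.run_pending()
        time.sleep(1)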