From 9aa2181b1b7c5181fa732d96dc0d4feaaddd08d2 Mon Sep 17 00:00:00 2001
From: "jingshun.tq" <35712518+Teingi@users.noreply.github.com>
Date: Thu, 21 Nov 2024 11:34:37 +0800
Subject: [PATCH] Fix some bugs for obdiag 2.6.0 dev (#562)

* Fix some bugs in the analyze memory function.

* fix

* fix

* fix

---
 handler/analyzer/analyze_log.py               |  13 +-
 handler/analyzer/analyze_memory.py            | 281 ++++++++++--------
 .../tasks/observer/database_datasize.yaml     |  43 +--
 .../tasks/observer/table_datasize.yaml        |  61 ++--
 4 files changed, 216 insertions(+), 182 deletions(-)

diff --git a/handler/analyzer/analyze_log.py b/handler/analyzer/analyze_log.py
index b15f7f6b..e88dc2a9 100644
--- a/handler/analyzer/analyze_log.py
+++ b/handler/analyzer/analyze_log.py
@@ -151,10 +151,15 @@ def handle_from_node(node):
         self.stdio.print("analyze nodes's log start. Please wait a moment...")
         self.stdio.start_loading('analyze start')
         for node in self.nodes:
-            if not self.is_ssh:
-                local_ip = NetUtils.get_inner_ip()
-                node = self.nodes[0]
-                node["ip"] = local_ip
+            if self.directly_analyze_files:
+                if nodes_threads:
+                    break
+                node["ip"] = '127.0.0.1'
+            else:
+                if not self.is_ssh:
+                    local_ip = NetUtils.get_inner_ip()
+                    node = self.nodes[0]
+                    node["ip"] = local_ip
             node_threads = threading.Thread(target=handle_from_node, args=(node,))
             node_threads.start()
             nodes_threads.append(node_threads)
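The analyze_log.py hunk above fixes duplicate local analysis when files are analyzed directly: in the old loop, with is_ssh false, every iteration rebound node to self.nodes[0] and started yet another worker thread against the same local host. The new branch starts exactly one worker for the --files path and pins its ip to 127.0.0.1. A minimal standalone sketch of the before/after loop behavior (plain Python with invented names; this is not obdiag code):

    def spawn_old(nodes, is_ssh=False):
        started = []
        for node in nodes:
            if not is_ssh:
                node = nodes[0]              # every pass collapses onto the first node
                node["ip"] = "127.0.0.1"
            started.append(node["ip"])       # stands in for threading.Thread(...).start()
        return started

    def spawn_new(nodes, directly_analyze_files=True):
        started = []
        for node in nodes:
            if directly_analyze_files:
                if started:                  # one worker is enough for local files
                    break
                node["ip"] = "127.0.0.1"
            started.append(node["ip"])
        return started

    nodes = [{"ip": "a"}, {"ip": "b"}, {"ip": "c"}]
    print(spawn_old([dict(n) for n in nodes]))  # ['127.0.0.1', '127.0.0.1', '127.0.0.1']
    print(spawn_new([dict(n) for n in nodes]))  # ['127.0.0.1']

The same before/after shape is applied to analyze_memory.py in the next file of this patch.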
diff --git a/handler/analyzer/analyze_memory.py b/handler/analyzer/analyze_memory.py
index 336a03cf..8f091687 100644
--- a/handler/analyzer/analyze_memory.py
+++ b/handler/analyzer/analyze_memory.py
@@ -64,7 +64,8 @@ def init_config(self):
         self.file_number_limit = int(basic_config["file_number_limit"])
         self.file_size_limit = int(FileUtil.size(basic_config["file_size_limit"]))
         self.config_path = basic_config['config_path']
-        self.version = self.get_version()
+        if self.version is None:
+            self.version = self.get_version()
         return True
 
     def init_option(self):
@@ -81,7 +82,11 @@ def init_option(self):
             self.is_ssh = False
             self.directly_analyze_files = True
             self.analyze_files_list = files_option
-            self.version = version
+            if version:
+                self.version = version
+            else:
+                self.stdio.error('the option --files requires the --version option to be specified')
+                return False
         if from_option is not None and to_option is not None:
             try:
                 from_timestamp = TimeUtils.parse_time_str(from_option)
@@ -149,16 +154,21 @@ def handle_from_node(node):
             with pool_sema:
                 st = time.time()
                 resp = self.__handle_from_node(node, local_store_parent_dir)
-                analyze_tuples.append((node.get("ip"), False, resp["error"], int(time.time() - st), resp["result_pack_path"]))
+                analyze_tuples.append((node.get("ip"), resp["skip"], resp["error"], int(time.time() - st), resp["result_pack_path"]))
 
         nodes_threads = []
         self.stdio.print("analyze nodes's log start. Please wait a moment...")
         self.stdio.start_loading('analyze memory start')
         for node in self.nodes:
-            if not self.is_ssh:
-                local_ip = NetUtils.get_inner_ip()
-                node = self.nodes[0]
-                node["ip"] = local_ip
+            if self.directly_analyze_files:
+                if nodes_threads:
+                    break
+                node["ip"] = '127.0.0.1'
+            else:
+                if not self.is_ssh:
+                    local_ip = NetUtils.get_inner_ip()
+                    node = self.nodes[0]
+                    node["ip"] = local_ip
             node_threads = threading.Thread(target=handle_from_node, args=(node,))
             node_threads.start()
             nodes_threads.append(node_threads)
@@ -194,7 +204,8 @@ def __handle_from_node(self, node, local_store_parent_dir):
         gather_dir_full_path = "{0}/{1}".format(self.gather_ob_log_temporary_dir, gather_dir_name)
         mkdir(ssh_client, gather_dir_full_path)
         log_list, resp = self.__handle_log_list(ssh_client, node, resp)
-        resp["result_pack_path"] = local_store_dir
+        result_pack_path = "./{0}".format(os.path.relpath(local_store_dir, self.gather_pack_dir))
+        resp["result_pack_path"] = result_pack_path
         if resp["skip"]:
             return resp
         self.stdio.print(FileUtil.show_file_list_tabulate(remote_ip, log_list, self.stdio))
@@ -229,7 +240,10 @@ def __handle_from_node(self, node, local_store_parent_dir):
         try:
             fig = go.Figure()
             colors = ['blue', 'orange', 'green', 'red', 'purple', 'cyan', 'magenta', 'yellow', 'black', 'brown', 'pink', 'gray', 'lime', 'teal', 'navy']
-            if len(tenant_memory_info_dict) < 20:
+            if len(tenant_memory_info_dict) == 0:
+                resp["skip"] = True
+                resp["error"] = "failed to analyze memory data from the log"
+            elif len(tenant_memory_info_dict) < 20 and len(tenant_memory_info_dict) > 0:
                 i = 0
                 x_lines = []
                 x_vals = []
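The result_pack_path hunk above stops reporting an absolute directory in the summary and instead derives a "./..."-style path relative to the gather pack dir. A minimal sketch of that derivation (plain Python; the example paths are invented, not obdiag's real directories):

    import os

    gather_pack_dir = '/tmp/obdiag_gather_pack'                        # assumed example
    local_store_dir = '/tmp/obdiag_gather_pack/analyze_memory/192.168.1.1'

    result_pack_path = "./{0}".format(os.path.relpath(local_store_dir, gather_pack_dir))
    print(result_pack_path)  # ./analyze_memory/192.168.1.1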
@@ -312,61 +326,61 @@ def __handle_from_node(self, node, local_store_parent_dir):
                 html_fig_ctx = pio.to_html(fig_ctx, full_html=False)
                 html_fig_mod = pio.to_html(fig_mod, full_html=False)
                 html_combined = '''
-            [removed lines: inline HTML for a page titled "tenant-{0}_hold_memory" embedding the three charts {1}, {2} and {3}; the raw markup did not survive extraction and is not reproduced here]
-'''.format(
+            [added lines: the reworked "tenant-{0}_hold_memory" page, rebuilt around a table layout with a header>svg {{ margin-left: -2em; }} style rule, embedding the same charts {1}, {2} and {3}; raw markup likewise lost in extraction]
+            '''.format(
                     tenant_id, html_fig_tenant, html_fig_ctx, html_fig_mod
                 )
                 with open('{0}/tenant-{1}_hold_memory.html'.format(local_store_dir, tenant_id), 'w') as f:
@@ -377,62 +391,61 @@ def __handle_from_node(self, node, local_store_parent_dir):
             fig.update_yaxes(tickformat='.0f')
             html_fig = pio.to_html(fig, full_html=False)
             html_top15_combined = '''
-            [removed lines: inline HTML for a page titled "TOP 15租户hold内存曲线图" ("TOP 15 tenants hold-memory curves") embedding the single chart {0}; raw markup lost in extraction]
-'''.format(
+            [added lines: the same TOP 15 page rebuilt with the table layout and header>svg {{ margin-left: -2em; }} rule, embedding {0}; raw markup likewise lost in extraction]
+            '''.format(
                 html_fig
             )
             with open('{0}/TOP15_tenant_hold_memory.html'.format(local_store_dir), 'w') as f:
                 f.write(html_top15_combined)
-            # plot(fig, filename='{0}/TOP15_tenant_hold_memory.html'.format(local_store_dir))
         except Exception as e:
             self.stdio.exception('write html result failed, error: {0}'.format(e))
         delete_file(ssh_client, gather_dir_full_path, self.stdio)
@@ -452,9 +465,9 @@ def __handle_log_list(self, ssh_client, node, resp):
             resp["error"] = "Too many files {0} > {1}, " "Please adjust the number of incoming files".format(len(log_list), self.file_number_limit)
             return log_list, resp
         elif len(log_list) == 0:
-            self.stdio.warn("{0} The number of log files is {1}, No files found, " "Please adjust the query limit".format(node.get("ip"), len(log_list)))
+            self.stdio.warn("{0} The number of observer.log* files is {1}, No files found".format(node.get("ip"), len(log_list)))
             resp["skip"] = (True,)
-            resp["error"] = "No files found"
+            resp["error"] = "No observer.log* found"
             return log_list, resp
         return log_list, resp
@@ -484,11 +497,13 @@ def __get_log_name_list_offline(self):
         for path in self.analyze_files_list:
             if os.path.exists(path):
                 if os.path.isfile(path):
-                    log_name_list.append(path)
+                    if os.path.basename(path).startswith('observer.log'):
+                        log_name_list.append(path)
                 else:
                     log_names = FileUtil.find_all_file(path)
-                    if len(log_names) > 0:
-                        log_name_list.extend(log_names)
+                    if log_names:
+                        filtered_logs = [name for name in log_names if os.path.basename(name).startswith('observer.log')]
+                        log_name_list.extend(filtered_logs)
         self.stdio.verbose("get log list {}".format(log_name_list))
         return log_name_list
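The __get_log_name_list_offline change above narrows offline analysis to observer logs by filename prefix. A self-contained sketch of the same filter (plain Python; the file names are made up for illustration):

    import os

    candidates = [
        '/data/log/observer.log',
        '/data/log/observer.log.20241121',
        '/data/log/rootservice.log',
        '/data/log/election.log',
    ]
    # keep only observer.log and its rotated variants, as the patch does
    kept = [p for p in candidates if os.path.basename(p).startswith('observer.log')]
    print(kept)  # ['/data/log/observer.log', '/data/log/observer.log.20241121']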
@@ -531,7 +546,9 @@ def __pharse_offline_log_file(self, ssh_client, log_name, local_store_dir):
 
     def __parse_memory_label(self, file_full_path):
         ssh_client = ssh_client_local_client.LocalClient(context=self.context, node={"ssh_type": "local"})
-        if self.version > '4.0':
+        if self.version >= '4.3':
+            grep_cmd = 'grep -n "memory_dump.*statistics" ' + file_full_path
+        elif self.version >= '4.0' and self.version < '4.3':
             grep_cmd = 'grep -n "runTimerTask.*MemDumpTimer" ' + file_full_path
         else:
             grep_cmd = 'grep -n "Run print tenant memstore usage task" ' + file_full_path
@@ -562,6 +579,7 @@ def __parse_log_lines(self, file_full_path, memory_dict):
         """
         self.stdio.verbose("start parse log {0}".format(file_full_path))
         memory_print_line_list = self.__parse_memory_label(file_full_path)
+        tenant_dict = dict()
         if memory_print_line_list:
             with open(file_full_path, 'r', encoding='utf8', errors='replace') as file:
                 line_num = 0
@@ -578,7 +596,12 @@ def __parse_log_lines(self, file_full_path, memory_dict):
                     if line_num < memory_print_begin_line:
                         continue
                     else:
-                        if self.version > '4.0':
+                        if self.version >= '4.3':
+                            if 'MemoryDump' in line and 'statistics' in line:
+                                time_str = self.__get_time_from_ob_log_line(line)
+                                memory_print_time = time_str.split('.')[0]
+                                memory_dict[memory_print_time] = dict()
+                        elif self.version > '4.0' and self.version < '4.3':
                             if 'runTimerTask' in line and 'MemDumpTimer' in line:
                                 time_str = self.__get_time_from_ob_log_line(line)
                                 memory_print_time = time_str.split('.')[0]
@@ -588,7 +611,14 @@ def __parse_log_lines(self, file_full_path, memory_dict):
                                 time_str = self.__get_time_from_ob_log_line(line)
                                 memory_print_time = time_str.split('.')[0]
                                 memory_dict[memory_print_time] = dict()
-                        if self.version > '4.0':
+                        if self.version >= '4.3':
+                            if 'print_tenant_usage' in line and 'ServerGTimer' in line and 'CHUNK_MGR' in line:
+                                if memory_print_line_list:
+                                    memory_print_begin_line = memory_print_line_list[0]
+                                    memory_print_line_list.remove(memory_print_begin_line)
+                                else:
+                                    break
+                        elif self.version >= '4.0' and self.version < '4.3':
                             if 'print_tenant_usage' in line and 'MemDumpTimer' in line and 'CHUNK_MGR' in line:
                                 if memory_print_line_list:
                                     memory_print_begin_line = memory_print_line_list[0]
@@ -602,7 +632,7 @@ def __parse_log_lines(self, file_full_path, memory_dict):
                                     memory_print_line_list.remove(memory_print_begin_line)
                                 else:
                                     break
-                        if '[MEMORY]' in line or 'MemDumpTimer' in line or 'ob_tenant_ctx_allocator' in line:
+                        if '[MEMORY]' in line or 'MemDump' in line or 'ob_tenant_ctx_allocator' in line:
                             if '[MEMORY] tenant:' in line:
                                 tenant_id = line.split('tenant:')[1].split(',')[0].strip()
                                 hold_bytes = line.split('hold:')[1].split('rpc_')[0].strip()
@@ -610,7 +640,6 @@ def __parse_log_lines(self, file_full_path, memory_dict):
                                 cache_hold_bytes = line.split('cache_hold:')[1].split('cache_used')[0].strip()
                                 cache_used_bytes = line.split('cache_used:')[1].split('cache_item_count')[0].strip()
                                 cache_item_count = line.split('cache_item_count:')[1].strip()
-                                tenant_dict = dict()
                                 tenant_dict['hold'] = self.__convert_string_bytes_2_int_bytes(hold_bytes)
                                 tenant_dict['rpc_hold'] = self.__convert_string_bytes_2_int_bytes(rpc_hold_bytes)
                                 tenant_dict['cache_hold'] = self.__convert_string_bytes_2_int_bytes(cache_hold_bytes)
@@ -729,8 +758,8 @@ def __get_overall_summary(node_summary_tuple):
         field_names.append("ResultPath")
         for tup in node_summary_tuple:
             node = tup[0]
-            is_err = tup[1]
+            is_err = tup[2]
             consume_time = tup[3]
-            pack_path = tup[4]
+            pack_path = tup[4] if not is_err else None
             summary_tab.append((node, "Error:" + tup[2] if is_err else "Completed", "{0} s".format(consume_time), pack_path))
         return "\nAnalyze Ob Log Summary:\n" + tabulate.tabulate(summary_tab, headers=field_names, tablefmt="grid", showindex=False)
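The new gates in __parse_memory_label and __parse_log_lines compare version strings lexicographically (self.version >= '4.3' and friends). A hedged aside, not part of the patch: string comparison is correct for the single-digit minors used here, but it misorders multi-digit components, which is worth keeping in mind if a 4.10 ever appears:

    assert '4.2' < '4.3' <= '4.3.0'      # fine for the branches above
    assert '4.10' < '4.3'                # but '4.10' sorts before '4.3' as a string

    def vtuple(v):
        # illustrative helper, not in obdiag: compare numeric components instead
        return tuple(int(x) for x in v.split('.'))

    assert vtuple('4.10') > vtuple('4.3')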
diff --git a/handler/display/tasks/observer/database_datasize.yaml b/handler/display/tasks/observer/database_datasize.yaml
index 64121ec4..8dd8d221 100644
--- a/handler/display/tasks/observer/database_datasize.yaml
+++ b/handler/display/tasks/observer/database_datasize.yaml
@@ -5,29 +5,30 @@ task:
   - version: "[4.0.0.0, *]"
     steps:
       - type: sql
-        sql: "select tenant_id,tenant_name,DATABASE_NAME,sum(data_size_in_GB) as data_size_in_GB from
-          (SELECT /*+ query_timeout(30000000) */ a.TENANT_ID, c.tenant_name, a.DATABASE_NAME, a.TABLE_NAME, a.TABLE_ID,
-          SUM(CASE WHEN b.nested_offset = 0 THEN IFNULL(b.data_block_count + b.index_block_count + b.linked_block_count, 0) * 2 * 1024 * 1024 ELSE IFNULL(b.size, 0) END ) /1024.0 /1024/1024 AS data_size_in_GB
-          FROM oceanbase.CDB_OB_TABLE_LOCATIONS a INNER JOIN oceanbase.__all_virtual_table_mgr b ON a.svr_ip = b.svr_ip
-          AND a.svr_port = b.svr_port AND a.tenant_id = b.tenant_id AND a.LS_ID = b.LS_ID AND a.TABLET_ID = b.TABLET_ID
-          inner join oceanbase.__all_tenant c on a.tenant_id=c.tenant_id WHERE a.role = 'LEADER'
-          AND c.tenant_id = {tenant_id} AND b.table_type >= 10 AND b.size > 0 AND a.TABLE_NAME NOT REGEXP '^__'
-          and a.database_name='{database_name}' GROUP BY a.TABLE_ID) group by database_name;"
+        sql:
+          "select tenant_id,tenant_name,DATABASE_NAME,sum(data_size_in_B) as data_size_in_B from
+          (SELECT /*+ query_timeout(30000000) */ a.TENANT_ID, c.tenant_name, a.DATABASE_NAME, a.TABLE_NAME, a.TABLE_ID,
+          SUM(CASE WHEN b.nested_offset = 0 THEN IFNULL(b.data_block_count + b.index_block_count + b.linked_block_count, 0) * 2 * 1024 * 1024 ELSE IFNULL(b.size, 0) END ) AS data_size_in_B
+          FROM oceanbase.CDB_OB_TABLE_LOCATIONS a INNER JOIN oceanbase.__all_virtual_table_mgr b ON a.svr_ip = b.svr_ip
+          AND a.svr_port = b.svr_port AND a.tenant_id = b.tenant_id AND a.LS_ID = b.LS_ID AND a.TABLET_ID = b.TABLET_ID
+          inner join oceanbase.__all_tenant c on a.tenant_id=c.tenant_id WHERE a.role = 'LEADER'
+          AND c.tenant_id = {tenant_id} AND b.table_type >= 10 AND b.size > 0 AND a.TABLE_NAME NOT REGEXP '^__'
+          and a.database_name='{database_name}' GROUP BY a.TABLE_ID) group by database_name;"
     global: true
   - version: "[3.0.0.0, 4.0.0.0]"
     steps:
       - type: sql
-        sql: "select a.tenant_id, b.database_name,SUM(a.data_size) / 1024 / 1024 / 1024 AS data_size_G,
-          SUM(a.required_size) / 1024 / 1024 / 1024 AS required_size_G from
-          oceanbase.__all_virtual_meta_table a inner join
-          (select a.* from
-          oceanbase.gv$table a
-          inner join
-          oceanbase.gv$database b
-          on a.database_id=b.database_id
-          where b.tenant_id={tenant_id} and b.database_name='{database_name}')b
-          on a.table_id=b.table_id
-          where a.role = 1
-          group by a.tenant_id, b.database_name;"
+        sql:
+          "select a.tenant_id, b.database_name,SUM(a.data_size) AS data_size_B,
+          SUM(a.required_size) AS required_size_B from
+          oceanbase.__all_virtual_meta_table a inner join
+          (select a.* from
+          oceanbase.gv$table a
+          inner join
+          oceanbase.gv$database b
+          on a.database_id=b.database_id
+          where b.tenant_id={tenant_id} and b.database_name='{database_name}')b
+          on a.table_id=b.table_id
+          where a.role = 1
+          group by a.tenant_id, b.database_name;"
     global: true
-

diff --git a/handler/display/tasks/observer/table_datasize.yaml b/handler/display/tasks/observer/table_datasize.yaml
index baaa5e66..c746c99a 100644
--- a/handler/display/tasks/observer/table_datasize.yaml
+++ b/handler/display/tasks/observer/table_datasize.yaml
@@ -5,39 +5,38 @@ task:
   - version: "[4.0.0.0, *]"
     steps:
       - type: sql
-        sql: "SELECT /*+ query_timeout(30000000) */ a.TENANT_ID, c.tenant_name, a.DATABASE_NAME, a.TABLE_NAME, a.TABLE_ID,
-          SUM(CASE WHEN b.nested_offset = 0 THEN IFNULL(b.data_block_count + b.index_block_count + b.linked_block_count, 0) * 2 * 1024 * 1024 ELSE IFNULL(b.size, 0) END ) / 1024.0 / 1024 / 1024 AS data_size_in_GB
-          FROM oceanbase.CDB_OB_TABLE_LOCATIONS a INNER JOIN oceanbase.__all_virtual_table_mgr b ON a.svr_ip = b.svr_ip AND a.svr_port = b.svr_port AND a.tenant_id = b.tenant_id AND a.LS_ID = b.LS_ID AND a.TABLET_ID = b.TABLET_ID
-          inner join oceanbase.__all_tenant c on a.tenant_id=c.tenant_id
-          WHERE a.role = 'LEADER' AND c.tenant_id = {tenant_id}
-          AND b.table_type >= 10 AND b.size > 0
-          AND a.TABLE_NAME NOT REGEXP '^__'
-          and a.database_name='{database_name}'
-          and a.TABLE_NAME='{table_name}' GROUP BY a.TABLE_ID;"
+        sql:
+          "SELECT /*+ query_timeout(30000000) */ a.TENANT_ID, c.tenant_name, a.DATABASE_NAME, a.TABLE_NAME, a.TABLE_ID,
+          SUM(CASE WHEN b.nested_offset = 0 THEN IFNULL(b.data_block_count + b.index_block_count + b.linked_block_count, 0) * 2 * 1024 * 1024 ELSE IFNULL(b.size, 0) END ) AS data_size_in_B
+          FROM oceanbase.CDB_OB_TABLE_LOCATIONS a INNER JOIN oceanbase.__all_virtual_table_mgr b ON a.svr_ip = b.svr_ip AND a.svr_port = b.svr_port AND a.tenant_id = b.tenant_id AND a.LS_ID = b.LS_ID AND a.TABLET_ID = b.TABLET_ID
+          inner join oceanbase.__all_tenant c on a.tenant_id=c.tenant_id
+          WHERE a.role = 'LEADER' AND c.tenant_id = {tenant_id}
+          AND b.table_type >= 10 AND b.size > 0
+          AND a.TABLE_NAME NOT REGEXP '^__'
+          and a.database_name='{database_name}'
+          and a.TABLE_NAME='{table_name}' GROUP BY a.TABLE_ID;"
     global: true
-
+
   - version: "[3.0.0.0, 4.0.0.0]"
     steps:
      - type: sql
-        sql: "SELECT
-          a.tenant_id,
-          b.table_name,
-          SUM(a.data_size) / 1024 / 1024 / 1024 AS data_size_G,
-          SUM(a.required_size) / 1024 / 1024 / 1024 AS required_size_G
-          FROM
-          oceanbase.__all_virtual_meta_table a
-          JOIN
-          (select a.* from
-          oceanbase.gv$table a
-          inner join
-          oceanbase.gv$database b
-          on a.database_id=b.database_id
-          where b.tenant_id={tenant_id} and b.database_name='{database_name}' and a.table_name='{table_name}') b
-          ON
-          a.table_id = b.table_id
-          WHERE
-          a.role = 1
-          group by a.tenant_id, b.table_name;"
+        sql: "SELECT
+          a.tenant_id,
+          b.table_name,
+          SUM(a.data_size) AS data_size_B,
+          SUM(a.required_size) AS required_size_B
+          FROM
+          oceanbase.__all_virtual_meta_table a
+          JOIN
+          (select a.* from
+          oceanbase.gv$table a
+          inner join
+          oceanbase.gv$database b
+          on a.database_id=b.database_id
+          where b.tenant_id={tenant_id} and b.database_name='{database_name}' and a.table_name='{table_name}') b
+          ON
+          a.table_id = b.table_id
+          WHERE
+          a.role = 1
+          group by a.tenant_id, b.table_name;"
     global: true
-
-
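Both yaml tasks now return raw byte counts (data_size_in_B, data_size_B, required_size_B) instead of pre-dividing to GB in SQL, leaving unit conversion to the caller. A minimal sketch of that client-side conversion (plain Python; the byte count is an invented example value):

    size_b = 5_905_580_032           # assumed value returned as data_size_in_B
    size_gb = size_b / 1024**3       # the division the yaml no longer performs
    print(f"{size_gb:.2f} GB")       # 5.50 GB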