Skip to content

Commit

Permalink
gather_component_log use threading (#657)
Browse files Browse the repository at this point in the history
* gather_component_log delete semaphore

* gather_component_log use threading
  • Loading branch information
wayyoungboy authored Dec 31, 2024
1 parent 277b5f9 commit 09e88da
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions src/handler/gather/gather_component_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
@file: gather_component_log.py
@desc:
"""
import copy
import datetime
import os
import tarfile
import threading
import traceback
import uuid
import multiprocessing as mp
import shutil

from prettytable import PrettyTable
Expand Down Expand Up @@ -246,17 +245,25 @@ def handle(self):
else:
tasks.append(GatherLogOnNode(new_context, node, self.gather_log_conf_dict))
file_queue = []
result_list = mp.Queue()
pool_sema = threading.BoundedSemaphore(value=self.thread_nums)

def handle_from_node(task):
with pool_sema:
task.handle()

for task in tasks:
file_thread = mp.Process(target=task.handle, args=(result_list,))
file_thread = threading.Thread(
target=handle_from_node,
args=(task,),
)
file_thread.start()
file_queue.append(file_thread)
self.stdio.verbose("file_queue len: {0}".format(len(file_queue)))
for task_thread in file_queue:
task_thread.join()
self.stdio.verbose("start get gather_tuples")
for _ in range(result_list.qsize()):
self.gather_tuples.append(result_list.get())
for task in tasks:
self.gather_tuples.append(task.get_result())
self.stdio.verbose("gather_tuples: {0}".format(self.gather_tuples))
summary_tuples = self.__get_overall_summary(self.gather_tuples)
self.stdio.print(summary_tuples)
Expand Down Expand Up @@ -385,13 +392,11 @@ def __init__(self, context, node, config):
self.file_number_limit = self.config.get("file_number_limit")
self.file_size_limit = self.config.get("file_size_limit")
self.gather_tuple = {"node": "", "success": "Fail", "info": "", "file_size": 0, "file_path": ""}
self.result_list = None

def get_result(self):
return self.gather_tuple

def handle(self, result_list=None):
self.result_list = result_list
def handle(self):
self.ssh_client = SshClient(self.context, self.node)
self.gather_tuple["node"] = self.ssh_client.get_name()
self.tmp_dir = os.path.join(self.tmp_dir, "obdiag_gather_{0}".format(str(uuid.uuid4())[:6]))
Expand Down Expand Up @@ -467,8 +472,6 @@ def handle(self, result_list=None):
self.ssh_client.exec_cmd("rm -rf {0}".format(tmp_log_dir))
self.stdio.verbose("gather_log_on_node {0} finished".format(self.ssh_client.get_ip()))
self.stdio.verbose("gather_log_on_node {0} gather_tuple: {1}".format(self.ssh_client.get_ip(), self.gather_tuple))
if self.result_list:
self.result_list.put(self.gather_tuple)
self.stdio.verbose("gather_log_on_node {0} done".format(self.ssh_client.get_ip()))

def __grep_log_to_tmp(self, logs_name, tmp_log_dir):
Expand Down

0 comments on commit 09e88da

Please sign in to comment.