From acda507f99b0736464b2d22f97ac8450b66f4b48 Mon Sep 17 00:00:00 2001 From: Paige Rubendall Date: Thu, 22 Sep 2022 11:54:15 -0400 Subject: [PATCH] adding max task --- cerberus/kubernetes/client.py | 12 ++++++++++-- start_cerberus.py | 6 ++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/cerberus/kubernetes/client.py b/cerberus/kubernetes/client.py index 4c2de80..e4e7609 100644 --- a/cerberus/kubernetes/client.py +++ b/cerberus/kubernetes/client.py @@ -54,7 +54,7 @@ def initialize_clients(kubeconfig_path, chunk_size, timeout): # List pods in a namespace in the cluster -def list_pods(namespace): +def list_pods(namespace, ignore_pattern): pods = [] try: ret = list_continue_helper(cli.list_namespaced_pod, namespace, pretty=True, limit=request_chunk_size) @@ -63,6 +63,14 @@ def list_pods(namespace): for ret_items in ret: for node in ret_items.items: + match = False + if ignore_pattern: + for pattern in ignore_pattern: + if re.match(pattern, node.metadata.name): + match = True + break + if match: + continue pods.append(node.metadata.name) return pods @@ -79,7 +87,7 @@ def list_continue_helper(func, *args, **keyword_args): ret = func(*args, **keyword_args, _continue=continue_string, timeout_seconds=cmd_timeout) ret_overall.append(ret) continue_string = ret.metadata._continue - + logging.info("continue") except ApiException as e: logging.error("Exception when calling CoreV1Api->%s: %s\n" % (str(func), e)) diff --git a/start_cerberus.py b/start_cerberus.py index c56b2fb..f0e4438 100644 --- a/start_cerberus.py +++ b/start_cerberus.py @@ -105,7 +105,7 @@ def main(cfg): cmd_timeout = config["tunings"].get("timeout", 60) request_chunk_size = config["tunings"].get("kube_api_request_chunk_size", 250) daemon_mode = config["tunings"].get("daemon_mode", False) - # cores_usage_percentage = config["tunings"].get("cores_usage_percentage", 0.5) + cores_usage_percentage = config["tunings"].get("cores_usage_percentage", 0.5) if "database" in config.keys(): database_path = config["database"].get("database_path", "/tmp/cerberus.db") reuse_database = config["database"].get("reuse_database", False) @@ -190,7 +190,9 @@ def main(cfg): # Variables used for multiprocessing multiprocessing.set_start_method("fork") - pool = multiprocessing.Pool(int(multiprocessing.cpu_count()), init_worker) + pool = multiprocessing.Pool( + int(cores_usage_percentage * multiprocessing.cpu_count()), init_worker, maxtasksperchild=1 + ) manager = multiprocessing.Manager() pods_tracker = manager.dict()