From 0ac46d39549249cec0cb697b62788579e8c91c03 Mon Sep 17 00:00:00 2001 From: SaintShit Date: Sat, 17 Feb 2024 22:39:58 +0330 Subject: [PATCH] fix: generate xray config in thread on core_health_check --- app/jobs/0_xray_core.py | 41 +++++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/app/jobs/0_xray_core.py b/app/jobs/0_xray_core.py index 0a5414819..25138b4eb 100644 --- a/app/jobs/0_xray_core.py +++ b/app/jobs/0_xray_core.py @@ -7,15 +7,37 @@ from app.utils.concurrency import threaded_function from xray_api import exc as xray_exc +global _node_op_in_progress +_node_op_in_progress = {} + + +# ops = {node_id: (func, kwargs)} +@threaded_function +def _op(ops: dict): + for node_id in list(ops.keys()): + if _node_op_in_progress.get(node_id): + del ops[node_id] + else: + _node_op_in_progress[node_id] = True + + if not ops: + return + + config = xray.config.include_db_users() + for node_id, (func, kwargs) in ops.items(): + func(config=config, **kwargs) + try: + del _node_op_in_progress[node_id] + except KeyError: + pass + def core_health_check(): - config = None + ops = {} # main core if not xray.core.started: - if not config: - config = xray.config.include_db_users() - xray.core.restart(config) + ops[0] = (xray.core.restart, {}) # nodes' core for node_id, node in list(xray.nodes.items()): @@ -23,14 +45,13 @@ def core_health_check(): try: node.api.get_sys_stats() except (ConnectionError, xray_exc.ConnectionError, xray_exc.UnknownError): - if not config: - config = xray.config.include_db_users() - xray.operations.restart_node(node_id, config) + ops[node_id] = (xray.operations.restart_node, {"node_id": node_id}) if not node.connected: - if not config: - config = xray.config.include_db_users() - xray.operations.connect_node(node_id, config) + ops[node_id] = (xray.operations.connect_node, {"node_id": node_id}) + + if ops: + _op(ops) @app.on_event("startup")