Skip to content

Commit

Permalink
new: controller to list/stop/restart scripts indiviually
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafiot committed Aug 7, 2024
1 parent 2fe4ca6 commit 92b16cd
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 192 deletions.
13 changes: 7 additions & 6 deletions bin/capture_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,29 @@ async def clear_dead_captures(self) -> None:
self.logger.warning(f'{expected_uuid} is canceled now.')

async def _to_run_forever_async(self) -> None:

def clear_list_callback(task: Task) -> None: # type: ignore[type-arg]
self.captures.discard(task)
self.unset_running()

await self.clear_dead_captures()
if self.force_stop:
return
max_new_captures = get_config('generic', 'concurrent_captures') - len(self.captures)
self.logger.debug(f'{len(self.captures)} ongoing captures.')
if max_new_captures <= 0:
if len(self.lacus.monitoring.get_enqueued_captures()) > 0:
self.logger.debug(f'Max amount of captures in parallel reached ({len(self.captures)})')
return
for capture_task in self.lacus.core.consume_queue(max_new_captures):
self.captures.add(capture_task)
capture_task.add_done_callback(self.captures.discard)
# NOTE: +1 because running this method also counts for one and will
# be decremented when it finishes
self.set_running(len(self.captures) + 1)
self.set_running()
capture_task.add_done_callback(clear_list_callback)

async def _wait_to_finish_async(self) -> None:
while self.captures:
self.logger.info(f'Waiting for {len(self.captures)} capture(s) to finish...')
self.logger.info(f'Ongoing captures: {", ".join(capture.get_name() for capture in self.captures)}')
await asyncio.sleep(5)
self.set_running(len(self.captures))
self.logger.info('No more captures')


Expand Down
63 changes: 63 additions & 0 deletions bin/scripts_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env python3

from __future__ import annotations

import argparse

import time

from subprocess import Popen

from psutil import Process
from redis import Redis

from lacus.default import get_homedir, get_socket_path, AbstractManager


def _get_cmdline(pid: str) -> list[str]:
process = Process(int(pid))
return process.cmdline()


def main() -> None:
parser = argparse.ArgumentParser(description='Manage the scripts.')
parser.add_argument('action', choices=['list', 'stop', 'restart'], help='The action to perform.', default='list')
parser.add_argument('script', help='The script to manage.', nargs='?')
args = parser.parse_args()
# Just fail if the env isn't set.
get_homedir()
if args.action == 'list':
try:
print(AbstractManager.is_running())
except FileNotFoundError:
print('Redis is down.')
else:
if args.action == 'restart':
# we need to keep the cmdline for the restart
try:
running_services = AbstractManager.is_running()
except KeyError:
print(f'{args.script} is not running.')
return
for name, numbers, pids in running_services:
if name == args.script:
to_restart = _get_cmdline(pids.pop())
break

print(f'Request {args.script} to {args.action}...')
r = Redis(unix_socket_path=get_socket_path('cache'), db=1)
r.sadd('shutdown_manual', args.script)
while r.zscore('running', args.script) is not None:
print(f'Wait for {args.script} to stop...')
time.sleep(1)
print('done.')
r.srem('shutdown_manual', args.script)

if args.action == 'restart':
print(f'Start {args.script}...')
Popen(to_restart)
print('done.')


if __name__ == '__main__':
main()
25 changes: 9 additions & 16 deletions lacus/default/abstractmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

import asyncio
import logging
import logging.config
import os
import signal
import time
from abc import ABC
from datetime import datetime, timedelta
from subprocess import Popen
from typing import List, Optional, Tuple

from redis import Redis
from redis.exceptions import ConnectionError as RedisConnectionError
Expand All @@ -33,9 +33,10 @@ def __init__(self, loglevel: int | None=None):
self.force_stop = False

@staticmethod
def is_running() -> list[tuple[str, float]]:
def is_running() -> list[tuple[str, float, set[str]]]:
try:
r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
running_scripts: dict[str, set[str]] = {}
for script_name, score in r.zrangebyscore('running', '-inf', '+inf', withscores=True):
for pid in r.smembers(f'service|{script_name}'):
try:
Expand All @@ -48,7 +49,8 @@ def is_running() -> list[tuple[str, float]]:
r.zadd('running', {script_name: other_same_services})
else:
r.zrem('running', script_name)
return r.zrangebyscore('running', '-inf', '+inf', withscores=True)
running_scripts[script_name] = r.smembers(f'service|{script_name}')
return [(name, rank, running_scripts[name] if name in running_scripts else set()) for name, rank in r.zrangebyscore('running', '-inf', '+inf', withscores=True)]
except RedisConnectionError:
print('Unable to connect to redis, the system is down.')
return []
Expand Down Expand Up @@ -104,7 +106,8 @@ async def long_sleep_async(self, sleep_in_sec: int, shutdown_check: int=10) -> b

def shutdown_requested(self) -> bool:
try:
return bool(self.__redis.exists('shutdown'))
return (bool(self.__redis.exists('shutdown'))
or bool(self.__redis.sismember('shutdown_manual', self.script_name)))
except ConnectionRefusedError:
return True
except RedisConnectionError:
Expand Down Expand Up @@ -133,6 +136,7 @@ def _kill_process(self) -> None:
def run(self, sleep_in_sec: int) -> None:
self.logger.info(f'Launching {self.__class__.__name__}')
try:
self.set_running()
while not self.force_stop:
if self.shutdown_requested():
break
Expand All @@ -142,15 +146,9 @@ def run(self, sleep_in_sec: int) -> None:
self.logger.critical(f'Unable to start {self.script_name}.')
break
else:
self.set_running()
self._to_run_forever()
except Exception: # nosec B110
self.logger.exception(f'Something went terribly wrong in {self.__class__.__name__}.')
finally:
if not self.process:
# self.process means we run an external script, all the time,
# do not unset between sleep.
self.unset_running()
if not self.long_sleep(sleep_in_sec):
break
except KeyboardInterrupt:
Expand Down Expand Up @@ -187,6 +185,7 @@ async def stop_async(self) -> None:
async def run_async(self, sleep_in_sec: int) -> None:
self.logger.info(f'Launching {self.__class__.__name__}')
try:
self.set_running()
while not self.force_stop:
if self.shutdown_requested():
break
Expand All @@ -196,15 +195,9 @@ async def run_async(self, sleep_in_sec: int) -> None:
self.logger.critical(f'Unable to start {self.script_name}.')
break
else:
self.set_running()
await self._to_run_forever_async()
except Exception: # nosec B110
self.logger.exception(f'Something went terribly wrong in {self.__class__.__name__}.')
finally:
if not self.process:
# self.process means we run an external script, all the time,
# do not unset between sleep.
self.unset_running()
if not await self.long_sleep_async(sleep_in_sec):
break
except KeyboardInterrupt:
Expand Down
Loading

0 comments on commit 92b16cd

Please sign in to comment.