Skip to content

Commit

Permalink
write logs with rotating log manager to avoid filling up the entire d…
Browse files Browse the repository at this point in the history
…isk with logs
  • Loading branch information
nielsrolf committed Feb 14, 2025
1 parent 9f5576d commit ecdefb5
Showing 1 changed file with 43 additions and 13 deletions.
56 changes: 43 additions & 13 deletions openweights/cluster/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@
import subprocess
import sys
import time
import threading
from pathlib import Path
from typing import Dict

from dotenv import load_dotenv
from supabase import create_client, Client
from supabase.lib.client_options import ClientOptions
from logging.handlers import RotatingFileHandler

# Load environment variables
load_dotenv()

# Configure logging
# Configure logging for the supervisor itself
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
Expand Down Expand Up @@ -67,6 +69,36 @@ def validate_org_secrets(self, secrets: Dict[str, str]) -> bool:
]
return all(secret in secrets for secret in required_secrets)

def _get_rotating_logger(self, org_id: str, stream_name: str) -> logging.Logger:
"""
Set up a rotating logger for the organization's output.
Adjust maxBytes as needed. For example, assuming an average row is ~200 bytes,
maxBytes=2_000_000 approximates 10k rows.
"""
logger_name = f"org_manager_{org_id}_{stream_name}"
output_logger = logging.getLogger(logger_name)
if not output_logger.handlers:
log_dir = Path('logs')
log_dir.mkdir(exist_ok=True)
log_file = log_dir / f'org_{org_id}_{stream_name}.log'
handler = RotatingFileHandler(log_file, maxBytes=2_000_000, backupCount=5)
formatter = logging.Formatter('%(asctime)s - %(message)s')
handler.setFormatter(formatter)
output_logger.addHandler(handler)
output_logger.setLevel(logging.INFO)
return output_logger

def _redirect_output(self, org_id: str, stream, stream_name: str):
"""
Read the output from a stream (e.g. stdout or stderr) and write it, line by line,
to a rotating log file.
"""
output_logger = self._get_rotating_logger(org_id, stream_name)
for line in iter(stream.readline, ''):
if line:
output_logger.info(line.rstrip())
stream.close()

def start_org_manager(self, org_id: str, secrets: Dict[str, str]) -> subprocess.Popen:
"""Start a new manager process for an organization."""
env = os.environ.copy()
Expand All @@ -87,23 +119,21 @@ def start_org_manager(self, org_id: str, secrets: Dict[str, str]) -> subprocess.
# Get the path to org_manager.py relative to this file
manager_path = Path(__file__).parent / 'org_manager.py'

# Create log directory if it doesn't exist
log_dir = Path('logs')
log_dir.mkdir(exist_ok=True)

# Open log files
stdout_path = log_dir / f'org_{org_id}_stdout.log'
stderr_path = log_dir / f'org_{org_id}_stderr.log'
stdout = open(stdout_path, 'a')
stderr = open(stderr_path, 'a')

# Instead of opening a file to write stdout/stderr directly,
# we use PIPEs and redirect the output in separate threads.
process = subprocess.Popen(
[sys.executable, str(manager_path)],
env=env,
stdout=stdout,
stderr=stderr
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1 # Line-buffered
)

# Spawn threads to capture and log stdout and stderr
threading.Thread(target=self._redirect_output, args=(org_id, process.stdout, 'stdout'), daemon=True).start()
threading.Thread(target=self._redirect_output, args=(org_id, process.stderr, 'stderr'), daemon=True).start()

logger.info(f"Started manager for organization {org_id} (PID: {process.pid})")
return process

Expand Down

0 comments on commit ecdefb5

Please sign in to comment.