fix: Impl requested changes
raizo07 committed Nov 7, 2024
1 parent 3658807 commit ff71cde
Showing 1 changed file with 75 additions and 97 deletions.
172 changes: 75 additions & 97 deletions scripts/data/client.py
@@ -11,49 +11,39 @@
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import random
import signal
import sys
from generate_data import generate_data
from format_args import format_args
import logging
from logging.handlers import TimedRotatingFileHandler
import signal
import sys

# Initialize logger
logger = logging.getLogger(__name__)

# Constants
MAX_WEIGHT_LIMIT = 8000 # Total weight limit for all jobs
THREAD_POOL_SIZE = os.cpu_count() # Number of threads for processing
QUEUE_MAX_SIZE = THREAD_POOL_SIZE * 2 # Maximum size of the job queue
BASE_DIR = Path(".client_cache")
BASE_DIR = Path(__file__).resolve().parent / ".client_cache" # Use absolute path

# Shared state variables
current_weight = 0
weight_lock = threading.Condition()
job_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)

shutdown_event = threading.Event()
executor = None # Global executor reference for shutdown


def handle_sigterm(signum, frame):
"""Handle SIGTERM by initiating immediate shutdown"""
"""Handle SIGTERM by initiating immediate shutdown."""
logger.info("Received SIGTERM signal. Initiating immediate shutdown...")
shutdown_event.set()

# Clear the job queue
while not job_queue.empty():
try:
job_queue.get_nowait()
job_queue.task_done()
except queue.Empty:
break

# If executor exists, shutdown immediately
# If executor exists, wait for current tasks to complete
if executor:
logger.info("Shutting down thread pool...")
executor.shutdown(wait=False)
executor.shutdown(wait=True)

# Exit the program
logger.info("Shutdown complete")
sys.exit(0)
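
For context, this handler only takes effect once it is registered with the interpreter. A minimal sketch of the registration the script presumably performs in its entry point (that code is not shown in this diff):

import signal

# Route SIGTERM (e.g. from `kill` or a container runtime) to the handler
# defined above so the pool drains before the process exits.
signal.signal(signal.SIGTERM, handle_sigterm)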

@@ -98,17 +88,77 @@ def job_generator(start, blocks, step, mode, strategy):
logger.info("Shutdown detected in generator. Stopping job generation.")
break
try:
            batch_file = BASE_DIR / f"{mode}_{height}_{step}.json"
batch_data = generate_data(
mode=mode, initial_height=height, num_blocks=step, fast=True
)
Path(batch_file).write_text(json.dumps(batch_data, indent=2))
batch_file.write_text(json.dumps(batch_data, indent=2))
batch_weight = calculate_batch_weight(batch_data, mode)
yield Job(height, step, mode, batch_weight, batch_file), batch_weight
except Exception as e:
logger.error(f"Error while generating data for: {height}:\n{e}")


def run_process(arguments_file, job):
"""Run the process and handle termination or errors."""
process = subprocess.Popen(
[
"scarb",
"cairo-run",
"--no-build",
"--package",
"client",
"--function",
"main",
"--arguments-file",
str(arguments_file),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)

while process.poll() is None:
if shutdown_event.is_set():
logger.info(f"Shutdown detected, terminating process for job: {job}")
process.terminate()
try:
process.wait(timeout=2)
except subprocess.TimeoutExpired:
process.kill()
return

stdout, stderr = process.communicate()

if (
process.returncode != 0
or "FAIL" in stdout
or "error" in stdout
or "panicked" in stdout
):
error = stdout or stderr
if process.returncode == -9:
match = re.search(r"gas_spent=(\d+)", stdout)
gas_info = (
f", gas spent: {int(match.group(1))}"
if match
else ", no gas info found"
)
error = f"Return code -9, killed by OOM?{gas_info}"
message = error
else:
error_match = re.search(r"error='([^']*)'", error)
message = error_match.group(1) if error_match else ""
logger.error(f"{job} error: {message}")
logger.debug(f"Full error while processing: {job}:\n{error}")
else:
match = re.search(r"gas_spent=(\d+)", stdout)
gas_info = f"gas spent: {int(match.group(1))}" if match else "no gas info found"
logger.info(f"{job} done, {gas_info}")
if not match:
logger.warning(f"{job}: no gas info found")


def process_batch(job):
if shutdown_event.is_set():
logger.info(f"Shutdown detected, skipping job: {job}")
@@ -118,66 +168,7 @@ def process_batch(job):
try:
with open(arguments_file, "w") as af:
af.write(str(format_args(job.batch_file, False, False)))

process = subprocess.Popen(
[
"scarb",
"cairo-run",
"--no-build",
"--package",
"client",
"--function",
"main",
"--arguments-file",
str(arguments_file),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)

while process.poll() is None:
if shutdown_event.is_set():
logger.info(f"Shutdown detected, terminating process for job: {job}")
process.terminate()
try:
process.wait(timeout=2) # Give it 2 seconds to terminate gracefully
except subprocess.TimeoutExpired:
process.kill() # Force kill if it doesn't terminate
return

result = process.communicate()
stdout, stderr = result

if (
process.returncode != 0
or "FAIL" in stdout
or "error" in stdout
or "panicked" in stdout
):
error = stdout or stderr
if process.returncode == -9:
match = re.search(r"gas_spent=(\d+)", stdout)
gas_info = (
f", gas spent: {int(match.group(1))}"
if match
else ", no gas info found"
)
error = f"Return code -9, killed by OOM?{gas_info}"
message = error
else:
error_match = re.search(r"error='([^']*)'", error)
message = error_match.group(1) if error_match else ""
logger.error(f"{job} error: {message}")
logger.debug(f"Full error while processing: {job}:\n{error}")
else:
match = re.search(r"gas_spent=(\d+)", stdout)
gas_info = (
f"gas spent: {int(match.group(1))}" if match else "no gas info found"
)
logger.info(f"{job} done, {gas_info}")
if not match:
logger.warning(f"{job}: no gas info found")
run_process(arguments_file, job)
except Exception as e:
logger.error(f"Error processing batch {job}: {e}")

@@ -200,12 +191,12 @@ def job_producer(job_gen):
or job_queue.full()
):
logger.debug("Producer is waiting for weight to be released.")
weight_lock.wait(timeout=1) # Add timeout to check shutdown_event
weight_lock.wait(timeout=1)

if shutdown_event.is_set():
break

if (current_weight + weight > MAX_WEIGHT_LIMIT) and current_weight == 0:
if current_weight + weight > MAX_WEIGHT_LIMIT and current_weight == 0:
logger.warning(f"{job} over the weight limit: {MAX_WEIGHT_LIMIT}")

job_queue.put((job, weight))
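
The wait/put sequence above is the classic threading.Condition back-pressure pattern: the producer blocks while the shared weight counter is over the limit, and consumers notify after releasing weight. A stripped-down sketch of the pattern in isolation (names here are illustrative, independent of this script's globals):

import threading

limit = 100
used = 0
cond = threading.Condition()

def acquire(weight):
    global used
    with cond:
        # Block until the job fits, but let an oversized job through
        # when nothing is in flight so it cannot deadlock the queue.
        while used + weight > limit and used > 0:
            cond.wait(timeout=1)
        used += weight

def release(weight):
    global used
    with cond:
        used -= weight
        cond.notify_all()  # wake any producer waiting on capacity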
@@ -230,24 +221,19 @@ def job_consumer(process_job):
f"Consumer is waiting for a job. Queue length: {job_queue.qsize()}"
)
try:
work_to_do = job_queue.get(
timeout=1
) # Add timeout to check shutdown_event
work_to_do = job_queue.get(timeout=1)
except queue.Empty:
continue

if work_to_do is None:
job_queue.task_done()
break

(job, weight) = work_to_do

job, weight = work_to_do
try:
if not shutdown_event.is_set():
logger.debug(f"Executing job: {job}...")
process_job(job)
except Exception as e:
logger.error(f"Error while processing job: {job}:\n{e}")
finally:
with weight_lock:
current_weight -= weight
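
The None sentinel handled above is the standard way to retire a fixed pool of consumers: once the generator is exhausted, the producer enqueues one sentinel per consumer thread so each one unblocks, marks the item done, and exits. Presumably the tail of job_producer, elided from this diff, does something like:

# One sentinel per consumer so every thread unblocks and exits.
for _ in range(THREAD_POOL_SIZE):
    job_queue.put(None)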
@@ -260,7 +246,7 @@ def job_consumer(process_job):


def main(start, blocks, step, mode, strategy):
global executor # Use global executor for shutdown access
global executor

logger.info(
"Starting client, initial height: %d, blocks: %d, step: %d, mode: %s, strategy: %s",
@@ -270,12 +256,6 @@ def main(start, blocks, step, mode, strategy):
mode,
strategy,
)
logger.info(
"Max weight limit: %d, Thread pool size: %d, Queue max size: %d",
MAX_WEIGHT_LIMIT,
THREAD_POOL_SIZE,
QUEUE_MAX_SIZE,
)

job_gen = job_generator(start, blocks, step, mode, strategy)
producer_thread = threading.Thread(target=job_producer, args=(job_gen,))
@@ -287,10 +267,8 @@ def main(start, blocks, step, mode, strategy):
executor.submit(job_consumer, process_batch)
for _ in range(THREAD_POOL_SIZE)
]

producer_thread.join()
job_queue.join()

except KeyboardInterrupt:
logger.info("Received KeyboardInterrupt, initiating shutdown...")
handle_sigterm(signal.SIGTERM, None)
