diff --git a/scripts/data/client.py b/scripts/data/client.py index 468a68dd..c3c8d543 100755 --- a/scripts/data/client.py +++ b/scripts/data/client.py @@ -32,6 +32,7 @@ job_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE) shutdown_requested = False + class ShutdownRequested(Exception): """Raised when shutdown is requested during process execution""" @@ -102,9 +103,7 @@ def __str__(self): # Generator function to create jobs -def job_generator( - start, blocks, step, mode, strategy, execute_scripts -): +def job_generator(start, blocks, step, mode, strategy, execute_scripts): BASE_DIR.mkdir(exist_ok=True) end = start + blocks @@ -152,12 +151,7 @@ def process_batch(job): ] ) - if ( - returncode != 0 - or "FAIL" in stdout - or "error" in stdout - or "panicked" in stdout - ): + if returncode != 0 or "FAIL" in stdout or "error" in stdout or "panicked" in stdout: error = stdout or stderr if returncode == -9: match = re.search(r"gas_spent=(\d+)", stdout) @@ -186,18 +180,17 @@ def process_batch(job): logger.debug(f"Full error while processing: {job}:\n{error}") else: match = re.search(r"gas_spent=(\d+)", stdout) - gas_info = ( - f"gas spent: {int(match.group(1))}" if match else "no gas info found" - ) + gas_info = f"gas spent: {int(match.group(1))}" if match else "no gas info found" logger.info(f"{job} done, {gas_info}") if not match: logger.warning(f"{job}: no gas info found") + # Producer function: Generates data and adds jobs to the queue def job_producer(job_gen): global current_weight global shutdown_requested - + try: for job, weight in job_gen: # if shutdown_requested: @@ -236,7 +229,9 @@ def job_producer(job_gen): # Signal end of jobs to consumers if not shutdown_requested: for _ in range(THREAD_POOL_SIZE): - logger.warning(f"Producer is putting None into the queue..., full: {job_queue.full()}") + logger.warning( + f"Producer is putting None into the queue..., full: {job_queue.full()}" + ) job_queue.put(None, block=False) with weight_lock: @@ -283,7 +278,7 @@ def job_consumer(process_job): logger.debug(f"Shutdown requested while processing {job}") return except subprocess.TimeoutExpired: - logger.warning(f"Timeout while terminating subprocess for {job}") + logger.warning(f"Timeout while terminating subprocess for {job}") except Exception as e: logger.error(f"Error while processing job: {job}:\n{e}") @@ -305,6 +300,7 @@ def job_consumer(process_job): logger.debug("Job consumer done.") + def main(start, blocks, step, mode, strategy, execute_scripts): global shutdown_requested @@ -335,14 +331,10 @@ def signal_handler(signum, frame): ) # Create the job generator - job_gen = job_generator( - start, blocks, step, mode, strategy, execute_scripts - ) + job_gen = job_generator(start, blocks, step, mode, strategy, execute_scripts) # Start the job producer thread - producer_thread = threading.Thread( - target=job_producer, args=(job_gen, ) - ) + producer_thread = threading.Thread(target=job_producer, args=(job_gen,)) producer_thread.start() # Start the consumer threads using ThreadPoolExecutor @@ -356,10 +348,7 @@ def signal_handler(signum, frame): producer_thread.join() # Wait for all items in the queue to be processed or shutdown - while ( - not shutdown_requested - and not job_queue.empty() - ): + while not shutdown_requested and not job_queue.empty(): try: job_queue.join() break