Commit

fmt
maciejka committed Nov 27, 2024
1 parent cb20442 commit b5973a9
Showing 1 changed file with 14 additions and 25 deletions.
39 changes: 14 additions & 25 deletions scripts/data/client.py
@@ -32,6 +32,7 @@
 job_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)
 shutdown_requested = False

+
 class ShutdownRequested(Exception):
     """Raised when shutdown is requested during process execution"""

@@ -102,9 +103,7 @@ def __str__(self):


 # Generator function to create jobs
-def job_generator(
-    start, blocks, step, mode, strategy, execute_scripts
-):
+def job_generator(start, blocks, step, mode, strategy, execute_scripts):
     BASE_DIR.mkdir(exist_ok=True)
     end = start + blocks

@@ -152,12 +151,7 @@ def process_batch(job):
         ]
     )

-    if (
-        returncode != 0
-        or "FAIL" in stdout
-        or "error" in stdout
-        or "panicked" in stdout
-    ):
+    if returncode != 0 or "FAIL" in stdout or "error" in stdout or "panicked" in stdout:
         error = stdout or stderr
         if returncode == -9:
             match = re.search(r"gas_spent=(\d+)", stdout)
@@ -186,18 +180,17 @@ def process_batch(job)
         logger.debug(f"Full error while processing: {job}:\n{error}")
     else:
         match = re.search(r"gas_spent=(\d+)", stdout)
-        gas_info = (
-            f"gas spent: {int(match.group(1))}" if match else "no gas info found"
-        )
+        gas_info = f"gas spent: {int(match.group(1))}" if match else "no gas info found"
         logger.info(f"{job} done, {gas_info}")
         if not match:
             logger.warning(f"{job}: no gas info found")

+
 # Producer function: Generates data and adds jobs to the queue
 def job_producer(job_gen):
     global current_weight
     global shutdown_requested

     try:
         for job, weight in job_gen:
             # if shutdown_requested:
@@ -236,7 +229,9 @@ def job_producer(job_gen):
         # Signal end of jobs to consumers
         if not shutdown_requested:
             for _ in range(THREAD_POOL_SIZE):
-                logger.warning(f"Producer is putting None into the queue..., full: {job_queue.full()}")
+                logger.warning(
+                    f"Producer is putting None into the queue..., full: {job_queue.full()}"
+                )
                 job_queue.put(None, block=False)

         with weight_lock:
@@ -283,7 +278,7 @@ def job_consumer(process_job):
                 logger.debug(f"Shutdown requested while processing {job}")
                 return
             except subprocess.TimeoutExpired:
-                logger.warning(f"Timeout while terminating subprocess for {job}")
+                logger.warning(f"Timeout while terminating subprocess for {job}")
             except Exception as e:
                 logger.error(f"Error while processing job: {job}:\n{e}")

@@ -305,6 +300,7 @@ def job_consumer(process_job):

     logger.debug("Job consumer done.")

+
 def main(start, blocks, step, mode, strategy, execute_scripts):
     global shutdown_requested

@@ -335,14 +331,10 @@ def signal_handler(signum, frame):
     )

     # Create the job generator
-    job_gen = job_generator(
-        start, blocks, step, mode, strategy, execute_scripts
-    )
+    job_gen = job_generator(start, blocks, step, mode, strategy, execute_scripts)

     # Start the job producer thread
-    producer_thread = threading.Thread(
-        target=job_producer, args=(job_gen, )
-    )
+    producer_thread = threading.Thread(target=job_producer, args=(job_gen,))
     producer_thread.start()

     # Start the consumer threads using ThreadPoolExecutor
# Start the consumer threads using ThreadPoolExecutor
Expand All @@ -356,10 +348,7 @@ def signal_handler(signum, frame):
producer_thread.join()

# Wait for all items in the queue to be processed or shutdown
while (
not shutdown_requested
and not job_queue.empty()
):
while not shutdown_requested and not job_queue.empty():
try:
job_queue.join()
break
