fixes 23
maciejka committed Oct 17, 2024
1 parent 1ea0424 commit 7c104db
Showing 3 changed files with 34 additions and 17 deletions.
1 change: 1 addition & 0 deletions packages/client/src/test.cairo
@@ -36,6 +36,7 @@ struct UtreexoArgs {
 /// Panics in case of a validation error or chain state mismatch.
 /// Prints result to the stdout.
 pub(crate) fn main(mut arguments: Span<felt252>, execute_script: bool) {
+    println!("Running integration test... ");
     let mut gas_before = get_available_gas();
 
     let Args { mut chain_state, blocks, expected_chain_state, utreexo_args } = Serde::deserialize(
48 changes: 32 additions & 16 deletions scripts/data/client2.py
@@ -1,4 +1,7 @@
+from dataclasses import dataclass
 import json
+import re
+import os
 import threading
 import queue
 import argparse
@@ -13,9 +16,9 @@
 logger = logging.getLogger(__name__)
 
 # Constants
-MAX_WEIGHT_LIMIT = 100  # Total weight limit for all jobs
-THREAD_POOL_SIZE = 4  # Number of threads for processing
-QUEUE_MAX_SIZE = THREAD_POOL_SIZE * 3  # Maximum size of the job queue
+MAX_WEIGHT_LIMIT = 1000  # Total weight limit for all jobs
+THREAD_POOL_SIZE = os.cpu_count()  # Number of threads for processing
+QUEUE_MAX_SIZE = THREAD_POOL_SIZE * 2  # Maximum size of the job queue
 
 BASE_DIR = Path(".client_cache")

@@ -36,6 +39,15 @@ def calculate_batch_weight(block_data, mode):
         for tx in block["data"]["transactions"]
     )
 
+@dataclass
+class Job:
+    height: int
+    step: int
+    mode: str
+    batch_file: Path
+
+    def __str__(self):
+        return f"Job(height='{self.height}', step={self.step}, mode='{self.mode}')"
 
 # Generator function to create jobs
 def job_generator(start, blocks, step, mode, strategy):
@@ -58,17 +70,17 @@ def job_generator(start, blocks, step, mode, strategy):
         Path(batch_file).write_text(json.dumps(batch_data, indent=2))
 
         batch_weight = calculate_batch_weight(batch_data, mode)
-        yield batch_file, batch_weight
+        yield Job(height, step, mode, batch_file), batch_weight
 
 
 # Function to process a batch
-def process_batch(batch_file):
-    logger.debug(f"Running client on: {batch_file} {type(batch_file)}")
+def process_batch(job):
+    logger.debug(f"Running client on: {job}")
 
-    arguments_file = batch_file.as_posix().replace(".json", "-arguments.json")
+    arguments_file = job.batch_file.as_posix().replace(".json", "-arguments.json")
 
     with open(arguments_file, "w") as af:
-        af.write(str(format_args(batch_file, False, False)))
+        af.write(str(format_args(job.batch_file, False, False)))
 
     result = subprocess.run(
         [
@@ -89,16 +101,18 @@ def process_batch(batch_file):
     if (
         result.returncode != 0
         or "FAIL" in result.stdout
        or "error" in result.stderr
         or "error" in result.stdout
         or "panicked" in result.stderr
         or "panicked" in result.stdout
     ):
         logger.error(
-            f"Error while processing: {batch_file}:\n{result.stderr}\n{result.stderr}"
+            f"Error while processing: {job}:\n{result.stdout or result.stderr}"
         )
     else:
-        logger.info(f"{batch_file} processed: {result.stdout}")
+        match = re.search(r'gas_spent=(\d+)', result.stdout)
+        if not match:
+            logger.warning(f"While processing {job}: no gas info found")
+        else:
+            logger.info(f"{job} processed, gas spent: {int(match.group(1))}")


# Producer function: Generates data and adds jobs to the queue
@@ -110,15 +124,15 @@ def job_producer(job_gen):
         with weight_lock:
             while (
                 current_weight + weight > MAX_WEIGHT_LIMIT or job_queue.full()
-            ) or not (job_queue.empty() and weight > MAX_WEIGHT_LIMIT):
+            ):  # or not (job_queue.empty() and weight > MAX_WEIGHT_LIMIT):
                 logger.debug("Producer is waiting for weight to be released.")
                 weight_lock.wait()  # Wait for the condition to be met
 
             # Add the job to the queue and update the weight
             job_queue.put((job, weight))
             current_weight += weight
             logger.debug(
-                f"Produced job with weight {weight}, current total weight: {current_weight}"
+                f"Produced job: {job} with weight {weight}, current total weight: {current_weight}"
             )
 
             # Notify consumers that a new job is available
@@ -191,10 +205,12 @@ def main(start, blocks, step, mode, strategy):
 if __name__ == "__main__":
     console_handler = logging.StreamHandler()
     console_handler.setLevel(logging.DEBUG)
-    console_handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
+    console_handler.setFormatter(logging.Formatter("%(asctime)s - %(name)-10.10s - %(levelname)s - %(message)s"))
     root_logger = logging.getLogger()
     root_logger.addHandler(console_handler)
-    root_logger.setLevel(logging.DEBUG)
+    root_logger.setLevel(logging.INFO)
+
+    logging.getLogger("urllib3").setLevel(logging.WARNING)
 
     parser = argparse.ArgumentParser(description="Run client script")
     parser.add_argument("--start", type=int, required=True, help="Start block height")
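
The success path in process_batch now pulls the gas figure out of the client's stdout with the r'gas_spent=(\d+)' pattern shown above. A quick standalone check of that extraction (the sample output line is hypothetical, not taken from the actual client):

import re

# Hypothetical stdout line; only the gas_spent=<digits> fragment matters here.
sample = "chain validation OK, gas_spent=1234567"
match = re.search(r"gas_spent=(\d+)", sample)
if match:
    print(f"gas spent: {int(match.group(1))}")  # -> gas spent: 1234567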
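For context on the condition fixed in job_producer: the producer holds a threading.Condition, blocks while adding the next job would push current_weight past MAX_WEIGHT_LIMIT or the queue is full, and relies on consumers to notify once a job finishes and its weight is released. A minimal standalone sketch of that pattern, with illustrative names (put_job, release_weight) that are not from the repository:

import queue
import threading

MAX_WEIGHT_LIMIT = 1000
job_queue = queue.Queue(maxsize=8)
weight_lock = threading.Condition()
current_weight = 0

def put_job(job, weight):
    """Producer side: block until the job fits under the weight budget."""
    global current_weight
    with weight_lock:
        while current_weight + weight > MAX_WEIGHT_LIMIT or job_queue.full():
            weight_lock.wait()  # a consumer will notify once weight is released
        job_queue.put((job, weight))
        current_weight += weight

def release_weight(weight):
    """Consumer side: return the finished job's weight and wake the producer."""
    global current_weight
    with weight_lock:
        current_weight -= weight
        weight_lock.notify_all()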
2 changes: 1 addition & 1 deletion scripts/data/generate_data.py
@@ -342,7 +342,7 @@ def generate_data(
 
     for i in range(num_blocks):
         logger.debug(
-            f"\rFetching block {initial_height + i + 1}/{initial_height + num_blocks}"
+            f"Fetching block {initial_height + i + 1}/{initial_height + num_blocks}"
         )
 
         # Interblock cache
