From 70fcefd9cd1d85476570ee133fa4276d6b5852c2 Mon Sep 17 00:00:00 2001 From: Seth Date: Fri, 27 Sep 2024 16:59:06 -0400 Subject: [PATCH 01/19] fix: collector cloning in build.sh --- build.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/build.sh b/build.sh index bcea918..8199bdc 100755 --- a/build.sh +++ b/build.sh @@ -166,6 +166,7 @@ ARG1=${1:-yes_collector} if [ "$DEVMODE" = "true" ]; then echo "Building local collector..." + rm -rf snapshotter-lite-local-collector git clone https://github.com/powerloom/snapshotter-lite-local-collector.git --single-branch --branch main (cd ./snapshotter-lite-local-collector/ && chmod +x build-docker.sh && ./build-docker.sh) From c1b1c3cc7853b62e4e9c71273e511578a4a52c5e Mon Sep 17 00:00:00 2001 From: Seth Date: Fri, 27 Sep 2024 18:24:51 -0400 Subject: [PATCH 02/19] chore: update last finalized core api endpoint to not loop --- snapshotter/core_api.py | 34 +++++++++++----------------------- snapshotter/utils/rpc.py | 9 ++++++++- 2 files changed, 19 insertions(+), 24 deletions(-) diff --git a/snapshotter/core_api.py b/snapshotter/core_api.py index 54f32f4..21c42c9 100644 --- a/snapshotter/core_api.py +++ b/snapshotter/core_api.py @@ -200,33 +200,21 @@ async def get_project_last_finalized_epoch_info( try: # Find the last finalized epoch from the contract - epoch_finalized = False - [cur_epoch] = await request.app.state.anchor_rpc_helper.web3_call( - [request.app.state.protocol_state_contract.functions.currentEpoch(Web3.to_checksum_address(settings.data_market))], + [project_last_finalized_epoch] = await request.app.state.anchor_rpc_helper.web3_call( + tasks=[ + ('lastFinalizedSnapshot', [Web3.to_checksum_address(settings.data_market), project_id]), + ], + contract_addr=protocol_state_contract_address, + abi=protocol_state_contract_abi, ) - epoch_id = int(cur_epoch[2]) - - # Iterate backwards through epochs until a finalized one is found - while not epoch_finalized and epoch_id >= 0: - # Get finalization status - [epoch_finalized_contract] = await request.app.state.anchor_rpc_helper.web3_call( - [request.app.state.protocol_state_contract.functions.snapshotStatus(settings.data_market, project_id, epoch_id)], - ) - if epoch_finalized_contract[0]: - epoch_finalized = True - project_last_finalized_epoch = epoch_id - else: - epoch_id -= 1 - if epoch_id < 0: - response.status_code = 404 - return { - 'status': 'error', - 'message': f'Unable to find last finalized epoch for project {project_id}', - } # Get epoch info for the last finalized epoch [epoch_info_data] = await request.app.state.anchor_rpc_helper.web3_call( - [request.app.state.protocol_state_contract.functions.epochInfo(Web3.to_checksum_address(settings.data_market), project_last_finalized_epoch)], + tasks=[ + ('epochInfo', [Web3.to_checksum_address(settings.data_market), project_last_finalized_epoch]), + ], + contract_addr=protocol_state_contract_address, + abi=protocol_state_contract_abi, ) epoch_info = { 'epochId': project_last_finalized_epoch, diff --git a/snapshotter/utils/rpc.py b/snapshotter/utils/rpc.py index 937044b..a8189b4 100644 --- a/snapshotter/utils/rpc.py +++ b/snapshotter/utils/rpc.py @@ -460,8 +460,15 @@ async def f(node_idx): response = await asyncio.gather(*web3_tasks) return response except Exception as e: + # Create a serializable version of the tasks + serializable_tasks = [ + { + "function_name": task[0], + "args": task[1] + } for task in tasks + ] exc = RPCException( - request=tasks, + request=serializable_tasks, response=None, underlying_exception=e, extra_info={'msg': str(e)}, From 72f56362f98c56b57ca29c018b56c5b6e7e85f81 Mon Sep 17 00:00:00 2001 From: Seth Date: Fri, 27 Sep 2024 18:25:22 -0400 Subject: [PATCH 03/19] chore: pre-commit formatting --- snapshotter/core_api.py | 23 +++++++++++++++++------ snapshotter/utils/rpc.py | 4 ++-- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/snapshotter/core_api.py b/snapshotter/core_api.py index 21c42c9..5a0f572 100644 --- a/snapshotter/core_api.py +++ b/snapshotter/core_api.py @@ -4,7 +4,6 @@ It includes functionality for health checks, epoch information retrieval, project data fetching, and task status checking. """ - from fastapi import FastAPI from fastapi import Request from fastapi import Response @@ -117,7 +116,11 @@ async def get_current_epoch( """ try: [current_epoch_data] = await request.app.state.anchor_rpc_helper.web3_call( - [request.app.state.protocol_state_contract.functions.currentEpoch(Web3.to_checksum_address(settings.data_market))], + [ + request.app.state.protocol_state_contract.functions.currentEpoch( + Web3.to_checksum_address(settings.data_market), + ), + ], ) current_epoch = { 'begin': current_epoch_data[0], @@ -158,7 +161,11 @@ async def get_epoch_info( """ try: [epoch_info_data] = await request.app.state.anchor_rpc_helper.web3_call( - [request.app.state.protocol_state_contract.functions.epochInfo(Web3.to_checksum_address(settings.data_market), epoch_id)], + [ + request.app.state.protocol_state_contract.functions.epochInfo( + Web3.to_checksum_address(settings.data_market), epoch_id, + ), + ], ) epoch_info = { 'timestamp': epoch_info_data[0], @@ -207,7 +214,7 @@ async def get_project_last_finalized_epoch_info( contract_addr=protocol_state_contract_address, abi=protocol_state_contract_abi, ) - + # Get epoch info for the last finalized epoch [epoch_info_data] = await request.app.state.anchor_rpc_helper.web3_call( tasks=[ @@ -372,11 +379,15 @@ async def get_task_status_post( # Construct project ID project_id = f'{task_status_request.task_type}:{task_status_request.wallet_address.lower()}:{settings.namespace}' - + try: # Get the last finalized epoch for the project [last_finalized_epoch] = await request.app.state.anchor_rpc_helper.web3_call( - [request.app.state.protocol_state_contract.functions.lastFinalizedSnapshot(Web3.to_checksum_address(settings.data_market), project_id)], + [ + request.app.state.protocol_state_contract.functions.lastFinalizedSnapshot( + Web3.to_checksum_address(settings.data_market), project_id, + ), + ], ) except Exception as e: diff --git a/snapshotter/utils/rpc.py b/snapshotter/utils/rpc.py index a8189b4..960c61c 100644 --- a/snapshotter/utils/rpc.py +++ b/snapshotter/utils/rpc.py @@ -463,8 +463,8 @@ async def f(node_idx): # Create a serializable version of the tasks serializable_tasks = [ { - "function_name": task[0], - "args": task[1] + 'function_name': task[0], + 'args': task[1], } for task in tasks ] exc = RPCException( From 080b2f6f0f8bb523b9b23aaa414461379aff9824 Mon Sep 17 00:00:00 2001 From: Seth Date: Fri, 27 Sep 2024 19:58:23 -0400 Subject: [PATCH 04/19] chore: update all core-api endpoints for contract/data util changes --- snapshotter/core_api.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/snapshotter/core_api.py b/snapshotter/core_api.py index 5a0f572..be5936d 100644 --- a/snapshotter/core_api.py +++ b/snapshotter/core_api.py @@ -20,6 +20,7 @@ from snapshotter.utils.default_logger import logger from snapshotter.utils.file_utils import read_json_file from snapshotter.utils.models.data_models import TaskStatusRequest +from snapshotter.utils.redis.redis_conn import RedisPoolCache from snapshotter.utils.rpc import RpcHelper @@ -79,6 +80,9 @@ async def startup_boilerplate(): await app.state.ipfs_singleton.init_sessions() app.state.ipfs_reader_client = app.state.ipfs_singleton._ipfs_read_client app.state.epoch_size = 0 + app.state._aioredis_pool = RedisPoolCache() + await app.state._aioredis_pool.populate() + app.state.redis_conn = app.state._aioredis_pool._aioredis_pool @app.get('/health') @@ -116,11 +120,11 @@ async def get_current_epoch( """ try: [current_epoch_data] = await request.app.state.anchor_rpc_helper.web3_call( - [ - request.app.state.protocol_state_contract.functions.currentEpoch( - Web3.to_checksum_address(settings.data_market), - ), + tasks=[ + ('currentEpoch', [Web3.to_checksum_address(settings.data_market)]), ], + contract_addr=protocol_state_contract_address, + abi=protocol_state_contract_abi, ) current_epoch = { 'begin': current_epoch_data[0], @@ -161,11 +165,11 @@ async def get_epoch_info( """ try: [epoch_info_data] = await request.app.state.anchor_rpc_helper.web3_call( - [ - request.app.state.protocol_state_contract.functions.epochInfo( - Web3.to_checksum_address(settings.data_market), epoch_id, - ), + tasks=[ + ('epochInfo', [Web3.to_checksum_address(settings.data_market), epoch_id]), ], + contract_addr=protocol_state_contract_address, + abi=protocol_state_contract_abi, ) epoch_info = { 'timestamp': epoch_info_data[0], @@ -271,6 +275,7 @@ async def get_data_for_project_id_epoch_id( } try: data = await get_project_epoch_snapshot( + request.app.state.redis_conn, request.app.state.protocol_state_contract, request.app.state.anchor_rpc_helper, request.app.state.ipfs_reader_client, @@ -321,8 +326,8 @@ async def get_finalized_cid_for_project_id_epoch_id( try: data = await get_project_finalized_cid( + request.app.state.redis_conn, request.app.state.protocol_state_contract, - settings.data_market, request.app.state.anchor_rpc_helper, epoch_id, project_id, @@ -383,11 +388,11 @@ async def get_task_status_post( try: # Get the last finalized epoch for the project [last_finalized_epoch] = await request.app.state.anchor_rpc_helper.web3_call( - [ - request.app.state.protocol_state_contract.functions.lastFinalizedSnapshot( - Web3.to_checksum_address(settings.data_market), project_id, - ), + tasks=[ + ('lastFinalizedSnapshot', [Web3.to_checksum_address(settings.data_market), project_id]), ], + contract_addr=protocol_state_contract_address, + abi=protocol_state_contract_abi, ) except Exception as e: From c5c9de8d2099a9bf87c5480756700113b8b5467d Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 30 Sep 2024 13:19:33 -0400 Subject: [PATCH 05/19] chore: initial implementation for chain rpc rate limiting --- snapshotter/utils/models/settings_model.py | 8 ++++++ snapshotter/utils/rpc.py | 31 ++++++++++++++++------ 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/snapshotter/utils/models/settings_model.py b/snapshotter/utils/models/settings_model.py index b80d70d..9201704 100644 --- a/snapshotter/utils/models/settings_model.py +++ b/snapshotter/utils/models/settings_model.py @@ -34,6 +34,13 @@ class ConnectionLimits(BaseModel): keepalive_expiry: int = 300 +class RateLimitConfig(BaseModel): + """RPC Rate limit configuration model.""" + requests_per_second: int + requests_per_minute: int + requests_per_day: int + + class RPCConfigBase(BaseModel): """Base RPC configuration model.""" full_nodes: List[RPCNodeConfig] @@ -49,6 +56,7 @@ class RPCConfigFull(RPCConfigBase): skip_epoch_threshold_blocks: int polling_interval: int semaphore_value: int = 20 + rate_limit: RateLimitConfig class RLimit(BaseModel): diff --git a/snapshotter/utils/rpc.py b/snapshotter/utils/rpc.py index 960c61c..ecf94b3 100644 --- a/snapshotter/utils/rpc.py +++ b/snapshotter/utils/rpc.py @@ -5,6 +5,7 @@ import eth_abi import tenacity +from aiolimiter import AsyncLimiter from eth_abi.codec import ABICodec from eth_utils import keccak from hexbytes import HexBytes @@ -151,6 +152,7 @@ def __init__(self, rpc_settings: RPCConfigBase = settings.rpc, archive_mode=Fals self._client = None self._async_transport = None self._semaphore = None + self._rate_limiter = None async def _init_http_clients(self): """ @@ -201,6 +203,11 @@ async def init(self, redis_conn=None): """ if not self._initialized: self._semaphore = asyncio.BoundedSemaphore(value=settings.rpc.semaphore_value) + + # Initialize rate limiter + requests_per_second = self._rpc_settings.rate_limit.requests_per_second + self._rate_limiter = AsyncLimiter(requests_per_second) + if not self._sync_nodes_initialized: self._logger.debug('Sync nodes not initialized, initializing...') self.sync_init() @@ -298,6 +305,10 @@ def _on_node_exception(self, retry_state: tenacity.RetryCallState): next_node_idx, retry_state.outcome.exception(), ) + async def _rate_limited_call(self, coroutine): + async with self._rate_limiter: + return await coroutine + @acquire_rpc_semaphore async def get_current_block_number(self, redis_conn=None): """ @@ -324,7 +335,7 @@ async def f(node_idx): web3_provider = node['web3_client_async'] try: - current_block = await web3_provider.eth.block_number + current_block = await self._rate_limited_call(web3_provider.eth.block_number) except Exception as e: exc = RPCException( request='get_current_block_number', @@ -365,8 +376,8 @@ async def f(node_idx): node = self._nodes[node_idx] try: - tx_receipt_details = await node['web3_client_async'].eth.get_transaction_receipt( - tx_hash, + tx_receipt_details = await self._rate_limited_call( + node['web3_client_async'].eth.get_transaction_receipt(tx_hash), ) except Exception as e: exc = RPCException( @@ -410,7 +421,7 @@ async def f(node_idx): web3_provider = node['web3_client_async'] try: - current_block = await web3_provider.eth.block_number + current_block = await self._rate_limited_call(web3_provider.eth.block_number) except Exception as e: exc = RPCException( request='get_current_block_number', @@ -455,7 +466,9 @@ async def f(node_idx): abi=abi, ) web3_tasks = [ - contract_obj.functions[task[0]](*task[1]).call() for task in tasks + self._rate_limited_call( + contract_obj.functions[task[0]](*task[1]).call(), + ) for task in tasks ] response = await asyncio.gather(*web3_tasks) return response @@ -509,7 +522,9 @@ async def f(node_idx): node = self._nodes[node_idx] rpc_url = node.get('rpc_url') try: - response = await self._client.post(url=rpc_url, json=rpc_query) + response = await self._rate_limited_call( + self._client.post(url=rpc_url, json=rpc_query), + ) response_data = response.json() except Exception as e: exc = RPCException( @@ -827,8 +842,8 @@ async def f(node_idx): 'topics': topics, } try: - event_log = await web3_provider.eth.get_logs( - event_log_query, + event_log = await self._rate_limited_call( + web3_provider.eth.get_logs(event_log_query), ) codec: ABICodec = web3_provider.codec all_events = [] From 781433fbd87ff344d0f268e7a703ff1c22b44a6b Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 30 Sep 2024 16:12:47 -0400 Subject: [PATCH 06/19] chore: remove deprecated redis_conn argument from rpc functions --- snapshotter/utils/rpc.py | 46 ++++++++++++---------------------------- 1 file changed, 13 insertions(+), 33 deletions(-) diff --git a/snapshotter/utils/rpc.py b/snapshotter/utils/rpc.py index ecf94b3..6a021c7 100644 --- a/snapshotter/utils/rpc.py +++ b/snapshotter/utils/rpc.py @@ -189,15 +189,12 @@ async def _load_async_web3_providers(self): self._logger.info('Loaded async web3 provider for node {}: {}', node['rpc_url'], node['web3_client_async']) self._logger.info('Post async web3 provider loading: {}', self._nodes) - async def init(self, redis_conn=None): + async def init(self): """ Initializes the RPC client by loading web3 providers and rate limits, loading rate limit SHAs, initializing HTTP clients, and loading async web3 providers. - Args: - redis_conn: Redis connection object. - Returns: None """ @@ -310,13 +307,8 @@ async def _rate_limited_call(self, coroutine): return await coroutine @acquire_rpc_semaphore - async def get_current_block_number(self, redis_conn=None): + async def get_current_block_number(self): """ - Returns the current block number of the Ethereum blockchain. - - Args: - redis_conn: Redis connection object. - Returns: int: The current block number of the Ethereum blockchain. @@ -350,13 +342,12 @@ async def f(node_idx): return await f(node_idx=0) @acquire_rpc_semaphore - async def get_transaction_receipt(self, tx_hash, redis_conn=None): + async def get_transaction_receipt(self, tx_hash): """ Retrieves the transaction receipt for a given transaction hash. Args: tx_hash (str): The transaction hash for which to retrieve the receipt. - redis_conn: Redis connection object. Returns: dict: The transaction receipt details as a dictionary. @@ -395,12 +386,11 @@ async def f(node_idx): return await f(node_idx=0) @acquire_rpc_semaphore - async def get_current_block(self, redis_conn=None, node_idx=0): + async def get_current_block(self, node_idx=0): """ Returns the current block number of the Ethereum blockchain. Args: - redis_conn: Redis connection object. node_idx (int): Index of the node to use for the RPC call. Returns: @@ -496,13 +486,12 @@ async def f(node_idx): return await f(node_idx=0) @acquire_rpc_semaphore - async def _make_rpc_jsonrpc_call(self, rpc_query, redis_conn=None): + async def _make_rpc_jsonrpc_call(self, rpc_query): """ Makes an RPC JSON-RPC call to a node in the pool. Args: rpc_query (dict): The JSON-RPC query to be sent. - redis_conn (Redis): The Redis connection object. Returns: dict: The JSON-RPC response data. @@ -584,7 +573,6 @@ async def f(node_idx): async def batch_eth_get_balance_on_block_range( self, address, - redis_conn, from_block, to_block, ): @@ -593,7 +581,6 @@ async def batch_eth_get_balance_on_block_range( Args: address (str): The Ethereum address to retrieve the balance for. - redis_conn (redis.Redis): The Redis connection object. from_block (int): The starting block number. to_block (int): The ending block number. @@ -615,7 +602,7 @@ async def batch_eth_get_balance_on_block_range( request_id += 1 try: - response_data = await self._make_rpc_jsonrpc_call(rpc_query, redis_conn) + response_data = await self._make_rpc_jsonrpc_call(rpc_query) rpc_response = [] if not isinstance(response_data, list) and response_data is not None and isinstance(response_data, dict): @@ -635,7 +622,6 @@ async def batch_eth_call_on_block_range( abi_dict, function_name, contract_address, - redis_conn, from_block, to_block, params: Union[List, None] = None, @@ -648,7 +634,6 @@ async def batch_eth_call_on_block_range( abi_dict (dict): The ABI dictionary of the contract. function_name (str): The name of the function to call. contract_address (str): The address of the contract. - redis_conn (redis.Redis): The Redis connection object. from_block (int): The starting block number. to_block (int): The ending block number. params (list, optional): The list of parameters to pass to the function. Defaults to None. @@ -683,7 +668,7 @@ async def batch_eth_call_on_block_range( ) request_id += 1 - response_data = await self._make_rpc_jsonrpc_call(rpc_query, redis_conn=redis_conn) + response_data = await self._make_rpc_jsonrpc_call(rpc_query) rpc_response = [] if isinstance(response_data, list): response = response_data @@ -707,7 +692,6 @@ async def batch_eth_call_on_block_range_hex_data( abi_dict, function_name, contract_address, - redis_conn, from_block, to_block, params: Union[List, None] = None, @@ -720,7 +704,6 @@ async def batch_eth_call_on_block_range_hex_data( abi_dict (dict): The ABI dictionary of the contract. function_name (str): The name of the function to call. contract_address (str): The address of the contract. - redis_conn (redis.Redis): The Redis connection object. from_block (int): The starting block number. to_block (int): The ending block number. params (list, optional): The list of parameters to pass to the function. Defaults to None. @@ -755,7 +738,7 @@ async def batch_eth_call_on_block_range_hex_data( ) request_id += 1 - response_data = await self._make_rpc_jsonrpc_call(rpc_query, redis_conn=redis_conn) + response_data = await self._make_rpc_jsonrpc_call(rpc_query) rpc_response = [] # Return the hexbytes data to be decoded outside the function @@ -769,14 +752,13 @@ async def batch_eth_call_on_block_range_hex_data( return rpc_response - async def batch_eth_get_block(self, from_block, to_block, redis_conn): + async def batch_eth_get_block(self, from_block, to_block): """ Batch retrieves Ethereum blocks using eth_getBlockByNumber JSON-RPC method. Args: from_block (int): The block number to start retrieving from. to_block (int): The block number to stop retrieving at. - redis_conn (redis.Redis): Redis connection object. Returns: dict: A dictionary containing the response data from the JSON-RPC call. @@ -798,12 +780,12 @@ async def batch_eth_get_block(self, from_block, to_block, redis_conn): ) request_id += 1 - response_data = await self._make_rpc_jsonrpc_call(rpc_query, redis_conn=redis_conn) + response_data = await self._make_rpc_jsonrpc_call(rpc_query) return response_data @acquire_rpc_semaphore async def get_events_logs( - self, contract_address, to_block, from_block, topics, event_abi, redis_conn=None, + self, contract_address, to_block, from_block, topics, event_abi, ): """ Returns all events logs for a given contract address, within a specified block range and with specified topics. @@ -814,7 +796,6 @@ async def get_events_logs( from_block (int): The lowest block number to retrieve events logs from. topics (List[str]): A list of topics to filter the events logs by. event_abi (Dict): The ABI of the event to decode the logs with. - redis_conn (Redis): The Redis connection object to use for rate limiting. Returns: List[Dict]: A list of dictionaries representing the decoded events logs. @@ -870,17 +851,16 @@ async def f(node_idx): return await f(node_idx=0) - async def eth_get_block(self, redis_conn=None, block_number=None): + async def eth_get_block(self, block_number=None): """ Batch retrieves Ethereum blocks using eth_getBlockByNumber JSON-RPC method. Args: block_number (int): The block number to retrieve. - redis_conn (redis.Redis): Redis connection object. Returns: JSON-RPC response: A response containing the block data from the JSON-RPC call to fetch the respective block. """ if not self._initialized: - await self.init(redis_conn) + await self.init() rpc_query = [] block = hex(block_number) if block_number is not None else 'latest' From 4a41cec1c3d806ec237c2fb3bd94c6a19011fa2d Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 30 Sep 2024 16:19:52 -0400 Subject: [PATCH 07/19] chore: remove redis_conn argument from rpc_helper calls --- snapshotter/processor_distributor.py | 4 ++-- snapshotter/snapshotter_id_ping.py | 2 +- snapshotter/system_event_detector.py | 6 +++--- snapshotter/tests/test_web3_async_provider.py | 4 ++-- snapshotter/utils/generic_worker.py | 4 ++-- .../preloaders/tx_receipts/delegated_worker/tx_receipts.py | 3 +-- snapshotter/utils/snapshot_utils.py | 2 +- 7 files changed, 12 insertions(+), 13 deletions(-) diff --git a/snapshotter/processor_distributor.py b/snapshotter/processor_distributor.py index 13a6a3d..877a92d 100644 --- a/snapshotter/processor_distributor.py +++ b/snapshotter/processor_distributor.py @@ -184,9 +184,9 @@ async def _init_rpc_helper(self): Initializes the RpcHelper instance if it is not already initialized. """ self._rpc_helper = RpcHelper() - await self._rpc_helper.init(redis_conn=self._redis_conn) + await self._rpc_helper.init() self._anchor_rpc_helper = RpcHelper(rpc_settings=settings.anchor_chain_rpc) - await self._anchor_rpc_helper.init(redis_conn=self._redis_conn) + await self._anchor_rpc_helper.init() async def _init_rabbitmq_connection(self): """ diff --git a/snapshotter/snapshotter_id_ping.py b/snapshotter/snapshotter_id_ping.py index cb37b9f..a597eee 100644 --- a/snapshotter/snapshotter_id_ping.py +++ b/snapshotter/snapshotter_id_ping.py @@ -23,7 +23,7 @@ async def main(): # Initialize RPC helper for anchor chain anchor_rpc = RpcHelper(settings.anchor_chain_rpc) - await anchor_rpc.init(redis_conn=redis_conn) + await anchor_rpc.init() # Load protocol state ABI protocol_abi = read_json_file(settings.protocol_state.abi) diff --git a/snapshotter/system_event_detector.py b/snapshotter/system_event_detector.py index 7867295..47596b7 100644 --- a/snapshotter/system_event_detector.py +++ b/snapshotter/system_event_detector.py @@ -304,7 +304,7 @@ async def _detect_events(self): """ while True: try: - current_block = await self._anchor_rpc_helper.get_current_block(redis_conn=self._redis_conn) + current_block = await self._anchor_rpc_helper.get_current_block() self._logger.info('Current block: {}', current_block) except Exception as e: @@ -408,8 +408,8 @@ async def _init_rpc(self): """ Initializes the RpcHelper instances for both anchor and source chains. """ - await self._anchor_rpc_helper.init(redis_conn=self._redis_conn) - await self._source_rpc_helper.init(redis_conn=self._redis_conn) + await self._anchor_rpc_helper.init() + await self._source_rpc_helper.init() @rabbitmq_and_redis_cleanup def run(self): diff --git a/snapshotter/tests/test_web3_async_provider.py b/snapshotter/tests/test_web3_async_provider.py index f5c7088..b13068a 100644 --- a/snapshotter/tests/test_web3_async_provider.py +++ b/snapshotter/tests/test_web3_async_provider.py @@ -35,7 +35,7 @@ async def test_web3_async_call(): # Set up the RPC helper with the anchor chain RPC rpc_helper = RpcHelper(settings.anchor_chain_rpc) - await rpc_helper.init(writer_redis_pool) + await rpc_helper.init() # Create a synchronous Web3 client sync_w3_client = Web3(HTTPProvider(settings.anchor_chain_rpc.full_nodes[0].url)) @@ -52,7 +52,7 @@ async def test_web3_async_call(): ] # Execute the Web3 call asynchronously - result = await rpc_helper.web3_call(tasks, redis_conn=writer_redis_pool) + result = await rpc_helper.web3_call(tasks) logger.debug('Retrieve: {}', result) diff --git a/snapshotter/utils/generic_worker.py b/snapshotter/utils/generic_worker.py index ce219a8..337e93e 100644 --- a/snapshotter/utils/generic_worker.py +++ b/snapshotter/utils/generic_worker.py @@ -493,9 +493,9 @@ async def _init_rpc_helper(self): Initializes the RpcHelper objects for the worker and anchor chain, and sets up the protocol state contract. """ self._rpc_helper = RpcHelper(rpc_settings=settings.rpc) - await self._rpc_helper.init(redis_conn=self._redis_conn) + await self._rpc_helper.init() self._anchor_rpc_helper = RpcHelper(rpc_settings=settings.anchor_chain_rpc) - await self._anchor_rpc_helper.init(redis_conn=self._redis_conn) + await self._anchor_rpc_helper.init() await self._anchor_rpc_helper._load_async_web3_providers() self._protocol_state_contract = self._anchor_rpc_helper.get_current_node()['web3_client'].eth.contract( address=Web3.to_checksum_address( diff --git a/snapshotter/utils/preloaders/tx_receipts/delegated_worker/tx_receipts.py b/snapshotter/utils/preloaders/tx_receipts/delegated_worker/tx_receipts.py index a23e0d5..410375f 100644 --- a/snapshotter/utils/preloaders/tx_receipts/delegated_worker/tx_receipts.py +++ b/snapshotter/utils/preloaders/tx_receipts/delegated_worker/tx_receipts.py @@ -13,7 +13,7 @@ class TxReceiptProcessor(GenericDelegateProcessor): """ A processor class for handling transaction receipts. - + This class extends GenericDelegateProcessor and provides functionality to process and fetch transaction receipts. """ @@ -63,7 +63,6 @@ async def compute( # Fetch the transaction receipt tx_receipt_obj: web3.datastructures.AttributeDict = await rpc_helper.get_transaction_receipt( tx_hash, - redis_conn, ) # Convert AttributeDict to regular dictionary diff --git a/snapshotter/utils/snapshot_utils.py b/snapshotter/utils/snapshot_utils.py index 3f776b7..cb46f9d 100644 --- a/snapshotter/utils/snapshot_utils.py +++ b/snapshotter/utils/snapshot_utils.py @@ -52,7 +52,7 @@ async def get_block_details_in_block_range( return cached_details # Fetch block details from RPC if not cached - rpc_batch_block_details = await rpc_helper.batch_eth_get_block(from_block, to_block, redis_conn) + rpc_batch_block_details = await rpc_helper.batch_eth_get_block(from_block, to_block) rpc_batch_block_details = rpc_batch_block_details if rpc_batch_block_details else [] From cef275adc18aeeea25fcc788c83864a4bb81150f Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 30 Sep 2024 16:20:40 -0400 Subject: [PATCH 08/19] chore: update poetry config for aiolimiter --- poetry.lock | 15 +++++++++++++-- pyproject.toml | 1 + 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/poetry.lock b/poetry.lock index e6ca471..a2837e7 100755 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. [[package]] name = "aio-pika" @@ -138,6 +138,17 @@ yarl = ">=1.12.0,<2.0" [package.extras] speedups = ["Brotli", "aiodns (>=3.2.0)", "brotlicffi"] +[[package]] +name = "aiolimiter" +version = "1.1.0" +description = "asyncio rate limiter, a leaky bucket implementation" +optional = false +python-versions = ">=3.7,<4.0" +files = [ + {file = "aiolimiter-1.1.0-py3-none-any.whl", hash = "sha256:0b4997961fc58b8df40279e739f9cf0d3e255e63e9a44f64df567a8c17241e24"}, + {file = "aiolimiter-1.1.0.tar.gz", hash = "sha256:461cf02f82a29347340d031626c92853645c099cb5ff85577b831a7bd21132b5"}, +] + [[package]] name = "aiormq" version = "6.8.1" @@ -3117,4 +3128,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "e3576170e80f60997bbdac4a989546128f00f17410750f1157a45cd23282c78f" +content-hash = "0ee8cfeddf22b2b71b555f067e8d0052c0eb777cdf5b138760204bf3f9481267" diff --git a/pyproject.toml b/pyproject.toml index a69d499..462ab88 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ eip712-structs = "^1.1.0" coincurve = "^19.0.1" grpclib = {extras = ["protobuf"], version = "^0.4.7"} grpcio-tools = "^1.62.1" +aiolimiter = "^1.1.0" [build-system] From d79ef6c4153c0598ce2c069454044f26d4c4f2ad Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 30 Sep 2024 22:07:10 -0400 Subject: [PATCH 09/19] chore: add unit tests for rpc helper module --- snapshotter/tests/rpc_helper_test.py | 215 ++++++++++++++++++ .../tests/static/abi/UUPSUpgradeable.json | 67 ++++++ .../tests/static/bytecode/protocol_state.json | 3 + .../static/bytecode/uups_upgradeable.json | 3 + 4 files changed, 288 insertions(+) create mode 100644 snapshotter/tests/rpc_helper_test.py create mode 100644 snapshotter/tests/static/abi/UUPSUpgradeable.json create mode 100644 snapshotter/tests/static/bytecode/protocol_state.json create mode 100644 snapshotter/tests/static/bytecode/uups_upgradeable.json diff --git a/snapshotter/tests/rpc_helper_test.py b/snapshotter/tests/rpc_helper_test.py new file mode 100644 index 0000000..cfbfcaa --- /dev/null +++ b/snapshotter/tests/rpc_helper_test.py @@ -0,0 +1,215 @@ +import asyncio +import json + +import pytest +from pytest_asyncio import fixture as async_fixture +from web3 import AsyncHTTPProvider +from web3 import AsyncWeb3 +from web3.contract import AsyncContract + +from snapshotter.settings.config import settings +from snapshotter.utils.models.settings_model import RateLimitConfig +from snapshotter.utils.models.settings_model import RPCConfigFull +from snapshotter.utils.models.settings_model import RPCNodeConfig +from snapshotter.utils.rpc import get_contract_abi_dict +from snapshotter.utils.rpc import get_event_sig_and_abi +from snapshotter.utils.rpc import RpcHelper + +# Custom RPC config for testing with Hardhat + +RATE_LIMIT_OVERRIDE = RateLimitConfig( + requests_per_second=1, + requests_per_minute=60, + requests_per_day=86400, +) + +TEST_RPC_CONFIG = RPCConfigFull( + full_nodes=[RPCNodeConfig(url='http://127.0.0.1:8545')], + archive_nodes=[RPCNodeConfig(url='http://127.0.0.1:8545')], + connection_limits=settings.rpc.connection_limits, + rate_limit=settings.rpc.rate_limit, + semaphore_value=settings.rpc.semaphore_value, + retry=settings.rpc.retry, + force_archive_blocks=settings.rpc.force_archive_blocks, + request_time_out=settings.rpc.request_time_out, + skip_epoch_threshold_blocks=settings.rpc.skip_epoch_threshold_blocks, + polling_interval=settings.rpc.polling_interval, +) + +@async_fixture(scope='module') +async def rpc_helper(): + helper = RpcHelper(rpc_settings=TEST_RPC_CONFIG) + await helper.init() + yield helper + +@async_fixture(scope='module') +async def rpc_helper_override(): + override_config = TEST_RPC_CONFIG + override_config.rate_limit = RATE_LIMIT_OVERRIDE + override_helper = RpcHelper(rpc_settings=override_config) + await override_helper.init() + yield override_helper + +@async_fixture(scope='module') +async def web3(): + yield AsyncWeb3(AsyncHTTPProvider('http://127.0.0.1:8545')) + +@async_fixture(scope='module') +async def protocol_contract(web3: AsyncWeb3): + # Load Implementation ABI and Bytecode + with open('snapshotter/static/abis/ProtocolContract.json', 'r') as abi_file: + implementation_abi = json.load(abi_file) + + with open('snapshotter/tests/static/bytecode/protocol_state.json', 'r') as bytecode_file: + implementation_bytecode_json = json.load(bytecode_file) + implementation_bytecode = implementation_bytecode_json['bytecode'] + + # Deploy the Implementation Contract + implementation_contract = web3.eth.contract(abi=implementation_abi, bytecode=implementation_bytecode) + tx_hash_impl = await implementation_contract.constructor().transact() + tx_receipt_impl = await web3.eth.wait_for_transaction_receipt(tx_hash_impl) + implementation_address = tx_receipt_impl['contractAddress'] + + accounts = await web3.eth.accounts + + # Load UUPSUpgradeable ABI and Bytecode + with open('snapshotter/tests/static/abi/UUPSUpgradeable.json', 'r') as proxy_abi_file: + proxy_abi = json.load(proxy_abi_file) + + with open('snapshotter/tests/static/bytecode/uups_upgradeable.json', 'r') as proxy_bytecode_file: + proxy_bytecode_json = json.load(proxy_bytecode_file) + proxy_bytecode = proxy_bytecode_json['bytecode'] + + # Initialize Proxy Constructor Parameters + # The initializer must be encoded using the implementation's initializer function + initializer = implementation_contract.encodeABI(fn_name='initialize', args=[web3.to_checksum_address(accounts[0])]) + + # Deploy the Proxy Contract + proxy_contract = web3.eth.contract(abi=proxy_abi, bytecode=proxy_bytecode) + tx_hash_proxy = await proxy_contract.constructor( + implementation_address, + initializer, + ).transact({'from': accounts[0]}) + tx_receipt_proxy = await web3.eth.wait_for_transaction_receipt(tx_hash_proxy) + proxy_address = tx_receipt_proxy['contractAddress'] + + proxy_instance = web3.eth.contract(address=proxy_address, abi=implementation_abi) + + # verify initialization + owner = await proxy_instance.functions.owner().call() + assert owner == web3.to_checksum_address(accounts[0]), 'Initialization failed.' + + yield proxy_instance + +@pytest.mark.asyncio(loop_scope='module') +async def test_get_current_block_number(rpc_helper, web3): + result = await rpc_helper.get_current_block_number() + assert isinstance(result, int) + assert result == await web3.eth.block_number + +@pytest.mark.asyncio(loop_scope='module') +async def test_get_transaction_receipt(rpc_helper, web3: AsyncWeb3, protocol_contract: AsyncContract): + accounts = await web3.eth.accounts + tx_hash = await protocol_contract.functions.updateSnapshotterState(accounts[0]).transact({'from': accounts[0]}) + await web3.eth.wait_for_transaction_receipt(tx_hash) + + result: dict = await rpc_helper.get_transaction_receipt(tx_hash.hex()) + assert result['transactionHash'] == tx_hash + assert 'blockNumber' in result + assert 'gasUsed' in result + +@pytest.mark.asyncio(loop_scope='module') +async def test_web3_call(rpc_helper, protocol_contract): + result: dict = await rpc_helper.web3_call([('owner', [])], protocol_contract.address, protocol_contract.abi) + assert result[0] == await protocol_contract.functions.owner().call() + +@pytest.mark.asyncio(loop_scope='module') +async def test_batch_eth_get_balance_on_block_range(rpc_helper: RpcHelper, web3: AsyncWeb3): + accounts = await web3.eth.accounts + account = accounts[0] + start_block = await web3.eth.block_number + + for _ in range(3): + tx_hash = await web3.eth.send_transaction({ + 'from': account, + 'to': accounts[1], + 'value': web3.to_wei(1, 'ether'), + }) + await web3.eth.wait_for_transaction_receipt(tx_hash) + + end_block = await web3.eth.block_number + balances = await rpc_helper.batch_eth_get_balance_on_block_range(account, start_block, end_block) + + assert len(balances) == end_block - start_block + 1 + assert all(isinstance(balance, int) for balance in balances) + assert balances[0] > balances[-1] + +@pytest.mark.asyncio(loop_scope='module') +async def test_batch_eth_call_on_block_range(rpc_helper: RpcHelper, web3: AsyncWeb3, protocol_contract: AsyncContract): + accounts = await web3.eth.accounts + start_block = await web3.eth.block_number + + for i in range(1, 4): + tx_hash = await protocol_contract.functions.updateSnapshotterState( + accounts[i], + ).transact({'from': accounts[0]}) + await web3.eth.wait_for_transaction_receipt(tx_hash) + + end_block = await web3.eth.block_number + + abi_dict = get_contract_abi_dict(protocol_contract.abi) + + results: list[tuple[int]] = await rpc_helper.batch_eth_call_on_block_range(abi_dict, 'snapshotterState', protocol_contract.address, start_block, end_block) + + assert len(results) == end_block - start_block + 1 + assert web3.to_checksum_address(results[-1][0]) == web3.to_checksum_address(accounts[3]) + +@pytest.mark.asyncio(loop_scope='module') +async def test_get_events_logs(rpc_helper: RpcHelper, web3: AsyncWeb3, protocol_contract: AsyncContract): + accounts = await web3.eth.accounts + start_block = await web3.eth.block_number + + for i in range(1, 4): + tx_hash = await protocol_contract.functions.transferOwnership( + accounts[i], + ).transact({'from': accounts[i-1]}) + await web3.eth.wait_for_transaction_receipt(tx_hash) + + end_block = await web3.eth.block_number + + EVENT_SIGS = { + 'OwnershipTransferred': 'OwnershipTransferred(address,address)', + } + + EVENT_ABI = { + 'OwnershipTransferred': protocol_contract.events.OwnershipTransferred._get_event_abi(), + } + + event_sig, event_abi = get_event_sig_and_abi(EVENT_SIGS, EVENT_ABI) + + events: list[dict[str, dict]] = await rpc_helper.get_events_logs( + protocol_contract.address, + end_block, + start_block, + event_sig, + event_abi, + ) + + assert len(events) == 3 + for i, event in enumerate(events): + assert event['event'] == 'OwnershipTransferred' + assert event['args']['previousOwner'] == accounts[i] + assert event['args']['newOwner'] == accounts[i+1] + +@pytest.mark.asyncio(loop_scope='module') +async def test_rate_limiting(rpc_helper_override: RpcHelper): + samples = 10 + start_time = asyncio.get_event_loop().time() + tasks = [rpc_helper_override.get_current_block_number() for _ in range(samples)] + await asyncio.gather(*tasks) + end_time = asyncio.get_event_loop().time() + + elapsed_time = end_time - start_time + expected_time = (samples - 1) / RATE_LIMIT_OVERRIDE.requests_per_second + + assert elapsed_time >= expected_time, f'Rate limiting not working as expected. Elapsed time: {elapsed_time}, Expected time: {expected_time}' diff --git a/snapshotter/tests/static/abi/UUPSUpgradeable.json b/snapshotter/tests/static/abi/UUPSUpgradeable.json new file mode 100644 index 0000000..67107cb --- /dev/null +++ b/snapshotter/tests/static/abi/UUPSUpgradeable.json @@ -0,0 +1,67 @@ +[ + { + "inputs": [ + { + "internalType": "address", + "name": "implementation", + "type": "address" + }, + { + "internalType": "bytes", + "name": "_data", + "type": "bytes" + } + ], + "stateMutability": "payable", + "type": "constructor" + }, + { + "inputs": [ + { + "internalType": "address", + "name": "target", + "type": "address" + } + ], + "name": "AddressEmptyCode", + "type": "error" + }, + { + "inputs": [ + { + "internalType": "address", + "name": "implementation", + "type": "address" + } + ], + "name": "ERC1967InvalidImplementation", + "type": "error" + }, + { + "inputs": [], + "name": "ERC1967NonPayable", + "type": "error" + }, + { + "inputs": [], + "name": "FailedInnerCall", + "type": "error" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "address", + "name": "implementation", + "type": "address" + } + ], + "name": "Upgraded", + "type": "event" + }, + { + "stateMutability": "payable", + "type": "fallback" + } +] diff --git a/snapshotter/tests/static/bytecode/protocol_state.json b/snapshotter/tests/static/bytecode/protocol_state.json new file mode 100644 index 0000000..3137192 --- /dev/null +++ b/snapshotter/tests/static/bytecode/protocol_state.json @@ -0,0 +1,3 @@ +{ +"bytecode": "" +} diff --git a/snapshotter/tests/static/bytecode/uups_upgradeable.json b/snapshotter/tests/static/bytecode/uups_upgradeable.json new file mode 100644 index 0000000..15d9be5 --- /dev/null +++ b/snapshotter/tests/static/bytecode/uups_upgradeable.json @@ -0,0 +1,3 @@ +{ + "bytecode": "60806040526040516103f03803806103f08339810160408190526100229161025e565b61002c8282610033565b5050610341565b61003c82610091565b6040516001600160a01b038316907fbc7cd75a20ee27fd9adebab32041f755214dbc6bffa90cc0225b39da2e5c2d3b905f90a280511561008557610080828261010c565b505050565b61008d61017f565b5050565b806001600160a01b03163b5f036100cb57604051634c9c8ce360e01b81526001600160a01b03821660048201526024015b60405180910390fd5b7f360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc80546001600160a01b0319166001600160a01b0392909216919091179055565b60605f80846001600160a01b0316846040516101289190610326565b5f60405180830381855af49150503d805f8114610160576040519150601f19603f3d011682016040523d82523d5f602084013e610165565b606091505b5090925090506101768583836101a0565b95945050505050565b341561019e5760405163b398979f60e01b815260040160405180910390fd5b565b6060826101b5576101b0826101ff565b6101f8565b81511580156101cc57506001600160a01b0384163b155b156101f557604051639996b31560e01b81526001600160a01b03851660048201526024016100c2565b50805b9392505050565b80511561020f5780518082602001fd5b604051630a12f52160e11b815260040160405180910390fd5b634e487b7160e01b5f52604160045260245ffd5b5f5b8381101561025657818101518382015260200161023e565b50505f910152565b5f806040838503121561026f575f80fd5b82516001600160a01b0381168114610285575f80fd5b60208401519092506001600160401b03808211156102a1575f80fd5b818501915085601f8301126102b4575f80fd5b8151818111156102c6576102c6610228565b604051601f8201601f19908116603f011681019083821181831017156102ee576102ee610228565b81604052828152886020848701011115610306575f80fd5b61031783602083016020880161023c565b80955050505050509250929050565b5f825161033781846020870161023c565b9190910192915050565b60a38061034d5f395ff3fe6080604052600a600c565b005b60186014601a565b6050565b565b5f604b7f360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc546001600160a01b031690565b905090565b365f80375f80365f845af43d5f803e8080156069573d5ff35b3d5ffdfea26469706673582212203b4bb5e0fd57d8893d3f24c78de2c21e5b5967bf9075bf3cdc52bc2171b2f4a164736f6c63430008180033" +} From 0d485608a807930609b5dc26e722ce207b5f8861 Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 30 Sep 2024 22:08:10 -0400 Subject: [PATCH 10/19] fix: batch_eth_get_balance_on_block_range in rpc helper --- snapshotter/utils/rpc.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/snapshotter/utils/rpc.py b/snapshotter/utils/rpc.py index 6a021c7..1a8c001 100644 --- a/snapshotter/utils/rpc.py +++ b/snapshotter/utils/rpc.py @@ -203,7 +203,7 @@ async def init(self): # Initialize rate limiter requests_per_second = self._rpc_settings.rate_limit.requests_per_second - self._rate_limiter = AsyncLimiter(requests_per_second) + self._rate_limiter = AsyncLimiter(requests_per_second, 1) if not self._sync_nodes_initialized: self._logger.debug('Sync nodes not initialized, initializing...') @@ -605,17 +605,25 @@ async def batch_eth_get_balance_on_block_range( response_data = await self._make_rpc_jsonrpc_call(rpc_query) rpc_response = [] - if not isinstance(response_data, list) and response_data is not None and isinstance(response_data, dict): - response_data = [response_data] - for response in response_data: - if 'result' in response: - eth_balance = response['result'] - rpc_response.append(eth_balance) - else: - rpc_response.append(None) + if isinstance(response_data, list): + response = response_data + elif response_data is not None and isinstance(response_data, dict): + response = [response_data] + for result in response: + if 'result' in result: + eth_balance = result['result'] + eth_balance = int(eth_balance, 16) + rpc_response.append(eth_balance) return rpc_response except Exception as e: - raise e + exc = RPCException( + request=rpc_query, + response=None, + underlying_exception=e, + extra_info=f'RPC_BATCH_ETH_GET_BALANCE_ON_BLOCK_RANGE_ERROR: {str(e)}', + ) + self._logger.trace('Error in batch_eth_get_balance_on_block_range, error {}', str(exc)) + raise exc async def batch_eth_call_on_block_range( self, From 09a7d1a8a19cc8423b94b01b7477f9fb2e018e1b Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 30 Sep 2024 22:08:37 -0400 Subject: [PATCH 11/19] chore: update gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 5cbd367..8dfceb2 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,5 @@ config/*settings.json snapshotter-lite-local-collector config computes -logs \ No newline at end of file +logs +.coverage From b230eaafed56bb137a93730d890acf3934a788b2 Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 30 Sep 2024 22:09:54 -0400 Subject: [PATCH 12/19] chore: update dev dependencies for pytest --- poetry.lock | 184 ++++++++++++++++++++++++++++++++++++++++++++++++- pyproject.toml | 4 ++ 2 files changed, 187 insertions(+), 1 deletion(-) diff --git a/poetry.lock b/poetry.lock index a2837e7..0a8bbab 100755 --- a/poetry.lock +++ b/poetry.lock @@ -790,6 +790,93 @@ files = [ {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, ] +[[package]] +name = "coverage" +version = "7.6.1" +description = "Code coverage measurement for Python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "coverage-7.6.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b06079abebbc0e89e6163b8e8f0e16270124c154dc6e4a47b413dd538859af16"}, + {file = "coverage-7.6.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:cf4b19715bccd7ee27b6b120e7e9dd56037b9c0681dcc1adc9ba9db3d417fa36"}, + {file = "coverage-7.6.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e61c0abb4c85b095a784ef23fdd4aede7a2628478e7baba7c5e3deba61070a02"}, + {file = "coverage-7.6.1-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fd21f6ae3f08b41004dfb433fa895d858f3f5979e7762d052b12aef444e29afc"}, + {file = "coverage-7.6.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8f59d57baca39b32db42b83b2a7ba6f47ad9c394ec2076b084c3f029b7afca23"}, + {file = "coverage-7.6.1-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:a1ac0ae2b8bd743b88ed0502544847c3053d7171a3cff9228af618a068ed9c34"}, + {file = "coverage-7.6.1-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:e6a08c0be454c3b3beb105c0596ebdc2371fab6bb90c0c0297f4e58fd7e1012c"}, + {file = "coverage-7.6.1-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:f5796e664fe802da4f57a168c85359a8fbf3eab5e55cd4e4569fbacecc903959"}, + {file = "coverage-7.6.1-cp310-cp310-win32.whl", hash = "sha256:7bb65125fcbef8d989fa1dd0e8a060999497629ca5b0efbca209588a73356232"}, + {file = "coverage-7.6.1-cp310-cp310-win_amd64.whl", hash = "sha256:3115a95daa9bdba70aea750db7b96b37259a81a709223c8448fa97727d546fe0"}, + {file = "coverage-7.6.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:7dea0889685db8550f839fa202744652e87c60015029ce3f60e006f8c4462c93"}, + {file = "coverage-7.6.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:ed37bd3c3b063412f7620464a9ac1314d33100329f39799255fb8d3027da50d3"}, + {file = "coverage-7.6.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d85f5e9a5f8b73e2350097c3756ef7e785f55bd71205defa0bfdaf96c31616ff"}, + {file = "coverage-7.6.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9bc572be474cafb617672c43fe989d6e48d3c83af02ce8de73fff1c6bb3c198d"}, + {file = "coverage-7.6.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0c0420b573964c760df9e9e86d1a9a622d0d27f417e1a949a8a66dd7bcee7bc6"}, + {file = "coverage-7.6.1-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:1f4aa8219db826ce6be7099d559f8ec311549bfc4046f7f9fe9b5cea5c581c56"}, + {file = "coverage-7.6.1-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:fc5a77d0c516700ebad189b587de289a20a78324bc54baee03dd486f0855d234"}, + {file = "coverage-7.6.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:b48f312cca9621272ae49008c7f613337c53fadca647d6384cc129d2996d1133"}, + {file = "coverage-7.6.1-cp311-cp311-win32.whl", hash = "sha256:1125ca0e5fd475cbbba3bb67ae20bd2c23a98fac4e32412883f9bcbaa81c314c"}, + {file = "coverage-7.6.1-cp311-cp311-win_amd64.whl", hash = "sha256:8ae539519c4c040c5ffd0632784e21b2f03fc1340752af711f33e5be83a9d6c6"}, + {file = "coverage-7.6.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:95cae0efeb032af8458fc27d191f85d1717b1d4e49f7cb226cf526ff28179778"}, + {file = "coverage-7.6.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:5621a9175cf9d0b0c84c2ef2b12e9f5f5071357c4d2ea6ca1cf01814f45d2391"}, + {file = "coverage-7.6.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:260933720fdcd75340e7dbe9060655aff3af1f0c5d20f46b57f262ab6c86a5e8"}, + {file = "coverage-7.6.1-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:07e2ca0ad381b91350c0ed49d52699b625aab2b44b65e1b4e02fa9df0e92ad2d"}, + {file = "coverage-7.6.1-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c44fee9975f04b33331cb8eb272827111efc8930cfd582e0320613263ca849ca"}, + {file = "coverage-7.6.1-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:877abb17e6339d96bf08e7a622d05095e72b71f8afd8a9fefc82cf30ed944163"}, + {file = "coverage-7.6.1-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:3e0cadcf6733c09154b461f1ca72d5416635e5e4ec4e536192180d34ec160f8a"}, + {file = "coverage-7.6.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:c3c02d12f837d9683e5ab2f3d9844dc57655b92c74e286c262e0fc54213c216d"}, + {file = "coverage-7.6.1-cp312-cp312-win32.whl", hash = "sha256:e05882b70b87a18d937ca6768ff33cc3f72847cbc4de4491c8e73880766718e5"}, + {file = "coverage-7.6.1-cp312-cp312-win_amd64.whl", hash = "sha256:b5d7b556859dd85f3a541db6a4e0167b86e7273e1cdc973e5b175166bb634fdb"}, + {file = "coverage-7.6.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:a4acd025ecc06185ba2b801f2de85546e0b8ac787cf9d3b06e7e2a69f925b106"}, + {file = "coverage-7.6.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:a6d3adcf24b624a7b778533480e32434a39ad8fa30c315208f6d3e5542aeb6e9"}, + {file = "coverage-7.6.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d0c212c49b6c10e6951362f7c6df3329f04c2b1c28499563d4035d964ab8e08c"}, + {file = "coverage-7.6.1-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6e81d7a3e58882450ec4186ca59a3f20a5d4440f25b1cff6f0902ad890e6748a"}, + {file = "coverage-7.6.1-cp313-cp313-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:78b260de9790fd81e69401c2dc8b17da47c8038176a79092a89cb2b7d945d060"}, + {file = "coverage-7.6.1-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:a78d169acd38300060b28d600344a803628c3fd585c912cacc9ea8790fe96862"}, + {file = "coverage-7.6.1-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:2c09f4ce52cb99dd7505cd0fc8e0e37c77b87f46bc9c1eb03fe3bc9991085388"}, + {file = "coverage-7.6.1-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:6878ef48d4227aace338d88c48738a4258213cd7b74fd9a3d4d7582bb1d8a155"}, + {file = "coverage-7.6.1-cp313-cp313-win32.whl", hash = "sha256:44df346d5215a8c0e360307d46ffaabe0f5d3502c8a1cefd700b34baf31d411a"}, + {file = "coverage-7.6.1-cp313-cp313-win_amd64.whl", hash = "sha256:8284cf8c0dd272a247bc154eb6c95548722dce90d098c17a883ed36e67cdb129"}, + {file = "coverage-7.6.1-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:d3296782ca4eab572a1a4eca686d8bfb00226300dcefdf43faa25b5242ab8a3e"}, + {file = "coverage-7.6.1-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:502753043567491d3ff6d08629270127e0c31d4184c4c8d98f92c26f65019962"}, + {file = "coverage-7.6.1-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6a89ecca80709d4076b95f89f308544ec8f7b4727e8a547913a35f16717856cb"}, + {file = "coverage-7.6.1-cp313-cp313t-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a318d68e92e80af8b00fa99609796fdbcdfef3629c77c6283566c6f02c6d6704"}, + {file = "coverage-7.6.1-cp313-cp313t-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:13b0a73a0896988f053e4fbb7de6d93388e6dd292b0d87ee51d106f2c11b465b"}, + {file = "coverage-7.6.1-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:4421712dbfc5562150f7554f13dde997a2e932a6b5f352edcce948a815efee6f"}, + {file = "coverage-7.6.1-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:166811d20dfea725e2e4baa71fffd6c968a958577848d2131f39b60043400223"}, + {file = "coverage-7.6.1-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:225667980479a17db1048cb2bf8bfb39b8e5be8f164b8f6628b64f78a72cf9d3"}, + {file = "coverage-7.6.1-cp313-cp313t-win32.whl", hash = "sha256:170d444ab405852903b7d04ea9ae9b98f98ab6d7e63e1115e82620807519797f"}, + {file = "coverage-7.6.1-cp313-cp313t-win_amd64.whl", hash = "sha256:b9f222de8cded79c49bf184bdbc06630d4c58eec9459b939b4a690c82ed05657"}, + {file = "coverage-7.6.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:6db04803b6c7291985a761004e9060b2bca08da6d04f26a7f2294b8623a0c1a0"}, + {file = "coverage-7.6.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:f1adfc8ac319e1a348af294106bc6a8458a0f1633cc62a1446aebc30c5fa186a"}, + {file = "coverage-7.6.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a95324a9de9650a729239daea117df21f4b9868ce32e63f8b650ebe6cef5595b"}, + {file = "coverage-7.6.1-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b43c03669dc4618ec25270b06ecd3ee4fa94c7f9b3c14bae6571ca00ef98b0d3"}, + {file = "coverage-7.6.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8929543a7192c13d177b770008bc4e8119f2e1f881d563fc6b6305d2d0ebe9de"}, + {file = "coverage-7.6.1-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:a09ece4a69cf399510c8ab25e0950d9cf2b42f7b3cb0374f95d2e2ff594478a6"}, + {file = "coverage-7.6.1-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:9054a0754de38d9dbd01a46621636689124d666bad1936d76c0341f7d71bf569"}, + {file = "coverage-7.6.1-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:0dbde0f4aa9a16fa4d754356a8f2e36296ff4d83994b2c9d8398aa32f222f989"}, + {file = "coverage-7.6.1-cp38-cp38-win32.whl", hash = "sha256:da511e6ad4f7323ee5702e6633085fb76c2f893aaf8ce4c51a0ba4fc07580ea7"}, + {file = "coverage-7.6.1-cp38-cp38-win_amd64.whl", hash = "sha256:3f1156e3e8f2872197af3840d8ad307a9dd18e615dc64d9ee41696f287c57ad8"}, + {file = "coverage-7.6.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:abd5fd0db5f4dc9289408aaf34908072f805ff7792632250dcb36dc591d24255"}, + {file = "coverage-7.6.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:547f45fa1a93154bd82050a7f3cddbc1a7a4dd2a9bf5cb7d06f4ae29fe94eaf8"}, + {file = "coverage-7.6.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:645786266c8f18a931b65bfcefdbf6952dd0dea98feee39bd188607a9d307ed2"}, + {file = "coverage-7.6.1-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9e0b2df163b8ed01d515807af24f63de04bebcecbd6c3bfeff88385789fdf75a"}, + {file = "coverage-7.6.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:609b06f178fe8e9f89ef676532760ec0b4deea15e9969bf754b37f7c40326dbc"}, + {file = "coverage-7.6.1-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:702855feff378050ae4f741045e19a32d57d19f3e0676d589df0575008ea5004"}, + {file = "coverage-7.6.1-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:2bdb062ea438f22d99cba0d7829c2ef0af1d768d1e4a4f528087224c90b132cb"}, + {file = "coverage-7.6.1-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:9c56863d44bd1c4fe2abb8a4d6f5371d197f1ac0ebdee542f07f35895fc07f36"}, + {file = "coverage-7.6.1-cp39-cp39-win32.whl", hash = "sha256:6e2cd258d7d927d09493c8df1ce9174ad01b381d4729a9d8d4e38670ca24774c"}, + {file = "coverage-7.6.1-cp39-cp39-win_amd64.whl", hash = "sha256:06a737c882bd26d0d6ee7269b20b12f14a8704807a01056c80bb881a4b2ce6ca"}, + {file = "coverage-7.6.1-pp38.pp39.pp310-none-any.whl", hash = "sha256:e9a6e0eb86070e8ccaedfbd9d38fec54864f3125ab95419970575b42af7541df"}, + {file = "coverage-7.6.1.tar.gz", hash = "sha256:953510dfb7b12ab69d20135a0662397f077c59b1e6379a768e97c59d852ee51d"}, +] + +[package.dependencies] +tomli = {version = "*", optional = true, markers = "python_full_version <= \"3.11.0a6\" and extra == \"toml\""} + +[package.extras] +toml = ["tomli"] + [[package]] name = "cytoolz" version = "0.12.3" @@ -1656,6 +1743,17 @@ enabler = ["pytest-enabler (>=2.2)"] test = ["jaraco.test (>=5.4)", "pytest (>=6,!=8.1.*)", "zipp (>=3.17)"] type = ["pytest-mypy"] +[[package]] +name = "iniconfig" +version = "2.0.0" +description = "brain-dead simple config-ini parsing" +optional = false +python-versions = ">=3.7" +files = [ + {file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"}, + {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, +] + [[package]] name = "jsonschema" version = "4.23.0" @@ -2035,6 +2133,21 @@ files = [ {file = "pkgutil_resolve_name-1.3.10.tar.gz", hash = "sha256:357d6c9e6a755653cfd78893817c0853af365dd51ec97f3d358a819373bbd174"}, ] +[[package]] +name = "pluggy" +version = "1.5.0" +description = "plugin and hook calling mechanisms for python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, + {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, +] + +[package.extras] +dev = ["pre-commit", "tox"] +testing = ["pytest", "pytest-benchmark"] + [[package]] name = "protobuf" version = "5.28.2" @@ -2224,6 +2337,64 @@ files = [ {file = "pysha3-1.0.2.tar.gz", hash = "sha256:fe988e73f2ce6d947220624f04d467faf05f1bbdbc64b0a201296bb3af92739e"}, ] +[[package]] +name = "pytest" +version = "8.3.3" +description = "pytest: simple powerful testing with Python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pytest-8.3.3-py3-none-any.whl", hash = "sha256:a6853c7375b2663155079443d2e45de913a911a11d669df02a50814944db57b2"}, + {file = "pytest-8.3.3.tar.gz", hash = "sha256:70b98107bd648308a7952b06e6ca9a50bc660be218d53c257cc1fc94fda10181"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "sys_platform == \"win32\""} +exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""} +iniconfig = "*" +packaging = "*" +pluggy = ">=1.5,<2" +tomli = {version = ">=1", markers = "python_version < \"3.11\""} + +[package.extras] +dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] + +[[package]] +name = "pytest-asyncio" +version = "0.24.0" +description = "Pytest support for asyncio" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pytest_asyncio-0.24.0-py3-none-any.whl", hash = "sha256:a811296ed596b69bf0b6f3dc40f83bcaf341b155a269052d82efa2b25ac7037b"}, + {file = "pytest_asyncio-0.24.0.tar.gz", hash = "sha256:d081d828e576d85f875399194281e92bf8a68d60d72d1a2faf2feddb6c46b276"}, +] + +[package.dependencies] +pytest = ">=8.2,<9" + +[package.extras] +docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"] +testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"] + +[[package]] +name = "pytest-cov" +version = "5.0.0" +description = "Pytest plugin for measuring coverage." +optional = false +python-versions = ">=3.8" +files = [ + {file = "pytest-cov-5.0.0.tar.gz", hash = "sha256:5837b58e9f6ebd335b0f8060eecce69b662415b16dc503883a02f45dfeb14857"}, + {file = "pytest_cov-5.0.0-py3-none-any.whl", hash = "sha256:4f0764a1219df53214206bf1feea4633c3b558a2925c8b59f144f682861ce652"}, +] + +[package.dependencies] +coverage = {version = ">=5.2.1", extras = ["toml"]} +pytest = ">=4.6" + +[package.extras] +testing = ["fields", "hunter", "process-tests", "pytest-xdist", "virtualenv"] + [[package]] name = "pyunormalize" version = "16.0.0" @@ -2632,6 +2803,17 @@ files = [ {file = "timeago-1.0.16-py3-none-any.whl", hash = "sha256:9b8cb2e3102b329f35a04aa4531982d867b093b19481cfbb1dac7845fa2f79b0"}, ] +[[package]] +name = "tomli" +version = "2.0.1" +description = "A lil' TOML parser" +optional = false +python-versions = ">=3.7" +files = [ + {file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"}, + {file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"}, +] + [[package]] name = "toolz" version = "0.12.1" @@ -3128,4 +3310,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "0ee8cfeddf22b2b71b555f067e8d0052c0eb777cdf5b138760204bf3f9481267" +content-hash = "f9ecb99ff2cd7bf75b9f2fc78c451c39d29ec0e29572a3b91a0a47503a937949" diff --git a/pyproject.toml b/pyproject.toml index 462ab88..bdf7130 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,10 @@ grpcio-tools = "^1.62.1" aiolimiter = "^1.1.0" +[tool.poetry.group.dev.dependencies] +pytest-cov = "^5.0.0" +pytest-asyncio = "^0.24.0" + [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" From f562141271b5469194491cbeb164eeaa72abc494 Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 30 Sep 2024 22:16:17 -0400 Subject: [PATCH 13/19] chore: update setup.cfg for pytest --- setup.cfg | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/setup.cfg b/setup.cfg index e415a3b..ee6e2f2 100755 --- a/setup.cfg +++ b/setup.cfg @@ -51,16 +51,15 @@ known_third_party = fastapi,pydantic,starlette [tool:pytest] -# Django configuration: -# https://pytest-django.readthedocs.io/en/latest/ -DJANGO_SETTINGS_MODULE = server.settings # Timeout for tests, so they can not take longer # than this amount of seconds. # You should adjust this value to be as low as possible. # Configuration: # https://pypi.org/project/pytest-timeout/ -timeout = 5 + +# NOTE: Removing timeout for now for rate limiting tests and pytest-timeout is not a current dependency +# timeout = 5 # Directories that are not visited by pytest collector: norecursedirs = *.egg .eggs dist build docs .tox .git __pycache__ @@ -72,26 +71,20 @@ addopts = --strict-markers --strict-config --doctest-modules - --fail-on-template-vars - --dup-fixtures # Output: --tb=short # Parallelism: # -n auto # --boxed # Coverage: - --cov=server - --cov=tests + --cov=snapshotter --cov-branch --cov-report=term-missing:skip-covered --cov-report=html - --cov-fail-under=100 - - -[coverage:run] -plugins = - django_coverage_plugin + # --cov-fail-under=100 +markers = + asyncio: mark test as an asyncio coroutine [mypy] # Mypy configuration: From de8226a652b4e683b9a8dd54dc82a2e8be523ce8 Mon Sep 17 00:00:00 2001 From: Seth Date: Tue, 1 Oct 2024 20:13:38 -0400 Subject: [PATCH 14/19] chore: update gitignore --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 8dfceb2..988db5f 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,6 @@ config computes logs .coverage +.pytest_cache +.mypy_cache +htmlcov From 764ae18817fbc5f1c384c45b402350092cd552c2 Mon Sep 17 00:00:00 2001 From: Seth Date: Tue, 1 Oct 2024 21:11:55 -0400 Subject: [PATCH 15/19] chore: add dev chain snapshot and reversion for testing --- snapshotter/tests/rpc_helper_test.py | 30 +++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/snapshotter/tests/rpc_helper_test.py b/snapshotter/tests/rpc_helper_test.py index cfbfcaa..f07365d 100644 --- a/snapshotter/tests/rpc_helper_test.py +++ b/snapshotter/tests/rpc_helper_test.py @@ -37,13 +37,33 @@ ) @async_fixture(scope='module') -async def rpc_helper(): +async def web3(): + w3 = AsyncWeb3(AsyncHTTPProvider('http://127.0.0.1:8545')) + yield w3 + +@async_fixture(scope='module') +async def snapshot(web3: AsyncWeb3): + # Take a snapshot of the current state + snapshot_id = await web3.provider.make_request('evm_snapshot', []) + print(f'Snapshot created with ID: {snapshot_id}') + + yield snapshot_id['result'] + + # Revert to the snapshot after all tests are done + revert_result = await web3.provider.make_request('evm_revert', [snapshot_id['result']]) + print(f'Snapshot revert result: {revert_result}') + + if not revert_result['result']: + raise Exception('Snapshot revert failed') + +@async_fixture(scope='module') +async def rpc_helper(snapshot): helper = RpcHelper(rpc_settings=TEST_RPC_CONFIG) await helper.init() yield helper @async_fixture(scope='module') -async def rpc_helper_override(): +async def rpc_helper_override(snapshot): override_config = TEST_RPC_CONFIG override_config.rate_limit = RATE_LIMIT_OVERRIDE override_helper = RpcHelper(rpc_settings=override_config) @@ -51,11 +71,7 @@ async def rpc_helper_override(): yield override_helper @async_fixture(scope='module') -async def web3(): - yield AsyncWeb3(AsyncHTTPProvider('http://127.0.0.1:8545')) - -@async_fixture(scope='module') -async def protocol_contract(web3: AsyncWeb3): +async def protocol_contract(web3: AsyncWeb3, snapshot): # Load Implementation ABI and Bytecode with open('snapshotter/static/abis/ProtocolContract.json', 'r') as abi_file: implementation_abi = json.load(abi_file) From da738da672cb7594a6ae4bd4cfb2c9c0711e153b Mon Sep 17 00:00:00 2001 From: Seth Date: Tue, 1 Oct 2024 21:12:39 -0400 Subject: [PATCH 16/19] chore: add doc notes for test setup instructions --- snapshotter/tests/rpc_helper_test.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/snapshotter/tests/rpc_helper_test.py b/snapshotter/tests/rpc_helper_test.py index f07365d..0bb7663 100644 --- a/snapshotter/tests/rpc_helper_test.py +++ b/snapshotter/tests/rpc_helper_test.py @@ -15,6 +15,30 @@ from snapshotter.utils.rpc import get_event_sig_and_abi from snapshotter.utils.rpc import RpcHelper +""" +RPC Helper Test Suite + +This test suite is designed to test the RpcHelper class, which provides various methods for interacting with Ethereum nodes via RPC calls. + +Requirements: +- A local development Hardhat node is required to run these tests. + +To run the tests: +1. Ensure you have a local Hardhat node running (typically on http://127.0.0.1:8545). +2. Run the tests using Poetry with the following command: + poetry run python -m pytest snapshotter/tests/rpc_helper_test.py + +What these tests cover: +- Initialization of RpcHelper +- Basic RPC calls (e.g., getting current block number, transaction receipts) +- Contract interactions (function calls) +- Batch RPC calls (e.g., getting balances or calling functions across a range of blocks) +- Event log retrieval +- Rate limiting behavior + +Note: These tests use a custom RPC configuration (TEST_RPC_CONFIG) that points to the local Hardhat node. Ensure your local node matches this configuration. +""" + # Custom RPC config for testing with Hardhat RATE_LIMIT_OVERRIDE = RateLimitConfig( From c10e87b6dc349890ab676552b373529316f992b3 Mon Sep 17 00:00:00 2001 From: Seth Date: Tue, 1 Oct 2024 22:25:49 -0400 Subject: [PATCH 17/19] chore: change rate limiter to a per node instance --- snapshotter/tests/rpc_helper_test.py | 15 +++++++++------ snapshotter/utils/models/settings_model.py | 14 ++++++-------- snapshotter/utils/rpc.py | 21 ++++++++++++--------- 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/snapshotter/tests/rpc_helper_test.py b/snapshotter/tests/rpc_helper_test.py index 0bb7663..b823563 100644 --- a/snapshotter/tests/rpc_helper_test.py +++ b/snapshotter/tests/rpc_helper_test.py @@ -43,15 +43,18 @@ RATE_LIMIT_OVERRIDE = RateLimitConfig( requests_per_second=1, - requests_per_minute=60, - requests_per_day=86400, ) TEST_RPC_CONFIG = RPCConfigFull( - full_nodes=[RPCNodeConfig(url='http://127.0.0.1:8545')], - archive_nodes=[RPCNodeConfig(url='http://127.0.0.1:8545')], + full_nodes=[ + RPCNodeConfig( + url='http://127.0.0.1:8545', + rate_limit=RateLimitConfig( + requests_per_second=10, + ), + ), + ], connection_limits=settings.rpc.connection_limits, - rate_limit=settings.rpc.rate_limit, semaphore_value=settings.rpc.semaphore_value, retry=settings.rpc.retry, force_archive_blocks=settings.rpc.force_archive_blocks, @@ -89,7 +92,7 @@ async def rpc_helper(snapshot): @async_fixture(scope='module') async def rpc_helper_override(snapshot): override_config = TEST_RPC_CONFIG - override_config.rate_limit = RATE_LIMIT_OVERRIDE + override_config.full_nodes[0].rate_limit = RATE_LIMIT_OVERRIDE override_helper = RpcHelper(rpc_settings=override_config) await override_helper.init() yield override_helper diff --git a/snapshotter/utils/models/settings_model.py b/snapshotter/utils/models/settings_model.py index 9201704..0838e31 100644 --- a/snapshotter/utils/models/settings_model.py +++ b/snapshotter/utils/models/settings_model.py @@ -22,9 +22,15 @@ class CoreAPI(BaseModel): public_rate_limit: str +class RateLimitConfig(BaseModel): + """RPC Rate limit configuration model.""" + requests_per_second: int + + class RPCNodeConfig(BaseModel): """RPC node configuration model.""" url: str + rate_limit: RateLimitConfig class ConnectionLimits(BaseModel): @@ -34,13 +40,6 @@ class ConnectionLimits(BaseModel): keepalive_expiry: int = 300 -class RateLimitConfig(BaseModel): - """RPC Rate limit configuration model.""" - requests_per_second: int - requests_per_minute: int - requests_per_day: int - - class RPCConfigBase(BaseModel): """Base RPC configuration model.""" full_nodes: List[RPCNodeConfig] @@ -56,7 +55,6 @@ class RPCConfigFull(RPCConfigBase): skip_epoch_threshold_blocks: int polling_interval: int semaphore_value: int = 20 - rate_limit: RateLimitConfig class RLimit(BaseModel): diff --git a/snapshotter/utils/rpc.py b/snapshotter/utils/rpc.py index 1a8c001..fdb7855 100644 --- a/snapshotter/utils/rpc.py +++ b/snapshotter/utils/rpc.py @@ -152,7 +152,6 @@ def __init__(self, rpc_settings: RPCConfigBase = settings.rpc, archive_mode=Fals self._client = None self._async_transport = None self._semaphore = None - self._rate_limiter = None async def _init_http_clients(self): """ @@ -201,10 +200,6 @@ async def init(self): if not self._initialized: self._semaphore = asyncio.BoundedSemaphore(value=settings.rpc.semaphore_value) - # Initialize rate limiter - requests_per_second = self._rpc_settings.rate_limit.requests_per_second - self._rate_limiter = AsyncLimiter(requests_per_second, 1) - if not self._sync_nodes_initialized: self._logger.debug('Sync nodes not initialized, initializing...') self.sync_init() @@ -251,6 +246,10 @@ def sync_init(self): 'web3_client': Web3(Web3.HTTPProvider(node.url)), 'web3_client_async': None, 'rpc_url': node.url, + 'rate_limiter': AsyncLimiter( + node.rate_limit.requests_per_second, + 1, + ), }, ) except Exception as exc: @@ -302,8 +301,8 @@ def _on_node_exception(self, retry_state: tenacity.RetryCallState): next_node_idx, retry_state.outcome.exception(), ) - async def _rate_limited_call(self, coroutine): - async with self._rate_limiter: + async def _rate_limited_call(self, coroutine, node_idx): + async with self._nodes[node_idx]['rate_limiter']: return await coroutine @acquire_rpc_semaphore @@ -327,7 +326,7 @@ async def f(node_idx): web3_provider = node['web3_client_async'] try: - current_block = await self._rate_limited_call(web3_provider.eth.block_number) + current_block = await self._rate_limited_call(web3_provider.eth.block_number, node_idx) except Exception as e: exc = RPCException( request='get_current_block_number', @@ -369,6 +368,7 @@ async def f(node_idx): try: tx_receipt_details = await self._rate_limited_call( node['web3_client_async'].eth.get_transaction_receipt(tx_hash), + node_idx, ) except Exception as e: exc = RPCException( @@ -411,7 +411,7 @@ async def f(node_idx): web3_provider = node['web3_client_async'] try: - current_block = await self._rate_limited_call(web3_provider.eth.block_number) + current_block = await self._rate_limited_call(web3_provider.eth.block_number, node_idx) except Exception as e: exc = RPCException( request='get_current_block_number', @@ -458,6 +458,7 @@ async def f(node_idx): web3_tasks = [ self._rate_limited_call( contract_obj.functions[task[0]](*task[1]).call(), + node_idx, ) for task in tasks ] response = await asyncio.gather(*web3_tasks) @@ -513,6 +514,7 @@ async def f(node_idx): try: response = await self._rate_limited_call( self._client.post(url=rpc_url, json=rpc_query), + node_idx, ) response_data = response.json() except Exception as e: @@ -833,6 +835,7 @@ async def f(node_idx): try: event_log = await self._rate_limited_call( web3_provider.eth.get_logs(event_log_query), + node_idx, ) codec: ABICodec = web3_provider.codec all_events = [] From b3f4780afd6dda2f8b41e7134db3ccfc9253b571 Mon Sep 17 00:00:00 2001 From: Seth Date: Tue, 1 Oct 2024 23:21:22 -0400 Subject: [PATCH 18/19] chore: add config option and autofill for rate limits --- env.example | 2 ++ snapshotter_autofill.sh | 11 +++++++++++ 2 files changed, 13 insertions(+) diff --git a/env.example b/env.example index fee8c2a..6ede807 100755 --- a/env.example +++ b/env.example @@ -15,6 +15,8 @@ RELAYER_HOST=https://prost1h-relayer-public.powerloom.io NAMESPACE=UNISWAPV2 POWERLOOM_REPORTING_URL=https://nms-testnet-reporting.powerloom.io PROST_CHAIN_ID=11165 +SOURCE_RPC_RATE_LIMIT=10 +PROST_RPC_RATE_LIMIT=10 IPFS_URL= IPFS_API_KEY= IPFS_API_SECRET= diff --git a/snapshotter_autofill.sh b/snapshotter_autofill.sh index 9c4e9cb..741278b 100755 --- a/snapshotter_autofill.sh +++ b/snapshotter_autofill.sh @@ -170,4 +170,15 @@ else fi +# Set default rate limits +SOURCE_RPC_RATE_LIMIT="${SOURCE_RPC_RATE_LIMIT:-10}" +PROST_RPC_RATE_LIMIT="${PROST_RPC_RATE_LIMIT:-10}" + +echo "Using SOURCE RPC Rate Limit: ${SOURCE_RPC_RATE_LIMIT}" +echo "Using PROST RPC Rate Limit: ${PROST_RPC_RATE_LIMIT}" + +sed -i'.backup' "s/\"source-rpc-rate-limit\"/$SOURCE_RPC_RATE_LIMIT/" config/settings.json + +sed -i'.backup' "s/\"prost-rpc-rate-limit\"/$PROST_RPC_RATE_LIMIT/" config/settings.json + echo 'settings has been populated!' From 842054c617fa94213d3cd0017301fdd9e2a4d180 Mon Sep 17 00:00:00 2001 From: Akshay Dahiya Date: Thu, 3 Oct 2024 14:08:28 +0530 Subject: [PATCH 19/19] chore: using worker count to calculate rate limits, semaphore cleanup --- snapshotter/core_api.py | 2 +- snapshotter/processor_distributor.py | 9 +-- snapshotter/snapshotter_id_ping.py | 2 +- snapshotter/system_event_detector.py | 2 +- snapshotter/tests/rpc_helper_test.py | 1 - snapshotter/tests/test_web3_async_provider.py | 2 +- snapshotter/utils/generic_worker.py | 2 +- snapshotter/utils/helper_functions.py | 28 --------- snapshotter/utils/models/settings_model.py | 1 - snapshotter/utils/rpc.py | 49 ++++------------ snapshotter/utils/utility_functions.py | 57 ------------------- 11 files changed, 18 insertions(+), 137 deletions(-) delete mode 100644 snapshotter/utils/utility_functions.py diff --git a/snapshotter/core_api.py b/snapshotter/core_api.py index 4833731..3f42493 100644 --- a/snapshotter/core_api.py +++ b/snapshotter/core_api.py @@ -62,7 +62,7 @@ async def startup_boilerplate(): """ app.state.core_settings = settings app.state.local_user_cache = dict() - app.state.anchor_rpc_helper = RpcHelper(rpc_settings=settings.anchor_chain_rpc) + app.state.anchor_rpc_helper = RpcHelper(rpc_settings=settings.anchor_chain_rpc, source_node=False) await app.state.anchor_rpc_helper.init() app.state.protocol_state_contract = app.state.anchor_rpc_helper.get_current_node()['web3_client'].eth.contract( address=Web3.to_checksum_address( diff --git a/snapshotter/processor_distributor.py b/snapshotter/processor_distributor.py index 000e459..48305c3 100644 --- a/snapshotter/processor_distributor.py +++ b/snapshotter/processor_distributor.py @@ -195,7 +195,7 @@ async def _init_rpc_helper(self): """ self._rpc_helper = RpcHelper() await self._rpc_helper.init() - self._anchor_rpc_helper = RpcHelper(rpc_settings=settings.anchor_chain_rpc) + self._anchor_rpc_helper = RpcHelper(rpc_settings=settings.anchor_chain_rpc, source_node=False) await self._anchor_rpc_helper.init() async def _init_rabbitmq_connection(self): @@ -1080,15 +1080,8 @@ def run(self) -> None: for signame in [SIGINT, SIGTERM, SIGQUIT]: signal(signame, self._signal_handler) asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) - self._anchor_rpc_helper = RpcHelper( - rpc_settings=settings.anchor_chain_rpc, - ) - - self._slots_per_day = 12 - self._logger.debug('Set slots per day to {}', self._slots_per_day) ev_loop = asyncio.get_event_loop() - ev_loop.run_until_complete(self._anchor_rpc_helper.init()) ev_loop.run_until_complete(self.init_worker()) self._logger.debug('Starting RabbitMQ consumer on queue {} for Processor Distributor', self._consume_queue_name) diff --git a/snapshotter/snapshotter_id_ping.py b/snapshotter/snapshotter_id_ping.py index a597eee..a5e9042 100644 --- a/snapshotter/snapshotter_id_ping.py +++ b/snapshotter/snapshotter_id_ping.py @@ -22,7 +22,7 @@ async def main(): redis_conn = aioredis_pool._aioredis_pool # Initialize RPC helper for anchor chain - anchor_rpc = RpcHelper(settings.anchor_chain_rpc) + anchor_rpc = RpcHelper(settings.anchor_chain_rpc, source_node=False) await anchor_rpc.init() # Load protocol state ABI diff --git a/snapshotter/system_event_detector.py b/snapshotter/system_event_detector.py index 139934a..5075ae6 100644 --- a/snapshotter/system_event_detector.py +++ b/snapshotter/system_event_detector.py @@ -117,7 +117,7 @@ def __init__(self, name, **kwargs): self._last_processed_block = None self._source_rpc_helper = RpcHelper(rpc_settings=settings.rpc) - self._anchor_rpc_helper = RpcHelper(rpc_settings=settings.anchor_chain_rpc) + self._anchor_rpc_helper = RpcHelper(rpc_settings=settings.anchor_chain_rpc, source_node=False) self.contract_abi = read_json_file( settings.protocol_state.abi, self._logger, diff --git a/snapshotter/tests/rpc_helper_test.py b/snapshotter/tests/rpc_helper_test.py index b823563..c062bda 100644 --- a/snapshotter/tests/rpc_helper_test.py +++ b/snapshotter/tests/rpc_helper_test.py @@ -55,7 +55,6 @@ ), ], connection_limits=settings.rpc.connection_limits, - semaphore_value=settings.rpc.semaphore_value, retry=settings.rpc.retry, force_archive_blocks=settings.rpc.force_archive_blocks, request_time_out=settings.rpc.request_time_out, diff --git a/snapshotter/tests/test_web3_async_provider.py b/snapshotter/tests/test_web3_async_provider.py index 069a1aa..b09cb67 100644 --- a/snapshotter/tests/test_web3_async_provider.py +++ b/snapshotter/tests/test_web3_async_provider.py @@ -31,7 +31,7 @@ async def test_web3_async_call(): writer_redis_pool = aioredis_pool._aioredis_pool # Set up the RPC helper with the anchor chain RPC - rpc_helper = RpcHelper(settings.anchor_chain_rpc) + rpc_helper = RpcHelper(settings.anchor_chain_rpc, source_node=False) await rpc_helper.init() # Create a synchronous Web3 client diff --git a/snapshotter/utils/generic_worker.py b/snapshotter/utils/generic_worker.py index 8a88374..2d30fe7 100644 --- a/snapshotter/utils/generic_worker.py +++ b/snapshotter/utils/generic_worker.py @@ -499,7 +499,7 @@ async def _init_rpc_helper(self): """ self._rpc_helper = RpcHelper(rpc_settings=settings.rpc) await self._rpc_helper.init() - self._anchor_rpc_helper = RpcHelper(rpc_settings=settings.anchor_chain_rpc) + self._anchor_rpc_helper = RpcHelper(rpc_settings=settings.anchor_chain_rpc, source_node=False) await self._anchor_rpc_helper.init() await self._anchor_rpc_helper._load_async_web3_providers() self._protocol_state_contract = self._anchor_rpc_helper.get_current_node()['web3_client'].eth.contract( diff --git a/snapshotter/utils/helper_functions.py b/snapshotter/utils/helper_functions.py index 5b64984..588fa3e 100644 --- a/snapshotter/utils/helper_functions.py +++ b/snapshotter/utils/helper_functions.py @@ -43,34 +43,6 @@ def wrapper(self, *args, **kwargs): return wrapper -def acquire_threading_semaphore(fn): - """ - A decorator function that acquires a threading semaphore before executing the decorated function and releases it after execution. - - Args: - fn (function): The function to be decorated. - - Returns: - function: The decorated function. - """ - @wraps(fn) - def semaphore_wrapper(*args, **kwargs): - semaphore = kwargs['semaphore'] - - logger.debug('Acquiring threading semaphore') - semaphore.acquire() - try: - resp = fn(*args, **kwargs) - except Exception: - raise - finally: - semaphore.release() - - return resp - - return semaphore_wrapper - - def preloading_entry_exit_logger(fn): """ Decorator function to log entry and exit of preloading worker functions. diff --git a/snapshotter/utils/models/settings_model.py b/snapshotter/utils/models/settings_model.py index cea9d84..453f857 100644 --- a/snapshotter/utils/models/settings_model.py +++ b/snapshotter/utils/models/settings_model.py @@ -54,7 +54,6 @@ class RPCConfigFull(RPCConfigBase): """Full RPC configuration model.""" skip_epoch_threshold_blocks: int polling_interval: int - semaphore_value: int = 20 class RLimit(BaseModel): diff --git a/snapshotter/utils/rpc.py b/snapshotter/utils/rpc.py index 2e9e1ab..03ed0c2 100644 --- a/snapshotter/utils/rpc.py +++ b/snapshotter/utils/rpc.py @@ -107,36 +107,9 @@ def get_event_sig_and_abi(event_signatures, event_abis): return event_sig, event_abi -def acquire_rpc_semaphore(fn): - """ - A decorator function that acquires a bounded semaphore before executing the decorated function and releases it - after the function is executed. This decorator is intended to be used with async functions. - - Args: - fn: The async function to be decorated. - - Returns: - The decorated async function. - """ - @wraps(fn) - async def wrapped(self, *args, **kwargs): - sem: asyncio.BoundedSemaphore = self._semaphore - await sem.acquire() - result = None - try: - result = await fn(self, *args, **kwargs) - return result - except Exception as e: - logger.opt(exception=True).error('Error in asyncio semaphore acquisition decorator: {}', e) - raise e - finally: - sem.release() - return wrapped - - class RpcHelper(object): - def __init__(self, rpc_settings: RPCConfigBase = settings.rpc, archive_mode=False): + def __init__(self, rpc_settings: RPCConfigBase = settings.rpc, archive_mode=False, source_node: bool = False): """ Initializes an instance of the RpcHelper class. @@ -154,7 +127,7 @@ def __init__(self, rpc_settings: RPCConfigBase = settings.rpc, archive_mode=Fals self._logger = logger self._client = None self._async_transport = None - self._semaphore = None + self._source_node = source_node async def _init_http_clients(self): """ @@ -201,7 +174,6 @@ async def init(self): None """ if not self._initialized: - self._semaphore = asyncio.BoundedSemaphore(value=settings.rpc.semaphore_value) if not self._sync_nodes_initialized: self._logger.debug('Sync nodes not initialized, initializing...') @@ -244,13 +216,22 @@ def sync_init(self): raise Exception('No full nor archive nodes found in config') for node in nodes: try: + _total_workers = settings.callback_worker_config.num_aggregation_workers + \ + settings.callback_worker_config.num_snapshot_workers + settings.callback_worker_config.num_delegate_workers + 1 + if self._source_node: + # Adding 1 to account for processor distributor RPC usage + _total_workers += 1 + else: + # Adding 3 to account for processor distributor, event detector and + # core API usage (assuming there isn't crazy load and it is serving reasonable amount of requests) + _total_workers += 3 self._nodes.append( { 'web3_client': Web3(Web3.HTTPProvider(node.url)), 'web3_client_async': None, 'rpc_url': node.url, 'rate_limiter': AsyncLimiter( - node.rate_limit.requests_per_second, + max(1, (node.rate_limit.requests_per_second) // (_total_workers)), 1, ), }, @@ -308,7 +289,6 @@ async def _rate_limited_call(self, coroutine, node_idx): async with self._nodes[node_idx]['rate_limiter']: return await coroutine - @acquire_rpc_semaphore async def get_current_block_number(self): """ Returns: @@ -343,7 +323,6 @@ async def f(node_idx): return current_block return await f(node_idx=0) - @acquire_rpc_semaphore async def get_transaction_receipt(self, tx_hash): """ Retrieves the transaction receipt for a given transaction hash. @@ -388,7 +367,6 @@ async def f(node_idx): return tx_receipt_details return await f(node_idx=0) - @acquire_rpc_semaphore async def get_current_block(self, node_idx=0): """ Returns the current block number of the Ethereum blockchain. @@ -428,7 +406,6 @@ async def f(node_idx): return current_block return await f(node_idx=0) - @acquire_rpc_semaphore async def web3_call(self, tasks, contract_addr, abi): """ Calls the given tasks asynchronously using web3 and returns the response. @@ -489,7 +466,6 @@ async def f(node_idx): raise exc return await f(node_idx=0) - @acquire_rpc_semaphore async def _make_rpc_jsonrpc_call(self, rpc_query): """ Makes an RPC JSON-RPC call to a node in the pool. @@ -796,7 +772,6 @@ async def batch_eth_get_block(self, from_block, to_block): response_data = await self._make_rpc_jsonrpc_call(rpc_query) return response_data - @acquire_rpc_semaphore async def get_events_logs( self, contract_address, to_block, from_block, topics, event_abi, ): diff --git a/snapshotter/utils/utility_functions.py b/snapshotter/utils/utility_functions.py deleted file mode 100644 index 3b7f219..0000000 --- a/snapshotter/utils/utility_functions.py +++ /dev/null @@ -1,57 +0,0 @@ -import asyncio -from functools import wraps - -from snapshotter.utils.default_logger import default_logger - - -def acquire_bounded_semaphore(fn): - """ - A decorator function that acquires a bounded semaphore before executing the decorated function and releases it - after the function is executed. This decorator is intended to be used with async functions. - - Args: - fn (callable): The async function to be decorated. - - Returns: - callable: The decorated async function. - - Raises: - Any exception that may occur during the execution of the decorated function. - """ - @wraps(fn) - async def wrapped(self, *args, **kwargs): - """ - Wrapper function that handles semaphore acquisition and release. - - Args: - self: The instance of the class containing the decorated method. - *args: Variable length argument list for the decorated function. - **kwargs: Arbitrary keyword arguments for the decorated function. - - Returns: - Any: The result of the decorated function. - - Raises: - Exception: Any exception that occurs during the execution of the decorated function. - """ - # Extract the semaphore from the keyword arguments - sem: asyncio.BoundedSemaphore = kwargs['semaphore'] - - # Acquire the semaphore - await sem.acquire() - - result = None - try: - # Execute the decorated function - result = await fn(self, *args, **kwargs) - except Exception as e: - # Log any exceptions that occur during execution - default_logger.opt(exception=True).error('Error in asyncio semaphore acquisition decorator: {}', e) - raise # Re-raise the exception after logging - finally: - # Ensure the semaphore is released, even if an exception occurred - sem.release() - - return result - - return wrapped