Skip to content

Commit

Permalink
chore: change rate limiter to a per node instance
Browse files Browse the repository at this point in the history
  • Loading branch information
Seth-Schmidt committed Oct 2, 2024
1 parent da738da commit c10e87b
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 23 deletions.
15 changes: 9 additions & 6 deletions snapshotter/tests/rpc_helper_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,18 @@

RATE_LIMIT_OVERRIDE = RateLimitConfig(
requests_per_second=1,
requests_per_minute=60,
requests_per_day=86400,
)

TEST_RPC_CONFIG = RPCConfigFull(
full_nodes=[RPCNodeConfig(url='http://127.0.0.1:8545')],
archive_nodes=[RPCNodeConfig(url='http://127.0.0.1:8545')],
full_nodes=[
RPCNodeConfig(
url='http://127.0.0.1:8545',
rate_limit=RateLimitConfig(
requests_per_second=10,
),
),
],
connection_limits=settings.rpc.connection_limits,
rate_limit=settings.rpc.rate_limit,
semaphore_value=settings.rpc.semaphore_value,
retry=settings.rpc.retry,
force_archive_blocks=settings.rpc.force_archive_blocks,
Expand Down Expand Up @@ -89,7 +92,7 @@ async def rpc_helper(snapshot):
@async_fixture(scope='module')
async def rpc_helper_override(snapshot):
override_config = TEST_RPC_CONFIG
override_config.rate_limit = RATE_LIMIT_OVERRIDE
override_config.full_nodes[0].rate_limit = RATE_LIMIT_OVERRIDE
override_helper = RpcHelper(rpc_settings=override_config)
await override_helper.init()
yield override_helper
Expand Down
14 changes: 6 additions & 8 deletions snapshotter/utils/models/settings_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@ class CoreAPI(BaseModel):
public_rate_limit: str


class RateLimitConfig(BaseModel):
"""RPC Rate limit configuration model."""
requests_per_second: int


class RPCNodeConfig(BaseModel):
"""RPC node configuration model."""
url: str
rate_limit: RateLimitConfig


class ConnectionLimits(BaseModel):
Expand All @@ -34,13 +40,6 @@ class ConnectionLimits(BaseModel):
keepalive_expiry: int = 300


class RateLimitConfig(BaseModel):
"""RPC Rate limit configuration model."""
requests_per_second: int
requests_per_minute: int
requests_per_day: int


class RPCConfigBase(BaseModel):
"""Base RPC configuration model."""
full_nodes: List[RPCNodeConfig]
Expand All @@ -56,7 +55,6 @@ class RPCConfigFull(RPCConfigBase):
skip_epoch_threshold_blocks: int
polling_interval: int
semaphore_value: int = 20
rate_limit: RateLimitConfig


class RLimit(BaseModel):
Expand Down
21 changes: 12 additions & 9 deletions snapshotter/utils/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ def __init__(self, rpc_settings: RPCConfigBase = settings.rpc, archive_mode=Fals
self._client = None
self._async_transport = None
self._semaphore = None
self._rate_limiter = None

async def _init_http_clients(self):
"""
Expand Down Expand Up @@ -201,10 +200,6 @@ async def init(self):
if not self._initialized:
self._semaphore = asyncio.BoundedSemaphore(value=settings.rpc.semaphore_value)

# Initialize rate limiter
requests_per_second = self._rpc_settings.rate_limit.requests_per_second
self._rate_limiter = AsyncLimiter(requests_per_second, 1)

if not self._sync_nodes_initialized:
self._logger.debug('Sync nodes not initialized, initializing...')
self.sync_init()
Expand Down Expand Up @@ -251,6 +246,10 @@ def sync_init(self):
'web3_client': Web3(Web3.HTTPProvider(node.url)),
'web3_client_async': None,
'rpc_url': node.url,
'rate_limiter': AsyncLimiter(
node.rate_limit.requests_per_second,
1,
),
},
)
except Exception as exc:
Expand Down Expand Up @@ -302,8 +301,8 @@ def _on_node_exception(self, retry_state: tenacity.RetryCallState):
next_node_idx, retry_state.outcome.exception(),
)

async def _rate_limited_call(self, coroutine):
async with self._rate_limiter:
async def _rate_limited_call(self, coroutine, node_idx):
async with self._nodes[node_idx]['rate_limiter']:
return await coroutine

@acquire_rpc_semaphore
Expand All @@ -327,7 +326,7 @@ async def f(node_idx):
web3_provider = node['web3_client_async']

try:
current_block = await self._rate_limited_call(web3_provider.eth.block_number)
current_block = await self._rate_limited_call(web3_provider.eth.block_number, node_idx)
except Exception as e:
exc = RPCException(
request='get_current_block_number',
Expand Down Expand Up @@ -369,6 +368,7 @@ async def f(node_idx):
try:
tx_receipt_details = await self._rate_limited_call(
node['web3_client_async'].eth.get_transaction_receipt(tx_hash),
node_idx,
)
except Exception as e:
exc = RPCException(
Expand Down Expand Up @@ -411,7 +411,7 @@ async def f(node_idx):
web3_provider = node['web3_client_async']

try:
current_block = await self._rate_limited_call(web3_provider.eth.block_number)
current_block = await self._rate_limited_call(web3_provider.eth.block_number, node_idx)
except Exception as e:
exc = RPCException(
request='get_current_block_number',
Expand Down Expand Up @@ -458,6 +458,7 @@ async def f(node_idx):
web3_tasks = [
self._rate_limited_call(
contract_obj.functions[task[0]](*task[1]).call(),
node_idx,
) for task in tasks
]
response = await asyncio.gather(*web3_tasks)
Expand Down Expand Up @@ -513,6 +514,7 @@ async def f(node_idx):
try:
response = await self._rate_limited_call(
self._client.post(url=rpc_url, json=rpc_query),
node_idx,
)
response_data = response.json()
except Exception as e:
Expand Down Expand Up @@ -833,6 +835,7 @@ async def f(node_idx):
try:
event_log = await self._rate_limited_call(
web3_provider.eth.get_logs(event_log_query),
node_idx,
)
codec: ABICodec = web3_provider.codec
all_events = []
Expand Down

0 comments on commit c10e87b

Please sign in to comment.