Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: rpc helper improvements + core-api update #13

Merged
merged 20 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
70fcefd
fix: collector cloning in build.sh
Seth-Schmidt Sep 27, 2024
c1b1c3c
chore: update last finalized core api endpoint to not loop
Seth-Schmidt Sep 27, 2024
72f5636
chore: pre-commit formatting
Seth-Schmidt Sep 27, 2024
080b2f6
chore: update all core-api endpoints for contract/data util changes
Seth-Schmidt Sep 27, 2024
c5c9de8
chore: initial implementation for chain rpc rate limiting
Seth-Schmidt Sep 30, 2024
781433f
chore: remove deprecated redis_conn argument from rpc functions
Seth-Schmidt Sep 30, 2024
4a41cec
chore: remove redis_conn argument from rpc_helper calls
Seth-Schmidt Sep 30, 2024
cef275a
chore: update poetry config for aiolimiter
Seth-Schmidt Sep 30, 2024
d79ef6c
chore: add unit tests for rpc helper module
Seth-Schmidt Oct 1, 2024
0d48560
fix: batch_eth_get_balance_on_block_range in rpc helper
Seth-Schmidt Oct 1, 2024
09a7d1a
chore: update gitignore
Seth-Schmidt Oct 1, 2024
b230eaa
chore: update dev dependencies for pytest
Seth-Schmidt Oct 1, 2024
f562141
chore: update setup.cfg for pytest
Seth-Schmidt Oct 1, 2024
de8226a
chore: update gitignore
Seth-Schmidt Oct 2, 2024
764ae18
chore: add dev chain snapshot and reversion for testing
Seth-Schmidt Oct 2, 2024
da738da
chore: add doc notes for test setup instructions
Seth-Schmidt Oct 2, 2024
c10e87b
chore: change rate limiter to a per node instance
Seth-Schmidt Oct 2, 2024
b3f4780
chore: add config option and autofill for rate limits
Seth-Schmidt Oct 2, 2024
2119efa
Merge remote-tracking branch 'origin/main' into feat/rpc_helper_impro…
Seth-Schmidt Oct 2, 2024
842054c
chore: using worker count to calculate rate limits, semaphore cleanup
xadahiya Oct 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,8 @@ ipfs-data
ipfs-export
ipfs_data
ipfs_export

.coverage
.pytest_cache
.mypy_cache
htmlcov
1 change: 1 addition & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ ARG1=${1:-yes_collector}

if [ "$DEVMODE" = "true" ]; then
echo "Building local collector..."
rm -rf snapshotter-lite-local-collector
git clone https://github.com/powerloom/snapshotter-lite-local-collector.git --single-branch --branch main
(cd ./snapshotter-lite-local-collector/ && chmod +x build-docker.sh && ./build-docker.sh)

Expand Down
2 changes: 2 additions & 0 deletions env.example
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ RELAYER_HOST=https://prost1h-relayer-public.powerloom.io
NAMESPACE=UNISWAPV2
POWERLOOM_REPORTING_URL=https://nms-testnet-reporting.powerloom.io
PROST_CHAIN_ID=11165
SOURCE_RPC_RATE_LIMIT=10
PROST_RPC_RATE_LIMIT=10
IPFS_URL=
IPFS_API_KEY=
IPFS_API_SECRET=
Expand Down
197 changes: 195 additions & 2 deletions poetry.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,13 @@ eip712-structs = "^1.1.0"
coincurve = "^19.0.1"
grpclib = {extras = ["protobuf"], version = "^0.4.7"}
grpcio-tools = "^1.62.1"
aiolimiter = "^1.1.0"


[tool.poetry.group.dev.dependencies]
pytest-cov = "^5.0.0"
pytest-asyncio = "^0.24.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
21 changes: 7 additions & 14 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,15 @@ known_third_party = fastapi,pydantic,starlette


[tool:pytest]
# Django configuration:
# https://pytest-django.readthedocs.io/en/latest/
DJANGO_SETTINGS_MODULE = server.settings

# Timeout for tests, so they can not take longer
# than this amount of seconds.
# You should adjust this value to be as low as possible.
# Configuration:
# https://pypi.org/project/pytest-timeout/
timeout = 5

# NOTE: Removing timeout for now for rate limiting tests and pytest-timeout is not a current dependency
# timeout = 5

# Directories that are not visited by pytest collector:
norecursedirs = *.egg .eggs dist build docs .tox .git __pycache__
Expand All @@ -72,26 +71,20 @@ addopts =
--strict-markers
--strict-config
--doctest-modules
--fail-on-template-vars
--dup-fixtures
# Output:
--tb=short
# Parallelism:
# -n auto
# --boxed
# Coverage:
--cov=server
--cov=tests
--cov=snapshotter
--cov-branch
--cov-report=term-missing:skip-covered
--cov-report=html
--cov-fail-under=100


[coverage:run]
plugins =
django_coverage_plugin
# --cov-fail-under=100

markers =
asyncio: mark test as an asyncio coroutine

[mypy]
# Mypy configuration:
Expand Down
73 changes: 27 additions & 46 deletions snapshotter/core_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from snapshotter.utils.default_logger import default_logger
from snapshotter.utils.file_utils import read_json_file
from snapshotter.utils.models.data_models import TaskStatusRequest
from snapshotter.utils.redis.redis_conn import RedisPoolCache
from snapshotter.utils.rpc import RpcHelper


Expand Down Expand Up @@ -78,6 +79,9 @@ async def startup_boilerplate():
await app.state.ipfs_singleton.init_sessions()
app.state.ipfs_reader_client = app.state.ipfs_singleton._ipfs_read_client
app.state.epoch_size = 0
app.state._aioredis_pool = RedisPoolCache()
await app.state._aioredis_pool.populate()
app.state.redis_conn = app.state._aioredis_pool._aioredis_pool


@app.get('/health')
Expand Down Expand Up @@ -115,11 +119,11 @@ async def get_current_epoch(
"""
try:
[current_epoch_data] = await request.app.state.anchor_rpc_helper.web3_call(
[
request.app.state.protocol_state_contract.functions.currentEpoch(
Web3.to_checksum_address(settings.data_market),
),
tasks=[
('currentEpoch', [Web3.to_checksum_address(settings.data_market)]),
],
contract_addr=protocol_state_contract_address,
abi=protocol_state_contract_abi,
)
current_epoch = {
'begin': current_epoch_data[0],
Expand Down Expand Up @@ -160,11 +164,11 @@ async def get_epoch_info(
"""
try:
[epoch_info_data] = await request.app.state.anchor_rpc_helper.web3_call(
[
request.app.state.protocol_state_contract.functions.epochInfo(
Web3.to_checksum_address(settings.data_market), epoch_id,
),
tasks=[
('epochInfo', [Web3.to_checksum_address(settings.data_market), epoch_id]),
],
contract_addr=protocol_state_contract_address,
abi=protocol_state_contract_abi,
)
epoch_info = {
'timestamp': epoch_info_data[0],
Expand Down Expand Up @@ -206,45 +210,21 @@ async def get_project_last_finalized_epoch_info(

try:
# Find the last finalized epoch from the contract
epoch_finalized = False
[cur_epoch] = await request.app.state.anchor_rpc_helper.web3_call(
[
request.app.state.protocol_state_contract.functions.currentEpoch(
Web3.to_checksum_address(settings.data_market),
),
[project_last_finalized_epoch] = await request.app.state.anchor_rpc_helper.web3_call(
tasks=[
('lastFinalizedSnapshot', [Web3.to_checksum_address(settings.data_market), project_id]),
],
contract_addr=protocol_state_contract_address,
abi=protocol_state_contract_abi,
)
epoch_id = int(cur_epoch[2])

# Iterate backwards through epochs until a finalized one is found
while not epoch_finalized and epoch_id >= 0:
# Get finalization status
[epoch_finalized_contract] = await request.app.state.anchor_rpc_helper.web3_call(
[
request.app.state.protocol_state_contract.functions.snapshotStatus(
settings.data_market, project_id, epoch_id,
),
],
)
if epoch_finalized_contract[0]:
epoch_finalized = True
project_last_finalized_epoch = epoch_id
else:
epoch_id -= 1
if epoch_id < 0:
response.status_code = 404
return {
'status': 'error',
'message': f'Unable to find last finalized epoch for project {project_id}',
}

# Get epoch info for the last finalized epoch
[epoch_info_data] = await request.app.state.anchor_rpc_helper.web3_call(
[
request.app.state.protocol_state_contract.functions.epochInfo(
Web3.to_checksum_address(settings.data_market), project_last_finalized_epoch,
),
tasks=[
('epochInfo', [Web3.to_checksum_address(settings.data_market), project_last_finalized_epoch]),
],
contract_addr=protocol_state_contract_address,
abi=protocol_state_contract_abi,
)
epoch_info = {
'epochId': project_last_finalized_epoch,
Expand Down Expand Up @@ -294,6 +274,7 @@ async def get_data_for_project_id_epoch_id(
}
try:
data = await get_project_epoch_snapshot(
request.app.state.redis_conn,
request.app.state.protocol_state_contract,
request.app.state.anchor_rpc_helper,
request.app.state.ipfs_reader_client,
Expand Down Expand Up @@ -344,8 +325,8 @@ async def get_finalized_cid_for_project_id_epoch_id(

try:
data = await get_project_finalized_cid(
request.app.state.redis_conn,
request.app.state.protocol_state_contract,
settings.data_market,
request.app.state.anchor_rpc_helper,
epoch_id,
project_id,
Expand Down Expand Up @@ -406,11 +387,11 @@ async def get_task_status_post(
try:
# Get the last finalized epoch for the project
[last_finalized_epoch] = await request.app.state.anchor_rpc_helper.web3_call(
[
request.app.state.protocol_state_contract.functions.lastFinalizedSnapshot(
Web3.to_checksum_address(settings.data_market), project_id,
),
tasks=[
('lastFinalizedSnapshot', [Web3.to_checksum_address(settings.data_market), project_id]),
],
contract_addr=protocol_state_contract_address,
abi=protocol_state_contract_abi,
)

except Exception as e:
Expand Down
4 changes: 2 additions & 2 deletions snapshotter/processor_distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ async def _init_rpc_helper(self):
Initializes the RpcHelper instance if it is not already initialized.
"""
self._rpc_helper = RpcHelper()
await self._rpc_helper.init(redis_conn=self._redis_conn)
await self._rpc_helper.init()
self._anchor_rpc_helper = RpcHelper(rpc_settings=settings.anchor_chain_rpc)
await self._anchor_rpc_helper.init(redis_conn=self._redis_conn)
await self._anchor_rpc_helper.init()

async def _init_rabbitmq_connection(self):
"""
Expand Down
2 changes: 1 addition & 1 deletion snapshotter/snapshotter_id_ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def main():

# Initialize RPC helper for anchor chain
anchor_rpc = RpcHelper(settings.anchor_chain_rpc)
await anchor_rpc.init(redis_conn=redis_conn)
await anchor_rpc.init()

# Load protocol state ABI
protocol_abi = read_json_file(settings.protocol_state.abi)
Expand Down
6 changes: 3 additions & 3 deletions snapshotter/system_event_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ async def _detect_events(self):
"""
while True:
try:
current_block = await self._anchor_rpc_helper.get_current_block(redis_conn=self._redis_conn)
current_block = await self._anchor_rpc_helper.get_current_block()
self._logger.info('Current block: {}', current_block)

except Exception as e:
Expand Down Expand Up @@ -408,8 +408,8 @@ async def _init_rpc(self):
"""
Initializes the RpcHelper instances for both anchor and source chains.
"""
await self._anchor_rpc_helper.init(redis_conn=self._redis_conn)
await self._source_rpc_helper.init(redis_conn=self._redis_conn)
await self._anchor_rpc_helper.init()
await self._source_rpc_helper.init()

@rabbitmq_and_redis_cleanup
def run(self):
Expand Down
Loading
Loading