Skip to content

Commit

Permalink
feat: optimize loading internal transfers at startup (#146)
Browse files Browse the repository at this point in the history
* feat: optimize loading internal transfers at startup

* chore: `black .`

* fix: unblock event loop @ internal transfer task creation

* chore: refactor

* fix: missing import

* chore: `black .`

* chore: refactor

* chore: refactor

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
BobTheBuidler and github-actions[bot] authored Dec 17, 2024
1 parent 099faa9 commit b950f51
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 28 deletions.
3 changes: 2 additions & 1 deletion eth_portfolio/_db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from a_sync import a_sync
from brownie import chain
from evmspec.data import _decode_hook
from logging import getLogger
from msgspec import ValidationError, json
from multicall.utils import get_event_loop
from pony.orm import BindingError, OperationalError, commit, db_session, flush, select
Expand All @@ -21,7 +22,7 @@
from eth_portfolio.structs import InternalTransfer, TokenTransfer, Transaction, TransactionRLP
from eth_portfolio.typing import _P, _T, Fn

logger = logging.getLogger(__name__)
logger = getLogger(__name__)


def __bind():
Expand Down
65 changes: 44 additions & 21 deletions eth_portfolio/_ledgers/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,10 @@ def _ensure_workers(self, num_workers: int) -> None:
put_ready = self._ready.put_nowait

self._workers.extend(
create_task(worker_fn(address, load_prices, queue_get, put_ready))
for _ in range(num_workers - len_workers)
create_task(
coro=worker_fn(address, load_prices, queue_get, put_ready),
name=f"AddressTransactionsLedger worker {i} for {address}")
for i in range(num_workers - len_workers)
)

@staticmethod
Expand Down Expand Up @@ -664,38 +666,59 @@ async def _load_new_objects(
for direction, (start, end) in product(["toAddress", "fromAddress"], block_ranges)
]


# NOTE: We only want tqdm progress bar when there is work to do
if len(block_ranges) == 1:
generator_function = a_sync.as_completed
else:
generator_function = partial( # type: ignore [assignment]
a_sync.as_completed, tqdm=True, desc=f"Trace Filters {self.address}"
)

if tasks := [
create_task(
coro=InternalTransfer.from_trace(trace, self.load_prices),
name="InternalTransfer.from_trace",
)
for traces in generator_function(trace_filter_coros)
for trace in await traces
]:

traces = []
async for chunk in generator_function(trace_filter_coros, aiter=True):
traces.extend(chunk)

if traces:
internal_transfers = []
append_transfer = internal_transfers.append
load = InternalTransfer.from_trace
tqdm_desc = f"Internal Transfers {self.address}"
done = 0

async for internal_transfer in a_sync.as_completed(
tasks, aiter=True, tqdm=True, desc=f"Internal Transfers {self.address}"
):
if internal_transfer:
append_transfer(internal_transfer)
yield internal_transfer

done += 1
if done % 100 == 0:
if self.load_prices:
tasks = []
while traces:
tasks.extend(
create_task(load(trace, load_prices=True)) for trace in traces[:1000]
)
traces = traces[1000:]
# let the tasks start sending calls to your node now
# without waiting for all tasks to be created
await sleep(0)

async for internal_transfer in a_sync.as_completed(
tasks, aiter=True, tqdm=True, desc=tqdm_desc
):
if internal_transfer is not None:
append_transfer(internal_transfer)
yield internal_transfer

done += 1
if done % 100 == 0:
await sleep(0)

else:
pop_next_trace = traces.pop
for _ in tqdm(tuple(range(len(traces))), desc=tqdm_desc):
internal_transfer = await load(pop_next_trace(), load_prices=False)
if internal_transfer is not None:
append_transfer(internal_transfer)
yield internal_transfer

done += 1
if done % 100 == 0:
await sleep(0)

if internal_transfers:
self.objects.extend(internal_transfers)

Expand Down
5 changes: 2 additions & 3 deletions eth_portfolio/_ydb/token_transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ async def _extend(self, objs: List[evmspec.Log]) -> None:
# save i/o
array_encodable_log = y._db.log.Log(**log)
task = create_task(
coro=load_token_transfer(array_encodable_log, self._load_prices),
name="load_token_transfer",
load_token_transfer(array_encodable_log, self._load_prices)
)
task.block = log.block # type: ignore [attr-defined]
append_loader_task(task)
Expand Down Expand Up @@ -134,6 +133,6 @@ def yield_thru_block(self, block: int) -> ASyncIterator["Task[TokenTransfer]"]:
return ASyncIterator(
as_yielded(
self.transfers_in.yield_thru_block(block),
self.transfers_out.yield_thru_block(block)
self.transfers_out.yield_thru_block(block),
)
)
4 changes: 1 addition & 3 deletions eth_portfolio/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,7 @@ async def external_balances(self, block: Optional[Block] = None) -> RemoteTokenB
Examples:
>>> external_balances = await address.external_balances(12345678)
"""
balances = await gather(
self.staking(block, sync=False), self.collateral(block, sync=False)
)
balances = await gather(self.staking(block, sync=False), self.collateral(block, sync=False))
return sum(balances) # type: ignore [arg-type, return-value]

# Assets
Expand Down

0 comments on commit b950f51

Please sign in to comment.