diff --git a/eth_portfolio/_db/utils.py b/eth_portfolio/_db/utils.py index 1e018f8e..78c45532 100644 --- a/eth_portfolio/_db/utils.py +++ b/eth_portfolio/_db/utils.py @@ -9,6 +9,7 @@ from a_sync import a_sync from brownie import chain from evmspec.data import _decode_hook +from logging import getLogger from msgspec import ValidationError, json from multicall.utils import get_event_loop from pony.orm import BindingError, OperationalError, commit, db_session, flush, select @@ -21,7 +22,7 @@ from eth_portfolio.structs import InternalTransfer, TokenTransfer, Transaction, TransactionRLP from eth_portfolio.typing import _P, _T, Fn -logger = logging.getLogger(__name__) +logger = getLogger(__name__) def __bind(): diff --git a/eth_portfolio/_ledgers/address.py b/eth_portfolio/_ledgers/address.py index 66c210d1..09226c7b 100644 --- a/eth_portfolio/_ledgers/address.py +++ b/eth_portfolio/_ledgers/address.py @@ -487,8 +487,10 @@ def _ensure_workers(self, num_workers: int) -> None: put_ready = self._ready.put_nowait self._workers.extend( - create_task(worker_fn(address, load_prices, queue_get, put_ready)) - for _ in range(num_workers - len_workers) + create_task( + coro=worker_fn(address, load_prices, queue_get, put_ready), + name=f"AddressTransactionsLedger worker {i} for {address}") + for i in range(num_workers - len_workers) ) @staticmethod @@ -664,38 +666,59 @@ async def _load_new_objects( for direction, (start, end) in product(["toAddress", "fromAddress"], block_ranges) ] - # NOTE: We only want tqdm progress bar when there is work to do if len(block_ranges) == 1: generator_function = a_sync.as_completed else: generator_function = partial( # type: ignore [assignment] a_sync.as_completed, tqdm=True, desc=f"Trace Filters {self.address}" - ) - - if tasks := [ - create_task( - coro=InternalTransfer.from_trace(trace, self.load_prices), - name="InternalTransfer.from_trace", ) - for traces in generator_function(trace_filter_coros) - for trace in await traces - ]: + + traces = [] + async for chunk in generator_function(trace_filter_coros, aiter=True): + traces.extend(chunk) + + if traces: internal_transfers = [] append_transfer = internal_transfers.append + load = InternalTransfer.from_trace + tqdm_desc = f"Internal Transfers {self.address}" done = 0 - async for internal_transfer in a_sync.as_completed( - tasks, aiter=True, tqdm=True, desc=f"Internal Transfers {self.address}" - ): - if internal_transfer: - append_transfer(internal_transfer) - yield internal_transfer - - done += 1 - if done % 100 == 0: + if self.load_prices: + tasks = [] + while traces: + tasks.extend( + create_task(load(trace, load_prices=True)) for trace in traces[:1000] + ) + traces = traces[1000:] + # let the tasks start sending calls to your node now + # without waiting for all tasks to be created await sleep(0) + async for internal_transfer in a_sync.as_completed( + tasks, aiter=True, tqdm=True, desc=tqdm_desc + ): + if internal_transfer is not None: + append_transfer(internal_transfer) + yield internal_transfer + + done += 1 + if done % 100 == 0: + await sleep(0) + + else: + pop_next_trace = traces.pop + for _ in tqdm(tuple(range(len(traces))), desc=tqdm_desc): + internal_transfer = await load(pop_next_trace(), load_prices=False) + if internal_transfer is not None: + append_transfer(internal_transfer) + yield internal_transfer + + done += 1 + if done % 100 == 0: + await sleep(0) + if internal_transfers: self.objects.extend(internal_transfers) diff --git a/eth_portfolio/_ydb/token_transfers.py b/eth_portfolio/_ydb/token_transfers.py index 553bf662..d80716ca 100644 --- a/eth_portfolio/_ydb/token_transfers.py +++ b/eth_portfolio/_ydb/token_transfers.py @@ -77,8 +77,7 @@ async def _extend(self, objs: List[evmspec.Log]) -> None: # save i/o array_encodable_log = y._db.log.Log(**log) task = create_task( - coro=load_token_transfer(array_encodable_log, self._load_prices), - name="load_token_transfer", + load_token_transfer(array_encodable_log, self._load_prices) ) task.block = log.block # type: ignore [attr-defined] append_loader_task(task) @@ -134,6 +133,6 @@ def yield_thru_block(self, block: int) -> ASyncIterator["Task[TokenTransfer]"]: return ASyncIterator( as_yielded( self.transfers_in.yield_thru_block(block), - self.transfers_out.yield_thru_block(block) + self.transfers_out.yield_thru_block(block), ) ) diff --git a/eth_portfolio/address.py b/eth_portfolio/address.py index 9367fdc9..59785e19 100644 --- a/eth_portfolio/address.py +++ b/eth_portfolio/address.py @@ -225,9 +225,7 @@ async def external_balances(self, block: Optional[Block] = None) -> RemoteTokenB Examples: >>> external_balances = await address.external_balances(12345678) """ - balances = await gather( - self.staking(block, sync=False), self.collateral(block, sync=False) - ) + balances = await gather(self.staking(block, sync=False), self.collateral(block, sync=False)) return sum(balances) # type: ignore [arg-type, return-value] # Assets