Skip to content

Commit

Permalink
chore: refactor (#158)
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler authored Dec 18, 2024
1 parent b950f51 commit 6d60d28
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions eth_portfolio/_ledgers/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,15 +442,23 @@ async def _load_new_objects(self, _: Block, end_block: Block) -> AsyncIterator[T
if self.cached_thru and end_block < self.cached_thru:
return
end_block_nonce: int = await get_nonce_at_block(self.address, end_block)
if nonces := list(range(self.cached_thru_nonce + 1, end_block_nonce + 1)):
for nonce in nonces:
if nonces := tuple(range(self.cached_thru_nonce + 1, end_block_nonce + 1)):
for i, nonce in enumerate(nonces):
self._queue.put_nowait(nonce)

self._ensure_workers(min(len(nonces), self._num_workers))
# Keep the event loop relatively unblocked
# and let the rpc start doing work asap
if i % 1000:
await sleep(0)

len_nonces = len(nonces)
del nonces

self._ensure_workers(min(len_nonces, self._num_workers))

transactions = []
transaction: Optional[Transaction]
for _ in tqdm(nonces, desc=f"Transactions {self.address}"):
for _ in tqdm(range(len_nonces), desc=f"Transactions {self.address}"):
nonce, transaction = await self._ready.get()
if transaction:
if isinstance(transaction, Exception):
Expand Down Expand Up @@ -683,7 +691,6 @@ async def _load_new_objects(
append_transfer = internal_transfers.append
load = InternalTransfer.from_trace
tqdm_desc = f"Internal Transfers {self.address}"
done = 0

if self.load_prices:
tasks = []
Expand All @@ -696,6 +703,7 @@ async def _load_new_objects(
# without waiting for all tasks to be created
await sleep(0)

done = 0
async for internal_transfer in a_sync.as_completed(
tasks, aiter=True, tqdm=True, desc=tqdm_desc
):
Expand All @@ -709,14 +717,13 @@ async def _load_new_objects(

else:
pop_next_trace = traces.pop
for _ in tqdm(tuple(range(len(traces))), desc=tqdm_desc):
for i in tqdm(tuple(range(len(traces))), desc=tqdm_desc):
internal_transfer = await load(pop_next_trace(), load_prices=False)
if internal_transfer is not None:
append_transfer(internal_transfer)
yield internal_transfer

done += 1
if done % 100 == 0:
if i % 100 == 0:
await sleep(0)

if internal_transfers:
Expand Down

0 comments on commit 6d60d28

Please sign in to comment.