Skip to content

Commit

Permalink
feat: reduce default transaction workers 10k -> 1k (#167)
Browse files Browse the repository at this point in the history
* feat: reduce default  transaction workers 10k -> 1k

* chore: `black .`

* Update portfolio.py

* Update address.py (#168)

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
BobTheBuidler and github-actions[bot] authored Dec 21, 2024
1 parent 8829e9f commit a595884
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 16 deletions.
6 changes: 4 additions & 2 deletions eth_portfolio/_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ def get_cache_file_path(args, kwargs):
makedirs(cache_path_for_fn, exist_ok=True)

if inspect.iscoroutinefunction(fn):
read_executor = PruningThreadPoolExecutor(8, f"{_THREAD_NAME_PREFIX}-{fn.__qualname__}-read")

read_executor = PruningThreadPoolExecutor(
8, f"{_THREAD_NAME_PREFIX}-{fn.__qualname__}-read"
)

queue = PriorityQueue()

@log_broken
Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/_db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ def token_transfers_known_at_startup() -> Dict[_TokenTransferPK, bytes]:
tx_index: int
log_index: int
raw: bytes

transfers = {}
for chainid, block, tx_index, log_index, raw in select(
(t.block.chain.id, t.block.number, t.transaction_index, t.log_index, t.raw)
Expand Down
7 changes: 5 additions & 2 deletions eth_portfolio/_ledgers/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ class AddressTransactionsLedger(AddressLedgerBase[TransactionsList, Transaction]
_list_type = TransactionsList
__slots__ = ("cached_thru_nonce", "_queue", "_ready", "_num_workers", "_workers")

def __init__(self, portfolio_address: "PortfolioAddress", num_workers: int = 10_000):
def __init__(self, portfolio_address: "PortfolioAddress", num_workers: int = 1000):
"""
Initializes the AddressTransactionsLedger instance.
Expand Down Expand Up @@ -592,6 +592,7 @@ async def get_transaction_status(txhash: str) -> Status:

_trace_semaphores = defaultdict(lambda: a_sync.Semaphore(16, __name__ + ".trace_semaphore"))


@cache_to_disk
@eth_retry.auto_retry
async def get_traces(filter_params: TraceFilterParams) -> List[FilterTrace]:
Expand All @@ -607,7 +608,9 @@ async def get_traces(filter_params: TraceFilterParams) -> List[FilterTrace]:
The list of traces.
"""
if chain.id == Network.Polygon:
logger.warning("polygon doesnt support trace_filter method, must develop alternate solution")
logger.warning(
"polygon doesnt support trace_filter method, must develop alternate solution"
)
return []
semaphore_key = tuple(
sorted(tuple(filter_params.get(x, ("",)) for x in ("toAddress", "fromAddress")))
Expand Down
5 changes: 2 additions & 3 deletions eth_portfolio/_loaders/token_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
logger = getLogger(__name__)

token_transfer_semaphore = BlockSemaphore(
20_000, # Some arbitrary number
20_000, # Some arbitrary number
name="eth_portfolio.token_transfers",
)
"""A semaphore that regulates the concurrent processing of token transfers by processing lower blocks first."""
Expand Down Expand Up @@ -104,7 +104,7 @@ async def load_token_transfer(
)
else:
coro_results["transaction_index"] = await tx_index_coro

except Exception as e:
logger.error(
f"%s %s for %s %s at block %s:",
Expand Down Expand Up @@ -208,7 +208,6 @@ async def get_transaction_index(hash: str) -> int:
new = TypeError(e, receipt_bytes, decode(receipt_bytes))
logger.exception(new)
raise new from e



class HasTxIndex(Struct):
Expand Down
14 changes: 8 additions & 6 deletions eth_portfolio/_loaders/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ async def get_block_for_nonce(address: Address, nonce: Nonce) -> int:
lowest_known_nonce_greater_than_query = None

# it is impossible for n to == nonce
for less_than, ns in itertools.groupby(filter(lambda n: n != nonce, nonces[address]), lambda n: n < nonce):
for less_than, ns in itertools.groupby(
filter(lambda n: n != nonce, nonces[address]), lambda n: n < nonce
):
if less_than:
max_value = max(ns)
if (
Expand Down Expand Up @@ -172,8 +174,8 @@ async def get_block_for_nonce(address: Address, nonce: Nonce) -> int:
lo += int((hi - lo) / 2) or 1
if debug_logs_enabled:
logger._log(
DEBUG,
"Nonce for %s at %s is %s, checking higher block %s",
DEBUG,
"Nonce for %s at %s is %s, checking higher block %s",
(address, old_lo, _nonce, lo),
)
continue
Expand All @@ -184,15 +186,15 @@ async def get_block_for_nonce(address: Address, nonce: Nonce) -> int:
lo = int(lo / 2)
if debug_logs_enabled:
logger._log(
DEBUG,
"Nonce for %s at %s is %s, checking lower block %s",
DEBUG,
"Nonce for %s at %s is %s, checking lower block %s",
(address, hi, _nonce, lo),
)
continue

if debug_logs_enabled:
logger._log(DEBUG, "Found nonce %s for %s at block %s", (nonce, address, lo))

return lo


Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def __init__(
address: Address,
start_block: Block,
load_prices: bool,
num_workers_transactions: int = 10_000,
num_workers_transactions: int = 1000,
asynchronous: bool = False,
) -> None: # type: ignore
"""
Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/portfolio.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def __init__(
start_block: int = 0,
label: str = _DEFAULT_LABEL,
load_prices: bool = True,
num_workers_transactions: int = 10_000,
num_workers_transactions: int = 1000,
asynchronous: bool = False,
) -> None:
"""
Expand Down

0 comments on commit a595884

Please sign in to comment.