Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reduce default transaction workers 10k -> 1k #167

Merged
merged 4 commits into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions eth_portfolio/_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ def get_cache_file_path(args, kwargs):
makedirs(cache_path_for_fn, exist_ok=True)

if inspect.iscoroutinefunction(fn):
read_executor = PruningThreadPoolExecutor(8, f"{_THREAD_NAME_PREFIX}-{fn.__qualname__}-read")

read_executor = PruningThreadPoolExecutor(
8, f"{_THREAD_NAME_PREFIX}-{fn.__qualname__}-read"
)

queue = PriorityQueue()

@log_broken
Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/_db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ def token_transfers_known_at_startup() -> Dict[_TokenTransferPK, bytes]:
tx_index: int
log_index: int
raw: bytes

transfers = {}
for chainid, block, tx_index, log_index, raw in select(
(t.block.chain.id, t.block.number, t.transaction_index, t.log_index, t.raw)
Expand Down
7 changes: 5 additions & 2 deletions eth_portfolio/_ledgers/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ class AddressTransactionsLedger(AddressLedgerBase[TransactionsList, Transaction]
_list_type = TransactionsList
__slots__ = ("cached_thru_nonce", "_queue", "_ready", "_num_workers", "_workers")

def __init__(self, portfolio_address: "PortfolioAddress", num_workers: int = 10_000):
def __init__(self, portfolio_address: "PortfolioAddress", num_workers: int = 1000):
"""
Initializes the AddressTransactionsLedger instance.
Expand Down Expand Up @@ -592,6 +592,7 @@ async def get_transaction_status(txhash: str) -> Status:

_trace_semaphores = defaultdict(lambda: a_sync.Semaphore(16, __name__ + ".trace_semaphore"))


@cache_to_disk
@eth_retry.auto_retry
async def get_traces(filter_params: TraceFilterParams) -> List[FilterTrace]:
Expand All @@ -607,7 +608,9 @@ async def get_traces(filter_params: TraceFilterParams) -> List[FilterTrace]:
The list of traces.
"""
if chain.id == Network.Polygon:
logger.warning("polygon doesnt support trace_filter method, must develop alternate solution")
logger.warning(
"polygon doesnt support trace_filter method, must develop alternate solution"
)
return []
semaphore_key = tuple(
sorted(tuple(filter_params.get(x, ("",)) for x in ("toAddress", "fromAddress")))
Expand Down
5 changes: 2 additions & 3 deletions eth_portfolio/_loaders/token_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
logger = getLogger(__name__)

token_transfer_semaphore = BlockSemaphore(
20_000, # Some arbitrary number
20_000, # Some arbitrary number
name="eth_portfolio.token_transfers",
)
"""A semaphore that regulates the concurrent processing of token transfers by processing lower blocks first."""
Expand Down Expand Up @@ -104,7 +104,7 @@ async def load_token_transfer(
)
else:
coro_results["transaction_index"] = await tx_index_coro

except Exception as e:
logger.error(
f"%s %s for %s %s at block %s:",
Expand Down Expand Up @@ -208,7 +208,6 @@ async def get_transaction_index(hash: str) -> int:
new = TypeError(e, receipt_bytes, decode(receipt_bytes))
logger.exception(new)
raise new from e



class HasTxIndex(Struct):
Expand Down
14 changes: 8 additions & 6 deletions eth_portfolio/_loaders/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ async def get_block_for_nonce(address: Address, nonce: Nonce) -> int:
lowest_known_nonce_greater_than_query = None

# it is impossible for n to == nonce
for less_than, ns in itertools.groupby(filter(lambda n: n != nonce, nonces[address]), lambda n: n < nonce):
for less_than, ns in itertools.groupby(
filter(lambda n: n != nonce, nonces[address]), lambda n: n < nonce
):
if less_than:
max_value = max(ns)
if (
Expand Down Expand Up @@ -172,8 +174,8 @@ async def get_block_for_nonce(address: Address, nonce: Nonce) -> int:
lo += int((hi - lo) / 2) or 1
if debug_logs_enabled:
logger._log(
DEBUG,
"Nonce for %s at %s is %s, checking higher block %s",
DEBUG,
"Nonce for %s at %s is %s, checking higher block %s",
(address, old_lo, _nonce, lo),
)
continue
Expand All @@ -184,15 +186,15 @@ async def get_block_for_nonce(address: Address, nonce: Nonce) -> int:
lo = int(lo / 2)
if debug_logs_enabled:
logger._log(
DEBUG,
"Nonce for %s at %s is %s, checking lower block %s",
DEBUG,
"Nonce for %s at %s is %s, checking lower block %s",
(address, hi, _nonce, lo),
)
continue

if debug_logs_enabled:
logger._log(DEBUG, "Found nonce %s for %s at block %s", (nonce, address, lo))

return lo


Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def __init__(
address: Address,
start_block: Block,
load_prices: bool,
num_workers_transactions: int = 10_000,
num_workers_transactions: int = 1000,
asynchronous: bool = False,
) -> None: # type: ignore
"""
Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/portfolio.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def __init__(
start_block: int = 0,
label: str = _DEFAULT_LABEL,
load_prices: bool = True,
num_workers_transactions: int = 10_000,
num_workers_transactions: int = 1000,
asynchronous: bool = False,
) -> None:
"""
Expand Down
Loading