Skip to content

Commit

Permalink
feat: streamline db reads at startup (#169)
Browse files Browse the repository at this point in the history
* feat: push less work to threads, create less futs, bring more work to main thread

* feat: pop objects from db lru_cache to save ram

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
BobTheBuidler and github-actions[bot] authored Dec 21, 2024
1 parent 8d4b745 commit f1f02f8
Showing 1 changed file with 34 additions and 22 deletions.
56 changes: 34 additions & 22 deletions eth_portfolio/_db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,15 +285,19 @@ def ensure_token(token_address: ChecksumAddress) -> None:
get_token(token_address, sync=True)


async def get_transaction(sender: ChecksumAddress, nonce: int) -> Optional[Transaction]:
startup_txs = await transactions_known_at_startup(chain.id, sender)
data = startup_txs.pop(nonce, None) or await __get_transaction_bytes_from_db(sender, nonce)
if data:
return decode_transaction(data)


@a_sync(default="async", executor=_transaction_read_executor)
@robust_db_session
def get_transaction(sender: ChecksumAddress, nonce: int) -> Optional[Transaction]:
transactions = transactions_known_at_startup(chain.id, sender)
if nonce in transactions:
return decode_transaction(transactions.pop(nonce))
def __get_transaction_bytes_from_db(sender: ChecksumAddress, nonce: int) -> Optional[bytes]:
entity: entities.Transaction
if entity := entities.Transaction.get(from_address=(chain.id, sender), nonce=nonce):
return decode_transaction(entity.raw)
return entity.raw


def decode_transaction(data: bytes) -> Union[Transaction, TransactionRLP]:
Expand Down Expand Up @@ -431,44 +435,52 @@ def _insert_internal_transfer(transfer: InternalTransfer) -> None:
)


@a_sync(default="async", executor=_token_transfer_read_executor)
@robust_db_session
def get_token_transfer(transfer: evmspec.Log) -> Optional[TokenTransfer]:
async def get_token_transfer(transfer: evmspec.Log) -> Optional[TokenTransfer]:
pk = {
"block": (chain.id, transfer.blockNumber),
"transaction_index": transfer.transactionIndex,
"log_index": transfer.logIndex,
}
if obj := token_transfers_known_at_startup().get(tuple(pk.values())):
with reraise_excs_with_extra_context(obj):
return json.decode(obj, type=TokenTransfer, dec_hook=_decode_hook)
startup_xfers = await token_transfers_known_at_startup()
data = startup_xfers.pop(tuple(pk.values()), None) or await __get_token_transfer_bytes_from_db(
pk
)
if data:
with reraise_excs_with_extra_context(data):
return json.decode(data, type=TokenTransfer, dec_hook=_decode_hook)


@a_sync(default="async", executor=_token_transfer_read_executor)
@robust_db_session
def __get_token_transfer_bytes_from_db(pk: dict) -> Optional[bytes]:
entity: entities.TokenTransfer
if entity := entities.TokenTransfer.get(**pk):
with reraise_excs_with_extra_context(entity):
return json.decode(entity.raw, type=TokenTransfer, dec_hook=_decode_hook)
return entity.raw


_TPK = Tuple[Tuple[int, ChecksumAddress], int]

_transactions_startup_lock = threading.Lock()


@a_sync(default="async", executor=_transaction_read_executor, ram_cache_maxsize=None)
@lru_cache(maxsize=None)
def transactions_known_at_startup(chainid: int, from_address: ChecksumAddress) -> Dict[_TPK, bytes]:
with _transactions_startup_lock:
transfers = {}
obj: Tuple[int, ChecksumAddress, int, bytes]
for nonce, raw in select(
(t.nonce, t.raw)
for t in entities.Transaction # type: ignore [attr-defined]
if t.from_address.chain.id == chainid and t.from_address.address == from_address
):
transfers[nonce] = raw
return transfers
return dict(
select(
(t.nonce, t.raw)
for t in entities.Transaction # type: ignore [attr-defined]
if t.from_address.chain.id == chainid and t.from_address.address == from_address
)
)


_TokenTransferPK = Tuple[Tuple[int, int], int, int]
_token_transfers_startup_lock = threading.Lock()


@a_sync(default="async", executor=_transaction_read_executor, ram_cache_maxsize=None)
@lru_cache(maxsize=None)
def token_transfers_known_at_startup() -> Dict[_TokenTransferPK, bytes]:
chainid: int
Expand Down

0 comments on commit f1f02f8

Please sign in to comment.