diff --git a/README.md b/README.md index 5d00147..63954b1 100644 --- a/README.md +++ b/README.md @@ -58,9 +58,10 @@ print( ```python from tonsdk_ng.contract.token.ft import JettonWallet from tonsdk_ng.contract.token.nft import NFTItem -from tonsdk_ng.utils import Address, to_nano +from tonsdk_ng.types import Address +from tonsdk_ng.utils import to_nano -body = NFTItem().create_transfer_body(Address("New Owner Address")) +body = NFTItem().create_transfer_body(Address.from_string("New Owner Address")) query = wallet.create_transfer_message( "NFT Item Address", to_nano(0.05, "ton"), @@ -70,7 +71,7 @@ query = wallet.create_transfer_message( nft_boc = bytes_to_b64str(query["message"].to_boc(False)) body = JettonWallet().create_transfer_body( - Address("Destination address"), to_nano(40000, "ton") # jettons amount + Address.from_string("Destination address"), to_nano(40000, "ton") # jettons amount ) query = wallet.create_transfer_message( "Jetton Wallet Address", @@ -100,7 +101,7 @@ from abc import ABC, abstractmethod import aiohttp from tvm_valuetypes import serialize_tvm_stack -from tonsdk_ng.boc import Cell +from tonsdk_ng.types import Cell from tonsdk_ng.provider import ( SyncTonlibClient, ToncenterClient, diff --git a/examples/tokens/jetton/transfer.py b/examples/tokens/jetton/transfer.py index 39d0140..0c223b7 100644 --- a/examples/tokens/jetton/transfer.py +++ b/examples/tokens/jetton/transfer.py @@ -1,6 +1,7 @@ from tonsdk_ng.contract.token.ft import JettonWallet from tonsdk_ng.contract.wallet import Wallets, WalletVersionEnum -from tonsdk_ng.utils import Address, bytes_to_b64str, to_nano +from tonsdk_ng.types import Address +from tonsdk_ng.utils import bytes_to_b64str, to_nano """your wallet mnemonics""" mnemonics = [ @@ -29,14 +30,14 @@ "piano", "language", ] -mnemonics, pub_k, priv_k, wallet = Wallets.from_mnemonics( +wallet = Wallets.from_mnemonics( mnemonics=mnemonics, version=WalletVersionEnum.v3r2, workchain=0 ) """transfer""" body = JettonWallet().create_transfer_body( - to_address=Address("address"), + to_address=Address.from_string("address"), jetton_amount=to_nano(float("jettons amount"), "ton"), ) diff --git a/examples/tokens/nft/transfer.py b/examples/tokens/nft/transfer.py index 0747a4b..55e9eb3 100644 --- a/examples/tokens/nft/transfer.py +++ b/examples/tokens/nft/transfer.py @@ -1,6 +1,7 @@ from tonsdk_ng.contract.token.nft import NFTItem from tonsdk_ng.contract.wallet import Wallets, WalletVersionEnum -from tonsdk_ng.utils import Address, bytes_to_b64str, to_nano +from tonsdk_ng.types import Address +from tonsdk_ng.utils import bytes_to_b64str, to_nano """your wallet mnemonics""" mnemonics = [ @@ -29,14 +30,14 @@ "piano", "language", ] -mnemonics, pub_k, priv_k, wallet = Wallets.from_mnemonics( +wallet = Wallets.from_mnemonics( mnemonics=mnemonics, version=WalletVersionEnum.v3r2, workchain=0 ) """transfer""" body = NFTItem().create_transfer_body( - new_owner_address=Address("new owner address") + new_owner_address=Address.from_string("new owner address") ) query = wallet.create_transfer_message( to_addr="nft addr", diff --git a/examples/types/cell.py b/examples/types/cell.py index e640819..9e9649c 100644 --- a/examples/types/cell.py +++ b/examples/types/cell.py @@ -1,9 +1,11 @@ from tonsdk_ng.boc import begin_cell -from tonsdk_ng.utils import Address +from tonsdk_ng.types import Address cell = ( begin_cell() .store_uint(4, 32) - .store_address(Address("EQBvW8Z5huBkMJYdnfAEM5JqTNkuWX3diqYENkWsIL0XggGG")) + .store_address( + Address.from_string("EQBvW8Z5huBkMJYdnfAEM5JqTNkuWX3diqYENkWsIL0XggGG") + ) .end_cell() ) diff --git a/examples/types/slice.py b/examples/types/slice.py index 85f8f55..9e1a8ae 100644 --- a/examples/types/slice.py +++ b/examples/types/slice.py @@ -1,10 +1,11 @@ -from tonsdk_ng.boc import begin_cell -from tonsdk_ng.utils import Address +from tonsdk_ng.types import Address, begin_cell cell = ( begin_cell() .store_uint(4, 32) - .store_address(Address("EQBvW8Z5huBkMJYdnfAEM5JqTNkuWX3diqYENkWsIL0XggGG")) + .store_address( + Address.from_string("EQBvW8Z5huBkMJYdnfAEM5JqTNkuWX3diqYENkWsIL0XggGG") + ) .end_cell() ) diff --git a/tonsdk_ng/contract/__init__.py b/tonsdk_ng/contract/__init__.py index e3de496..9da1f02 100644 --- a/tonsdk_ng/contract/__init__.py +++ b/tonsdk_ng/contract/__init__.py @@ -1,32 +1,59 @@ import abc +from typing import Any, TypedDict, cast -from ..boc import Cell -from ..utils import Address +from ..types import Address, Cell + + +class StateInit(TypedDict): + code: Cell + data: Cell + address: Address + state_init: Cell | None + + +class Options(TypedDict): + wc: int + code: Cell + address: Address + public_key: bytes + private_key: bytes + + +class ExternalMessage(TypedDict): + address: Address + message: Cell + state_init: Cell | None + code: Cell | None + data: Cell | None class Contract(abc.ABC): - def __init__(self, **kwargs): - self.options = kwargs + def __init__(self, **kwargs: Any): + self.options = cast(Options, kwargs) self._address = ( - Address(kwargs["address"]) if "address" in kwargs else None + Address.from_any(kwargs["address"]) if "address" in kwargs else None ) if "wc" not in kwargs: - kwargs["wc"] = self._address.wc if self._address is not None else 0 + self.options["wc"] = ( + self._address.wc if self._address is not None else 0 + ) @property - def address(self): + def address(self) -> Address: if self._address is None: self._address = self.create_state_init()["address"] return self._address - def create_state_init(self): + def create_state_init(self) -> StateInit: code_cell = self.create_code_cell() data_cell = self.create_data_cell() state_init = self.__create_state_init(code_cell, data_cell) state_init_hash = state_init.bytes_hash() - address = Address(str(self.options["wc"]) + ":" + state_init_hash.hex()) + address = Address.from_string( + str(self.options["wc"]) + ":" + state_init_hash.hex() + ) return { "code": code_cell, @@ -35,15 +62,15 @@ def create_state_init(self): "state_init": state_init, } - def create_code_cell(self): + def create_code_cell(self) -> Cell: if "code" not in self.options or self.options["code"] is None: raise Exception("Contract: options.code is not defined") return self.options["code"] - def create_data_cell(self): + def create_data_cell(self) -> Cell: return Cell() - def create_init_external_message(self): + def create_init_external_message(self) -> ExternalMessage: create_state_init = self.create_state_init() state_init = create_state_init["state_init"] address = create_state_init["address"] @@ -60,29 +87,34 @@ def create_init_external_message(self): } @classmethod - def create_external_message_header(cls, dest, src=None, import_fee=0): + def create_external_message_header( + cls, + dest: str | Address, + src: str | Address | None = None, + import_fee: int = 0, + ) -> Cell: message = Cell() message.bits.write_uint(2, 2) - message.bits.write_address(Address(src) if src else None) - message.bits.write_address(Address(dest)) + message.bits.write_address(Address.from_any(src) if src else None) + message.bits.write_address(Address.from_any(dest)) message.bits.write_grams(import_fee) return message @classmethod def create_internal_message_header( cls, - dest, - grams=0, - ihr_disabled=True, - bounce=None, - bounced=False, - src=None, - currency_collection=None, - ihr_fees=0, - fwd_fees=0, - created_lt=0, - created_at=0, - ): + dest: str | Address, + grams: int = 0, + ihr_disabled: bool = True, + bounce: bool | None = None, + bounced: bool = False, + src: str | Address | None = None, + currency_collection: Any | None = None, + ihr_fees: int = 0, + fwd_fees: int = 0, + created_lt: int = 0, + created_at: int = 0, + ) -> Cell: message = Cell() message.bits.write_bit(0) message.bits.write_bit(ihr_disabled) @@ -90,12 +122,13 @@ def create_internal_message_header( if bounce is not None: message.bits.write_bit(bounce) else: - message.bits.write_bit(Address(dest).is_bounceable) + message.bits.write_bit(Address.from_any(dest).is_bounceable) message.bits.write_bit(bounced) - message.bits.write_address(Address(src) if src else None) - message.bits.write_address(Address(dest)) + message.bits.write_address(Address.from_any(src) if src else None) + message.bits.write_address(Address.from_any(dest)) message.bits.write_grams(grams) if currency_collection: + # TODO: implement currency collections raise Exception("Currency collections are not implemented yet") message.bits.write_bit(bool(currency_collection)) @@ -111,8 +144,8 @@ def create_out_msg( address: str, amount: int, payload: str | bytes | Cell | None = None, - state_init=None, - ): + state_init: Cell | None = None, + ) -> Cell: payload_cell = Cell() if payload: if isinstance(payload, Cell): @@ -131,7 +164,12 @@ def create_out_msg( return order @classmethod - def create_common_msg_info(cls, header, state_init=None, body=None): + def create_common_msg_info( + cls, + header: Cell, + state_init: Cell | None = None, + body: Cell | None = None, + ) -> Cell: common_msg_info = Cell() common_msg_info.write_cell(header) if state_init: @@ -164,9 +202,15 @@ def create_common_msg_info(cls, header, state_init=None, body=None): return common_msg_info def __create_state_init( - self, code, data, library=None, split_depth=None, ticktock=None - ): + self, + code: Cell, + data: Cell, + library: Cell | None = None, + split_depth: Cell | None = None, + ticktock: Cell | None = None, + ) -> Cell: if library or split_depth or ticktock: + # TODO: implement library/split_depth/ticktock raise Exception( "Library/SplitDepth/Ticktock in state init is not implemented" ) diff --git a/tonsdk_ng/contract/wallet/__init__.py b/tonsdk_ng/contract/wallet/__init__.py index 1c42edf..f73d06a 100644 --- a/tonsdk_ng/contract/wallet/__init__.py +++ b/tonsdk_ng/contract/wallet/__init__.py @@ -1,4 +1,5 @@ from enum import Enum +from typing import Any from ...crypto import ( mnemonic_from_password, @@ -35,7 +36,7 @@ class WalletVersionEnum(str, Enum): class Wallets: default_version = WalletVersionEnum.v4r2 - ALL = { + ALL: dict[WalletVersionEnum, type[WalletContract]] = { WalletVersionEnum.v2r1: WalletV2ContractR1, WalletVersionEnum.v2r2: WalletV2ContractR2, WalletVersionEnum.v3r1: WalletV3ContractR1, @@ -52,7 +53,7 @@ def create( version: WalletVersionEnum, workchain: int, password: str | None = None, - **kwargs, + **kwargs: Any, ) -> tuple[list[str], bytes, bytes, WalletContract]: mnemonics = ( mnemonic_from_password(password) if password else mnemonic_new() @@ -70,7 +71,7 @@ def from_mnemonics( mnemonics: list[str], version: WalletVersionEnum = default_version, workchain: int = 0, - **kwargs, + **kwargs: Any, ) -> WalletContract: if not mnemonic_is_valid(mnemonics): raise InvalidMnemonicsError() @@ -86,7 +87,7 @@ def from_private_key( private_key: bytes, version: WalletVersionEnum = default_version, workchain: int = 0, - **kwargs, + **kwargs: Any, ) -> WalletContract: public_key = private_key_to_public_key(private_key) return cls.ALL[version]( @@ -102,7 +103,7 @@ def to_addr_pk( mnemonics: list[str], version: WalletVersionEnum = default_version, workchain: int = 0, - **kwargs, + **kwargs: Any, ) -> tuple[bytes, bytes]: wallet = cls.from_mnemonics(mnemonics, version, workchain, **kwargs) pub_k, priv_k = mnemonic_to_wallet_key(mnemonics) diff --git a/tonsdk_ng/contract/wallet/_highload_query_id.py b/tonsdk_ng/contract/wallet/_highload_query_id.py index 76bda6d..d3bb343 100644 --- a/tonsdk_ng/contract/wallet/_highload_query_id.py +++ b/tonsdk_ng/contract/wallet/_highload_query_id.py @@ -5,7 +5,7 @@ class HighloadQueryId: - def __init__(self): + def __init__(self) -> None: """ Initializes a HighloadQueryId instance with default values @@ -16,7 +16,9 @@ def __init__(self): self._bit_number = 0 @staticmethod - def from_shift_and_bit_number(shift, bit_number): + def from_shift_and_bit_number( + shift: int, bit_number: int + ) -> "HighloadQueryId": """ Creates a new HighloadQueryId object with specified shift and bit number @@ -35,7 +37,7 @@ def from_shift_and_bit_number(shift, bit_number): q._bit_number = bit_number return q - def get_next(self): + def get_next(self) -> "HighloadQueryId": """ Calculates the next HighloadQueryId based on the current state @@ -59,7 +61,7 @@ def get_next(self): new_shift, new_bit_number ) - def has_next(self): + def has_next(self) -> bool: """ Checks if there is a next HighloadQueryId available @@ -73,7 +75,7 @@ def has_next(self): return not is_end @property - def shift(self): + def shift(self) -> int: """ Gets the current shift value @@ -82,7 +84,7 @@ def shift(self): return self._shift @property - def bit_number(self): + def bit_number(self) -> int: """ Gets the current bit number value @@ -91,7 +93,7 @@ def bit_number(self): return self._bit_number @property - def query_id(self): + def query_id(self) -> int: """ Computes the query ID based on the current shift and bit number @@ -100,7 +102,7 @@ def query_id(self): return (self._shift << BIT_NUMBER_SIZE) + self._bit_number @staticmethod - def from_query_id(query_id: int): + def from_query_id(query_id: int) -> "HighloadQueryId": """ Creates a new HighloadQueryId object from a given query ID @@ -112,7 +114,7 @@ def from_query_id(query_id: int): return HighloadQueryId.from_shift_and_bit_number(shift, bit_number) @staticmethod - def from_seqno(i: int): + def from_seqno(i: int) -> "HighloadQueryId": """ Creates a HighloadQueryId from a sequence number diff --git a/tonsdk_ng/contract/wallet/_highload_wallet_contract_v2.py b/tonsdk_ng/contract/wallet/_highload_wallet_contract_v2.py index d78b09f..47d82ab 100644 --- a/tonsdk_ng/contract/wallet/_highload_wallet_contract_v2.py +++ b/tonsdk_ng/contract/wallet/_highload_wallet_contract_v2.py @@ -1,5 +1,5 @@ -import decimal import time +from decimal import Decimal from tonsdk_ng.types import Address, Cell, begin_cell, begin_dict from tonsdk_ng.utils import sign_message @@ -58,8 +58,8 @@ def create_transfer_message( payload_cell.bits.write_bytes(recipient["payload"]) order_header = Contract.create_internal_message_header( - Address(recipient["address"]), - decimal.Decimal(recipient["amount"]), + Address.from_any(recipient["address"]), + Decimal(recipient["amount"]), ) order = Contract.create_common_msg_info( order_header, recipient.get("state_init"), payload_cell diff --git a/tonsdk_ng/contract/wallet/_multisig_wallet_contract.py b/tonsdk_ng/contract/wallet/_multisig_wallet_contract.py index e9fc107..01f3545 100644 --- a/tonsdk_ng/contract/wallet/_multisig_wallet_contract.py +++ b/tonsdk_ng/contract/wallet/_multisig_wallet_contract.py @@ -1,9 +1,9 @@ import decimal import time -from ...boc import Cell, begin_cell, begin_dict from ...crypto import private_key_to_public_key, verify_sign -from ...utils import Address, sign_message +from ...types import Address, Cell, begin_cell, begin_dict +from ...utils import sign_message from .. import Contract from ._wallet_contract import WalletContract @@ -109,7 +109,7 @@ def add_message( payload_cell.bits.write_bytes(payload) order_header = Contract.create_internal_message_header( - Address(to_addr), decimal.Decimal(amount) + Address.from_any(to_addr), decimal.Decimal(amount) ) order = Contract.create_common_msg_info( order_header, state_init, payload_cell diff --git a/tonsdk_ng/contract/wallet/_wallet_contract.py b/tonsdk_ng/contract/wallet/_wallet_contract.py index 75136dd..ab9bc34 100644 --- a/tonsdk_ng/contract/wallet/_wallet_contract.py +++ b/tonsdk_ng/contract/wallet/_wallet_contract.py @@ -1,11 +1,32 @@ -import decimal from enum import Enum +from typing import Any, TypedDict -from ...boc import Cell -from ...utils import Address, sign_message +from ...types import Address, Cell +from ...utils import sign_message from .. import Contract +class InitExternalMessage(TypedDict): + address: Address + signing_message: Cell + message: Cell + body: Cell + state_init: Cell | None + code: Cell | None + data: Cell | None + + +class ExternalMessage(TypedDict): + address: Address + signature: bytes + signing_message: Cell + message: Cell + body: Cell + state_init: Cell | None + code: Cell | None + data: Cell | None + + class SendModeEnum(int, Enum): carry_all_remaining_balance = 128 carry_all_remaining_incoming_value = 64 @@ -18,7 +39,7 @@ def __str__(self) -> str: class WalletContract(Contract): - def __init__(self, **kwargs): + def __init__(self, **kwargs: Any) -> None: if ( ("public_key" not in kwargs or "private_key" not in kwargs) and "address" not in kwargs @@ -28,13 +49,13 @@ def __init__(self, **kwargs): ) super().__init__(**kwargs) - def create_data_cell(self): + def create_data_cell(self) -> Cell: cell = Cell() cell.bits.write_uint(0, 32) cell.bits.write_bytes(self.options["public_key"]) return cell - def create_signing_message(self, seqno=None): + def create_signing_message(self, seqno: int | None = None) -> Cell: seqno = seqno or 0 cell = Cell() cell.bits.write_uint(seqno, 32) @@ -46,10 +67,11 @@ def create_transfer_message( amount: int, seqno: int, payload: Cell | str | bytes | None = None, - send_mode=SendModeEnum.ignore_errors | SendModeEnum.pay_gas_separately, - dummy_signature=False, - state_init=None, - ): + send_mode: int = SendModeEnum.ignore_errors + | SendModeEnum.pay_gas_separately, + dummy_signature: bool = False, + state_init: Cell | None = None, + ) -> ExternalMessage: payload_cell = Cell() if payload: if isinstance(payload, str): @@ -61,7 +83,7 @@ def create_transfer_message( payload_cell.bits.write_bytes(payload) order_header = Contract.create_internal_message_header( - Address(to_addr), decimal.Decimal(amount) + Address.from_any(to_addr), amount ) order = Contract.create_common_msg_info( order_header, state_init, payload_cell @@ -75,8 +97,8 @@ def create_transfer_message( ) def create_external_message( - self, signing_message, seqno, dummy_signature=False - ): + self, signing_message: Cell, seqno: int, dummy_signature: bool = False + ) -> ExternalMessage: signature = ( bytes(64) if dummy_signature @@ -114,7 +136,7 @@ def create_external_message( "data": data, } - def create_init_external_message(self): + def create_init_external_message(self) -> InitExternalMessage: create_state_init = self.create_state_init() state_init = create_state_init["state_init"] address = create_state_init["address"] diff --git a/tonsdk_ng/crypto/_keystore.py b/tonsdk_ng/crypto/_keystore.py index 4db4e1e..abb258c 100644 --- a/tonsdk_ng/crypto/_keystore.py +++ b/tonsdk_ng/crypto/_keystore.py @@ -1,9 +1,16 @@ import os from hashlib import pbkdf2_hmac +from typing import TypedDict from nacl.bindings import crypto_box_seed_keypair +class Keystore(TypedDict): + version: int + salt: str + public_key: str + + def generate_keystore_key(password: str, salt: bytes) -> tuple[bytes, bytes]: """ :rtype: (bytes(public_key), bytes(secret_key)) @@ -12,7 +19,7 @@ def generate_keystore_key(password: str, salt: bytes) -> tuple[bytes, bytes]: return crypto_box_seed_keypair(secret) -def generate_new_keystore(password: str, version: int = 1): +def generate_new_keystore(password: str, version: int = 1) -> Keystore: salt = os.urandom(32) pub_k, _ = generate_keystore_key(password, salt) diff --git a/tonsdk_ng/crypto/_mnemonic.py b/tonsdk_ng/crypto/_mnemonic.py index 464bd2d..36cf4ea 100644 --- a/tonsdk_ng/crypto/_mnemonic.py +++ b/tonsdk_ng/crypto/_mnemonic.py @@ -15,30 +15,23 @@ def mnemonic_is_valid(mnemo_words: list[str]) -> bool: ) -def mnemonic_to_entropy(mnemo_words: list[str]): - sign = hmac.new( +def mnemonic_to_entropy(mnemo_words: list[str]) -> bytes: + return hmac.new( (" ".join(mnemo_words)).encode("utf-8"), bytes(0), hashlib.sha512 ).digest() - return sign -def mnemonic_to_seed(mnemo_words: list[str], seed: str): +def mnemonic_to_seed(mnemo_words: list[str], seed: bytes) -> bytes: entropy = mnemonic_to_entropy(mnemo_words) return hashlib.pbkdf2_hmac("sha512", entropy, seed, PBKDF_ITERATIONS) def mnemonic_to_private_key(mnemo_words: list[str]) -> tuple[bytes, bytes]: - """ - :rtype: (bytes(public_key), bytes(secret_key)) - """ seed = mnemonic_to_seed(mnemo_words, b"TON default seed") return crypto_sign_seed_keypair(seed[:32]) def mnemonic_to_wallet_key(mnemo_words: list[str]) -> tuple[bytes, bytes]: - """ - :rtype: (bytes(public_key), bytes(secret_key)) - """ _, priv_k = mnemonic_to_private_key(mnemo_words) return crypto_sign_seed_keypair(priv_k[:32]) diff --git a/tonsdk_ng/crypto/_utils.py b/tonsdk_ng/crypto/_utils.py index 00ee438..fb06d4c 100644 --- a/tonsdk_ng/crypto/_utils.py +++ b/tonsdk_ng/crypto/_utils.py @@ -3,13 +3,14 @@ import os from hashlib import pbkdf2_hmac +from nacl import exceptions as exc from nacl.bindings import crypto_sign_ed25519_sk_to_pk -from nacl.signing import VerifyKey, exc +from nacl.signing import VerifyKey from ._settings import PBKDF_ITERATIONS -def get_secure_random_number(min_v, max_v): +def get_secure_random_number(min_v: int, max_v: int) -> int: range_betw = max_v - min_v bits_needed = math.ceil(math.log2(range_betw)) if bits_needed > 53: @@ -23,7 +24,7 @@ def get_secure_random_number(min_v, max_v): power = (bytes_needed - 1) * 8 number_val = 0 for i in range(bytes_needed): - number_val += res[i] * math.pow(2, power) + number_val += res[i] * 2**power power -= 8 number_val = int(number_val) & int(mask) if number_val >= range_betw: @@ -32,7 +33,7 @@ def get_secure_random_number(min_v, max_v): return min_v + number_val -def is_basic_seed(entropy): +def is_basic_seed(entropy: bytes) -> bool: seed = pbkdf2_hmac( "sha512", entropy, @@ -42,11 +43,13 @@ def is_basic_seed(entropy): return seed[0] == 0 -def private_key_to_public_key(priv_k: bytes): +def private_key_to_public_key(priv_k: bytes) -> bytes: return crypto_sign_ed25519_sk_to_pk(priv_k) -def verify_sign(public_key: bytes, signed_message: bytes, signature: bytes): +def verify_sign( + public_key: bytes, signed_message: bytes, signature: bytes +) -> bool: key = VerifyKey(public_key) try: key.verify(signed_message, signature) diff --git a/tonsdk_ng/crypto/exceptions.py b/tonsdk_ng/crypto/exceptions.py index b564c14..4683a89 100644 --- a/tonsdk_ng/crypto/exceptions.py +++ b/tonsdk_ng/crypto/exceptions.py @@ -1,4 +1,4 @@ -from .._exceptions import TonSdkException +from ..exceptions import TonSdkException class InvalidMnemonicsError(TonSdkException): diff --git a/tonsdk_ng/types/__init__.py b/tonsdk_ng/types/__init__.py index a2dc042..119a748 100644 --- a/tonsdk_ng/types/__init__.py +++ b/tonsdk_ng/types/__init__.py @@ -1,6 +1,6 @@ from ._address import Address from ._builder import Builder, begin_cell -from ._cell import Cell, deserialize_cell_data, parse_boc_header +from ._cell import Cell from ._dict_builder import DictBuilder, begin_dict from ._slice import Slice diff --git a/tonsdk_ng/types/_address.py b/tonsdk_ng/types/_address.py index 3370e31..c81d1c4 100644 --- a/tonsdk_ng/types/_address.py +++ b/tonsdk_ng/types/_address.py @@ -1,7 +1,5 @@ import base64 -from typing import NamedTuple, cast - -from typing_extensions import Self +from typing import NamedTuple from tonsdk_ng.exceptions import InvalidAddressError from tonsdk_ng.utils import bytes_to_b64str, crc16 @@ -32,14 +30,14 @@ def __init__( self.is_test_only = is_test_only @classmethod - def from_any(cls, val: Self | Cell | str) -> Self: - address: Self | None = None + def from_any(cls, val: "Address" | Cell | str) -> "Address": + address: "Address" | None = None match val: case str(): address = cls.from_string(val) case Address(): - address = cls.from_address(cast(Self, val)) + address = cls.from_address(val) case Cell(): address = cls.from_cell(val) @@ -49,7 +47,7 @@ def from_any(cls, val: Self | Cell | str) -> Self: return address @classmethod - def from_address(cls, addr: Self) -> Self: + def from_address(cls, addr: "Address") -> "Address": return cls( wc=addr.wc, hash_part=addr.hash_part, @@ -60,7 +58,7 @@ def from_address(cls, addr: Self) -> Self: ) @classmethod - def from_string(cls, addr: str) -> Self: + def from_string(cls, addr: str) -> "Address": if addr.find("-") > 0 or addr.find("_") > 0: addr = addr.replace("-", "+").replace("_", "/") is_url_safe = True @@ -108,7 +106,7 @@ def from_string(cls, addr: str) -> Self: ) @classmethod - def from_cell(cls, cell: Cell) -> Self | None: + def from_cell(cls, cell: Cell) -> "Address" | None: data = "".join([str(cell.bits.get(x)) for x in range(cell.bits.length)]) if len(data) < 267: return None diff --git a/tonsdk_ng/types/_bit_string.py b/tonsdk_ng/types/_bit_string.py index ff8607f..3be23d7 100644 --- a/tonsdk_ng/types/_bit_string.py +++ b/tonsdk_ng/types/_bit_string.py @@ -121,7 +121,7 @@ def write_uint(self, number: int, bit_length: int) -> None: if number == 0: return - raise Exception( + raise ValueError( "bitLength is too small for number, got" f" number={number},bitLength={bit_length}" ) diff --git a/tonsdk_ng/types/_builder.py b/tonsdk_ng/types/_builder.py index 86d964c..fec55fa 100644 --- a/tonsdk_ng/types/_builder.py +++ b/tonsdk_ng/types/_builder.py @@ -1,5 +1,3 @@ -from typing_extensions import Self - from ._address import Address from ._bit_string import BitString from ._cell import Cell @@ -14,16 +12,16 @@ def __init__(self) -> None: def __repr__(self) -> str: return "" % (len(self.refs), repr(self.bits)) - def store_cell(self, src: Cell) -> Self: + def store_cell(self, src: Cell) -> "Builder": self.bits.write_bit_string(src.bits) self.refs += src.refs return self - def store_ref(self, src: Cell) -> Self: + def store_ref(self, src: Cell) -> "Builder": self.refs.append(src) return self - def store_maybe_ref(self, src: Cell | None) -> Self: + def store_maybe_ref(self, src: Cell | None) -> "Builder": if src: self.bits.write_bit(1) self.store_ref(src) @@ -32,47 +30,47 @@ def store_maybe_ref(self, src: Cell | None) -> Self: return self - def store_bit(self, value: int) -> Self: + def store_bit(self, value: int) -> "Builder": self.bits.write_bit(value) return self - def store_bit_array(self, value: bytes | bytearray) -> Self: + def store_bit_array(self, value: bytes | bytearray) -> "Builder": self.bits.write_bit_array(value) return self - def store_uint(self, value: int, bit_length: int) -> Self: + def store_uint(self, value: int, bit_length: int) -> "Builder": self.bits.write_uint(value, bit_length) return self - def store_uint8(self, value: int) -> Self: + def store_uint8(self, value: int) -> "Builder": self.bits.write_uint8(value) return self - def store_int(self, value: int, bit_length: int) -> Self: + def store_int(self, value: int, bit_length: int) -> "Builder": self.bits.write_int(value, bit_length) return self - def store_string(self, value: str) -> Self: + def store_string(self, value: str) -> "Builder": self.bits.write_string(value) return self - def store_bytes(self, value: bytes) -> Self: + def store_bytes(self, value: bytes) -> "Builder": self.bits.write_bytes(value) return self - def store_bit_string(self, value: BitString) -> Self: + def store_bit_string(self, value: BitString) -> "Builder": self.bits.write_bit_string(value) return self - def store_address(self, value: Address) -> Self: + def store_address(self, value: Address) -> "Builder": self.bits.write_address(value) return self - def store_grams(self, value: int) -> Self: + def store_grams(self, value: int) -> "Builder": self.bits.write_grams(value) return self - def store_coins(self, value: int) -> Self: + def store_coins(self, value: int) -> "Builder": self.bits.write_coins(value) return self diff --git a/tonsdk_ng/types/_cell.py b/tonsdk_ng/types/_cell.py index 660880b..e8d38a8 100644 --- a/tonsdk_ng/types/_cell.py +++ b/tonsdk_ng/types/_cell.py @@ -1,17 +1,11 @@ import copy +import io import math import typing from hashlib import sha256 from typing import NamedTuple -from typing_extensions import Self - -from tonsdk_ng.utils import ( - bytes_to_b64str, - crc32c, - read_n_bytes_uint_from_array, - tree_walk, -) +from tonsdk_ng.utils import bytes_to_b64str, crc32c, tree_walk from ._bit_string import BitString @@ -20,7 +14,7 @@ class Cell: - REACH_BOC_MAGIC_PREFIX = bytes.fromhex("B5EE9C72") + REACH_BOC_MAGIC_PREFIX = bytes.fromhex("b5ee9c72") LEAN_BOC_MAGIC_PREFIX = bytes.fromhex("68ff65f3") LEAN_BOC_MAGIC_PREFIX_CRC = bytes.fromhex("acc3a728") @@ -47,7 +41,7 @@ def bytes_repr(self) -> bytes: repr_array.append(ref.bytes_hash()) return b"".join(repr_array) - def write_cell(self, another_cell: Self) -> None: + def write_cell(self, another_cell: "Cell") -> None: self.bits.write_bit_string(another_cell.bits) self.refs += another_cell.refs @@ -216,186 +210,186 @@ def begin_parse(self) -> Slice: @staticmethod def one_from_boc(serialized_boc: str | bytes) -> "Cell": - cells = deserialize_boc(serialized_boc) + cells = from_boc_multi_root(serialized_boc) if len(cells) != 1: - raise Exception("Expected 1 root cell") + raise ValueError("Expected 1 root cell") return cells[0] -def deserialize_cell_data( - cell_data: bytes, reference_index_size: int -) -> dict[str, Cell | bytes]: - if len(cell_data) < 2: - raise Exception("Not enough bytes to encode cell descriptors") - - d1, d2 = cell_data[0], cell_data[1] - cell_data = cell_data[2:] - math.floor(d1 / 32) - is_exotic = d1 & 8 - ref_num = d1 % 8 - data_bytes_size = math.ceil(d2 / 2) - fullfilled_bytes = not (d2 % 2) - - cell = Cell() - cell.is_exotic = bool(is_exotic) - - if len(cell_data) < data_bytes_size + reference_index_size * ref_num: - raise Exception("Not enough bytes to encode cell data") +class Flags(NamedTuple): + has_index: bool + has_crc32c: bool + has_cache_bits: bool - cell.bits.set_top_upped_array( - bytearray(cell_data[:data_bytes_size]), fullfilled_bytes - ) - cell_data = cell_data[data_bytes_size:] - for r in range(ref_num): - cell.refs.append( - read_n_bytes_uint_from_array(reference_index_size, cell_data) + @staticmethod + def parse(byte: int) -> "Flags": + # has_idx:(## 1) has_crc32c:(## 1) has_cache_bits:(## 1) flags:... + return Flags( + has_index=bool(byte & (1 << 7)), + has_crc32c=bool(byte & (1 << 6)), + has_cache_bits=bool(byte & (1 << 5)), ) - cell_data = cell_data[reference_index_size:] - return {"cell": cell, "residue": cell_data} +def from_boc_multi_root(data: bytes | bytearray | str) -> list[Cell]: + if isinstance(data, str): + data = bytes.fromhex(data) -class ParsedBocHeader(NamedTuple): - has_idx: bool - hash_crc32: bool - has_cache_bits: bool - flags: int - size_bytes: int - off_bytes: int - cells_num: int - roots_num: int - absent_num: int - tot_cells_size: int - root_list: list[int] - index_: list[int] - cells_data: bytes - - -def parse_boc_header(serialized_boc: bytes) -> ParsedBocHeader: - if len(serialized_boc) < 4 + 1: - raise Exception("Not enough bytes for magic prefix") - - input_data = serialized_boc - prefix = serialized_boc[:4] - serialized_boc = serialized_boc[4:] - - match prefix: + if len(data) < 10: + raise ValueError("invalid boc") + + r = io.BytesIO(data) + match r.read(4): case Cell.REACH_BOC_MAGIC_PREFIX: - flags_byte = serialized_boc[0] - has_idx = bool(flags_byte & 128) - hash_crc32 = bool(flags_byte & 64) - has_cache_bits = bool(flags_byte & 32) - flags = (flags_byte & 16) * 2 + (flags_byte & 8) - size_bytes = flags_byte % 8 + byte = big_int(r.read(1)) + cell_num_size_bytes = byte & 0b00000111 + flags = Flags.parse(byte) case Cell.LEAN_BOC_MAGIC_PREFIX: - has_idx = True - hash_crc32 = False - has_cache_bits = False - flags = 0 - size_bytes = serialized_boc[0] + flags = Flags( + has_index=True, has_crc32c=False, has_cache_bits=False + ) + cell_num_size_bytes = big_int(r.read(1)) case Cell.LEAN_BOC_MAGIC_PREFIX_CRC: - has_idx = True - hash_crc32 = True - has_cache_bits = False - flags = 0 - size_bytes = serialized_boc[0] - - serialized_boc = serialized_boc[1:] - - if len(serialized_boc) < 1 + 5 * size_bytes: - raise Exception("Not enough bytes for encoding cells counters") - - offset_bytes = serialized_boc[0] - serialized_boc = serialized_boc[1:] - cells_num = read_n_bytes_uint_from_array(size_bytes, serialized_boc) - serialized_boc = serialized_boc[size_bytes:] - roots_num = read_n_bytes_uint_from_array(size_bytes, serialized_boc) - serialized_boc = serialized_boc[size_bytes:] - absent_num = read_n_bytes_uint_from_array(size_bytes, serialized_boc) - serialized_boc = serialized_boc[size_bytes:] - tot_cells_size = read_n_bytes_uint_from_array(offset_bytes, serialized_boc) - serialized_boc = serialized_boc[offset_bytes:] - - if len(serialized_boc) < roots_num * size_bytes: - raise Exception("Not enough bytes for encoding root cells hashes") - - root_list = [] - for c in range(roots_num): - root_list.append( - read_n_bytes_uint_from_array(size_bytes, serialized_boc) + flags = Flags(has_index=True, has_crc32c=True, has_cache_bits=False) + cell_num_size_bytes = big_int(r.read(1)) + case _: + raise ValueError("Invalid BOC magic header") + # off_bytes:(## 8) { off_bytes <= 8 } + data_size_bytes = big_int(r.read(1)) + # cells:(##(size * 8)) + cells_num = big_int(r.read(cell_num_size_bytes)) + # roots:(##(size * 8)) { roots >= 1 } + roots_num = big_int(r.read(cell_num_size_bytes)) + + # complete BOCs - ??? (absent:(##(size * 8)) { roots + absent <= cells }) + r.read(cell_num_size_bytes) + + # tot_cells_size:(##(off_bytes * 8)) + data_len = big_int(r.read(data_size_bytes)) + + if flags.has_crc32c and crc32c(data[:-4]) != data[-4:]: + raise ValueError("Checksum does not match") + + roots_index = [ + big_int(r.read(cell_num_size_bytes)) for _ in range(roots_num) + ] + + if flags.has_cache_bits and not flags.has_index: + raise ValueError("Cache flag cannot be set without index flag") + + index: list[int] = [] + if flags.has_index: + idx_data = r.read(cells_num * data_size_bytes) + + for i in range(cells_num): + off = i * data_size_bytes + val = big_int(idx_data[off : off + data_size_bytes]) + if flags.has_cache_bits: + # we don't need a cache, cause our loader uses memory + val //= 2 + index.append(val) + + if cells_num > data_len // 2: + raise ValueError( + f"Cells num looks malicious: data len {data_len}, cells {cells_num}" ) - serialized_boc = serialized_boc[size_bytes:] - - index = [] - if has_idx: - if len(serialized_boc) < offset_bytes * cells_num: - raise Exception("Not enough bytes for index encoding") - for c in range(cells_num): - index.append( - read_n_bytes_uint_from_array(offset_bytes, serialized_boc) - ) - serialized_boc = serialized_boc[offset_bytes:] - - if len(serialized_boc) < tot_cells_size: - raise Exception("Not enough bytes for cells data") - cells_data = serialized_boc[:tot_cells_size] - serialized_boc = serialized_boc[tot_cells_size:] - - if hash_crc32: - if len(serialized_boc) < 4: - raise Exception("Not enough bytes for crc32c hashsum") - - length = len(input_data) - if crc32c(input_data[: length - 4]) != serialized_boc[:4]: - raise Exception("Crc32c hashsum mismatch") - - serialized_boc = serialized_boc[4:] - - if len(serialized_boc): - raise Exception("Too much bytes in BoC serialization") - - return ParsedBocHeader( - has_idx=has_idx, - hash_crc32=hash_crc32, - has_cache_bits=has_cache_bits, - flags=flags, - size_bytes=size_bytes, - off_bytes=offset_bytes, - cells_num=cells_num, - roots_num=roots_num, - absent_num=absent_num, - tot_cells_size=tot_cells_size, - root_list=root_list, - index_=index, - cells_data=cells_data, - ) - - -def deserialize_boc(serialized_boc: str | bytes) -> list[Cell]: - if isinstance(serialized_boc, str): - serialized_boc = bytes.fromhex(serialized_boc) - header = parse_boc_header(serialized_boc) - cells_data = header.cells_data - cells_array: list[Cell] = [] - - for ci in range(header.cells_num): - dd = deserialize_cell_data(cells_data, header.size_bytes) - cells_data = dd["residue"] - cells_array.append(dd["cell"]) - - for ci in reversed(range(header.cells_num)): - c = cells_array[ci] - for ri in range(len(c.refs)): - r = c.refs[ri] - if r < ci: - raise Exception("Topological order is broken") - c.refs[ri] = cells_array[r] - - root_cells = [] - for ri in header.root_list: - root_cells.append(cells_array[ri]) + payload = r.read(data_len) + cells = parse_cells( + roots_index, cells_num, cell_num_size_bytes, payload, index + ) - return root_cells + return cells + + +def parse_cells( + roots_index: list[int], + cells_num: int, + ref_sz_bytes: int, + data: bytes, + index: list[int], +) -> list[Cell]: + cells = [Cell() for _ in range(cells_num)] + hash_size = 32 + depth_size = 2 + offset = 0 + + for i in range(cells_num): + if len(data) - offset < 2: + raise ValueError("Failed to parse cell header, corrupted data") + + if index: + # if we have index, then set offset from it, + # it stores end of each cell + offset = index[i - 1] if i > 0 else 0 + + flags = data[offset] + refs_num = flags & 0b111 + is_exotic = bool(flags & 0b1000) + with_hashes = bool(flags & 0b10000) + level_mask = flags >> 5 + + if refs_num > 4: + raise ValueError("Too many refs in cell") + + ln = data[offset + 1] + one_more = ln % 2 + sz = ln // 2 + one_more + + offset += 2 + if len(data) - offset < sz: + raise ValueError("Failed to parse cell payload, corrupted data") + + if with_hashes: + mask_bits = int(math.ceil(math.log2(level_mask + 1))) + hashes_num = mask_bits + 1 + offset += hashes_num * hash_size + hashes_num * depth_size + + payload = data[offset : offset + sz] + + offset += sz + if len(data) - offset < refs_num * ref_sz_bytes: + raise ValueError("Failed to parse cell refs, corrupted data") + + refs_index = [ + big_int(data[offset : offset + ref_sz_bytes]) + for _ in range(refs_num) + ] + offset += refs_num * ref_sz_bytes + + refs = [] + for y, id in enumerate(refs_index): + if i == id: + raise ValueError("Recursive reference of cells") + if id < i and not index: + raise ValueError( + "Reference to index which is behind parent cell" + ) + if id >= len(cells): + raise ValueError("Invalid index, out of scope") + + refs.append(cells[id]) + + bits_sz = ln * 4 + + # if not full byte + if ln % 2 != 0: + # find last bit of byte which indicates the end and cut it and next + for y in range(8): + if (payload[-1] >> y) & 1 == 1: + bits_sz += 3 - y + break + + cells[i].is_exotic = is_exotic + cells[i].bits.write_bytes(payload) + cells[i].bits.length = bits_sz + cells[i].refs = refs + + roots = [cells[idx] for idx in roots_index] + return roots + + +def big_int(b: bytes) -> int: + return int.from_bytes(b, byteorder="big") diff --git a/tonsdk_ng/types/dict/__init__.py b/tonsdk_ng/types/_dict/__init__.py similarity index 59% rename from tonsdk_ng/types/dict/__init__.py rename to tonsdk_ng/types/_dict/__init__.py index fdfdbe0..8e9b386 100644 --- a/tonsdk_ng/types/dict/__init__.py +++ b/tonsdk_ng/types/_dict/__init__.py @@ -1,2 +1,7 @@ from .find_common_prefix import find_common_prefix from .serialize_dict import serialize_dict + +__all__ = [ + "serialize_dict", + "find_common_prefix", +] diff --git a/tonsdk_ng/types/dict/find_common_prefix.py b/tonsdk_ng/types/_dict/find_common_prefix.py similarity index 100% rename from tonsdk_ng/types/dict/find_common_prefix.py rename to tonsdk_ng/types/_dict/find_common_prefix.py diff --git a/tonsdk_ng/types/dict/serialize_dict.py b/tonsdk_ng/types/_dict/serialize_dict.py similarity index 90% rename from tonsdk_ng/types/dict/serialize_dict.py rename to tonsdk_ng/types/_dict/serialize_dict.py index 3145342..d117a0c 100644 --- a/tonsdk_ng/types/dict/serialize_dict.py +++ b/tonsdk_ng/types/_dict/serialize_dict.py @@ -1,12 +1,14 @@ +from collections.abc import Callable from math import ceil, log2 +from typing import Literal -from typing_extensions import Callable, Literal, NamedTuple +from typing_extensions import NamedTuple from .._bit_string import BitString from .._cell import Cell from .find_common_prefix import find_common_prefix -Serializer = Callable[["Node" | "Edge" | Cell, Cell], None] +Serializer = Callable[[Cell, Cell], None] Source = dict[str, Cell] SourceInt = dict[int, Cell] @@ -120,7 +122,9 @@ def label_long_length(src: str, key_length: int) -> int: return 1 + 1 + ceil(log2(key_length + 1)) + len(src) -def write_label_same(value: bool, length: int, key_length: int, to: BitString): +def write_label_same( + value: bool, length: int, key_length: int, to: BitString +) -> None: to.write_bit(1) to.write_bit(1) @@ -159,7 +163,7 @@ def detect_label_type(src: str, key_size: int) -> str: return kind -def write_label(src: str, key_size: int, to: BitString): +def write_label(src: str, key_size: int, to: BitString) -> None: match detect_label_type(src, key_size): case "short": write_label_short(src, to) @@ -169,7 +173,9 @@ def write_label(src: str, key_size: int, to: BitString): write_label_same(src[0] == "1", len(src), key_size, to) -def write_node(src: Node, key_size: int, serializer: Serializer, to: Cell): +def write_node( + src: Node, key_size: int, serializer: Serializer, to: Cell +) -> None: match src.kind: case "leaf": assert src.value is not None @@ -184,7 +190,9 @@ def write_node(src: Node, key_size: int, serializer: Serializer, to: Cell): to.refs.append(right_cell) -def write_edge(src: Edge, key_size: int, serializer: Serializer, to: Cell): +def write_edge( + src: Edge, key_size: int, serializer: Serializer, to: Cell +) -> None: write_label(src.label, key_size, to.bits) write_node(src.node, key_size - len(src.label), serializer, to) diff --git a/tonsdk_ng/types/_dict_builder.py b/tonsdk_ng/types/_dict_builder.py index f085bb1..b0c269f 100644 --- a/tonsdk_ng/types/_dict_builder.py +++ b/tonsdk_ng/types/_dict_builder.py @@ -1,7 +1,7 @@ from typing_extensions import Self from ._cell import Cell -from .dict import serialize_dict +from ._dict import serialize_dict class DictBuilder: @@ -34,7 +34,7 @@ def end_dict(self) -> Cell: if not self.items: return Cell() # ? - def default_serializer(src: Cell, dest: Cell): + def default_serializer(src: Cell, dest: Cell) -> None: dest.write_cell(src) return serialize_dict(self.items, self.key_size, default_serializer) diff --git a/tonsdk_ng/utils/__init__.py b/tonsdk_ng/utils/__init__.py index 77208c7..b28acd4 100644 --- a/tonsdk_ng/utils/__init__.py +++ b/tonsdk_ng/utils/__init__.py @@ -6,7 +6,6 @@ crc16, crc32c, move_to_end, - read_n_bytes_uint_from_array, sign_message, string_to_bytes, tree_walk, @@ -21,7 +20,6 @@ "crc32c", "from_nano", "move_to_end", - "read_n_bytes_uint_from_array", "sign_message", "string_to_bytes", "to_nano", diff --git a/tonsdk_ng/utils/_currency.py b/tonsdk_ng/utils/_currency.py index d80e82c..76ecd29 100644 --- a/tonsdk_ng/utils/_currency.py +++ b/tonsdk_ng/utils/_currency.py @@ -71,8 +71,7 @@ def to_nano(number: int | float | str | decimal.Decimal, unit: str) -> int: with decimal.localcontext() as ctx: ctx.prec = 999 - result_value = decimal.Decimal( - value=d_number, context=ctx) * unit_value + result_value = decimal.Decimal(value=d_number, context=ctx) * unit_value if result_value < MIN_VAL or result_value > MAX_VAL: raise ValueError( diff --git a/tonsdk_ng/utils/_utils.py b/tonsdk_ng/utils/_utils.py index c8f2167..09b6933 100644 --- a/tonsdk_ng/utils/_utils.py +++ b/tonsdk_ng/utils/_utils.py @@ -90,17 +90,6 @@ def crc16(data: bytes | bytearray) -> bytes: return bytes([math.floor(reg / 256), reg % 256]) -def read_n_bytes_uint_from_array( - size_bytes: int, uint8_array: bytes | bytearray -) -> int: - res = 0 - for c in range(size_bytes): - res *= 256 - res += uint8_array[c] # must be uint8 - - return res - - def string_to_bytes(string: str, size: int = 1) -> bytes: if size == 1: buf = bytearray(string, "utf-8")