From 38ed638b796355a2afbd52b2118851ea56604366 Mon Sep 17 00:00:00 2001 From: moisses89 Date: Sat, 11 Feb 2023 20:22:52 +0100 Subject: [PATCH 01/19] Add ledger wallet class --- .../operators/hw_accounts/ledger_account.py | 92 +++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 safe_cli/operators/hw_accounts/ledger_account.py diff --git a/safe_cli/operators/hw_accounts/ledger_account.py b/safe_cli/operators/hw_accounts/ledger_account.py new file mode 100644 index 00000000..7472e284 --- /dev/null +++ b/safe_cli/operators/hw_accounts/ledger_account.py @@ -0,0 +1,92 @@ +import warnings + +from eth_account.signers.base import BaseAccount +from gnosis.eth.eip712 import eip712_encode +from ledgereth import sign_typed_data_draft, create_transaction +from web3.types import TxParams + + +class LedgerAccount(BaseAccount): + def __init__(self, path, address, dongle): + """ + Initialize a new ledger account (no private key) + + :param path: path derivation + :param ~eth_account.account.Account account: the key-unaware management API + """ + self._address = address + self.path = path + self.dongle = dongle + + @property + def address(self): + return self._address + + @property + def privateKey(self): + """ + .. CAUTION:: Deprecated for :meth:`~eth_account.signers.local.LocalAccount.key`. + This attribute will be removed in v0.5 + """ + warnings.warn( + "privateKey is deprecated in favor of key", + category=DeprecationWarning, + ) + return None + + @property + def key(self): + """ + Get the private key. + """ + return None + + def encrypt(self, password, kdf=None, iterations=None): + """ + Generate a string with the encrypted key. + + This uses the same structure as in + :meth:`~eth_account.account.Account.encrypt`, but without a private key argument. + """ + # return self._publicapi.encrypt(self.key, password, kdf=kdf, iterations=iterations) + # TODO with ledger + pass + + def signHash(self, message_hash, domain_hash: bytes): + signed = sign_typed_data_draft(domain_hash, message_hash, dongle=self.dongle) + return (signed.v, signed.r, signed.s) + + def sign_message(self, signable_message): + """ + Generate a string with the encrypted key. + + This uses the same structure as in + :meth:`~eth_account.account.Account.sign_message`, but without a private key argument. + """ + # TODO with ledger + pass + + def signTransaction(self, transaction_dict): + warnings.warn( + "signTransaction is deprecated in favor of sign_transaction", + category=DeprecationWarning, + ) + pass + + def sign_transaction(self, tx: TxParams): + signed = create_transaction( + destination=tx["to"], + amount=tx["value"], + gas=tx["gas"], + max_priority_fee_per_gas=tx["maxPriorityFeePerGas"], + max_fee_per_gas=tx["maxFeePerGas"], + data=tx["data"], + nonce=tx["nonce"], + chain_id=tx["chainId"], + sender_path=self.path, + dongle=self.dongle + ) + return signed + + def __bytes__(self): + return self.key From d3404b1dad32b9446aae85ba3a199b08ba3337d1 Mon Sep 17 00:00:00 2001 From: moisses89 Date: Sat, 11 Feb 2023 21:36:24 +0100 Subject: [PATCH 02/19] Add load_ledger_cli_owners function --- safe_cli/operators/hw_accounts/__init__.py | 0 .../operators/hw_accounts/ledger_account.py | 4 +-- safe_cli/operators/safe_operator.py | 35 +++++++++++++++++-- safe_cli/prompt_parser.py | 7 ++++ safe_cli/safe_completer_constants.py | 5 +++ 5 files changed, 47 insertions(+), 4 deletions(-) create mode 100644 safe_cli/operators/hw_accounts/__init__.py diff --git a/safe_cli/operators/hw_accounts/__init__.py b/safe_cli/operators/hw_accounts/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/safe_cli/operators/hw_accounts/ledger_account.py b/safe_cli/operators/hw_accounts/ledger_account.py index 7472e284..3e7165fc 100644 --- a/safe_cli/operators/hw_accounts/ledger_account.py +++ b/safe_cli/operators/hw_accounts/ledger_account.py @@ -1,13 +1,13 @@ import warnings from eth_account.signers.base import BaseAccount -from gnosis.eth.eip712 import eip712_encode +from ledgerblue import Dongle from ledgereth import sign_typed_data_draft, create_transaction from web3.types import TxParams class LedgerAccount(BaseAccount): - def __init__(self, path, address, dongle): + def __init__(self, path, address, dongle: Dongle): """ Initialize a new ledger account (no private key) diff --git a/safe_cli/operators/safe_operator.py b/safe_cli/operators/safe_operator.py index bbdccd53..42f481e2 100644 --- a/safe_cli/operators/safe_operator.py +++ b/safe_cli/operators/safe_operator.py @@ -9,6 +9,9 @@ from eth_typing import ChecksumAddress from eth_utils import ValidationError from hexbytes import HexBytes +from ledgereth import get_account_by_path +from ledgereth.comms import init_dongle +from ledgereth.exceptions import LedgerNotFound from packaging import version as semantic_version from prompt_toolkit import HTML, print_formatted_text from web3 import Web3 @@ -39,6 +42,7 @@ get_safe_contract_address, get_safe_l2_contract_address, ) +from safe_cli.operators.hw_accounts.ledger_account import LedgerAccount from safe_cli.utils import get_erc_20_list, yes_or_no_question @@ -221,8 +225,8 @@ def __init__(self, address: ChecksumAddress, node_url: str): self.safe_contract_1_1_0 = get_safe_V1_1_1_contract( self.ethereum_client.w3, address=self.address ) - self.accounts: Set[LocalAccount] = set() - self.default_sender: Optional[LocalAccount] = None + self.accounts: Set[LocalAccount | LedgerAccount] = set() + self.default_sender: Optional[LocalAccount] | LedgerAccount = None self.executed_transactions: List[str] = [] self._safe_cli_info: Optional[SafeCliInfo] = None # Cache for SafeCliInfo self.require_all_signatures = ( @@ -325,6 +329,33 @@ def load_cli_owners(self, keys: List[str]): except ValueError: print_formatted_text(HTML(f"Cannot load key={key}")) + def load_ledger_cli_owners(self): + try: + dongle = init_dongle() + except LedgerNotFound: + print_formatted_text(HTML(f"Unable to find Ledger device")) + return + # Search between 10 first accounts + for index in range(10): + path = f"44'/60'/{index}'/0/0" + account = get_account_by_path(path, dongle=dongle) + if account.address in self.safe_cli_info.owners: + sender = LedgerAccount(account.path, account.address, dongle) + self.accounts.add(sender) + balance = self.ethereum_client.get_balance(account.address) + print_formatted_text( + HTML( + f"Loaded account {account.address} " + f'with balance={Web3.fromWei(balance, "ether")} ether' + ) + ) + if not self.default_sender and balance > 0: + print_formatted_text( + HTML(f"Set account {account.address} as default sender of txs") + ) + self.default_sender = sender + break + def unload_cli_owners(self, owners: List[str]): accounts_to_remove: Set[Account] = set() for owner in owners: diff --git a/safe_cli/prompt_parser.py b/safe_cli/prompt_parser.py index 7bd190ce..9f864fcd 100644 --- a/safe_cli/prompt_parser.py +++ b/safe_cli/prompt_parser.py @@ -152,6 +152,10 @@ def load_cli_owners_from_words(args): def load_cli_owners(args): safe_operator.load_cli_owners(args.keys) + @safe_exception + def load_ledger_cli_owners(args): + safe_operator.load_ledger_cli_owners() + @safe_exception def unload_cli_owners(args): safe_operator.unload_cli_owners(args.addresses) @@ -292,6 +296,9 @@ def remove_delegate(args): parser_load_cli_owners.add_argument("keys", type=str, nargs="+") parser_load_cli_owners.set_defaults(func=load_cli_owners) + parser_load_ledger_cli_owners = subparsers.add_parser("load_ledger_cli_owners") + parser_load_ledger_cli_owners.set_defaults(func=load_ledger_cli_owners) + parser_unload_cli_owners = subparsers.add_parser("unload_cli_owners") parser_unload_cli_owners.add_argument( "addresses", type=check_ethereum_address, nargs="+" diff --git a/safe_cli/safe_completer_constants.py b/safe_cli/safe_completer_constants.py index 7294db66..18f7d44c 100644 --- a/safe_cli/safe_completer_constants.py +++ b/safe_cli/safe_completer_constants.py @@ -24,6 +24,7 @@ "history": "(read-only)", "info": "(read-only)", "load_cli_owners": " [...]", + "load_ledger_cli_owners": "", "load_cli_owners_from_words": " ... ", "refresh": "", "remove_delegate": "
", @@ -155,6 +156,10 @@ "Command load_cli_owners will try to load a new owner via " "<account-private-key>." ), + "load_ledger_cli_owners": HTML( + "Command load_cli_owners will try to load a new owner via " + "<account-private-key>." + ), "load_cli_owners_from_words": HTML( "Command load_cli_owners_from_words will try to load owners via" "seed_words. Only relevant accounts(owners) will be loaded" From 5e08783df32e6084c28b96bde9ca09917b35c7a2 Mon Sep 17 00:00:00 2001 From: moisses89 Date: Sat, 11 Feb 2023 22:10:17 +0100 Subject: [PATCH 03/19] Add sign with ledger --- .../operators/hw_accounts/ledger_account.py | 2 +- safe_cli/operators/safe_operator.py | 40 +++++++++++++++---- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/safe_cli/operators/hw_accounts/ledger_account.py b/safe_cli/operators/hw_accounts/ledger_account.py index 3e7165fc..d75f37e1 100644 --- a/safe_cli/operators/hw_accounts/ledger_account.py +++ b/safe_cli/operators/hw_accounts/ledger_account.py @@ -52,7 +52,7 @@ def encrypt(self, password, kdf=None, iterations=None): # TODO with ledger pass - def signHash(self, message_hash, domain_hash: bytes): + def signHash(self, domain_hash: bytes, message_hash: bytes): signed = sign_typed_data_draft(domain_hash, message_hash, dongle=self.dongle) return (signed.v, signed.r, signed.s) diff --git a/safe_cli/operators/safe_operator.py b/safe_cli/operators/safe_operator.py index 42f481e2..3d5214ee 100644 --- a/safe_cli/operators/safe_operator.py +++ b/safe_cli/operators/safe_operator.py @@ -32,9 +32,11 @@ get_safe_contract, get_safe_V1_1_1_contract, ) +from gnosis.eth.eip712 import eip712_encode from gnosis.safe import InvalidInternalTx, Safe, SafeOperation, SafeTx from gnosis.safe.api import TransactionServiceApi from gnosis.safe.multi_send import MultiSend, MultiSendOperation, MultiSendTx +from gnosis.safe.signatures import signature_to_bytes from safe_cli.ethereum_hd_wallet import get_account_from_words from safe_cli.safe_addresses import ( @@ -333,7 +335,9 @@ def load_ledger_cli_owners(self): try: dongle = init_dongle() except LedgerNotFound: - print_formatted_text(HTML(f"Unable to find Ledger device")) + print_formatted_text( + HTML("Unable to find Ledger device") + ) return # Search between 10 first accounts for index in range(10): @@ -347,13 +351,10 @@ def load_ledger_cli_owners(self): HTML( f"Loaded account {account.address} " f'with balance={Web3.fromWei(balance, "ether")} ether' + f"Ledger account cannot be defined as sender" ) ) - if not self.default_sender and balance > 0: - print_formatted_text( - HTML(f"Set account {account.address} as default sender of txs") - ) - self.default_sender = sender + # TODO add ledger as sender break def unload_cli_owners(self, owners: List[str]): @@ -888,6 +889,28 @@ def batch_safe_txs( else: return safe_tx + def ledger_sign(self, safe_tx: SafeTx, account: LedgerAccount) -> bytes: + """ + {bytes32 r}{bytes32 s}{uint8 v} + :param private_key: + :return: Signature + """ + encode_hash = eip712_encode(safe_tx.eip712_structured_data) + v, r, s = account.signHash(encode_hash[1], encode_hash[2]) + signature = signature_to_bytes(v, r, s) + # Insert signature sorted + if account.address not in safe_tx.signers: + new_owners = safe_tx.signers + [account.address] + new_owner_pos = sorted(new_owners, key=lambda x: int(x, 16)).index( + account.address + ) + safe_tx.signatures = ( + safe_tx.signatures[: 65 * new_owner_pos] + + signature + + safe_tx.signatures[65 * new_owner_pos :] + ) + return safe_tx + # TODO Set sender so we can save gas in that signature def sign_transaction(self, safe_tx: SafeTx) -> SafeTx: permitted_signers = self.get_permitted_signers() @@ -906,7 +929,10 @@ def sign_transaction(self, safe_tx: SafeTx) -> SafeTx: raise NotEnoughSignatures(threshold) for selected_account in selected_accounts: - safe_tx.sign(selected_account.key) + if selected_account.key: + safe_tx.sign(selected_account.key) + else: + safe_tx = self.ledger_sign(safe_tx, selected_account) return safe_tx From bad9e9f9d342c5d6d56748e644aaf82ed84528ad Mon Sep 17 00:00:00 2001 From: moisses89 Date: Sat, 11 Feb 2023 22:24:56 +0100 Subject: [PATCH 04/19] Add ledger-eth-lib --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 9f7496c2..15a09308 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,5 +5,6 @@ prompt_toolkit==3.0.39 pygments==2.16.1 requests==2.31.0 safe-eth-py==6.0.0b2 +ledgereth==0.7.3 tabulate==0.9.0 web3==6.10.0 From adc4fc4460f6daa1c26b6d5855fd8bd02625cd84 Mon Sep 17 00:00:00 2001 From: moisses89 Date: Sat, 11 Feb 2023 22:30:02 +0100 Subject: [PATCH 05/19] Fix lint issues --- safe_cli/operators/hw_accounts/ledger_account.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/safe_cli/operators/hw_accounts/ledger_account.py b/safe_cli/operators/hw_accounts/ledger_account.py index d75f37e1..36a97101 100644 --- a/safe_cli/operators/hw_accounts/ledger_account.py +++ b/safe_cli/operators/hw_accounts/ledger_account.py @@ -2,7 +2,7 @@ from eth_account.signers.base import BaseAccount from ledgerblue import Dongle -from ledgereth import sign_typed_data_draft, create_transaction +from ledgereth import create_transaction, sign_typed_data_draft from web3.types import TxParams @@ -84,7 +84,7 @@ def sign_transaction(self, tx: TxParams): nonce=tx["nonce"], chain_id=tx["chainId"], sender_path=self.path, - dongle=self.dongle + dongle=self.dongle, ) return signed From 86929594d93bac31ce4656269ff888f14ffde116 Mon Sep 17 00:00:00 2001 From: moisses89 <7888669+moisses89@users.noreply.github.com> Date: Fri, 21 Apr 2023 18:07:01 +0200 Subject: [PATCH 06/19] Add test_load_ledger_cli_owners --- tests/test_safe_operator.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/test_safe_operator.py b/tests/test_safe_operator.py index 2f60a57f..f89af671 100644 --- a/tests/test_safe_operator.py +++ b/tests/test_safe_operator.py @@ -3,6 +3,7 @@ from unittest.mock import MagicMock from eth_account import Account +from ledgereth.objects import LedgerAccount from web3 import Web3 from gnosis.eth import EthereumClient @@ -67,6 +68,31 @@ def test_load_cli_owner(self, get_contract_mock: MagicMock): self.assertEqual(len(safe_operator.accounts), number_of_accounts - 1) self.assertFalse(safe_operator.default_sender) + @mock.patch("safe_cli.operators.safe_operator.get_account_by_path") + def test_load_ledger_cli_owner(self, mock_get_account_by_path: MagicMock): + owner_address = Account.create().address + safe_address = self.deploy_test_safe(owners=[owner_address]).safe_address + safe_operator = SafeOperator(safe_address, self.ethereum_node_url) + safe_operator.load_ledger_cli_owners() + self.assertEqual(len(safe_operator.accounts), 0) + + with mock.patch("safe_cli.operators.safe_operator.init_dongle"): + # Test ledger address is not an owner + random_account = Account.create().address + mock_get_account_by_path.return_value = LedgerAccount( + "44'/60'/0'/0/0", random_account + ) + safe_operator.load_ledger_cli_owners() + self.assertEqual(len(safe_operator.accounts), 0) + + # Test ledger address is an owner + mock_get_account_by_path.return_value = LedgerAccount( + "44'/60'/0'/0/0", owner_address + ) + safe_operator.load_ledger_cli_owners() + self.assertEqual(len(safe_operator.accounts), 1) + self.assertEqual(list(safe_operator.accounts)[0].address, owner_address) + def test_approve_hash(self): safe_address = self.deploy_test_safe( owners=[self.ethereum_test_account.address] From 0dc3df9fc40a8f2c9718d7ee6e6fc1bdd6f58207 Mon Sep 17 00:00:00 2001 From: moisses89 <7888669+moisses89@users.noreply.github.com> Date: Fri, 20 Oct 2023 11:47:04 +0200 Subject: [PATCH 07/19] Add sign_transaction --- requirements.txt | 4 +- .../operators/hw_accounts/ledger_account.py | 14 +++++- safe_cli/operators/safe_operator.py | 48 +++++++++++-------- setup.py | 2 +- tests/test_safe_operator.py | 2 +- 5 files changed, 45 insertions(+), 25 deletions(-) diff --git a/requirements.txt b/requirements.txt index 15a09308..dc4865ea 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,10 @@ art==6.1 colorama==0.4.6 +ledgereth==0.9.0 packaging>=23.1 prompt_toolkit==3.0.39 pygments==2.16.1 requests==2.31.0 -safe-eth-py==6.0.0b2 -ledgereth==0.7.3 +safe-eth-py==6.0.0b3 tabulate==0.9.0 web3==6.10.0 diff --git a/safe_cli/operators/hw_accounts/ledger_account.py b/safe_cli/operators/hw_accounts/ledger_account.py index 36a97101..a73a7329 100644 --- a/safe_cli/operators/hw_accounts/ledger_account.py +++ b/safe_cli/operators/hw_accounts/ledger_account.py @@ -1,8 +1,11 @@ import warnings +from eth_account.datastructures import SignedTransaction from eth_account.signers.base import BaseAccount +from hexbytes import HexBytes from ledgerblue import Dongle from ledgereth import create_transaction, sign_typed_data_draft +from web3 import Web3 from web3.types import TxParams @@ -73,7 +76,7 @@ def signTransaction(self, transaction_dict): ) pass - def sign_transaction(self, tx: TxParams): + def sign_transaction(self, tx: TxParams) -> SignedTransaction: signed = create_transaction( destination=tx["to"], amount=tx["value"], @@ -86,7 +89,14 @@ def sign_transaction(self, tx: TxParams): sender_path=self.path, dongle=self.dongle, ) - return signed + raw_transaction = signed.raw_transaction() + return SignedTransaction( + HexBytes(raw_transaction), + Web3.keccak(HexBytes(raw_transaction)), + signed.sender_r, + signed.sender_s, + signed.y_parity, + ) def __bytes__(self): return self.key diff --git a/safe_cli/operators/safe_operator.py b/safe_cli/operators/safe_operator.py index 3d5214ee..859f175a 100644 --- a/safe_cli/operators/safe_operator.py +++ b/safe_cli/operators/safe_operator.py @@ -11,7 +11,7 @@ from hexbytes import HexBytes from ledgereth import get_account_by_path from ledgereth.comms import init_dongle -from ledgereth.exceptions import LedgerNotFound +from ledgereth.exceptions import LedgerAppNotOpened, LedgerLocked, LedgerNotFound from packaging import version as semantic_version from prompt_toolkit import HTML, print_formatted_text from web3 import Web3 @@ -39,12 +39,12 @@ from gnosis.safe.signatures import signature_to_bytes from safe_cli.ethereum_hd_wallet import get_account_from_words +from safe_cli.operators.hw_accounts.ledger_account import LedgerAccount from safe_cli.safe_addresses import ( get_default_fallback_handler_address, get_safe_contract_address, get_safe_l2_contract_address, ) -from safe_cli.operators.hw_accounts.ledger_account import LedgerAccount from safe_cli.utils import get_erc_20_list, yes_or_no_question @@ -334,28 +334,38 @@ def load_cli_owners(self, keys: List[str]): def load_ledger_cli_owners(self): try: dongle = init_dongle() + # Search between 10 first accounts + for index in range(10): + path = f"44'/60'/{index}'/0/0" + account = get_account_by_path(path, dongle=dongle) + if account.address in self.safe_cli_info.owners: + sender = LedgerAccount(account.path, account.address, dongle) + self.accounts.add(sender) + balance = self.ethereum_client.get_balance(account.address) + print_formatted_text( + HTML( + f"Loaded account {account.address} " + f'with balance={Web3.from_wei(balance, "ether")} ether' + f"Ledger account cannot be defined as sender" + ) + ) + # TODO add ledger as sender + break except LedgerNotFound: print_formatted_text( HTML("Unable to find Ledger device") ) return - # Search between 10 first accounts - for index in range(10): - path = f"44'/60'/{index}'/0/0" - account = get_account_by_path(path, dongle=dongle) - if account.address in self.safe_cli_info.owners: - sender = LedgerAccount(account.path, account.address, dongle) - self.accounts.add(sender) - balance = self.ethereum_client.get_balance(account.address) - print_formatted_text( - HTML( - f"Loaded account {account.address} " - f'with balance={Web3.fromWei(balance, "ether")} ether' - f"Ledger account cannot be defined as sender" - ) - ) - # TODO add ledger as sender - break + except LedgerAppNotOpened: + print_formatted_text( + HTML("Ensure open ethereum app on your ledger") + ) + return + except LedgerLocked: + print_formatted_text( + HTML("Ensure open ethereum app on your ledger") + ) + return def unload_cli_owners(self, owners: List[str]): accounts_to_remove: Set[Account] = set() diff --git a/setup.py b/setup.py index 35d814aa..2e6d885d 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,7 @@ "prompt_toolkit>=3", "pygments>=2", "requests>=2", - "safe-eth-py>=6", + "safe-eth-py==6.0.0b3", "tabulate>=0.8", ], packages=setuptools.find_packages(), diff --git a/tests/test_safe_operator.py b/tests/test_safe_operator.py index f89af671..fe7b185a 100644 --- a/tests/test_safe_operator.py +++ b/tests/test_safe_operator.py @@ -71,7 +71,7 @@ def test_load_cli_owner(self, get_contract_mock: MagicMock): @mock.patch("safe_cli.operators.safe_operator.get_account_by_path") def test_load_ledger_cli_owner(self, mock_get_account_by_path: MagicMock): owner_address = Account.create().address - safe_address = self.deploy_test_safe(owners=[owner_address]).safe_address + safe_address = self.deploy_test_safe(owners=[owner_address]).address safe_operator = SafeOperator(safe_address, self.ethereum_node_url) safe_operator.load_ledger_cli_owners() self.assertEqual(len(safe_operator.accounts), 0) From ca57e206557ff86d0314d351ea56f5c60d41964b Mon Sep 17 00:00:00 2001 From: moisses89 <7888669+moisses89@users.noreply.github.com> Date: Mon, 23 Oct 2023 10:23:08 +0200 Subject: [PATCH 08/19] Add ledger manager --- .../operators/hw_accounts/ledger_account.py | 66 +------- .../operators/hw_accounts/ledger_manager.py | 112 +++++++++++++ safe_cli/operators/safe_operator.py | 157 ++++++++++-------- safe_cli/utils.py | 14 ++ 4 files changed, 221 insertions(+), 128 deletions(-) create mode 100644 safe_cli/operators/hw_accounts/ledger_manager.py diff --git a/safe_cli/operators/hw_accounts/ledger_account.py b/safe_cli/operators/hw_accounts/ledger_account.py index a73a7329..260a3abc 100644 --- a/safe_cli/operators/hw_accounts/ledger_account.py +++ b/safe_cli/operators/hw_accounts/ledger_account.py @@ -1,7 +1,4 @@ -import warnings - from eth_account.datastructures import SignedTransaction -from eth_account.signers.base import BaseAccount from hexbytes import HexBytes from ledgerblue import Dongle from ledgereth import create_transaction, sign_typed_data_draft @@ -9,8 +6,8 @@ from web3.types import TxParams -class LedgerAccount(BaseAccount): - def __init__(self, path, address, dongle: Dongle): +class LedgerAccount: + def __init__(self, path, address): """ Initialize a new ledger account (no private key) @@ -19,64 +16,16 @@ def __init__(self, path, address, dongle: Dongle): """ self._address = address self.path = path - self.dongle = dongle @property def address(self): return self._address - @property - def privateKey(self): - """ - .. CAUTION:: Deprecated for :meth:`~eth_account.signers.local.LocalAccount.key`. - This attribute will be removed in v0.5 - """ - warnings.warn( - "privateKey is deprecated in favor of key", - category=DeprecationWarning, - ) - return None - - @property - def key(self): - """ - Get the private key. - """ - return None - - def encrypt(self, password, kdf=None, iterations=None): - """ - Generate a string with the encrypted key. - - This uses the same structure as in - :meth:`~eth_account.account.Account.encrypt`, but without a private key argument. - """ - # return self._publicapi.encrypt(self.key, password, kdf=kdf, iterations=iterations) - # TODO with ledger - pass - - def signHash(self, domain_hash: bytes, message_hash: bytes): - signed = sign_typed_data_draft(domain_hash, message_hash, dongle=self.dongle) + def signMessage(self, domain_hash: bytes, message_hash: bytes, dongle: Dongle): + signed = sign_typed_data_draft(domain_hash, message_hash, self.path, dongle) return (signed.v, signed.r, signed.s) - def sign_message(self, signable_message): - """ - Generate a string with the encrypted key. - - This uses the same structure as in - :meth:`~eth_account.account.Account.sign_message`, but without a private key argument. - """ - # TODO with ledger - pass - - def signTransaction(self, transaction_dict): - warnings.warn( - "signTransaction is deprecated in favor of sign_transaction", - category=DeprecationWarning, - ) - pass - - def sign_transaction(self, tx: TxParams) -> SignedTransaction: + def sign_transaction(self, tx: TxParams, dongle: Dongle) -> SignedTransaction: signed = create_transaction( destination=tx["to"], amount=tx["value"], @@ -87,7 +36,7 @@ def sign_transaction(self, tx: TxParams) -> SignedTransaction: nonce=tx["nonce"], chain_id=tx["chainId"], sender_path=self.path, - dongle=self.dongle, + dongle=dongle, ) raw_transaction = signed.raw_transaction() return SignedTransaction( @@ -97,6 +46,3 @@ def sign_transaction(self, tx: TxParams) -> SignedTransaction: signed.sender_s, signed.y_parity, ) - - def __bytes__(self): - return self.key diff --git a/safe_cli/operators/hw_accounts/ledger_manager.py b/safe_cli/operators/hw_accounts/ledger_manager.py new file mode 100644 index 00000000..5a0d2068 --- /dev/null +++ b/safe_cli/operators/hw_accounts/ledger_manager.py @@ -0,0 +1,112 @@ +from enum import Enum +from typing import List, Optional, Set, Tuple + +from eth_typing import ChecksumAddress +from ledgereth.accounts import get_account_by_path +from ledgereth.comms import init_dongle +from ledgereth.constants import DEFAULT_PATH_STRING +from ledgereth.exceptions import LedgerAppNotOpened, LedgerLocked, LedgerNotFound +from prompt_toolkit import HTML, print_formatted_text + +from gnosis.safe.signatures import signature_to_bytes + +from safe_cli.operators.hw_accounts.ledger_account import LedgerAccount + + +class LedgerStatus(Enum): + DISCONNECTED = 0 + LOCKED = 1 # Connected but locked + APP_CLOSED = 2 # Connected, unlocked but app is closed + READY = 3 # Ready to communicate + + +class LedgerManager: + + LEDGER_SEARCH_DEEP = 10 + + def __init__(self): + self.dongle = None + self.accounts: Set[LedgerAccount] = set() + self.connected: bool + + def _print_error_message(self, message: str): + print_formatted_text(HTML(f"{message}")) + + def check_status(self, print_message: bool = False) -> LedgerStatus: + try: + self.dongle = init_dongle(self.dongle) + # Get default derivation to check following status + get_account_by_path(DEFAULT_PATH_STRING) + except LedgerNotFound: + if print_message: + self._print_error_message("Ledger is disconnected") + return LedgerStatus.DISCONNECTED + except LedgerLocked: + if print_message: + self._print_error_message("Ledger is locked") + return LedgerStatus.LOCKED + except LedgerAppNotOpened: + if print_message: + self._print_error_message("Ledger is disconnected") + return LedgerStatus.APP_CLOSED + + return LedgerStatus.READY + + @property + def connected(self) -> bool: + if self.check_status() != LedgerStatus.DISCONNECTED: + return True + return False + + def get_accounts( + self, legacy_account: Optional[bool] = False + ) -> List[Tuple[ChecksumAddress, str]]: + """ + :param legacy_account: + :return: a list of tuples with address and derivation path + """ + accounts = [] + if self.check_status(True) != LedgerStatus.READY: + return [] + for i in range(self.LEDGER_SEARCH_DEEP): + if legacy_account: + path_string = f"44'/60'/0'/{i}" + else: + path_string = f"44'/60'/{i}'/0/0" + try: + account = get_account_by_path(path_string, self.dongle) + except LedgerLocked as ledger_error: + print(f"Ledger exception: {ledger_error}") + accounts.append((account.address, account.path)) + return accounts + + def add_account(self, derivation_path: str) -> bool: + """ + Add account to ledger manager list + + :param derivation_path: + :return: + """ + if self.check_status(True) != LedgerStatus.READY: + return False + account = get_account_by_path(derivation_path, self.dongle) + self.accounts.add(LedgerAccount(account.path, account.address)) + return True + + def sign_eip712( + self, domain_hash: bytes, message_hash: bytes, account: LedgerAccount + ) -> bytes | None: + """ + Sign eip712 hashes + + :param domain_hash: + :param message_hash: + :param account: ledger account + :return: bytes signature + """ + if self.check_status(True) != LedgerStatus.READY: + return None + + v, r, s = account.signMessage(domain_hash, message_hash, self.dongle) + + return signature_to_bytes(v, r, s) diff --git a/safe_cli/operators/safe_operator.py b/safe_cli/operators/safe_operator.py index 859f175a..bb920a19 100644 --- a/safe_cli/operators/safe_operator.py +++ b/safe_cli/operators/safe_operator.py @@ -9,9 +9,6 @@ from eth_typing import ChecksumAddress from eth_utils import ValidationError from hexbytes import HexBytes -from ledgereth import get_account_by_path -from ledgereth.comms import init_dongle -from ledgereth.exceptions import LedgerAppNotOpened, LedgerLocked, LedgerNotFound from packaging import version as semantic_version from prompt_toolkit import HTML, print_formatted_text from web3 import Web3 @@ -36,16 +33,14 @@ from gnosis.safe import InvalidInternalTx, Safe, SafeOperation, SafeTx from gnosis.safe.api import TransactionServiceApi from gnosis.safe.multi_send import MultiSend, MultiSendOperation, MultiSendTx -from gnosis.safe.signatures import signature_to_bytes from safe_cli.ethereum_hd_wallet import get_account_from_words -from safe_cli.operators.hw_accounts.ledger_account import LedgerAccount from safe_cli.safe_addresses import ( get_default_fallback_handler_address, get_safe_contract_address, get_safe_l2_contract_address, ) -from safe_cli.utils import get_erc_20_list, yes_or_no_question +from safe_cli.utils import choose_option_question, get_erc_20_list, yes_or_no_question @dataclasses.dataclass @@ -227,13 +222,19 @@ def __init__(self, address: ChecksumAddress, node_url: str): self.safe_contract_1_1_0 = get_safe_V1_1_1_contract( self.ethereum_client.w3, address=self.address ) - self.accounts: Set[LocalAccount | LedgerAccount] = set() - self.default_sender: Optional[LocalAccount] | LedgerAccount = None + self.accounts: Set[LocalAccount] = set() + self.default_sender: Optional[LocalAccount] = None self.executed_transactions: List[str] = [] self._safe_cli_info: Optional[SafeCliInfo] = None # Cache for SafeCliInfo self.require_all_signatures = ( True # Require all signatures to be present to send a tx ) + try: + from safe_cli.operators.hw_accounts.ledger_manager import LedgerManager + + self.ledger_manager = LedgerManager() + except (ModuleNotFoundError, IOError): + self.ledger_manager = None @cached_property def last_default_fallback_handler_address(self) -> ChecksumAddress: @@ -332,40 +333,30 @@ def load_cli_owners(self, keys: List[str]): print_formatted_text(HTML(f"Cannot load key={key}")) def load_ledger_cli_owners(self): - try: - dongle = init_dongle() - # Search between 10 first accounts - for index in range(10): - path = f"44'/60'/{index}'/0/0" - account = get_account_by_path(path, dongle=dongle) - if account.address in self.safe_cli_info.owners: - sender = LedgerAccount(account.path, account.address, dongle) - self.accounts.add(sender) - balance = self.ethereum_client.get_balance(account.address) - print_formatted_text( - HTML( - f"Loaded account {account.address} " - f'with balance={Web3.from_wei(balance, "ether")} ether' - f"Ledger account cannot be defined as sender" - ) - ) - # TODO add ledger as sender - break - except LedgerNotFound: - print_formatted_text( - HTML("Unable to find Ledger device") - ) - return - except LedgerAppNotOpened: - print_formatted_text( - HTML("Ensure open ethereum app on your ledger") - ) - return - except LedgerLocked: + if not self.ledger_manager: + return None + + ledger_accounts = self.ledger_manager.get_accounts() + if ledger_accounts: + return None + + for option, ledger_account in enumerate(ledger_accounts): + address, _ = ledger_account + print_formatted_text(HTML(f"{option} - {address} ")) + + option = choose_option_question( + "Select the owner address", len(ledger_accounts) - 1 + ) + address, derivation_path = ledger_accounts[option] + if self.ledger_manager.add_account(derivation_path): + balance = self.ethereum_client.get_balance(address) print_formatted_text( - HTML("Ensure open ethereum app on your ledger") + HTML( + f"Loaded account {address} " + f'with balance={Web3.from_wei(balance, "ether")} ether' + f"Ledger account cannot be defined as sender" + ) ) - return def unload_cli_owners(self, owners: List[str]): accounts_to_remove: Set[Account] = set() @@ -385,10 +376,15 @@ def unload_cli_owners(self, owners: List[str]): print_formatted_text(HTML("No account was deleted")) def show_cli_owners(self): - if not self.accounts: + accounts = ( + self.accounts | self.ledger_manager.accounts + if self.ledger_manager + else self.accounts + ) + if not accounts: print_formatted_text(HTML("No accounts loaded")) else: - for account in self.accounts: + for account in accounts: print_formatted_text( HTML( f"Account {account.address} loaded" @@ -719,6 +715,28 @@ def print_info(self): ) ) + if not self.ledger_manager: + print_formatted_text( + HTML( + "Ledger=" + "ledgereth library is not installed" + ) + ) + elif self.ledger_manager.connected: + print_formatted_text( + HTML( + "Ledger=" + "Connected" + ) + ) + else: + print_formatted_text( + HTML( + "Ledger=" + "disconnected" + ) + ) + if not self.is_version_updated(): print_formatted_text( HTML( @@ -899,28 +917,6 @@ def batch_safe_txs( else: return safe_tx - def ledger_sign(self, safe_tx: SafeTx, account: LedgerAccount) -> bytes: - """ - {bytes32 r}{bytes32 s}{uint8 v} - :param private_key: - :return: Signature - """ - encode_hash = eip712_encode(safe_tx.eip712_structured_data) - v, r, s = account.signHash(encode_hash[1], encode_hash[2]) - signature = signature_to_bytes(v, r, s) - # Insert signature sorted - if account.address not in safe_tx.signers: - new_owners = safe_tx.signers + [account.address] - new_owner_pos = sorted(new_owners, key=lambda x: int(x, 16)).index( - account.address - ) - safe_tx.signatures = ( - safe_tx.signatures[: 65 * new_owner_pos] - + signature - + safe_tx.signatures[65 * new_owner_pos :] - ) - return safe_tx - # TODO Set sender so we can save gas in that signature def sign_transaction(self, safe_tx: SafeTx) -> SafeTx: permitted_signers = self.get_permitted_signers() @@ -934,16 +930,41 @@ def sign_transaction(self, safe_tx: SafeTx) -> SafeTx: threshold -= 1 if threshold == 0: break + # If still pending required signatures continue with ledger owners + selected_ledger_accounts = [] + if threshold > 0 and self.ledger_manager: + for ledger_account in self.ledger_manager.accounts: + if ledger_account.address in permitted_signers: + selected_ledger_accounts.append(ledger_account) + threshold -= 1 + if threshold == 0: + break if self.require_all_signatures and threshold > 0: raise NotEnoughSignatures(threshold) for selected_account in selected_accounts: - if selected_account.key: - safe_tx.sign(selected_account.key) - else: - safe_tx = self.ledger_sign(safe_tx, selected_account) + safe_tx.sign(selected_account.key) + # Sign with ledger if + for selected_ledger_account in selected_ledger_accounts: + encode_hash = eip712_encode(safe_tx.eip712_structured_data) + signature = self.ledger_manager.sign_eip712( + encode_hash[1], encode_hash[2], selected_ledger_account + ) + if signature: + # TODO refactor on safe_eth_py function insert_signature_sorted + # Insert signature sorted + if selected_ledger_account.address not in safe_tx.signers: + new_owners = safe_tx.signers + [selected_ledger_account.address] + new_owner_pos = sorted(new_owners, key=lambda x: int(x, 16)).index( + selected_ledger_account.address + ) + safe_tx.signatures = ( + safe_tx.signatures[: 65 * new_owner_pos] + + signature + + safe_tx.signatures[65 * new_owner_pos :] + ) return safe_tx @require_tx_service diff --git a/safe_cli/utils.py b/safe_cli/utils.py index b3e96f87..806ef3f9 100644 --- a/safe_cli/utils.py +++ b/safe_cli/utils.py @@ -1,3 +1,4 @@ +import argparse import os from gnosis.eth import EthereumClient @@ -41,3 +42,16 @@ def yes_or_no_question(question: str, default_no: bool = True) -> bool: return False else: return False if default_no else True + + +def choose_option_question( + question: str, number_options: int, default_option: int = 0 +) -> bool: + if "PYTEST_CURRENT_TEST" in os.environ: + return True # Ignore confirmations when running tests + choices = f" [0-{number_options}] default {default_option}:" + reply = str(input(question + choices)).lower().strip() or str(default_option) + option = int(reply) + if option not in range(0, number_options): + argparse.ArgumentTypeError(f"{option} is not between [0-{number_options}}}") + return option From c54110eca8b6397425b645c991e3cf2bb085de26 Mon Sep 17 00:00:00 2001 From: moisses89 <7888669+moisses89@users.noreply.github.com> Date: Wed, 25 Oct 2023 21:23:27 +0200 Subject: [PATCH 09/19] Add HwException * add tests --- .../operators/hw_accounts/hw_exceptions.py | 31 ++++ .../operators/hw_accounts/ledger_account.py | 48 ------ .../operators/hw_accounts/ledger_manager.py | 83 ++++------ safe_cli/operators/safe_operator.py | 8 +- safe_cli/prompt_parser.py | 5 + safe_cli/utils.py | 2 +- tests/test_ledger_manager.py | 147 ++++++++++++++++++ tests/test_safe_operator.py | 33 ++-- 8 files changed, 234 insertions(+), 123 deletions(-) create mode 100644 safe_cli/operators/hw_accounts/hw_exceptions.py delete mode 100644 safe_cli/operators/hw_accounts/ledger_account.py create mode 100644 tests/test_ledger_manager.py diff --git a/safe_cli/operators/hw_accounts/hw_exceptions.py b/safe_cli/operators/hw_accounts/hw_exceptions.py new file mode 100644 index 00000000..1f3832db --- /dev/null +++ b/safe_cli/operators/hw_accounts/hw_exceptions.py @@ -0,0 +1,31 @@ +import functools + +from ledgereth.exceptions import ( + LedgerAppNotOpened, + LedgerCancel, + LedgerLocked, + LedgerNotFound, +) + +from safe_cli.operators.safe_operator import HwDeviceException + + +def hw_account_exception(function): + @functools.wraps(function) + def wrapper(*args, **kwargs): + try: + return function(*args, **kwargs) + except LedgerNotFound as e: + raise HwDeviceException(e.message) + except LedgerLocked as e: + raise HwDeviceException(e.message) + except LedgerAppNotOpened as e: + raise HwDeviceException(e.message) + except LedgerCancel as e: + raise HwDeviceException(e.message) + except BaseException as e: + if "Error while writing" in e.args: + raise HwDeviceException("Ledger error writting, restart safe-cli") + raise e + + return wrapper diff --git a/safe_cli/operators/hw_accounts/ledger_account.py b/safe_cli/operators/hw_accounts/ledger_account.py deleted file mode 100644 index 260a3abc..00000000 --- a/safe_cli/operators/hw_accounts/ledger_account.py +++ /dev/null @@ -1,48 +0,0 @@ -from eth_account.datastructures import SignedTransaction -from hexbytes import HexBytes -from ledgerblue import Dongle -from ledgereth import create_transaction, sign_typed_data_draft -from web3 import Web3 -from web3.types import TxParams - - -class LedgerAccount: - def __init__(self, path, address): - """ - Initialize a new ledger account (no private key) - - :param path: path derivation - :param ~eth_account.account.Account account: the key-unaware management API - """ - self._address = address - self.path = path - - @property - def address(self): - return self._address - - def signMessage(self, domain_hash: bytes, message_hash: bytes, dongle: Dongle): - signed = sign_typed_data_draft(domain_hash, message_hash, self.path, dongle) - return (signed.v, signed.r, signed.s) - - def sign_transaction(self, tx: TxParams, dongle: Dongle) -> SignedTransaction: - signed = create_transaction( - destination=tx["to"], - amount=tx["value"], - gas=tx["gas"], - max_priority_fee_per_gas=tx["maxPriorityFeePerGas"], - max_fee_per_gas=tx["maxFeePerGas"], - data=tx["data"], - nonce=tx["nonce"], - chain_id=tx["chainId"], - sender_path=self.path, - dongle=dongle, - ) - raw_transaction = signed.raw_transaction() - return SignedTransaction( - HexBytes(raw_transaction), - Web3.keccak(HexBytes(raw_transaction)), - signed.sender_r, - signed.sender_s, - signed.y_parity, - ) diff --git a/safe_cli/operators/hw_accounts/ledger_manager.py b/safe_cli/operators/hw_accounts/ledger_manager.py index 5a0d2068..e7668a4b 100644 --- a/safe_cli/operators/hw_accounts/ledger_manager.py +++ b/safe_cli/operators/hw_accounts/ledger_manager.py @@ -1,63 +1,39 @@ -from enum import Enum from typing import List, Optional, Set, Tuple from eth_typing import ChecksumAddress -from ledgereth.accounts import get_account_by_path +from ledgereth import sign_typed_data_draft +from ledgereth.accounts import LedgerAccount, get_account_by_path from ledgereth.comms import init_dongle -from ledgereth.constants import DEFAULT_PATH_STRING -from ledgereth.exceptions import LedgerAppNotOpened, LedgerLocked, LedgerNotFound +from ledgereth.exceptions import LedgerNotFound from prompt_toolkit import HTML, print_formatted_text from gnosis.safe.signatures import signature_to_bytes -from safe_cli.operators.hw_accounts.ledger_account import LedgerAccount - - -class LedgerStatus(Enum): - DISCONNECTED = 0 - LOCKED = 1 # Connected but locked - APP_CLOSED = 2 # Connected, unlocked but app is closed - READY = 3 # Ready to communicate +from safe_cli.operators.hw_accounts.hw_exceptions import hw_account_exception class LedgerManager: - LEDGER_SEARCH_DEEP = 10 + LEDGER_SEARCH_DEEP = 5 def __init__(self): self.dongle = None self.accounts: Set[LedgerAccount] = set() - self.connected: bool - - def _print_error_message(self, message: str): - print_formatted_text(HTML(f"{message}")) + self.connect() - def check_status(self, print_message: bool = False) -> LedgerStatus: + def connect(self) -> bool: try: self.dongle = init_dongle(self.dongle) - # Get default derivation to check following status - get_account_by_path(DEFAULT_PATH_STRING) + return True except LedgerNotFound: - if print_message: - self._print_error_message("Ledger is disconnected") - return LedgerStatus.DISCONNECTED - except LedgerLocked: - if print_message: - self._print_error_message("Ledger is locked") - return LedgerStatus.LOCKED - except LedgerAppNotOpened: - if print_message: - self._print_error_message("Ledger is disconnected") - return LedgerStatus.APP_CLOSED - - return LedgerStatus.READY + return False @property + @hw_account_exception def connected(self) -> bool: - if self.check_status() != LedgerStatus.DISCONNECTED: - return True - return False + return self.connect() + @hw_account_exception def get_accounts( self, legacy_account: Optional[bool] = False ) -> List[Tuple[ChecksumAddress, str]]: @@ -66,36 +42,31 @@ def get_accounts( :return: a list of tuples with address and derivation path """ accounts = [] - if self.check_status(True) != LedgerStatus.READY: - return [] for i in range(self.LEDGER_SEARCH_DEEP): if legacy_account: path_string = f"44'/60'/0'/{i}" else: path_string = f"44'/60'/{i}'/0/0" - try: - account = get_account_by_path(path_string, self.dongle) - except LedgerLocked as ledger_error: - print(f"Ledger exception: {ledger_error}") + + account = get_account_by_path(path_string, self.dongle) accounts.append((account.address, account.path)) return accounts - def add_account(self, derivation_path: str) -> bool: + @hw_account_exception + def add_account(self, derivation_path: str): """ Add account to ledger manager list :param derivation_path: :return: """ - if self.check_status(True) != LedgerStatus.READY: - return False account = get_account_by_path(derivation_path, self.dongle) self.accounts.add(LedgerAccount(account.path, account.address)) - return True + @hw_account_exception def sign_eip712( self, domain_hash: bytes, message_hash: bytes, account: LedgerAccount - ) -> bytes | None: + ) -> bytes: """ Sign eip712 hashes @@ -104,9 +75,15 @@ def sign_eip712( :param account: ledger account :return: bytes signature """ - if self.check_status(True) != LedgerStatus.READY: - return None - - v, r, s = account.signMessage(domain_hash, message_hash, self.dongle) - - return signature_to_bytes(v, r, s) + print_formatted_text( + HTML( + "Ensure to compare in your ledger before to sign that domain_hash and message_hash are both correct" + ) + ) + print_formatted_text(HTML(f"Domain_hash: {domain_hash.hex()}")) + print_formatted_text(HTML(f"Message_hash: {message_hash.hex()}")) + signed = sign_typed_data_draft( + domain_hash, message_hash, account.path, self.dongle + ) + + return signature_to_bytes(signed.v, signed.r, signed.s) diff --git a/safe_cli/operators/safe_operator.py b/safe_cli/operators/safe_operator.py index bb920a19..2abb57ea 100644 --- a/safe_cli/operators/safe_operator.py +++ b/safe_cli/operators/safe_operator.py @@ -148,6 +148,10 @@ class SafeServiceNotAvailable(SafeOperatorException): pass +class HwDeviceException(SafeOperatorException): + pass + + def require_tx_service(f): @wraps(f) def decorated(self, *args, **kwargs): @@ -337,7 +341,7 @@ def load_ledger_cli_owners(self): return None ledger_accounts = self.ledger_manager.get_accounts() - if ledger_accounts: + if len(ledger_accounts) == 0: return None for option, ledger_account in enumerate(ledger_accounts): @@ -718,7 +722,7 @@ def print_info(self): if not self.ledger_manager: print_formatted_text( HTML( - "Ledger=" + "Ledger=" "ledgereth library is not installed" ) ) diff --git a/safe_cli/prompt_parser.py b/safe_cli/prompt_parser.py index 9f864fcd..ec8d08c4 100644 --- a/safe_cli/prompt_parser.py +++ b/safe_cli/prompt_parser.py @@ -15,6 +15,7 @@ ExistingOwnerException, FallbackHandlerNotSupportedException, HashAlreadyApproved, + HwDeviceException, InvalidMasterCopyException, NonExistingOwnerException, NotEnoughEtherToSend, @@ -116,6 +117,10 @@ def wrapper(*args, **kwargs): f"Service not available for network {e.args[0]}" ) ) + except HwDeviceException as e: + print_formatted_text( + HTML(f"HwDevice exception: {e.args[0]}") + ) return wrapper diff --git a/safe_cli/utils.py b/safe_cli/utils.py index 806ef3f9..41c6bba0 100644 --- a/safe_cli/utils.py +++ b/safe_cli/utils.py @@ -48,7 +48,7 @@ def choose_option_question( question: str, number_options: int, default_option: int = 0 ) -> bool: if "PYTEST_CURRENT_TEST" in os.environ: - return True # Ignore confirmations when running tests + return 0 # Ignore confirmations when running tests choices = f" [0-{number_options}] default {default_option}:" reply = str(input(question + choices)).lower().strip() or str(default_option) option = int(reply) diff --git a/tests/test_ledger_manager.py b/tests/test_ledger_manager.py new file mode 100644 index 00000000..8dad7b11 --- /dev/null +++ b/tests/test_ledger_manager.py @@ -0,0 +1,147 @@ +import unittest +from unittest import mock +from unittest.mock import MagicMock + +from eth_account import Account +from ledgerblue.Dongle import Dongle +from ledgereth.accounts import LedgerAccount +from ledgereth.exceptions import LedgerLocked, LedgerNotFound + +from gnosis.eth.eip712 import eip712_encode +from gnosis.safe import SafeTx +from gnosis.safe.signatures import signature_split +from gnosis.safe.tests.safe_test_case import SafeTestCaseMixin + +from safe_cli.operators.hw_accounts.ledger_manager import LedgerManager + + +class TestLedgerManager(SafeTestCaseMixin, unittest.TestCase): + def test_setup_ledger_manager(self): + ledger_manager = LedgerManager() + self.assertIsNone(ledger_manager.dongle) + self.assertEqual(len(ledger_manager.accounts), 0) + self.assertEqual(ledger_manager.connected, False) + + @mock.patch("safe_cli.operators.hw_accounts.ledger_manager.init_dongle") + @mock.patch("safe_cli.operators.hw_accounts.ledger_manager.get_account_by_path") + def test_connected( + self, mock_get_account_by_path: MagicMock, mock_init_dongle: MagicMock + ): + ledger_manager = LedgerManager() + mock_init_dongle.side_effect = LedgerNotFound() + + self.assertEqual(ledger_manager.connected, False) + + mock_init_dongle.side_effect = None + mock_init_dongle.return_value = Dongle() + mock_get_account_by_path.side_effect = LedgerLocked() + + self.assertEqual(ledger_manager.connected, True) + + @mock.patch( + "safe_cli.operators.hw_accounts.ledger_manager.LedgerManager.LEDGER_SEARCH_DEEP", + 2, + ) + @mock.patch( + "safe_cli.operators.hw_accounts.ledger_manager.get_account_by_path", + autospec=True, + ) + def test_get_accounts(self, mock_get_account_by_path: MagicMock): + ledger_manager = LedgerManager() + + addresses = [Account.create().address, Account.create().address] + derivation_paths = ["44'/60'/0'/0", "44'/60'/0'/1"] + mock_get_account_by_path.side_effect = [ + LedgerAccount(derivation_paths[0], addresses[0]), + LedgerAccount(derivation_paths[1], addresses[1]), + ] + ledger_accounts = ledger_manager.get_accounts() + self.assertEqual(len(ledger_accounts), 2) + for ledger_account, expected_address, expected_derivation_path in zip( + ledger_accounts, addresses, derivation_paths + ): + ledger_address, ledger_path = ledger_account + self.assertEqual(expected_address, ledger_address) + self.assertEqual(expected_derivation_path, ledger_path) + + @mock.patch( + "safe_cli.operators.hw_accounts.ledger_manager.get_account_by_path", + autospec=True, + ) + def test_add_account(self, mock_get_account_by_path: MagicMock): + ledger_manager = LedgerManager() + derivation_path = "44'/60'/0'/0" + account_address = Account.create().address + mock_get_account_by_path.return_value = LedgerAccount( + derivation_path, account_address + ) + self.assertEqual(len(ledger_manager.accounts), 0) + + ledger_manager.add_account(derivation_path) + + self.assertEqual(len(ledger_manager.accounts), 1) + ledger_account = list(ledger_manager.accounts)[0] + self.assertEqual(ledger_account.address, account_address) + self.assertEqual(ledger_account.path, derivation_path) + + @mock.patch( + "safe_cli.operators.hw_accounts.ledger_manager.init_dongle", + autospec=True, + return_value=Dongle(), + ) + def test_sign_eip712(self, mock_init_dongle: MagicMock): + ledger_manager = LedgerManager() + owner = Account.create() + to = Account.create() + ledger_account = LedgerAccount("44'/60'/0'/0", owner.address) + safe = self.deploy_test_safe( + owners=[owner.address], + threshold=1, + initial_funding_wei=self.w3.to_wei(0.1, "ether"), + ) + safe_tx = SafeTx( + self.ethereum_client, + safe.address, + to.address, + 10, + b"", + 0, + 200000, + 200000, + self.gas_price, + None, + None, + safe_nonce=0, + ) + encode_hash = eip712_encode(safe_tx.eip712_structured_data) + # We need to split to change the bytes signature order to v + r + s like ledger return signature + expected_v, expected_r, expected_s = signature_split(safe_tx.sign(owner.key)) + + signature = ( + expected_v.to_bytes(1, byteorder="big") + + expected_r.to_bytes(32, byteorder="big") + + expected_s.to_bytes(32, byteorder="big") + ) + mock_init_dongle.return_value.exchange = MagicMock(return_value=signature) + v, r, s = signature_split( + ledger_manager.sign_eip712(encode_hash[1], encode_hash[2], ledger_account) + ) + self.assertEqual(expected_v, v) + self.assertEqual(expected_r, r) + self.assertEqual(expected_s, s) + + # Check that dongle exchange is called with the expected payload + # https://github.com/LedgerHQ/app-ethereum/blob/master/doc/ethapp.adoc#sign-eth-eip-712 + command = "e00c0000" + "51" # command + payload length + payload = ( + "04" + "8000002c8000003c8000000000000000" + ) # number of derivations + 44'/60'/0'/0 + expected_exchange_payload = ( + bytes.fromhex(command) + + bytes.fromhex(payload) + + encode_hash[1] + + encode_hash[2] + ) + mock_init_dongle.return_value.exchange.assert_called_once_with( + expected_exchange_payload + ) diff --git a/tests/test_safe_operator.py b/tests/test_safe_operator.py index fe7b185a..2ef99214 100644 --- a/tests/test_safe_operator.py +++ b/tests/test_safe_operator.py @@ -68,30 +68,25 @@ def test_load_cli_owner(self, get_contract_mock: MagicMock): self.assertEqual(len(safe_operator.accounts), number_of_accounts - 1) self.assertFalse(safe_operator.default_sender) - @mock.patch("safe_cli.operators.safe_operator.get_account_by_path") + @mock.patch("safe_cli.operators.hw_accounts.ledger_manager.get_account_by_path") def test_load_ledger_cli_owner(self, mock_get_account_by_path: MagicMock): owner_address = Account.create().address safe_address = self.deploy_test_safe(owners=[owner_address]).address safe_operator = SafeOperator(safe_address, self.ethereum_node_url) + safe_operator.ledger_manager.get_accounts = MagicMock(return_value=[]) safe_operator.load_ledger_cli_owners() - self.assertEqual(len(safe_operator.accounts), 0) - - with mock.patch("safe_cli.operators.safe_operator.init_dongle"): - # Test ledger address is not an owner - random_account = Account.create().address - mock_get_account_by_path.return_value = LedgerAccount( - "44'/60'/0'/0/0", random_account - ) - safe_operator.load_ledger_cli_owners() - self.assertEqual(len(safe_operator.accounts), 0) - - # Test ledger address is an owner - mock_get_account_by_path.return_value = LedgerAccount( - "44'/60'/0'/0/0", owner_address - ) - safe_operator.load_ledger_cli_owners() - self.assertEqual(len(safe_operator.accounts), 1) - self.assertEqual(list(safe_operator.accounts)[0].address, owner_address) + self.assertEqual(len(safe_operator.ledger_manager.accounts), 0) + random_account = Account.create().address + other_random_account = Account.create().address + safe_operator.ledger_manager.get_accounts.return_value = [ + (random_account, "44'/60'/0'/0/0"), + (other_random_account, "44'/60'/0'/0/1"), + ] + mock_get_account_by_path.return_value = LedgerAccount( + "44'/60'/0'/0/0", random_account + ) + safe_operator.load_ledger_cli_owners() + self.assertEqual(len(safe_operator.ledger_manager.accounts), 1) def test_approve_hash(self): safe_address = self.deploy_test_safe( From e0ca8e84ff6989484a69e777fc8a6f949174fa4b Mon Sep 17 00:00:00 2001 From: moisses89 <7888669+moisses89@users.noreply.github.com> Date: Thu, 26 Oct 2023 14:19:30 +0200 Subject: [PATCH 10/19] Add delete_account Refactor ledger sign_eip712 to be used in other places --- .../operators/hw_accounts/ledger_manager.py | 84 ++++++++---- safe_cli/operators/safe_operator.py | 32 ++--- setup.py | 1 + tests/test_ledger_manager.py | 122 +++++++++++++++--- tests/test_safe_operator.py | 10 +- 5 files changed, 189 insertions(+), 60 deletions(-) diff --git a/safe_cli/operators/hw_accounts/ledger_manager.py b/safe_cli/operators/hw_accounts/ledger_manager.py index e7668a4b..4087f8eb 100644 --- a/safe_cli/operators/hw_accounts/ledger_manager.py +++ b/safe_cli/operators/hw_accounts/ledger_manager.py @@ -7,21 +7,24 @@ from ledgereth.exceptions import LedgerNotFound from prompt_toolkit import HTML, print_formatted_text +from gnosis.eth.eip712 import eip712_encode +from gnosis.safe import SafeTx from gnosis.safe.signatures import signature_to_bytes from safe_cli.operators.hw_accounts.hw_exceptions import hw_account_exception class LedgerManager: - - LEDGER_SEARCH_DEEP = 5 - def __init__(self): self.dongle = None self.accounts: Set[LedgerAccount] = set() self.connect() def connect(self) -> bool: + """ + Connect with ledger + :return: True if connection was successful or False in other case + """ try: self.dongle = init_dongle(self.dongle) return True @@ -31,18 +34,24 @@ def connect(self) -> bool: @property @hw_account_exception def connected(self) -> bool: + """ + :return: True if ledger is connected or False in other case + """ return self.connect() @hw_account_exception def get_accounts( - self, legacy_account: Optional[bool] = False + self, legacy_account: Optional[bool] = False, number_accounts: Optional[int] = 5 ) -> List[Tuple[ChecksumAddress, str]]: """ + Request to ledger device the first n accounts + :param legacy_account: + :param number_accounts: number of accounts requested to ledger :return: a list of tuples with address and derivation path """ accounts = [] - for i in range(self.LEDGER_SEARCH_DEEP): + for i in range(number_accounts): if legacy_account: path_string = f"44'/60'/0'/{i}" else: @@ -55,7 +64,7 @@ def get_accounts( @hw_account_exception def add_account(self, derivation_path: str): """ - Add account to ledger manager list + Add an account to ledger manager set :param derivation_path: :return: @@ -63,27 +72,58 @@ def add_account(self, derivation_path: str): account = get_account_by_path(derivation_path, self.dongle) self.accounts.add(LedgerAccount(account.path, account.address)) + def delete_accounts(self, addresses: List[ChecksumAddress]) -> Set: + """ + Remove ledger accounts from address + + :param accounts: + :return: list with the delete accounts + """ + accounts_to_remove = set() + for address in addresses: + for account in self.accounts: + if account.address == address: + accounts_to_remove.add(account) + self.accounts = self.accounts.difference(accounts_to_remove) + return accounts_to_remove + @hw_account_exception - def sign_eip712( - self, domain_hash: bytes, message_hash: bytes, account: LedgerAccount - ) -> bytes: + def sign_eip712(self, safe_tx: SafeTx, accounts: List[LedgerAccount]) -> SafeTx: """ - Sign eip712 hashes + Call ledger ethereum app method to sign eip712 hashes with a ledger account :param domain_hash: :param message_hash: :param account: ledger account - :return: bytes signature + :return: bytes of signature """ - print_formatted_text( - HTML( - "Ensure to compare in your ledger before to sign that domain_hash and message_hash are both correct" + encode_hash = eip712_encode(safe_tx.eip712_structured_data) + domain_hash = encode_hash[1] + message_hash = encode_hash[2] + for account in accounts: + print_formatted_text( + HTML( + "Ensure to compare in your ledger before to sign that domain_hash and message_hash are both correct" + ) + ) + print_formatted_text(HTML(f"Domain_hash: {domain_hash.hex()}")) + print_formatted_text(HTML(f"Message_hash: {message_hash.hex()}")) + signed = sign_typed_data_draft( + domain_hash, message_hash, account.path, self.dongle ) - ) - print_formatted_text(HTML(f"Domain_hash: {domain_hash.hex()}")) - print_formatted_text(HTML(f"Message_hash: {message_hash.hex()}")) - signed = sign_typed_data_draft( - domain_hash, message_hash, account.path, self.dongle - ) - - return signature_to_bytes(signed.v, signed.r, signed.s) + + signature = signature_to_bytes(signed.v, signed.r, signed.s) + # TODO should be refactored on safe_eth_py function insert_signature_sorted + # Insert signature sorted + if account.address not in safe_tx.signers: + new_owners = safe_tx.signers + [account.address] + new_owner_pos = sorted(new_owners, key=lambda x: int(x, 16)).index( + account.address + ) + safe_tx.signatures = ( + safe_tx.signatures[: 65 * new_owner_pos] + + signature + + safe_tx.signatures[65 * new_owner_pos :] + ) + + return safe_tx diff --git a/safe_cli/operators/safe_operator.py b/safe_cli/operators/safe_operator.py index 2abb57ea..664d04ff 100644 --- a/safe_cli/operators/safe_operator.py +++ b/safe_cli/operators/safe_operator.py @@ -29,7 +29,6 @@ get_safe_contract, get_safe_V1_1_1_contract, ) -from gnosis.eth.eip712 import eip712_encode from gnosis.safe import InvalidInternalTx, Safe, SafeOperation, SafeTx from gnosis.safe.api import TransactionServiceApi from gnosis.safe.multi_send import MultiSend, MultiSendOperation, MultiSendTx @@ -372,6 +371,12 @@ def unload_cli_owners(self, owners: List[str]): accounts_to_remove.add(account) break self.accounts = self.accounts.difference(accounts_to_remove) + # Check if there are ledger owners + if self.ledger_manager and len(accounts_to_remove) < len(owners): + accounts_to_remove = ( + accounts_to_remove | self.ledger_manager.delete_accounts(owners) + ) + if accounts_to_remove: print_formatted_text( HTML("Accounts have been deleted") @@ -723,7 +728,7 @@ def print_info(self): print_formatted_text( HTML( "Ledger=" - "ledgereth library is not installed" + "Disabled Optional ledger library is not installed, run pip install safe-cli[ledger] " ) ) elif self.ledger_manager.connected: @@ -950,25 +955,10 @@ def sign_transaction(self, safe_tx: SafeTx) -> SafeTx: for selected_account in selected_accounts: safe_tx.sign(selected_account.key) - # Sign with ledger if - for selected_ledger_account in selected_ledger_accounts: - encode_hash = eip712_encode(safe_tx.eip712_structured_data) - signature = self.ledger_manager.sign_eip712( - encode_hash[1], encode_hash[2], selected_ledger_account - ) - if signature: - # TODO refactor on safe_eth_py function insert_signature_sorted - # Insert signature sorted - if selected_ledger_account.address not in safe_tx.signers: - new_owners = safe_tx.signers + [selected_ledger_account.address] - new_owner_pos = sorted(new_owners, key=lambda x: int(x, 16)).index( - selected_ledger_account.address - ) - safe_tx.signatures = ( - safe_tx.signatures[: 65 * new_owner_pos] - + signature - + safe_tx.signatures[65 * new_owner_pos :] - ) + # Sign with ledger + if len(selected_ledger_accounts) > 0: + safe_tx = self.ledger_manager.sign_eip712(safe_tx, selected_ledger_accounts) + return safe_tx @require_tx_service diff --git a/setup.py b/setup.py index 2e6d885d..5f67b776 100644 --- a/setup.py +++ b/setup.py @@ -27,6 +27,7 @@ "safe-eth-py==6.0.0b3", "tabulate>=0.8", ], + extras_require={"ledger": ["ledgereth==0.9.0"]}, packages=setuptools.find_packages(), entry_points={ "console_scripts": [ diff --git a/tests/test_ledger_manager.py b/tests/test_ledger_manager.py index 8dad7b11..556bf8bf 100644 --- a/tests/test_ledger_manager.py +++ b/tests/test_ledger_manager.py @@ -5,7 +5,12 @@ from eth_account import Account from ledgerblue.Dongle import Dongle from ledgereth.accounts import LedgerAccount -from ledgereth.exceptions import LedgerLocked, LedgerNotFound +from ledgereth.exceptions import ( + LedgerAppNotOpened, + LedgerCancel, + LedgerLocked, + LedgerNotFound, +) from gnosis.eth.eip712 import eip712_encode from gnosis.safe import SafeTx @@ -13,6 +18,7 @@ from gnosis.safe.tests.safe_test_case import SafeTestCaseMixin from safe_cli.operators.hw_accounts.ledger_manager import LedgerManager +from safe_cli.operators.safe_operator import HwDeviceException class TestLedgerManager(SafeTestCaseMixin, unittest.TestCase): @@ -39,23 +45,87 @@ def test_connected( self.assertEqual(ledger_manager.connected, True) @mock.patch( - "safe_cli.operators.hw_accounts.ledger_manager.LedgerManager.LEDGER_SEARCH_DEEP", - 2, + "safe_cli.operators.hw_accounts.ledger_manager.sign_typed_data_draft", + autospec=True, ) @mock.patch( "safe_cli.operators.hw_accounts.ledger_manager.get_account_by_path", autospec=True, ) - def test_get_accounts(self, mock_get_account_by_path: MagicMock): + def test_hw_device_exception(self, mock_ledger_fn: MagicMock, mock_sign: MagicMock): ledger_manager = LedgerManager() + derivation_path = "44'/60'/0'/0" + ledger_account = LedgerAccount(derivation_path, Account.create().address) + safe = self.deploy_test_safe( + owners=[Account.create().address], + threshold=1, + initial_funding_wei=self.w3.to_wei(0.1, "ether"), + ) + safe_tx = SafeTx( + self.ethereum_client, + safe.address, + Account.create().address, + 10, + b"", + 0, + 200000, + 200000, + self.gas_price, + None, + None, + safe_nonce=0, + ) + + mock_ledger_fn.side_effect = LedgerNotFound + mock_sign.side_effect = LedgerNotFound + with self.assertRaises(HwDeviceException): + ledger_manager.get_accounts() + with self.assertRaises(HwDeviceException): + ledger_manager.add_account(derivation_path) + with self.assertRaises(HwDeviceException): + ledger_manager.sign_eip712(safe_tx, [ledger_account]) + + mock_ledger_fn.side_effect = LedgerLocked + mock_sign.side_effect = LedgerLocked + with self.assertRaises(HwDeviceException): + ledger_manager.get_accounts() + with self.assertRaises(HwDeviceException): + ledger_manager.add_account(derivation_path) + with self.assertRaises(HwDeviceException): + ledger_manager.sign_eip712(safe_tx, [ledger_account]) + + mock_ledger_fn.side_effect = LedgerAppNotOpened + mock_sign.side_effect = LedgerAppNotOpened + with self.assertRaises(HwDeviceException): + ledger_manager.get_accounts() + with self.assertRaises(HwDeviceException): + ledger_manager.add_account(derivation_path) + with self.assertRaises(HwDeviceException): + ledger_manager.sign_eip712(safe_tx, [ledger_account]) + + mock_ledger_fn.side_effect = LedgerCancel + mock_sign.side_effect = LedgerCancel + with self.assertRaises(HwDeviceException): + ledger_manager.get_accounts() + with self.assertRaises(HwDeviceException): + ledger_manager.add_account(derivation_path) + with self.assertRaises(HwDeviceException): + ledger_manager.sign_eip712(safe_tx, [ledger_account]) + + @mock.patch( + "safe_cli.operators.hw_accounts.ledger_manager.get_account_by_path", + autospec=True, + ) + def test_get_accounts(self, mock_get_account_by_path: MagicMock): + ledger_manager = LedgerManager() addresses = [Account.create().address, Account.create().address] derivation_paths = ["44'/60'/0'/0", "44'/60'/0'/1"] mock_get_account_by_path.side_effect = [ LedgerAccount(derivation_paths[0], addresses[0]), LedgerAccount(derivation_paths[1], addresses[1]), ] - ledger_accounts = ledger_manager.get_accounts() + ledger_accounts = ledger_manager.get_accounts(number_accounts=2) self.assertEqual(len(ledger_accounts), 2) for ledger_account, expected_address, expected_derivation_path in zip( ledger_accounts, addresses, derivation_paths @@ -84,6 +154,27 @@ def test_add_account(self, mock_get_account_by_path: MagicMock): self.assertEqual(ledger_account.address, account_address) self.assertEqual(ledger_account.path, derivation_path) + def test_delete_account(self): + ledger_manager = LedgerManager() + random_address = Account.create().address + random_address_2 = Account.create().address + self.assertEqual(len(ledger_manager.accounts), 0) + self.assertEqual(len(ledger_manager.delete_accounts([random_address])), 0) + ledger_manager.accounts.add(LedgerAccount("44'/60'/0'/0", random_address_2)) + self.assertEqual(len(ledger_manager.delete_accounts([random_address])), 0) + self.assertEqual(len(ledger_manager.accounts), 1) + self.assertEqual(len(ledger_manager.delete_accounts([])), 0) + ledger_manager.accounts.add(LedgerAccount("44'/60'/0'/1", random_address)) + self.assertEqual(len(ledger_manager.accounts), 2) + self.assertEqual(len(ledger_manager.delete_accounts([random_address])), 1) + self.assertEqual(len(ledger_manager.accounts), 1) + ledger_manager.accounts.add(LedgerAccount("44'/60'/0'/1", random_address)) + self.assertEqual(len(ledger_manager.accounts), 2) + self.assertEqual( + len(ledger_manager.delete_accounts([random_address, random_address_2])), 2 + ) + self.assertEqual(len(ledger_manager.accounts), 0) + @mock.patch( "safe_cli.operators.hw_accounts.ledger_manager.init_dongle", autospec=True, @@ -114,21 +205,20 @@ def test_sign_eip712(self, mock_init_dongle: MagicMock): safe_nonce=0, ) encode_hash = eip712_encode(safe_tx.eip712_structured_data) + expected_signature = safe_tx.sign(owner.key) # We need to split to change the bytes signature order to v + r + s like ledger return signature - expected_v, expected_r, expected_s = signature_split(safe_tx.sign(owner.key)) + v, r, s = signature_split(expected_signature) - signature = ( - expected_v.to_bytes(1, byteorder="big") - + expected_r.to_bytes(32, byteorder="big") - + expected_s.to_bytes(32, byteorder="big") + ledger_return_signature = ( + v.to_bytes(1, byteorder="big") + + r.to_bytes(32, byteorder="big") + + s.to_bytes(32, byteorder="big") ) - mock_init_dongle.return_value.exchange = MagicMock(return_value=signature) - v, r, s = signature_split( - ledger_manager.sign_eip712(encode_hash[1], encode_hash[2], ledger_account) + mock_init_dongle.return_value.exchange = MagicMock( + return_value=ledger_return_signature ) - self.assertEqual(expected_v, v) - self.assertEqual(expected_r, r) - self.assertEqual(expected_s, s) + safe_tx = ledger_manager.sign_eip712(safe_tx, [ledger_account]) + self.assertEqual(safe_tx.signatures, expected_signature) # Check that dongle exchange is called with the expected payload # https://github.com/LedgerHQ/app-ethereum/blob/master/doc/ethapp.adoc#sign-eth-eip-712 diff --git a/tests/test_safe_operator.py b/tests/test_safe_operator.py index 2ef99214..6862cd9a 100644 --- a/tests/test_safe_operator.py +++ b/tests/test_safe_operator.py @@ -64,9 +64,17 @@ def test_load_cli_owner(self, get_contract_mock: MagicMock): # Test unload cli owner safe_operator.default_sender = random_accounts[0] number_of_accounts = len(safe_operator.accounts) - safe_operator.unload_cli_owners(["aloha", random_accounts[0].address, "bye"]) + ledger_random_address = Account.create().address + safe_operator.ledger_manager.accounts.add( + LedgerAccount("44'/60'/0'/1", ledger_random_address) + ) + self.assertEqual(len(safe_operator.ledger_manager.accounts), 1) + safe_operator.unload_cli_owners( + ["aloha", random_accounts[0].address, "bye", ledger_random_address] + ) self.assertEqual(len(safe_operator.accounts), number_of_accounts - 1) self.assertFalse(safe_operator.default_sender) + self.assertEqual(len(safe_operator.ledger_manager.accounts), 0) @mock.patch("safe_cli.operators.hw_accounts.ledger_manager.get_account_by_path") def test_load_ledger_cli_owner(self, mock_get_account_by_path: MagicMock): From 0e7a96494525eef8a7c3479ccb236722fb15a85b Mon Sep 17 00:00:00 2001 From: moisses89 <7888669+moisses89@users.noreply.github.com> Date: Thu, 26 Oct 2023 15:08:30 +0200 Subject: [PATCH 11/19] Add submit signature with ledger --- .../operators/safe_tx_service_operator.py | 10 ++++++++++ tests/test_safe_tx_service_operator.py | 19 +++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/safe_cli/operators/safe_tx_service_operator.py b/safe_cli/operators/safe_tx_service_operator.py index 7465251b..437b88db 100644 --- a/safe_cli/operators/safe_tx_service_operator.py +++ b/safe_cli/operators/safe_tx_service_operator.py @@ -102,6 +102,16 @@ def submit_signatures(self, safe_tx_hash: bytes) -> bool: for account in self.accounts: if account.address in owners: safe_tx.sign(account.key) + # Check if there is ledger signers + if self.ledger_manager: + selected_ledger_accounts = [] + for ledger_account in self.ledger_manager.accounts: + if ledger_account.address in owners: + selected_ledger_accounts.append(ledger_account) + if len(selected_ledger_accounts) > 0: + safe_tx = self.ledger_manager.sign_eip712( + safe_tx, selected_ledger_accounts + ) if safe_tx.signers: self.safe_tx_service.post_signatures(safe_tx_hash, safe_tx.signatures) diff --git a/tests/test_safe_tx_service_operator.py b/tests/test_safe_tx_service_operator.py index d04bebc8..c9d48914 100644 --- a/tests/test_safe_tx_service_operator.py +++ b/tests/test_safe_tx_service_operator.py @@ -4,6 +4,7 @@ from eth_account import Account from hexbytes import HexBytes +from ledgereth.objects import LedgerAccount from web3 import Web3 from gnosis.eth import EthereumClient @@ -134,6 +135,24 @@ def test_submit_signatures( get_safe_transaction_mock.return_value = GetMultisigTxRequestMock(executed=True) self.assertFalse(safe_operator.submit_signatures(safe_tx_hash)) + # Test ledger signers + with mock.patch.object( + SafeTx, "signers", return_value=["signer"], new_callable=mock.PropertyMock + ) as mock_safe_tx: + safe_operator.ledger_manager.sign_eip712 = MagicMock( + return_value=mock_safe_tx + ) + get_safe_transaction_mock.return_value = GetMultisigTxRequestMock( + executed=False + ) + safe_operator.ledger_manager.accounts.add( + LedgerAccount("44'/60'/0'/0", Account.create().address) + ) + get_permitted_signers_mock.return_value = { + list(safe_operator.ledger_manager.accounts)[0].address + } + self.assertTrue(safe_operator.submit_signatures(safe_tx_hash)) + @mock.patch.object(TransactionServiceApi, "post_transaction", return_value=True) @mock.patch.object( SafeTx, "safe_version", return_value="1.4.1", new_callable=mock.PropertyMock From 386d54d59428bc5f63a3153880fc893cc1e76b1d Mon Sep 17 00:00:00 2001 From: moisses89 <7888669+moisses89@users.noreply.github.com> Date: Thu, 26 Oct 2023 15:16:39 +0200 Subject: [PATCH 12/19] Add ledger module documentation --- README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/README.md b/README.md index f456ac3d..9cdd0695 100644 --- a/README.md +++ b/README.md @@ -121,7 +121,20 @@ the information about the Safe using: ``` > refresh ``` +## Ledger module +Ledger module is an optional feature of safe-cli to sign transactions with the help of [ledgereth](https://github.com/mikeshultz/ledger-eth-lib) library based on [ledgerblue](https://github.com/LedgerHQ/blue-loader-python). +To enable, safe-cli must be installed as follows: +``` +pip install safe-cli[ledger] +``` +When running on Linux, make sure the following rules have been added to `/etc/udev/rules.d/`: +```commandline +SUBSYSTEMS=="usb", ATTRS{idVendor}=="2c97", ATTRS{idProduct}=="0000", MODE="0660", TAG+="uaccess", TAG+="udev-acl" OWNER="" +SUBSYSTEMS=="usb", ATTRS{idVendor}=="2c97", ATTRS{idProduct}=="0001", MODE="0660", TAG+="uaccess", TAG+="udev-acl" OWNER="" +SUBSYSTEMS=="usb", ATTRS{idVendor}=="2c97", ATTRS{idProduct}=="0004", MODE="0660", TAG+="uaccess", TAG+="udev-acl" OWNER="" +``` +**NOTE**: before to sign anything ensure that the sign data proposed on ledger is the same than the safe-cli data. ## Creating a new Safe Use `safe-creator --owners --threshold --salt-nonce `. From cb2e5a95364c958839af9a608f66ed8ffb7c5072 Mon Sep 17 00:00:00 2001 From: moisses89 <7888669+moisses89@users.noreply.github.com> Date: Thu, 26 Oct 2023 15:23:19 +0200 Subject: [PATCH 13/19] Fix typo --- safe_cli/operators/safe_tx_service_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/safe_cli/operators/safe_tx_service_operator.py b/safe_cli/operators/safe_tx_service_operator.py index 437b88db..5581a4d3 100644 --- a/safe_cli/operators/safe_tx_service_operator.py +++ b/safe_cli/operators/safe_tx_service_operator.py @@ -102,7 +102,7 @@ def submit_signatures(self, safe_tx_hash: bytes) -> bool: for account in self.accounts: if account.address in owners: safe_tx.sign(account.key) - # Check if there is ledger signers + # Check if there are ledger signers if self.ledger_manager: selected_ledger_accounts = [] for ledger_account in self.ledger_manager.accounts: From 7755051e99bdde02a730617799de62dbffd0d690 Mon Sep 17 00:00:00 2001 From: moisses89 <7888669+moisses89@users.noreply.github.com> Date: Thu, 26 Oct 2023 15:43:58 +0200 Subject: [PATCH 14/19] Add load_leger_cli legacy-accounts option --- safe_cli/operators/safe_operator.py | 6 ++++-- safe_cli/prompt_parser.py | 7 ++++++- safe_cli/safe_completer_constants.py | 5 ++--- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/safe_cli/operators/safe_operator.py b/safe_cli/operators/safe_operator.py index 664d04ff..35a37e3d 100644 --- a/safe_cli/operators/safe_operator.py +++ b/safe_cli/operators/safe_operator.py @@ -335,11 +335,13 @@ def load_cli_owners(self, keys: List[str]): except ValueError: print_formatted_text(HTML(f"Cannot load key={key}")) - def load_ledger_cli_owners(self): + def load_ledger_cli_owners(self, legacy_account: bool = False): if not self.ledger_manager: return None - ledger_accounts = self.ledger_manager.get_accounts() + ledger_accounts = self.ledger_manager.get_accounts( + legacy_account=legacy_account + ) if len(ledger_accounts) == 0: return None diff --git a/safe_cli/prompt_parser.py b/safe_cli/prompt_parser.py index ec8d08c4..f88c4495 100644 --- a/safe_cli/prompt_parser.py +++ b/safe_cli/prompt_parser.py @@ -159,7 +159,7 @@ def load_cli_owners(args): @safe_exception def load_ledger_cli_owners(args): - safe_operator.load_ledger_cli_owners() + safe_operator.load_ledger_cli_owners(args.legacy_accounts) @safe_exception def unload_cli_owners(args): @@ -302,6 +302,11 @@ def remove_delegate(args): parser_load_cli_owners.set_defaults(func=load_cli_owners) parser_load_ledger_cli_owners = subparsers.add_parser("load_ledger_cli_owners") + parser_load_ledger_cli_owners.add_argument( + "--legacy-accounts", + action="store_true", + help="Enable search legacy accounts", + ) parser_load_ledger_cli_owners.set_defaults(func=load_ledger_cli_owners) parser_unload_cli_owners = subparsers.add_parser("unload_cli_owners") diff --git a/safe_cli/safe_completer_constants.py b/safe_cli/safe_completer_constants.py index 18f7d44c..fc511e80 100644 --- a/safe_cli/safe_completer_constants.py +++ b/safe_cli/safe_completer_constants.py @@ -24,7 +24,7 @@ "history": "(read-only)", "info": "(read-only)", "load_cli_owners": " [...]", - "load_ledger_cli_owners": "", + "load_ledger_cli_owners": "[--legacy-accounts]", "load_cli_owners_from_words": " ... ", "refresh": "", "remove_delegate": "
", @@ -157,8 +157,7 @@ "<account-private-key>." ), "load_ledger_cli_owners": HTML( - "Command load_cli_owners will try to load a new owner via " - "<account-private-key>." + "Command load_ledger_cli_owners show a list of ledger addresses to choose between them " ), "load_cli_owners_from_words": HTML( "Command load_cli_owners_from_words will try to load owners via" From ecb0706a84804de814a11c66e4decec91e27596e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mois=C3=A9s=20Fern=C3=A1ndez?= <7888669+moisses89@users.noreply.github.com> Date: Mon, 30 Oct 2023 09:39:30 +0100 Subject: [PATCH 15/19] Update README.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Uxío --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 9cdd0695..26ba7f6d 100644 --- a/README.md +++ b/README.md @@ -134,7 +134,7 @@ SUBSYSTEMS=="usb", ATTRS{idVendor}=="2c97", ATTRS{idProduct}=="0000", MODE="0660 SUBSYSTEMS=="usb", ATTRS{idVendor}=="2c97", ATTRS{idProduct}=="0001", MODE="0660", TAG+="uaccess", TAG+="udev-acl" OWNER="" SUBSYSTEMS=="usb", ATTRS{idVendor}=="2c97", ATTRS{idProduct}=="0004", MODE="0660", TAG+="uaccess", TAG+="udev-acl" OWNER="" ``` -**NOTE**: before to sign anything ensure that the sign data proposed on ledger is the same than the safe-cli data. +**NOTE**: before signing anything ensure that the data showing on your ledger is the same as the safe-cli data. ## Creating a new Safe Use `safe-creator --owners --threshold --salt-nonce `. From 9ff417f8dc88452cd26d80e807ecde6a30c250a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mois=C3=A9s=20Fern=C3=A1ndez?= <7888669+moisses89@users.noreply.github.com> Date: Mon, 30 Oct 2023 09:39:42 +0100 Subject: [PATCH 16/19] Update requirements.txt MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Uxío --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index dc4865ea..0a816c78 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,6 @@ packaging>=23.1 prompt_toolkit==3.0.39 pygments==2.16.1 requests==2.31.0 -safe-eth-py==6.0.0b3 +safe-eth-py==6.0.0b5 tabulate==0.9.0 web3==6.10.0 From 56b79fe1de2c0d329f77ec70c3ff6295729655f9 Mon Sep 17 00:00:00 2001 From: moisses89 <7888669+moisses89@users.noreply.github.com> Date: Mon, 30 Oct 2023 09:50:16 +0100 Subject: [PATCH 17/19] Refactor exceptions --- safe_cli/operators/__init__.py | 3 +- safe_cli/operators/exceptions.py | 86 ++++++++++++++ .../{hw_exceptions.py => exceptions.py} | 14 +-- .../operators/hw_accounts/ledger_manager.py | 10 +- safe_cli/operators/safe_operator.py | 108 ++++-------------- .../operators/safe_tx_service_operator.py | 9 +- safe_cli/prompt_parser.py | 10 +- tests/test_ledger_manager.py | 26 ++--- tests/test_safe_operator.py | 4 +- 9 files changed, 143 insertions(+), 127 deletions(-) create mode 100644 safe_cli/operators/exceptions.py rename safe_cli/operators/hw_accounts/{hw_exceptions.py => exceptions.py} (56%) diff --git a/safe_cli/operators/__init__.py b/safe_cli/operators/__init__.py index 64d6296f..da9edc1b 100644 --- a/safe_cli/operators/__init__.py +++ b/safe_cli/operators/__init__.py @@ -1,4 +1,5 @@ # flake8: noqa F401 from .enums import SafeOperatorMode -from .safe_operator import SafeOperator, SafeServiceNotAvailable +from .exceptions import SafeServiceNotAvailable +from .safe_operator import SafeOperator from .safe_tx_service_operator import SafeTxServiceOperator diff --git a/safe_cli/operators/exceptions.py b/safe_cli/operators/exceptions.py new file mode 100644 index 00000000..083bb318 --- /dev/null +++ b/safe_cli/operators/exceptions.py @@ -0,0 +1,86 @@ +class SafeOperatorException(Exception): + pass + + +class ExistingOwnerException(SafeOperatorException): + pass + + +class NonExistingOwnerException(SafeOperatorException): + pass + + +class HashAlreadyApproved(SafeOperatorException): + pass + + +class ThresholdLimitException(SafeOperatorException): + pass + + +class SameFallbackHandlerException(SafeOperatorException): + pass + + +class InvalidFallbackHandlerException(SafeOperatorException): + pass + + +class FallbackHandlerNotSupportedException(SafeOperatorException): + pass + + +class SameGuardException(SafeOperatorException): + pass + + +class InvalidGuardException(SafeOperatorException): + pass + + +class GuardNotSupportedException(SafeOperatorException): + pass + + +class SameMasterCopyException(SafeOperatorException): + pass + + +class SafeAlreadyUpdatedException(SafeOperatorException): + pass + + +class UpdateAddressesNotValid(SafeOperatorException): + pass + + +class SenderRequiredException(SafeOperatorException): + pass + + +class AccountNotLoadedException(SafeOperatorException): + pass + + +class NotEnoughSignatures(SafeOperatorException): + pass + + +class InvalidMasterCopyException(SafeOperatorException): + pass + + +class NotEnoughEtherToSend(SafeOperatorException): + pass + + +class NotEnoughTokenToSend(SafeOperatorException): + pass + + +class SafeServiceNotAvailable(SafeOperatorException): + pass + + +class HardwareWalletException(SafeOperatorException): + pass diff --git a/safe_cli/operators/hw_accounts/hw_exceptions.py b/safe_cli/operators/hw_accounts/exceptions.py similarity index 56% rename from safe_cli/operators/hw_accounts/hw_exceptions.py rename to safe_cli/operators/hw_accounts/exceptions.py index 1f3832db..f74947f0 100644 --- a/safe_cli/operators/hw_accounts/hw_exceptions.py +++ b/safe_cli/operators/hw_accounts/exceptions.py @@ -7,25 +7,25 @@ LedgerNotFound, ) -from safe_cli.operators.safe_operator import HwDeviceException +from safe_cli.operators.exceptions import HardwareWalletException -def hw_account_exception(function): +def raise_as_hw_account_exception(function): @functools.wraps(function) def wrapper(*args, **kwargs): try: return function(*args, **kwargs) except LedgerNotFound as e: - raise HwDeviceException(e.message) + raise HardwareWalletException(e.message) except LedgerLocked as e: - raise HwDeviceException(e.message) + raise HardwareWalletException(e.message) except LedgerAppNotOpened as e: - raise HwDeviceException(e.message) + raise HardwareWalletException(e.message) except LedgerCancel as e: - raise HwDeviceException(e.message) + raise HardwareWalletException(e.message) except BaseException as e: if "Error while writing" in e.args: - raise HwDeviceException("Ledger error writting, restart safe-cli") + raise HardwareWalletException("Ledger error writting, restart safe-cli") raise e return wrapper diff --git a/safe_cli/operators/hw_accounts/ledger_manager.py b/safe_cli/operators/hw_accounts/ledger_manager.py index 4087f8eb..a2c2f822 100644 --- a/safe_cli/operators/hw_accounts/ledger_manager.py +++ b/safe_cli/operators/hw_accounts/ledger_manager.py @@ -11,7 +11,7 @@ from gnosis.safe import SafeTx from gnosis.safe.signatures import signature_to_bytes -from safe_cli.operators.hw_accounts.hw_exceptions import hw_account_exception +from safe_cli.operators.hw_accounts.exceptions import raise_as_hw_account_exception class LedgerManager: @@ -32,14 +32,14 @@ def connect(self) -> bool: return False @property - @hw_account_exception + @raise_as_hw_account_exception def connected(self) -> bool: """ :return: True if ledger is connected or False in other case """ return self.connect() - @hw_account_exception + @raise_as_hw_account_exception def get_accounts( self, legacy_account: Optional[bool] = False, number_accounts: Optional[int] = 5 ) -> List[Tuple[ChecksumAddress, str]]: @@ -61,7 +61,7 @@ def get_accounts( accounts.append((account.address, account.path)) return accounts - @hw_account_exception + @raise_as_hw_account_exception def add_account(self, derivation_path: str): """ Add an account to ledger manager set @@ -87,7 +87,7 @@ def delete_accounts(self, addresses: List[ChecksumAddress]) -> Set: self.accounts = self.accounts.difference(accounts_to_remove) return accounts_to_remove - @hw_account_exception + @raise_as_hw_account_exception def sign_eip712(self, safe_tx: SafeTx, accounts: List[LedgerAccount]) -> SafeTx: """ Call ledger ethereum app method to sign eip712 hashes with a ledger account diff --git a/safe_cli/operators/safe_operator.py b/safe_cli/operators/safe_operator.py index 35a37e3d..d65f2c4e 100644 --- a/safe_cli/operators/safe_operator.py +++ b/safe_cli/operators/safe_operator.py @@ -34,6 +34,26 @@ from gnosis.safe.multi_send import MultiSend, MultiSendOperation, MultiSendTx from safe_cli.ethereum_hd_wallet import get_account_from_words +from safe_cli.operators.exceptions import ( + AccountNotLoadedException, + ExistingOwnerException, + FallbackHandlerNotSupportedException, + GuardNotSupportedException, + HashAlreadyApproved, + InvalidFallbackHandlerException, + InvalidGuardException, + InvalidMasterCopyException, + NonExistingOwnerException, + NotEnoughEtherToSend, + NotEnoughSignatures, + SafeAlreadyUpdatedException, + SameFallbackHandlerException, + SameGuardException, + SameMasterCopyException, + SenderRequiredException, + ThresholdLimitException, + UpdateAddressesNotValid, +) from safe_cli.safe_addresses import ( get_default_fallback_handler_address, get_safe_contract_address, @@ -63,94 +83,6 @@ def __str__(self): ) -class SafeOperatorException(Exception): - pass - - -class ExistingOwnerException(SafeOperatorException): - pass - - -class NonExistingOwnerException(SafeOperatorException): - pass - - -class HashAlreadyApproved(SafeOperatorException): - pass - - -class ThresholdLimitException(SafeOperatorException): - pass - - -class SameFallbackHandlerException(SafeOperatorException): - pass - - -class InvalidFallbackHandlerException(SafeOperatorException): - pass - - -class FallbackHandlerNotSupportedException(SafeOperatorException): - pass - - -class SameGuardException(SafeOperatorException): - pass - - -class InvalidGuardException(SafeOperatorException): - pass - - -class GuardNotSupportedException(SafeOperatorException): - pass - - -class SameMasterCopyException(SafeOperatorException): - pass - - -class SafeAlreadyUpdatedException(SafeOperatorException): - pass - - -class UpdateAddressesNotValid(SafeOperatorException): - pass - - -class SenderRequiredException(SafeOperatorException): - pass - - -class AccountNotLoadedException(SafeOperatorException): - pass - - -class NotEnoughSignatures(SafeOperatorException): - pass - - -class InvalidMasterCopyException(SafeOperatorException): - pass - - -class NotEnoughEtherToSend(SafeOperatorException): - pass - - -class NotEnoughTokenToSend(SafeOperatorException): - pass - - -class SafeServiceNotAvailable(SafeOperatorException): - pass - - -class HwDeviceException(SafeOperatorException): - pass - - def require_tx_service(f): @wraps(f) def decorated(self, *args, **kwargs): diff --git a/safe_cli/operators/safe_tx_service_operator.py b/safe_cli/operators/safe_tx_service_operator.py index 5581a4d3..f493f58d 100644 --- a/safe_cli/operators/safe_tx_service_operator.py +++ b/safe_cli/operators/safe_tx_service_operator.py @@ -13,12 +13,9 @@ from safe_cli.utils import yes_or_no_question -from .safe_operator import ( - AccountNotLoadedException, - NonExistingOwnerException, - SafeOperator, - SafeServiceNotAvailable, -) +from . import SafeServiceNotAvailable +from .exceptions import AccountNotLoadedException, NonExistingOwnerException +from .safe_operator import SafeOperator class SafeTxServiceOperator(SafeOperator): diff --git a/safe_cli/prompt_parser.py b/safe_cli/prompt_parser.py index f88c4495..a16190a6 100644 --- a/safe_cli/prompt_parser.py +++ b/safe_cli/prompt_parser.py @@ -10,25 +10,25 @@ check_hex_str, check_keccak256_hash, ) -from .operators.safe_operator import ( +from .operators import SafeServiceNotAvailable +from .operators.exceptions import ( AccountNotLoadedException, ExistingOwnerException, FallbackHandlerNotSupportedException, + HardwareWalletException, HashAlreadyApproved, - HwDeviceException, InvalidMasterCopyException, NonExistingOwnerException, NotEnoughEtherToSend, NotEnoughSignatures, NotEnoughTokenToSend, SafeAlreadyUpdatedException, - SafeOperator, - SafeServiceNotAvailable, SameFallbackHandlerException, SameMasterCopyException, SenderRequiredException, ThresholdLimitException, ) +from .operators.safe_operator import SafeOperator def safe_exception(function): @@ -117,7 +117,7 @@ def wrapper(*args, **kwargs): f"Service not available for network {e.args[0]}" ) ) - except HwDeviceException as e: + except HardwareWalletException as e: print_formatted_text( HTML(f"HwDevice exception: {e.args[0]}") ) diff --git a/tests/test_ledger_manager.py b/tests/test_ledger_manager.py index 556bf8bf..8b9a103d 100644 --- a/tests/test_ledger_manager.py +++ b/tests/test_ledger_manager.py @@ -17,8 +17,8 @@ from gnosis.safe.signatures import signature_split from gnosis.safe.tests.safe_test_case import SafeTestCaseMixin +from safe_cli.operators.exceptions import HardwareWalletException from safe_cli.operators.hw_accounts.ledger_manager import LedgerManager -from safe_cli.operators.safe_operator import HwDeviceException class TestLedgerManager(SafeTestCaseMixin, unittest.TestCase): @@ -79,38 +79,38 @@ def test_hw_device_exception(self, mock_ledger_fn: MagicMock, mock_sign: MagicMo mock_ledger_fn.side_effect = LedgerNotFound mock_sign.side_effect = LedgerNotFound - with self.assertRaises(HwDeviceException): + with self.assertRaises(HardwareWalletException): ledger_manager.get_accounts() - with self.assertRaises(HwDeviceException): + with self.assertRaises(HardwareWalletException): ledger_manager.add_account(derivation_path) - with self.assertRaises(HwDeviceException): + with self.assertRaises(HardwareWalletException): ledger_manager.sign_eip712(safe_tx, [ledger_account]) mock_ledger_fn.side_effect = LedgerLocked mock_sign.side_effect = LedgerLocked - with self.assertRaises(HwDeviceException): + with self.assertRaises(HardwareWalletException): ledger_manager.get_accounts() - with self.assertRaises(HwDeviceException): + with self.assertRaises(HardwareWalletException): ledger_manager.add_account(derivation_path) - with self.assertRaises(HwDeviceException): + with self.assertRaises(HardwareWalletException): ledger_manager.sign_eip712(safe_tx, [ledger_account]) mock_ledger_fn.side_effect = LedgerAppNotOpened mock_sign.side_effect = LedgerAppNotOpened - with self.assertRaises(HwDeviceException): + with self.assertRaises(HardwareWalletException): ledger_manager.get_accounts() - with self.assertRaises(HwDeviceException): + with self.assertRaises(HardwareWalletException): ledger_manager.add_account(derivation_path) - with self.assertRaises(HwDeviceException): + with self.assertRaises(HardwareWalletException): ledger_manager.sign_eip712(safe_tx, [ledger_account]) mock_ledger_fn.side_effect = LedgerCancel mock_sign.side_effect = LedgerCancel - with self.assertRaises(HwDeviceException): + with self.assertRaises(HardwareWalletException): ledger_manager.get_accounts() - with self.assertRaises(HwDeviceException): + with self.assertRaises(HardwareWalletException): ledger_manager.add_account(derivation_path) - with self.assertRaises(HwDeviceException): + with self.assertRaises(HardwareWalletException): ledger_manager.sign_eip712(safe_tx, [ledger_account]) @mock.patch( diff --git a/tests/test_safe_operator.py b/tests/test_safe_operator.py index 6862cd9a..b0b795e5 100644 --- a/tests/test_safe_operator.py +++ b/tests/test_safe_operator.py @@ -10,7 +10,7 @@ from gnosis.safe import Safe from gnosis.safe.multi_send import MultiSend -from safe_cli.operators.safe_operator import ( +from safe_cli.operators.exceptions import ( AccountNotLoadedException, ExistingOwnerException, FallbackHandlerNotSupportedException, @@ -22,12 +22,12 @@ NonExistingOwnerException, NotEnoughEtherToSend, NotEnoughSignatures, - SafeOperator, SameFallbackHandlerException, SameGuardException, SameMasterCopyException, SenderRequiredException, ) +from safe_cli.operators.safe_operator import SafeOperator from safe_cli.utils import get_erc_20_list from tests.utils import generate_transfers_erc20 From 01e9b223b2c56e5477a3fe125beabc593b853201 Mon Sep 17 00:00:00 2001 From: moisses89 <7888669+moisses89@users.noreply.github.com> Date: Mon, 30 Oct 2023 09:58:43 +0100 Subject: [PATCH 18/19] Add load_ledger_manager function --- safe_cli/operators/safe_operator.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/safe_cli/operators/safe_operator.py b/safe_cli/operators/safe_operator.py index d65f2c4e..86ba6248 100644 --- a/safe_cli/operators/safe_operator.py +++ b/safe_cli/operators/safe_operator.py @@ -117,6 +117,19 @@ def decorated(self, *args, **kwargs): return decorated +def load_ledger_manager(): + """ + Load ledgerManager if dependencies are installed + :return: LedgerManager or None + """ + try: + from safe_cli.operators.hw_accounts.ledger_manager import LedgerManager + + return LedgerManager() + except (ModuleNotFoundError, IOError): + return None + + class SafeOperator: address: ChecksumAddress node_url: str @@ -164,12 +177,7 @@ def __init__(self, address: ChecksumAddress, node_url: str): self.require_all_signatures = ( True # Require all signatures to be present to send a tx ) - try: - from safe_cli.operators.hw_accounts.ledger_manager import LedgerManager - - self.ledger_manager = LedgerManager() - except (ModuleNotFoundError, IOError): - self.ledger_manager = None + self.ledger_manager = load_ledger_manager() @cached_property def last_default_fallback_handler_address(self) -> ChecksumAddress: From 91a7a9d76b1b7af54ee3b64109174db8d81086ff Mon Sep 17 00:00:00 2001 From: moisses89 <7888669+moisses89@users.noreply.github.com> Date: Mon, 30 Oct 2023 10:13:44 +0100 Subject: [PATCH 19/19] Add test_choose_option_question --- .../operators/hw_accounts/ledger_manager.py | 2 +- safe_cli/operators/safe_operator.py | 18 ++++++++------- safe_cli/utils.py | 23 ++++++++++++++----- tests/test_utils.py | 19 ++++++++++++++- 4 files changed, 46 insertions(+), 16 deletions(-) diff --git a/safe_cli/operators/hw_accounts/ledger_manager.py b/safe_cli/operators/hw_accounts/ledger_manager.py index a2c2f822..7eb4e446 100644 --- a/safe_cli/operators/hw_accounts/ledger_manager.py +++ b/safe_cli/operators/hw_accounts/ledger_manager.py @@ -103,7 +103,7 @@ def sign_eip712(self, safe_tx: SafeTx, accounts: List[LedgerAccount]) -> SafeTx: for account in accounts: print_formatted_text( HTML( - "Ensure to compare in your ledger before to sign that domain_hash and message_hash are both correct" + "Make sure in your ledger before signing that domain_hash and message_hash are both correct" ) ) print_formatted_text(HTML(f"Domain_hash: {domain_hash.hex()}")) diff --git a/safe_cli/operators/safe_operator.py b/safe_cli/operators/safe_operator.py index 86ba6248..0153a225 100644 --- a/safe_cli/operators/safe_operator.py +++ b/safe_cli/operators/safe_operator.py @@ -292,16 +292,18 @@ def load_ledger_cli_owners(self, legacy_account: bool = False): option = choose_option_question( "Select the owner address", len(ledger_accounts) - 1 ) + if option is None: + return None address, derivation_path = ledger_accounts[option] - if self.ledger_manager.add_account(derivation_path): - balance = self.ethereum_client.get_balance(address) - print_formatted_text( - HTML( - f"Loaded account {address} " - f'with balance={Web3.from_wei(balance, "ether")} ether' - f"Ledger account cannot be defined as sender" - ) + self.ledger_manager.add_account(derivation_path) + balance = self.ethereum_client.get_balance(address) + print_formatted_text( + HTML( + f"Loaded account {address} " + f'with balance={Web3.from_wei(balance, "ether")} ether' + f"Ledger account cannot be defined as sender" ) + ) def unload_cli_owners(self, owners: List[str]): accounts_to_remove: Set[Account] = set() diff --git a/safe_cli/utils.py b/safe_cli/utils.py index 41c6bba0..132861fa 100644 --- a/safe_cli/utils.py +++ b/safe_cli/utils.py @@ -1,5 +1,7 @@ -import argparse import os +from typing import Optional + +from prompt_toolkit import HTML, print_formatted_text from gnosis.eth import EthereumClient @@ -46,12 +48,21 @@ def yes_or_no_question(question: str, default_no: bool = True) -> bool: def choose_option_question( question: str, number_options: int, default_option: int = 0 -) -> bool: +) -> Optional[int]: if "PYTEST_CURRENT_TEST" in os.environ: - return 0 # Ignore confirmations when running tests + return default_option # Ignore confirmations when running tests choices = f" [0-{number_options}] default {default_option}:" - reply = str(input(question + choices)).lower().strip() or str(default_option) - option = int(reply) + reply = str(get_input(question + choices)).lower().strip() or str(default_option) + try: + option = int(reply) + except ValueError: + print_formatted_text(HTML(" Option must be an integer ")) + return None + if option not in range(0, number_options): - argparse.ArgumentTypeError(f"{option} is not between [0-{number_options}}}") + print_formatted_text( + HTML(f" {option} is not between [0-{number_options}}} ") + ) + return None + return option diff --git a/tests/test_utils.py b/tests/test_utils.py index 729c1ee9..06e22e04 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -3,7 +3,7 @@ from unittest import mock from unittest.mock import MagicMock -from safe_cli.utils import yes_or_no_question +from safe_cli.utils import choose_option_question, yes_or_no_question class TestUtils(unittest.TestCase): @@ -34,6 +34,23 @@ def test_yes_or_no_question(self, input_mock: MagicMock): os.environ["PYTEST_CURRENT_TEST"] = pytest_current_test + @mock.patch("safe_cli.utils.get_input") + def test_choose_option_question(self, input_mock: MagicMock): + pytest_current_test = os.environ.pop("PYTEST_CURRENT_TEST") + + input_mock.return_value = "" + self.assertEqual(choose_option_question("", 1), 0) + input_mock.return_value = "" + self.assertEqual(choose_option_question("", 5, 4), 4) + input_mock.return_value = "m" + self.assertIsNone(choose_option_question("", 1)) + input_mock.return_value = "10" + self.assertIsNone(choose_option_question("", 1)) + input_mock.return_value = "1" + self.assertEqual(choose_option_question("", 2), 1) + + os.environ["PYTEST_CURRENT_TEST"] = pytest_current_test + if __name__ == "__main__": unittest.main()