Skip to content

Commit

Permalink
Add ledger manager
Browse files Browse the repository at this point in the history
  • Loading branch information
moisses89 committed Oct 23, 2023
1 parent 869b410 commit 6d7963a
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 64 deletions.
75 changes: 75 additions & 0 deletions safe_cli/operators/hw_accounts/ledger_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from typing import List, Optional, Set, Tuple

from eth_typing import ChecksumAddress
from ledgereth.accounts import get_account_by_path
from ledgereth.comms import init_dongle
from ledgereth.exceptions import LedgerLocked, LedgerNotFound

from gnosis.eth.eip712 import eip712_encode
from gnosis.safe import SafeTx
from gnosis.safe.signatures import signature_to_bytes

from safe_cli.operators.hw_accounts.ledger_account import LedgerAccount


class LedgerManager:

LEDGER_SEARCH_DEEP = 10

def __init__(self):
self.accounts: Set[LedgerAccount] = set()
try:
self.dongle = init_dongle()
except LedgerNotFound:

raise IOError

def get_accounts(
self, legacy_account: Optional[bool] = False
) -> List[Tuple[ChecksumAddress, str]]:
"""
:param legacy_account:
:return: a list of tuples with address and derivation path
"""
accounts = []
for i in range(self.LEDGER_SEARCH_DEEP):
if legacy_account:
path_string = f"44'/60'/0'/{i}"
else:
path_string = f"44'/60'/{i}'/0/0"
try:
account = get_account_by_path(path_string, self.dongle)
except LedgerLocked as ledger_error:
print(f"Ledger exception: {ledger_error}")
accounts.append((account.address, account.path))
return accounts

def add_account(self, derivation_path: str):
try:
account = get_account_by_path(derivation_path, self.dongle)
except LedgerLocked as ledger_error:
print(f"Ledger exception: {ledger_error}")
self.accounts.add(LedgerAccount(account.path, account.address, self.dongle))

@staticmethod
def safe_tx_hash_sign(safe_tx: SafeTx, account: LedgerAccount) -> bytes:
"""
{bytes32 r}{bytes32 s}{uint8 v}
:param private_key:
:return: Signature
"""
encode_hash = eip712_encode(safe_tx.eip712_structured_data)
v, r, s = account.signHash(encode_hash[1], encode_hash[2])
signature = signature_to_bytes(v, r, s)
# Insert signature sorted
if account.address not in safe_tx.signers:
new_owners = safe_tx.signers + [account.address]
new_owner_pos = sorted(new_owners, key=lambda x: int(x, 16)).index(
account.address
)
safe_tx.signatures = (
safe_tx.signatures[: 65 * new_owner_pos]
+ signature
+ safe_tx.signatures[65 * new_owner_pos :]
)
return safe_tx
99 changes: 35 additions & 64 deletions safe_cli/operators/safe_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
from eth_typing import ChecksumAddress
from eth_utils import ValidationError
from hexbytes import HexBytes
from ledgereth import get_account_by_path
from ledgereth.comms import init_dongle
from ledgereth.exceptions import LedgerAppNotOpened, LedgerLocked, LedgerNotFound
from packaging import version as semantic_version
from prompt_toolkit import HTML, print_formatted_text
from web3 import Web3
Expand All @@ -26,20 +23,17 @@
get_safe_contract,
get_safe_V1_1_1_contract,
)
from gnosis.eth.eip712 import eip712_encode
from gnosis.safe import InvalidInternalTx, Safe, SafeOperation, SafeTx
from gnosis.safe.multi_send import MultiSend, MultiSendOperation, MultiSendTx
from gnosis.safe.signatures import signature_to_bytes

from safe_cli.api.transaction_service_api import TransactionServiceApi
from safe_cli.ethereum_hd_wallet import get_account_from_words
from safe_cli.operators.hw_accounts.ledger_account import LedgerAccount
from safe_cli.safe_addresses import (
get_default_fallback_handler_address,
get_safe_contract_address,
get_safe_l2_contract_address,
)
from safe_cli.utils import get_erc_20_list, yes_or_no_question
from safe_cli.utils import get_erc_20_list, number_options_question, yes_or_no_question


@dataclasses.dataclass
Expand Down Expand Up @@ -200,13 +194,19 @@ def __init__(self, address: str, node_url: str):
self.safe_contract_1_1_0 = get_safe_V1_1_1_contract(
self.ethereum_client.w3, address=self.address
)
self.accounts: Set[LocalAccount | LedgerAccount] = set()
self.default_sender: Optional[LocalAccount] | LedgerAccount = None
self.accounts: Set[LocalAccount] = set()
self.default_sender: Optional[LocalAccount] = None
self.executed_transactions: List[str] = []
self._safe_cli_info: Optional[SafeCliInfo] = None # Cache for SafeCliInfo
self.require_all_signatures = (
True # Require all signatures to be present to send a tx
)
try:
from safe_cli.operators.hw_accounts.ledger_manager import LedgerManager

self.ledger_manager = LedgerManager()
except (ModuleNotFoundError, IOError):
self.ledger_manager = None

@cached_property
def last_default_fallback_handler_address(self) -> ChecksumAddress:
Expand Down Expand Up @@ -305,40 +305,26 @@ def load_cli_owners(self, keys: List[str]):
print_formatted_text(HTML(f"<ansired>Cannot load key={key}</ansired>"))

def load_ledger_cli_owners(self):
try:
dongle = init_dongle()
# Search between 10 first accounts
for index in range(10):
path = f"44'/60'/{index}'/0/0"
account = get_account_by_path(path, dongle=dongle)
if account.address in self.safe_cli_info.owners:
sender = LedgerAccount(account.path, account.address, dongle)
self.accounts.add(sender)
balance = self.ethereum_client.get_balance(account.address)
print_formatted_text(
HTML(
f"Loaded account <b>{account.address}</b> "
f'with balance={Web3.from_wei(balance, "ether")} ether'
f"Ledger account cannot be defined as sender"
)
)
# TODO add ledger as sender
break
except LedgerNotFound:
print_formatted_text(
HTML("<ansired>Unable to find Ledger device</ansired>")
)
return
except LedgerAppNotOpened:
print_formatted_text(
HTML("<ansired>Ensure open ethereum app on your ledger</ansired>")
)
return
except LedgerLocked:
print_formatted_text(
HTML("<ansired>Ensure open ethereum app on your ledger</ansired>")
if not self.ledger_manager:
return None

ledger_accounts = self.ledger_manager.get_accounts()
for option, ledger_account in enumerate(ledger_accounts):
address, _ = ledger_account
print_formatted_text(HTML(f"{option} - <b>{address}</b> "))
option = number_options_question(
"Select the owner address", len(ledger_accounts)
)
address, derivation_path = ledger_accounts[option]
self.ledger_manager.add_account(derivation_path)
balance = self.ethereum_client.get_balance(address)
print_formatted_text(
HTML(
f"Loaded account <b>{address}</b> "
f'with balance={Web3.from_wei(balance, "ether")} ether'
f"Ledger account cannot be defined as sender"
)
return
)

def unload_cli_owners(self, owners: List[str]):
accounts_to_remove: Set[Account] = set()
Expand Down Expand Up @@ -699,6 +685,13 @@ def print_info(self):
"the Safe to a newest version</ansired>"
)
)
if not self.ledger_manager:
print_formatted_text(
HTML(
"<ansired>leger disabled, need to install ledgereth dependency link "
"</ansired>"
)
)

def get_safe_cli_info(self) -> SafeCliInfo:
safe = self.safe
Expand Down Expand Up @@ -872,28 +865,6 @@ def batch_safe_txs(
else:
return safe_tx

def ledger_sign(self, safe_tx: SafeTx, account: LedgerAccount) -> bytes:
"""
{bytes32 r}{bytes32 s}{uint8 v}
:param private_key:
:return: Signature
"""
encode_hash = eip712_encode(safe_tx.eip712_structured_data)
v, r, s = account.signHash(encode_hash[1], encode_hash[2])
signature = signature_to_bytes(v, r, s)
# Insert signature sorted
if account.address not in safe_tx.signers:
new_owners = safe_tx.signers + [account.address]
new_owner_pos = sorted(new_owners, key=lambda x: int(x, 16)).index(
account.address
)
safe_tx.signatures = (
safe_tx.signatures[: 65 * new_owner_pos]
+ signature
+ safe_tx.signatures[65 * new_owner_pos :]
)
return safe_tx

# TODO Set sender so we can save gas in that signature
def sign_transaction(self, safe_tx: SafeTx) -> SafeTx:
permitted_signers = self.get_permitted_signers()
Expand Down
14 changes: 14 additions & 0 deletions safe_cli/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import argparse
import os

from gnosis.eth import EthereumClient
Expand Down Expand Up @@ -36,3 +37,16 @@ def yes_or_no_question(question: str, default_no: bool = True) -> bool:
return False
else:
return False if default_no else True


def number_options_question(
question: str, number_options: int, default_option: int = 0
) -> bool:
if "PYTEST_CURRENT_TEST" in os.environ:
return True # Ignore confirmations when running tests
choices = f" [0-{number_options}] default {default_option}:"
reply = str(input(question + choices)).lower().strip() or str(default_option)
option = int(reply)
if option not in range(0, number_options):
argparse.ArgumentTypeError(f"{option} is not between [0-{number_options}}}")
return option

0 comments on commit 6d7963a

Please sign in to comment.