Skip to content

Commit

Permalink
feat: sign_message fully operational
Browse files Browse the repository at this point in the history
  • Loading branch information
johnson2427 committed Apr 22, 2024
1 parent 212fd6b commit 60dd0b4
Showing 1 changed file with 23 additions and 86 deletions.
109 changes: 23 additions & 86 deletions ape_aws_kms/accounts.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import asn1tools
import boto3
import ecdsa
import json

from datetime import datetime
from typing import Any, Iterator, List, Optional

from eth_account import Account
from eth_account.messages import (
_hash_eip191_message,
encode_defunct,
encode_intended_validator,
encode_typed_data, # EIP-712 message
)
from eth_pydantic_types import HexBytes
from eth_utils import keccak, to_checksum_address
Expand All @@ -21,69 +16,10 @@
from ape.types import AddressType, MessageSignature
from ape.utils import cached_property

from web3.auto import w3


SECP256_K1_N = int("fffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141", 16)


def find_eth_signature(signature: bytes) -> dict:
signature_schema = asn1tools.compile_string(
'''
Signature DEFINITIONS ::= BEGIN
Ecdsa-Sig-Value ::= SEQUENCE {
r INTEGER,
s INTEGER }
END
'''
)

# https://tools.ietf.org/html/rfc3279#section-2.2.3
signature_decoded = signature_schema.decode(
'Ecdsa-Sig-Value', signature,
)
s = signature_decoded['s']
r = signature_decoded['r']
secp256_k1_n_half = SECP256_K1_N / 2
if s > secp256_k1_n_half:
s = SECP256_K1_N - s

return {'r': r, 's': s}


def find_eth_address(public_key):
key = asn1tools.compile_string(
'''
Key DEFINITIONS ::= BEGIN
SubjectPublicKeyInfo ::= SEQUENCE {
algorithm AlgorithmIdentifier,
subjectPublicKey BIT STRING
}
AlgorithmIdentifier ::= SEQUENCE {
algorithm OBJECT IDENTIFIER,
parameters ANY DEFINED BY algorithm OPTIONAL
}
END
'''
)
key_decoded = key.decode('SubjectPublicKeyInfo', public_key)
pub_key_raw = key_decoded['subjectPublicKey'][0]
pub_key = pub_key_raw[1:len(pub_key_raw)]

# https://www.oreilly.com/library/view/mastering-ethereum/9781491971932/ch04.html
hex_address = w3.keccak(bytes(pub_key)).hex()
eth_address = '0x{}'.format(hex_address[-40:])
eth_checksum_addr = w3.to_checksum_address(eth_address)
breakpoint()
return eth_checksum_addr



class AliasResponse(BaseModel):
alias: str = Field(alias="AliasName")
arn: str = Field(alias="AliasArn")
Expand Down Expand Up @@ -142,7 +78,9 @@ def public_key(self):

@cached_property
def address(self) -> AddressType:
return to_checksum_address(keccak(self.public_key[-64:])[-20:].hex().lower())
return to_checksum_address(
keccak(self.public_key[-64:])[-20:].hex().lower()
)

def sign_raw_msghash(self, msghash: HexBytes) -> Optional[MessageSignature]:
"""
Expand All @@ -157,30 +95,29 @@ def sign_raw_msghash(self, msghash: HexBytes) -> Optional[MessageSignature]:
MessageType='DIGEST',
SigningAlgorithm='ECDSA_SHA_256',
)
return response
return response['Signature']

def sign_message(self, msg: Any, **signer_options) -> Optional[MessageSignature]:
signable_message = encode_defunct(text=msg)
msg_hash = _hash_eip191_message(signable_message)
response = self.sign_raw_msghash(msg_hash)
signature = response['Signature']
r_val, s_val = ecdsa.util.sigdecode_der(signature, ecdsa.SECP256k1.order)
r_bytes = r_val.to_bytes(32, byteorder='big')
s_bytes = s_val.to_bytes(32, byteorder='big')
v = signature[0]
if v not in [27, 28]:
v += 27
v_byte = bytes([v])
message_signature = MessageSignature.from_vrs(v_byte + r_bytes + s_bytes)
msg_sig = find_eth_signature(signature)
breakpoint()
try:
if self.check_signature(signable_message, message_signature):
return message_signature
except:
pass

raise ValueError("Signature failed to verify")
signature = self.sign_raw_msghash(_hash_eip191_message(signable_message))
r, s = ecdsa.util.sigdecode_der(signature, ecdsa.SECP256k1.order)
if s > SECP256_K1_N / 2:
s = SECP256_K1_N - s
r = r.to_bytes(32, byteorder='big')
s = s.to_bytes(32, byteorder='big')
v = signature[0] + 27
if self.check_signature(
signable_message,
message_signature := MessageSignature.from_vrs(bytes([v]) + r + s),
):
return message_signature
elif self.check_signature(
signable_message,
message_signature := MessageSignature.from_vrs(bytes([v + 1]) + r + s),
):
return message_signature
else:
raise ValueError("Signature failed to verify")

def sign_transaction(self, txn: TransactionAPI, **signer_options) -> Optional[TransactionAPI]:
"""
Expand Down

0 comments on commit 60dd0b4

Please sign in to comment.