-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcircle_api.py
246 lines (198 loc) · 9.24 KB
/
circle_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
import uuid
import enum
import dotenv
import os
import requests
import asyncio
import definitions as defs
from constants import *
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
import base64
import web3
from eth_abi import decode
dotenv.load_dotenv()
CIRCLE_API_KEY = os.getenv("CIRCLE_API_KEY")
ENTITY_SECRET = os.getenv("ENTITY_SECRET", "")
WALLET_SET_ID = os.getenv("WALLET_SET_ID")
with open("data/setup/key.pub", "r") as f:
PUBLIC_KEY = f.read()
def generate_entity_secret_ciphertext():
entity_secret = bytes.fromhex(ENTITY_SECRET)
if len(entity_secret) != 32:
raise Exception("invalid entity secret")
# encrypt data by the public key
public_key = RSA.importKey(PUBLIC_KEY)
cipher_rsa = PKCS1_OAEP.new(key=public_key, hashAlgo=SHA256)
encrypted_data = cipher_rsa.encrypt(entity_secret)
# encode to base64
ciphertext = base64.b64encode(encrypted_data)
return ciphertext.decode()
def create_wallet(nr_wallets: int, blockchain: defs.Blockchain = defs.Blockchain.MATIC_AMOY) -> defs.Wallets:
if nr_wallets > 200:
raise ValueError("Cannot create more than 200 wallets at a time")
url = "https://api.circle.com/v1/w3s/developer/wallets"
payload = {
"idempotencyKey": str(uuid.uuid4()),
"accountType": "SCA",
"blockchains": [blockchain.value],
"count": nr_wallets,
"entitySecretCiphertext": generate_entity_secret_ciphertext(),
"walletSetId": WALLET_SET_ID
}
headers = {
"accept": "application/json",
"content-type": "application/json",
"authorization": f"Bearer {CIRCLE_API_KEY}"
}
response = requests.post(url, json=payload, headers=headers)
return defs.Wallets.parse_obj(response.json()['data'])
def update_wallet(wallet_id: str, wallet_name: str, wallet_ref_id: str):
url = f"https://api.circle.com/v1/w3s/wallets/{wallet_id}"
payload = {
"name": wallet_name,
"refId": wallet_ref_id
}
headers = {
"accept": "application/json",
"content-type": "application/json",
"authorization": f"Bearer {CIRCLE_API_KEY}"
}
response = requests.put(url, json=payload, headers=headers)
return response.json()
def get_wallet_balance(wallet_id: str):
url = f"https://api.circle.com/v1/w3s/wallets/{wallet_id}/balances"
headers = {
"accept": "application/json",
"Authorization": f"Bearer {CIRCLE_API_KEY}"
}
response = requests.get(url, headers=headers)
return response.json()
def get_user_usdc_balance(user: defs.User) -> float:
balances = get_wallet_balance(user.wallet.id)['data']
for token in balances['tokenBalances']:
if token['token']['symbol'] == 'USDC':
return float(token['amount'])
return 0.0
def send_transfer(wallet_id: str, recipient: str, tokenId: str, amount: float, ref_id: str):
url = "https://api.circle.com/v1/w3s/developer/transactions/transfer"
payload = {
"walletId": wallet_id,
"destinationAddress": recipient,
"tokenId": tokenId,
"amounts": [str(amount)],
"idempotencyKey": str(uuid.uuid4()), # TODO create a uuid from the user request so that it can only be sent once
"entitySecretCiphertext": generate_entity_secret_ciphertext(),
"feeLevel": "MEDIUM",
"refId": ref_id
}
headers = {
"accept": "application/json",
"content-type": "application/json",
"authorization": f"Bearer {CIRCLE_API_KEY}"
}
response = requests.post(url, json=payload, headers=headers)
print(response.json())
return response.json()
def get_transaction(transaction_id: str):
url = f"https://api.circle.com/v1/w3s/transactions/{transaction_id}"
headers = {
"accept": "application/json",
"Authorization": f"Bearer {CIRCLE_API_KEY}"
}
response = requests.get(url, headers=headers)
return response.json()["data"]["transaction"]
def execute_smart_contract(wallet_id: str, contract_address: str, abi_function_signature: str, abi_parameters: list, amount: float | None = None, ref_id: str | None = None):
url = "https://api.circle.com/v1/w3s/developer/transactions/contractExecution"
payload = {
"walletId": wallet_id,
"contractAddress": contract_address,
"abiFunctionSignature": abi_function_signature,
"abiParameters": abi_parameters,
"idempotencyKey": str(uuid.uuid4()),
"entitySecretCiphertext": generate_entity_secret_ciphertext(),
"feeLevel": "MEDIUM"
}
if amount is not None:
payload["amount"] = str(amount * 1e18) # 18 decimals for ETH
if ref_id is not None:
payload["refId"] = ref_id
headers = {
"accept": "application/json",
"content-type": "application/json",
"authorization": f"Bearer {CIRCLE_API_KEY}"
}
response = requests.post(url, json=payload, headers=headers)
return response.json()
def encode_address(address: str) -> str:
address = address.lower().removeprefix('0x')
if len(address) != 40:
raise ValueError("Invalid Ethereum address length")
address_bytes = bytes.fromhex(address)
return '0x' + (b'\x00' * 12 + address_bytes).hex()
def cctp_burn(user: defs.User, destination_chain: defs.Blockchain, destination_address: str, amount: float, ref_id: str):
# TODO looks like we need to wait for the transaction 1 before sending transaction 2 otherwise cricle will reject it
amount_str = str(round(amount * 1e6))
chain = user.wallet.blockchain.value
print(chain)
response1 = execute_smart_contract(user.wallet.id, USDC_TOKEN_ADDRESSES[chain], "approve(address,uint256)", [CCTP_TOKEN_MESSENGER[chain], amount_str])
abi_function_signature = "depositForBurn(uint256,uint32,bytes32,address)"
encoded_destination_address = encode_address(destination_address)
abi_parameters = [amount_str, CCTP_DOMAINS[destination_chain.value], encoded_destination_address, USDC_TOKEN_ADDRESSES[chain]]
response2 = execute_smart_contract(user.wallet.id, CCTP_TOKEN_MESSENGER[chain], abi_function_signature, abi_parameters, ref_id=ref_id)
return response1, response2
def cctp_burn_step_1(user: defs.User, amount: float, ref_id: str):
amount_str = str(round(amount * 1e6))
chain = user.wallet.blockchain.value
return execute_smart_contract(user.wallet.id, USDC_TOKEN_ADDRESSES[chain], "approve(address,uint256)", [CCTP_TOKEN_MESSENGER[chain], amount_str], ref_id=ref_id)
def cctp_burn_step_2(user: defs.User, destination_chain: defs.Blockchain, destination_address: str, amount: float, ref_id: str):
amount_str = str(round(amount * 1e6))
chain = user.wallet.blockchain.value
abi_function_signature = "depositForBurn(uint256,uint32,bytes32,address)"
encoded_destination_address = encode_address(destination_address)
abi_parameters = [amount_str, CCTP_DOMAINS[destination_chain.value], encoded_destination_address, USDC_TOKEN_ADDRESSES[chain]]
return execute_smart_contract(user.wallet.id, CCTP_TOKEN_MESSENGER[chain], abi_function_signature, abi_parameters, ref_id=ref_id)
def get_message_bytes_and_hash(blockchain: defs.Blockchain, tx_hash: str) -> tuple[str, str]:
provider = web3.Web3(web3.HTTPProvider(INFURA_ENPOINTS[blockchain.value]))
# Get the transaction receipt
transaction_receipt = provider.eth.get_transaction_receipt(tx_hash)
# Create the event topic
event_topic = web3.Web3.keccak(text='MessageSent(bytes)').hex()
# Find the log with the matching topic
log = next((l for l in transaction_receipt['logs'] if l['topics'][0].hex() == event_topic), None)
if log is None:
raise ValueError("MessageSent event not found in transaction logs")
# Decode the log data
message_bytes = decode(['bytes'], log['data'])[0]
# Calculate the message hash
message_hash = web3.Web3.keccak(message_bytes).hex()
return f'0x{message_bytes.hex()}', f'0x{message_hash}'
def get_atttestation(message_hash: str) -> str | None:
url = f"https://iris-api-sandbox.circle.com/v1/attestations/{message_hash}"
headers = {"accept": "application/json"}
response = requests.get(url, headers=headers).json()
if response['status'] != 'complete':
return None
return response['attestation']
def cctp_mint(source_chain: defs.Blockchain, destination_walled_id: str, destination_chain: defs.Blockchain, tx_hash: str):
contract_address = CCTP_MESSAGE_TRANSMITTER[destination_chain.value]
message_bytes, message_hash = get_message_bytes_and_hash(source_chain, tx_hash)
attestation = get_atttestation(message_hash)
print("Attestation received")
abi_function_signature = "receiveMessage(bytes,bytes)"
abi_parameters = [message_bytes, attestation]
return execute_smart_contract(destination_walled_id, contract_address, abi_function_signature, abi_parameters)
def request_from_faucet(user: defs.User):
url = "https://api.circle.com/v1/faucet/drips"
payload = {
"address": user.wallet.address,
"blockchain": user.wallet.blockchain.value,
"native": True,
"usdc": True
}
headers = {
"Authorization": f"Bearer {CIRCLE_API_KEY}",
"Content-Type": "application/json"
}
requests.request("post", url, json=payload, headers=headers)