-
Notifications
You must be signed in to change notification settings - Fork 0
/
api.py
80 lines (64 loc) · 2.46 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import uvicorn
import requests
import json
import os
from typing import Optional
from fastapi import FastAPI, Header
from pydantic import BaseModel
from evmoswallet.converter import eth_to_evmos, evmos_to_eth
from evmosgrpc.messages.msgsend import create_msg_send
from evmosgrpc.builder import TransactionBuilder
from evmosgrpc.transaction import Transaction
from google.protobuf.json_format import MessageToDict
from fastapi.middleware.cors import CORSMiddleware
import db
# Value of the EVMOS_FAUCET_SEED environment variable (None when unset); it is
# handed to TransactionBuilder in request_faucet.
SEED = os.getenv('EVMOS_FAUCET_SEED')
# Rama test key = 6LcTwtocAAAAAJY2ef59GVG2w6xK2T5Ji6i333sW
# NOTE(review): the line above keeps a key in a source comment -- confirm it is
# a throwaway test key, otherwise rotate it and remove it from the repository.
# Value of the RECAPTCHA_SECRET environment variable (None when unset); sent as
# the 'secret' field of the siteverify call in authorize_recaptcha.
RECAPTCHA_SECRET = os.getenv('RECAPTCHA_SECRET')
# Config
# Origins accepted by the CORS middleware below; "*" accepts every origin.
ALLOWED_HOSTS = ["*"]
app = FastAPI()
# CORS: every origin, method and header is allowed, with credentials enabled.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- consider restricting ALLOWED_HOSTS to the faucet front-end.
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_HOSTS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Request body of POST /faucet/: the address that should receive the funds.
# (Documented with comments rather than a docstring so the generated schema
# description stays unchanged.)
class Address(BaseModel):
    # Checked by verify_address, which only accepts values starting with
    # 'evmos' or '0x'.
    address: str
# Response body of POST /faucet/ (used as the route's response_model).
# (Documented with comments rather than a docstring so the generated schema
# description stays unchanged.)
class FaucetResponse(BaseModel):
    # Hash of the broadcast transaction; None when the request was rejected
    # or the transaction failed.
    transactionHash : Optional[str]
    # Failure reason; the empty string when the transaction succeeded.
    error: str
def authorize_recaptcha(token, timeout=10):
    """Check a reCAPTCHA response token against Google's siteverify endpoint.

    The check fails closed: any problem reaching or understanding the
    verification service is reported as "not authorized" instead of raising.

    Args:
        token: The reCAPTCHA response token supplied by the client. A missing
            or empty token is rejected without contacting the service.
        timeout: Seconds to wait for the verification service before giving
            up, so a stalled connection cannot block the worker forever.

    Returns:
        True only when the service answers with a JSON object whose
        ``success`` field is true; False in every other case.
    """
    # No token means there is nothing to verify; skip the network round trip.
    if not token:
        return False
    try:
        response = requests.post(
            'https://www.google.com/recaptcha/api/siteverify',
            data={'secret': RECAPTCHA_SECRET, 'response': token},
            timeout=timeout,
        )
    except requests.RequestException:
        # Connection errors and timeouts must not turn into a 500 response.
        return False
    if not response.ok:
        return False
    try:
        captcha_response = json.loads(response.text)
    except ValueError:
        # The body was not valid JSON (json.JSONDecodeError is a ValueError).
        return False
    # Tolerate an unexpected payload shape or a missing 'success' key.
    return (
        isinstance(captcha_response, dict)
        and captcha_response.get('success') is True
    )
def request_faucet(address):
    """Send faucet funds to ``address`` and report the outcome.

    Builds a send message from the faucet wallet (derived from ``SEED``) to
    ``address``, broadcasts it and inspects the response.

    Args:
        address: Destination address, forwarded unchanged to
            ``create_msg_send``.

    Returns:
        A ``(txhash, error)`` tuple: ``(txhash, "")`` when the transaction was
        accepted, ``(None, message)`` when it was rejected or the response
        carried no hash.

    Exceptions raised while building or broadcasting the transaction are not
    handled here and propagate to the caller.
    """
    builder = TransactionBuilder(SEED)
    # 100 is the amount handed to create_msg_send; its unit/denomination is
    # decided by evmosgrpc and is not visible from this file.
    msg = create_msg_send(builder.address, address, 100)
    res = builder.send_tx(Transaction().generate_tx(builder, msg))
    # MessageToDict leaves out fields that hold their default value, so any of
    # the keys read below may legitimately be absent.
    tx_response = MessageToDict(res).get('txResponse', {})
    if 'code' in tx_response:
        # A present (hence non-zero) code marks a failed transaction. The raw
        # log may be empty and therefore missing; fall back to the code so the
        # caller always receives a non-empty error string.
        error = tx_response.get('rawLog')
        if not error:
            error = f"Transaction failed with code {tx_response['code']}"
        return None, error
    txhash = tx_response.get('txhash')
    if not txhash:
        return None, "Transaction response did not include a hash"
    return txhash, ""
def verify_address(address):
    """Tell whether ``address`` survives a round trip through the converters.

    An address starting with ``evmos`` is converted to its eth form and back;
    one starting with ``0x`` is converted to its evmos form and back. The
    address is accepted only when the round trip reproduces it exactly. Any
    other prefix is rejected outright.

    Exceptions raised by the converters are not caught here.
    """
    if address.startswith('evmos'):
        round_tripped = eth_to_evmos(evmos_to_eth(address))
    elif address.startswith('0x'):
        round_tripped = evmos_to_eth(eth_to_evmos(address))
    else:
        # Neither supported prefix: nothing to convert.
        return False
    return round_tripped == address
@app.post("/faucet/", response_model=FaucetResponse)
def faucet(address: Address, Authorization: str = Header(None)):
    """Send faucet funds to the given address.

    The ``Authorization`` header must carry a valid reCAPTCHA response token.
    The response holds the transaction hash on success, or a null hash and an
    error message when the request is rejected or the transaction fails.
    """
    if not verify_address(address.address):
        return {'transactionHash': None, 'error': "Invalid Address"}
    # Verify the captcha before touching the database, so callers without a
    # valid token cannot trigger db.set_address for arbitrary addresses.
    # NOTE(review): db.set_address presumably records the request for rate
    # limiting -- confirm against the db module.
    if not authorize_recaptcha(Authorization):
        return {'transactionHash': None, 'error': "Not Authorized"}
    # A falsy result means this address may not request funds right now; the
    # client-visible message is kept identical to the captcha failure.
    if not db.set_address(address.address):
        return {'transactionHash': None, 'error': "Not Authorized"}
    txhash, err = request_faucet(address.address)
    return {'transactionHash': txhash, 'error': err}
# Running this file directly serves the app with uvicorn, bound to the
# loopback interface (127.0.0.1) on port 5000 with info-level logging.
if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")