-
Notifications
You must be signed in to change notification settings - Fork 1
/
functions.py
76 lines (60 loc) · 2.01 KB
/
functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import datetime
import os
from json import JSONDecodeError
from typing import Any
from uuid import uuid4
import requests
import urllib3
from tqdm import tqdm
from threadpool import ThreadPool
from worker import worker
# Silence urllib3's InsecureRequestWarning: get_block posts with verify=False,
# which would otherwise emit this warning on each such request.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Cookies passed along with every request made by this module (none by default).
COOKIES = {}
def get_version(foreign_api_url: str, timeout: float = 10) -> str:
    """Return the node version reported by the foreign API.

    Sends a JSON-RPC 2.0 ``get_version`` request and extracts
    ``result.Ok.node_version`` from the reply.

    Args:
        foreign_api_url: URL the JSON-RPC request is POSTed to.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely on an unresponsive node.

    Returns:
        The ``node_version`` value from the reply.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
        KeyError: If the reply has no ``result.Ok.node_version`` entry
            (for example when the node answered with ``Err``).
    """
    payload = {
        "jsonrpc": "2.0",
        "id": str(uuid4()),
        "method": "get_version",
        "params": [],
    }
    # NOTE(review): unlike get_block, this call keeps TLS verification on —
    # confirm that is intended for nodes using self-signed certificates.
    response = requests.post(
        foreign_api_url, json=payload, cookies=COOKIES, timeout=timeout
    )
    return response.json()["result"]["Ok"]["node_version"]
def get_block(foreign_api_url: str, block: int) -> Any:
    """Fetch one block from the foreign API via JSON-RPC ``get_block``.

    Args:
        foreign_api_url: URL the JSON-RPC request is POSTed to.
        block: Block identifier sent as the first ``get_block`` parameter;
            the two remaining parameters are sent as null.

    Returns:
        The whole decoded JSON-RPC reply (not only its ``result`` member).

    Raises:
        ValueError: If the reply's ``result`` contains an ``Err`` entry; the
            exception carries that entry.
        Exception: If the response body is not valid JSON; the exception
            carries the raw response bytes and chains the decode error.
        KeyError: If the decoded reply has no ``result`` member.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": str(uuid4()),
        "method": "get_block",
        "params": [block, None, None],
    }
    response = requests.post(
        foreign_api_url, json=payload, cookies=COOKIES, timeout=10, verify=False
    )

    # Only the decode step is guarded. Catching ValueError covers every JSON
    # decode error requests can raise, whichever JSON backend it uses, and
    # keeping the "Err" check outside the try stops it from being swallowed.
    try:
        reply = response.json()
    except ValueError as ex:
        raise Exception(response.content) from ex

    if "Err" in reply["result"]:
        raise ValueError(reply["result"]["Err"])
    return reply
def get_status(owner_api_url: str, timeout: float = 10) -> Any:
    """Return the ``Ok`` payload of the owner API's ``get_status`` call.

    Args:
        owner_api_url: URL the JSON-RPC request is POSTed to.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely on an unresponsive node.

    Returns:
        The value stored under ``result.Ok`` in the decoded reply.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
        KeyError: If the reply has no ``result.Ok`` entry.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": str(uuid4()),
        "method": "get_status",
        "params": [],
    }
    # NOTE(review): unlike get_block, this call keeps TLS verification on —
    # confirm that is intended for nodes using self-signed certificates.
    response = requests.post(
        owner_api_url, json=payload, cookies=COOKIES, timeout=timeout
    )
    return response.json()["result"]["Ok"]
def get_latest_block(owner_api) -> int:
    """Return the ``tip.height`` value from the owner API status.

    Args:
        owner_api: Passed unchanged to ``get_status``.

    Returns:
        The ``["tip"]["height"]`` entry of the status payload.
    """
    return get_status(owner_api)["tip"]["height"]
def main(latest_block: int, url: str, threads: int):
    """Schedule ``worker`` for every block from 1 through ``latest_block``.

    Blocks are processed in batches of ``threads``: each batch is queued on a
    ``ThreadPool`` and fully awaited before the next batch starts, with a tqdm
    progress bar advancing once per batch.

    Args:
        latest_block: Highest block number to process (inclusive).
        url: Passed to ``worker`` for each block.
        threads: Batch size and the size given to each ``ThreadPool``.
            Must be non-zero, since it is used as the ``range`` step.
    """
    # Create a folder to store the invalids. makedirs also creates the parent
    # ``./invalids`` directory, which does not exist on a first run.
    folder = f"./invalids/{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}"
    os.makedirs(folder, exist_ok=True)

    end = latest_block + 1  # exclusive upper bound of the block range
    for batch_start in tqdm(range(1, end, threads)):
        # NOTE(review): a fresh ThreadPool is built per batch; whether its
        # threads are released afterwards depends on ThreadPool — confirm.
        pool = ThreadPool(threads)
        # Clamp the final batch so no block past latest_block is requested.
        batch_end = min(batch_start + threads, end)
        for block_number in range(batch_start, batch_end):
            pool.add_task(worker, url, block_number, folder)
        pool.wait_completion()