-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
109 lines (90 loc) · 3.78 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# /usr/bin/env python3
from pprint import pp
import sys
import os
import time
import signal
import argparse
import httpx
from dotenv import load_dotenv
load_dotenv()
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--version', action='version',
version='Cloudflare Minion 0.1')
arg_parser.add_argument('--wipe-mode', action='store_true')
args = arg_parser.parse_args()
CLOUDFLARE_API_URL = os.getenv('CLOUDFLARE_API_URL', '')
CLOUDFLARE_API_TOKEN = os.getenv('CLOUDFLARE_API_TOKEN', '')
CLOUDFLARE_EMAIL = os.getenv('CLOUDFLARE_EMAIL', '')
CLOUDFLARE_ZONE = os.getenv('CLOUDFLARE_ZONE', '')
def sig_handler(signum, frame):
res = input("Ctrl+C was pressed. Do you really want to exit(y/n)?")
if res == 'y':
sys.exit(1)
def check_python3() -> bool:
if sys.version_info[0] == 3:
return True
return False
def delete_zone_names(zone_id: str, zone_names: dict, wipe_mode: bool = False):
for key, value in zone_names.items():
deletion_status = 'PASS'
if wipe_mode == True:
r = httpx.delete(CLOUDFLARE_API_URL + '/' + zone_id + '/dns_records/' + key,
headers={
'Content-Type': 'application/json',
'X-Auth-Email': CLOUDFLARE_EMAIL,
'Authorization': 'Bearer ' + CLOUDFLARE_API_TOKEN,
})
deletion_status = r.status_code
time.sleep(1)
print(
f"[INF] Deleting name: {value['name']}, Type: {value['type']} ... {deletion_status}")
def get_zone_names(zone_id: str) -> dict:
if zone_id == '':
print("[ERR] Empty Zone ID. Couldn't retrive names list! Exiting...")
sys.exit(1)
names_dict = {}
current_page = 1
print("[INF] Gathering names for zone ->", CLOUDFLARE_ZONE)
while True:
r = httpx.get(CLOUDFLARE_API_URL + '/' + zone_id + '/dns_records', params={'page': current_page},
headers={
'Content-Type': 'application/json',
'X-Auth-Email': CLOUDFLARE_EMAIL,
'Authorization': 'Bearer ' + CLOUDFLARE_API_TOKEN,
})
if r.json()['result_info']['total_count'] == 0:
print(f"[INF] You don't have any name in {CLOUDFLARE_ZONE} zone!")
sys.exit(0)
for i in range(len(r.json()['result'])):
names_dict[r.json()['result'][i]['id']] = {
'name': r.json()['result'][i]['name'],
'type': r.json()['result'][i]['type'],
}
print(
f"[INF] Current page {current_page} from {r.json()['result_info']['total_pages']}. Count -> {r.json()['result_info']['count']}")
if current_page == r.json()['result_info']['total_pages']:
break
current_page += 1
return names_dict
def main():
if check_python3() != True:
print("[ERR] Please use only Python 3 version. We are not support Python 2!")
sys.exit(1)
try:
signal.signal(signal.SIGINT, sig_handler)
r = httpx.get(CLOUDFLARE_API_URL, params={'name': CLOUDFLARE_ZONE}, headers={
'Content-Type': 'application/json',
'X-Auth-Email': CLOUDFLARE_EMAIL,
'Authorization': 'Bearer ' + CLOUDFLARE_API_TOKEN,
})
if r.status_code != 200 and r.json()['success'] == False:
print("[ERR] Something goes wrong ->",
r.json()['errors'][0]['message'])
zone_names = get_zone_names(r.json()['result'][0]['id'])
delete_zone_names(r.json()['result'][0]['id'],
zone_names, args.wipe_mode)
except Exception as ex:
print("[EXC] Exception happened ->", ex)
if __name__ == '__main__':
main()