-
Notifications
You must be signed in to change notification settings - Fork 84
/
Copy pathdnspod_diag.py
130 lines (102 loc) · 3.19 KB
/
dnspod_diag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#! /usr/bin/env python
# -*- coding: utf-8 -*-
'''
https://www.dnspod.cn/User
export dnspod_logintoken='10000, xxxx'
'''
import os
import json
import time
import socket
import urllib
import urllib2
import logging
import itertools
logging.basicConfig(level=logging.NOTSET)
API_BASEURL = 'https://dnsapi.cn'
API_LOGIN_TOKEN = os.environ.get('dnspod_logintoken')
API_HEADERS = {"Content-type": "application/x-www-form-urlencoded",
"Accept": "text/json",
"User-Agent": "onlytiancai/0.0.1 (onlytiancai@gmail.com)"}
all_tasks = set()
error_task = []
def api(api, **data):
url = '%s/%s' % (API_BASEURL, api)
data.update(login_token=API_LOGIN_TOKEN, format='json')
data = urllib.urlencode(data)
req = urllib2.Request(url, data, headers=API_HEADERS)
rsp = urllib2.urlopen(req)
rsp = json.loads(rsp.read())
if rsp['status']['code'] != '1':
raise Exception(rsp['status']['message'])
return rsp
def domain_list():
ret = api('Domain.List')
for item in ret['domains']:
if item['status'] != 'enable' or item['ext_status'] != '':
continue
yield item['id'], item['name']
def record_list(domain_id):
ret = api('Record.List', domain_id=domain_id)
for item in ret['records']:
if item['type'] not in ('A', 'CNAME'):
continue
if item['enabled'] != '1':
continue
yield item
def httptest(host, ip):
url = 'http://%s/' % ip
headers = {"Host": host}
req = urllib2.Request(url, headers=headers)
try:
urllib2.urlopen(req, timeout=1)
except urllib2.HTTPError, e:
if e.code == 508:
return '508 Loop Detected'
return '%s %s' % (e.code, e.reason)
except urllib2.URLError as e:
if type(e.reason) is socket.timeout:
return 'timeout'
if type(e.reason) is socket.gaierror:
return e.reason.strerror
return e.reason
except socket.timeout as e:
return 'timeout'
except socket.error as e:
return e.strerror
return ''
def get_host(domain, sub_domain):
if sub_domain == '@':
return domain
if sub_domain == '*':
sub_domain = 'test'
return '%s.%s' % (sub_domain, domain)
def test_one(host, ip):
if (host, ip) in all_tasks:
return
t1 = time.time()
ret = httptest(host, ip)
all_tasks.add((host, ip))
duration = int((time.time() - t1) * 1000)
print '%s(%s) %sms "%s"' % (host, ip, duration, ret or 'ok')
if ret:
error_task.append((ret, host, ip))
def summary():
print
print 'total: %s, error: %s' % (len(all_tasks), len(error_task))
for err, tasks in itertools.groupby(sorted(error_task), lambda x: x[0]):
tasks = list(tasks)
print '%s(%s):' % (err, len(tasks))
for task in sorted(tasks):
print '\t%s(%s)' % (task[1], task[2])
def run():
all_tasks.clear()
for domain_id, domain_name in domain_list():
for record in record_list(domain_id):
sub_domain = record['name']
ip = record['value']
host = get_host(domain_name, sub_domain)
test_one(host, ip)
summary()
if __name__ == '__main__':
run()