-
Notifications
You must be signed in to change notification settings - Fork 1
/
tester.py
56 lines (51 loc) · 1.92 KB
/
tester.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# IP proxy pool - tester (proxy checking) module
from db import RedisClient
import aiohttp
import asyncio
import time
VALID_STATUS_CODE = [200]
TEST_URL = 'http://www.baidu.com'
BATCH_TEST_SIZE = 100
class Tester(object):
def __init__(self):
self.redis = RedisClient()
async def test_single_proxy(self,proxy):
'''
测试单个代理
:param proxy: 单个代理
:return: None
'''
conn = aiohttp.TCPConnector(verify_ssl=False)
async with aiohttp.ClientSession(connector=conn) as session:
try:
if isinstance(proxy,bytes):
proxy = proxy.decode('utf-8')
real_proxy = 'http://' + proxy
print('正在测试',proxy)
async with session.get(TEST_URL,proxy=real_proxy,timeout=15) as response:
if response.status in VALID_STATUS_CODE:
self.redis.max(proxy)
print('代理可用',proxy)
else:
self.redis.decrease(proxy)
print('请求响应码不合法', proxy)
except (aiohttp.ClientError,aiohttp.ClientConnectorError,asyncio.TimeoutError,AttributeError):
self.redis.decrease(proxy)
print('代理请求失败',proxy)
def run(self):
'''
测试主函数
:return: None
'''
print('测试器开始运行')
try:
proxies = self.redis.all()
loop = asyncio.get_event_loop()
# 批量测试
for i in range(0, len(proxies), BATCH_TEST_SIZE):
test_proxies = proxies[i:i + BATCH_TEST_SIZE]
tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
loop.run_until_complete(asyncio.wait(tasks))
time.sleep(5)
except Exception as e:
print('测试器发生错误', e.args)