-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathWebRobotLib.py
186 lines (164 loc) · 5.66 KB
/
WebRobotLib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
from urllib import request,parse
from urllib.parse import urlparse
from http import cookiejar
import ssl
from io import BytesIO
import gzip
import random
import time
# Library version string (leading underscore keeps it module-private).
__version = "0.1"
# Names exported by ``from WebRobotLib import *``.
__all__ = ["WebRobot","decompose_url"]
class WebRobot():
    """Small HTTP client built on urllib.

    Bundles cookie persistence (Mozilla cookie-file format), optional
    global or per-request proxies, unverified-SSL handling and transparent
    gzip decoding behind a single ``openUrl`` entry point.
    """
    def __init__(self):
        # Default timeout (seconds) used when openUrl() is not given one.
        self.timeout = 4
        # Default request headers; the commented entries are a template.
        self.headers = {
            # "Host": "",
            # "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
            # "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            # "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
            # "Accept-Encoding": "gzip, deflate",
            # "Connection": "close",
            # "Cookie": "",
            # "Upgrade-Insecure-Requests": "1",
        }
        # NOTE: attribute name keeps its historical spelling ("encodeing")
        # so existing callers do not break.
        self.default_encodeing = "UTF8"
        self.global_proxy = {
            "https": "127.0.0.1:8080",
            "http": "127.0.0.1:8080"
        }
        # Accept sites whose certificate is not signed by a trusted CA.
        self.ssl_verify = ssl._create_unverified_context()
        self.handlers = [
            request.HTTPHandler(),
            request.HTTPSHandler(context=self.ssl_verify)
        ]
        self.cookie_file = "cookie.txt"
        self._cookiejar = None
        self.loadCookie()
        self._global_proxy_enable = False
        self.global_proxy_enable = self._global_proxy_enable

    @property
    def global_proxy_enable(self):
        """Whether requests are routed through self.global_proxy."""
        return self._global_proxy_enable

    @global_proxy_enable.setter
    def global_proxy_enable(self, enable):
        """Enable/disable the proxy configured in self.global_proxy.

        Raises TypeError when *enable* is not a bool.
        """
        if not isinstance(enable, bool):
            raise TypeError("args 'enable' should be <bool> not <%s>"%(type(enable)))
        self._global_proxy_enable = enable
        if enable:
            self.setHandler(request.ProxyHandler(self.global_proxy))
        else:
            # An empty mapping replaces (and thereby disables) any proxy.
            self.setHandler(request.ProxyHandler({}))

    def openUrl(self, url="", args=None, method="GET", encoding=None,
                headers=None, timeout=None, proxy_list=None, handlers=None):
        """Perform an HTTP request and return a summary dict.

        url: target address; for GET it may already contain the query string.
        args: dict of parameters (or, for POST, a pre-encoded string).
        method: "GET" or "POST" (case-insensitive).
        encoding: charset used to encode args and decode the response body;
            defaults to self.default_encodeing. Pass False to receive the
            raw response bytes undecoded.
        headers: header dict; defaults to self.headers.
        timeout: seconds before the request times out; defaults to
            self.timeout.
        proxy_list: optional proxy host:port strings; one is picked at
            random for this request.
        handlers: custom urllib handler list; defaults to self.handlers.

        Returns {"status", "headers", "content", "reason"}.
        Raises ValueError for an unsupported method.
        """
        # BUGFIX: fresh per-call defaults instead of shared mutable ones.
        args = {} if args is None else args
        proxy_list = [] if proxy_list is None else proxy_list
        # BUGFIX: test against None, not truthiness, so that encoding=False
        # really disables decoding instead of falling back to the default.
        encoding = self.default_encodeing if encoding is None else encoding
        # urlencode still needs a real charset even when decoding is off.
        args_encoding = encoding if encoding else self.default_encodeing
        headers = headers if headers else self.headers
        method = method.upper()
        if method == "GET":
            _request = request.Request(
                url = "%s?%s" % (url, parse.urlencode(args, encoding=args_encoding)) if args else url,
                headers = headers,
                origin_req_host = None,
                unverifiable = False,
                method = 'GET'
            )
        elif method == "POST":
            if isinstance(args, dict):
                data = bytes(parse.urlencode(args, encoding=args_encoding), encoding='UTF8')
            else:
                data = bytes(args, encoding="UTF8")
            _request = request.Request(
                url = url,
                data = data,
                headers = headers,
                origin_req_host = None,
                unverifiable = False,
                method = 'POST'
            )
        else:
            # BUGFIX: previously fell through with _request=None and failed
            # later with a confusing AttributeError inside urllib.
            raise ValueError("method should be 'GET' or 'POST', not %r" % method)
        # Work on a copy so appending a per-request proxy never mutates
        # self.handlers or a caller-owned list.
        handlers = list(handlers) if handlers else list(self.handlers)
        if proxy_list:
            # BUGFIX: the chosen proxy was computed but never applied.
            proxy = random.choice(proxy_list)
            handlers.append(request.ProxyHandler({"http": proxy, "https": proxy}))
        opener = request.build_opener(*handlers)
        # BUGFIX: the original `if timeout:` overwrote a caller-supplied
        # timeout with the default and left it None when unset.
        if timeout is None:
            timeout = self.timeout
        _response = opener.open(_request, timeout=timeout)
        raw = _response.read()
        res = {
            "status": _response.status,
            "headers": {name: value for name, value in _response.getheaders()},
            "content": self.decodeHtml(raw, encoding) if encoding else raw,
            "reason": _response.reason
        }
        return res

    def setHandler(self, handler):
        """Install *handler*, replacing any existing handler of its class."""
        assert isinstance(handler, request.BaseHandler)
        # BUGFIX: rebuild the list instead of remove-while-iterating,
        # which can skip adjacent entries.
        self.handlers = [h for h in self.handlers if not isinstance(h, type(handler))]
        self.handlers.append(handler)

    def loadCookie(self, url="", headers=None):
        """Install a cookie handler backed by self.cookie_file.

        Loads previously saved cookies when the file exists; optionally
        visits *url* with *headers* afterwards to acquire fresh cookies.
        """
        _cookiejar = cookiejar.MozillaCookieJar(self.cookie_file)
        try:
            # BUGFIX: the jar was created but never actually loaded.
            _cookiejar.load(self.cookie_file)
        except (OSError, cookiejar.LoadError):
            pass  # no (valid) saved cookies yet - start with an empty jar
        cookie_handler = request.HTTPCookieProcessor(_cookiejar)
        self.setHandler(cookie_handler)
        if url != "":
            self.openUrl(
                url = url,
                headers = headers,
                timeout = self.timeout
            )
        self._cookiejar = cookie_handler.cookiejar

    def saveCookie(self):
        """Persist the current cookies to self.cookie_file."""
        self._cookiejar.save(self.cookie_file)

    def gzipPage(self, data):
        """Decompress gzip-compressed response bytes.

        data: raw gzip-compressed bytes.
        Returns the decompressed bytes.
        """
        buff = BytesIO(data)
        return gzip.GzipFile(fileobj=buff).read()

    def decodeHtml(self, data, encoding=None):
        """Decode response bytes, transparently un-gzipping when needed.

        data: raw response body bytes.
        encoding: charset; defaults to self.default_encodeing.
        Returns the decoded string.
        """
        encoding = encoding if encoding else self.default_encodeing
        # BUGFIX: check the two-byte gzip magic number; the old 7-byte probe
        # additionally required a zero mtime field and therefore missed
        # gzip streams whose header carries a real timestamp.
        if data[:2] == b'\x1f\x8b':
            return self.gzipPage(data).decode(encoding)
        return data.decode(encoding)
def decompose_url(url):
    """Split *url* into (scheme, path, filename).

    The filename is the last path segment; it is an empty string when the
    path ends with a slash (i.e. it points at a directory).
    """
    parsed_url = urlparse(url)
    # Protocol / scheme part, e.g. "https".
    protocol = parsed_url.scheme
    # Path part, including directories and the file name.
    path = parsed_url.path
    # BUGFIX: the original conditional had two identical branches; rsplit
    # already yields '' for a trailing slash, so no special case is needed.
    filename = path.rsplit('/', 1)[-1]
    return protocol, path, filename
def post_log(s):
    """Append *s* plus a localtime stamp to post_log.txt."""
    now = time.localtime()
    stamp = "{}\u6708{}\u65e5{}\u70b9{}\u5206{}\u79d2".format(
        now.tm_mon, now.tm_mday, now.tm_hour, now.tm_min, now.tm_sec)
    with open("post_log.txt", "a", encoding="UTF8") as log_file:
        log_file.write(s + "\t" + stamp + "\n")