-
Notifications
You must be signed in to change notification settings - Fork 0
/
cleaner.py
385 lines (337 loc) · 13.1 KB
/
cleaner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
__author__ = 'caoyawen'
import os
import platform
import shlex
import subprocess
from contextlib import ContextDecorator
from pathlib import Path

import pyparsing
import yaml
class ConfigYAML:
    """One YAML clean-up configuration and the files/directories it selects.

    A configuration file governs the directory it lives in (its ``pwd``) and
    the sub-directories below it. Files listed under the ``append`` key are
    loaded recursively as child configurations; a child's directory is
    excluded from the parent's scan, so the child overrides the parent there
    rather than inheriting from it.
    """

    def __init__(self, fn: str, debug=False):
        """
        Load the configuration file and compute what it selects for removal.

        :param fn: path of the configuration file
        :param debug: when true, record internal state (files visited and
            which rule selected what) so it can be inspected afterwards
        :raises BranchNotSpecial: if the configuration names no branch
        """
        self.fn = fn
        self.debug = debug
        # Debug bookkeeping. Always defined so it can be inspected safely,
        # but only filled in when ``debug`` is true.
        self.handle_file_list = []
        self.add_by_handle_file = []
        self.add_by_handle_dir = []
        self.add_by_characteristic = []
        self.fit_file_list = []
        self.fit_dir = []
        self.config = self.load_config()
        self.name = self.config.get("name", "缺省配置文件")
        self.handle_config()

    @staticmethod
    def _filter_branch_cmd(target, recursive=False) -> str:
        """
        Build the ``git filter-branch`` command that drops one path from history.

        Both the path and the whole index-filter are shell-quoted, so names
        containing spaces or quotes survive the two levels of shell parsing.
        For plain names the result is identical to a hand-written
        ``--index-filter 'git rm ... name'``.

        :param target: file or directory path, relative to the working directory
        :param recursive: pass ``-r`` to ``git rm`` (needed for directories)
        :return: the command line as a single shell string
        """
        rm_flags = "-r " if recursive else ""
        index_filter = "git rm {flags}--cached --ignore-unmatch {target}".format(
            flags=rm_flags, target=shlex.quote(str(target)))
        return "/usr/bin/git filter-branch -f --index-filter {index_filter} HEAD".format(
            index_filter=shlex.quote(index_filter))

    def clean_git(self):
        """
        Rewrite the configured branch so the selected files and directories
        are removed from its history.

        Runs one ``git filter-branch`` per selected file and per selected
        directory while the configured branch is checked out.
        """
        print("{name}'s git clean start".format(name=self.name))
        branch = self.config.get("branch", "")
        with EnterBranch(branch, self.pwd):
            print("clean fit file")
            for file in self.fit_file_list:
                call_cmd_with_status(self._filter_branch_cmd(file), self.pwd)
            print("clean fit dir")
            for special_path in self.fit_dir:
                call_cmd_with_status(
                    self._filter_branch_cmd(special_path, recursive=True), self.pwd)
        print("git clean end")

    @property
    def pwd(self) -> str:
        """Directory containing the configuration file (set by ``load_config``)."""
        return self.config["pwd"]

    @property
    def base_exclude_path(self):
        """
        Paths the scan must always skip: the directories of all child
        configurations plus this configuration file itself.

        :return: list of path strings
        """
        cp = CollectPwd()
        self.enum_config(cp)
        exclude_dir = cp.pwd_list
        # enum_config also visits this configuration; its own directory is
        # the one being scanned, so take it back out.
        exclude_dir.remove(self.pwd)
        exclude_dir.append(self.fn)
        return exclude_dir

    def load_config(self) -> dict:
        """
        Read the YAML file and load every child configuration it appends.

        The returned dict is the parsed YAML (``{}`` for an empty file) with
        two extra keys: ``pwd`` (directory of the file) and
        ``append_config_list`` (a ``ConfigYAML`` per entry of ``append``).

        :return: dict
        """
        config_p = Path(self.fn)
        content = config_p.read_bytes().decode()
        # safe_load: the configuration is plain data, and a bare yaml.load()
        # without a Loader is rejected by current PyYAML releases.
        clean_config = yaml.safe_load(content)
        if not clean_config:
            clean_config = {}
        # The working directory is wherever the configuration file lives.
        pwd = str(config_p.parent)
        clean_config["pwd"] = pwd
        # Child configuration paths are given relative to this file.
        clean_config["append_config_list"] = [
            ConfigYAML(os.path.join(pwd, append_config_fn), self.debug)
            for append_config_fn in clean_config.get("append") or []
        ]
        return clean_config

    def _relative_to_pwd(self, path: str) -> str:
        """Strip the leading ``pwd/`` from *path*; other paths are returned unchanged."""
        prefix = self.pwd + "/"
        if path.startswith(prefix):
            return path[len(prefix):]
        return path

    def handle_config(self) -> None:
        """
        Evaluate the configuration on its branch and store the result in
        ``fit_file_list`` (files, relative to ``pwd``, without duplicates)
        and ``fit_dir`` (the ``dir`` entries of the configuration).

        :raises BranchNotSpecial: if the configuration names no branch
        """
        branch = self.config.get("branch", "")
        if not branch:
            raise BranchNotSpecial("{name}'s branch not special".format(name=self.name))
        with EnterBranch(branch, self.pwd):
            fit_file_list = self.handle_file() + self.handle_characteristic()
        # git is run from pwd and works with relative paths, so we do too.
        relative_paths = [self._relative_to_pwd(item) for item in fit_file_list]
        self.fit_file_list = remove_duplicate_item(relative_paths)
        self.fit_dir = self.config.get("dir") or []

    def handle_dir(self) -> list:
        """
        Collect every file below the configured ``dir`` entries.

        Not called by ``handle_config``, which hands the directories to
        ``git rm -r`` through ``fit_dir`` instead.

        :return: list of matching file paths
        """
        dirs = self.config.get("dir") or []
        caf = CollectAnyFile()
        for one_dir in dirs:
            self.enum_file(os.path.join(self.pwd, one_dir), self.base_exclude_path, caf)
        if self.debug:
            self.add_by_handle_dir = caf.file_list
        return caf.file_list

    def handle_file(self) -> list:
        """
        Return the explicitly configured ``file`` entries joined onto ``pwd``.

        :return: list of file paths
        """
        listed = [os.path.join(self.pwd, item) for item in self.config.get("file") or []]
        if self.debug:
            self.add_by_handle_file = list(listed)
        return listed

    def handle_characteristic(self) -> list:
        """
        Scan ``pwd`` and return the files whose content matches one of the
        configured ``characteristic`` rules.

        :return: list of matching file paths
        """
        characteristic_list = self.config.get("characteristic") or []
        exclude_dir = self.base_exclude_path
        cff = CollectFitFile(characteristic_list)
        self.enum_file(self.pwd, exclude_dir, cff)
        if self.debug:
            self.add_by_characteristic = cff.fit_file
        return cff.fit_file

    def __repr__(self):
        return "{name}: {content}".format(name=self.name, content=str(self.config))

    def enum_file(self, given_path: str, exclude_path: list, handler):
        """
        Walk *given_path* recursively and call *handler* for every file,
        skipping excluded directories, ``.git`` directories and this
        configuration file itself.

        :param given_path: directory to walk
        :param exclude_path: directory paths (as strings) that must not be entered
        :param handler: callable invoked as ``handler(item)`` with a ``Path``
        :return: nothing
        """
        if given_path in exclude_path:
            # The starting directory is itself excluded: nothing to do.
            return
        for item in Path(given_path).iterdir():
            if item.is_dir():
                # Nothing inside .git is ever tracked, so it can never need
                # removing from history; scanning it only costs time.
                if item.name != ".git" and str(item) not in exclude_path:
                    self.enum_file(str(item), exclude_path, handler)
            if item.is_file():
                # Never report the configuration file itself.
                if str(item) != self.fn:
                    if self.debug:
                        self.handle_file_list.append(str(item))
                    handler(item)

    def enum_config(self, handler):
        """
        Call *handler* on every child configuration (depth first) and then
        on this configuration.

        :param handler: callable invoked as ``handler(config)`` with a ``ConfigYAML``
        :return: nothing
        """
        for sub_config in self.config.get("append_config_list", []):
            sub_config.enum_config(handler)
        handler(self)

    def report(self, put=print):
        """
        Print (and, via ``say``, speak) what this configuration would remove.

        :param put: callable receiving each report line; defaults to ``print``
        """
        put("**************")
        put(" name: {name}".format(name=self.name))
        say("开始检查 {name}".format(name=self.name))
        put(" branch: {branch}".format(branch=self.config.get("branch", "")))
        if self.fit_file_list:
            file_count = len(self.fit_file_list)
            put(" {count} files are fit".format(count=file_count))
            # Announce the number of files (not the number of directories).
            say("有{count}个文件符合删除要求".format(count=file_count))
            for index, file in enumerate(self.fit_file_list, 1):
                put(" {index:0>3}. {fn}".format(index=index, fn=file))
        if self.fit_dir:
            dir_count = len(self.fit_dir)
            put(" {count} dirs are fit".format(count=dir_count))
            say("有{count}个目录符合删除要求".format(count=dir_count))
            for index, path in enumerate(self.fit_dir, 1):
                put(" {index:0>3}. {fn}".format(index=index, fn=path))
        if not self.fit_file_list and not self.fit_dir:
            put(" nothing need to remove")
            say("没有需要删除的文件")
        else:
            put(" report complete")
            say("报告结束")
class CollectAnyFile:
    """
    File handler that records the path of every file it is called with.
    """

    def __init__(self):
        # Paths are kept as strings, in the order they were seen.
        self.file_list = []

    def __call__(self, file: Path):
        path_text = str(file)
        self.file_list += [path_text]
class CollectFitFile:
    """
    File handler that collects the files whose content matches a rule.

    Each rule is a mapping with a ``data`` string to search for and an
    optional ``exclude`` list of file names.
    """

    def __init__(self, characteristic: []):
        self.characteristic = characteristic
        self.fit_file = []           # paths whose content matched a rule
        self.none_unicode_file = []  # paths that could not be decoded
        self.exclude_file = []       # file names skipped through an exclude list

    def __call__(self, file_item: Path):
        """
        Check one file against the rules, in order.

        *file_item* must refer to a regular file.
        """
        try:
            text = file_item.read_bytes().decode()
        except UnicodeDecodeError:
            # Undecodable content is not inspected, only recorded.
            self.none_unicode_file.append(str(file_item))
            return

        file_name = file_item.name
        for rule in self.characteristic:
            hits = [name for name in rule.get("exclude", []) if file_name == name]
            if hits:
                # An excluded file ends the check: no later rule is tried.
                self.exclude_file.append(hits[0])
                return
            if rule["data"] in text:
                self.fit_file.append(str(file_item))
                break
class CollectPwd:
    """
    Configuration handler that collects the ``pwd`` entry of every
    configuration it is called with.
    """

    def __init__(self):
        # Working directories, in the order the configurations were visited.
        self.pwd_list = []

    def __call__(self, config: ConfigYAML):
        working_dir = config.config["pwd"]
        self.pwd_list += [working_dir]
class BranchNotSpecial(Exception):
    """Raised when no git branch has been specified."""


class EnterBranch(ContextDecorator):
    """
    Check out a branch on entry and return to the previous one on exit.

    Usable as a ``with`` block or as a decorator.
    """

    def __init__(self, branch, pwd):
        """
        :param branch: name of the branch to check out
        :param pwd: directory in which the git commands are run
        :raises BranchNotSpecial: if *branch* is empty
        """
        if not branch:
            # Raise the dedicated exception type (a subclass of Exception) so
            # the ``except BranchNotSpecial`` handlers in this file fire.
            raise BranchNotSpecial("branch should be specialed.")
        self.branch = branch
        self.pwd = pwd
        self.old_branch = ""

    def __enter__(self):
        # Remember where we were so __exit__ can go back there.
        self.old_branch = current_branch(self.pwd)
        cmd_line = "/usr/bin/git checkout {branch}".format(branch=self.branch)
        call_cmd_with_status(cmd_line, self.pwd)
        return self

    def __exit__(self, *args, **kwargs):
        cmd_line = "/usr/bin/git checkout {old_branch}".format(old_branch=self.old_branch)
        call_cmd_with_status(cmd_line, self.pwd)
        # Never suppress an exception raised inside the block.
        return False
def remove_duplicate_item(data: list):
    """
    Return a new list with repeated items dropped.

    The first occurrence of each item is kept and the original order is
    preserved; items must be hashable.

    :param data: list that may contain duplicates
    :return: new list without duplicates
    """
    seen = set()
    unique_items = []
    for entry in data:
        if entry not in seen:
            seen.add(entry)
            unique_items.append(entry)
    return unique_items
class ReturnCodeErr(Exception):
    """Raised when an external command exits with a non-zero status."""


def call_cmd_with_status(cmd_line, work_dir):
    """
    Run a shell command and fail loudly if it does not succeed.

    Success is judged by the exit status alone, not by whether anything was
    written to stderr.

    :param cmd_line: command line, interpreted by the shell
    :param work_dir: directory to run the command in (``None`` for the current one)
    :return: ``(stdout, stderr)`` of the command, as bytes
    :raises ReturnCodeErr: if the command exits with a non-zero status; the
        message is the command's stderr, or the exit status when stderr is empty
    """
    finished = subprocess.run(cmd_line, cwd=work_dir, shell=True,
                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if finished.returncode != 0:
        print("cwd:{cwd}".format(cwd=work_dir))
        print("cmd:{cmd}".format(cmd=cmd_line))
        # Decode leniently: undecodable stderr must not hide the real failure.
        err_msg = finished.stderr.decode("utf-8", errors="replace")
        if not err_msg.strip():
            err_msg = "command exited with status {code}".format(code=finished.returncode)
        raise ReturnCodeErr(err_msg)
    return finished.stdout, finished.stderr
def current_branch(target_path):
    """
    Return what is currently checked out in the repository at *target_path*.

    The current entry is the line of ``git branch`` output marked with ``*``.
    The whole name is returned, including characters such as ``-``, ``/``,
    ``_`` and ``.``. When git prints a parenthesised description instead of a
    branch name (e.g. a detached HEAD), the commit hash of HEAD is returned so
    the state can still be restored with ``git checkout``.

    :param target_path: directory in which git is run
    :return: branch name, a commit hash when no branch is checked out, or
        ``""`` if ``git branch`` marks no current entry
    """
    out, _ = call_cmd_with_status("/usr/bin/git branch", target_path)
    for raw_line in out.splitlines():
        if not raw_line.startswith(b"*"):
            continue
        # e.g. b'* master' -> 'master'
        name = raw_line[1:].decode("utf-8").strip()
        if name.startswith("("):
            # Not a branch name (e.g. '(HEAD detached at 1a2b3c4)'): fall
            # back to the exact commit so it can be checked out again.
            commit, _ = call_cmd_with_status("/usr/bin/git rev-parse HEAD", target_path)
            return commit.decode("utf-8").strip()
        return name
    return ""
def say(content: str):
    """
    Speak *content* aloud using the ``say`` command; only on macOS.

    On any other platform this does nothing.

    :param content: text to speak
    """
    if platform.system() == "Darwin":
        # Quote the text: it may contain configuration-supplied names, and
        # the command line is interpreted by a shell.
        call_cmd_with_status("say {content} -r 200".format(content=shlex.quote(content)), None)
if __name__ == '__main__':
    # Command-line entry point: report what the configuration selects and,
    # only when -write is given, remove it from the git history.
    import argparse
    import datetime

    say("你好 下面开始进行项目敏感信息排查")
    started_at = datetime.datetime.now()

    arg_parser = argparse.ArgumentParser("根据指定的yaml定义的规则彻底删除掉git中符合的文件")
    arg_parser.add_argument("config_file", metavar="配置yaml", help="需要删除文件的定义文件,用yaml格式")
    arg_parser.add_argument("-write", action="store_true", help="执行清理操作,没有这个选项,只显示计划清理的文件,而不真正操作")
    arg_parser.add_argument("-debug", action="store_true", help="出错时显示内部信息")
    options = arg_parser.parse_args()

    try:
        root_config = ConfigYAML(options.config_file, options.debug)
        # Report every configuration, children first.
        root_config.enum_config(ConfigYAML.report)
        if options.write:
            root_config.enum_config(ConfigYAML.clean_git)
            print("clean git complete")
    except ReturnCodeErr as e:
        # With -debug the full traceback is shown instead of a short message.
        if options.debug:
            raise e
        say("出错了")
        print(str(e))
    except BranchNotSpecial as e:
        if options.debug:
            raise e
        print(str(e))
    else:
        elapsed = datetime.datetime.now() - started_at
        say("检查完成 耗时{cost}秒".format(cost=elapsed.seconds))