-
Notifications
You must be signed in to change notification settings - Fork 0
/
authors_rcv.py
205 lines (168 loc) · 6.5 KB
/
authors_rcv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import gzip
import json
import os
import shutil
import time
from datetime import datetime

import requests
from elasticsearch import Elasticsearch, helpers
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
# Elasticsearch configuration
ES_HOST = "localhost"
ES_PORT = 9200
ES_USERNAME = "elastic" # replace with your Elasticsearch username
ES_PASSWORD = "yXC0ZTAbjmhmyLHb7fBv" # replace with your Elasticsearch password
INDEX_NAME = "authors" # change to match your target index
DOC_TYPE = "_doc"
# WeChat Work (企业微信) credentials used for alert notifications.
# NOTE(review): credentials and secrets are hard-coded in source — move them to
# environment variables or a config file before sharing/deploying this script.
CORP_ID = "ww139c8045e371e29e"
CORP_SECRET = "CjNHTG1-4cozBQpCo2WWnLb7weyCT0Bc0pzdGNp9Lfk"
ACCESS_TOKEN = None  # cached WeChat token; populated/refreshed by send_message
AGENT_ID = "1000003"
# Initialize the Elasticsearch client: single HTTP node, basic auth.
es = Elasticsearch(
[{
'scheme': 'http',
'host': ES_HOST,
'port': ES_PORT
}],
basic_auth=(ES_USERNAME, ES_PASSWORD)
)
def get_access_token():
    """Fetch an access_token from the WeChat Work (企业微信) API.

    Returns:
        str: the access_token issued for CORP_ID / CORP_SECRET.

    Raises:
        Exception: if the HTTP request fails or no token is present in the
            response (the WeChat API returns HTTP 200 even for API-level
            errors, so the token itself must be checked).
    """
    url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={CORP_ID}&corpsecret={CORP_SECRET}"
    # timeout prevents the watcher from hanging forever on a stuck request
    response = requests.get(url, timeout=10)
    if response.status_code == 200:
        token = response.json().get('access_token')
        if token:
            print("access_token get success")
            return token
    raise Exception("Failed to get access token")
def _post_message(message_data):
    """POST *message_data* to the WeChat Work send endpoint with the cached token."""
    url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={ACCESS_TOKEN}"
    # timeout prevents the watcher from hanging forever on a stuck request
    return requests.post(url, json=message_data, timeout=10)


def send_message(message):
    """Send *message* as a WeChat Work text message to user "ZhouXingDa".

    Lazily fetches an access token on first use. If the send fails (commonly
    an expired token), refreshes the token and retries exactly once.

    Returns:
        dict: the JSON body of the (final) API response.
    """
    global ACCESS_TOKEN
    if not ACCESS_TOKEN:
        ACCESS_TOKEN = get_access_token()
    message_data = {
        "touser": "ZhouXingDa",
        "msgtype": "text",
        "agentid": AGENT_ID,
        "text": {
            "content": message
        },
        "safe": 0
    }
    response = _post_message(message_data)
    if response.status_code == 200 and response.json().get("errcode") == 0:
        print("send success")
        return response.json()
    # Send failed — refresh the access_token and retry once.
    print(response.json())
    ACCESS_TOKEN = get_access_token()
    response = _post_message(message_data)
    return response.json()
def decompress_gz(gz_path, output_path):
    """Decompress the gzip file at *gz_path* into *output_path*.

    Streams the data in fixed-size chunks via shutil.copyfileobj instead of
    reading the entire archive into memory, so arbitrarily large author
    dumps can be decompressed safely.
    """
    with gzip.open(gz_path, 'rb') as f_in, open(output_path, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
# Bulk-upload helper: index one newline-delimited JSON file into Elasticsearch.
def bulk_index(file_path):
    """Bulk-index an NDJSON file of author records into Elasticsearch.

    Each line is a JSON object whose 'aid' field becomes the document _id
    (and is removed from the indexed body). Documents are flushed in batches
    of 1000 to bound memory usage.

    Returns:
        bool: True on success; False if anything failed (the error is also
        printed and reported via send_message).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            actions = []
            for line in file:
                # Tolerate blank lines (e.g. a trailing newline) instead of
                # failing the whole file on json.loads('').
                if not line.strip():
                    continue
                json_data = json.loads(line)
                author_id = json_data.pop('aid')
                actions.append({
                    "_index": INDEX_NAME,
                    "_id": author_id,
                    "_source": json_data,
                })
                # Flush every 1000 documents.
                if len(actions) >= 1000:
                    helpers.bulk(es, actions)
                    actions = []  # start the next batch
            # Flush the final partial batch, if any.
            if actions:
                helpers.bulk(es, actions)
        return True
    except Exception as e:
        # Broad catch is deliberate: any failure (parse, network, ES) should
        # be reported via WeChat rather than crashing the watcher loop.
        print(f"Error indexing file {file_path}: {e}")
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        text_message = f"Error indexing file {file_path} at {current_time}: {e}"
        send_message(text_message)
        return False
# Elasticsearch configuration/initialization and bulk_index (above) are reused as-is.
class Watcher:
    """Watches DIRECTORY_TO_WATCH and dispatches filesystem events to Handler."""

    DIRECTORY_TO_WATCH = "./authors"

    def __init__(self):
        self.observer = Observer()

    def count_files(self, extensions):
        """Return how many files in the watched directory end with any of *extensions*."""
        suffixes = tuple(extensions)  # str.endswith accepts a tuple directly
        files = [f for f in os.listdir(self.DIRECTORY_TO_WATCH) if f.endswith(suffixes)]
        return len(files)

    def run(self):
        """Start the observer and block until interrupted, then shut down cleanly."""
        event_handler = Handler()
        self.observer.schedule(event_handler, self.DIRECTORY_TO_WATCH, recursive=True)
        self.observer.start()
        try:
            while True:
                time.sleep(5)
                # if self.count_files(['.json', '.gzdone']) > 5:
                #     send_message("File Alert: More than 5 files in directory.")
        except KeyboardInterrupt:
            # Was a bare `except:` — catch only Ctrl-C so real errors surface.
            self.observer.stop()
            print("Observer stopped")
        self.observer.join()
class Handler(FileSystemEventHandler):
    """Reacts to marker files (.done / .gzdone) appearing in the watched tree."""

    @staticmethod
    def process_json_file(json_file_path):
        """Index one JSON file; on success delete it and send a notification."""
        print(f"Processing JSON file: {json_file_path}")
        if not bulk_index(json_file_path):
            print(f"Failed to index file: {json_file_path}")
            return
        os.remove(json_file_path)
        print(f"Finished indexing and deleted file: {json_file_path}")
        send_message(f"Finished indexing and deleted file: {json_file_path}")

    @staticmethod
    def on_created(event):
        """Dispatch a newly created marker file to the matching processing path."""
        if event.is_directory:
            return None
        if event.event_type != 'created':
            return
        path = event.src_path
        if path.endswith('.done'):
            # A ".done" marker means <name>.json has been fully written.
            json_file_path = path[:-5]  # strip ".done"
            print(f"Detected .done file for {json_file_path}")
            if os.path.exists(json_file_path):
                Handler.process_json_file(json_file_path)
            else:
                print("JSON file not found")
            # Clean up the processed marker file.
            os.remove(path)
        elif path.endswith('.gzdone'):
            # A ".gzdone" marker means <name>.json.gz has been fully written.
            gzip_file_path = path[:-7]       # strip ".gzdone"
            json_file_path = gzip_file_path[:-3]  # strip ".gz"
            print(f"Detected .gzdone file for {gzip_file_path}")
            if os.path.exists(gzip_file_path):
                decompress_gz(gzip_file_path, json_file_path)
                print(f"Decompressed file: {json_file_path}")
                Handler.process_json_file(json_file_path)
                os.remove(gzip_file_path)
            else:
                print("Gzip file not found")
            # Clean up the processed marker file.
            os.remove(path)
if __name__ == '__main__':
    # Announce startup via WeChat, then block forever watching ./authors.
    send_message("Server IS Ready to receive and index authors!!!")
    w = Watcher()
    w.run()