Skip to content

Commit

Permalink
info等优化
Browse files Browse the repository at this point in the history
  • Loading branch information
yjqiang committed Apr 11, 2019
1 parent cff028a commit bae802f
Show file tree
Hide file tree
Showing 20 changed files with 324 additions and 229 deletions.
16 changes: 8 additions & 8 deletions danmu/base_danmu.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async def _open_conn(self):
print("连接无法建立,请检查本地网络状况")
print(sys.exc_info()[0])
return False
printer.info([f'{self._area_id}号弹幕监控已连接b站服务器'], True)
printer.infos([f'{self._area_id}号弹幕监控已连接b站服务器'])

str_enter = f'{{"uid":0,"roomid":{self._room_id},"protover":1,"platform":"web","clientver":"1.3.3"}}'
bytes_enter = self._encapsulate(opt=7, str_body=str_enter)
Expand Down Expand Up @@ -128,8 +128,8 @@ async def _read_datas(self):
return
# 握手确认
elif opt == 8:
printer.info(
[f'{self._area_id}号弹幕监控进入房间({self._room_id})'], True)
printer.infos(
[f'{self._area_id}号弹幕监控进入房间({self._room_id})'])
else:
printer.warn(f'弹幕数据错误:{datas}')
return
Expand All @@ -144,9 +144,9 @@ async def run_forever(self):
time_now = 0
while not self._closed:
if utils.curr_time() - time_now <= 3:
printer.info([f'网络波动,{self._area_id}号弹幕姬延迟3秒后重启'], True)
printer.infos([f'网络波动,{self._area_id}号弹幕姬延迟3秒后重启'])
await asyncio.sleep(3)
printer.info([f'正在启动{self._area_id}号弹幕姬'], True)
printer.infos([f'正在启动{self._area_id}号弹幕姬'])
time_now = utils.curr_time()

async with self._conn_lock:
Expand All @@ -159,12 +159,12 @@ async def run_forever(self):
tasks = [self._task_main, task_heartbeat]
_, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED)
printer.info([f'{self._area_id}号弹幕姬异常或主动断开,正在处理剩余信息'], True)
printer.infos([f'{self._area_id}号弹幕姬异常或主动断开,正在处理剩余信息'])
if not task_heartbeat.done():
task_heartbeat.cancel()
await self._close_conn()
await asyncio.wait(pending)
printer.info([f'{self._area_id}号弹幕姬退出,剩余任务处理完毕'], True)
printer.infos([f'{self._area_id}号弹幕姬退出,剩余任务处理完毕'])
self._waiting.set_result(True)

async def reset_roomid(self, room_id):
Expand All @@ -176,7 +176,7 @@ async def reset_roomid(self, room_id):
await self._task_main
# 由于锁的存在,绝对不可能到达下一个的自动重连状态,这里是保证正确显示当前监控房间号
self._room_id = room_id
printer.info([f'{self._area_id}号弹幕姬已经切换房间({room_id})'], True)
printer.infos([f'{self._area_id}号弹幕姬已经切换房间({room_id})'])

async def close(self):
if not self._closed:
Expand Down
18 changes: 9 additions & 9 deletions danmu/monitor_danmu_raffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async def _check_area(self):
await asyncio.sleep(300)
is_ok = await asyncio.shield(notifier.exec_func(-1, UtilsTask.is_ok_as_monitor, self._room_id, self._area_id))
if not is_ok:
printer.info([f'{self._room_id}不再适合作为监控房间,即将切换'], True)
printer.infos([f'{self._room_id}不再适合作为监控房间,即将切换'])
return
except asyncio.CancelledError:
pass
Expand All @@ -37,7 +37,7 @@ def handle_danmu(self, dict_danmu):
cmd = dict_danmu['cmd']

if cmd == 'PREPARING':
printer.info([f'{self._area_id}号弹幕监控房间下播({self._room_id})'], True)
printer.infos([f'{self._area_id}号弹幕监控房间下播({self._room_id})'])
return False

elif cmd == 'NOTICE_MSG':
Expand All @@ -64,19 +64,19 @@ def handle_danmu(self, dict_danmu):
raffle_num = 1
raffle_name = str_gift
broadcast = msg_common.split('广播')[0]
printer.info([f'{self._area_id}号弹幕监控检测到{real_roomid:^9}{raffle_name}'], True)
printer.infos([f'{self._area_id}号弹幕监控检测到{real_roomid:^9}{raffle_name}'])
raffle_handler.push2queue(TvRaffleHandlerTask, real_roomid)
broadcast_type = 0 if broadcast == '全区' else 1
bili_statistics.add2pushed_raffles(raffle_name, broadcast_type, raffle_num)
elif msg_type == 3:
raffle_name = msg_common.split('开通了')[-1][:2]
printer.info([f'{self._area_id}号弹幕监控检测到{real_roomid:^9}{raffle_name}'], True)
printer.infos([f'{self._area_id}号弹幕监控检测到{real_roomid:^9}{raffle_name}'])
raffle_handler.push2queue(GuardRaffleHandlerTask, real_roomid)
broadcast_type = 0 if raffle_name == '总督' else 2
bili_statistics.add2pushed_raffles(raffle_name, broadcast_type)
elif msg_type == 6:
raffle_name = '二十倍节奏风暴'
printer.info([f'{self._area_id}号弹幕监控检测到{real_roomid:^9}{raffle_name}'], True)
printer.infos([f'{self._area_id}号弹幕监控检测到{real_roomid:^9}{raffle_name}'])
# raffle_handler.push2queue(StormRaffleHandlerTask, real_roomid)
bili_statistics.add2pushed_raffles(raffle_name)
return True
Expand All @@ -86,9 +86,9 @@ async def run_forever(self):
time_now = 0
while not self._closed:
if utils.curr_time() - time_now <= 3:
printer.info([f'网络波动,{self._area_id}号弹幕姬延迟3秒后重启'], True)
printer.infos([f'网络波动,{self._area_id}号弹幕姬延迟3秒后重启'])
await asyncio.sleep(3)
printer.info([f'正在启动{self._area_id}号弹幕姬'], True)
printer.infos([f'正在启动{self._area_id}号弹幕姬'])
time_now = utils.curr_time()

async with self._conn_lock:
Expand All @@ -106,14 +106,14 @@ async def run_forever(self):
tasks = [self._task_main, task_heartbeat, task_checkarea]
_, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED)
printer.info([f'{self._area_id}号弹幕姬异常或主动断开,正在处理剩余信息'], True)
printer.infos([f'{self._area_id}号弹幕姬异常或主动断开,正在处理剩余信息'])
if not task_heartbeat.done():
task_heartbeat.cancel()
if not task_checkarea.done():
task_checkarea.cancel()
await self._close_conn()
await asyncio.wait(pending)
printer.info([f'{self._area_id}号弹幕姬退出,剩余任务处理完毕'], True)
printer.infos([f'{self._area_id}号弹幕姬退出,剩余任务处理完毕'])
self._waiting.set_result(True)


Expand Down
22 changes: 11 additions & 11 deletions danmu/yj_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ def handle_danmu(self, dict_danmu):
msg_id, type, id = msg
if type == '~' and not msg_id % 2:
raffle_id = id
printer.info([f'{self._area_id}号弹幕监控检测到{"0":^9}的节奏风暴(id: {raffle_id})'], True)
printer.infos([f'{self._area_id}号弹幕监控检测到{"0":^9}的节奏风暴(id: {raffle_id})'])
# raffle_handler.exec_at_once(StormRaffleHandlerTask, 0, raffle_id)
bili_statistics.add2pushed_raffles('Yj协同节奏风暴', 2)
result = self.__combine_piece(uid, msg)
if result is None:
return True
type, raffle_id, real_roomid = result
if type == '+':
printer.info([f'{self._area_id}号弹幕监控检测到{real_roomid:^9}的大航海(id: {raffle_id})'], True)
printer.infos([f'{self._area_id}号弹幕监控检测到{real_roomid:^9}的大航海(id: {raffle_id})'])
raffle_handler.push2queue(GuardRaffleHandlerTask, real_roomid, raffle_id)
bili_statistics.add2pushed_raffles('Yj协同大航海', 2)
except Exception:
Expand Down Expand Up @@ -153,7 +153,7 @@ async def _open_conn(self):
print("连接无法建立,请检查本地网络状况")
print(sys.exc_info()[0])
return False
printer.info([f'{self._area_id}号弹幕监控已连接推送服务器'], True)
printer.infos([f'{self._area_id}号弹幕监控已连接推送服务器'])

dict_enter = {
'code': 0,
Expand Down Expand Up @@ -204,8 +204,8 @@ async def _read_datas(self):
return
# 握手确认
elif data_type == 'entered':
printer.info(
[f'{self._area_id}号推送监控确认连接'], True)
printer.infos(
[f'{self._area_id}号推送监控确认连接'])
elif data_type == 'error':
printer.warn(f'发生致命错误{json_data}')
await asyncio.sleep(3)
Expand All @@ -215,13 +215,13 @@ def handle_danmu(self, data):
if raffle_type == 'STORM':
raffle_id = data['raffle_id']
raffle_roomid = 0
printer.info([f'{self._area_id}号弹幕监控检测到{"0":^9}的节奏风暴(id: {raffle_id})'], True)
printer.infos([f'{self._area_id}号弹幕监控检测到{"0":^9}的节奏风暴(id: {raffle_id})'])
# raffle_handler.exec_at_once(StormRaffleHandlerTask, 0, raffle_id)
bili_statistics.add2pushed_raffles('Yj协同节奏风暴', 2)
elif raffle_type == 'GUARD':
raffle_id = data['raffle_id']
raffle_roomid = data['room_id']
printer.info([f'{self._area_id}号弹幕监控检测到{raffle_roomid:^9}的大航海(id: {raffle_id})'], True)
printer.infos([f'{self._area_id}号弹幕监控检测到{raffle_roomid:^9}的大航海(id: {raffle_id})'])
raffle_handler.push2queue(GuardRaffleHandlerTask, raffle_roomid, raffle_id)
bili_statistics.add2pushed_raffles('Yj协同大航海', 2)
return True
Expand All @@ -231,9 +231,9 @@ async def run_forever(self):
time_now = 0
while not self._closed:
if utils.curr_time() - time_now <= 3:
printer.info([f'网络波动,{self._area_id}号弹幕姬延迟3秒后重启'], True)
printer.infos([f'网络波动,{self._area_id}号弹幕姬延迟3秒后重启'])
await asyncio.sleep(3)
printer.info([f'正在启动{self._area_id}号弹幕姬'], True)
printer.infos([f'正在启动{self._area_id}号弹幕姬'])
time_now = utils.curr_time()

async with self._conn_lock:
Expand All @@ -246,13 +246,13 @@ async def run_forever(self):
tasks = [self._task_main, task_heartbeat]
_, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED)
printer.info([f'{self._area_id}号弹幕姬异常或主动断开,正在处理剩余信息'], True)
printer.infos([f'{self._area_id}号弹幕姬异常或主动断开,正在处理剩余信息'])
if not task_heartbeat.done():
task_heartbeat.cancel()
await self._close_conn()
if pending:
await asyncio.wait(pending)
printer.info([f'{self._area_id}号弹幕姬退出,剩余任务处理完毕'], True)
printer.infos([f'{self._area_id}号弹幕姬退出,剩余任务处理完毕'])
self._waiting.set_result(True)

async def close(self):
Expand Down
30 changes: 15 additions & 15 deletions dyn/monitor_dyn_raffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,38 @@ def __init__(self, should_join_immediately: bool, init_docid=None, init_feed_lim
async def dig_and_filter(self, dyn_raffle_status: DynRaffleStatus):
doc_id = dyn_raffle_status.doc_id
if dyn_raffle_status.lottery_time <= utils.curr_time() + 180:
printer.info([f'{doc_id}的动态抽奖已经开奖或马上开奖,不再参与'], True)
printer.infos([f'{doc_id}的动态抽奖已经开奖或马上开奖,不再参与'])
return
for key_word in self.dyn_raffle_description_filter:
if key_word in dyn_raffle_status.describe:
printer.info([f'{doc_id}的动态抽奖正文触发关键词过滤({key_word})'], True)
printer.infos([f'{doc_id}的动态抽奖正文触发关键词过滤({key_word})'])
return
for key_word in self.dyn_prize_cmt_filter:
if key_word in dyn_raffle_status.prize_cmt_1st or \
key_word in dyn_raffle_status.prize_cmt_2nd or \
key_word in dyn_raffle_status.prize_cmt_3rd:
printer.info([f'{doc_id}的动态抽奖正文触发关键词过滤({key_word})'], True)
printer.infos([f'{doc_id}的动态抽奖正文触发关键词过滤({key_word})'])
return
# 如果是刚刚出来的抽奖,就延迟150秒,
if dyn_raffle_status.post_time >= utils.curr_time() - 150:
printer.info([f'{doc_id}的动态抽奖触发时间约束,休眠150秒后再正式参与'], True)
printer.infos([f'{doc_id}的动态抽奖触发时间约束,休眠150秒后再正式参与'])
await asyncio.sleep(150)

if dyn_raffle_sql.is_raffleid_duplicate(dyn_raffle_status.dyn_id):
printer.info([f'{dyn_raffle_status.doc_id}的动态抽奖触发重复性过滤'], True)
printer.infos([f'{dyn_raffle_status.doc_id}的动态抽奖触发重复性过滤'])
return
dyn_raffle_sql.insert_dynraffle_status_table(dyn_raffle_status)

printer.info([f'{doc_id}的动态抽奖通过过滤与验证,正式处理'], True)
printer.infos([f'{doc_id}的动态抽奖通过过滤与验证,正式处理'])

if not self.should_join_immediately:
printer.info([f'{dyn_raffle_status.doc_id}的动态抽奖暂不参与,仅记录数据库中等候轮询'], True)
printer.infos([f'{dyn_raffle_status.doc_id}的动态抽奖暂不参与,仅记录数据库中等候轮询'])
return
printer.info([f'{doc_id}的动态抽奖正在参与'], True)
printer.infos([f'{doc_id}的动态抽奖正在参与'])
await notifier.exec_task_awaitable(-2, DynRaffleHandlerTask, 1, dyn_raffle_status,
delay_range=(0, 30))
dyn_raffle_sql.set_rafflestatus_handle_status(1, dyn_raffle_status.dyn_id)
printer.info([f'{doc_id}的动态抽奖参与完毕'], True)
printer.infos([f'{doc_id}的动态抽奖参与完毕'])

# 暴力docid,查找动态抽奖
async def check_raffle(self):
Expand Down Expand Up @@ -99,7 +99,7 @@ async def check_result(self):
while True:
results = dyn_raffle_sql.select_rafflestatus(1, None, utils.curr_time() - 900) # 延迟15min处理抽奖
results += dyn_raffle_sql.select_rafflestatus(-1, None, utils.curr_time() - 900)
printer.info(['正在查找已经结束的动态抽奖:', results], True)
printer.infos(['正在查找已经结束的动态抽奖:', results])
for dyn_raffle_status in results:

dyn_raffle_results: Optional[DynRaffleResults] = await notifier.exec_func(
Expand All @@ -119,29 +119,29 @@ async def check_handle(self):
while True:
curr_time = utils.curr_time()
results = dyn_raffle_sql.select_rafflestatus(-1, curr_time + 300, curr_time + 1200)[:5] # 20分钟到5分钟
printer.info(['正在查找需要参与的动态抽奖:', results], True)
printer.infos(['正在查找需要参与的动态抽奖:', results])
for dyn_raffle_status in results:
print(dyn_raffle_status)
is_exist = await notifier.exec_func(
-1, DynRaffleHandlerTask.check, dyn_raffle_status.doc_id)
if not is_exist:
dyn_raffle_sql.del_from_dynraffle_status_table(dyn_raffle_status.dyn_id)
continue
printer.info([f'{dyn_raffle_status.doc_id}的动态抽奖正在参与'], True)
printer.infos([f'{dyn_raffle_status.doc_id}的动态抽奖正在参与'])
await notifier.exec_task_awaitable(-2, DynRaffleHandlerTask, 1, dyn_raffle_status)
dyn_raffle_sql.set_rafflestatus_handle_status(1, dyn_raffle_status.dyn_id)
printer.info([f'{dyn_raffle_status.doc_id}的动态抽奖参与完毕'], True)
printer.infos([f'{dyn_raffle_status.doc_id}的动态抽奖参与完毕'])
if not results:
await asyncio.sleep(60)

async def run(self):
results = dyn_raffle_sql.select_rafflestatus(0)
for dyn_raffle_status in results:
print(dyn_raffle_status)
printer.info([f'正在暴力处理上次中断的{dyn_raffle_status.doc_id}的动态抽奖后续'], True)
printer.infos([f'正在暴力处理上次中断的{dyn_raffle_status.doc_id}的动态抽奖后续'])
dyn_raffle_sql.set_rafflestatus_handle_status(1, dyn_raffle_status.dyn_id)

printer.info([f'欢迎使用动态抽奖'], True)
printer.infos([f'欢迎使用动态抽奖'])
tasks = []
task_check_raffle = asyncio.ensure_future(self.check_raffle())
tasks.append(task_check_raffle)
Expand Down
Loading

0 comments on commit bae802f

Please sign in to comment.