diff --git a/app/core/task_manager.py b/app/core/task_manager.py index 611d3da..eda3b9c 100644 --- a/app/core/task_manager.py +++ b/app/core/task_manager.py @@ -162,7 +162,7 @@ def run(self): self.accomplish.emit(self.logs) def search_member(self) -> dict: - """搜索所有脚本实例并固定相关配置信息""" + """搜索所有脚本实例及其路径""" member_dict = {} diff --git a/app/models/MAA.py b/app/models/MAA.py index 6d2abf3..0bb51d1 100644 --- a/app/models/MAA.py +++ b/app/models/MAA.py @@ -457,7 +457,6 @@ def run(self): elif "用户" in self.mode: self.user_config_path.mkdir(parents=True, exist_ok=True) shutil.copy(self.maa_set_path, self.user_config_path) - logger.debug(self.user_config_path) end_log = "" diff --git a/app/services/security.py b/app/services/security.py index 21d9a80..719c124 100644 --- a/app/services/security.py +++ b/app/services/security.py @@ -25,13 +25,17 @@ 作者:DLmaster_361 """ +from loguru import logger +import sqlite3 import hashlib import random import secrets +from pathlib import Path from Crypto.Cipher import AES from Crypto.PublicKey import RSA from Crypto.Cipher import PKCS1_OAEP from Crypto.Util.Padding import pad, unpad +from typing import List, Dict, Union from app.core import Config @@ -130,27 +134,65 @@ def decryptx(self, note: bytes, PASSWORD: str) -> str: note = decrypter.decrypt(note) return note.decode("utf-8") - def change_PASSWORD(self, data: list, PASSWORD_old: str, PASSWORD_new: str) -> None: + def change_PASSWORD(self, PASSWORD_old: str, PASSWORD_new: str) -> None: """修改管理密钥""" - # 使用旧管理密钥解密 - new_data = [] - for i in range(len(data)): - new_data.append(self.decryptx(data[i][12], PASSWORD_old)) - # 使用新管理密钥重新加密 + member_list = self.search_member() + + for user_data in member_list: + + # 读取用户数据 + db = sqlite3.connect(user_data["Path"]) + cur = db.cursor() + cur.execute("SELECT * FROM adminx WHERE True") + data = cur.fetchall() + + # 使用旧管理密钥解密 + user_data["Password"] = [] + for i in range(len(data)): + user_data["Password"].append(self.decryptx(data[i][12], PASSWORD_old)) + cur.close() + db.close() + self.get_PASSWORD(PASSWORD_new) - for i in range(len(data)): - Config.cur.execute( - "UPDATE adminx SET password = ? WHERE mode = ? AND uid = ?", - ( - self.encryptx(new_data[i]), - data[i][15], - data[i][16], - ), - ) - Config.db.commit(), - new_data[i] = None - del new_data + + for user_data in member_list: + + # 读取用户数据 + db = sqlite3.connect(user_data["Path"]) + cur = db.cursor() + cur.execute("SELECT * FROM adminx WHERE True") + data = cur.fetchall() + + # 使用新管理密钥重新加密 + for i in range(len(data)): + cur.execute( + "UPDATE adminx SET password = ? WHERE mode = ? AND uid = ?", + ( + self.encryptx(user_data["Password"][i]), + data[i][15], + data[i][16], + ), + ) + db.commit() + user_data["Password"][i] = None + del user_data["Password"] + + cur.close() + db.close() + + def search_member(self) -> List[Dict[str, Union[Path, list]]]: + """搜索所有脚本实例及其用户数据库路径""" + + member_list = [] + + if (Config.app_path / "config/MaaConfig").exists(): + for subdir in (Config.app_path / "config/MaaConfig").iterdir(): + if subdir.is_dir(): + + member_list.append({"Path": subdir / "user_data.db"}) + + return member_list def check_PASSWORD(self, PASSWORD: str) -> bool: """验证管理密钥""" diff --git a/app/ui/setting.py b/app/ui/setting.py index 29adca9..00c11cd 100644 --- a/app/ui/setting.py +++ b/app/ui/setting.py @@ -132,117 +132,66 @@ def check_PASSWORD(self) -> None: def change_PASSWORD(self) -> None: """修改管理密钥""" - # 获取用户信息 - Config.cur.execute("SELECT * FROM adminx WHERE True") - data = Config.cur.fetchall() + if_change = True - if len(data) == 0: + while if_change: - choice = MessageBox("验证通过", "当前无用户,验证自动通过", self) - choice.cancelButton.hide() - choice.buttonLayout.insertStretch(1) + choice = InputMessageBox( + self, + "请输入旧的管理密钥", + "旧管理密钥", + "密码", + ) + if choice.exec() and choice.input.text() != "": - # 获取新的管理密钥 - if choice.exec(): + # 验证旧管理密钥 + if Crypto.check_PASSWORD(choice.input.text()): - while True: - - choice = InputMessageBox( - self, - "请输入新的管理密钥", - "新管理密钥", - "密码", - ) - if choice.exec() and choice.input.text() != "": - # 修改管理密钥 - Crypto.get_PASSWORD(choice.input.text()) - choice = MessageBox( - "操作成功", - "管理密钥修改成功", - self, - ) - choice.cancelButton.hide() - choice.buttonLayout.insertStretch(1) - if choice.exec(): - break - else: - choice = MessageBox( - "确认", - "您没有输入新的管理密钥,是否取消修改管理密钥?", + PASSWORD_old = choice.input.text() + # 获取新的管理密钥 + while True: + + choice = InputMessageBox( self, + "请输入新的管理密钥", + "新管理密钥", + "密码", ) - if choice.exec(): - break + if choice.exec() and choice.input.text() != "": - else: - # 验证管理密钥 - if_change = True - - while if_change: - - choice = InputMessageBox( - self, - "请输入旧的管理密钥", - "旧管理密钥", - "密码", - ) - if choice.exec() and choice.input.text() != "": - - # 验证旧管理密钥 - if Crypto.check_PASSWORD(choice.input.text()): + # 修改管理密钥 + Crypto.change_PASSWORD(PASSWORD_old, choice.input.text()) + MainInfoBar.push_info_bar( + "success", "操作成功", "管理密钥修改成功", 3000 + ) + if_change = False + break - PASSWORD_old = choice.input.text() - # 获取新的管理密钥 - while True: + else: - choice = InputMessageBox( + choice = MessageBox( + "确认", + "您没有输入新的管理密钥,是否取消修改管理密钥?", self, - "请输入新的管理密钥", - "新管理密钥", - "密码", ) - if choice.exec() and choice.input.text() != "": - - # 修改管理密钥 - Crypto.change_PASSWORD( - data, PASSWORD_old, choice.input.text() - ) - choice = MessageBox( - "操作成功", - "管理密钥修改成功", - self, - ) - choice.cancelButton.hide() - choice.buttonLayout.insertStretch(1) - if choice.exec(): - if_change = False - break - - else: - - choice = MessageBox( - "确认", - "您没有输入新的管理密钥,是否取消修改管理密钥?", - self, - ) - if choice.exec(): - if_change = False - break - - else: - choice = MessageBox("错误", "管理密钥错误", self) - choice.cancelButton.hide() - choice.buttonLayout.insertStretch(1) - if choice.exec(): - pass + if choice.exec(): + if_change = False + break + else: - choice = MessageBox( - "确认", - "您没有输入管理密钥,是否取消修改管理密钥?", - self, - ) + choice = MessageBox("错误", "管理密钥错误", self) + choice.cancelButton.hide() + choice.buttonLayout.insertStretch(1) if choice.exec(): - break + pass + else: + choice = MessageBox( + "确认", + "您没有输入管理密钥,是否取消修改管理密钥?", + self, + ) + if choice.exec(): + break def get_update_info(self) -> str: """检查主程序版本更新,返回更新信息"""