diff --git a/.gitignore b/.gitignore index fb7ac3c4..5138b8f1 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ data/ logs/ .sqllsrc.json +assets/games/tictactoe/gaming_board.png diff --git a/README.md b/README.md index fd2c961d..5168be75 100644 --- a/README.md +++ b/README.md @@ -116,6 +116,14 @@ python3 -m wechatter - [x] **定时任务**:大部分命令均支持定时任务。需进行[配置](#%EF%B8%8F-task-cron-配置)。 +## 支持的游戏 + +- [x] **井字棋**:双人游戏,图片游戏。 +![tictactoe_show](docs/images/tictactoe_show.png) + +> [!TIP] +> 游戏相关命令帮助请使用查阅[游戏基本命令](docs/command_show.md#游戏基本命令)。 + ## 支持的 Webhook - [x] GitHub 仓库 Webhook,需在 GitHub 仓库 Settings 中添加 Webhook 并进行[配置](#%EF%B8%8F-github-webhook-配置)。 diff --git a/assets/games/tictactoe/board.png b/assets/games/tictactoe/board.png new file mode 100644 index 00000000..1b053966 Binary files /dev/null and b/assets/games/tictactoe/board.png differ diff --git a/assets/games/tictactoe/piece_o.png b/assets/games/tictactoe/piece_o.png new file mode 100644 index 00000000..298c76db Binary files /dev/null and b/assets/games/tictactoe/piece_o.png differ diff --git a/assets/games/tictactoe/piece_x.png b/assets/games/tictactoe/piece_x.png new file mode 100644 index 00000000..9485aed2 Binary files /dev/null and b/assets/games/tictactoe/piece_x.png differ diff --git a/config.yaml.example b/config.yaml.example index cf271d7f..afc858a2 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -81,4 +81,5 @@ task_cron_list: custom_command_key_dict: gpt4: [ ">" ] bili-hot: [ "bh" ] + play: [ "p" ] weather: [ "w", "温度" ] \ No newline at end of file diff --git a/docs/command_show.md b/docs/command_show.md index 61663460..cd5cd250 100644 --- a/docs/command_show.md +++ b/docs/command_show.md @@ -3,6 +3,13 @@ 本文档展示了 WeChatter 支持的命令功能。 ## 目录 +- [游戏基本命令](#游戏基本命令) + - [创建游戏](#创建游戏) + - [加入游戏](#加入游戏) + - [开始游戏](#开始游戏) + - [进行游戏](#进行游戏) + - [结束游戏](#结束游戏) + - [查看游戏列表](#查看游戏列表) - [天气预报](#天气预报) - [待办清单](#待办清单) - [添加待办](#添加待办) @@ -28,6 +35,32 @@ - [冷知识](#冷知识) - [历史上的今天](#历史上的今天) +## 游戏基本命令 + +### 创建游戏 + +![创建游戏](./images/cmd_game_create.png) + +### 加入游戏 + +![加入游戏](./images/cmd_game_join.png) + +### 开始游戏 + +![开始游戏](./images/cmd_game_start.png) + +### 进行游戏 + +![进行游戏](./images/cmd_game_play.png) + +### 结束游戏 + +![结束游戏](./images/cmd_game_over.png) + +### 查看游戏列表 + +![查看游戏列表](./images/cmd_game_list.png) + ## 天气预报 ![获取天气预报](./images/cmd_weather.png) @@ -124,4 +157,4 @@ ## 历史上的今天 -![历史上的今天](./images/cmd_today_in_history.png) +![历史上的今天](./images/cmd_today_in_history.png) \ No newline at end of file diff --git a/docs/images/cmd_game_create.png b/docs/images/cmd_game_create.png new file mode 100644 index 00000000..115a4077 Binary files /dev/null and b/docs/images/cmd_game_create.png differ diff --git a/docs/images/cmd_game_join.png b/docs/images/cmd_game_join.png new file mode 100644 index 00000000..e459c18a Binary files /dev/null and b/docs/images/cmd_game_join.png differ diff --git a/docs/images/cmd_game_list.png b/docs/images/cmd_game_list.png new file mode 100644 index 00000000..f446ed27 Binary files /dev/null and b/docs/images/cmd_game_list.png differ diff --git a/docs/images/cmd_game_over.png b/docs/images/cmd_game_over.png new file mode 100644 index 00000000..7e687c65 Binary files /dev/null and b/docs/images/cmd_game_over.png differ diff --git a/docs/images/cmd_game_play.png b/docs/images/cmd_game_play.png new file mode 100644 index 00000000..2e146fc6 Binary files /dev/null and b/docs/images/cmd_game_play.png differ diff --git a/docs/images/cmd_game_start.png b/docs/images/cmd_game_start.png new file mode 100644 index 00000000..dc7d12bb Binary files /dev/null and b/docs/images/cmd_game_start.png differ diff --git a/docs/images/tictactoe_show.png b/docs/images/tictactoe_show.png new file mode 100644 index 00000000..43f5bdf8 Binary files /dev/null and b/docs/images/tictactoe_show.png differ diff --git a/wechatter/app/routers/wechat.py b/wechatter/app/routers/wechat.py index cd31582c..73e67951 100644 --- a/wechatter/app/routers/wechat.py +++ b/wechatter/app/routers/wechat.py @@ -13,6 +13,7 @@ Person as DbPerson, make_db_session, ) +from wechatter.games import games from wechatter.message import MessageHandler from wechatter.models.wechat import Message from wechatter.models.wechat.group import Group @@ -68,7 +69,9 @@ async def recv_wechat_msg( print(str(message)) # 传入命令字典,构造消息处理器 - message_handler = MessageHandler(commands=commands, quoted_handlers=quoted_handlers) + message_handler = MessageHandler( + commands=commands, quoted_handlers=quoted_handlers, games=games + ) # 用户发来的消息均送给消息解析器处理 message_handler.handle_message(message) diff --git a/wechatter/commands/_commands/__init__.py b/wechatter/commands/_commands/__init__.py index 15c169ca..21fbdc23 100644 --- a/wechatter/commands/_commands/__init__.py +++ b/wechatter/commands/_commands/__init__.py @@ -15,3 +15,6 @@ continue # 动态导入模块 importlib.import_module("." + module_name, __package__) + +# 释放变量 +del command_files diff --git a/wechatter/commands/handlers.py b/wechatter/commands/handlers.py index 0c45b74e..70eca9f8 100644 --- a/wechatter/commands/handlers.py +++ b/wechatter/commands/handlers.py @@ -50,9 +50,7 @@ def __call__(self, func): logger.error(error_message) raise ValueError(error_message) if len(params) == 3 and "message_obj" not in params: - error_message = ( - f"参数名错误,命令处理函数的第3个参数必须为 message_obj{func.__name__}" - ) + error_message = f"参数名错误,命令处理函数的第3个参数必须为 message_obj:{func.__name__}" logger.error(error_message) raise ValueError(error_message) if len(params) > 3: diff --git a/wechatter/database/__init__.py b/wechatter/database/__init__.py index 3a8a862a..d4f9ff5c 100644 --- a/wechatter/database/__init__.py +++ b/wechatter/database/__init__.py @@ -1,5 +1,6 @@ from .database import create_tables, make_db_session from .tables import person_group_relation # noqa +from .tables.game_states import GameStates from .tables.gpt_chat_info import GptChatInfo from .tables.gpt_chat_message import GptChatMessage from .tables.group import Group @@ -16,4 +17,5 @@ "Group", "Person", "QuotedResponse", + "GameStates", ] diff --git a/wechatter/database/tables/game_states.py b/wechatter/database/tables/game_states.py new file mode 100644 index 00000000..13f10e4d --- /dev/null +++ b/wechatter/database/tables/game_states.py @@ -0,0 +1,80 @@ +import json +from datetime import datetime +from typing import TYPE_CHECKING + +from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, String +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from wechatter.database.tables import Base +from wechatter.models.game.game_states import GameStates as GameStatesModel +from wechatter.utils.unique_list import UniqueListDecoder, UniqueListEncoder + +if TYPE_CHECKING: + from wechatter.database.tables.group import Group + from wechatter.database.tables.person import Person + + +class GameStates(Base): + """ + 游戏状态表 + """ + + __tablename__ = "game_states" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + host_person_id: Mapped[str] = mapped_column(String, ForeignKey("person.id")) + host_group_id: Mapped[str] = mapped_column( + String, ForeignKey("group.id"), nullable=True + ) + game_class_name: Mapped[str] = mapped_column(String) + states: Mapped[str] = mapped_column(String) + is_over: Mapped[bool] = mapped_column(Boolean, default=False) + create_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now) + + host_person: Mapped["Person"] = relationship( + "Person", back_populates="game_states_list" + ) + host_group: Mapped["Group"] = relationship( + "Group", back_populates="game_states_list" + ) + + @classmethod + def from_model(cls, game_states_model: GameStatesModel): + group_id = None + if game_states_model.host_group: + group_id = game_states_model.host_group.id + return cls( + id=game_states_model.id, + host_person_id=game_states_model.host_person.id, + host_group_id=group_id, + game_class_name=game_states_model.game_class_name, + states=json.dumps(game_states_model.states, cls=UniqueListEncoder), + create_time=game_states_model.create_time, + is_over=game_states_model.is_over, + ) + + def to_model(self) -> GameStatesModel: + host_group = None + if self.host_group: + host_group = self.host_group.to_model() + return GameStatesModel( + id=self.id, + host_person=self.host_person.to_model(), + host_group=host_group, + game_class_name=self.game_class_name, + states=json.loads(self.states, cls=UniqueListDecoder), + create_time=self.create_time, + is_over=self.is_over, + ) + + def update(self, game_states_model: GameStatesModel): + group_id = None + if game_states_model.host_group: + group_id = game_states_model.host_group.id + self.host_person_id = game_states_model.host_person.id + self.host_group_id = group_id + self.game_class_name = game_states_model.game_class_name + self.states = json.dumps(game_states_model.states, cls=UniqueListEncoder) + self.create_time = game_states_model.create_time + self.is_over = game_states_model.is_over + return self diff --git a/wechatter/database/tables/gpt_chat_message.py b/wechatter/database/tables/gpt_chat_message.py index 5062a903..ad1f8fb1 100644 --- a/wechatter/database/tables/gpt_chat_message.py +++ b/wechatter/database/tables/gpt_chat_message.py @@ -45,5 +45,5 @@ def to_model(self) -> GptChatMessageModel: id=self.id, message=self.message.to_model(), gpt_chat_info=self.gpt_chat_info.to_model(), - gp_response=self.gpt_response, + gpt_response=self.gpt_response, ) diff --git a/wechatter/database/tables/group.py b/wechatter/database/tables/group.py index f7b2c14a..63708aeb 100644 --- a/wechatter/database/tables/group.py +++ b/wechatter/database/tables/group.py @@ -7,6 +7,7 @@ from wechatter.models.wechat import Group as GroupModel if TYPE_CHECKING: + from wechatter.database.tables.game_states import GameStates from wechatter.database.tables.message import Message from wechatter.database.tables.person import Person @@ -21,6 +22,7 @@ class Group(Base): id: Mapped[str] = mapped_column(String(100), primary_key=True) name: Mapped[str] alias: Mapped[Union[str, None]] = mapped_column(String, nullable=True) + is_gaming: Mapped[bool] = mapped_column(String, default=False) members: Mapped[List["Person"]] = relationship( "Person", @@ -28,12 +30,16 @@ class Group(Base): back_populates="groups", ) messages: Mapped[List["Message"]] = relationship("Message", back_populates="group") + game_states_list: Mapped[List["GameStates"]] = relationship( + "GameStates", back_populates="host_group" + ) @classmethod def from_model(cls, group_model: GroupModel): return cls( id=group_model.id, name=group_model.name, + is_gaming=group_model.is_gaming, ) def to_model(self) -> GroupModel: @@ -44,6 +50,7 @@ def to_model(self) -> GroupModel: id=self.id, name=self.name, member_list=member_list, + is_gaming=self.is_gaming, ) def update(self, group_model: GroupModel): @@ -52,3 +59,4 @@ def update(self, group_model: GroupModel): for member in self.members: member_list.append(Person.from_member_model(member)) self.members = member_list + self.is_gaming = group_model.is_gaming diff --git a/wechatter/database/tables/person.py b/wechatter/database/tables/person.py index 9d7efce3..7a5f2f00 100644 --- a/wechatter/database/tables/person.py +++ b/wechatter/database/tables/person.py @@ -7,6 +7,7 @@ from wechatter.models.wechat import Gender, GroupMember, Person as PersonModel if TYPE_CHECKING: + from wechatter.database.tables.game_states import GameStates from wechatter.database.tables.gpt_chat_info import GptChatInfo from wechatter.database.tables.group import Group from wechatter.database.tables.message import Message @@ -29,6 +30,7 @@ class Person(Base): is_star: Mapped[bool] = mapped_column(Boolean, default=False) is_friend: Mapped[bool] = mapped_column(Boolean, default=False) is_official_account: Mapped[bool] = mapped_column(Boolean, default=False) + is_gaming: Mapped[bool] = mapped_column(Boolean, default=False) groups: Mapped[List["Group"]] = relationship( "Group", @@ -39,6 +41,9 @@ class Person(Base): gpt_chat_infos: Mapped[List["GptChatInfo"]] = relationship( "GptChatInfo", back_populates="person" ) + game_states_list: Mapped[List["GameStates"]] = relationship( + "GameStates", back_populates="host_person" + ) @classmethod def from_model(cls, person_model: PersonModel): @@ -52,6 +57,7 @@ def from_model(cls, person_model: PersonModel): is_star=person_model.is_star, is_friend=person_model.is_friend, is_official_account=person_model.is_official_account, + is_gaming=person_model.is_gaming, ) @classmethod @@ -74,6 +80,7 @@ def to_model(self) -> PersonModel: is_star=self.is_star, is_friend=self.is_friend, is_official_account=self.is_official_account, + is_gaming=self.is_gaming, ) def update(self, person_model: PersonModel): @@ -85,3 +92,4 @@ def update(self, person_model: PersonModel): self.is_star = person_model.is_star self.is_friend = person_model.is_friend self.is_official_account = person_model.is_official_account + self.is_gaming = person_model.is_gaming diff --git a/wechatter/games/__init__.py b/wechatter/games/__init__.py new file mode 100644 index 00000000..ce355c5b --- /dev/null +++ b/wechatter/games/__init__.py @@ -0,0 +1,176 @@ +from typing import Dict, Type + +from loguru import logger + +from wechatter.commands.handlers import command +from wechatter.models.wechat import Message, SendTo +from wechatter.sender import sender +from wechatter.utils.unique_list import UniqueList + +from ._games import * # noqa: F403 +from .game import Game + +games = {} +game_class_name_dict: Dict[str, Type[Game]] = {} + + +def load_games(): + """ + 加载所有游戏 + """ + for game in Game.__subclasses__(): + games[game.name] = { + "class": game, + "desc": game.desc, + "keys": game.keys, + } + game_class_name_dict[game.__name__] = game + logger.info(f"加载游戏: {game.name}") + logger.info(f"游戏描述: {game.desc}") + logger.info(f"游戏关键字: {game.keys}") + + @command( + command=game.name, + keys=game.keys, + desc=game.desc, + ) + def game_command_handler(to: SendTo, message: str = "", message_obj=None): + """ + 创建游戏 + """ + _execute_game("create", to, message, message_obj, game_class=game) + + _register_game_basic_command() + logger.info(f"共加载了{len(games)}个游戏") + + +def _execute_game( + cmd: str, + to: SendTo, + message: str, + message_obj: Message, + game_class: Type[Game] = None, +): + """ + 执行游戏命令 + :param cmd: 游戏命令(create、start、join、play、over) + :param to: 发送对象 + :param message: 消息内容 + :param message_obj: 消息对象 + :param game_class: 游戏类 + """ + # 目前只支持群中游戏 + if not message_obj.is_group: + logger.info("目前只支持群中游戏!") + message = "目前只支持群中游戏!" + sender.send_msg(message_obj.sender_name, message, is_group=message_obj.is_group) + return + + # 从数据库中获取未结束的游戏的游戏状态 + group_id = None + if message_obj.is_group: + group_id = message_obj.group.id + game_states = Game.get_game_states( + person_id=message_obj.person.id, group_id=group_id + ) + if cmd == "create": + # 有游戏在进行中 + if game_states: + message = ( + "当前已有游戏在进行中!\n" + "若要创建新游戏,请使用 over 命令结束当前游戏。" + ) + sender.send_msg( + message_obj.sender_name, message, is_group=message_obj.is_group + ) + return + + group = None + if to.group: + group = to.group + game = game_class( + game_host_person=to.person, + game_players=UniqueList([to.person]), + game_host_group=group, + ) + game.create_game() + return + + if game_states is None: + message = "当前没有游戏在进行中!" + sender.send_msg(message_obj.sender_name, message, is_group=message_obj.is_group) + return + + game_class = game_class_name_dict[game_states.game_class_name] + game = game_class.from_dict(game_states.states) + if cmd == "start": + game.start_game(player=to.person, game_states=game_states) + elif cmd == "join": + game.join_game(player=to.person, message=message, game_states=game_states) + elif cmd == "play": + game.play_game(player=to.person, message=message, game_states=game_states) + elif cmd == "over": + game.over_game(message=message, game_states=game_states) + + +def _register_game_basic_command(): + @command( + command="start", + keys=["start", "开始"], + desc="开始游戏", + ) + def start_game_handler(to: SendTo, message: str = "", message_obj=None): + """ + 开始游戏 + """ + _execute_game("start", to, message, message_obj) + + @command( + command="join", + keys=["join", "加入"], + desc="加入游戏", + ) + def join_game_handler(to: SendTo, message: str = "", message_obj=None): + """ + 加入游戏 + """ + _execute_game("join", to, message, message_obj) + + @command( + command="play", + keys=["play", "玩"], + desc="进行一回合游戏", + ) + def play_game_handler(to: SendTo, message: str = "", message_obj=None): + """ + 进行一回合游戏 + """ + _execute_game("play", to, message, message_obj) + + @command( + command="over", + keys=["over", "结束"], + desc="结束游戏", + ) + def over_game_handler(to: SendTo, message: str = "", message_obj=None): + """ + 结束游戏 + """ + _execute_game("over", to, message, message_obj) + + @command( + command="games", + keys=["games", "游戏", "game_list"], + desc="查看所有游戏", + ) + def games_list_handler(to: SendTo, message: str = "", message_obj=None): + """ + 查看所有游戏 + """ + response = "✨==所有游戏列表==✨\n" + for i, (game_name, game_info) in enumerate(games.items()): + response += f"{i + 1}. {game_name}:{game_info['desc']}\n" + sender.send_msg(to, response) + + +__all__ = ["load_games", "games", "game_class_name_dict", "Game"] diff --git a/wechatter/games/_games/__init__.py b/wechatter/games/_games/__init__.py new file mode 100644 index 00000000..fe4fd984 --- /dev/null +++ b/wechatter/games/_games/__init__.py @@ -0,0 +1,20 @@ +# 导入该文件夹下所有模块 + +import glob +import importlib +import os + +# 获取commands目录下所有的.py文件 +games_files = glob.glob(os.path.dirname(__file__) + "/*.py") + +for file in games_files: + # 获取文件名(不包括扩展名) + module_name = os.path.basename(file)[:-3] + # 跳过__init__.py文件 + if module_name == "__init__": + continue + # 动态导入模块 + importlib.import_module("." + module_name, __package__) + +# 释放变量 +del games_files diff --git a/wechatter/games/_games/tictactoe.py b/wechatter/games/_games/tictactoe.py new file mode 100644 index 00000000..0c9d4776 --- /dev/null +++ b/wechatter/games/_games/tictactoe.py @@ -0,0 +1,169 @@ +import re +import shutil +from typing import Dict + +from loguru import logger +from PIL import Image +from typing_extensions import override + +from wechatter.games.game import Game +from wechatter.utils.path_manager import get_abs_path + + +class Tictactoe(Game): + """ + 井字棋游戏 + """ + + name = "tictactoe" + desc = "井字棋游戏" + keys = ["tictactoe", "井字棋", "ttt"] + + # 文件路径 + initial_board_image_path = get_abs_path("assets/games/tictactoe/board.png") + gaming_board_image_path = get_abs_path("assets/games/tictactoe/gaming_board.png") + piece_x_image_path = get_abs_path("assets/games/tictactoe/piece_x.png") + piece_o_image_path = get_abs_path("assets/games/tictactoe/piece_o.png") + + def __init__( + self, + game_host_person, + game_players, + game_host_group=None, + least_player_num: int = 2, + most_player_num: int = 2, + ): + super().__init__( + game_host_person=game_host_person, + game_players=game_players, + game_host_group=game_host_group, + least_player_num=least_player_num, + most_player_num=most_player_num, + ) + # 0表示空位,1表示玩家1(O),2表示玩家2(X) + self.board = [[0, 0, 0], [0, 0, 0], [0, 0, 0]] + + @override + def fill_from_dict(self, data: Dict): + self.board = data["board"] + + @override + def create(self): + create_msg = self.generate_raw_create_msg("井字棋游戏") + self._send_msg(create_msg) + + @override + def start(self, game_states): + # 保存初始棋盘到游戏棋盘 + shutil.copy(self.initial_board_image_path, self.gaming_board_image_path) + self._send_msg(self.initial_board_image_path, type="localfile") + + @override + def join(self, player, message, game_states): + pass + + @override + def play(self, player, message, game_states): + # 检测message是否符合 x,y 或者 x y 的格式 + # TODO: 多分割符写成工具函数 + split_list = re.split(r"[\s,]+", message) + if len(split_list) != 2: + logger.info("⚠️ 请按照 x,y 或者 x y 的格式输入坐标!") + self._send_msg("⚠️ 请按照 x,y 或者 x y 的格式输入坐标!") + raise ValueError + + if self.current_player_index != self.game_players.index(player): + logger.info( + f"⚠️ 不是你的回合,当前回合玩家为 {self.game_players[self.current_player_index].name}!" + ) + self._send_msg( + f"⚠️ 不是你的回合,当前回合玩家为 {self.game_players[self.current_player_index].name}!" + ) + raise ValueError + x, y = split_list + if not x.isdigit() or not y.isdigit(): + logger.info("⚠️ 请输入有效的坐标!") + self._send_msg("⚠️ 请输入有效的坐标!") + raise ValueError + x, y = int(x) - 1, int(y) - 1 + if not (0 <= x < 3 and 0 <= y < 3): + logger.info("⚠️ 坐标超出范围!") + self._send_msg("⚠️ 坐标超出范围!") + raise ValueError + if self.board[x][y] != 0: + logger.info("⚠️ 该位置已经有棋子了!") + self._send_msg("⚠️ 该位置已经有棋子了!") + raise ValueError + + # 玩家1为 O + if self.current_player_index == 0: + self.board[x][y] = 1 + self.gaming_board_image_path = self.__draw_board( + self.gaming_board_image_path, x, y + ) + # 玩家2为 X + elif self.current_player_index == 1: + self.board[x][y] = 2 + self.gaming_board_image_path = self.__draw_board( + self.gaming_board_image_path, x, y + ) + else: + logger.error("玩家索引错误!") + raise ValueError("玩家索引错误!") + self._send_msg(self.gaming_board_image_path, type="localfile") + + # 判断胜利 + winner = self.__judge_winner() + if winner == 1: + self._send_msg(f"🎉 玩家1 {self.game_players[0].name} 胜利!") + self.over_game(message="", game_states=game_states) + elif winner == 2: + self._send_msg(f"🎉 玩家2 {self.game_players[1].name} 胜利!") + self.over_game(message="", game_states=game_states) + elif winner == 0: + self._send_msg("🤝 平局!") + self.over_game(message="", game_states=game_states) + + @override + def over(self, message, game_states): + self._send_msg(message) + + def __judge_winner(self): + """ + 判断胜利 + """ + # 判断横向 + for i in range(3): + if self.board[i][0] == self.board[i][1] == self.board[i][2] != 0: + return self.board[i][0] + # 判断纵向 + for i in range(3): + if self.board[0][i] == self.board[1][i] == self.board[2][i] != 0: + return self.board[0][i] + # 判断对角线 + if self.board[0][0] == self.board[1][1] == self.board[2][2] != 0: + return self.board[0][0] + if self.board[0][2] == self.board[1][1] == self.board[2][0] != 0: + return self.board[0][2] + # 判断平局 + if all([i != 0 for row in self.board for i in row]): + return 0 + return None + + def __draw_board( + self, gaming_board_image_path: str = None, i: int = None, j: int = None + ): + """ + 绘制棋盘 + """ + board_image = Image.open(gaming_board_image_path) + if self.board[i][j] == 1: + piece_image = Image.open(self.piece_o_image_path).convert("RGBA") + elif self.board[i][j] == 2: + piece_image = Image.open(self.piece_x_image_path).convert("RGBA") + else: + logger.error("棋子索引错误!") + raise ValueError("棋子索引错误!") + board_image.paste(piece_image, (j * 130 + 60, i * 130 + 90), mask=piece_image) + board_image.save(self.gaming_board_image_path) + return self.gaming_board_image_path diff --git a/wechatter/games/game.py b/wechatter/games/game.py new file mode 100644 index 00000000..a3b7c6f9 --- /dev/null +++ b/wechatter/games/game.py @@ -0,0 +1,340 @@ +from abc import ABC, abstractmethod +from typing import Dict, List, Optional + +from loguru import logger + +from wechatter.database import GameStates as DbGameStates, make_db_session +from wechatter.models.game import GameStates +from wechatter.models.wechat import Group, Person +from wechatter.sender import sender +from wechatter.utils.unique_list import UniqueList + + +class Game(ABC): + """ + 聊天交互式游戏基类 + """ + + name: str + desc: str + keys: List[str] + + def __init__( + self, + game_host_person: Person, + game_players: UniqueList[Person], + game_host_group: Group = None, + least_player_num: int = 0, + most_player_num: int = 0, + ): + """ + 初始化游戏 + :param game_host_person: 游戏创建者 + :param game_players: 参与游戏的玩家(包括创建者) + :param game_host_group: 游戏创建者所在群 + :param least_player_num: 最少玩家数 + :param most_player_num: 最多玩家数 + """ + self.game_host_person = game_host_person + self.game_players = game_players + self.game_host_group = game_host_group + self.least_player_num = least_player_num + self.most_player_num = most_player_num + # 是否已开始 + self.is_started = False + # 当前回合数 + self.round = 0 + if game_host_group: + # 发送者名称 + self.sender_name = game_host_group.name + # 是否是群游戏 + self.is_group = True + else: + self.sender_name = game_host_person.name + self.is_group = False + # 是否可以加入游戏 + self.can_join = True + # 当前玩家下标 + self.current_player_index = 0 + pass + + @classmethod + def from_dict(cls, data: Dict): + instance = cls.__new__(cls) + instance.game_host_person = Person(**data["game_host_person"]) + game_players = UniqueList() + for player in data["game_players"]: + game_players.add(Person(**player)) + instance.game_players = game_players + if data["is_group"]: + instance.game_host_group = Group(**data["game_host_group"]) + instance.least_player_num = data["least_player_num"] + instance.most_player_num = data["most_player_num"] + instance.is_group = data["is_group"] + instance.is_started = data["is_started"] + instance.sender_name = data["sender_name"] + instance.round = data["round"] + instance.can_join = data["can_join"] + instance.current_player_index = data["current_player_index"] + # 填充子类属性 + instance.fill_from_dict(data) + return instance + + @staticmethod + def get_game_states( + person_id: str, group_id: Optional[str] = None + ) -> Optional[GameStates]: + """ + 获取游戏状态 + """ + + # 检测是否有游戏在进行中 + with make_db_session() as session: + if group_id: + _game_states = ( + session.query(DbGameStates) + .filter_by(host_group_id=group_id, is_over=False) + .first() + ) + else: + # TODO: 去群中心化游戏 + _game_states = ( + session.query(DbGameStates) + .filter_by(host_person_id=person_id, is_over=False) + .first() + ) + # 有游戏在进行中 + if _game_states: + game_states = _game_states.to_model() + return game_states + return None + + def create_game(self): + """ + 创建游戏 + """ + with make_db_session() as session: + # 插入新游戏状态 + game_states = GameStates( + host_person=self.game_host_person, + host_group=self.game_host_group, + game_class_name=self.__class__.__name__, + states=self.to_dict(), + ) + _game_states = DbGameStates.from_model(game_states) + session.add(_game_states) + session.commit() + self.create() + + def start_game(self, player: Person, game_states: GameStates): + # 消息发送者是否为游戏创建者 + if player.id != self.game_host_person.id: + logger.warning( + f"⚠️ 只有游戏创建者 {self.game_host_person.name} 可以开始游戏!" + ) + self._send_msg( + f"⚠️ 只有游戏创建者 {self.game_host_person.name} 可以开始游戏!" + ) + return + + # 判断游戏是否已经开始 + if self.is_started: + logger.warning("⚠️ 游戏已开始!") + self._send_msg("⚠️ 游戏已开始!") + return + # 判断游戏人数是否满足最少游戏人数 + if len(self.game_players) < self.least_player_num: + logger.warning( + f"⚠️ 游戏人数不足,无法开始游戏!最低游戏人数为 {self.least_player_num} 人。" + ) + self._send_msg( + f"⚠️ 游戏人数不足,无法开始游戏!最低游戏人数为 {self.least_player_num} 人。" + ) + return + + self.is_started = True + self.start(game_states) + + with make_db_session() as session: + _game_states = ( + session.query(DbGameStates).filter_by(id=game_states.id).first() + ) + # TODO: 直接优化成表的一个字段? + game_states.states = self.to_dict() + _game_states.update(game_states) + session.commit() + + logger.info("游戏开始!") + + def join_game(self, player: Person, message: str, game_states: GameStates): + """ + 加入游戏 + """ + # 判断游戏是否已经开始 + if self.is_started: + logger.warning("⚠️ 游戏已开始!无法加入游戏!") + self._send_msg("⚠️ 游戏已开始!无法加入游戏!") + return + + # 判断游戏人数是否大于等于最多游戏人数 + if len(self.game_players) >= self.most_player_num: + self.can_join = False + logger.warning( + f"⚠️ 游戏人数已满,无法加入游戏!最多游戏人数为 {self.most_player_num} 人。" + ) + self._send_msg( + f"⚠️ 游戏人数已满,无法加入游戏!最多游戏人数为 {self.most_player_num} 人。" + ) + return + + if self.can_join: + try: + self.game_players.add(player) + logger.info(f"{player.name} 加入游戏成功!") + # 已经在游戏中 + except ValueError: + logger.warning(f"⚠️ 加入游戏失败,{player.name} 已经在游戏中!") + self._send_msg(f"⚠️ 加入游戏失败,{player.name} 已经在游戏中!") + return + + if len(self.game_players) >= self.most_player_num: + self.can_join = False + + self.join(player, message, game_states) + + # 更新游戏状态 + with make_db_session() as session: + _game_states = ( + session.query(DbGameStates).filter_by(id=game_states.id).first() + ) + game_states.states = self.to_dict() + _game_states.update(game_states) + session.commit() + # TODO: 封装 + join_msg = ( + f"{player.name} 加入游戏成功!\n" + f"目前游戏人数为 {len(self.game_players)} 人。\n" + ) + if len(self.game_players) < self.least_player_num: + join_msg += f"最低游戏人数为 {self.least_player_num} 人。" + elif len(self.game_players) == self.most_player_num: + join_msg += "游戏人数已满,使用 start 命令开始游戏。" + else: + join_msg += "满足最低游戏人数,使用 start 命令开始游戏。" + self._send_msg(join_msg) + + def play_game(self, player: Person, message: str, game_states: GameStates): + try: + self.play(player, message, game_states) + except Exception: + logger.error("游戏回合出现异常!") + else: + self.round += 1 + # 下一个玩家 + self.next_player() + with make_db_session() as session: + _game_states = ( + session.query(DbGameStates).filter_by(id=game_states.id).first() + ) + game_states.states = self.to_dict() + _game_states.update(game_states) + session.commit() + + def over_game(self, message: str, game_states: GameStates): + self.over(message, game_states) + with make_db_session() as session: + _game_states = ( + session.query(DbGameStates).filter_by(id=game_states.id).first() + ) + game_states.is_over = True + _game_states.update(game_states) + session.commit() + logger.info("游戏结束!") + self._send_msg("游戏结束!") + + @staticmethod + def generate_raw_create_msg(title: str): + """ + 获取创建游戏的消息 + """ + create_msg = ( + f"=== {title} ===\n" + "🕹️ 游戏开始!等待对手加入游戏...\n" + "😃 使用 join 命令加入游戏。\n" + "⛔ 使用 over 命令退出游戏。" + ) + return create_msg + + def to_dict(self) -> Dict: + """ + 转换为字典 + """ + result = self.__dict__.copy() + result["game_host_person"] = self.game_host_person.to_dict() + game_players = UniqueList() + for player in self.game_players: + game_players.add(player.to_dict()) + result["game_players"] = game_players + if self.is_group: + result["game_host_group"] = self.game_host_group.to_dict() + return result + + def _send_msg(self, message: str, type: str = "text"): + """ + 发送给游戏房间消息 + :param message: 消息内容 + :param type: 消息类型 + """ + print(self.sender_name) + sender.send_msg(self.sender_name, message, type=type, is_group=self.is_group) + + def next_player(self): + """ + 下一个玩家 + """ + # 默认是列表顺序 + self.current_player_index = (self.current_player_index + 1) % len( + self.game_players + ) + + @abstractmethod + def fill_from_dict(cls, data: Dict): + """ + 从字典创建游戏实例 + """ + pass + + @abstractmethod + def create(self): + """ + 创建游戏房间 + """ + pass + + @abstractmethod + def start(self, game_states: GameStates): + """ + 开始游戏 + """ + pass + + @abstractmethod + def join(self, player: Person, message: str, game_states: GameStates): + """ + 加入游戏 + """ + pass + + @abstractmethod + def play(self, player: Person, message: str, game_states: GameStates): + """ + 进行一回合游戏 + """ + pass + + @abstractmethod + def over(self, message: str, game_states: GameStates): + """ + 结束游戏 + """ + pass diff --git a/wechatter/message/message_handler.py b/wechatter/message/message_handler.py index 4b5590d0..d855a256 100644 --- a/wechatter/message/message_handler.py +++ b/wechatter/message/message_handler.py @@ -9,71 +9,6 @@ from wechatter.message.message_forwarder import MessageForwarder from wechatter.models.wechat import Message, SendTo - -def _execute_command(cmd_dict: Dict, to: SendTo, message_obj: Message): - """ - 执行命令 - :param cmd_dict: 命令字典 - :param to: 发送对象 - :param message_obj: 消息对象 - """ - - cmd_handler = cmd_dict["handler"] - if cmd_handler is not None: - if cmd_dict["param_count"] == 2: - cmd_handler( - to=to, - message=cmd_dict["args"], - ) - elif cmd_dict["param_count"] == 3: - cmd_handler( - to=to, - message=cmd_dict["args"], - message_obj=message_obj, - ) - else: - logger.error("该命令未实现") - - -def _execute_quoted_handler( - quoted_handler, to: SendTo, message_obj: Message, quoted_response: QuotedResponse -): - """ - 执行可引用的命令消息处理函数 - :param quoted_handler: 可引用的命令消息处理函数 - :param to: 发送对象 - :param message_obj: 消息内容 - :param quoted_response: 可引用的命令消息 - """ - - if quoted_handler: - quoted_handler( - to=to, - message=message_obj.pure_content, - q_response=quoted_response.response, - ) - else: - logger.warning(f"未找到可引用的命令消息处理函数: {quoted_response.command}") - - -def _get_quoted_response(quotable_id: str) -> QuotedResponse: - """ - 获取可引用的命令消息 - :param quotable_id: 可引用的命令消息id - :return: 可引用的命令消息 - """ - - with make_db_session() as session: - _quoted_response = ( - session.query(QuotedResponse) - .filter_by(quotable_id=quotable_id) - .order_by(QuotedResponse.id.desc()) - .first() - ) - quoted_response = _quoted_response.to_model() - return quoted_response - - message_forwarding_enabled = False if config["message_forwarding_enabled"]: message_forwarding_enabled = True @@ -81,6 +16,8 @@ def _get_quoted_response(quotable_id: str) -> QuotedResponse: config["message_forwarding_rule_list"], config["official_account_reminder_rule_list"], ) + + # message_forwarder.official_account_reminder_type = config[ # "official_account_reminder_type" # ] @@ -91,13 +28,14 @@ class MessageHandler: 消息处理器,用于处理用户发来的消息 """ - def __init__(self, commands: Dict, quoted_handlers: Dict): + def __init__(self, commands: Dict, quoted_handlers: Dict, games: Dict): """ :param commands: 命令处理函数字典 :param quoted_handlers: 可引用的命令消息处理函数字典 """ self.commands = commands self.quoted_handlers = quoted_handlers + self.games = games def handle_message(self, message_obj: Message): """ @@ -136,24 +74,30 @@ def handle_message(self, message_obj: Message): ) return - # 非命令消息 - if cmd_dict["command"] == "None": - logger.info("该消息不是命令类型") - return - - logger.info(cmd_dict["desc"]) - # TODO: 可以为不同的群设置是否need_mentioned - if ( - config["need_mentioned"] - and message_obj.is_group - and not message_obj.is_mentioned - ): - logger.debug("该消息为群消息,但未@机器人,不处理") - return - # 是命令消息 - # 开始处理命令 - _execute_command(cmd_dict, to, message_obj) + if not cmd_dict["command"] == "None": + logger.info(cmd_dict["desc"]) + # TODO: 可以为不同的群设置是否need_mentioned + if ( + config["need_mentioned"] + and message_obj.is_group + and not message_obj.is_mentioned + ): + logger.debug("该消息为群消息,但未@机器人,不处理") + return + # 开始处理命令 + _execute_command(cmd_dict, to, message_obj) + else: + logger.debug("该消息不是命令类型") + + # game_name = self.__get_game(content, message_obj.is_mentioned, message_obj.is_group) + # # 是游戏命令 + # if game_name: + # game_class = self.games[game_name]["class"] + # logger.info(self.games[game_name]["desc"]) + # _execute_game(game, game_class, to, content, message_obj) + # else: + # logger.debug("该消息不是游戏类型") def __parse_command(self, content: str, is_mentioned: bool, is_group: bool) -> Dict: """ @@ -189,3 +133,67 @@ def __parse_command(self, content: str, is_mentioned: bool, is_group: bool) -> D cmd_dict["args"] = cont_list[1] # 消息内容 return cmd_dict return cmd_dict + + +def _execute_command(cmd_dict: Dict, to: SendTo, message_obj: Message): + """ + 执行命令 + :param cmd_dict: 命令字典 + :param to: 发送对象 + :param message_obj: 消息对象 + """ + + cmd_handler = cmd_dict["handler"] + if cmd_handler is not None: + if cmd_dict["param_count"] == 2: + cmd_handler( + to=to, + message=cmd_dict["args"], + ) + elif cmd_dict["param_count"] == 3: + cmd_handler( + to=to, + message=cmd_dict["args"], + message_obj=message_obj, + ) + else: + logger.error("该命令未实现") + + +def _execute_quoted_handler( + quoted_handler, to: SendTo, message_obj: Message, quoted_response: QuotedResponse +): + """ + 执行可引用的命令消息处理函数 + :param quoted_handler: 可引用的命令消息处理函数 + :param to: 发送对象 + :param message_obj: 消息内容 + :param quoted_response: 可引用的命令消息 + """ + + if quoted_handler: + quoted_handler( + to=to, + message=message_obj.pure_content, + q_response=quoted_response.response, + ) + else: + logger.warning(f"未找到可引用的命令消息处理函数: {quoted_response.command}") + + +def _get_quoted_response(quotable_id: str) -> QuotedResponse: + """ + 获取可引用的命令消息 + :param quotable_id: 可引用的命令消息id + :return: 可引用的命令消息 + """ + + with make_db_session() as session: + _quoted_response = ( + session.query(QuotedResponse) + .filter_by(quotable_id=quotable_id) + .order_by(QuotedResponse.id.desc()) + .first() + ) + quoted_response = _quoted_response.to_model() + return quoted_response diff --git a/wechatter/models/game/__init__.py b/wechatter/models/game/__init__.py new file mode 100644 index 00000000..cb23d695 --- /dev/null +++ b/wechatter/models/game/__init__.py @@ -0,0 +1,3 @@ +from .game_states import GameStates + +__all__ = ["GameStates"] diff --git a/wechatter/models/game/game_states.py b/wechatter/models/game/game_states.py new file mode 100644 index 00000000..37781772 --- /dev/null +++ b/wechatter/models/game/game_states.py @@ -0,0 +1,16 @@ +from datetime import datetime +from typing import Dict, Optional + +from pydantic import BaseModel + +from wechatter.models.wechat import Group, Person + + +class GameStates(BaseModel): + id: Optional[int] = None + host_person: Person + host_group: Optional[Group] = None + game_class_name: str + states: Dict + create_time: datetime = datetime.now() + is_over: bool = False diff --git a/wechatter/models/wechat/group.py b/wechatter/models/wechat/group.py index fd1b8a1b..336fcd6a 100644 --- a/wechatter/models/wechat/group.py +++ b/wechatter/models/wechat/group.py @@ -23,3 +23,12 @@ class Group(BaseModel): # alias: str 目前上游不支持 admin_id_list: Optional[List[str]] = None member_list: List[GroupMember] + is_gaming: bool = False + + def to_dict(self): + result = self.__dict__.copy() + member_list = [] + for member in self.member_list: + member_list.append(member.__dict__.copy()) + result["member_list"] = member_list + return result diff --git a/wechatter/models/wechat/person.py b/wechatter/models/wechat/person.py index d4ab8bc6..6a49da8a 100644 --- a/wechatter/models/wechat/person.py +++ b/wechatter/models/wechat/person.py @@ -29,3 +29,9 @@ class Person(BaseModel): is_star: bool is_friend: bool is_official_account: bool = False + is_gaming: bool = False + + def to_dict(self): + result = self.__dict__.copy() + result["gender"] = self.gender.value + return result diff --git a/wechatter/utils/unique_list.py b/wechatter/utils/unique_list.py new file mode 100644 index 00000000..1318668d --- /dev/null +++ b/wechatter/utils/unique_list.py @@ -0,0 +1,71 @@ +import json +from typing import Generic, Iterable, List, TypeVar + +T = TypeVar("T") + + +class UniqueList(Generic[T]): + """ + 唯一列表 + """ + + def __init__(self, iterable: Iterable[T] = ()): + self.elements: List[T] = [] + for element in iterable: + self.add(element) + + def add(self, element: T): + if element in self.elements: + raise ValueError(f"元素 {element} 已存在") + else: + self.elements.append(element) + + def __getitem__(self, index): + return self.elements[index] + + def __setitem__(self, index, value: T): + if value not in self.elements: + self.elements[index] = value + + def __delitem__(self, index): + del self.elements[index] + + def __len__(self): + return len(self.elements) + + def __str__(self): + return str(self.elements) + + def __iter__(self): + return iter(self.elements) + + def __repr__(self): + return f"UniqueList({self.elements})" + + def index(self, element: T): + return self.elements.index(element) + + +class UniqueListEncoder(json.JSONEncoder): + """ + 唯一列表编码器 + """ + + def default(self, obj): + if isinstance(obj, UniqueList): + return {"is_unique_list": True, "elements": obj.elements} + return super().default(obj) + + +class UniqueListDecoder(json.JSONDecoder): + """ + 唯一列表解码器 + """ + + def __init__(self, *args, **kwargs): + super().__init__(object_hook=self.object_hook, *args, **kwargs) + + def object_hook(self, dct): + if "is_unique_list" in dct: + return UniqueList(dct["elements"]) + return dct diff --git a/wechatter/wechatter.py b/wechatter/wechatter.py index 8a4f72ab..92e83407 100644 --- a/wechatter/wechatter.py +++ b/wechatter/wechatter.py @@ -13,6 +13,7 @@ from wechatter.app.app import app from wechatter.bot import BotInfo from wechatter.config import config +from wechatter.games import load_games def main(): @@ -27,6 +28,7 @@ def main(): fm.check_and_create_folder("data/text_image") db.create_tables() + load_games() # 启动uvicorn port = config["wechatter_port"]