diff --git a/Source/Callback.py b/Source/Callback.py index 223caa4..a570dba 100644 --- a/Source/Callback.py +++ b/Source/Callback.py @@ -2,6 +2,7 @@ from Source.Configurator import Configurator from Source.BotsManager import BotsManager from MessageEditor import MessageEditor +from Source.Datasets import API_Types from Source.Functions import * from threading import Thread from time import sleep @@ -15,7 +16,7 @@ class Callback: # Обрабатывает очередь сообщений. def __SenderThread(self): # Запись в лог отладочной информации: поток очереди отправки запущен. - logging.debug("Callback API sender thread started.") + logging.debug("[Callback API] Sender thread started.") # Пока сообщение не отправлено. while True: @@ -89,19 +90,22 @@ def __SenderThread(self): # Описание исключения. Description = str(ExceptionData) - # Если исключение вызвано частыми запросами, то выждать указанный интервал. + # Если исключение вызвано частыми запросами. if "Too Many Requests" in Description: + # Запись в лог предупреждения: слишком много запросов. + logging.warning("[Callback API] Too many requests to Telegram. Waiting...") + # Выждать указанный исключением интервал. sleep(int(Description.split()[-1]) + 1) else: # Запись в лог ошибки: исключение Telegram. - logging.error("Telegram exception: \"" + Description + "\".") + logging.error("[Callback API] Telegram exception: \"" + Description + "\".") # Удаление первого сообщения в очереди отправки. self.__MessagesBufer.pop(0) except Exception as ExceptionData: # Запись в лог ошибки: исключение. - logging.error("Exception: \"" + str(ExceptionData) + "\".") + logging.error("[Callback API] Exception: \"" + str(ExceptionData) + "\".") # Удаление первого сообщения в очереди отправки. self.__MessagesBufer.pop(0) @@ -111,12 +115,12 @@ def __SenderThread(self): else: # Запись в лог отладочной информации: поток очереди отправки оставновлен. - logging.debug("Callback API sender thread stopped.") + logging.debug("[Callback API] Sender thread stopped.") # Остановка потока. break # Отправляет сообщение в группу Telegram через буфер ожидания. - def __SendMessage(self, PostObject: dict, Source: str): + def __SendMessage(self, PostObject: dict, Source: str, LaunchSenderThread: bool = True): # Состояние: есть ли запрещённые слова в посте. HasBlacklistWords = False # Конфигурация источника. @@ -168,27 +172,34 @@ def __SendMessage(self, PostObject: dict, Source: str): SupportedTypes.append(Type) # Получение вложений. - MessageStruct["attachments"] = GetAttachments(PostObject["attachments"], Source, SupportedTypes, PostObject["id"]) + MessageStruct["attachments"] = GetAttachments(PostObject["attachments"], Source, SupportedTypes, PostObject["id"], API_Types.Callback) # Помещение поста в очередь на отправку. self.__MessagesBufer.append(MessageStruct) else: # Запись в лог отладочной информации: пост был проигнорирован. - logging.info(f"Source: \"{Source}\". Post with ID " + str(PostObject["id"]) + " was ignored.") + logging.info(f"[Callback API] Source: \"{Source}\". Post with ID " + str(PostObject["id"]) + " was ignored.") - # Активировать поток отправки, если не активен. + # Если указано, активировать поток отправки сообщений. + if LaunchSenderThread == True: + self.__StartSenderThread() + + # Запускает поток отправки сообщений, если тот уже не запущен. + def __StartSenderThread(self): + + # Если поток отправки не функционирует, то запустить его. if self.__Sender.is_alive() == False: - self.__Sender = Thread(target = self.__SenderThread, name = "VK-Telegram Poster (Callback API sender)") + self.__Sender = Thread(target = self.__SenderThread, name = "[Open API] Sender.") self.__Sender.start() - + # Конструктор: задаёт глобальные настройки, обработчик конфигураций и менеджер подключений к ботам. def __init__(self, Settings: dict, ConfiguratorObject: Configurator, BotsManagerObject: BotsManager): #---> Генерация динамических свойств. #==========================================================================================# # Поток отправки сообщений. - self.__Sender = Thread(target = self.__SenderThread) + self.__Sender = Thread(target = self.__SenderThread, name = "[Callback API] Sender.") # Конфигурации. self.__Configurations = ConfiguratorObject # Экзмепляры обработчиков постов. @@ -200,11 +211,8 @@ def __init__(self, Settings: dict, ConfiguratorObject: Configurator, BotsManager # Очередь отложенных сообщений. self.__MessagesBufer = list() - # Запуск потока обработки буфера сообщений. - self.__Sender.start() - # Инициализация экзепляров ботов. - for ConfigName in self.__Configurations.getConfigsNames("Callback"): + for ConfigName in self.__Configurations.getConfigsNames(API_Types.Callback): # Конфигурация источника. Config = self.__Configurations.getConfig(ConfigName) # Инициализация подключения к боту. @@ -213,7 +221,7 @@ def __init__(self, Settings: dict, ConfiguratorObject: Configurator, BotsManager # Добавляет сообщение в очередь отправки. def AddMessageToBufer(self, CallbackRequest: dict, Source: str): # Запись в лог сообщения: получен новый пост. - logging.info(f"Source: \"{Source}\". New post with ID " + str(CallbackRequest["object"]["id"]) + ".") + logging.info(f"[Callback API] Source: \"{Source}\". New post with ID " + str(CallbackRequest["object"]["id"]) + ".") # Проверка работы потоков. for Index in range(0, len(self.__PostsEditorsThreads)): @@ -223,6 +231,6 @@ def AddMessageToBufer(self, CallbackRequest: dict, Source: str): self.__PostsEditorsThreads.pop(Index) # Добавление потока обработчика поста в список. - self.__PostsEditorsThreads.append(Thread(target = self.__SendMessage, args = (CallbackRequest["object"], Source))) + self.__PostsEditorsThreads.append(Thread(target = self.__SendMessage, args = (CallbackRequest["object"], Source), name = "[Callback API] Post editor.")) # Запуск потока обработчика поста в список. self.__PostsEditorsThreads[-1].start() \ No newline at end of file diff --git a/Source/Configurator.py b/Source/Configurator.py index 5427855..726729c 100644 --- a/Source/Configurator.py +++ b/Source/Configurator.py @@ -1,3 +1,4 @@ +from Source.Datasets import API_Types from dublib.Methods import ReadJSON import os @@ -49,29 +50,22 @@ def getConfig(self, ConfigName: str) -> dict: return self.__Configurations[ConfigName] # Возвращает список конфигураций. Поддерживает фильтрацию по типу API. - def getConfigsNames(self, API_Type: str | None = None) -> list[str]: + def getConfigsNames(self, API_Type: API_Types | None = None) -> list[str]: # Список конфигурация. Configs = list() # Если не указана спецификация API. if API_Type == None: Configs = self.__Configurations.keys() - - # Если запрошен список конфигураций для Callback API. - elif API_Type.lower() == "callback": - - # Для каждой конфигурации проверить соответствие типу API. - for Name in self.__Configurations.keys(): - if self.__Configurations[Name]["api"].lower() == "callback": - Configs.append(Name) + + else: + + # Для каждой конфигурации. + for ConfigName in self.__Configurations.keys(): - # Если запрошен список конфигураций для Open API. - elif API_Type.lower() == "open": - - # Для каждой конфигурации проверить соответствие типу API. - for Name in self.__Configurations.keys(): - if self.__Configurations[Name]["api"].lower() == "open": - Configs.append(Name) + # Если конфигурация соответствует искомому API, то записать её название. + if self.__Configurations[ConfigName]["api"].lower() == API_Type.value.lower(): + Configs.append(ConfigName) return Configs @@ -79,20 +73,20 @@ def getConfigsNames(self, API_Type: str | None = None) -> list[str]: def getRequiredAPI(self) -> dict: # Список API. API = { - "Callback": 0, - "Open": 0 + API_Types.Callback: 0, + API_Types.LongPoll: 0, + API_Types.Open: 0 } - - # Для каждой конфигурации. - for ConfigName in self.__Configurations.keys(): + + # Для каждого типа API. + for API_Type in list(API_Types): - # Если конфигурация требует Callback API. - if self.__Configurations[ConfigName]["api"].lower() == "callback": - API["Callback"] += 1 - - # Если конфигурация требует Open API. - if self.__Configurations[ConfigName]["api"].lower() == "open": - API["Open"] += 1 + # Для каждой конфигурации. + for ConfigName in self.__Configurations.keys(): + + # Если конфигурация соответствует API. + if self.__Configurations[ConfigName]["api"].lower() == API_Type.value.lower(): + API[API_Type] += 1 return API @@ -112,18 +106,18 @@ def getTokens(self): return TokensList # Обновляет конфигурации с указанным типом API. - def updateConfigs(self, API_Type: str): - # Список конфигураций с Open API. - OpenConfigs = list() + def updateConfigs(self, API_Type: API_Types): + # Список конфигураций с указанный API. + SelectedConfigs = list() # Для каждой конфигурации. for ConfigName in self.__Configurations.keys(): - # Если конфигурация требует Open API. - if self.__Configurations[ConfigName]["api"].lower() == API_Type: - OpenConfigs.append(ConfigName) + # Если конфигурация требует указанный API. + if self.__Configurations[ConfigName]["api"].lower() == API_Type.value.lower(): + SelectedConfigs.append(ConfigName) - # Для каждого файла конфигурации с Open API. - for Filename in OpenConfigs: + # Для каждого файла конфигурации с указанный API. + for Filename in SelectedConfigs: # Прочитать конфигурацию в словарь. self.__Configurations[Filename] = ReadJSON("Config/" + Filename + ".json") \ No newline at end of file diff --git a/Source/Datasets.py b/Source/Datasets.py new file mode 100644 index 0000000..faf721c --- /dev/null +++ b/Source/Datasets.py @@ -0,0 +1,13 @@ +from enum import Enum + +# Перечисление типов API +class API_Types(Enum): + + #==========================================================================================# + # >>>>> СТАТИЧЕСКИЕ СВОЙСТВА <<<<< # + #==========================================================================================# + + # Типы API. + Callback = "Callback" + LongPoll = "LongPoll" + Open = "Open" \ No newline at end of file diff --git a/Source/Functions.py b/Source/Functions.py index 16f9df0..22e864e 100644 --- a/Source/Functions.py +++ b/Source/Functions.py @@ -1,3 +1,5 @@ +from Source.Datasets import API_Types + import requests import logging import os @@ -29,7 +31,7 @@ def EscapeCharacters(Post: str) -> str: return Post # Получает URL вложения и загружает его. -def GetAttachments(PostAttachements: dict, Source: str, SupportedTypes: list[str], PostID: int) -> list: +def GetAttachments(PostAttachements: dict, Source: str, SupportedTypes: list[str], PostID: int, API_Type: API_Types) -> list: # Список вложений. Attachements = list() @@ -99,6 +101,6 @@ def GetAttachments(PostAttachements: dict, Source: str, SupportedTypes: list[str Attachements.append(Bufer) # Запись в лог сообщения: количество. - logging.info(f"Source: \"{Source}\". Post with ID {PostID} contains " + str(len(Attachements)) + " supported attachments.") + logging.info(f"[{API_Type.value} API] Source: \"{Source}\". Post with ID {PostID} contains " + str(len(Attachements)) + " supported attachments.") return Attachements \ No newline at end of file diff --git a/Source/Open.py b/Source/Open.py index 825abc0..a53cc1d 100644 --- a/Source/Open.py +++ b/Source/Open.py @@ -5,7 +5,8 @@ from Source.BotsManager import BotsManager from MessageEditor import MessageEditor from vk_captcha import VkCaptchaSolver -from threading import Thread, Timer +from Source.Datasets import API_Types +from threading import Thread from Source.Functions import * from vk_api import VkApi from time import sleep @@ -91,15 +92,15 @@ def __GetUpdates(self, ConfigName: str) -> list[dict]: # Для каждого полученного поста. for Post in Bufer: - # Если ID обрабатываемого поста меньше или равен ID последнего отправленного поста. - if Post["id"] <= Config["last-post-id"]: - # Переключить состояние обновления. - IsUpdated = True - - else: + # Если ID обрабатываемого поста больше ID последнего отправленного поста. + if Post["id"] > Config["last-post-id"]: # Записать пост. Posts.append(Post) + else: + # Переключить состояние обновления. + IsUpdated = True + else: # Записать последний пост. Posts.append(Bufer[0]) @@ -118,7 +119,7 @@ def __GetUpdates(self, ConfigName: str) -> list[dict]: # Обрабатывает очередь сообщений. def __SenderThread(self): # Запись в лог отладочной информации: поток очереди отправки запущен. - logging.debug("Open API sender thread started.") + logging.debug("[Open API] Sender thread started.") # Пока сообщение не отправлено. while True: @@ -192,19 +193,22 @@ def __SenderThread(self): # Описание исключения. Description = str(ExceptionData) - # Если исключение вызвано частыми запросами, то выждать указанный интервал. + # Если исключение вызвано частыми запросами. if "Too Many Requests" in Description: + # Запись в лог предупреждения: слишком много запросов. + logging.warning("[Open API] Too many requests to Telegram. Waiting...") + # Выждать указанный исключением интервал. sleep(int(Description.split()[-1]) + 1) else: # Запись в лог ошибки: исключение Telegram. - logging.error("Telegram exception: \"" + Description + "\"." + self.__MessagesBufer[0]["text"]) + logging.error("[Open API] Telegram exception: \"" + Description + "\"." + self.__MessagesBufer[0]["text"]) # Удаление первого сообщения в очереди отправки. self.__MessagesBufer.pop(0) except Exception as ExceptionData: # Запись в лог ошибки: исключение. - logging.error("Exception: \"" + str(ExceptionData) + "\".") + logging.error("[Open API] Exception: \"" + str(ExceptionData) + "\".") # Удаление первого сообщения в очереди отправки. self.__MessagesBufer.pop(0) @@ -214,12 +218,12 @@ def __SenderThread(self): else: # Запись в лог отладочной информации: поток очереди отправки оставновлен. - logging.debug("Open API sender thread stopped.") + logging.debug("[Open API] Sender thread stopped.") # Остановка потока. break # Отправляет сообщение в группу Telegram через буфер ожидания. - def __SendMessage(self, PostObject: dict, Source: str): + def __SendMessage(self, PostObject: dict, Source: str, LaunchSenderThread: bool = True): # Состояние: есть ли запрещённые слова в посте. HasBlacklistWords = False # Конфигурация источника. @@ -271,20 +275,39 @@ def __SendMessage(self, PostObject: dict, Source: str): SupportedTypes.append(Type) # Получение вложений. - MessageStruct["attachments"] = GetAttachments(PostObject["attachments"], Source, SupportedTypes, PostObject["id"]) + MessageStruct["attachments"] = GetAttachments(PostObject["attachments"], Source, SupportedTypes, PostObject["id"], API_Types.Open) # Помещение поста в очередь на отправку. self.__MessagesBufer.append(MessageStruct) else: # Запись в лог отладочной информации: пост был проигнорирован. - logging.info(f"Source: \"{Source}\". Post with ID " + str(PostObject["id"]) + " was ignored.") + logging.info(f"[Open API] Source: \"{Source}\". Post with ID " + str(PostObject["id"]) + " was ignored.") - # Активировать поток отправки, если не активен. + # Если указано, активировать поток отправки сообщений. + if LaunchSenderThread == True: + self.__StartSenderThread() + + # Запускает поток отправки сообщений, если тот уже не запущен. + def __StartSenderThread(self): + + # Если поток отправки не функционирует, то запустить его. if self.__Sender.is_alive() == False: - self.__Sender = Thread(target = self.__SenderThread, name = "VK-Telegram Poster (Open API sender)") + self.__Sender = Thread(target = self.__SenderThread, name = "[Open API] Sender.") self.__Sender.start() + # Поток отправки запросов к ВКонтакте. + def __UpdaterThread(self): + # Немедленная проверка новых постов. + self.CheckUpdates() + + # Запуск цикла ожидания. + while True: + # Выжидание одной секунды. + sleep(self.__Settings["openapi-period"] * 60) + # Проверка новых постов. + self.CheckUpdates() + # Записывает ID последнего отправленного поста. def __WriteLastPostID(self, Source: str, ID: int): # Чтение конфигурации. @@ -299,86 +322,76 @@ def __init__(self, Settings: dict, ConfiguratorObject: Configurator, BotsManager #---> Генерация динамических свойств. #==========================================================================================# + # Экземпляр повторителя. + self.__Repeater = Thread(target = self.__UpdaterThread, name = "[Open API] Requests repeater.") # Поток отправки сообщений. - self.__Sender = Thread(target = self.__SenderThread) + self.__Sender = Thread(target = self.__SenderThread, name = "[Open API] Sender.") # Конфигурации. self.__Configurations = ConfiguratorObject - # Экзмепляры обработчиков постов. - self.__PostsEditorsThreads = list() # Глобальные настройки. self.__Settings = Settings.copy() # Менеджер подключений к ботам. self.__Bots = BotsManagerObject # Очередь отложенных сообщений. self.__MessagesBufer = list() + # Состояние: идёт ли обновление. + self.__IsUpdating = False # Сессия ВКонтакте. self.__Session = None - # Обработчик повторов. - self.__Repiter = None # Экземпляр API. self.__API = None # Авторизация и получение API. self.__Authorizate() - # Запуск потока обработки буфера сообщений. - self.__Sender.start() # Инициализация экзепляров ботов. - for ConfigName in self.__Configurations.getConfigsNames("Open"): + for ConfigName in self.__Configurations.getConfigsNames(API_Types.Open): # Конфигурация источника. Config = self.__Configurations.getConfig(ConfigName) # Инициализация подключения к боту. self.__Bots.createBotConnection(Config["token"], ConfigName, Config["target"]) - # Немедленная проверка новых постов и активация таймера. - self.CheckUpdates() - # Активация таймера. - self.__Repeater = Timer(float(self.__Settings["openapi-period"] * 60), self.CheckUpdates) + # Запуск повторителя проверок. self.__Repeater.start() - + # Интервально проверяет обновления и добавляет сообщения в очередь отправки. def CheckUpdates(self): - # Обновление конфигураций с Open API. - self.__Configurations.updateConfigs("Open") - # Получение списка конфигураций, использующих Open API. - Configs = self.__Configurations.getConfigsNames("Open") - # Количество новых постов. - NewPostsCount = 0 - # Список постов. - Posts = list() - # Список индексов мёртвых потоков. - DeadThreadsIndexes = list() - # Проверка работы потоков. - for Index in range(0, len(self.__PostsEditorsThreads)): - - # Если поток завершил работу, то записать его индекс. - if self.__PostsEditorsThreads[Index].is_alive() == False: - DeadThreadsIndexes.append(Index) - - # Удалить потоки по индексам начиная с конца. - for Index in reversed(DeadThreadsIndexes): - self.__PostsEditorsThreads.pop(Index) + # Если обновление не выполняется. + if self.__IsUpdating == False: + # Переключение статуса обновления. + self.__IsUpdating = True + # Обновление конфигураций с Open API. + self.__Configurations.updateConfigs(API_Types.Open) + # Получение списка конфигураций, использующих Open API. + Configs = self.__Configurations.getConfigsNames(API_Types.Open) + # Список постов. + Posts = list() - # Для каждой конфигурации. - for Source in Configs: - # Получение списка новых постов (инверитрование порядка для обработки в порядке возрастания даты публикации). - Posts = list(reversed(self.__GetUpdates(Source))) - # Подсчёт количества новых постов. - NewPostsCount += len(Posts) - - # Для каждого поста. - for Post in Posts: - # Запись в лог сообщения: получен новый пост. - logging.info(f"Source: \"{Source}\". New post with ID " + str(Post["id"]) + ".") - # Добавление потока обработчика поста в список. - self.__PostsEditorsThreads.append(Thread(target = self.__SendMessage, args = (Post, Source))) - # Запуск потока обработчика поста в список. - self.__PostsEditorsThreads[-1].start() + # Для каждой конфигурации. + for Source in Configs: + # Получение списка новых постов (инверитрование порядка для обработки в порядке возрастания даты публикации). + Posts = list(reversed(self.__GetUpdates(Source))) - # Запись ID последнего отправленного поста в конфигурацию. - if len(Posts) > 0: - self.__WriteLastPostID(Source, Posts[-1]["id"]) - - # Запись в лог сообщения: количество обновлённых постов. - logging.info(f"[Open API] Updates checked. New posts count: {NewPostsCount}.") \ No newline at end of file + # Запись ID последнего отправленного поста в конфигурацию. + if len(Posts) > 0: + self.__WriteLastPostID(Source, Posts[-1]["id"]) + + # Запись в лог сообщения: количество обновлённых постов. + logging.info(f"[Open API] Source: \"{Source}\". Updates checked. New posts count: " + str(len(Posts)) + ".") + + # Для каждого поста. + for Post in Posts: + # Запись в лог сообщения: получен новый пост. + logging.info(f"[Open API] Source: \"{Source}\". New post with ID " + str(Post["id"]) + ".") + # Отправка сообщения в буфер ожидания. + self.__SendMessage(Post, Source, LaunchSenderThread = False) + + # Запуск потока отправки сообщений. + self.__StartSenderThread() + # Переключение статуса обновления. + self.__IsUpdating = False + + else: + # Запись в лог предупреждения: обновление уже выполняется. + logging.warning("[Open API] Update already in progress. Skipped.") \ No newline at end of file diff --git a/vtp.py b/vtp.py index e82d5ee..8f3d91a 100644 --- a/vtp.py +++ b/vtp.py @@ -5,7 +5,9 @@ from Source.Configurator import Configurator from Source.BotsManager import BotsManager from starlette.requests import Request +from Source.Datasets import API_Types from Source.Callback import Callback +from Source.Open import Open from fastapi import FastAPI import datetime @@ -13,8 +15,6 @@ import sys import os -from Source.Open import Open - #==========================================================================================# # >>>>> ИНИЦИАЛИЗАЦИЯ СКРИПТА <<<<< # #==========================================================================================# @@ -108,7 +108,7 @@ logging.info("====== Working ======") # Если требуется обработка Callback API. -if RequiredAPI["Callback"] > 0: +if RequiredAPI[API_Types.Callback] > 0: # Обработчик Callback-запросов. CallbackSender = Callback(Settings, ConfiguratorObject, BotsManagerObject) @@ -189,6 +189,6 @@ async def SendMessageToGroup(CallbackRequest: Request, Source: str) -> Response: return Response(content = "ok") # Если требуется обработка Open API. -if RequiredAPI["Open"] > 0: +if RequiredAPI[API_Types.Open] > 0: # Обработчик Open-запросов. OpenSender = Open(Settings, ConfiguratorObject, BotsManagerObject) \ No newline at end of file