-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathbot.py
337 lines (271 loc) · 13.4 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
import asyncio
import hashlib
import logging
import nest_asyncio
import pytz
import random
import pyshorteners
import re
import os
from urllib.parse import urlparse, urlunparse
from modules.keyboards import create_link_keyboard, button_callback
from utils import (
remove_links, screenshot_command, schedule_task, cat_command, ScreenshotManager,
game_state, game_command, end_game_command, clear_words_command, hint_command,
load_game_state, extract_urls
)
from const import domain_modifications, TOKEN, ALIEXPRESS_STICKER_ID, VideoPlatforms
from modules.gpt import ask_gpt_command, analyze_command, answer_from_gpt
from modules.weather import weather
from modules.file_manager import general_logger, chat_logger, get_daily_log_path, error_logger
from modules.user_management import restrict_user
from modules.video_downloader import VideoDownloader, setup_video_handlers
from telegram import Update
from telegram.ext import (
ApplicationBuilder, CommandHandler, MessageHandler, filters,
CallbackContext, CallbackQueryHandler, ContextTypes
)
# Apply the patch to allow nested event loops
nest_asyncio.apply()
LOCAL_TZ = pytz.timezone('Europe/Kyiv')
SUPPORTED_PLATFORMS = VideoPlatforms.SUPPORTED_PLATFORMS
class MessageCounter:
"""Manages message counts per chat for random GPT responses."""
def __init__(self):
self.counts = {}
def increment(self, chat_id):
self.counts[chat_id] = self.counts.get(chat_id, 0) + 1
return self.counts[chat_id]
def reset(self, chat_id):
self.counts[chat_id] = 0
message_counter = MessageCounter()
# Configure logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
def contains_trigger_words(message_text: str) -> bool:
"""Check if message contains trigger words for user restriction."""
triggers = ["Ы", "ы", "ъ", "Ъ", "Э", "э", "Ё", "ё"]
return any(trigger in message_text for trigger in triggers)
def sanitize_url(url: str, replace_domain: str = None) -> str:
"""Sanitize a URL by keeping scheme, netloc, and path only."""
try:
parsed_url = urlparse(url)
netloc = replace_domain if replace_domain else parsed_url.netloc
return urlunparse((parsed_url.scheme, netloc, parsed_url.path, '', '', ''))
except Exception as e:
logger.error(f"Failed to sanitize URL {url}: {e}")
return url
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handler for /start command."""
welcome_text = (
"🤖 Video Downloader Bot\n\n"
"Send me a link from:\n"
"• TikTok\n• Instagram\n• YouTube Shorts\n"
"• Facebook\n• Twitter\n• Vimeo\n• Reddit\n\n"
"I'll download and send the video directly!"
)
await update.message.reply_text(welcome_text)
def needs_gpt_response(update: Update, context: CallbackContext, message_text: str) -> bool:
"""Determine if a GPT response is needed based on message context."""
bot_username = context.bot.username
is_private_chat = update.effective_chat.type == 'private'
mentioned = f"@{bot_username}" in message_text
contains_youtube_or_aliexpress = any(domain in message_text for domain in ["youtube.com", "youtu.be"]) or \
re.search(r'(?:aliexpress|a\.aliexpress)\.(?:[a-z]{2,3})/(?:item/)?', message_text)
contains_domain_modifications = any(domain in message_text for domain in domain_modifications)
return (mentioned or (is_private_chat and not (contains_youtube_or_aliexpress or contains_domain_modifications)))
# Sample mapping from English to Ukrainian keyboard layout
keyboard_mapping = {
'q': 'й', 'w': 'ц', 'e': 'у', 'r': 'к', 't': 'е', 'y': 'н', 'u': 'г', 'i': 'ш', 'o': 'щ', 'p': 'з',
'a': 'ф', 's': 'і', 'd': 'в', 'f': 'а', 'g': 'п', 'h': 'р', 'j': 'о', 'k': 'л', 'l': 'д',
'z': 'я', 'x': 'ч', 'c': 'с', 'v': 'м', 'b': 'и', 'n': 'т', 'm': 'ь',
'Q': 'Й', 'W': 'Ц', 'E': 'У', 'R': 'К', 'T': 'Е', 'Y': 'Н', 'U': 'Г', 'I': 'Ш', 'O': 'Щ', 'P': 'З',
'A': 'Ф', 'S': 'І', 'D': 'В', 'F': 'А', 'G': 'П', 'H': 'Р', 'J': 'О', 'K': 'Л', 'L': 'Д',
'Z': 'Я', 'X': 'Ч', 'C': 'С', 'V': 'М', 'B': 'И', 'N': 'Т', 'M': 'Ь'
}
# Dictionary to store the last message for each user
last_user_messages = {}
def convert_to_ukrainian(text: str) -> str:
"""Convert text from incorrect English layout to Ukrainian layout."""
return ''.join(keyboard_mapping.get(char, char) for char in text)
async def translate_last_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Convert the last message of the user to Ukrainian layout."""
user_id = update.message.from_user.id
username = update.message.from_user.username or "User" # Fallback if username is not available
# Retrieve the last message from the stored dictionary
last_message = last_user_messages.get(user_id)
if not last_message:
await update.message.reply_text("No previous message found to convert.")
return
# Convert the last message to Ukrainian layout
converted_text = convert_to_ukrainian(last_message)
# Send the converted message back to the user
response_text = f"@{username} хотів сказати: {converted_text}"
await update.message.reply_text(response_text)
async def handle_message(update: Update, context: CallbackContext):
"""Handle incoming text messages."""
if not update.message or not update.message.text:
return
message_text = update.message.text
chat_id = update.message.chat_id
username = update.message.from_user.username
# Store the last message for the user
last_user_messages[update.message.from_user.id] = message_text
# Log message
chat_title = update.effective_chat.title or "Private Chat"
log_path = get_daily_log_path(chat_id, chat_title=chat_title)
chat_logger.info(f"User message: {message_text}", extra={'chat_id': chat_id, 'chattitle': chat_title, 'username': username})
# Handle trigger words
if contains_trigger_words(message_text):
await restrict_user(update, context)
return
urls = extract_urls(message_text)
# Process URLs if present
if urls:
logger.info(f"Processing URLs: {urls}")
await process_urls(update, context, urls, message_text)
return
# Handle GPT responses
if needs_gpt_response(update, context, message_text):
cleaned_message = message_text.replace(f"@{context.bot.username}", "").strip()
logger.info(f"GPT response triggered for: {cleaned_message}")
await ask_gpt_command(cleaned_message, update, context)
return
# Random GPT response
await random_gpt_response(update, context)
async def process_urls(update: Update, context: CallbackContext, urls: list, message_text: str):
"""Process URLs for modification or video downloading."""
modified_links = []
needs_video_download = any(platform in url.lower() for url in urls for platform in SUPPORTED_PLATFORMS)
for url in urls:
sanitized_link = sanitize_url(url)
if re.search(r'(?:aliexpress|a\.aliexpress)\.(?:[a-z]{2,3})/(?:item/)?', sanitized_link):
modified_link = await shorten_url(sanitized_link) if len(sanitized_link) > 60 else sanitized_link
modified_links.append(f"{modified_link} #aliexpress")
await context.bot.send_sticker(chat_id=update.effective_chat.id, sticker=ALIEXPRESS_STICKER_ID)
else:
for domain, modified_domain in domain_modifications.items():
if domain in sanitized_link and modified_domain not in sanitized_link:
modified_links.append(sanitized_link.replace(domain, modified_domain))
break
elif modified_domain in sanitized_link:
modified_links.append(sanitized_link)
break
if modified_links:
cleaned_message_text = remove_links(message_text).replace("\n", " ")
await construct_and_send_message(update.effective_chat.id, update.message.from_user.username,
cleaned_message_text, modified_links, update, context)
if needs_video_download:
video_downloader = context.bot_data.get('video_downloader')
if video_downloader and hasattr(video_downloader, 'handle_video_link'):
logger.info(f"Attempting video download for URLs: {urls}")
await video_downloader.handle_video_link(update, context)
else:
logger.error("Video downloader is not initialized or handle_video_link is not callable.")
async def random_gpt_response(update: Update, context: CallbackContext):
"""Randomly respond with GPT if message meets criteria."""
chat_id = update.message.chat_id
message_text = update.message.text
if not message_text:
general_logger.info("Message text is empty or None.")
return
word_count = len(message_text.split())
if word_count < 5:
return
current_count = message_counter.increment(chat_id)
if random.random() < 0.02 and current_count > 50:
general_logger.info(
f"Random GPT response triggered in chat {chat_id}: "
f"Message: '{message_text}' | Random value: {random.random():.4f} | "
f"Current message count: {current_count}"
)
await answer_from_gpt(message_text, update, context)
message_counter.reset(chat_id)
async def handle_sticker(update: Update, context: CallbackContext):
"""Handle incoming stickers."""
sticker_id = update.message.sticker.file_unique_id
username = update.message.from_user.username
general_logger.info(f"Received sticker with file_unique_id: {sticker_id}")
if sticker_id == "AgAD6BQAAh-z-FM":
logging.info(f"Matched specific sticker from {username}, restricting user.")
await restrict_user(update, context)
async def shorten_url(url: str) -> str:
"""Shorten a URL using TinyURL."""
s = pyshorteners.Shortener()
try:
return s.tinyurl.short(url)
except Exception as e:
logging.error(f"Error shortening URL {url}: {str(e)}")
return url
async def construct_and_send_message(chat_id: int, username: str, cleaned_message_text: str,
modified_links: list, update: Update, context: CallbackContext):
"""Construct and send a message with modified links."""
try:
modified_message = " ".join(modified_links)
final_message = f"@{username}💬: {cleaned_message_text}\nWants to share: {modified_message}"
link_hash = hashlib.md5(modified_links[0].encode()).hexdigest()[:8]
context.bot_data[link_hash] = modified_links[0]
keyboard = create_link_keyboard(modified_links[0])
await context.bot.send_message(
chat_id=chat_id,
text=final_message,
reply_markup=keyboard,
reply_to_message_id=update.message.reply_to_message.message_id if update.message.reply_to_message else None
)
general_logger.info(f"Sent message with keyboard. Link hash: {link_hash}")
except Exception as e:
general_logger.error(f"Error in construct_and_send_message: {str(e)}")
await update.message.reply_text("Sorry, an error occurred.")
def extract_urls(message_text: str) -> list:
"""Extract URLs from the message text."""
# Your implementation for extracting URLs goes here
return re.findall(r'https?://[^\s]+', message_text)
async def main():
"""Main function to initialize and run the bot."""
try:
load_game_state()
os.makedirs('downloads', exist_ok=True)
application = ApplicationBuilder().token(TOKEN).build()
# Command handlers
commands = {
'start': start,
'cat': cat_command,
'gpt': ask_gpt_command,
'analyze': analyze_command,
'flares': screenshot_command,
'weather': weather,
'game': game_command,
'endgame': end_game_command,
'clearwords': clear_words_command,
'hint': hint_command,
'blya': translate_last_message # Registering the command as /blya
}
for command, handler in commands.items():
application.add_handler(CommandHandler(command, handler))
# Message and callback handlers
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
application.add_handler(MessageHandler(filters.Sticker.ALL, handle_sticker))
application.add_handler(CallbackQueryHandler(button_callback))
# Initialize video downloader with extract_urls function
video_downloader = setup_video_handlers(application, extract_urls_func=extract_urls)
application.bot_data['video_downloader'] = video_downloader
# Start screenshot scheduler
screenshot_manager = ScreenshotManager()
asyncio.create_task(screenshot_manager.schedule_task())
# Start bot
logger.info("Bot is running...")
await application.run_polling()
except Exception as e:
logger.error(f"Bot failed to start: {e}")
raise
if __name__ == '__main__':
try:
asyncio.run(main())
except RuntimeError as e:
if "Cannot close a running event loop" in str(e):
logger.error("Event loop is already running. Please check your environment.")
else:
raise