diff --git a/tbot/twitch_bot/tasks/channels_check.py b/tbot/twitch_bot/tasks/channels_check.py index 50ff128..515037f 100644 --- a/tbot/twitch_bot/tasks/channels_check.py +++ b/tbot/twitch_bot/tasks/channels_check.py @@ -440,7 +440,7 @@ async def send_discord_live_notification(channel_id): error, ) ) - except: + except Exception: logger.exception( 'twitch discord webhook-id: {} '.format( w['id'], diff --git a/tbot/twitch_bot/tasks/youtube_chat.py b/tbot/twitch_bot/tasks/youtube_chat.py index ae02e47..2e25297 100644 --- a/tbot/twitch_bot/tasks/youtube_chat.py +++ b/tbot/twitch_bot/tasks/youtube_chat.py @@ -1,5 +1,7 @@ import asyncio +from datetime import datetime, timezone +from dateutil.parser import parse as parse_dt from httpx import AsyncClient from tbot import config, logger @@ -11,47 +13,84 @@ _started = False +channel_tasks: dict[str, asyncio.Task] = {} -@bot.on('AFTER_CONNECTED') + +@bot.on('AFTER_CHANNELS_JOINED') async def connected(**kwargs): global _started if not config.data.youtube.client_id: return if not _started: _started = True - bot.loop.create_task(runner()) - - -async def runner(): - while True: - await asyncio.sleep(1) - bot.loop.create_task(check_youtubes_chat()) + logger.info('Starting youtube chat tasks') + await asyncio.sleep(5) + await create_tasks() + + +@bot.on('REDIS_SERVER_COMMAND') +async def redis_server_command(cmd, cmd_args): + if cmd == 'youtube_connected': + logger.debug(f'Starting youtube chat task for {cmd_args[0]}') + if not channel_tasks.get(cmd_args[0]): + channel_tasks[cmd_args[0]] = bot.loop.create_task( + check_youtube_chat(cmd_args[0]) + ) + if cmd == 'youtube_disconnected': + logger.debug(f'Canceling youtube chat task for {cmd_args[0]}') + if channel_tasks.get(cmd_args[0]): + channel_tasks[cmd_args[0]].cancel() + del channel_tasks[cmd_args[0]] -async def check_youtubes_chat(): +async def create_tasks(): + logger.info('Setting up youtube chat tasks') channels = await bot.db.fetchall( 'SELECT c.channel_id FROM twitch_channels c, twitch_youtube y WHERE c.active=1 AND c.channel_id=y.channel_id', ) for t in channels: - bot.loop.create_task(check_youtube_chat(t['channel_id'])) + logger.debug(f'Starting youtube chat task for {t["channel_id"]}') + channel_tasks[t['channel_id']] = bot.loop.create_task( + check_youtube_chat(t['channel_id']) + ) async def check_youtube_chat(channel_id: str): - live_chat_id = await get_live_chat_id(channel_id) - logger.debug(f'Checking youtube chat for {channel_id} ({live_chat_id})') + while True: + try: + live_chat_id = await get_live_chat_id(channel_id) + logger.debug(f'Checking youtube chat for {channel_id} ({live_chat_id})') - if not live_chat_id: - return + if not live_chat_id: + await asyncio.sleep(60) + continue - chat = await get_youtube_chat(channel_id, live_chat_id) - if chat.get('offlineAt'): - bot.channels_check[channel_id]['youtube_live_chat_id'] = None - await cache_channel(channel_id) - return + chat = await get_youtube_chat(channel_id, live_chat_id) + if chat.get('offlineAt'): + bot.channels_check[channel_id]['youtube_live_chat_id'] = None + await cache_channel(channel_id) + await asyncio.sleep(60) + continue + + _ = asyncio.create_task(parse_chatmessages(channel_id, live_chat_id, chat)) + logger.debug( + f'Youtube chat task sleeping for {chat["pollingIntervalMillis"]}ms for {channel_id}' + ) + await asyncio.sleep(chat['pollingIntervalMillis'] / 1000) + except Exception as e: + logger.error(f'Error in youtube chat task: {e}') + await asyncio.sleep(60) + + +async def parse_chatmessages(channel_id: str, live_chat_id: str, chat: dict): for m in chat.get('items', []): if m['snippet']['type'] != 'textMessageEvent': continue + if ( + datetime.now(tz=timezone.utc) - parse_dt(m['snippet']['publishedAt']) + ).total_seconds() > 30: + continue message = m['snippet']['displayMessage'] logger.debug(f'Received message {message}') author = m['snippet']['authorChannelId'] diff --git a/tbot/web/handlers/api/twitch/connect_youtube.py b/tbot/web/handlers/api/twitch/connect_youtube.py index d5a0228..3440aa3 100644 --- a/tbot/web/handlers/api/twitch/connect_youtube.py +++ b/tbot/web/handlers/api/twitch/connect_youtube.py @@ -10,84 +10,119 @@ class Handler(Api_handler): - @Level(3) async def get(self, channel_id): r = await self.db.fetchone( - 'SELECT handle FROM twitch_youtube WHERE channel_id=%s', - (channel_id,) + 'SELECT handle FROM twitch_youtube WHERE channel_id=%s', (channel_id,) + ) + self.write_object( + { + 'connected': True if r else False, + 'handle': r['handle'] if r else None, + } ) - self.write_object({ - 'connected': True if r else False, - 'handle': r['handle'] if r else None, - }) @Level(3) async def delete(self, channel_id): - r = await self.db.fetchone( - 'DELETE FROM twitch_youtube WHERE channel_id=%s', - (channel_id,) + await self.db.fetchone( + 'DELETE FROM twitch_youtube WHERE channel_id=%s', (channel_id,) + ) + await self.redis.publish_json( + 'tbot:server:commands', ['youtube_disconnected', channel_id] ) self.set_status(204) @Level(3) async def post(self, channel_id): r = await self.db.fetchone( - 'select name from twitch_channels where channel_id=%s', - (channel_id,) + 'select name from twitch_channels where channel_id=%s', (channel_id,) ) if not r: raise Exception('Unknown channel {}'.format(channel_id)) - self.redirect('https://accounts.google.com/o/oauth2/v2/auth?'+parse.urlencode({ - 'client_id': config.data.youtube.client_id, - 'access_type': 'offline', - 'prompt': 'consent', - 'include_granted_scopes': 'true', - 'redirect_uri': parse.urljoin(config.data.web.base_url, 'connect/youtube'), - 'scope': 'https://www.googleapis.com/auth/youtube', - 'response_type': 'code', - 'state': base64.b64encode(utils.json_dumps({ - 'channel_id': channel_id, - 'channel_name': r['name'], - }).encode('utf-8')), - })) + self.redirect( + 'https://accounts.google.com/o/oauth2/v2/auth?' + + parse.urlencode( + { + 'client_id': config.data.youtube.client_id, + 'access_type': 'offline', + 'prompt': 'consent', + 'include_granted_scopes': 'true', + 'redirect_uri': parse.urljoin( + config.data.web.base_url, 'connect/youtube' + ), + 'scope': 'https://www.googleapis.com/auth/youtube', + 'response_type': 'code', + 'state': base64.b64encode( + utils.json_dumps( + { + 'channel_id': channel_id, + 'channel_name': r['name'], + } + ).encode('utf-8') + ), + } + ) + ) -class Receive_handler(Base_handler): +class Receive_handler(Base_handler): async def get(self): code = self.get_argument('code') state = utils.json_loads(base64.b64decode(self.get_argument('state'))) await self.check_access(state['channel_id']) http = httpclient.AsyncHTTPClient() - response = await http.fetch('https://oauth2.googleapis.com/token', body=parse.urlencode({ - 'client_id': config.data.youtube.client_id, - 'client_secret': config.data.youtube.client_secret, - 'code': code, - 'redirect_uri': parse.urljoin(config.data.web.base_url, 'connect/youtube'), - 'grant_type': 'authorization_code', - }), method='POST', headers={'Content-Type': 'application/x-www-form-urlencoded'}, raise_error=False) + response = await http.fetch( + 'https://oauth2.googleapis.com/token', + body=parse.urlencode( + { + 'client_id': config.data.youtube.client_id, + 'client_secret': config.data.youtube.client_secret, + 'code': code, + 'redirect_uri': parse.urljoin( + config.data.web.base_url, 'connect/youtube' + ), + 'grant_type': 'authorization_code', + } + ), + method='POST', + headers={'Content-Type': 'application/x-www-form-urlencoded'}, + raise_error=False, + ) if response.code != 200: logger.error(escape.native_str(response.body)) self.write('Unable to verify you at YouTube, please try again.') return token = json.loads(escape.native_str(response.body)) - response = await http.fetch('https://www.googleapis.com/youtube/v3/channels?part=snippet&mine=true', headers={ - 'Authorization': 'Bearer {}'.format(token['access_token']), - }, raise_error=False) + response = await http.fetch( + 'https://www.googleapis.com/youtube/v3/channels?part=snippet&mine=true', + headers={ + 'Authorization': 'Bearer {}'.format(token['access_token']), + }, + raise_error=False, + ) if response.code != 200: logger.error(escape.native_str(response.body)) self.write('Unable retrieve your YouTube profile, please try again.') return user = json.loads(escape.native_str(response.body)) - await self.db.execute(''' + await self.db.execute( + """ INSERT INTO twitch_youtube (channel_id, token, refresh_token, handle) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE token=VALUES(token), refresh_token=VALUES(refresh_token), handle=VALUES(handle) - ''', - (state['channel_id'], token['access_token'], token['refresh_token'], ', '.join([user['snippet']['title'] for user in user['items']])) + """, + ( + state['channel_id'], + token['access_token'], + token['refresh_token'], + ', '.join([user['snippet']['title'] for user in user['items']]), + ), + ) + await self.redis.publish_json( + 'tbot:server:commands', ['youtube_connected', state['channel_id']] ) if state['channel_name']: @@ -97,4 +132,4 @@ async def get(self): @Level(3) async def check_access(self, channel_id): - pass \ No newline at end of file + pass diff --git a/tbot/web/ui/twitch/dashboard/youtube.jsx b/tbot/web/ui/twitch/dashboard/youtube.jsx index e3a344e..07587dd 100644 --- a/tbot/web/ui/twitch/dashboard/youtube.jsx +++ b/tbot/web/ui/twitch/dashboard/youtube.jsx @@ -58,6 +58,7 @@ class YouTube extends React.Component { ) return (
+

Connect your YouTube account to activate commands, timers and chat moderation for your YouTube Live streams.