Skip to content

Commit

Permalink
Make sure to not answer old chat messages incase the bot has been dow…
Browse files Browse the repository at this point in the history
…n for a while and auto connect the bot when the user connects their youtube account
  • Loading branch information
thomaserlang committed Oct 15, 2024
1 parent f242672 commit aecffae
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 62 deletions.
2 changes: 1 addition & 1 deletion tbot/twitch_bot/tasks/channels_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ async def send_discord_live_notification(channel_id):
error,
)
)
except:
except Exception:
logger.exception(
'twitch discord webhook-id: {} '.format(
w['id'],
Expand Down
77 changes: 58 additions & 19 deletions tbot/twitch_bot/tasks/youtube_chat.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
from datetime import datetime, timezone

from dateutil.parser import parse as parse_dt
from httpx import AsyncClient

from tbot import config, logger
Expand All @@ -11,47 +13,84 @@

_started = False

channel_tasks: dict[str, asyncio.Task] = {}

@bot.on('AFTER_CONNECTED')

@bot.on('AFTER_CHANNELS_JOINED')
async def connected(**kwargs):
global _started
if not config.data.youtube.client_id:
return
if not _started:
_started = True
bot.loop.create_task(runner())


async def runner():
while True:
await asyncio.sleep(1)
bot.loop.create_task(check_youtubes_chat())
logger.info('Starting youtube chat tasks')
await asyncio.sleep(5)
await create_tasks()


@bot.on('REDIS_SERVER_COMMAND')
async def redis_server_command(cmd, cmd_args):
if cmd == 'youtube_connected':
logger.debug(f'Starting youtube chat task for {cmd_args[0]}')
if not channel_tasks.get(cmd_args[0]):
channel_tasks[cmd_args[0]] = bot.loop.create_task(
check_youtube_chat(cmd_args[0])
)
if cmd == 'youtube_disconnected':
logger.debug(f'Canceling youtube chat task for {cmd_args[0]}')
if channel_tasks.get(cmd_args[0]):
channel_tasks[cmd_args[0]].cancel()
del channel_tasks[cmd_args[0]]


async def check_youtubes_chat():
async def create_tasks():
logger.info('Setting up youtube chat tasks')
channels = await bot.db.fetchall(
'SELECT c.channel_id FROM twitch_channels c, twitch_youtube y WHERE c.active=1 AND c.channel_id=y.channel_id',
)
for t in channels:
bot.loop.create_task(check_youtube_chat(t['channel_id']))
logger.debug(f'Starting youtube chat task for {t["channel_id"]}')
channel_tasks[t['channel_id']] = bot.loop.create_task(
check_youtube_chat(t['channel_id'])
)


async def check_youtube_chat(channel_id: str):
live_chat_id = await get_live_chat_id(channel_id)
logger.debug(f'Checking youtube chat for {channel_id} ({live_chat_id})')
while True:
try:
live_chat_id = await get_live_chat_id(channel_id)
logger.debug(f'Checking youtube chat for {channel_id} ({live_chat_id})')

if not live_chat_id:
return
if not live_chat_id:
await asyncio.sleep(60)
continue

chat = await get_youtube_chat(channel_id, live_chat_id)
if chat.get('offlineAt'):
bot.channels_check[channel_id]['youtube_live_chat_id'] = None
await cache_channel(channel_id)
return
chat = await get_youtube_chat(channel_id, live_chat_id)
if chat.get('offlineAt'):
bot.channels_check[channel_id]['youtube_live_chat_id'] = None
await cache_channel(channel_id)
await asyncio.sleep(60)
continue

_ = asyncio.create_task(parse_chatmessages(channel_id, live_chat_id, chat))

logger.debug(
f'Youtube chat task sleeping for {chat["pollingIntervalMillis"]}ms for {channel_id}'
)
await asyncio.sleep(chat['pollingIntervalMillis'] / 1000)
except Exception as e:
logger.error(f'Error in youtube chat task: {e}')
await asyncio.sleep(60)


async def parse_chatmessages(channel_id: str, live_chat_id: str, chat: dict):
for m in chat.get('items', []):
if m['snippet']['type'] != 'textMessageEvent':
continue
if (
datetime.now(tz=timezone.utc) - parse_dt(m['snippet']['publishedAt'])
).total_seconds() > 30:
continue
message = m['snippet']['displayMessage']
logger.debug(f'Received message {message}')
author = m['snippet']['authorChannelId']
Expand Down
115 changes: 75 additions & 40 deletions tbot/web/handlers/api/twitch/connect_youtube.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,84 +10,119 @@


class Handler(Api_handler):

@Level(3)
async def get(self, channel_id):
r = await self.db.fetchone(
'SELECT handle FROM twitch_youtube WHERE channel_id=%s',
(channel_id,)
'SELECT handle FROM twitch_youtube WHERE channel_id=%s', (channel_id,)
)
self.write_object(
{
'connected': True if r else False,
'handle': r['handle'] if r else None,
}
)
self.write_object({
'connected': True if r else False,
'handle': r['handle'] if r else None,
})

@Level(3)
async def delete(self, channel_id):
r = await self.db.fetchone(
'DELETE FROM twitch_youtube WHERE channel_id=%s',
(channel_id,)
await self.db.fetchone(
'DELETE FROM twitch_youtube WHERE channel_id=%s', (channel_id,)
)
await self.redis.publish_json(
'tbot:server:commands', ['youtube_disconnected', channel_id]
)
self.set_status(204)

@Level(3)
async def post(self, channel_id):
r = await self.db.fetchone(
'select name from twitch_channels where channel_id=%s',
(channel_id,)
'select name from twitch_channels where channel_id=%s', (channel_id,)
)
if not r:
raise Exception('Unknown channel {}'.format(channel_id))
self.redirect('https://accounts.google.com/o/oauth2/v2/auth?'+parse.urlencode({
'client_id': config.data.youtube.client_id,
'access_type': 'offline',
'prompt': 'consent',
'include_granted_scopes': 'true',
'redirect_uri': parse.urljoin(config.data.web.base_url, 'connect/youtube'),
'scope': 'https://www.googleapis.com/auth/youtube',
'response_type': 'code',
'state': base64.b64encode(utils.json_dumps({
'channel_id': channel_id,
'channel_name': r['name'],
}).encode('utf-8')),
}))
self.redirect(
'https://accounts.google.com/o/oauth2/v2/auth?'
+ parse.urlencode(
{
'client_id': config.data.youtube.client_id,
'access_type': 'offline',
'prompt': 'consent',
'include_granted_scopes': 'true',
'redirect_uri': parse.urljoin(
config.data.web.base_url, 'connect/youtube'
),
'scope': 'https://www.googleapis.com/auth/youtube',
'response_type': 'code',
'state': base64.b64encode(
utils.json_dumps(
{
'channel_id': channel_id,
'channel_name': r['name'],
}
).encode('utf-8')
),
}
)
)

class Receive_handler(Base_handler):

class Receive_handler(Base_handler):
async def get(self):
code = self.get_argument('code')
state = utils.json_loads(base64.b64decode(self.get_argument('state')))
await self.check_access(state['channel_id'])

http = httpclient.AsyncHTTPClient()
response = await http.fetch('https://oauth2.googleapis.com/token', body=parse.urlencode({
'client_id': config.data.youtube.client_id,
'client_secret': config.data.youtube.client_secret,
'code': code,
'redirect_uri': parse.urljoin(config.data.web.base_url, 'connect/youtube'),
'grant_type': 'authorization_code',
}), method='POST', headers={'Content-Type': 'application/x-www-form-urlencoded'}, raise_error=False)
response = await http.fetch(
'https://oauth2.googleapis.com/token',
body=parse.urlencode(
{
'client_id': config.data.youtube.client_id,
'client_secret': config.data.youtube.client_secret,
'code': code,
'redirect_uri': parse.urljoin(
config.data.web.base_url, 'connect/youtube'
),
'grant_type': 'authorization_code',
}
),
method='POST',
headers={'Content-Type': 'application/x-www-form-urlencoded'},
raise_error=False,
)
if response.code != 200:
logger.error(escape.native_str(response.body))
self.write('Unable to verify you at YouTube, please try again.')
return
token = json.loads(escape.native_str(response.body))

response = await http.fetch('https://www.googleapis.com/youtube/v3/channels?part=snippet&mine=true', headers={
'Authorization': 'Bearer {}'.format(token['access_token']),
}, raise_error=False)
response = await http.fetch(
'https://www.googleapis.com/youtube/v3/channels?part=snippet&mine=true',
headers={
'Authorization': 'Bearer {}'.format(token['access_token']),
},
raise_error=False,
)
if response.code != 200:
logger.error(escape.native_str(response.body))
self.write('Unable retrieve your YouTube profile, please try again.')
return
user = json.loads(escape.native_str(response.body))

await self.db.execute('''
await self.db.execute(
"""
INSERT INTO twitch_youtube (channel_id, token, refresh_token, handle) VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE token=VALUES(token), refresh_token=VALUES(refresh_token),
handle=VALUES(handle)
''',
(state['channel_id'], token['access_token'], token['refresh_token'], ', '.join([user['snippet']['title'] for user in user['items']]))
""",
(
state['channel_id'],
token['access_token'],
token['refresh_token'],
', '.join([user['snippet']['title'] for user in user['items']]),
),
)
await self.redis.publish_json(
'tbot:server:commands', ['youtube_connected', state['channel_id']]
)

if state['channel_name']:
Expand All @@ -97,4 +132,4 @@ async def get(self):

@Level(3)
async def check_access(self, channel_id):
pass
pass
1 change: 1 addition & 0 deletions tbot/web/ui/twitch/dashboard/youtube.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class YouTube extends React.Component {
)
return (
<div>
<p>Connect your YouTube account to activate commands, timers and chat moderation for your YouTube Live streams.</p>
<form
method="post"
onSubmit={this.handleSubmit}
Expand Down
7 changes: 5 additions & 2 deletions tests/tbot/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import unittest
import tbot.testbase
from tbot import utils
from datetime import datetime

from dateutil.parser import parse

import tbot.testbase
from tbot import utils


class test_utils(unittest.TestCase):

def test_seconds_to_pretty(self):
Expand Down

0 comments on commit aecffae

Please sign in to comment.