-
Notifications
You must be signed in to change notification settings - Fork 18
/
discord_bot.py
212 lines (177 loc) · 8.33 KB
/
discord_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import asyncio
import concurrent.futures
import logging as stdlib_logging
import os
import datetime
import pathlib
from absl import app, logging
import discord
from discord.enums import IntegrationType, InteractionContextType
from discord.ext import commands
from discord import option
from google.cloud import datastore
import hashids
import constants
import scanner
ERROR_EMOJI = ':exclamation:'
SUCCESS_EMOJI = ':tada:'
SCANNING_EMOJI = ':mag:'
WAIT_EMOJI = ':hourglass:'
WAIT_LOCK = asyncio.Lock()
intents = discord.Intents.default()
bot = commands.Bot(intents=intents)
hashid_client = hashids.Hashids(salt=constants.SALT, min_length=6)
def upload_to_datastore(result, discord_user_id=None) -> datastore.Entity:
datastore_client = datastore.Client.from_service_account_json('catalog-scanner.json')
temp_key = datastore_client.key('Catalog')
key = datastore_client.allocate_ids(temp_key, 1)[0]
catalog = datastore.Entity(key, exclude_from_indexes=['data', 'unmatched'])
key_hash = hashid_client.encode(key.id)
catalog_data = '\n'.join(result.items).encode('utf-8')
catalog.update(
{
'hash': key_hash,
'data': catalog_data,
'locale': result.locale,
'type': result.mode.name.lower(),
'created': datetime.datetime.utcnow(),
'discord_user': discord_user_id,
}
)
if result.unmatched:
unmatched_data = '\n'.join(result.unmatched).encode('utf-8')
catalog['unmatched'] = unmatched_data
datastore_client.put(catalog)
return catalog
async def handle_scan(
ctx: discord.ApplicationContext,
attachment: discord.Attachment,
filetype: str,
) -> None:
"""Downloads the file, runs the scans and uploads the results, while updating the user along the way."""
await reply(ctx, f'{SCANNING_EMOJI} Scan started, your results will be ready soon!')
tmp_dir = pathlib.Path('cache')
logging.info('Downloading attachment %s', attachment.id)
file = await attachment.to_file()
tmp_file = tmp_dir / f'{attachment.id}_{file.filename}'
tmp_file.parent.mkdir(parents=True, exist_ok=True)
await attachment.save(tmp_file)
try:
result = await async_scan(tmp_file)
except AssertionError as e:
error_message = improve_error_message(str(e))
await reply(ctx, f'{ERROR_EMOJI} Failed to scan: {error_message}')
return
except Exception:
logging.exception('Unexpected scan error.')
await reply(
ctx,
f'{ERROR_EMOJI} Failed to scan media. Make sure you have a valid {filetype}.',
)
return
if not result.items:
await reply(ctx, f'{ERROR_EMOJI} Did not find any items.')
return
catalog = upload_to_datastore(result, ctx.user.id)
url = 'https://nook.lol/{}'.format(catalog['hash'])
logging.info('Found %s items with %s: %s', len(result.items), result.mode, url)
await reply(
ctx,
f'{SUCCESS_EMOJI} Found {len(result.items)} items in your {filetype}.\nResults: {url}',
)
async def async_scan(filename: os.PathLike) -> scanner.ScanResult:
"""Runs the scan asynchronously in a thread pool."""
pool = concurrent.futures.ThreadPoolExecutor()
future = pool.submit(scanner.scan_media, str(filename))
return await asyncio.wrap_future(future)
async def reply(ctx: discord.ApplicationContext, message: str) -> None:
"""Responds with an ephemeral message, or updates the existing message."""
if not ctx.interaction.response.is_done():
await ctx.interaction.response.send_message(content=message, ephemeral=True)
else:
await ctx.interaction.edit_original_response(content=message)
def improve_error_message(message: str) -> str:
"""Adds some more details to the error message."""
if 'is too long' in message:
message += ' Make sure you scroll with the correct analog stick (see instructions),'
message += ' and trim the video around the start and end of the scrolling.'
if 'scrolling too slowly' in message:
message += ' Make sure you hold down the *right* analog stick.'
message += ' See https://x.com/CatalogScanner/status/1261737975244865539'
if 'scrolling inconsistently' in message:
message += ' Please scroll once, from start to finish, in one direction only.'
if 'Invalid video' in message:
message += ' Make sure the video is exported directly from your Nintendo Switch '
message += 'and that you\'re scrolling through a supported page. See nook.lol'
if 'not showing catalog or recipes' in message:
message += ' Make sure to record the video with your Switch using the capture button.'
if 'x224' in message:
message += '\n(It seems like you\'re downloading the video from your Facebook and '
message += 're-posting it; try downloading it directly from your Switch instead)'
if 'x360' in message:
message += '\nIt looks like you might have Data Saving Mode enabled; '
message += 'go to *Settings -> Text & Media*, uncheck Data Saving Mode and make sure *Video Uploads* is set to **Best Quality**.'
elif 'x480' in message:
message += '\nIt seems like Discord might have compressed your video; '
message += 'go to *Settings -> Text & Media* and set *Video Uploads* to **Best Quality**.'
elif 'Invalid resolution' in message:
message += '\n(Make sure you are recording and sending directly from the Switch)'
if 'Pictures Mode' in message:
message += ' Press X to switch to list mode and try again!'
if 'blocking a reaction' in message:
message += ' Make sure to move the cursor to an empty slot or the top right corner, '
message += 'otherwise your results may not be accurate.'
if 'Workbench scanning' in message:
message += ' Please use the DIY Recipes phone app instead (beige background).'
if 'catalog is not supported' in message:
message += ' Please use the Catalog phone app instead (yellow background).'
if 'Incomplete critter scan' in message:
message += ' Make sure to fully capture the leftmost and rightmost sides of the page.'
if 'not uploaded directly' in message:
message += ' Make sure to record and download the video using the Switch\'s media gallery.'
return message
@bot.slash_command(
name='scan',
description='Extracts your Animal Crossing items (catalog, recipes, critters, reactions, music).',
integration_types=[IntegrationType.user_install, IntegrationType.guild_install],
contexts=[InteractionContextType.guild, InteractionContextType.bot_dm, InteractionContextType.private_channel],
)
@option(
'attachment',
discord.Attachment,
description='The video or image to scan',
required=True,
)
async def scan(ctx: discord.ApplicationContext, attachment: discord.Attachment):
if attachment:
assert attachment.content_type
filetype, _, _ = attachment.content_type.partition('/') # {type}/{format}
if filetype not in ('video', 'image'):
logging.info('Invalid attachment type %r, skipping.', attachment.content_type)
await reply(ctx, f'{ERROR_EMOJI} The attachment needs to be a valid video or image file.')
return
else:
await reply(ctx, f'{ERROR_EMOJI} No attachment found.')
return
logging.info('Got request from %s with type %r', ctx.user, filetype)
# Have a queue system that handles requests one at a time.
if WAIT_LOCK.locked and (waiters := WAIT_LOCK._waiters): # type: ignore
logging.info('%s (%s) is in queue position %s', ctx.user, attachment.id, len(waiters))
await reply(ctx, f'{WAIT_EMOJI} You are #{len(waiters)} in the queue, your scan will start soon.')
async with WAIT_LOCK:
await handle_scan(ctx, attachment, filetype)
@bot.event
async def on_ready():
assert bot.user, 'Failed to login'
logging.info('Bot logged in as %s', bot.user)
def main(argv):
del argv # unused
bot.run(constants.DISCORD_TOKEN)
if __name__ == '__main__':
# Write logs to file.
file_handler = stdlib_logging.FileHandler('logs.txt')
logging.get_absl_logger().addHandler(file_handler) # type: ignore
# Disable noise discord logs.
stdlib_logging.getLogger('discord.client').setLevel(stdlib_logging.WARNING)
stdlib_logging.getLogger('discord.gateway').setLevel(stdlib_logging.WARNING)
app.run(main)