# guilded_bridge_platform.py
import nextcord
import guilded
from utils import platform_base
from typing import Union
# U+250C (BOX DRAWINGS LIGHT DOWN AND RIGHT): glyph placed in front of the
# "Replying to ..." line that is built for bridged replies.
arrow_unicode = '\u250c'
class GuildedPlatform(platform_base.PlatformBase):
    """Guilded implementation of the ``PlatformBase`` interface.

    The synchronous methods are thin accessors over guilded.py objects and the
    bot's webhook cache. The coroutines fetch objects from Guilded and send,
    edit or delete messages.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # NOTE(review): these look like capability flags read through
        # PlatformBase by the bridge core; confirm their exact meaning there.
        self.enable_tb = True
        self.uses_webhooks = True
        self.reply_using_text = True
        self.filesize_limit = 500000000  # presumably bytes; TODO confirm

    def error_is_unavoidable(self, error):
        """Return True if ``error`` is exactly a GuildedServerError or Forbidden.

        The check is on the exact type, so subclasses do not match.
        """
        return type(error) in (guilded.GuildedServerError, guilded.Forbidden)

    def bot_id(self):
        """Return the ID of the bot's own user."""
        return self.bot.user.id

    # WebhookCacheStore wrapper for Guilded Support NUPS module
    def store_webhook(self, webhook: guilded.Webhook):
        """Store one webhook in the bot's cache under its own ID and server ID."""
        return self.bot.webhook_cache.store_webhook(webhook, webhook.id, webhook.server.id)

    def store_webhooks(self, webhooks: list):
        """Store several webhooks in the bot's cache.

        Each webhook is stored with its own server ID, so an empty list no
        longer raises ``IndexError`` and webhooks from different servers are
        not all attributed to the first webhook's server.
        """
        return self.bot.webhook_cache.store_webhooks(
            webhooks,
            [webhook.id for webhook in webhooks],
            [webhook.server.id for webhook in webhooks],
        )

    def get_webhooks(self, guild: str):
        """Return whatever the webhook cache holds for ``guild``."""
        return self.bot.webhook_cache.get_webhooks(guild)

    def get_webhook(self, identifier: str):
        """Return the cached webhook for ``identifier``.

        Callers in this class treat any exception from the cache as a miss.
        """
        return self.bot.webhook_cache.get_webhook(identifier)

    def clear(self, guild: Union[int, str, None] = None):
        """Clear the webhook cache, forwarding ``guild`` to the cache's ``clear``."""
        return self.bot.webhook_cache.clear(guild)

    # Guilded Support NUPS functions
    def get_server(self, server_id):
        """Return the server from the bot's local cache (no network call)."""
        return self.bot.get_server(server_id)

    def get_channel(self, channel_id):
        """Return the channel from the bot's local cache (no network call)."""
        return self.bot.get_channel(channel_id)

    def get_user(self, user_id):
        """Return the user from the bot's local cache (no network call)."""
        return self.bot.get_user(user_id)

    def get_member(self, server, user_id):
        """Return member ``user_id`` of the server whose ID is ``server``."""
        server = self.get_server(server)
        return server.get_member(user_id)

    def channel(self, message: guilded.ChatMessage):
        """Return the channel ``message`` was sent in."""
        return message.channel

    def channel_id(self, obj):
        """Return ``obj.channel_id``."""
        return obj.channel_id

    def server(self, message: guilded.ChatMessage):
        """Return the server ``message`` was sent in."""
        return message.server

    def server_id(self, obj):
        """Return ``obj.server_id``."""
        return obj.server_id

    def content(self, message: guilded.ChatMessage):
        """Return the text content of ``message``."""
        return message.content

    def reply(self, message: guilded.ChatMessage):
        """Return the first message ``message`` replies to.

        Returns the resolved message object when ``replied_to`` is populated,
        otherwise the first entry of ``replied_to_ids``. Indexing the first
        entry fails if ``message`` is not a reply at all.
        """
        if not len(message.replied_to):
            return message.replied_to_ids[0]
        return message.replied_to[0]

    def roles(self, member: guilded.Member):
        """Return the roles of ``member``."""
        return member.roles

    def get_hex(self, role):
        """Return the role colour as an uppercase ``RRGGBB`` hex string."""
        # If the color is a gradient, use the first color value
        color = role.colors[0]
        return ''.join(f'{component:02X}' for component in (color.r, color.g, color.b))

    def author(self, message: guilded.ChatMessage):
        """Return the author of ``message``."""
        return message.author

    def embeds(self, message: guilded.ChatMessage):
        """Return the embeds attached to ``message``."""
        return message.embeds

    def attachments(self, message: guilded.ChatMessage):
        """Return the attachments of ``message``."""
        return message.attachments

    def url(self, message: guilded.ChatMessage):
        """Return the share URL of ``message``."""
        return message.share_url

    def get_id(self, obj):
        """Return ``obj.id``."""
        return obj.id

    def display_name(self, user: guilded.User, message=None):
        """Return the name to display for ``user``; ``message`` is unused."""
        # Guilded doesn't have display names, so return username
        return user.name

    def user_name(self, user: guilded.User, message=None):
        """Return the username of ``user``; ``message`` is unused."""
        return user.name

    def name(self, obj):
        """Return ``obj.name``."""
        return obj.name

    def avatar(self, user, message=None):
        """Return the avatar URL without its query string, or None if unset."""
        if not user.avatar:
            return None
        return user.avatar.url.split('?')[0]

    def permissions(self, user: guilded.Member, channel=None):
        """Return a ``platform_base.Permissions`` built from server permissions.

        Only ``ban_members`` and ``manage_channels`` are copied; ``channel``
        is ignored.
        """
        # we can't fetch permissions for a channel without using async, so use server permissions only
        server_perms = user.server_permissions
        permissions = platform_base.Permissions()
        permissions.ban_members = server_perms.ban_members
        permissions.manage_channels = server_perms.manage_channels
        return permissions

    def is_bot(self, user):
        """Return ``user.bot``."""
        return user.bot

    def attachment_size(self, attachment):
        """Return the attachment size, or 0 when Guilded did not provide one."""
        # Guilded sometimes (or always?) doesn't provide the size of an attachment
        # If attachment.size is None, return 0
        return attachment.size or 0

    def attachment_type(self, attachment: guilded.Attachment):
        """Classify ``attachment`` as ``'image'``, ``'video'`` or ``'unknown'``.

        Always returns one of those three strings, including when
        ``file_type`` yields no values at all.
        """
        # We spent a great deal of time trying to figure this out for v2, thank guilded.py
        file_type = attachment.file_type
        if type(file_type) is guilded.FileType:
            if file_type.image:
                return 'image'
            if file_type.video:
                return 'video'
            return 'unknown'

        # noinspection PyTypeChecker
        # ^ this stops pycharm from complaining because file_type should be namedtuple
        for value in file_type:
            if value == guilded.FileType.image or value == 'unknown_image':
                return 'image'
            if value == guilded.FileType.video or value == 'unknown_video':
                return 'video'
        return 'unknown'

    def convert_embeds(self, embeds: list):
        """Convert nextcord embeds to Guilded embeds.

        Returns a new list. Entries that are not exactly ``nextcord.Embed``
        are left out of the result.
        """
        empty = guilded.Embed.Empty
        converted = []
        for source in embeds:
            if type(source) is not nextcord.Embed:
                continue
            embed = guilded.Embed(
                title=source.title or empty,
                description=source.description or empty,
                url=source.url or empty,
                colour=source.colour.value if source.colour else empty,
                timestamp=source.timestamp or empty,
            )
            if source.image:
                embed.set_image(url=source.image.url or empty)
            if source.thumbnail:
                embed.set_thumbnail(url=source.thumbnail.url or empty)
            if source.author:
                embed.set_author(
                    name=source.author.name,
                    url=source.author.url,
                    icon_url=source.author.icon_url or empty,
                )
            if source.footer:
                embed.set_footer(text=source.footer.text, icon_url=source.footer.icon_url or empty)
            converted.append(embed)
        return converted

    def convert_embeds_discord(self, embeds: list):
        """Convert Guilded embeds to nextcord embeds, in place.

        Entries that are exactly ``guilded.Embed`` are replaced inside
        ``embeds``; everything else is left untouched. The same list object
        is returned.
        """
        for index, source in enumerate(embeds):
            if type(source) is not guilded.Embed:
                continue
            embed = nextcord.Embed(
                title=source.title,
                description=source.description,
                url=source.url,
                # An embed without a colour has no ``.value``; pass None
                # instead of raising AttributeError.
                colour=getattr(source.colour, 'value', None),
                timestamp=source.timestamp,
            )
            if source.image:
                embed.set_image(url=source.image.url)
            if source.thumbnail:
                embed.set_thumbnail(url=source.thumbnail.url)
            if source.author:
                embed.set_author(
                    name=source.author.name,
                    url=source.author.url,
                    icon_url=source.author.icon_url,
                )
            if source.footer:
                embed.set_footer(text=source.footer.text, icon_url=source.footer.icon_url)
            embeds[index] = embed
        return embeds

    def webhook_id(self, message):
        """Return ``message.webhook_id``."""
        return message.webhook_id

    async def fetch_server(self, server_id):
        """Fetch a server through the bot client."""
        return await self.bot.fetch_server(server_id)

    async def fetch_channel(self, channel_id):
        """Fetch a channel through the bot client."""
        return await self.bot.fetch_channel(channel_id)

    async def fetch_webhook(self, webhook_id, server_id):
        """Return the webhook from the cache, fetching it from the server on a miss."""
        try:
            return self.get_webhook(webhook_id)
        except Exception:
            # Any cache failure is treated as a miss.
            server = await self.bot.getch_server(server_id)
            return await server.fetch_webhook(webhook_id)

    async def fetch_message(self, channel, message_id):
        """Fetch message ``message_id`` from ``channel``."""
        return await channel.fetch_message(message_id)

    def _replace_mentions(self, text, sigil, resolve, label):
        """Replace ``<{sigil}id>`` and ``<{sigil}!id>`` mentions in ``text``.

        ``resolve`` maps an ID to an object (or a falsy value when unknown) and
        ``label`` maps that object to the replacement string. Mentions whose
        ID cannot be resolved are left as they are.
        """
        opener = f'<{sigil}'
        # The first fragment is the text before any mention, so skip it.
        for fragment in text.split(opener)[1:]:
            if '>' not in fragment:
                # not a mention
                continue
            target_id = fragment.split('>', 1)[0]
            # "<@!id>" splits to "!id"; drop the "!" so the lookup can succeed.
            if target_id.startswith('!'):
                target_id = target_id[1:]
            target = resolve(target_id)
            if not target:
                continue
            readable = label(target)
            text = text.replace(
                f'{opener}{target_id}>', readable
            ).replace(
                f'{opener}!{target_id}>', readable
            )
        return text

    async def make_friendly(self, text: str):
        """Return ``text`` with mentions and image markdown made readable.

        User mentions become ``@name``, channel mentions become ``#name``,
        lines that are uploaded-attachment image markdown are dropped, and any
        other ``![](url)`` line is reduced to the bare URL.
        """
        # Remove user mentions
        text = self._replace_mentions(
            text, '@', self.get_user, lambda user: f'@{self.user_name(user)}'
        )
        # Remove channel mentions
        text = self._replace_mentions(
            text, '#', self.get_channel, lambda channel: f'#{self.name(channel)}'
        )

        # Fix attachment URLs
        # NOTE(review): this prefix literal was unreadable in the reviewed copy
        # of the file and has been restored from memory of the upstream
        # project; confirm it against the repository before merging.
        upload_prefix = '![](https://cdn.gilcdn.com/ContentMediaGenericFiles'
        kept = []
        for line in text.split('\n'):
            if line.startswith(upload_prefix):
                continue
            if line.startswith('![](') and line.endswith(')'):
                # Strip the leading "![](" and the trailing ")".
                line = line.replace('![](', '', 1)[:-1]
            kept.append(line)
        return '\n'.join(kept)

    async def to_discord_file(self, file: guilded.Attachment):
        """Convert a Guilded attachment into a ``nextcord.File``."""
        tempfile = await file.to_file()
        # noinspection PyTypeChecker
        return nextcord.File(fp=tempfile.fp, filename=file.filename)

    async def to_platform_file(self, file: Union[nextcord.Attachment, nextcord.File]):
        """Convert a nextcord attachment or file into a ``guilded.File``."""
        if type(file) is nextcord.Attachment:
            tempfile = await file.to_file(use_cached=True)
        else:
            tempfile = file
        return guilded.File(fp=tempfile.fp, filename=file.filename)

    async def send(self, channel, content, special: dict = None):
        """Send ``content`` to ``channel`` and return the sent message.

        ``special`` may contain ``files``, ``embeds``, ``reply``,
        ``reply_content`` and ``bridge``. Omitting it is the same as passing
        an empty dict.

        Without ``bridge`` the message is sent as the bot via
        ``channel.send``; ``reply`` may then be a message ID string or a
        ``guilded.ChatMessage`` (anything else is ignored).

        With ``bridge`` the message is sent through the webhook of the room
        the channel is linked to, using ``special['bridge']['name']`` and
        ``special['bridge']['avatar']``. Replies are rendered as a text line
        in front of the content.

        Raises:
            ValueError: ``bridge`` was given but the channel is not linked to
                a room.
        """
        special = special or {}
        files = special.get('files') or []
        embeds = special.get('embeds') or []
        reply = special.get('reply')
        reply_content = special.get('reply_content')

        if 'bridge' not in special:
            if reply:
                # reply must be an ID or ChatMessage here
                if type(reply) is str:
                    reply = await channel.fetch_message(reply)
                elif type(reply) is not guilded.ChatMessage:
                    reply = None
            return await channel.send(
                content, embeds=embeds, files=files, reply_to=[reply] if reply else None
            )

        # check if we're in a room
        room = self.parent.bridge.get_channel_room(channel, platform='guilded')
        if not room:
            raise ValueError('channel is not linked to a room, remove bridge from special')

        webhook_id = self.parent.bridge.get_room(room)['guilded'][channel.server.id][0]
        try:
            webhook = self.get_webhook(webhook_id)
        except Exception:
            # Cache miss: fetch the webhook and remember it for next time.
            server = self.get_server(channel.server.id)
            webhook = await server.fetch_webhook(webhook_id)
            self.store_webhook(webhook)

        # as Guilded only supports ascii for usernames, remove all non-ascii characters
        # user emojis will be omitted
        name = special['bridge']['name'].encode("ascii", errors="ignore").decode() or 'Empty username'
        avatar = special['bridge']['avatar']

        replytext = ''
        if reply:
            # reply must be a UnifierMessage here
            reply_name = None
            try:
                # noinspection PyUnresolvedReferences
                if reply.source == 'discord':
                    user = self.parent.get_user(int(reply.author_id))
                    reply_name = user.global_name or user.name
                elif reply.source == 'guilded':
                    user = self.bot.get_user(reply.author_id)
                    reply_name = user.name
                else:
                    source_support = self.parent.platforms[reply.source]
                    reply_name = source_support.display_name(source_support.get_user(reply.author_id))
            except Exception:
                # Best effort only: an unresolved author falls back to 'unknown'.
                pass

            if not reply_name:
                reply_name = 'unknown'
            else:
                # Square brackets would break the markdown link built below.
                reply_name = '@' + reply_name.replace('[', '').replace(']', '')

            # noinspection PyUnresolvedReferences
            if channel.server.id in reply.urls:
                replytext = f'{arrow_unicode} **[Replying to {reply_name}]({reply.urls[channel.server.id]})**'
            else:
                replytext = f'{arrow_unicode} **Replying to {reply_name}**'

            if reply_content:
                replytext += f' - *{reply_content}*\n'
            else:
                replytext += '\n'

        if len(replytext + content) == 0:
            content = '[empty message]'

        return await webhook.send(
            replytext + content, embeds=embeds, files=files, username=name, avatar_url=avatar
        )

    async def edit(self, message: guilded.ChatMessage, content, special: dict = None):
        """Edit ``message`` with new ``content`` and ``special['embeds']``.

        Does nothing for webhook messages. Omitting ``special`` is the same
        as passing an empty dict.
        """
        if message.webhook_id:
            # can't edit a webhook message
            return
        embeds = (special or {}).get('embeds') or []
        await message.edit(content=content, embeds=embeds)

    async def delete(self, message):
        """Delete ``message``."""
        await message.delete()