-
Notifications
You must be signed in to change notification settings - Fork 96
/
hikari_music.py
194 lines (151 loc) · 6.95 KB
/
hikari_music.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
"""
The majority of this example plugin is based on music.py, except it works with
hikari and lightbulb Discord libraries.
"""
import re
import logging
import hikari
import lightbulb
import lavalink
plugin = lightbulb.Plugin('Music', 'Music commands')
class EventHandler:
    """Hooks for events emitted by the Lavalink server.

    Each hook only writes a log record containing the guild id of the player
    that produced the event.
    """

    @lavalink.listener(lavalink.TrackStartEvent)
    async def track_start(self, event: lavalink.TrackStartEvent):
        """Log that a track started playing."""
        logging.info('Track started on guild: %s', event.player.guild_id)

    @lavalink.listener(lavalink.TrackEndEvent)
    async def track_end(self, event: lavalink.TrackEndEvent):
        """Log that a track finished playing."""
        logging.info('Track finished on guild: %s', event.player.guild_id)

    @lavalink.listener(lavalink.TrackExceptionEvent)
    async def track_exception(self, event: lavalink.TrackExceptionEvent):
        """Log (at warning level) that a track raised an exception."""
        # %s, like the sibling hooks: %d would make the log call fail to
        # format if the guild id were ever not an integer.
        logging.warning('Track exception event happened on guild: %s', event.player.guild_id)

    @lavalink.listener(lavalink.QueueEndEvent)
    async def queue_finish(self, event: lavalink.QueueEndEvent):
        """Log that the player's queue ran out of tracks."""
        logging.info('Queue finished on guild: %s', event.player.guild_id)
@plugin.listener(hikari.ShardReadyEvent)
async def init(event: hikari.ShardReadyEvent) -> None:
    """Create the Lavalink client and store it on the bot when a shard is ready.

    The client is built with the bot's own user id, given a single node with
    hard-coded local connection settings, wired to ``EventHandler`` and then
    stored as ``plugin.bot.d.lavalink`` for the other listeners and commands.
    """
    # This listener runs for every ShardReadyEvent. Without this guard each
    # run would build a fresh client and overwrite the stored one, dropping
    # whatever state the previous client held.
    if getattr(plugin.bot.d, 'lavalink', None) is not None:
        return

    client = lavalink.Client(plugin.bot.get_me().id)

    client.add_node(
        host='localhost',
        port=2333,
        password='youshallnotpass',
        region='us',
        name='default-node'
    )

    client.add_event_hooks(EventHandler())

    plugin.bot.d.lavalink = client
@plugin.listener(hikari.VoiceServerUpdateEvent)
async def voice_server_update(event: hikari.VoiceServerUpdateEvent) -> None:
    """Forward a voice server update to the Lavalink client.

    The event is repackaged as a ``VOICE_SERVER_UPDATE`` payload (``t`` / ``d``
    keys) before being handed to ``voice_update_handler``.
    """
    endpoint = event.endpoint

    # No endpoint means there is no voice server to connect to right now;
    # slicing None would raise TypeError, so there is nothing to forward.
    if endpoint is None:
        return

    # Lavalink wants the bare host, so drop the scheme -- but only when it is
    # actually there, instead of blindly cutting the first six characters.
    scheme = 'wss://'
    if endpoint.startswith(scheme):
        endpoint = endpoint[len(scheme):]

    lavalink_data = {
        't': 'VOICE_SERVER_UPDATE',
        'd': {
            'guild_id': event.guild_id,
            'endpoint': endpoint,
            'token': event.token,
        }
    }

    await plugin.bot.d.lavalink.voice_update_handler(lavalink_data)
@plugin.listener(hikari.VoiceStateUpdateEvent)
async def voice_state_update(event: hikari.VoiceStateUpdateEvent) -> None:
    """Relay a voice state change to the Lavalink client.

    The fields of ``event.state`` are repackaged as a ``VOICE_STATE_UPDATE``
    payload (``t`` / ``d`` keys), the shape ``voice_update_handler`` is given
    elsewhere in this file.
    """
    state = event.state

    payload = dict(
        t='VOICE_STATE_UPDATE',
        d=dict(
            guild_id=state.guild_id,
            user_id=state.user_id,
            channel_id=state.channel_id,
            session_id=state.session_id,
        ),
    )

    await plugin.bot.d.lavalink.voice_update_handler(payload)
async def _join(ctx: lightbulb.Context):
    """Connect the bot to the voice channel of the command author.

    Creates a Lavalink player for the guild and asks the gateway to move the
    bot (self-deafened) into the author's channel.

    Returns:
        The id of the channel that was joined, or ``None`` when the author
        has no cached voice state in this guild (i.e. is not in a voice
        channel).
    """
    states = plugin.bot.cache.get_voice_states_view_for_guild(ctx.guild_id)

    # The view maps user id -> voice state, so look the author up directly
    # instead of scanning every entry.
    voice_state = states.get(ctx.author.id)

    # user not in voice channel
    if voice_state is None:
        return None

    channel_id = voice_state.channel_id  # channel user is connected to
    plugin.bot.d.lavalink.player_manager.create(guild_id=ctx.guild_id)
    await plugin.bot.update_voice_state(ctx.guild_id, channel_id, self_deaf=True)

    return channel_id
@plugin.command()
@lightbulb.add_checks(lightbulb.guild_only)
@lightbulb.command('join', 'Joins the voice channel you are in.', auto_defer=True)
@lightbulb.implements(lightbulb.SlashCommand)
async def join(ctx: lightbulb.Context) -> None:
    """
    Slash command: bring the bot into the caller's voice channel.

    The actual connection work is delegated to ``_join``; this command only
    reports the outcome back to the user and logs a successful connection.
    """
    joined_channel = await _join(ctx)

    if joined_channel:
        await ctx.respond(f'Connected to <#{joined_channel}>')
        logging.info('Client connected to voice channel on guild: %s', ctx.guild_id)
    else:
        # _join returned nothing: the caller is not in a voice channel
        await ctx.respond('Connect to a voice channel first!')
@plugin.command()
@lightbulb.add_checks(lightbulb.guild_only)
@lightbulb.command('leave', 'Leaves voice channel, clearing queue.', auto_defer=True)
@lightbulb.implements(lightbulb.PrefixCommand, lightbulb.SlashCommand)
async def leave(ctx: lightbulb.Context) -> None:
    """Disconnect the bot from its voice channel and empty the queue."""
    player = plugin.bot.d.lavalink.player_manager.get(ctx.guild_id)

    if player and player.is_connected:
        # Tear down in order: drop queued tracks, halt playback, forget the
        # channel on the player, then tell the gateway to leave voice.
        player.queue.clear()
        await player.stop()
        player.channel_id = None
        await plugin.bot.update_voice_state(ctx.guild_id, None)

        await ctx.respond('Disconnected')
    else:
        # no player for this guild, or it is not connected anywhere
        await ctx.respond('Not currently in any voice channel!')
# Anything that looks like an http(s) URL. Compiled once at import time rather
# than on every invocation of the play command.
_URL_RX = re.compile(r'https?://(?:www\.)?.+')


@plugin.command()
@lightbulb.add_checks(lightbulb.guild_only)
@lightbulb.option('query', 'The query to search for.', modifier=lightbulb.OptionModifier.CONSUME_REST, required=True)
@lightbulb.command('play', 'Searches query on youtube, or adds the URL to the queue.', auto_defer = True)
@lightbulb.implements(lightbulb.PrefixCommand, lightbulb.SlashCommand)
async def play(ctx: lightbulb.Context) -> None:
    """Searches the query on youtube, or adds the URL to the queue.

    If the bot is not connected to voice yet it first joins the caller's
    channel. A playlist result enqueues every track; any other result
    enqueues only the first track. Playback is started when the player is
    not already playing.
    """
    player = plugin.bot.d.lavalink.player_manager.get(ctx.guild_id)

    # Remove leading and trailing <>. <> may be used to suppress embedding links in Discord.
    query = ctx.options.query.strip('<>')

    if not player or not player.is_connected:
        channel_id = await _join(ctx)
        if not channel_id:
            await ctx.respond('Connect to a voice channel first!')
            return

        # get player again after having connected to voice channel
        player = plugin.bot.d.lavalink.player_manager.get(ctx.guild_id)

    # Check if the user input might be a URL. If it isn't, we can let Lavalink
    # do a YouTube search for it instead.
    if not _URL_RX.match(query):
        query = f'ytsearch:{query}'

    # Get the results for the query from Lavalink.
    results = await player.node.get_tracks(query)

    # Results could be None if Lavalink returns an invalid response (non-JSON/non-200 (OK)).
    # Alternatively, results.tracks could be an empty array if the query yielded no tracks.
    if not results or not results.tracks:
        # Respond, then return nothing: the command is annotated -> None and
        # every other early exit in this file follows the same shape.
        await ctx.respond('Nothing found!')
        return

    embed = hikari.Embed()

    if results.load_type == 'PLAYLIST_LOADED':
        tracks = results.tracks

        for track in tracks:
            # Add all of the tracks from the playlist to the queue.
            player.add(requester=ctx.author.id, track=track)

        embed.title = 'Playlist Enqueued!'
        embed.description = f'{results.playlist_info.name} - {len(tracks)} tracks'
    else:
        # Single track or search result: only the first hit is enqueued.
        track = results.tracks[0]
        embed.title = 'Track Enqueued'
        embed.description = f'[{track.title}]({track.uri})'

        player.add(requester=ctx.author.id, track=track)

    await ctx.respond(embed=embed)

    # We don't want to call .play() if the player is playing as that will effectively skip
    # the current track.
    if not player.is_playing:
        await player.play()
def load(bot: lightbulb.BotApp) -> None:
    """Register this module's music plugin on ``bot``.

    NOTE(review): presumably invoked by lightbulb's extension loader --
    confirm against how the bot loads this module.
    """
    bot.add_plugin(plugin)