diff --git a/Wumpus.Net.sln b/Wumpus.Net.sln index 609cadd..ac37d95 100644 --- a/Wumpus.Net.sln +++ b/Wumpus.Net.sln @@ -32,6 +32,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Wumpus.Net.Tests.Server", " EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Wumpus.Net.Bot", "src\Wumpus.Net.Bot\Wumpus.Net.Bot.csproj", "{ED977313-7BC8-4A5E-8A24-1BF42635D293}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wumpus.Net.Audio", "src\Wumpus.Net.Audio\Wumpus.Net.Audio.csproj", "{5ABF3E1B-45E9-4CA7-A455-8923C54B480F}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -150,6 +152,18 @@ Global {ED977313-7BC8-4A5E-8A24-1BF42635D293}.Release|x64.Build.0 = Release|Any CPU {ED977313-7BC8-4A5E-8A24-1BF42635D293}.Release|x86.ActiveCfg = Release|Any CPU {ED977313-7BC8-4A5E-8A24-1BF42635D293}.Release|x86.Build.0 = Release|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Debug|x64.ActiveCfg = Debug|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Debug|x64.Build.0 = Debug|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Debug|x86.ActiveCfg = Debug|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Debug|x86.Build.0 = Debug|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Release|Any CPU.Build.0 = Release|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Release|x64.ActiveCfg = Release|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Release|x64.Build.0 = Release|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Release|x86.ActiveCfg = Release|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -164,6 +178,7 @@ Global {0967FF88-E435-4132-9524-B252A3D9BCD6} = {9806905E-1B09-4AC2-BF4F-731C9E473F94} {C6BE9630-1E1F-4DD1-9205-045A50181AA7} = {9806905E-1B09-4AC2-BF4F-731C9E473F94} {ED977313-7BC8-4A5E-8A24-1BF42635D293} = {F7B9BFB1-C836-4432-BE64-719A38E0BBEF} + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F} = {F7B9BFB1-C836-4432-BE64-719A38E0BBEF} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {B5B02039-F4C2-44C6-91B8-340E44472AED} diff --git a/src/Wumpus.Net.Audio/Events/VoiceHelloEvent.cs b/src/Wumpus.Net.Audio/Events/VoiceHelloEvent.cs new file mode 100644 index 0000000..e959c48 --- /dev/null +++ b/src/Wumpus.Net.Audio/Events/VoiceHelloEvent.cs @@ -0,0 +1,15 @@ +using Voltaic.Serialization; + +namespace Wumpus.Events +{ + public class VoiceHelloEvent + { + // Given as float because discord returns json of the form + // {"op":8,"d":{"v":4,"heartbeat_interval":13750.0}} + [ModelProperty("heartbeat_interval")] + public float HeartbeatInterval { get; set; } + + [ModelProperty("v")] + public int GatewayVersion { get; set; } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/Events/VoiceReadyEvent.cs b/src/Wumpus.Net.Audio/Events/VoiceReadyEvent.cs new file mode 100644 index 0000000..e73170c --- /dev/null +++ b/src/Wumpus.Net.Audio/Events/VoiceReadyEvent.cs @@ -0,0 +1,20 @@ +using Voltaic; +using Voltaic.Serialization; + +namespace Wumpus.Events +{ + public class VoiceReadyEvent + { + [ModelProperty("ssrc")] + public uint Ssrc { get; set; } + + [ModelProperty("port")] + public int Port { get; set; } + + [ModelProperty("modes")] + public Utf8String[] EncryptionSchemes { get; set; } + + [ModelProperty("ip")] + public Utf8String IpAddress { get; set; } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/Events/VoiceSessionDescriptionEvent.cs b/src/Wumpus.Net.Audio/Events/VoiceSessionDescriptionEvent.cs new file mode 100644 index 0000000..f055b72 --- /dev/null +++ b/src/Wumpus.Net.Audio/Events/VoiceSessionDescriptionEvent.cs @@ -0,0 +1,23 @@ +using Voltaic; +using Voltaic.Serialization; + +namespace Wumpus.Events +{ + public class VoiceSessionDescriptionEvent + { + [ModelProperty("video_codec")] + public Utf8String VideoCodec { get; set; } + + [ModelProperty("secret_key")] + public byte[] SecretKey { get; set; } + + [ModelProperty("mode")] + public Utf8String EncryptionScheme { get; set; } + + [ModelProperty("media_session_id")] + public Utf8String VideoSessionId { get; set; } + + [ModelProperty("audio_codec")] + public Utf8String AudioCodec { get; set; } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/IPUtilities.cs b/src/Wumpus.Net.Audio/IPUtilities.cs new file mode 100644 index 0000000..c88b0aa --- /dev/null +++ b/src/Wumpus.Net.Audio/IPUtilities.cs @@ -0,0 +1,38 @@ +using System; +using System.Net; +using Voltaic.Serialization.Utf8; + +namespace Wumpus +{ + public static class IPUtilities + { + public static bool TryParseIPv4Address(ReadOnlySpan buffer, out IPAddress address) + => TryParseIPv4Address(ref buffer, out address); + + public static bool TryParseIPv4Address(ref ReadOnlySpan buffer, out IPAddress address) + { + address = default; + ulong value = 0; + + for (int i = 0; i < 4; i++) + { + if (!Utf8Reader.TryReadUInt8(ref buffer, out byte section, 'g')) + return false; + + value |= (ulong)section << (i * 8); + + // last value does not have a dot following it + if (i != 3) + { + if (buffer[0] != '.') + return false; + + buffer = buffer.Slice(1); + } + } + + address = new IPAddress((long)value); + return true; + } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/Requests/VoiceIdentifyParams.cs b/src/Wumpus.Net.Audio/Requests/VoiceIdentifyParams.cs new file mode 100644 index 0000000..1d0fbe8 --- /dev/null +++ b/src/Wumpus.Net.Audio/Requests/VoiceIdentifyParams.cs @@ -0,0 +1,20 @@ +using Voltaic; +using Voltaic.Serialization; + +namespace Wumpus.Requests +{ + public class VoiceIdentifyParams + { + [ModelProperty("user_id")] + public Snowflake UserId { get; set; } + + [ModelProperty("server_id")] + public Snowflake GuildId { get; set; } + + [ModelProperty("session_id")] + public Utf8String SessionId { get; set; } + + [ModelProperty("token")] + public Utf8String Token { get; set; } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/Requests/VoiceResumeParams.cs b/src/Wumpus.Net.Audio/Requests/VoiceResumeParams.cs new file mode 100644 index 0000000..4dc0760 --- /dev/null +++ b/src/Wumpus.Net.Audio/Requests/VoiceResumeParams.cs @@ -0,0 +1,17 @@ +using Voltaic; +using Voltaic.Serialization; + +namespace Wumpus.Requests +{ + public class VoiceResumeParams + { + [ModelProperty("server_id")] + public Snowflake GuildId { get; set; } + + [ModelProperty("session_id")] + public Utf8String SessionId { get; set; } + + [ModelProperty("token")] + public Utf8String Token { get; set; } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/Requests/VoiceSelectProtocolParams.cs b/src/Wumpus.Net.Audio/Requests/VoiceSelectProtocolParams.cs new file mode 100644 index 0000000..1ffc047 --- /dev/null +++ b/src/Wumpus.Net.Audio/Requests/VoiceSelectProtocolParams.cs @@ -0,0 +1,26 @@ +using Voltaic; +using Voltaic.Serialization; + +namespace Wumpus.Requests +{ + public class VoiceSelectProtocolParams + { + [ModelProperty("protocol")] + public Utf8String TransportProtocol { get; set; } + + [ModelProperty("data")] + public VoiceSelectProtocolConnectionProperties Properties { get; set; } + } + + public class VoiceSelectProtocolConnectionProperties + { + [ModelProperty("ip")] + public Utf8String Ip { get; set; } + + [ModelProperty("port")] + public int Port { get; set; } + + [ModelProperty("mode")] + public Utf8String EncryptionScheme { get; set; } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/Requests/VoiceSpeakingParams.cs b/src/Wumpus.Net.Audio/Requests/VoiceSpeakingParams.cs new file mode 100644 index 0000000..0b6f96f --- /dev/null +++ b/src/Wumpus.Net.Audio/Requests/VoiceSpeakingParams.cs @@ -0,0 +1,16 @@ +using Voltaic.Serialization; + +namespace Wumpus.Requests +{ + public class VoiceSpeakingParams + { + [ModelProperty("speaking")] + public int Speaking { get; set; } + + [ModelProperty("delay")] + public uint DelayMilliseconds { get; set; } + + [ModelProperty("ssrc")] + public uint Ssrc { get; set; } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/Sodium/SodiumPrimitives.cs b/src/Wumpus.Net.Audio/Sodium/SodiumPrimitives.cs new file mode 100644 index 0000000..28d4ad9 --- /dev/null +++ b/src/Wumpus.Net.Audio/Sodium/SodiumPrimitives.cs @@ -0,0 +1,55 @@ +using System; +using System.Runtime.InteropServices; + +namespace Wumpus +{ + internal class SodiumPrimitives + { + // sodium.dll on windows, libsodium.so on linux + [DllImport("sodium")] + extern static int crypto_secretbox_macbytes(); + [DllImport("sodium")] + extern static int crypto_secretbox_keybytes(); + [DllImport("sodium")] + extern static int crypto_secretbox_noncebytes(); + + [DllImport("sodium")] + extern static int crypto_secretbox_easy(ref byte c, in byte m, int mlen, in byte n, in byte k); + [DllImport("sodium")] + extern static int crypto_secretbox_open_easy(ref byte m, in byte c, int clen, in byte n, in byte k); + + [DllImport("sodium")] + extern static void randombytes_buf(ref byte buf, int size); + + // use the functions to grab the info so it's not hardcoded in the lib + private static readonly int MACBYTES = crypto_secretbox_macbytes(); + private static readonly int KEYBYTES = crypto_secretbox_keybytes(); + private static readonly int NONCEBYTES = crypto_secretbox_noncebytes(); + + public static int NonceSize => NONCEBYTES; + + public static int ComputeMessageLength(int messageLength) + => messageLength + MACBYTES; + + public static bool TryEncryptInPlace(Span ciphertext, ReadOnlySpan message, ReadOnlySpan nonce, ReadOnlySpan secret) + { + if (ciphertext.Length < message.Length + MACBYTES) + return false; + if (secret.Length < KEYBYTES) + return false; + if (nonce.Length < NONCEBYTES) + return false; + + return crypto_secretbox_easy( + ref ciphertext.GetPinnableReference(), + message.GetPinnableReference(), message.Length, + nonce.GetPinnableReference(), + secret.GetPinnableReference()) == 0; + } + + public static void GenerateRandomBytes(Span buffer) + { + randombytes_buf(ref buffer.GetPinnableReference(), buffer.Length); + } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/SpeakingState.cs b/src/Wumpus.Net.Audio/SpeakingState.cs new file mode 100644 index 0000000..b2dfdbe --- /dev/null +++ b/src/Wumpus.Net.Audio/SpeakingState.cs @@ -0,0 +1,15 @@ +using System; + +namespace Wumpus +{ + [Flags] + public enum SpeakingState : byte + { + NotSpeaking = 0b0, + Speaking = 0b1, + + Priority = 0b100, + + PrioritySpeaking = Speaking | Priority + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/VoiceGatewayOperation.cs b/src/Wumpus.Net.Audio/VoiceGatewayOperation.cs new file mode 100644 index 0000000..2474171 --- /dev/null +++ b/src/Wumpus.Net.Audio/VoiceGatewayOperation.cs @@ -0,0 +1,41 @@ +namespace Wumpus.Events +{ + /// + /// Voice connections operate in a similar fashion to the Gateway connection. + /// However, they use a different set of payloads and a separate UDP-based connection for voice data transmission. + /// https://discordapp.com/developers/docs/topics/voice-connections#voice + /// + public enum VoiceGatewayOperation : byte + { + /// C→S - Used to begin a voice websocket connection. + Identify = 0, + /// C→S - Used to select the voice protocol. + SelectProtocol = 1, + /// C←S - Used to complete the websocket handshake. + Ready = 2, + /// C→S - Used to keep the websocket connection alive. + Heartbeat = 3, + /// C→S - Used to describe the session. + SessionDescription = 4, + /// C↔S - Used to indicate which users are speaking. + Speaking = 5, + /// C←S - Used to reply to a heartbeat. + HeartbeatAck = 6, + /// C→S - Used to resume a connection. + Resume = 7, + /// C→S - Used to begin the websocket handshake. + Hello = 8, + /// C←S - Used to complete the websocket handshake with an existing session. + Resumed = 9 + + //NOTE: these do not have official names! + //They are documented here for future expansion purposes + + //ssrc update, occurs when a user connects or changes screenshare settings + //SsrcUpdate = 12, + //user disconnected, occurs when a user disconnects + //UserDisconnected = 13, + //change channel, occurs whenever the client gets moved into another channel + //ChangeChannel = 14 + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/VoiceGatewayPayload.cs b/src/Wumpus.Net.Audio/VoiceGatewayPayload.cs new file mode 100644 index 0000000..0645042 --- /dev/null +++ b/src/Wumpus.Net.Audio/VoiceGatewayPayload.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using Voltaic.Serialization; +using Wumpus.Entities; +using Wumpus.Requests; + +namespace Wumpus.Events +{ + public class VoiceGatewayPayload + { + [ModelProperty("op")] + public VoiceGatewayOperation Operation { get; set; } + + [ModelProperty("d")] + [ModelTypeSelector(nameof(Operation), nameof(OpCodeTypeSelector))] + public object Data { get; set; } + + private static Dictionary OpCodeTypeSelector => new Dictionary() + { + [VoiceGatewayOperation.Hello] = typeof(VoiceHelloEvent), + [VoiceGatewayOperation.Ready] = typeof(VoiceReadyEvent), + [VoiceGatewayOperation.HeartbeatAck] = typeof(int), + + [VoiceGatewayOperation.Identify] = typeof(VoiceIdentifyParams), + [VoiceGatewayOperation.SelectProtocol] = typeof(VoiceSelectProtocolParams), + [VoiceGatewayOperation.SessionDescription] = typeof(VoiceSessionDescriptionEvent), + [VoiceGatewayOperation.Resume] = typeof(VoiceResumeParams), + [VoiceGatewayOperation.Heartbeat] = typeof(int), + + [VoiceGatewayOperation.Speaking] = typeof(VoiceSpeakingParams), + }; + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/Wumpus.Net.Audio.csproj b/src/Wumpus.Net.Audio/Wumpus.Net.Audio.csproj new file mode 100644 index 0000000..9b132df --- /dev/null +++ b/src/Wumpus.Net.Audio/Wumpus.Net.Audio.csproj @@ -0,0 +1,19 @@ + + + + Wumpus + Wumpus.Net.Audio + discord;discordapp;wumpus;audio;websocket;api;rogueexception + Provides a low-level audio client for the Discord API + netstandard2.0 + + + + all + + + + + + + diff --git a/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs b/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs new file mode 100644 index 0000000..04d1952 --- /dev/null +++ b/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs @@ -0,0 +1,127 @@ +using System; +using System.Buffers; +using System.Buffers.Binary; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Voltaic; +using Voltaic.Serialization.Utf8; + +namespace Wumpus +{ + public class WumpusAudioDataClient : IDisposable + { + private readonly IPEndPoint _endpoint; + private readonly ArrayPool _pool; + private readonly Socket _socket; + + public WumpusAudioDataClient(IPEndPoint endpoint, ArrayPool pool = null) + { + _endpoint = endpoint; + _pool = pool; + _socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + + // Bind to any available port + _socket.Bind(new IPEndPoint(IPAddress.Any, 0)); + } + + public async Task SendAsync(uint ssrc, ushort sequence, uint samplePosition, ArraySegment audio, Memory secret, IPEndPoint endpoint = null) + { + // TODO: this only supports xsalsa20_poly1305_lite - should we support more? + + endpoint = endpoint ?? _endpoint; + + var memory = new ResizableMemory(10 * 1024, _pool); + WriteHeader(); + Encrypt(audio.AsSpan(), secret.Span); + + try + { + await _socket.SendToAsync(memory.AsSegment(), SocketFlags.None, endpoint).ConfigureAwait(false); + } + finally + { + memory.Return(); + } + + void WriteHeader() + { + var header = memory.RequestSpan(12); + + header[0] = 0x80; header[1] = 0x78; + BinaryPrimitives.WriteUInt16BigEndian(header.Slice(2), sequence); // 2 bytes + BinaryPrimitives.WriteUInt32BigEndian(header.Slice(4), samplePosition); // 4 bytes + BinaryPrimitives.WriteUInt32BigEndian(header.Slice(8), ssrc); // 4 bytes + + memory.Advance(12); + } + + void Encrypt(Span data, Span key) + { + var destinationSize = SodiumPrimitives.ComputeMessageLength(data.Length); + var destinationSpan = memory.RequestSpan(destinationSize); + + Span nonce = stackalloc byte[SodiumPrimitives.NonceSize]; + SodiumPrimitives.GenerateRandomBytes(nonce.Slice(0, 4)); + + SodiumPrimitives.TryEncryptInPlace(destinationSpan, data, nonce, key); + memory.Advance(destinationSize); + + nonce.Slice(0, 4).CopyTo(memory.RequestSpan(4)); + memory.Advance(4); + } + } + + public async Task DiscoverAsync(uint ssrc, IPEndPoint endpoint = null) + { + endpoint = endpoint ?? _endpoint; + + var memory = new ResizableMemory(70, _pool); + BinaryPrimitives.WriteUInt32BigEndian(memory.RequestSpan(70), ssrc); + memory.Advance(70); + + try + { + await _socket.SendToAsync(memory.AsSegment(), SocketFlags.None, endpoint).ConfigureAwait(false); + + var received = await _socket.ReceiveFromAsync(memory.AsSegment(), SocketFlags.None, endpoint).ConfigureAwait(false); + + if (received.ReceivedBytes != 70) + throw new Exception("Discovery response was not 70 bytes"); + + return ParseDiscovery(memory.AsReadOnlySpan()); + } + finally + { + memory.Return(); + } + + IPEndPoint ParseDiscovery(ReadOnlySpan discovery) + { + if (discovery.Length != 70) + return null; + + discovery = discovery.Slice(4); // skip ssrc, it's always 0 + + if (!IPUtilities.TryParseIPv4Address(ref discovery, out var address)) + return null; + + // trim zeros and parse port + discovery = discovery.Slice(discovery.Length - 2); + if (!BinaryPrimitives.TryReadUInt16LittleEndian(discovery, out var port)) + return null; + + return new IPEndPoint(address, port); + } + } + + public void Dispose() + { + _socket?.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/WumpusAudioGatewayClient.cs b/src/Wumpus.Net.Audio/WumpusAudioGatewayClient.cs new file mode 100644 index 0000000..bfc4189 --- /dev/null +++ b/src/Wumpus.Net.Audio/WumpusAudioGatewayClient.cs @@ -0,0 +1,452 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO.Compression; +using System.Linq; +using System.Net.WebSockets; +using System.Reflection; +using System.Threading; +using System.Threading.Tasks; +using Voltaic; +using Voltaic.Serialization; +using Wumpus; +using Wumpus.Events; +using Wumpus.Requests; +using Wumpus.Serialization; + +namespace Wumpus +{ + public enum ConnectionState + { + Disconnected, + Connecting, + Connected, + Disconnecting + } + + public class WumpusAudioGatewayClient : IDisposable + { + public const int ApiVersion = 4; + public static string Version { get; } = + typeof(WumpusAudioGatewayClient).GetTypeInfo().Assembly.GetCustomAttribute()?.InformationalVersion ?? + typeof(WumpusAudioGatewayClient).GetTypeInfo().Assembly.GetName().Version.ToString(3) ?? + "Unknown"; + + private const int InitialBackoffMillis = 1000; // 1 second + private const int MaxBackoffMillis = 60000; // 1 min + private const double BackoffMultiplier = 1.75; // 1.75x + private const double BackoffJitter = 0.25; // 1.5 to 2.0x + private const int ConnectionTimeoutMillis = 30000; // 30 sec + private const int IdentifyTimeoutMillis = 60000; // 1 min + // Typical backoff: 1.75s, 3.06s, 5.36s, 9.38s, 16.41s, 28.72s, 50.27s, 60s, 60s... + + + // Status events + public event Action Connected; + public event Action Disconnected; + public event Action DeserializationError; + + // Raw events + public event Action ReceivedPayload; + public event Action SentPayload; + + // Voice gateway events + public event Action VoiceGatewayHello; + public event Action VoiceGatewayReady; + public event Action VoiceGatewayResumed; + public event Action VoiceSessionDescription; + public event Action VoiceSpeaking; + public event Action VoiceGatewayHeartbeatAck; + + private readonly SemaphoreSlim _stateLock; + + // Instance + private Task _connectionTask; + private CancellationTokenSource _runCts; + + // Run (Start/Stop) + private int _lastSeq; + private string _endpoint; + private Utf8String _session; + private Utf8String _token; + + // Connection (For each WebSocket connection) + private BlockingCollection _sendQueue; + private bool _receivedData; + + public ConnectionState State { get; private set; } + public WumpusJsonSerializer JsonSerializer { get; } + + public Snowflake UserId { get; } + public Snowflake GuildId { get; } + + public WumpusAudioGatewayClient(Snowflake userId, Snowflake guildId, WumpusJsonSerializer serializer = null) + { + UserId = userId; + GuildId = guildId; + JsonSerializer = serializer ?? new WumpusJsonSerializer(); + _stateLock = new SemaphoreSlim(1, 1); + _connectionTask = Task.CompletedTask; + _runCts = new CancellationTokenSource(); + _runCts.Cancel(); // Start canceled + } + + // TODO: Utf8String, string or custom type? + public void Run(string endpoint, string session, string token) + => RunAsync(endpoint, session, token).GetAwaiter().GetResult(); + public async Task RunAsync(string endpoint, string session, string token) + { + string SlicePort() + { + if (endpoint.EndsWith(":80")) + return endpoint.Substring(0, endpoint.Length - 3); + return endpoint; + } + + Task exceptionSignal; + await _stateLock.WaitAsync().ConfigureAwait(false); + try + { + await StopAsyncInternal().ConfigureAwait(false); + + _endpoint = SlicePort(); + _runCts = new CancellationTokenSource(); + _session = new Utf8String(session); + _token = new Utf8String(token); + _lastSeq = 0; + + _connectionTask = RunTaskAsync(_runCts.Token); + exceptionSignal = _connectionTask; + } + finally + { + _stateLock.Release(); + } + await exceptionSignal.ConfigureAwait(false); + } + private async Task RunTaskAsync(CancellationToken runCancelToken) + { + Task[] tasks = null; + bool isRecoverable = true; + int backoffMillis = InitialBackoffMillis; + int connectionAttempts = 0; + var jitter = new Random(); + + while (isRecoverable) + { + using (var connectionCts = new CancellationTokenSource()) + using (var cancelTokenCts = CancellationTokenSource.CreateLinkedTokenSource(runCancelToken, connectionCts.Token)) + using (var client = new ClientWebSocket()) + { + Exception disconnectEx = null; + var cancelToken = cancelTokenCts.Token; + try + { + cancelToken.ThrowIfCancellationRequested(); + var readySignal = new TaskCompletionSource(); + _receivedData = true; + + // Connect + State = ConnectionState.Connecting; + var uri = new Uri("wss://" + _endpoint + $"/?v={ApiVersion}"); + await client.ConnectAsync(uri, cancelToken).ConfigureAwait(false); + + // Receive HELLO (timeout = ConnectionTimeoutMillis) + var receiveTask = ReceiveAsync(client, readySignal, cancelToken); + await WhenAny(new Task[] { receiveTask }, ConnectionTimeoutMillis, + "Timed out waiting for HELLO").ConfigureAwait(false); + + var evnt = await receiveTask.ConfigureAwait(false); + if (!(evnt.Data is VoiceHelloEvent helloEvent)) + throw new Exception("First event was not a HELLO event"); + int heartbeatRate = (int)helloEvent.HeartbeatInterval; + + // Start tasks here since HELLO must be handled before another thread can send/receive + _sendQueue = new BlockingCollection(); + tasks = new[] + { + RunSendAsync(client, cancelToken), + RunHeartbeatAsync(heartbeatRate, cancelToken), + RunReceiveAsync(client, readySignal, cancelToken) + }; + + SendIdentify(connectionAttempts == 0); + + await WhenAny(tasks.Append(readySignal.Task), IdentifyTimeoutMillis, + "Timed out waiting for READY or InvalidSession").ConfigureAwait(false); + if (await readySignal.Task.ConfigureAwait(false) == false) + continue; // Invalid session + + // Success + backoffMillis = InitialBackoffMillis; + State = ConnectionState.Connected; + Connected?.Invoke(); + + // Wait until an exception occurs (due to cancellation or failure) + await WhenAny(tasks).ConfigureAwait(false); + } + catch (Exception ex) + { + disconnectEx = ex; + isRecoverable = IsRecoverable(ex); + if (!isRecoverable) + throw; + } + finally + { + var oldState = State; + State = ConnectionState.Disconnecting; + + connectionCts.Cancel(); + if (tasks != null) + { + try { await Task.WhenAll(tasks).ConfigureAwait(false); } + catch { } // We already captured the root exception + } + + if (client.State == WebSocketState.Open) + { + try { await client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None).ConfigureAwait(false); } + catch { } + } + + _sendQueue = null; + State = ConnectionState.Disconnected; + if (oldState == ConnectionState.Connected) + Disconnected?.Invoke(disconnectEx); + } + if (isRecoverable) + { + backoffMillis = (int)(backoffMillis * (BackoffMultiplier + (jitter.NextDouble() * BackoffJitter * 2.0 - BackoffJitter))); + if (backoffMillis > MaxBackoffMillis) + backoffMillis = MaxBackoffMillis; + connectionAttempts++; + await Task.Delay(backoffMillis).ConfigureAwait(false); + } + } + } + _runCts.Cancel(); + } + private Task RunReceiveAsync(ClientWebSocket client, TaskCompletionSource readySignal, CancellationToken cancelToken) + { + return Task.Run(async () => + { + while (true) + { + cancelToken.ThrowIfCancellationRequested(); + try + { + await ReceiveAsync(client, readySignal, cancelToken).ConfigureAwait(false); + } + catch (SerializationException ex) + { + DeserializationError?.Invoke(ex); + } + } + }); + } + private Task RunSendAsync(ClientWebSocket client, CancellationToken cancelToken) + { + return Task.Run(async () => + { + while (true) + { + cancelToken.ThrowIfCancellationRequested(); + var payload = _sendQueue.Take(cancelToken); + await SendAsync(client, cancelToken, payload).ConfigureAwait(false); + } + }); + } + private Task RunHeartbeatAsync(int rate, CancellationToken cancelToken) + { + return Task.Run(async () => + { + // extra delay at the beginning because we can only heartbeat after identifying + await Task.Delay(rate, cancelToken).ConfigureAwait(false); + while (true) + { + cancelToken.ThrowIfCancellationRequested(); + if (!_receivedData) + throw new TimeoutException("No data was received since the last heartbeat"); + _receivedData = false; + SendHeartbeat(); + await Task.Delay(rate, cancelToken).ConfigureAwait(false); + } + }); + } + + private async Task WhenAny(IEnumerable tasks) + { + var task = await Task.WhenAny(tasks).ConfigureAwait(false); + await task.ConfigureAwait(false); + } + private async Task WhenAny(IEnumerable tasks, int millis, string errorText) + { + var timeoutTask = Task.Delay(millis); + var task = await Task.WhenAny(tasks.Append(timeoutTask)).ConfigureAwait(false); + if (task == timeoutTask) + throw new TimeoutException(errorText); + await task.ConfigureAwait(false); + } + + private bool IsRecoverable(Exception ex) + { + switch (ex) + { + case WebSocketException wsEx: + if (wsEx.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely) + return true; + break; + case WebSocketClosedException wscEx: + if (wscEx.CloseStatus.HasValue) + { + switch (wscEx.CloseStatus.Value) + { + case WebSocketCloseStatus.Empty: + case WebSocketCloseStatus.NormalClosure: + case WebSocketCloseStatus.InternalServerError: + case WebSocketCloseStatus.ProtocolError: + return true; + } + } + else + { + switch (wscEx.Code) + { + case 4009: + case 4014: + case 4015: + return true; + } + } + break; + case TimeoutException _: + return true; + } + if (ex.InnerException != null) + return IsRecoverable(ex.InnerException); + return false; + } + + public void Stop() + => StopAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + public async Task StopAsync() + { + await _stateLock.WaitAsync().ConfigureAwait(false); + try + { + await StopAsyncInternal().ConfigureAwait(false); + } + finally + { + _stateLock.Release(); + } + } + private async Task StopAsyncInternal() + { + _runCts?.Cancel(); + + try { await _connectionTask.ConfigureAwait(false); } catch { } + _connectionTask = Task.CompletedTask; + + var state = State; + if (state != ConnectionState.Disconnected) + throw new InvalidOperationException($"Client did not successfully disconnect (State = {state})"); + } + + public void Dispose() + { + Stop(); + } + + private async Task ReceiveAsync(ClientWebSocket client, TaskCompletionSource readySignal, CancellationToken cancelToken) + { + ResizableMemory wireData = new ResizableMemory(10 * 1024); + WebSocketReceiveResult result; + do + { + var buffer = wireData.RequestSegment(10 * 1024); + result = await client.ReceiveAsync(buffer, cancelToken).ConfigureAwait(false); + wireData.Advance(result.Count); + _receivedData = true; + + if (result.CloseStatus != null) + throw new WebSocketClosedException(result.CloseStatus.Value, result.CloseStatusDescription); + } + while (!result.EndOfMessage); + + var payload = JsonSerializer.Read(wireData.AsReadOnlySpan()); + + HandleEvent(payload, readySignal); + ReceivedPayload?.Invoke(payload, wireData.Length); + return payload; + } + private void HandleEvent(VoiceGatewayPayload evnt, TaskCompletionSource readySignal) + { + switch (evnt.Operation) + { + case VoiceGatewayOperation.Ready: + var readyEvent = evnt.Data as VoiceReadyEvent; + readySignal.TrySetResult(true); + VoiceGatewayReady?.Invoke(readyEvent); + break; + case VoiceGatewayOperation.Resumed: + readySignal.TrySetResult(true); + VoiceGatewayResumed?.Invoke(); + break; + case VoiceGatewayOperation.SessionDescription: VoiceSessionDescription?.Invoke(evnt.Data as VoiceSessionDescriptionEvent); break; + case VoiceGatewayOperation.Speaking: VoiceSpeaking?.Invoke(evnt.Data as VoiceSpeakingParams); break; + case VoiceGatewayOperation.HeartbeatAck: VoiceGatewayHeartbeatAck?.Invoke(); break; + case VoiceGatewayOperation.Hello: VoiceGatewayHello?.Invoke(evnt.Data as VoiceHelloEvent); break; + } + } + + public void Send(VoiceGatewayPayload payload) + { + if (!_runCts.IsCancellationRequested) + _sendQueue?.Add(payload); + } + private async Task SendAsync(ClientWebSocket client, CancellationToken cancelToken, VoiceGatewayPayload payload) + { + var writer = JsonSerializer.Write(payload); + await client.SendAsync(writer.AsSegment(), WebSocketMessageType.Text, true, cancelToken); + SentPayload?.Invoke(payload, writer.Length); + } + + private void SendIdentify(bool identify) + { + if (identify) // IDENTIFY + { + Send(new VoiceGatewayPayload + { + Operation = VoiceGatewayOperation.Identify, + Data = new VoiceIdentifyParams + { + UserId = UserId, + GuildId = GuildId, + SessionId = _session, + Token = _token + } + }); + } + else // RESUME + { + Send(new VoiceGatewayPayload + { + Operation = VoiceGatewayOperation.Resume, + Data = new VoiceResumeParams + { + GuildId = GuildId, + SessionId = _session, + Token = _token + } + }); + } + } + private void SendHeartbeat() => Send(new VoiceGatewayPayload + { + Operation = VoiceGatewayOperation.Heartbeat, + Data = _lastSeq + }); + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Core/AssemblyInfo.cs b/src/Wumpus.Net.Core/AssemblyInfo.cs index ecbc283..b29eb30 100644 --- a/src/Wumpus.Net.Core/AssemblyInfo.cs +++ b/src/Wumpus.Net.Core/AssemblyInfo.cs @@ -1,5 +1,6 @@ using System.Runtime.CompilerServices; [assembly: InternalsVisibleTo("Wumpus.Net.Gateway")] +[assembly: InternalsVisibleTo("Wumpus.Net.Audio")] [assembly: InternalsVisibleTo("Wumpus.Net.Rest")] [assembly: InternalsVisibleTo("Wumpus.Net.Rpc")] \ No newline at end of file