From 5f8866c7fda29cf04ebcff344aac7c818568b77d Mon Sep 17 00:00:00 2001 From: IoyoCode Date: Mon, 25 Sep 2023 21:03:36 +0100 Subject: [PATCH 01/11] feat(gateway): start on gateway version 3 --- .../com/seailz/discordjar/DiscordJar.java | 6 +- .../seailz/discordjar/gateway/Gateway.java | 570 ++++++++++++++++++ .../discordjar/gateway/GatewayFactory.java | 128 ++-- .../gateway/events/DispatchedEvents.java | 7 +- .../discordjar/gateway/heartbeat/Heart.java | 3 +- .../gateway/heartbeat/HeartLogic.java | 9 +- .../com/seailz/discordjar/ws/WebSocket.java | 9 +- 7 files changed, 657 insertions(+), 75 deletions(-) create mode 100644 src/main/java/com/seailz/discordjar/gateway/Gateway.java diff --git a/src/main/java/com/seailz/discordjar/DiscordJar.java b/src/main/java/com/seailz/discordjar/DiscordJar.java index 2d90f625..255c6a4d 100644 --- a/src/main/java/com/seailz/discordjar/DiscordJar.java +++ b/src/main/java/com/seailz/discordjar/DiscordJar.java @@ -147,6 +147,7 @@ public class DiscordJar { public List gatewayFactories = new ArrayList<>(); private final List memberCachingDisabledGuilds = new ArrayList<>(); private final GatewayTransportCompressionType gatewayTransportCompressionType; + private final APIVersion apiVersion; /** * @deprecated Use {@link DiscordJarBuilder} instead. @@ -216,6 +217,7 @@ public DiscordJar(String token, EnumSet intents, APIVersion version, boo this.eventDispatcher = new EventDispatcher(this); this.token = token; this.intents = intents; + this.apiVersion = version; this.cacheTypes = cacheTypes; new URLS(release, version); logger = Logger.getLogger("DISCORD.JAR"); @@ -1570,5 +1572,7 @@ public Long getAverageGatewayPing() { return sum / getGatewayPingHistory().size(); } - + public APIVersion getApiVersion() { + return apiVersion; + } } diff --git a/src/main/java/com/seailz/discordjar/gateway/Gateway.java b/src/main/java/com/seailz/discordjar/gateway/Gateway.java new file mode 100644 index 00000000..9dd9f8cd --- /dev/null +++ b/src/main/java/com/seailz/discordjar/gateway/Gateway.java @@ -0,0 +1,570 @@ +package com.seailz.discordjar.gateway; + +import com.seailz.discordjar.DiscordJar; +import com.seailz.discordjar.action.guild.members.RequestGuildMembersAction; +import com.seailz.discordjar.events.model.Event; +import com.seailz.discordjar.events.model.interaction.command.CommandInteractionEvent; +import com.seailz.discordjar.gateway.events.DispatchedEvents; +import com.seailz.discordjar.gateway.events.GatewayEvents; +import com.seailz.discordjar.gateway.heartbeat.HeartLogic; +import com.seailz.discordjar.model.api.version.APIVersion; +import com.seailz.discordjar.model.application.Intent; +import com.seailz.discordjar.model.guild.Member; +import com.seailz.discordjar.utils.URLS; +import com.seailz.discordjar.utils.rest.DiscordRequest; +import com.seailz.discordjar.utils.rest.DiscordResponse; +import com.seailz.discordjar.voice.model.VoiceServerUpdate; +import com.seailz.discordjar.voice.model.VoiceState; +import com.seailz.discordjar.ws.ExponentialBackoffLogic; +import com.seailz.discordjar.ws.WebSocket; +import org.jetbrains.annotations.Contract; +import org.jetbrains.annotations.NotNull; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.socket.CloseStatus; + +import java.lang.reflect.InvocationTargetException; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.logging.Logger; + +public class Gateway { + + private final DiscordJar bot; + private final Logger logger = Logger.getLogger("Gateway"); + private int shardCount; + private int shardId; + private GatewayTransportCompressionType compressionType; + private WebSocket socket; + private boolean resumedConnection = false; + private ReconnectInfo resumeInfo; + private long lastSequenceNumber = -1; + private boolean readyForMessages = false; + private HeartLogic heartbeatManager; + public static Date lastHeartbeatSent = new Date(); + public static List pingHistoryMs = new ArrayList<>(); + private final List> onVoiceStateUpdateListeners = new ArrayList<>(); + private final List> onVoiceServerUpdateListeners = new ArrayList<>(); + public final HashMap memberRequestChunks = new HashMap<>(); + + protected Gateway(DiscordJar bot, int shardCount, int shardId, GatewayTransportCompressionType compressionType) { + this.bot = bot; + this.shardCount = shardCount; + this.shardId = shardId; + this.compressionType = compressionType; + + connectionFlow(); + } + + /** + * Runs the necessary steps to initialize the Gateway connection. + */ + private void connectionFlow() { + String gatewayUrl; + try { + gatewayUrl = getGatewayUrl(); + if (bot.isDebug()) logger.info("[Gateway - Connection Flow] Gateway URL: " + gatewayUrl); + } catch (InterruptedException e) { + logger.warning("[Gateway - Connection Flow] Failed to get gateway URL, retrying..."); + connectionFlow(); + return; + } + + gatewayUrl = appendGatewayQueryParams(gatewayUrl); + if (bot.isDebug()) logger.info("[Gateway - Connection Flow] Gateway URL with query params: " + gatewayUrl); + + socket = new WebSocket(gatewayUrl, bot.isDebug()); + setupDisconnectedSocket(socket); + connectToSocket(socket, false); + + lastSequenceNumber = -1; + } + + public void disconnect(@NotNull CloseStatus closeStatus) { + socket.disconnect(closeStatus.getCode(), closeStatus.getReason()); + } + + public void disconnectFlow(@NotNull CloseStatus closeStatus) { + CloseCode closeCode = CloseCode.fromCode(closeStatus.getCode()); + readyForMessages = false; + boolean attemptReconnect = closeCode.shouldReconnect(); + if (!attemptReconnect) { + logger.severe("[Gateway] Connection closed, won't attempt reconnect - " + closeCode.getLog()); + return; + } + + if (closeCode.shouldResume()) resumeFlow(); + else connectionFlow(); + } + + private void resumeFlow() { + if (resumeInfo == null) { + logger.warning("[Gateway - Resume Flow] Resume info is null, cannot resume. Attempting normal connection."); + connectionFlow(); + return; + } + + String connectUrl = appendGatewayQueryParams(resumeInfo.url()); + if (bot.isDebug()) logger.info("[Gateway - Resume Flow] Resume URL: " + connectUrl); + socket = new WebSocket(connectUrl, bot.isDebug()); + setupDisconnectedSocket(socket); + connectToSocket(socket, true); + + resumedConnection = true; + } + + protected void handleTextMessage(String message) throws Exception { + JSONObject payload = new JSONObject(message); + + if (bot.isDebug()) { + logger.info("[Gateway - DEBUG] Received message: " + payload.toString()); + logger.info("[Gateway - DEBUG] Message size: " + payload.toString().getBytes(StandardCharsets.UTF_8).length + "b"); + } + + try { + lastSequenceNumber = payload.getInt("s"); + if (heartbeatManager != null) heartbeatManager.setLastSequence(lastSequenceNumber); + } catch (JSONException ignored) { + } + + GatewayEvents event = GatewayEvents.getEvent(payload.getInt("op")); + if (event == null) { + logger.warning("[discord.jar] Unknown event received: " + payload.getInt("op") + ". This is rare, please create an issue on GitHub with this log message. Payload: " + payload.toString()); + return; + } + + switch (event) { + case HELLO: + handleHello(payload); + if (!resumedConnection) sendIdentify(); + readyForMessages = true; + + if (bot.isDebug()) { + logger.info("[Gateway] Received HELLO event. Heartbeat cycle has been started. If this isn't a resume, IDENTIFY has been sent."); + } + break; + case HEARTBEAT_REQUEST: + heartbeatManager.forceHeartbeat(); + if (bot.isDebug()) { + logger.info("[Gateway] Received HEARTBEAT_REQUEST event. Heartbeat has been forced."); + } + break; + case DISPATCHED: + handleDispatch(payload); + if (bot.isDebug()) { + logger.info("[Gateway] Received DISPATCHED event. Event has been handled."); + } + break; + case RECONNECT: + logger.info("[Gateway] Gateway requested a reconnect, reconnecting..."); + disconnect(CloseStatus.SESSION_NOT_RELIABLE); + break; + case INVALID_SESSION: + logger.info("[Gateway] Gateway requested a reconnect (invalid session), reconnecting..."); + disconnect(CloseStatus.SESSION_NOT_RELIABLE); + break; + case HEARTBEAT_ACK: + // Heartbeat was acknowledged, can ignore, but we'll log the request ping anyway. + if (lastHeartbeatSent != null) { + long ping = System.currentTimeMillis() - lastHeartbeatSent.getTime(); + if (bot.isDebug()) { + logger.info("[Gateway] Received HEARTBEAT_ACK event. Ping: " + ping + "ms"); + } + + pingHistoryMs.add(ping); + } + break; + } + } + + private void handleDispatch(JSONObject payload) { + // Handle dispatched events + // actually dispatch the event + Class eventClass = DispatchedEvents.getEventByName(payload.getString("t")).getEvent().apply(payload, this, bot); + if (eventClass == null) { + if (bot.isDebug()) logger.info("[discord.jar] Unhandled event: " + payload.getString("t") + "\nThis is usually ok, if a new feature has recently been added to Discord as discord.jar may not support it yet.\nIf that is not the case, please report this to the discord.jar developers."); + return; + } + if (bot.isDebug()) { + logger.info("[Gateway] Event class: " + eventClass.getName()); + } + if (eventClass.equals(CommandInteractionEvent.class)) return; + + new Thread(() -> { + Event event; + try { + event = eventClass.getConstructor(DiscordJar.class, long.class, JSONObject.class) + .newInstance(bot, lastSequenceNumber, payload); + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException e) { + logger.warning("[Gateway] Failed to dispatch " + eventClass.getName() + " event. This is usually a bug, please report it on discord.jar's GitHub with this log message."); + e.printStackTrace(); + return; + } catch (InvocationTargetException e) { + logger.warning("[Gateway] Failed to dispatch " + eventClass.getName() + " event. This is usually a bug, please report it on discord.jar's GitHub with this log message."); + // If it's a runtime exception, we want to catch it and print the stack trace. + e.getCause().printStackTrace(); + return; + } + + bot.getEventDispatcher().dispatchEvent(event, eventClass, bot); + + if (bot.isDebug()) { + logger.info("[Gateway] Event dispatched: " + eventClass.getName()); + } + }, "djar--event-dispatch-gw").start(); + + if (Objects.requireNonNull(DispatchedEvents.getEventByName(payload.getString("t"))) == DispatchedEvents.READY) { + + resumeInfo = new ReconnectInfo( + payload.getJSONObject("d").getString("session_id"), + bot.getToken(), + payload.getJSONObject("d").getString("resume_gateway_url") + ); + readyForMessages = true; + + if (bot.getStatus() != null) { + JSONObject json = new JSONObject(); + json.put("d", bot.getStatus().compile()); + json.put("op", OpCodes.PRESENCE_UPDATE); + queueMessage(json); + } + } + } + + private void handleHello(JSONObject payload) { + heartbeatManager = new HeartLogic(socket, payload.getJSONObject("d").getInt("heartbeat_interval")); + heartbeatManager.start(); + } + + private void sendIdentify() { + AtomicInteger intents = new AtomicInteger(); + if (bot.getIntents().contains(Intent.ALL)) { + intents.set(3243773); + bot.getIntents().forEach(intent -> { + if (intent.isPrivileged()) { + intents.getAndAdd(intent.getLeftShiftId()); + } + }); + } else { + bot.getIntents().forEach(intent -> intents.getAndAdd(intent.getLeftShiftId())); + } + + JSONObject payload = new JSONObject(); + payload.put("op", 2); + JSONObject data = new JSONObject(); + data.put("token", bot.getToken()); + if (shardCount != -1 && shardId != -1) { + data.put("shard", new JSONArray().put(shardId).put(shardCount)); + } + String os = System.getProperty("os.name").toLowerCase(); + data.put("properties", new JSONObject().put("os", os).put("browser", "discord.jar").put("device", "discord.jar")); + data.put("intents", intents.get()); + payload.put("d", data); + socket.send(payload.toString()); + } + + @Contract("_, _ -> param1") + private @NotNull WebSocket connectToSocket(@NotNull WebSocket socket, boolean resuming) { + socket.connect() + .onFailed((e) -> { + logger.warning("[Gateway - Connection Flow] Failed to connect to gateway, retrying..."); + connectionFlow(); + }) + .onSuccess((v) -> { + logger.info("[Gateway] Connection established successfully. ⚡"); + + if (resuming) { + JSONObject resumeObject = new JSONObject(); + resumeObject.put("op", OpCodes.RESUME); + resumeObject.put("d", new JSONObject() + .put("token", bot.getToken()) + .put("session_id", resumeInfo.sessionId()) + .put("seq", lastSequenceNumber) + ); + + queueMessage(resumeObject); + } + }); + return socket; + } + @Contract("_ -> param1") + private @NotNull WebSocket setupDisconnectedSocket(@NotNull WebSocket socket) { + ExponentialBackoffLogic backoffReconnectLogic = new ExponentialBackoffLogic(); + socket.setReEstablishConnection(backoffReconnectLogic.getFunction()); + backoffReconnectLogic.setAttemptReconnect((c) -> { + new Thread(bot::clearMemberCaches, "djar--clearing-member-caches").start(); + return false; + }); + + socket.addOnDisconnectConsumer((cs) -> { + if (bot.isDebug()) logger.info("[Gateway] Disconnected from gateway. Reason: " + cs.getCode() + ":" + cs.getReason()); + disconnectFlow(cs); + }); + + socket.addMessageConsumer((m) -> { + try { + this.handleTextMessage(m); + } catch (Exception e) { + logger.warning("[Gateway] Failed to handle text message. This is usually a bug, please report it on discord.jar's GitHub with this log message."); + e.printStackTrace(); + } + }); + return socket; + } + + private String appendGatewayQueryParams(String url) { + url = url + "?v=" + bot.getApiVersion().getCode() + "&encoding=json"; + if (compressionType != GatewayTransportCompressionType.NONE) url = url + "&compress=" + compressionType.getValue(); + return url; + } + + public void queueMessage(JSONObject payload) { + if (bot.isDebug()) logger.info("[Gateway] Queued message: " + payload); + while (readyForMessages) { + socket.send(payload.toString()); + if (bot.isDebug()) { + logger.info("[Gateway] Sent message: " + payload); + } + break; + } + } + + public void requestGuildMembers(RequestGuildMembersAction action, CompletableFuture> future) { + if (action.getQuery() == null & action.getUserIds() == null) { + throw new IllegalArgumentException("You must provide either a query or a list of user ids"); + } + + if (bot.isDebug()) { + logger.info("[Gateway] Requesting guild members..."); + } + + JSONObject payload = new JSONObject(); + JSONObject dPayload = new JSONObject(); + dPayload.put("guild_id", action.getGuildId()); + if (action.getQuery() != null) { + dPayload.put("query", action.getQuery()); + } + + if (action.getUserIds() != null && !action.getUserIds().isEmpty()) { + dPayload.put("user_ids", action.getUserIds()); + } + + dPayload.put("limit", action.getLimit()); + if (action.isPresences()) dPayload.put("presences", true); + dPayload.put("nonce", action.getNonce()); + payload.put("op", 8); + payload.put("d", dPayload); + + queueMessage(payload); + + memberRequestChunks.put(action.getNonce(), new GatewayFactory.MemberChunkStorageWrapper(new ArrayList<>(), future)); + } + + public void sendVoicePayload(String guildId, String channelId, boolean selfMute, boolean selfDeaf) { + JSONObject payload = new JSONObject(); + JSONObject dPayload = new JSONObject(); + dPayload.put("guild_id", guildId); + dPayload.put("channel_id", channelId); + dPayload.put("self_mute", selfMute); + dPayload.put("self_deaf", selfDeaf); + payload.put("op", 4); + payload.put("d", dPayload); + queueMessage(payload); + } + + public void onVoiceStateUpdate(Consumer consumer) { + onVoiceStateUpdateListeners.add(consumer); + } + + public void onVoiceServerUpdate(Consumer consumer) { + onVoiceServerUpdateListeners.add(consumer); + } + + public List> getOnVoiceStateUpdateListeners() { + return onVoiceStateUpdateListeners; + } + + public List> getOnVoiceServerUpdateListeners() { + return onVoiceServerUpdateListeners; + } + + public record MemberChunkStorageWrapper(List members, CompletableFuture> future) { + public void addMember(Member member) { + members.add(member); + } + + public void complete() { + future.complete(members); + } + } + + /** + * Retrieves the Gateway URL from Discord. + * @return The Gateway URL. + * @throws InterruptedException If there is a failure to sleep after a failed request. + */ + private String getGatewayUrl() throws InterruptedException { + String gatewayUrl; + try { + DiscordResponse response = null; + try { + response = new DiscordRequest( + new JSONObject(), + new HashMap<>(), + "/gateway", + bot, + "/gateway", RequestMethod.GET + ).invoke(); + } catch (DiscordRequest.UnhandledDiscordAPIErrorException ignored) {} + if (response == null || response.body() == null || !response.body().has("url")) { + // In case the request fails, we can attempt to use the backup gateway URL instead. + gatewayUrl = URLS.GATEWAY.BASE_URL; + } else gatewayUrl = response.body().getString("url") + "/"; + } catch (Exception e) { + logger.warning("[Gateway - Connection Flow] Failed to get gateway URL, waiting 5 seconds and retrying..."); + Thread.sleep(5000); + return getGatewayUrl(); + } + return gatewayUrl; + } + + private enum OpCodes { + DISPATCH(0), + HEARTBEAT(1), + IDENTIFY(2), + PRESENCE_UPDATE(3), + VOICE_STATE_UPDATE(4), + RESUME(6), + RECONNECT(7), + REQUEST_GUILD_MEMBERS(8), + INVALID_SESSION(9), + HELLO(10), + HEARTBEAT_ACK(11), + UNKNOWN(-1) + ; + private int opCode; + + OpCodes(int opCode) { + this.opCode = opCode; + } + + public int getOpCode() { + return opCode; + } + + public static OpCodes fromOpCode(int opCode) { + for (OpCodes value : values()) { + if (value.getOpCode() == opCode) { + return value; + } + } + return UNKNOWN; + } + } + private enum CloseCode { + + UNKNOWN_ERROR(4000, true, true, null), + UNKNOWN_OPCODE(4001, true, true, null), + DECODE_ERROR(4002, true, true, null), + NOT_AUTHENTICATED(4003, true, false, null), + AUTHENTICATION_FAILED(4004, false, false, "Authentication failed - check your bot token."), + ALREADY_AUTHENTICATED(4005, true, false, null), + INVALID_SEQUENCE(4007, true, true, null), + RATE_LIMITED(4008, true, true, null), + SESSION_TIMEOUT(4009, true, false, null), + INVALID_SHARD(4010, false, false, "Invalid shard - check your shard count and shard ID."), + SHARDING_REQUIRED(4011, false, false, "Sharding required - if your bot is in over 2.5k guilds, you must enable sharding."), + INVALID_API_VERSION(4012, false, false, "Invalid API version - check your bot version."), + INVALID_INTENTS(4013, false, false, "Invalid intents - check your intents."), + DISALLOWED_INTENTS(4014, false, false, "Disallowed intents - you tried to specify an intent that you have not enabled or are not whitelisted for."), + UNKNOWN(-1, true, false, null), + ; + private int code; + private boolean reconnect; + private boolean resume; + private String log; + + CloseCode(int code, boolean reconnect, boolean resume, String log) { + this.code = code; + this.reconnect = reconnect; + this.resume = resume; + this.log = log; + } + + public int getCode() { + return code; + } + + public boolean shouldReconnect() { + return reconnect; + } + + public boolean shouldResume() { + return resume; + } + + public String getLog() { + return log; + } + + public static CloseCode fromCode(int code) { + for (CloseCode value : values()) { + if (value.getCode() == code) { + return value; + } + } + return UNKNOWN; + } + } + private record ReconnectInfo(String sessionId, String token, String url) {} + + public static Builder builder(DiscordJar bot) { + return new GatewayBuilder(bot); + } + + public interface Builder { + Gateway build(); + Gateway.Builder setShardCount(int shardCount); + Gateway.Builder setShardId(int shardId); + Gateway.Builder setTransportCompressionType(GatewayTransportCompressionType compressionType); + } + private static class GatewayBuilder implements Builder { + private final DiscordJar bot; + private int shardCount = 1; + private int shardId = -1; + private GatewayTransportCompressionType compressionType = GatewayTransportCompressionType.ZLIB_STREAM; + + public GatewayBuilder(DiscordJar bot) { + this.bot = bot; + } + + @Override + public Gateway build() { + return new Gateway(bot, shardCount, shardId, compressionType); + } + + @Override + public Gateway.Builder setShardCount(int shardCount) { + this.shardCount = shardCount; + return this; + } + + @Override + public Gateway.Builder setShardId(int shardId) { + this.shardId = shardId; + return this; + } + + @Override + public Gateway.Builder setTransportCompressionType(GatewayTransportCompressionType compressionType) { + this.compressionType = compressionType; + return this; + } + } + +} diff --git a/src/main/java/com/seailz/discordjar/gateway/GatewayFactory.java b/src/main/java/com/seailz/discordjar/gateway/GatewayFactory.java index 1a506540..6af85a9d 100644 --- a/src/main/java/com/seailz/discordjar/gateway/GatewayFactory.java +++ b/src/main/java/com/seailz/discordjar/gateway/GatewayFactory.java @@ -78,69 +78,69 @@ public GatewayFactory(DiscordJar discordJar, boolean debug, int shardId, int num this.numShards = numShards; this.transportCompressionType = compressionType; - discordJar.setGatewayFactory(this); - - if (resumeUrl == null) { - try { - DiscordResponse response = null; - try { - response = new DiscordRequest( - new JSONObject(), - new HashMap<>(), - "/gateway", - discordJar, - "/gateway", RequestMethod.GET - ).invoke(); - } catch (DiscordRequest.UnhandledDiscordAPIErrorException ignored) {} - if (response == null || response.body() == null || !response.body().has("url")) { - // In case the request fails, we can attempt to use the backup gateway URL instead. - this.gatewayUrl = URLS.GATEWAY.BASE_URL; - } else this.gatewayUrl = response.body().getString("url") + "/"; - } catch (Exception e) { - logger.warning("[DISCORD.JAR] Failed to get gateway URL. Restarting gateway after 5 seconds."); - Thread.sleep(5000); - discordJar.restartGateway(); - return; - } - } else gatewayUrl = resumeUrl; - - socket = new WebSocket(appendGatewayQueryParams(gatewayUrl), debug); - - ExponentialBackoffLogic backoffReconnectLogic = new ExponentialBackoffLogic(); - socket.setReEstablishConnection(backoffReconnectLogic.getFunction()); - backoffReconnectLogic.setAttemptReconnect((c) -> { - new Thread(discordJar::clearMemberCaches, "djar--clearing-member-caches").start(); - return !shouldResume; - }); - - socket.addMessageConsumer((tm) -> { - try { - handleTextMessage(tm); - } catch (Exception e) { - logger.warning("[Gateway] Failed to handle text message: " + e.getMessage()); - } - }); - - socket.connect().onSuccess(this::onConnect).onFailed((e) -> { - logger.severe("[Gateway] Failed to establish connection: " + e.getThrowable().getMessage() + " - Will attempt to reconnect after 5 seconds."); - try { - Thread.sleep(5000); - } catch (InterruptedException interruptedException) { - interruptedException.printStackTrace(); - } - discordJar.restartGateway(); - }); - - socket.addOnConnect(() -> { - onConnect(null); - }); - - socket.addOnDisconnectConsumer((cs) -> { - this.heartbeatManager = null; - readyForMessages = false; - attemptReconnect(cs); - discordJar.clearMemberCaches(); - }); +// discordJar.setGatewayFactory(this); +// +// if (resumeUrl == null) { +// try { +// DiscordResponse response = null; +// try { +// response = new DiscordRequest( +// new JSONObject(), +// new HashMap<>(), +// "/gateway", +// discordJar, +// "/gateway", RequestMethod.GET +// ).invoke(); +// } catch (DiscordRequest.UnhandledDiscordAPIErrorException ignored) {} +// if (response == null || response.body() == null || !response.body().has("url")) { +// // In case the request fails, we can attempt to use the backup gateway URL instead. +// this.gatewayUrl = URLS.GATEWAY.BASE_URL; +// } else this.gatewayUrl = response.body().getString("url") + "/"; +// } catch (Exception e) { +// logger.warning("[DISCORD.JAR] Failed to get gateway URL. Restarting gateway after 5 seconds."); +// Thread.sleep(5000); +// discordJar.restartGateway(); +// return; +// } +// } else gatewayUrl = resumeUrl; +// +// socket = new WebSocket(appendGatewayQueryParams(gatewayUrl), debug); +// +// ExponentialBackoffLogic backoffReconnectLogic = new ExponentialBackoffLogic(); +// socket.setReEstablishConnection(backoffReconnectLogic.getFunction()); +// backoffReconnectLogic.setAttemptReconnect((c) -> { +// new Thread(discordJar::clearMemberCaches, "djar--clearing-member-caches").start(); +// return !shouldResume; +// }); +// +// socket.addMessageConsumer((tm) -> { +// try { +// handleTextMessage(tm); +// } catch (Exception e) { +// logger.warning("[Gateway] Failed to handle text message: " + e.getMessage()); +// } +// }); +// +// socket.connect().onSuccess(this::onConnect).onFailed((e) -> { +// logger.severe("[Gateway] Failed to establish connection: " + e.getThrowable().getMessage() + " - Will attempt to reconnect after 5 seconds."); +// try { +// Thread.sleep(5000); +// } catch (InterruptedException interruptedException) { +// interruptedException.printStackTrace(); +// } +// discordJar.restartGateway(); +// }); +// +// socket.addOnConnect(() -> { +// onConnect(null); +// }); +// +// socket.addOnDisconnectConsumer((cs) -> { +// this.heartbeatManager = null; +// readyForMessages = false; +// attemptReconnect(cs); +// discordJar.clearMemberCaches(); +// }); } /** @@ -342,7 +342,7 @@ private boolean ifNotSelf() { private void handleDispatched(JSONObject payload) { // Handle dispatched events // actually dispatch the event - Class eventClass = DispatchedEvents.getEventByName(payload.getString("t")).getEvent().apply(payload, this, discordJar); + Class eventClass = DispatchedEvents.getEventByName(payload.getString("t")).getEvent().apply(payload, null, discordJar); if (eventClass == null) { if (debug) logger.info("[discord.jar] Unhandled event: " + payload.getString("t") + "\nThis is usually ok, if a new feature has recently been added to Discord as discord.jar may not support it yet.\nIf that is not the case, please report this to the discord.jar developers."); return; diff --git a/src/main/java/com/seailz/discordjar/gateway/events/DispatchedEvents.java b/src/main/java/com/seailz/discordjar/gateway/events/DispatchedEvents.java index bde040d3..dbbc7004 100644 --- a/src/main/java/com/seailz/discordjar/gateway/events/DispatchedEvents.java +++ b/src/main/java/com/seailz/discordjar/gateway/events/DispatchedEvents.java @@ -31,6 +31,7 @@ import com.seailz.discordjar.events.model.interaction.select.entity.UserSelectMenuInteractionEvent; import com.seailz.discordjar.events.model.message.MessageCreateEvent; import com.seailz.discordjar.events.model.message.TypingStartEvent; +import com.seailz.discordjar.gateway.Gateway; import com.seailz.discordjar.gateway.GatewayFactory; import com.seailz.discordjar.command.CommandType; import com.seailz.discordjar.model.channel.Channel; @@ -338,13 +339,13 @@ public enum DispatchedEvents { UNKNOWN((p, g, d) -> null), ; - private final TriFunction> event; + private final TriFunction> event; - DispatchedEvents(TriFunction> event) { + DispatchedEvents(TriFunction> event) { this.event = event; } - public TriFunction> getEvent() { + public TriFunction> getEvent() { return event; } diff --git a/src/main/java/com/seailz/discordjar/gateway/heartbeat/Heart.java b/src/main/java/com/seailz/discordjar/gateway/heartbeat/Heart.java index 1923784c..64ae5c97 100644 --- a/src/main/java/com/seailz/discordjar/gateway/heartbeat/Heart.java +++ b/src/main/java/com/seailz/discordjar/gateway/heartbeat/Heart.java @@ -1,5 +1,6 @@ package com.seailz.discordjar.gateway.heartbeat; +import com.seailz.discordjar.gateway.Gateway; import com.seailz.discordjar.gateway.GatewayFactory; import org.json.JSONObject; @@ -73,7 +74,7 @@ private void send() { .put("d", GatewayFactory.sequence) ); - GatewayFactory.lastHeartbeatSent = new Date(); + Gateway.lastHeartbeatSent = new Date(); } public void deactivate() { diff --git a/src/main/java/com/seailz/discordjar/gateway/heartbeat/HeartLogic.java b/src/main/java/com/seailz/discordjar/gateway/heartbeat/HeartLogic.java index 748cc884..ec7944f1 100644 --- a/src/main/java/com/seailz/discordjar/gateway/heartbeat/HeartLogic.java +++ b/src/main/java/com/seailz/discordjar/gateway/heartbeat/HeartLogic.java @@ -1,11 +1,14 @@ package com.seailz.discordjar.gateway.heartbeat; -import com.seailz.discordjar.gateway.GatewayFactory; +import com.seailz.discordjar.gateway.Gateway; import com.seailz.discordjar.ws.WSPayloads; import com.seailz.discordjar.ws.WebSocket; import org.json.JSONObject; -import java.util.*; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; /** * Every animal has a heart, and as we all know that websockets are animals, they also have hearts. @@ -65,7 +68,7 @@ public void start() { socket.send( WSPayloads.HEARBEAT.fill(lastSequence == -1 ? JSONObject.NULL : lastSequence).toString() ); - GatewayFactory.lastHeartbeatSent = new Date(); + Gateway.lastHeartbeatSent = new Date(); Thread.sleep(interval); } catch (InterruptedException e) { e.printStackTrace(); diff --git a/src/main/java/com/seailz/discordjar/ws/WebSocket.java b/src/main/java/com/seailz/discordjar/ws/WebSocket.java index 818bf3d9..efa106f8 100644 --- a/src/main/java/com/seailz/discordjar/ws/WebSocket.java +++ b/src/main/java/com/seailz/discordjar/ws/WebSocket.java @@ -68,6 +68,10 @@ public void disconnect() { ws.close(1000, "Disconnecting"); } + public void disconnect(int code, String reason) { + ws.close(code, reason); + } + public void send(String message) { ws.send(message); } @@ -144,9 +148,8 @@ public void onClosed(@NotNull okhttp3.WebSocket webSocket, int code, @NotNull St // Force session disconnect in case it failed to disconnect open = false; buffer = null; - new Thread(() -> { - onDisconnectConsumers.forEach(consumer -> consumer.accept(new CloseStatus(code, reason))); - }, "djar--ws-disconnect-consumers").start(); + inflator.reset(); + onDisconnectConsumers.forEach(consumer -> consumer.accept(new CloseStatus(code, reason))); if (reEstablishConnection.apply(new CloseStatus(code, reason))) { try { From c5d150f10f024899b67fcee7aa7c17f3bfa4cccd Mon Sep 17 00:00:00 2001 From: IoyoCode Date: Mon, 25 Sep 2023 21:42:51 +0100 Subject: [PATCH 02/11] feat(gateway): fully integrated gateway v3 into the rest of the codebase --- .../com/seailz/discordjar/DiscordJar.java | 59 +- .../seailz/discordjar/gateway/Gateway.java | 17 +- .../discordjar/gateway/GatewayFactory.java | 556 ------------------ .../gateway/events/DispatchedEvents.java | 9 +- .../gateway/events/GatewayEvents.java | 2 +- .../discordjar/gateway/heartbeat/Heart.java | 88 --- .../discordjar/http/HttpOnlyManager.java | 8 +- .../channel/internal/AudioChannelImpl.java | 5 +- 8 files changed, 36 insertions(+), 708 deletions(-) delete mode 100644 src/main/java/com/seailz/discordjar/gateway/GatewayFactory.java delete mode 100644 src/main/java/com/seailz/discordjar/gateway/heartbeat/Heart.java diff --git a/src/main/java/com/seailz/discordjar/DiscordJar.java b/src/main/java/com/seailz/discordjar/DiscordJar.java index 255c6a4d..6ad7fb2b 100644 --- a/src/main/java/com/seailz/discordjar/DiscordJar.java +++ b/src/main/java/com/seailz/discordjar/DiscordJar.java @@ -19,7 +19,7 @@ import com.seailz.discordjar.command.listeners.slash.SubCommandListener; import com.seailz.discordjar.events.DiscordListener; import com.seailz.discordjar.events.EventDispatcher; -import com.seailz.discordjar.gateway.GatewayFactory; +import com.seailz.discordjar.gateway.Gateway; import com.seailz.discordjar.gateway.GatewayTransportCompressionType; import com.seailz.discordjar.http.HttpOnlyApplication; import com.seailz.discordjar.model.api.APIRelease; @@ -49,6 +49,7 @@ import org.json.JSONArray; import org.json.JSONObject; import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.socket.CloseStatus; import java.io.IOException; import java.lang.annotation.Annotation; @@ -82,7 +83,7 @@ public class DiscordJar { /** * Used to manage the gateway connection */ - private GatewayFactory gatewayFactory; + private Gateway gatewayFactory; /** * Stores the logger */ @@ -144,7 +145,6 @@ public class DiscordJar { private Status status; public int gatewayConnections = 0; - public List gatewayFactories = new ArrayList<>(); private final List memberCachingDisabledGuilds = new ArrayList<>(); private final GatewayTransportCompressionType gatewayTransportCompressionType; private final APIVersion apiVersion; @@ -298,7 +298,11 @@ public DiscordJar(String token, EnumSet intents, APIVersion version, boo this.numShards = numShards; if (!httpOnly) { - this.gatewayFactory = new GatewayFactory(this, debug, shardId, numShards, gwCompressionType); + this.gatewayFactory = Gateway.builder(this) + .setShardCount(numShards) + .setShardId(shardId) + .setTransportCompressionType(gwCompressionType) + .build(); } } @@ -343,43 +347,10 @@ protected void initiateNoShutdown() { }, "djar-shutdown-prevention").start(); } - public GatewayFactory getGateway() { + public Gateway getGateway() { return gatewayFactory; } - /** - * Kills the gateway connection and destroys the {@link GatewayFactory} instance. - * This method will also initiate garbage collection to avoid memory leaks. This probably shouldn't be used unless in {@link #restartGateway()}. - */ - public void killGateway() { - try { - if (gatewayFactory != null) gatewayFactory.killConnection(); - } catch (IOException ignored) {} - gatewayFactory = null; - // init garbage collection to avoid memory leaks - System.gc(); - } - - /** - * Restarts the gateway connection and creates a new {@link GatewayFactory} instance. - * This will invalidate and destroy the old {@link GatewayFactory} instance. - * This method will also initiate garbage collection to avoid memory leaks. - * - * @see GatewayFactory - * @see #killGateway() - */ - public void restartGateway() { - try { - TimeUnit.SECONDS.sleep(1); - } catch (InterruptedException ignored) {} - killGateway(); - try { - gatewayFactory = new GatewayFactory(this, debug, shardId, numShards, gatewayTransportCompressionType); - gatewayFactories.add(gatewayFactory); - } catch (ExecutionException | InterruptedException e) { - throw new RuntimeException(e); - } - } protected void initiateShutdownHooks() { Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -389,11 +360,7 @@ protected void initiateShutdownHooks() { throw new RuntimeException(e); } if (gatewayFactory != null) { - try { - gatewayFactory.killConnectionNicely(); - } catch (IOException e) { - throw new RuntimeException(e); - } + gatewayFactory.disconnect(CloseStatus.GOING_AWAY); } }, "djar--shutdown-hook")); } @@ -418,10 +385,6 @@ public void updateVoiceState(VoiceState state) { } } - public void setGatewayFactory(GatewayFactory gatewayFactory) { - this.gatewayFactory = gatewayFactory; - } - public List getBuckets() { return buckets; } @@ -1556,7 +1519,7 @@ public EnumSet getCacheTypes() { *
The time is in milliseconds. */ public List getGatewayPingHistory() { - return GatewayFactory.pingHistoryMs; + return Gateway.pingHistoryMs; } /** diff --git a/src/main/java/com/seailz/discordjar/gateway/Gateway.java b/src/main/java/com/seailz/discordjar/gateway/Gateway.java index 9dd9f8cd..e3315158 100644 --- a/src/main/java/com/seailz/discordjar/gateway/Gateway.java +++ b/src/main/java/com/seailz/discordjar/gateway/Gateway.java @@ -10,6 +10,7 @@ import com.seailz.discordjar.model.api.version.APIVersion; import com.seailz.discordjar.model.application.Intent; import com.seailz.discordjar.model.guild.Member; +import com.seailz.discordjar.model.status.Status; import com.seailz.discordjar.utils.URLS; import com.seailz.discordjar.utils.rest.DiscordRequest; import com.seailz.discordjar.utils.rest.DiscordResponse; @@ -43,14 +44,15 @@ public class Gateway { private WebSocket socket; private boolean resumedConnection = false; private ReconnectInfo resumeInfo; - private long lastSequenceNumber = -1; + public static long lastSequenceNumber = -1; private boolean readyForMessages = false; private HeartLogic heartbeatManager; public static Date lastHeartbeatSent = new Date(); public static List pingHistoryMs = new ArrayList<>(); private final List> onVoiceStateUpdateListeners = new ArrayList<>(); private final List> onVoiceServerUpdateListeners = new ArrayList<>(); - public final HashMap memberRequestChunks = new HashMap<>(); + public final HashMap memberRequestChunks = new HashMap<>(); + private Status status = null; protected Gateway(DiscordJar bot, int shardCount, int shardId, GatewayTransportCompressionType compressionType) { this.bot = bot; @@ -362,7 +364,7 @@ public void requestGuildMembers(RequestGuildMembersAction action, CompletableFut queueMessage(payload); - memberRequestChunks.put(action.getNonce(), new GatewayFactory.MemberChunkStorageWrapper(new ArrayList<>(), future)); + memberRequestChunks.put(action.getNonce(), new Gateway.MemberChunkStorageWrapper(new ArrayList<>(), future)); } public void sendVoicePayload(String guildId, String channelId, boolean selfMute, boolean selfDeaf) { @@ -393,6 +395,15 @@ public List> getOnVoiceServerUpdateListeners() { return onVoiceServerUpdateListeners; } + /** + * Do not use this method - it is for internal use only. + * @param status The status to set. + */ + @Deprecated + public void setStatus(Status status) { + this.status = status; + } + public record MemberChunkStorageWrapper(List members, CompletableFuture> future) { public void addMember(Member member) { members.add(member); diff --git a/src/main/java/com/seailz/discordjar/gateway/GatewayFactory.java b/src/main/java/com/seailz/discordjar/gateway/GatewayFactory.java deleted file mode 100644 index 6af85a9d..00000000 --- a/src/main/java/com/seailz/discordjar/gateway/GatewayFactory.java +++ /dev/null @@ -1,556 +0,0 @@ -package com.seailz.discordjar.gateway; - -import com.seailz.discordjar.DiscordJar; -import com.seailz.discordjar.action.guild.members.RequestGuildMembersAction; -import com.seailz.discordjar.events.model.Event; -import com.seailz.discordjar.events.model.interaction.command.CommandInteractionEvent; -import com.seailz.discordjar.gateway.events.DispatchedEvents; -import com.seailz.discordjar.gateway.events.GatewayEvents; -import com.seailz.discordjar.model.api.version.APIVersion; -import com.seailz.discordjar.model.application.Intent; -import com.seailz.discordjar.model.guild.Member; -import com.seailz.discordjar.model.status.Status; -import com.seailz.discordjar.utils.URLS; -import com.seailz.discordjar.utils.rest.DiscordRequest; -import com.seailz.discordjar.utils.rest.DiscordResponse; -import com.seailz.discordjar.voice.model.VoiceServerUpdate; -import com.seailz.discordjar.voice.model.VoiceState; -import com.seailz.discordjar.ws.ExponentialBackoffLogic; -import com.seailz.discordjar.gateway.heartbeat.HeartLogic; -import com.seailz.discordjar.ws.WebSocket; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; -import org.springframework.web.bind.annotation.RequestMethod; -import org.springframework.web.socket.CloseStatus; -import org.springframework.web.socket.handler.TextWebSocketHandler; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.nio.charset.StandardCharsets; -import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import java.util.logging.Logger; - -/** - * An implementation of a client for the Discord gateway. - * The gateway is how bots receive events, and allows bots to communicate their status to Discord. - * - * @author Seailz - * @since 1.0 - */ -public class GatewayFactory extends TextWebSocketHandler { - - private final DiscordJar discordJar; - private final Logger logger = Logger.getLogger(GatewayFactory.class.getName().toUpperCase()); - public static int sequence; - - private Status status; - private String gatewayUrl = null; - private String sessionId; - private static String resumeUrl; - private HeartLogic heartbeatManager; - private boolean shouldResume = false; - private boolean readyForMessages = false; - public HashMap memberRequestChunks = new HashMap<>(); - private final boolean debug; - private List> onVoiceStateUpdateListeners; - private List> onVoiceServerUpdateListeners; - public UUID uuid = UUID.randomUUID(); - private int shardId; - private int numShards; - private GatewayTransportCompressionType transportCompressionType; - public static List pingHistoryMs = new ArrayList<>(); - public static Date lastHeartbeatSent = new Date(); - - private WebSocket socket; - - public GatewayFactory(DiscordJar discordJar, boolean debug, int shardId, int numShards, GatewayTransportCompressionType compressionType) throws ExecutionException, InterruptedException { - logger.info("[Gateway] New instance created"); - this.discordJar = discordJar; - this.debug = debug; - this.onVoiceStateUpdateListeners = new ArrayList<>(); - this.onVoiceServerUpdateListeners = new ArrayList<>(); - this.shardId = shardId; - this.numShards = numShards; - this.transportCompressionType = compressionType; - -// discordJar.setGatewayFactory(this); -// -// if (resumeUrl == null) { -// try { -// DiscordResponse response = null; -// try { -// response = new DiscordRequest( -// new JSONObject(), -// new HashMap<>(), -// "/gateway", -// discordJar, -// "/gateway", RequestMethod.GET -// ).invoke(); -// } catch (DiscordRequest.UnhandledDiscordAPIErrorException ignored) {} -// if (response == null || response.body() == null || !response.body().has("url")) { -// // In case the request fails, we can attempt to use the backup gateway URL instead. -// this.gatewayUrl = URLS.GATEWAY.BASE_URL; -// } else this.gatewayUrl = response.body().getString("url") + "/"; -// } catch (Exception e) { -// logger.warning("[DISCORD.JAR] Failed to get gateway URL. Restarting gateway after 5 seconds."); -// Thread.sleep(5000); -// discordJar.restartGateway(); -// return; -// } -// } else gatewayUrl = resumeUrl; -// -// socket = new WebSocket(appendGatewayQueryParams(gatewayUrl), debug); -// -// ExponentialBackoffLogic backoffReconnectLogic = new ExponentialBackoffLogic(); -// socket.setReEstablishConnection(backoffReconnectLogic.getFunction()); -// backoffReconnectLogic.setAttemptReconnect((c) -> { -// new Thread(discordJar::clearMemberCaches, "djar--clearing-member-caches").start(); -// return !shouldResume; -// }); -// -// socket.addMessageConsumer((tm) -> { -// try { -// handleTextMessage(tm); -// } catch (Exception e) { -// logger.warning("[Gateway] Failed to handle text message: " + e.getMessage()); -// } -// }); -// -// socket.connect().onSuccess(this::onConnect).onFailed((e) -> { -// logger.severe("[Gateway] Failed to establish connection: " + e.getThrowable().getMessage() + " - Will attempt to reconnect after 5 seconds."); -// try { -// Thread.sleep(5000); -// } catch (InterruptedException interruptedException) { -// interruptedException.printStackTrace(); -// } -// discordJar.restartGateway(); -// }); -// -// socket.addOnConnect(() -> { -// onConnect(null); -// }); -// -// socket.addOnDisconnectConsumer((cs) -> { -// this.heartbeatManager = null; -// readyForMessages = false; -// attemptReconnect(cs); -// discordJar.clearMemberCaches(); -// }); - } - - /** - * Applies the needed query string params for the Gateway URL. - * @param url Input URL - * @return Output URL with query springs applied. - */ - private String appendGatewayQueryParams(String url) { - url = url + "?v=" + APIVersion.getLatest().getCode() + "&encoding=json"; - if (transportCompressionType != GatewayTransportCompressionType.NONE) url = url + "&compress=" + transportCompressionType.getValue(); - return url; - } - - private void onConnect(Void vd) { - if (ifNotSelf()) return; - logger.info("[Gateway] Connection established successfully. ⚡"); - } - - public boolean attemptReconnect(CloseStatus status) { - // connection closed - if (this.heartbeatManager != null) heartbeatManager.stop(); - heartbeatManager = null; - - if (debug) logger.info("[GW] [" + status.getCode() + "] " + status.getReason()); - - switch (status.getCode()) { - case 1012: - return true; - case 1011, 1015: - return false; - case 4000: - if (debug) logger.info("[Gateway] Connection was closed due to an unknown error. Will attempt reconnect."); - return true; - case 4001: - if (debug) logger.warning("[Gateway] Connection was closed due to an unknown opcode. Will attempt reconnect. This is usually a discord.jar bug."); - return true; - case 4002: - if (debug) logger.warning("[Gateway] Connection was closed due to a decode error. Will attempt reconnect."); - return true; - case 4003: - if (debug) logger.warning("[Gateway] Connection was closed due to not being authenticated. Will attempt reconnect."); - return true; - case 4004: - if (debug) logger.warning("[Gateway] Connection was closed due to authentication failure. Will not attempt reconnect. Check your bot token!"); - return false; - case 4005: - if (debug) logger.warning("[Gateway] Connection was closed due to an already authenticated connection. Will attempt reconnect."); - return true; - case 4007: - if (debug) logger.info("[Gateway] Connection was closed due to an invalid sequence. Will attempt reconnect."); - return true; - case 4008: - if (debug) logger.warning("[Gateway] Connection was closed due to rate limiting. Will attempt reconnect. Make sure you're not spamming gateway requests!"); - return true; - case 4009: - if (debug) logger.info("[Gateway] Connection was closed due to an invalid session. Will attempt reconnect. This is nothing to worry about."); - return true; - case 4010: - if (debug) logger.warning("[Gateway] Connection was closed due to an invalid shard. Will not attempt reconnect."); - return false; - case 4011: - if (debug) logger.warning("[Gateway] Connection was closed due to a shard being required. Will not attempt reconnect. If your bot is in more than 2500 servers, you must use sharding. See discord.jar's GitHub for more info."); - return false; - case 4012: - if (debug) logger.warning("[Gateway] Connection was closed due to an invalid API version. Will not attempt reconnect."); - return false; - case 4013: - if (debug) logger.warning("[Gateway] Connection was closed due to an invalid intent. Will not attempt reconnect."); - return false; - case 4014: - if (debug) logger.warning("[Gateway] Connection was closed due to a disallowed intent. Will not attempt reconnect. If you've set intents, please make sure they are enabled in the developer portal and you're approved for them if you run a verified bot."); - return false; - case 1000: - if (debug) logger.info("[Gateway] Connection was closed using the close code 1000. The heartbeat cycle has likely fallen out of sync. Will attempt reconnect."); - return true; - case 1001: - if (debug) logger.info("[Gateway] Gateway requested a reconnect (close code 1001), reconnecting..."); - return true; - case 1006: - if (debug) logger.info("[Gateway] Connection was closed using the close code 1006. This is usually an error with Spring. Please post this error with the stacktrace below (if there is one) on discord.jar's GitHub. Will attempt reconnect."); - discordJar.restartGateway(); - return false; - default: - logger.warning( - "[Gateway] Connection was closed with an unknown status code. Status code: " - + status.getCode() + ". Reason: " + status.getReason() + ". Will attempt reconnect." - ); - return true; - } - } - - - /** - * Do not use this method - it is for internal use only. - * @param status The status to set. - */ - @Deprecated - public void setStatus(Status status) { - this.status = status; - } - - public Status getStatus() { - return status; - } - - public void queueMessage(JSONObject payload) { - if (debug) { - logger.info("[DISCORD.JAR - DEBUG] Queued message: " + payload.toString()); - } - while (readyForMessages) { - sendPayload(payload); - if (debug) { - logger.info("[DISCORD.JAR - DEBUG] Sent message: " + payload.toString()); - } - break; - } - } - - protected void handleTextMessage(String message) throws Exception { - if (ifNotSelf()) return; - JSONObject payload = new JSONObject(message); - - if (debug) { - logger.info("[Gateway - DEBUG] Received message: " + payload.toString()); - logger.info("[Gateway - DEBUG] Message size: " + payload.toString().getBytes(StandardCharsets.UTF_8).length + "b"); - } - try { - sequence = payload.getInt("s"); - } catch (JSONException ignored) { - } - - GatewayEvents event = GatewayEvents.getEvent(payload.getInt("op")); - if (event == null) { - logger.warning("[discord.jar] Unknown event received: " + payload.getInt("op") + ". This is rare, please create an issue on GitHub with this log message. Payload: " + payload.toString()); - return; - } - - switch (event) { - case HELLO: - handleHello(payload); - if (!shouldResume) sendIdentify(); - this.shouldResume = false; - readyForMessages = true; - - if (debug) { - logger.info("[DISCORD.JAR - DEBUG] Received HELLO event. Heartbeat cycle has been started. If this isn't a resume, IDENTIFY has been sent."); - } - break; - case HEARTBEAT_REQUEST: - heartbeatManager.forceHeartbeat(); - if (debug) { - logger.info("[DISCORD.JAR - DEBUG] Received HEARTBEAT_REQUEST event. Heartbeat has been forced."); - } - break; - case DISPATCHED: - handleDispatched(payload); - if (debug) { - logger.info("[DISCORD.JAR - DEBUG] Received DISPATCHED event. Event has been handled."); - } - break; - case RECONNECT: - logger.info("[discord.jar] Gateway requested a reconnect, reconnecting after 5 seconds..."); - Thread.sleep(5000); - reconnect(); - break; - case INVALID_SESSION: - logger.info("[discord.jar] Gateway requested a reconnect (invalid session), reconnecting after 5 seconds..."); - Thread.sleep(5000); - reconnect(); - break; - case HEARTBEAT_ACK: - // Heartbeat was acknowledged, can ignore, but we'll log the request ping anyway. - if (lastHeartbeatSent != null) { - long ping = System.currentTimeMillis() - lastHeartbeatSent.getTime(); - if (debug) { - logger.info("[DISCORD.JAR - DEBUG] Received HEARTBEAT_ACK event. Ping: " + ping + "ms"); - } - - pingHistoryMs.add(ping); - } - break; - } - } - - private boolean ifNotSelf() { -// if (!discordJar.getGateway().equals(this)) { -// logger.warning("[Gateway] Not the current gateway instance. Shutting down."); -// shouldResume = true; // Stop reconnecting -// try { -// socket.getSession().close(CloseStatus.SERVER_ERROR); -// } catch (IOException e) { -// throw new RuntimeException(e); -// } -// return true; -// } - return false; - } - - private void handleDispatched(JSONObject payload) { - // Handle dispatched events - // actually dispatch the event - Class eventClass = DispatchedEvents.getEventByName(payload.getString("t")).getEvent().apply(payload, null, discordJar); - if (eventClass == null) { - if (debug) logger.info("[discord.jar] Unhandled event: " + payload.getString("t") + "\nThis is usually ok, if a new feature has recently been added to Discord as discord.jar may not support it yet.\nIf that is not the case, please report this to the discord.jar developers."); - return; - } - if (debug) { - logger.info("[DISCORD.JAR - DEBUG] Event class: " + eventClass.getName()); - } - if (eventClass.equals(CommandInteractionEvent.class)) return; - - new Thread(() -> { - Event event; - try { - event = eventClass.getConstructor(DiscordJar.class, long.class, JSONObject.class) - .newInstance(discordJar, sequence, payload); - } catch (InstantiationException | IllegalAccessException | NoSuchMethodException e) { - logger.warning("[DISCORD.JAR - EVENTS] Failed to dispatch " + eventClass.getName() + " event. This is usually a bug, please report it on discord.jar's GitHub with this log message."); - e.printStackTrace(); - return; - } catch (InvocationTargetException e) { - logger.warning("[DISCORD.JAR - EVENTS] Failed to dispatch " + eventClass.getName() + " event. This is usually a bug, please report it on discord.jar's GitHub with this log message."); - // If it's a runtime exception, we want to catch it and print the stack trace. - e.getCause().printStackTrace(); - return; - } - - discordJar.getEventDispatcher().dispatchEvent(event, eventClass, discordJar); - - if (debug) { - logger.info("[DISCORD.JAR - DEBUG] Event dispatched: " + eventClass.getName()); - } - }, "djar--event-dispatch-gw").start(); - - if (Objects.requireNonNull(DispatchedEvents.getEventByName(payload.getString("t"))) == DispatchedEvents.READY) { - this.sessionId = payload.getJSONObject("d").getString("session_id"); - this.resumeUrl = payload.getJSONObject("d").getString("resume_gateway_url"); - readyForMessages = true; - - if (discordJar.getStatus() != null) { - JSONObject json = new JSONObject(); - json.put("d", discordJar.getStatus().compile()); - json.put("op", 3); - queueMessage(json); - } - } - } - - public void killConnectionNicely() throws IOException { - // disable heartbeat - if (heartbeatManager != null) heartbeatManager.stop(); - heartbeatManager = null; - readyForMessages = false; - // close connection - getSocket().getWs().close(1000, "Going away"); - if (debug) { - logger.info("[DISCORD.JAR - DEBUG] Connection closed nicely."); - } - } - - public void killConnection() throws IOException { - // disable heartbeat - if (this.heartbeatManager != null) heartbeatManager.stop(); - heartbeatManager = null; - readyForMessages = false; - - // close connection - if (getSocket() != null && getSocket().getWs() != null) getSocket().getWs().close(1011, "Going away"); - shouldResume = true; - - if (debug) { - logger.info("[DISCORD.JAR - DEBUG] Connection closed."); - } - } - - public void reconnect() throws IOException { - if (debug) { - logger.info("[DISCORD.JAR - DEBUG] Attempting resume..."); - } - if (getSocket().isOpen()) - getSocket().getWs().close(1011, "Reconnecting"); - -// socket.connect(); -// onConnect(null); -// -// JSONObject resumePayload = new JSONObject(); -// resumePayload.put("op", 6); -// JSONObject resumeData = new JSONObject(); -// resumeData.put("token", discordJar.getToken()); -// resumeData.put("session_id", sessionId); -// resumeData.put("seq", sequence); -// resumePayload.put("d", resumeData); - //queueMessage(resumePayload); - } - - private void handleHello(JSONObject payload) { - heartbeatManager = new HeartLogic(socket, payload.getJSONObject("d").getInt("heartbeat_interval")); - heartbeatManager.start(); - } - - private void sendIdentify() { - AtomicInteger intents = new AtomicInteger(); - if (discordJar.getIntents().contains(Intent.ALL)) { - intents.set(3243773); - discordJar.getIntents().forEach(intent -> { - if (intent.isPrivileged()) { - intents.getAndAdd(intent.getLeftShiftId()); - } - }); - } else { - discordJar.getIntents().forEach(intent -> intents.getAndAdd(intent.getLeftShiftId())); - } - - JSONObject payload = new JSONObject(); - payload.put("op", 2); - JSONObject data = new JSONObject(); - data.put("token", discordJar.getToken()); - if (numShards != -1 && shardId != -1) { - data.put("shard", new JSONArray().put(shardId).put(numShards)); - } - String os = System.getProperty("os.name").toLowerCase(); - data.put("properties", new JSONObject().put("os", os).put("browser", "discord.jar").put("device", "discord.jar")); - data.put("intents", intents.get()); - payload.put("d", data); - sendPayload(payload); - } - - public void sendPayload(JSONObject payload) { - try { - getSocket().send(payload.toString()); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public WebSocket getSocket() { - return socket; - } - - public void requestGuildMembers(RequestGuildMembersAction action, CompletableFuture> future) { - if (action.getQuery() == null & action.getUserIds() == null) { - throw new IllegalArgumentException("You must provide either a query or a list of user ids"); - } - - if (debug) { - logger.info("[DISCORD.JAR - DEBUG] Requesting guild members..."); - } - - JSONObject payload = new JSONObject(); - JSONObject dPayload = new JSONObject(); - dPayload.put("guild_id", action.getGuildId()); - if (action.getQuery() != null) { - dPayload.put("query", action.getQuery()); - } - - if (action.getUserIds() != null && !action.getUserIds().isEmpty()) { - dPayload.put("user_ids", action.getUserIds()); - } - - dPayload.put("limit", action.getLimit()); - if (action.isPresences()) dPayload.put("presences", true); - dPayload.put("nonce", action.getNonce()); - payload.put("op", 8); - payload.put("d", dPayload); - - queueMessage(payload); - - memberRequestChunks.put(action.getNonce(), new GatewayFactory.MemberChunkStorageWrapper(new ArrayList<>(), future)); - } - - public void sendVoicePayload(String guildId, String channelId, boolean selfMute, boolean selfDeaf) { - JSONObject payload = new JSONObject(); - JSONObject dPayload = new JSONObject(); - dPayload.put("guild_id", guildId); - dPayload.put("channel_id", channelId); - dPayload.put("self_mute", selfMute); - dPayload.put("self_deaf", selfDeaf); - payload.put("op", 4); - payload.put("d", dPayload); - sendPayload(payload); - } - - public void onVoiceStateUpdate(Consumer consumer) { - onVoiceStateUpdateListeners.add(consumer); - } - - public void onVoiceServerUpdate(Consumer consumer) { - onVoiceServerUpdateListeners.add(consumer); - } - - public List> getOnVoiceStateUpdateListeners() { - return onVoiceStateUpdateListeners; - } - - public List> getOnVoiceServerUpdateListeners() { - return onVoiceServerUpdateListeners; - } - - public record MemberChunkStorageWrapper(List members, CompletableFuture> future) { - public void addMember(Member member) { - members.add(member); - } - - public void complete() { - future.complete(members); - } - } - - - public boolean isDebug() { - return debug; - } -} diff --git a/src/main/java/com/seailz/discordjar/gateway/events/DispatchedEvents.java b/src/main/java/com/seailz/discordjar/gateway/events/DispatchedEvents.java index dbbc7004..bd7f882f 100644 --- a/src/main/java/com/seailz/discordjar/gateway/events/DispatchedEvents.java +++ b/src/main/java/com/seailz/discordjar/gateway/events/DispatchedEvents.java @@ -32,7 +32,6 @@ import com.seailz.discordjar.events.model.message.MessageCreateEvent; import com.seailz.discordjar.events.model.message.TypingStartEvent; import com.seailz.discordjar.gateway.Gateway; -import com.seailz.discordjar.gateway.GatewayFactory; import com.seailz.discordjar.command.CommandType; import com.seailz.discordjar.model.channel.Channel; import com.seailz.discordjar.model.component.ComponentType; @@ -202,7 +201,7 @@ public enum DispatchedEvents { return null; } - GatewayFactory.MemberChunkStorageWrapper wrapper = g.memberRequestChunks.get(nonce); + Gateway.MemberChunkStorageWrapper wrapper = g.memberRequestChunks.get(nonce); if (wrapper == null) { Logger.getLogger("DispatchedEvents").warning("[discord.jar] Received member chunk with unknown nonce: " + nonce); return null; @@ -254,10 +253,10 @@ public enum DispatchedEvents { switch (CommandType.fromCode(p.getJSONObject("d").getJSONObject("data").getInt("type"))) { case SLASH_COMMAND -> - event = new SlashCommandInteractionEvent(d, GatewayFactory.sequence, p); - case USER -> event = new UserContextCommandInteractionEvent(d, GatewayFactory.sequence, p); + event = new SlashCommandInteractionEvent(d, Gateway.lastSequenceNumber, p); + case USER -> event = new UserContextCommandInteractionEvent(d, Gateway.lastSequenceNumber, p); case MESSAGE -> - event = new MessageContextCommandInteractionEvent(d, GatewayFactory.sequence, p); + event = new MessageContextCommandInteractionEvent(d, Gateway.lastSequenceNumber, p); } d.getCommandDispatcher().dispatch(p.getJSONObject("d").getJSONObject("data").getString("name"), diff --git a/src/main/java/com/seailz/discordjar/gateway/events/GatewayEvents.java b/src/main/java/com/seailz/discordjar/gateway/events/GatewayEvents.java index dcc104e3..405bddd3 100644 --- a/src/main/java/com/seailz/discordjar/gateway/events/GatewayEvents.java +++ b/src/main/java/com/seailz/discordjar/gateway/events/GatewayEvents.java @@ -5,7 +5,7 @@ * This is an internal class and should not be used by the end user. * * @author Seailz - * @see com.seailz.discordjar.gateway.GatewayFactory + * @see com.seailz.discordjar.gateway.Gateway * @since 1.0 */ public enum GatewayEvents { diff --git a/src/main/java/com/seailz/discordjar/gateway/heartbeat/Heart.java b/src/main/java/com/seailz/discordjar/gateway/heartbeat/Heart.java deleted file mode 100644 index 64ae5c97..00000000 --- a/src/main/java/com/seailz/discordjar/gateway/heartbeat/Heart.java +++ /dev/null @@ -1,88 +0,0 @@ -package com.seailz.discordjar.gateway.heartbeat; - -import com.seailz.discordjar.gateway.Gateway; -import com.seailz.discordjar.gateway.GatewayFactory; -import org.json.JSONObject; - -import java.util.Date; -import java.util.Random; -import java.util.logging.Logger; - -public class Heart { - - private int interval; - private GatewayFactory factory; - private boolean active = true; - - public Heart(int interval, GatewayFactory factory) { - this.interval = interval; - this.factory = factory; - begin(); - } - - private void begin() { - sendFirst(); - startCycle(); - } - - private void startCycle() { - if (factory.isDebug()) { - Logger.getLogger("HeartbeatManager").info( - """ - [HEARTBEAT] Starting heartbeat cycle with interval of %dms. - """ - .formatted(interval) - ); - } - new Thread(() -> { - while (true) { - try { - Thread.sleep(interval); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - if (!factory.getSocket().isOpen()) { - deactivate(); - break; - } - if (active) send(); - } - }, "djar--heart-cycle").start(); - } - - private void sendFirst() { - double jitter = new Random().nextDouble(1); - double interval = this.interval * jitter; - new Thread(() -> { - try { - Thread.sleep((long) interval); - if (active) send(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - }, "djar--first-heart").start(); - } - - public void forceHeartbeat() { - send(); - } - - private void send() { - factory.queueMessage( - new JSONObject() - .put("op", 1) - .put("d", GatewayFactory.sequence) - ); - - Gateway.lastHeartbeatSent = new Date(); - } - - public void deactivate() { - active = false; - } - - public void activate() { - active = true; - } - -} diff --git a/src/main/java/com/seailz/discordjar/http/HttpOnlyManager.java b/src/main/java/com/seailz/discordjar/http/HttpOnlyManager.java index 9e913634..a5d0f40b 100644 --- a/src/main/java/com/seailz/discordjar/http/HttpOnlyManager.java +++ b/src/main/java/com/seailz/discordjar/http/HttpOnlyManager.java @@ -12,7 +12,7 @@ import com.seailz.discordjar.events.model.interaction.select.entity.ChannelSelectMenuInteractionEvent; import com.seailz.discordjar.events.model.interaction.select.entity.RoleSelectMenuInteractionEvent; import com.seailz.discordjar.events.model.interaction.select.entity.UserSelectMenuInteractionEvent; -import com.seailz.discordjar.gateway.GatewayFactory; +import com.seailz.discordjar.gateway.Gateway; import com.seailz.discordjar.model.component.ComponentType; import com.seailz.discordjar.model.interaction.Interaction; import com.seailz.discordjar.utils.rest.DiscordRequest; @@ -79,12 +79,12 @@ public ResponseEntity get(HttpServletRequest request) throws IOException switch (CommandType.fromCode(new JSONObject(interaction.raw()).getJSONObject("data").getInt("type"))) { case SLASH_COMMAND -> { - event = new SlashCommandInteractionEvent(discordJar, GatewayFactory.sequence, new JSONObject().put("d", new JSONObject(body))); + event = new SlashCommandInteractionEvent(discordJar, Gateway.lastSequenceNumber, new JSONObject().put("d", new JSONObject(body))); } case USER -> - event = new UserContextCommandInteractionEvent(discordJar, GatewayFactory.sequence, new JSONObject().put("d", new JSONObject(body))); + event = new UserContextCommandInteractionEvent(discordJar, Gateway.lastSequenceNumber, new JSONObject().put("d", new JSONObject(body))); case MESSAGE -> - event = new MessageContextCommandInteractionEvent(discordJar, GatewayFactory.sequence, new JSONObject().put("d", new JSONObject(body))); + event = new MessageContextCommandInteractionEvent(discordJar, Gateway.lastSequenceNumber, new JSONObject().put("d", new JSONObject(body))); } discordJar.getCommandDispatcher().dispatch(new JSONObject(interaction.raw()).getJSONObject("data").getString("name"), event); diff --git a/src/main/java/com/seailz/discordjar/model/channel/internal/AudioChannelImpl.java b/src/main/java/com/seailz/discordjar/model/channel/internal/AudioChannelImpl.java index c7a86388..b4b226f2 100644 --- a/src/main/java/com/seailz/discordjar/model/channel/internal/AudioChannelImpl.java +++ b/src/main/java/com/seailz/discordjar/model/channel/internal/AudioChannelImpl.java @@ -1,14 +1,13 @@ package com.seailz.discordjar.model.channel.internal; import com.seailz.discordjar.DiscordJar; -import com.seailz.discordjar.gateway.GatewayFactory; +import com.seailz.discordjar.gateway.Gateway; import com.seailz.discordjar.model.channel.AudioChannel; import com.seailz.discordjar.model.channel.Category; import com.seailz.discordjar.model.channel.audio.VoiceRegion; import com.seailz.discordjar.model.channel.utils.ChannelType; import com.seailz.discordjar.model.guild.Guild; import com.seailz.discordjar.model.permission.PermissionOverwrite; -import com.seailz.discordjar.utils.rest.DiscordRequest; import com.seailz.discordjar.voice.model.provider.VoiceProvider; import com.seailz.discordjar.voice.ws.VoiceGatewayFactory; import lombok.SneakyThrows; @@ -42,7 +41,7 @@ public void connect(VoiceProvider vp) { @SneakyThrows @Override public void connect(VoiceProvider vp, boolean mute, boolean deafen) { - GatewayFactory gateway = discordJv().getGateway(); + Gateway gateway = discordJv().getGateway(); gateway.sendVoicePayload(guild().id(), id(), mute, deafen); AtomicBoolean receivedVoiceServerUpdate = new AtomicBoolean(false); From a4add7a3071e345f62684da711c964a4540d5640 Mon Sep 17 00:00:00 2001 From: IoyoCode Date: Tue, 26 Sep 2023 09:32:08 +0100 Subject: [PATCH 03/11] fix(heart): fixed an issue with HeartLogic.java where it would attempt to send heartbeats to closed connections due to the logic of the Gateway.java class Gateway.java creates a new WebSocket instance for each connection, but HeartLogic.java wasn't being told to stop sending heartbeats after a connection had closed, causing a broken pipe SocketException. This has been fixed by stopping the heartbeat manager on a disconnect. --- src/main/java/com/seailz/discordjar/gateway/Gateway.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/seailz/discordjar/gateway/Gateway.java b/src/main/java/com/seailz/discordjar/gateway/Gateway.java index e3315158..ba318997 100644 --- a/src/main/java/com/seailz/discordjar/gateway/Gateway.java +++ b/src/main/java/com/seailz/discordjar/gateway/Gateway.java @@ -92,6 +92,7 @@ public void disconnect(@NotNull CloseStatus closeStatus) { } public void disconnectFlow(@NotNull CloseStatus closeStatus) { + heartbeatManager.stop(); // Stop attempting heartbeats to avoid broken pipe errors CloseCode closeCode = CloseCode.fromCode(closeStatus.getCode()); readyForMessages = false; boolean attemptReconnect = closeCode.shouldReconnect(); From 329f54db1787e7a6b8bdd109ebdec4b013d1d574 Mon Sep 17 00:00:00 2001 From: IoyoCode Date: Tue, 26 Sep 2023 10:46:40 +0100 Subject: [PATCH 04/11] feat(gateway): added some extra getters to Gateway.java --- .../seailz/discordjar/gateway/Gateway.java | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/seailz/discordjar/gateway/Gateway.java b/src/main/java/com/seailz/discordjar/gateway/Gateway.java index ba318997..fc0cf733 100644 --- a/src/main/java/com/seailz/discordjar/gateway/Gateway.java +++ b/src/main/java/com/seailz/discordjar/gateway/Gateway.java @@ -20,6 +20,7 @@ import com.seailz.discordjar.ws.WebSocket; import org.jetbrains.annotations.Contract; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; @@ -445,6 +446,30 @@ private String getGatewayUrl() throws InterruptedException { return gatewayUrl; } + /** + * Returns resume info for the next resume attempt, or null if READY was not received yet. + */ + @Nullable + public ReconnectInfo getResumeInfo() { + return resumeInfo; + } + + /** + * Returns when the last heartbeat was sent, or null if none were sent yet. + */ + @Nullable + public static Date getLastHeartbeatSent() { + return lastHeartbeatSent; + } + + /** + * Returns an array of estimated ping times in milliseconds based on heartbeat ACKs. + */ + @NotNull + public static List getPingHistoryMs() { + return pingHistoryMs; + } + private enum OpCodes { DISPATCH(0), HEARTBEAT(1), @@ -533,7 +558,7 @@ public static CloseCode fromCode(int code) { return UNKNOWN; } } - private record ReconnectInfo(String sessionId, String token, String url) {} + public record ReconnectInfo(String sessionId, String token, String url) {} public static Builder builder(DiscordJar bot) { return new GatewayBuilder(bot); From 6e919fa92ca0f1dd5ad65b60a9a7067a096a1d97 Mon Sep 17 00:00:00 2001 From: IoyoCode Date: Tue, 26 Sep 2023 10:47:58 +0100 Subject: [PATCH 05/11] refactor(gateway): reorganized some elements --- .../seailz/discordjar/gateway/Gateway.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/seailz/discordjar/gateway/Gateway.java b/src/main/java/com/seailz/discordjar/gateway/Gateway.java index fc0cf733..7f010900 100644 --- a/src/main/java/com/seailz/discordjar/gateway/Gateway.java +++ b/src/main/java/com/seailz/discordjar/gateway/Gateway.java @@ -406,16 +406,6 @@ public void setStatus(Status status) { this.status = status; } - public record MemberChunkStorageWrapper(List members, CompletableFuture> future) { - public void addMember(Member member) { - members.add(member); - } - - public void complete() { - future.complete(members); - } - } - /** * Retrieves the Gateway URL from Discord. * @return The Gateway URL. @@ -559,8 +549,18 @@ public static CloseCode fromCode(int code) { } } public record ReconnectInfo(String sessionId, String token, String url) {} + public record MemberChunkStorageWrapper(List members, CompletableFuture> future) { + public void addMember(Member member) { + members.add(member); + } + + public void complete() { + future.complete(members); + } + } - public static Builder builder(DiscordJar bot) { + @Contract(value = "_ -> new", pure = true) + public static @NotNull Builder builder(DiscordJar bot) { return new GatewayBuilder(bot); } @@ -604,4 +604,5 @@ public Gateway.Builder setTransportCompressionType(GatewayTransportCompressionTy } } + } From 0f75f39f14825d6553223ac6473ed01b8b69cc34 Mon Sep 17 00:00:00 2001 From: IoyoCode Date: Tue, 26 Sep 2023 10:56:10 +0100 Subject: [PATCH 06/11] javadocs(gateway): added proper javadocs for the gateway class & methods --- .../seailz/discordjar/gateway/Gateway.java | 74 +++++++++++++++++-- 1 file changed, 68 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/seailz/discordjar/gateway/Gateway.java b/src/main/java/com/seailz/discordjar/gateway/Gateway.java index 7f010900..62ef9553 100644 --- a/src/main/java/com/seailz/discordjar/gateway/Gateway.java +++ b/src/main/java/com/seailz/discordjar/gateway/Gateway.java @@ -35,6 +35,22 @@ import java.util.function.Consumer; import java.util.logging.Logger; +/** + * The Gateway is the real-time two-way WebSocket between Discord and the bot. + *
This class is responsible for handling the connection to the Gateway, as well as handling & sending messages. + * + *

This is the v3 implementation of the Gateway, designed for stability, simplicity, and performance. + * + *

For more information on the Gateway, see the Discord API docs. + * @see Discord API docs + * @see HeartLogic + * @see DispatchedEvents + * @see GatewayEvents + * @see GatewayTransportCompressionType + * @see DiscordJar + * @author Seailz + * @since b-1.1 + */ public class Gateway { private final DiscordJar bot; @@ -65,7 +81,7 @@ protected Gateway(DiscordJar bot, int shardCount, int shardId, GatewayTransportC } /** - * Runs the necessary steps to initialize the Gateway connection. + * Flow for connecting to the Gateway. */ private void connectionFlow() { String gatewayUrl; @@ -88,10 +104,18 @@ private void connectionFlow() { lastSequenceNumber = -1; } + /** + * Disconnects from the Gateway. + * @param closeStatus The close status of the disconnect. + */ public void disconnect(@NotNull CloseStatus closeStatus) { socket.disconnect(closeStatus.getCode(), closeStatus.getReason()); } + /** + * Flow after a disconnect from the Gateway. + * @param closeStatus The close status of the disconnect. + */ public void disconnectFlow(@NotNull CloseStatus closeStatus) { heartbeatManager.stop(); // Stop attempting heartbeats to avoid broken pipe errors CloseCode closeCode = CloseCode.fromCode(closeStatus.getCode()); @@ -106,6 +130,9 @@ public void disconnectFlow(@NotNull CloseStatus closeStatus) { else connectionFlow(); } + /** + * Flow for resuming a connection to the Gateway. + */ private void resumeFlow() { if (resumeInfo == null) { logger.warning("[Gateway - Resume Flow] Resume info is null, cannot resume. Attempting normal connection."); @@ -122,7 +149,11 @@ private void resumeFlow() { resumedConnection = true; } - protected void handleTextMessage(String message) throws Exception { + /** + * Handles a message received from the Gateway. + * @param message The message received from the Gateway. + */ + protected void handleTextMessage(String message) { JSONObject payload = new JSONObject(message); if (bot.isDebug()) { @@ -186,7 +217,11 @@ protected void handleTextMessage(String message) throws Exception { } } - private void handleDispatch(JSONObject payload) { + /** + * Handles a DISPATCHED event. + * @param payload The payload of the event. + */ + private void handleDispatch(@NotNull JSONObject payload) { // Handle dispatched events // actually dispatch the event Class eventClass = DispatchedEvents.getEventByName(payload.getString("t")).getEvent().apply(payload, this, bot); @@ -240,11 +275,18 @@ private void handleDispatch(JSONObject payload) { } } - private void handleHello(JSONObject payload) { + /** + * Starts the heartbeat cycle. + * @param payload The HELLO payload. + */ + private void handleHello(@NotNull JSONObject payload) { heartbeatManager = new HeartLogic(socket, payload.getJSONObject("d").getInt("heartbeat_interval")); heartbeatManager.start(); } + /** + * Sends an IDENTIFY payload to the gateway. + */ private void sendIdentify() { AtomicInteger intents = new AtomicInteger(); if (bot.getIntents().contains(Intent.ALL)) { @@ -272,6 +314,9 @@ private void sendIdentify() { socket.send(payload.toString()); } + /** + * Given a {@link WebSocket} instance, connects to the gateway using it and returns the same {@link WebSocket} instance. + */ @Contract("_, _ -> param1") private @NotNull WebSocket connectToSocket(@NotNull WebSocket socket, boolean resuming) { socket.connect() @@ -296,6 +341,10 @@ private void sendIdentify() { }); return socket; } + + /** + * Sets up a {@link WebSocket} instance to be used for the gateway connection. + */ @Contract("_ -> param1") private @NotNull WebSocket setupDisconnectedSocket(@NotNull WebSocket socket) { ExponentialBackoffLogic backoffReconnectLogic = new ExponentialBackoffLogic(); @@ -321,13 +370,21 @@ private void sendIdentify() { return socket; } + /** + * Appends the relevant query parameters for the given URL. + */ private String appendGatewayQueryParams(String url) { url = url + "?v=" + bot.getApiVersion().getCode() + "&encoding=json"; if (compressionType != GatewayTransportCompressionType.NONE) url = url + "&compress=" + compressionType.getValue(); return url; } - public void queueMessage(JSONObject payload) { + /** + * Queues a message to be sent to the gateway. + *
Messages will be sent after the HELLO event is received. + * @param payload {@link JSONObject} containing the payload to send + */ + public void queueMessage(@NotNull JSONObject payload) { if (bot.isDebug()) logger.info("[Gateway] Queued message: " + payload); while (readyForMessages) { socket.send(payload.toString()); @@ -338,7 +395,12 @@ public void queueMessage(JSONObject payload) { } } - public void requestGuildMembers(RequestGuildMembersAction action, CompletableFuture> future) { + /** + * Sends a request to the gateway to request guild members. + * @param action {@link RequestGuildMembersAction} containing extra information about the request + * @param future {@link CompletableFuture} that will be completed when the request is completed + */ + public void requestGuildMembers(@NotNull RequestGuildMembersAction action, @NotNull CompletableFuture> future) { if (action.getQuery() == null & action.getUserIds() == null) { throw new IllegalArgumentException("You must provide either a query or a list of user ids"); } From bcf6bb623f50a1f72d16bd8fa817654ac6ed4ac6 Mon Sep 17 00:00:00 2001 From: IoyoCode Date: Tue, 26 Sep 2023 14:46:47 +0100 Subject: [PATCH 07/11] fix(heartbeat): fixed heartbeat cycling --- .../seailz/discordjar/gateway/Gateway.java | 5 ++++ .../gateway/heartbeat/HeartLogic.java | 30 +++++++++++++------ 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/seailz/discordjar/gateway/Gateway.java b/src/main/java/com/seailz/discordjar/gateway/Gateway.java index 62ef9553..f114d6aa 100644 --- a/src/main/java/com/seailz/discordjar/gateway/Gateway.java +++ b/src/main/java/com/seailz/discordjar/gateway/Gateway.java @@ -280,6 +280,11 @@ private void handleDispatch(@NotNull JSONObject payload) { * @param payload The HELLO payload. */ private void handleHello(@NotNull JSONObject payload) { + if (heartbeatManager != null) { + heartbeatManager.setSocket(socket); + heartbeatManager.startCycle(); + return; + } heartbeatManager = new HeartLogic(socket, payload.getJSONObject("d").getInt("heartbeat_interval")); heartbeatManager.start(); } diff --git a/src/main/java/com/seailz/discordjar/gateway/heartbeat/HeartLogic.java b/src/main/java/com/seailz/discordjar/gateway/heartbeat/HeartLogic.java index ec7944f1..9760dc22 100644 --- a/src/main/java/com/seailz/discordjar/gateway/heartbeat/HeartLogic.java +++ b/src/main/java/com/seailz/discordjar/gateway/heartbeat/HeartLogic.java @@ -18,10 +18,11 @@ */ public class HeartLogic { - private final WebSocket socket; + private WebSocket socket; private long interval; private long lastSequence = -1; private final Map isInstanceStillRunning = new HashMap<>(); + boolean running = true; public HeartLogic(WebSocket socket, long interval) { this.interval = interval; @@ -41,12 +42,20 @@ public void setLastSequence(long sequence) { } public void restart() { - isInstanceStillRunning.forEach((uuid, aBoolean) -> isInstanceStillRunning.put(uuid, false)); - start(); + running = false; + startCycle(); } public void stop() { - isInstanceStillRunning.forEach((uuid, aBoolean) -> isInstanceStillRunning.put(uuid, false)); + running = false; + } + + public void startCycle() { + running = true; + } + + public void setSocket(WebSocket socket) { + this.socket = socket; } public void forceHeartbeat() { @@ -57,12 +66,15 @@ public void forceHeartbeat() { public void start() { Thread thread = new Thread(() -> { - UUID uuid = UUID.randomUUID(); - isInstanceStillRunning.put(uuid, true); while (true) { - if (!isInstanceStillRunning.get(uuid)) { - isInstanceStillRunning.remove(uuid); - return; + if (!running) { + System.out.println("Heartbeat thread stopped."); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + continue; } try { socket.send( From f217d4143dd405482aae963e07d6ec282d00cf33 Mon Sep 17 00:00:00 2001 From: IoyoCode Date: Tue, 26 Sep 2023 15:56:26 +0100 Subject: [PATCH 08/11] fix(heartbeat): fixed heartbeat cycling --- src/main/java/com/seailz/discordjar/gateway/Gateway.java | 7 +++++-- src/main/java/com/seailz/discordjar/ws/WebSocket.java | 6 +++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/seailz/discordjar/gateway/Gateway.java b/src/main/java/com/seailz/discordjar/gateway/Gateway.java index f114d6aa..41aefaea 100644 --- a/src/main/java/com/seailz/discordjar/gateway/Gateway.java +++ b/src/main/java/com/seailz/discordjar/gateway/Gateway.java @@ -60,6 +60,7 @@ public class Gateway { private GatewayTransportCompressionType compressionType; private WebSocket socket; private boolean resumedConnection = false; + private boolean reconnecting = false; private ReconnectInfo resumeInfo; public static long lastSequenceNumber = -1; private boolean readyForMessages = false; @@ -127,7 +128,9 @@ public void disconnectFlow(@NotNull CloseStatus closeStatus) { } if (closeCode.shouldResume()) resumeFlow(); - else connectionFlow(); + else reconnecting = true; + + if (bot.isDebug()) logger.info("[Gateway] Finished disconnect flow."); } /** @@ -356,7 +359,7 @@ private void sendIdentify() { socket.setReEstablishConnection(backoffReconnectLogic.getFunction()); backoffReconnectLogic.setAttemptReconnect((c) -> { new Thread(bot::clearMemberCaches, "djar--clearing-member-caches").start(); - return false; + return reconnecting; }); socket.addOnDisconnectConsumer((cs) -> { diff --git a/src/main/java/com/seailz/discordjar/ws/WebSocket.java b/src/main/java/com/seailz/discordjar/ws/WebSocket.java index efa106f8..b6c1ceb9 100644 --- a/src/main/java/com/seailz/discordjar/ws/WebSocket.java +++ b/src/main/java/com/seailz/discordjar/ws/WebSocket.java @@ -152,6 +152,7 @@ public void onClosed(@NotNull okhttp3.WebSocket webSocket, int code, @NotNull St onDisconnectConsumers.forEach(consumer -> consumer.accept(new CloseStatus(code, reason))); if (reEstablishConnection.apply(new CloseStatus(code, reason))) { + if (debug) Logger.getLogger("WS").info("[WS] Attempting to re-establish connection"); try { connect(url); } catch (Exception e) { @@ -166,11 +167,10 @@ public void onFailure(@NotNull okhttp3.WebSocket webSocket, @NotNull Throwable t open = false; buffer = null; - new Thread(() -> { - onDisconnectConsumers.forEach(consumer -> consumer.accept(new CloseStatus(1006, t.getMessage()))); - }, "djar--ws-disconnect-consumers").start(); + onDisconnectConsumers.forEach(consumer -> consumer.accept(new CloseStatus(1006, t.getMessage()))); if (reEstablishConnection.apply(new CloseStatus(1006, t.getMessage()))) { + if (debug) Logger.getLogger("WS").info("[WS] Attempting to re-establish connection"); try { connect(url); } catch (Exception e) { From 1ac4c53e6018ec3246f0fbea0243848c95a58260 Mon Sep 17 00:00:00 2001 From: IoyoCode Date: Tue, 26 Sep 2023 16:19:56 +0100 Subject: [PATCH 09/11] fix(gateway / status rotor): fixed issues with the new gateway implementation where having an active status rotor would cause the gateway to crash after a disconnect and attempted reconnect This was due to the status rotor attempting to send a message when it wasn't authorized to do so, since the ready event hadn't yet been received. --- .../com/seailz/discordjar/DiscordJar.java | 2 +- .../seailz/discordjar/gateway/Gateway.java | 22 +++++++++++++++++++ .../gateway/events/DispatchedEvents.java | 1 + 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/seailz/discordjar/DiscordJar.java b/src/main/java/com/seailz/discordjar/DiscordJar.java index 6ad7fb2b..9086f229 100644 --- a/src/main/java/com/seailz/discordjar/DiscordJar.java +++ b/src/main/java/com/seailz/discordjar/DiscordJar.java @@ -438,7 +438,7 @@ public void setStatus(@NotNull Status status) { JSONObject json = new JSONObject(); json.put("d", status.compile()); json.put("op", 3); - gatewayFactory.queueMessage(json); + gatewayFactory.queueMessageUntilReady(json); gatewayFactory.setStatus(status); this.status = status; } diff --git a/src/main/java/com/seailz/discordjar/gateway/Gateway.java b/src/main/java/com/seailz/discordjar/gateway/Gateway.java index 41aefaea..e9e3c8d4 100644 --- a/src/main/java/com/seailz/discordjar/gateway/Gateway.java +++ b/src/main/java/com/seailz/discordjar/gateway/Gateway.java @@ -64,6 +64,7 @@ public class Gateway { private ReconnectInfo resumeInfo; public static long lastSequenceNumber = -1; private boolean readyForMessages = false; + private boolean receivedReady = false; private HeartLogic heartbeatManager; public static Date lastHeartbeatSent = new Date(); public static List pingHistoryMs = new ArrayList<>(); @@ -118,6 +119,7 @@ public void disconnect(@NotNull CloseStatus closeStatus) { * @param closeStatus The close status of the disconnect. */ public void disconnectFlow(@NotNull CloseStatus closeStatus) { + setReceivedReady(false); heartbeatManager.stop(); // Stop attempting heartbeats to avoid broken pipe errors CloseCode closeCode = CloseCode.fromCode(closeStatus.getCode()); readyForMessages = false; @@ -403,6 +405,22 @@ public void queueMessage(@NotNull JSONObject payload) { } } + /** + * Queues a message to be sent to the gateway. + *
Messages will be sent after the READY event is received. + * @param payload {@link JSONObject} containing the payload to send + */ + public void queueMessageUntilReady(@NotNull JSONObject payload) { + if (bot.isDebug()) logger.info("[Gateway] Queued message: " + payload); + while (receivedReady) { + socket.send(payload.toString()); + if (bot.isDebug()) { + logger.info("[Gateway] Sent message: " + payload); + } + break; + } + } + /** * Sends a request to the gateway to request guild members. * @param action {@link RequestGuildMembersAction} containing extra information about the request @@ -506,6 +524,10 @@ private String getGatewayUrl() throws InterruptedException { return gatewayUrl; } + public void setReceivedReady(boolean receivedReady) { + this.receivedReady = receivedReady; + } + /** * Returns resume info for the next resume attempt, or null if READY was not received yet. */ diff --git a/src/main/java/com/seailz/discordjar/gateway/events/DispatchedEvents.java b/src/main/java/com/seailz/discordjar/gateway/events/DispatchedEvents.java index bd7f882f..d5b7c55a 100644 --- a/src/main/java/com/seailz/discordjar/gateway/events/DispatchedEvents.java +++ b/src/main/java/com/seailz/discordjar/gateway/events/DispatchedEvents.java @@ -70,6 +70,7 @@ public enum DispatchedEvents { READY((p, g, d) -> { Logger.getLogger("Gateway") .info("[Gateway] Ready to receive events"); + g.setReceivedReady(true); return ReadyEvent.class; }), RESUMED((p, d, g) -> GatewayResumedEvent.class), From 8c931967d43c1123714fe0a35ac90087056f65ef Mon Sep 17 00:00:00 2001 From: IoyoCode Date: Tue, 26 Sep 2023 16:36:39 +0100 Subject: [PATCH 10/11] fix(statuses): more fixes with statuses --- src/main/java/com/seailz/discordjar/gateway/Gateway.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/seailz/discordjar/gateway/Gateway.java b/src/main/java/com/seailz/discordjar/gateway/Gateway.java index e9e3c8d4..d04e60b5 100644 --- a/src/main/java/com/seailz/discordjar/gateway/Gateway.java +++ b/src/main/java/com/seailz/discordjar/gateway/Gateway.java @@ -525,7 +525,14 @@ private String getGatewayUrl() throws InterruptedException { } public void setReceivedReady(boolean receivedReady) { - this.receivedReady = receivedReady; + new Thread(() -> { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + this.receivedReady = receivedReady; + }).start(); } /** From 7568477910c6d70b3c2befd1a22f9a2d59eaa7d3 Mon Sep 17 00:00:00 2001 From: IoyoCode Date: Tue, 26 Sep 2023 17:01:40 +0100 Subject: [PATCH 11/11] fix(statuses): more fixes with statuses --- .../com/seailz/discordjar/DiscordJar.java | 14 ++++++++------ .../seailz/discordjar/gateway/Gateway.java | 19 ++++++------------- 2 files changed, 14 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/seailz/discordjar/DiscordJar.java b/src/main/java/com/seailz/discordjar/DiscordJar.java index 9086f229..d48dbb7c 100644 --- a/src/main/java/com/seailz/discordjar/DiscordJar.java +++ b/src/main/java/com/seailz/discordjar/DiscordJar.java @@ -435,12 +435,14 @@ public void disableMemberCachingForGuild(String guildId) { public void setStatus(@NotNull Status status) { if (gatewayFactory == null) throw new IllegalStateException("Cannot set status on an HTTP-only bot. See the constructor for more information."); - JSONObject json = new JSONObject(); - json.put("d", status.compile()); - json.put("op", 3); - gatewayFactory.queueMessageUntilReady(json); - gatewayFactory.setStatus(status); - this.status = status; + new Thread(() -> { + JSONObject json = new JSONObject(); + json.put("d", status.compile()); + json.put("op", 3); + gatewayFactory.queueMessageUntilReady(json); + gatewayFactory.setStatus(status); + this.status = status; + }).start(); } public Status getStatus() { diff --git a/src/main/java/com/seailz/discordjar/gateway/Gateway.java b/src/main/java/com/seailz/discordjar/gateway/Gateway.java index d04e60b5..c0aec555 100644 --- a/src/main/java/com/seailz/discordjar/gateway/Gateway.java +++ b/src/main/java/com/seailz/discordjar/gateway/Gateway.java @@ -274,7 +274,7 @@ private void handleDispatch(@NotNull JSONObject payload) { if (bot.getStatus() != null) { JSONObject json = new JSONObject(); json.put("d", bot.getStatus().compile()); - json.put("op", OpCodes.PRESENCE_UPDATE); + json.put("op", OpCodes.PRESENCE_UPDATE.opCode); queueMessage(json); } } @@ -311,7 +311,7 @@ private void sendIdentify() { } JSONObject payload = new JSONObject(); - payload.put("op", 2); + payload.put("op", OpCodes.IDENTIFY.opCode); JSONObject data = new JSONObject(); data.put("token", bot.getToken()); if (shardCount != -1 && shardId != -1) { @@ -339,7 +339,7 @@ private void sendIdentify() { if (resuming) { JSONObject resumeObject = new JSONObject(); - resumeObject.put("op", OpCodes.RESUME); + resumeObject.put("op", OpCodes.RESUME.opCode); resumeObject.put("d", new JSONObject() .put("token", bot.getToken()) .put("session_id", resumeInfo.sessionId()) @@ -449,7 +449,7 @@ public void requestGuildMembers(@NotNull RequestGuildMembersAction action, @NotN dPayload.put("limit", action.getLimit()); if (action.isPresences()) dPayload.put("presences", true); dPayload.put("nonce", action.getNonce()); - payload.put("op", 8); + payload.put("op", OpCodes.REQUEST_GUILD_MEMBERS.opCode); payload.put("d", dPayload); queueMessage(payload); @@ -464,7 +464,7 @@ public void sendVoicePayload(String guildId, String channelId, boolean selfMute, dPayload.put("channel_id", channelId); dPayload.put("self_mute", selfMute); dPayload.put("self_deaf", selfDeaf); - payload.put("op", 4); + payload.put("op", OpCodes.VOICE_STATE_UPDATE.opCode); payload.put("d", dPayload); queueMessage(payload); } @@ -525,14 +525,7 @@ private String getGatewayUrl() throws InterruptedException { } public void setReceivedReady(boolean receivedReady) { - new Thread(() -> { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - this.receivedReady = receivedReady; - }).start(); + this.receivedReady = receivedReady; } /**