From 2d71e4a474d12079f1f31148644ec584d27a307b Mon Sep 17 00:00:00 2001 From: William Date: Thu, 31 Oct 2024 17:04:21 +0000 Subject: [PATCH] refactor: rework networking system --- .../huskhomes/api/BaseHuskHomesAPI.java | 8 +- .../huskhomes/command/RtpCommand.java | 7 +- .../huskhomes/command/TpAllCommand.java | 6 +- .../huskhomes/listener/EventListener.java | 45 ++-- .../huskhomes/manager/HomesManager.java | 7 +- .../william278/huskhomes/manager/Manager.java | 5 +- .../huskhomes/manager/RequestsManager.java | 21 +- .../huskhomes/manager/WarpsManager.java | 7 +- .../william278/huskhomes/network/Broker.java | 218 ++++-------------- .../william278/huskhomes/network/Message.java | 160 +++++-------- .../huskhomes/network/MessageHandler.java | 166 +++++++++++++ .../william278/huskhomes/network/Payload.java | 155 ++----------- .../network/PluginMessageBroker.java | 19 +- .../huskhomes/network/RedisBroker.java | 20 +- .../huskhomes/teleport/Teleport.java | 19 +- .../network/MessageSerializationTests.java | 18 +- 16 files changed, 367 insertions(+), 514 deletions(-) create mode 100644 common/src/main/java/net/william278/huskhomes/network/MessageHandler.java diff --git a/common/src/main/java/net/william278/huskhomes/api/BaseHuskHomesAPI.java b/common/src/main/java/net/william278/huskhomes/api/BaseHuskHomesAPI.java index 8334a2c4..a35e3e1f 100644 --- a/common/src/main/java/net/william278/huskhomes/api/BaseHuskHomesAPI.java +++ b/common/src/main/java/net/william278/huskhomes/api/BaseHuskHomesAPI.java @@ -795,11 +795,9 @@ public final void randomlyTeleportPlayer(@NotNull OnlineUser user, boolean timed } plugin.getBroker().ifPresent(b -> Message.builder() - .scope(Message.Scope.SERVER) - .target(randomServer) - .type(Message.Type.REQUEST_RTP_LOCATION) - .payload(Payload.withStringList( - List.of(user.getPosition().getWorld().getName(), user.getName()))) + .target(randomServer, Message.TargetType.SERVER) + .type(Message.MessageType.REQUEST_RTP_LOCATION) + .payload(Payload.world(user.getPosition().getWorld())) .build().send(b, user)); return; } diff --git a/common/src/main/java/net/william278/huskhomes/command/RtpCommand.java b/common/src/main/java/net/william278/huskhomes/command/RtpCommand.java index 3f36a186..04513067 100644 --- a/common/src/main/java/net/william278/huskhomes/command/RtpCommand.java +++ b/common/src/main/java/net/william278/huskhomes/command/RtpCommand.java @@ -162,10 +162,9 @@ private void executeRtp(@NotNull OnlineUser teleporter, @NotNull CommandUser exe } plugin.getBroker().ifPresent(b -> Message.builder() - .type(Message.Type.REQUEST_RTP_LOCATION) - .scope(Message.Scope.SERVER) - .target(randomServer) - .payload(Payload.withRTPRequest(Payload.RTPRequest.of(teleporter.getName(), world.getName()))) + .type(Message.MessageType.REQUEST_RTP_LOCATION) + .target(randomServer, Message.TargetType.SERVER) + .payload(Payload.string(world.getName())) .build().send(b, teleporter)); return; } diff --git a/common/src/main/java/net/william278/huskhomes/command/TpAllCommand.java b/common/src/main/java/net/william278/huskhomes/command/TpAllCommand.java index f0febef9..86db158a 100644 --- a/common/src/main/java/net/william278/huskhomes/command/TpAllCommand.java +++ b/common/src/main/java/net/william278/huskhomes/command/TpAllCommand.java @@ -63,9 +63,9 @@ public void execute(@NotNull OnlineUser executor, @NotNull String[] args) { } plugin.getBroker().ifPresent(b -> Message.builder() - .target(Message.TARGET_ALL) - .type(Message.Type.TELEPORT_TO_POSITION) - .payload(Payload.withPosition(targetPosition)) + .target(Message.TARGET_ALL, Message.TargetType.PLAYER) + .type(Message.MessageType.TELEPORT_TO_POSITION) + .payload(Payload.position(targetPosition)) .build().send(b, executor)); plugin.getLocales().getLocale("teleporting_all_players") diff --git a/common/src/main/java/net/william278/huskhomes/listener/EventListener.java b/common/src/main/java/net/william278/huskhomes/listener/EventListener.java index dce513fe..2bb822e2 100644 --- a/common/src/main/java/net/william278/huskhomes/listener/EventListener.java +++ b/common/src/main/java/net/william278/huskhomes/listener/EventListener.java @@ -67,9 +67,11 @@ protected final void handlePlayerJoin(@NotNull OnlineUser onlineUser) { this.handleInboundTeleport(onlineUser); // Synchronize the global player list - plugin.runSyncDelayed(() -> this.synchronizeGlobalPlayerList( - onlineUser, plugin.getOnlineUsers().stream().map(User::getName).toList()), - onlineUser, 40L + plugin.runSyncDelayed(() -> this.updateUserList( + onlineUser, + plugin.getOnlineUsers().stream().map(u -> (User) u).toList()), + onlineUser, + 40L ); // Request updated player lists from other servers @@ -99,34 +101,33 @@ protected final void handlePlayerJoin(@NotNull OnlineUser onlineUser) { /** * Handle when a {@link OnlineUser} leaves the server. * - * @param onlineUser the leaving {@link OnlineUser} + * @param online the leaving {@link OnlineUser} */ - protected final void handlePlayerLeave(@NotNull OnlineUser onlineUser) { - onlineUser.removeInvulnerabilityIfPermitted(); + protected final void handlePlayerLeave(@NotNull OnlineUser online) { + online.removeInvulnerabilityIfPermitted(); plugin.runAsync(() -> { // Set offline position - plugin.getDatabase().setOfflinePosition(onlineUser, onlineUser.getPosition()); + plugin.getDatabase().setOfflinePosition(online, online.getPosition()); // Remove this user's home cache - plugin.getManager().homes().removeUserHomes(onlineUser); + plugin.getManager().homes().removeUserHomes(online); // Update global lists if (plugin.getSettings().getCrossServer().isEnabled()) { - final List localPlayerList = plugin.getOnlineUsers().stream().map(User::getName) - .filter(player -> !player.equals(onlineUser.getName())).toList(); + final List users = plugin.getOnlineUsers().stream().map(u -> (User) u).toList(); if (plugin.getSettings().getCrossServer().getBrokerType() == Broker.Type.REDIS) { - this.synchronizeGlobalPlayerList(onlineUser, localPlayerList); + this.updateUserList(online, users); return; } plugin.getOnlineUsers().stream() - .filter(user -> !user.equals(onlineUser)) + .filter(user -> !user.equals(online)) .findAny() - .ifPresent(player -> this.synchronizeGlobalPlayerList(player, localPlayerList)); + .ifPresent(player -> this.updateUserList(player, users)); } // Remove from user map - plugin.getOnlineUserMap().remove(onlineUser.getUuid()); + plugin.getOnlineUserMap().remove(online.getUuid()); }); } @@ -192,24 +193,22 @@ private void handleInboundRespawn(@NotNull OnlineUser teleporter) { plugin.getDatabase().setRespawnPosition(teleporter, bedPosition.orElse(null)); } - // Synchronize the global player list TODO CHECK - private void synchronizeGlobalPlayerList(@NotNull OnlineUser user, @NotNull List localPlayerList) { + // Synchronize the global player list + private void updateUserList(@NotNull OnlineUser user, @NotNull List localPlayerList) { plugin.getBroker().ifPresent(broker -> { // Send this server's player list to all servers Message.builder() - .type(Message.Type.PLAYER_LIST) - .scope(Message.Scope.SERVER) - .target(Message.TARGET_ALL) - .payload(Payload.withStringList(localPlayerList)) + .type(Message.MessageType.UPDATE_USER_LIST) + .target(Message.TARGET_ALL, Message.TargetType.SERVER) + .payload(Payload.userList(localPlayerList)) .build().send(broker, user); // Clear cached global player lists and request updated lists from all servers if (plugin.getOnlineUsers().size() == 1) { plugin.getGlobalUserList().clear(); Message.builder() - .type(Message.Type.REQUEST_PLAYER_LIST) - .scope(Message.Scope.SERVER) - .target(Message.TARGET_ALL) + .type(Message.MessageType.REQUEST_USER_LIST) + .target(Message.TARGET_ALL, Message.TargetType.SERVER) .build().send(broker, user); } }); diff --git a/common/src/main/java/net/william278/huskhomes/manager/HomesManager.java b/common/src/main/java/net/william278/huskhomes/manager/HomesManager.java index 2392d3e5..2ded7e3b 100644 --- a/common/src/main/java/net/william278/huskhomes/manager/HomesManager.java +++ b/common/src/main/java/net/william278/huskhomes/manager/HomesManager.java @@ -184,10 +184,9 @@ public void unCacheHome(@NotNull UUID homeId, boolean propagate) { private void propagateCacheUpdate(@NotNull UUID homeId) { plugin.getBroker().ifPresent(b -> plugin.getOnlineUsers().stream().findAny() .ifPresent(user -> Message.builder() - .type(Message.Type.UPDATE_HOME) - .scope(Message.Scope.SERVER) - .target(Message.TARGET_ALL) - .payload(Payload.withString(homeId.toString())) + .type(Message.MessageType.UPDATE_HOME) + .target(Message.TARGET_ALL, Message.TargetType.SERVER) + .payload(Payload.string(homeId.toString())) .build().send(b, user))); } diff --git a/common/src/main/java/net/william278/huskhomes/manager/Manager.java b/common/src/main/java/net/william278/huskhomes/manager/Manager.java index 31d9c3d1..d625e3d0 100644 --- a/common/src/main/java/net/william278/huskhomes/manager/Manager.java +++ b/common/src/main/java/net/william278/huskhomes/manager/Manager.java @@ -56,9 +56,8 @@ public RequestsManager requests() { protected void propagateCacheUpdate() { plugin.getBroker().ifPresent(b -> plugin.getOnlineUsers().stream() .findAny().ifPresent(user -> Message.builder() - .type(Message.Type.UPDATE_CACHES) - .scope(Message.Scope.SERVER) - .target(Message.TARGET_ALL) + .type(Message.MessageType.UPDATE_CACHES) + .target(Message.TARGET_ALL, Message.TargetType.SERVER) .build().send(b, user))); } } diff --git a/common/src/main/java/net/william278/huskhomes/manager/RequestsManager.java b/common/src/main/java/net/william278/huskhomes/manager/RequestsManager.java index 2426c90a..fc715f84 100644 --- a/common/src/main/java/net/william278/huskhomes/manager/RequestsManager.java +++ b/common/src/main/java/net/william278/huskhomes/manager/RequestsManager.java @@ -128,9 +128,9 @@ public void sendTeleportAllRequest(@NotNull OnlineUser requester) { } plugin.getBroker().ifPresent(b -> Message.builder() - .type(Message.Type.TELEPORT_REQUEST) - .payload(Payload.withTeleportRequest(request)) - .target(Message.TARGET_ALL) + .type(Message.MessageType.TELEPORT_REQUEST) + .payload(Payload.teleportRequest(request)) + .target(Message.TARGET_ALL, Message.TargetType.PLAYER) .build().send(b, requester)); } @@ -168,9 +168,9 @@ public void sendTeleportRequest(@NotNull OnlineUser requester, @NotNull String t plugin.fireEvent( plugin.getSendTeleportRequestEvent(requester, request), (event -> plugin.getBroker().ifPresent(b -> Message.builder() - .type(Message.Type.TELEPORT_REQUEST) - .payload(Payload.withTeleportRequest(request)) - .target(targetUser) + .type(Message.MessageType.TELEPORT_REQUEST) + .payload(Payload.teleportRequest(request)) + .target(targetUser, Message.TargetType.PLAYER) .build().send(b, requester))) ); return; @@ -303,11 +303,10 @@ private void handleRequestResponse(@NotNull TeleportRequest request, @NotNull On handleLocalRequestResponse(localRequester.get(), request); } else if (plugin.getSettings().getCrossServer().isEnabled()) { plugin.getBroker().ifPresent(b -> Message.builder() - .type(Message.Type.TELEPORT_REQUEST_RESPONSE) - .payload(Payload.withTeleportRequest(request)) - .target(request.getRequesterName()) - .build() - .send(b, recipient)); + .type(Message.MessageType.TELEPORT_REQUEST_RESPONSE) + .payload(Payload.teleportRequest(request)) + .target(request.getRequesterName(), Message.TargetType.PLAYER) + .build().send(b, recipient)); } else { plugin.getLocales().getLocale("error_teleport_request_sender_not_online") .ifPresent(recipient::sendMessage); diff --git a/common/src/main/java/net/william278/huskhomes/manager/WarpsManager.java b/common/src/main/java/net/william278/huskhomes/manager/WarpsManager.java index de9fb772..16e137f0 100644 --- a/common/src/main/java/net/william278/huskhomes/manager/WarpsManager.java +++ b/common/src/main/java/net/william278/huskhomes/manager/WarpsManager.java @@ -80,10 +80,9 @@ public void unCacheWarp(@NotNull UUID warpId, boolean propagate) { private void propagateCacheUpdate(@NotNull UUID warpId) { plugin.getOnlineUsers().stream().findAny().ifPresent(user -> plugin.getBroker() .ifPresent(b -> Message.builder() - .type(Message.Type.UPDATE_WARP) - .scope(Message.Scope.SERVER) - .target(Message.TARGET_ALL) - .payload(Payload.withString(warpId.toString())) + .type(Message.MessageType.UPDATE_WARP) + .target(Message.TARGET_ALL, Message.TargetType.SERVER) + .payload(Payload.string(warpId.toString())) .build().send(b, user))); } diff --git a/common/src/main/java/net/william278/huskhomes/network/Broker.java b/common/src/main/java/net/william278/huskhomes/network/Broker.java index 707d8587..b4bc3503 100644 --- a/common/src/main/java/net/william278/huskhomes/network/Broker.java +++ b/common/src/main/java/net/william278/huskhomes/network/Broker.java @@ -19,36 +19,22 @@ package net.william278.huskhomes.network; -import net.kyori.adventure.key.InvalidKeyException; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; import net.william278.huskhomes.HuskHomes; -import net.william278.huskhomes.position.Home; -import net.william278.huskhomes.position.Warp; -import net.william278.huskhomes.position.World; -import net.william278.huskhomes.teleport.Teleport; -import net.william278.huskhomes.teleport.TeleportBuilder; import net.william278.huskhomes.user.OnlineUser; -import net.william278.huskhomes.user.User; -import net.william278.huskhomes.util.TransactionResolver; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; -import java.util.Locale; -import java.util.Optional; -import java.util.UUID; import java.util.logging.Level; -public abstract class Broker { +@Getter +@AllArgsConstructor(access = AccessLevel.PROTECTED) +public abstract class Broker implements MessageHandler { protected final HuskHomes plugin; - /** - * Create a new broker. - * - * @param plugin the HuskHomes plugin instance - */ - protected Broker(@NotNull HuskHomes plugin) { - this.plugin = plugin; - } - /** * Handle an inbound {@link Message}. * @@ -60,195 +46,89 @@ protected void handle(@NotNull OnlineUser receiver, @NotNull Message message) { return; } switch (message.getType()) { - case TELEPORT_TO_POSITION -> message.getPayload() - .getPosition().ifPresent(position -> Teleport.builder(plugin) - .teleporter(receiver) - .target(position) - .toTeleport() - .complete()); - case TELEPORT_TO_NETWORKED_POSITION -> Message.builder() - .type(Message.Type.TELEPORT_TO_POSITION) - .target(message.getSender()) - .payload(Payload.withPosition(receiver.getPosition())) - .build().send(this, receiver); - case TELEPORT_TO_NETWORKED_USER -> message.getPayload() - .getString().ifPresent(target -> Message.builder() - .type(Message.Type.TELEPORT_TO_NETWORKED_POSITION) - .target(target) - .build().send(this, receiver)); - case TELEPORT_REQUEST -> message.getPayload() - .getTeleportRequest() - .ifPresent(teleportRequest -> plugin.getManager().requests() - .sendLocalTeleportRequest(teleportRequest, receiver)); - case TELEPORT_REQUEST_RESPONSE -> message.getPayload() - .getTeleportRequest() - .ifPresent(teleportRequest -> plugin.getManager().requests() - .handleLocalRequestResponse(receiver, teleportRequest)); - case REQUEST_PLAYER_LIST -> Message.builder() - .type(Message.Type.PLAYER_LIST) - .scope(Message.Scope.SERVER) - .target(message.getSourceServer()) - .payload(Payload.withUserList(plugin.getOnlineUsers().stream().map(u -> (User) u).toList())) - .build().send(this, receiver); - case PLAYER_LIST -> message.getPayload() - .getUserList() - .ifPresent(players -> plugin.setUserList(message.getSourceServer(), players)); - case UPDATE_HOME -> message.getPayload().getString() - .map(UUID::fromString) - .ifPresent(homeId -> { - final Optional optionalHome = plugin.getDatabase().getHome(homeId); - if (optionalHome.isPresent()) { - plugin.getManager().homes().cacheHome(optionalHome.get(), false); - } else { - plugin.getManager().homes().unCacheHome(homeId, false); - } - }); - case UPDATE_WARP -> message.getPayload().getString() - .map(UUID::fromString) - .ifPresent(warpId -> { - final Optional optionalWarp = plugin.getDatabase().getWarp(warpId); - if (optionalWarp.isPresent()) { - plugin.getManager().warps().cacheWarp(optionalWarp.get(), false); - } else { - plugin.getManager().warps().unCacheWarp(warpId, false); - } - }); - case UPDATE_CACHES -> { - plugin.getManager().homes().updatePublicHomeCache(); - plugin.getManager().warps().updateWarpCache(); - } - case RTP_LOCATION -> message.getPayload() - .getRTPResponse() - .ifPresentOrElse(response -> { - final TeleportBuilder builder = Teleport.builder(plugin) - .teleporter(receiver) - .actions(TransactionResolver.Action.RANDOM_TELEPORT) - .target(response.getPosition()); - builder.buildAndComplete(true); - }, () -> plugin.getLocales().getLocale("error_rtp_randomization_timeout") - .ifPresent(receiver::sendMessage)); - default -> throw new IllegalStateException("Unexpected value: " + message.getType()); + case REQUEST_USER_LIST -> handleRequestUserList(message, receiver); + case UPDATE_USER_LIST -> handleUpdateUserList(message); + case TELEPORT_TO_POSITION -> handleTeleportToPosition(message, receiver); + case TELEPORT_TO_NETWORKED_POSITION -> handleTeleportToNetworkedPosition(message, receiver); + case TELEPORT_TO_NETWORKED_USER -> handleTeleportToNetworkedUser(message, receiver); + case TELEPORT_REQUEST -> handleTeleportRequest(message, receiver); + case TELEPORT_REQUEST_RESPONSE -> handleTeleportRequestResponse(message, receiver); + case UPDATE_HOME -> handleUpdateHome(message, receiver); + case UPDATE_WARP -> handleUpdateWarp(message, receiver); + case UPDATE_CACHES -> handleUpdateCaches(); + case RTP_LOCATION -> handleRtpLocation(message, receiver); + default -> plugin.log(Level.SEVERE, "Received unknown message type: " + message.getType()); } } /** - * Separate handler for RTP Request because it doesn't need a receiver to handle it. - * - * @param message the message to handle - */ - protected void handleRTPRequest(@NotNull Message message) { - if (message.getSourceServer().equals(getServer())) { - return; - } - - message.getPayload() - .getRTPRequest() - .ifPresent((request) -> { - Optional world = plugin.getWorlds().stream() - .filter(w -> w.getName().equals(request.getWorldName())).findFirst(); - if (world.isEmpty()) { - plugin.log(Level.SEVERE, "%s requested a position in a world we don't have! World: %s" - .formatted(message.getSourceServer(), request.getWorldName())); - plugin.getBroker().ifPresent(b -> Message.builder() - .type(Message.Type.RTP_LOCATION) - .target(request.getUsername()) - .payload(Payload.empty()) - .build().send(b, request.getUsername())); - return; - } - plugin.getRandomTeleportEngine().getRandomPosition(world.get(), null) - .thenAccept((position) -> { - final Message.Builder builder = Message.builder() - .type(Message.Type.RTP_LOCATION) - .target(request.getUsername()); - if (position.isEmpty()) { - builder.payload(Payload.empty()); - } else { - builder.payload(Payload.withRTPResponse( - Payload.RTPResponse.of(request.getUsername(), position.get()))); - } - plugin.getBroker().ifPresent(b -> builder.build().send(b, request.getUsername())); - }); - }); - } - - /** - * Initialize the message broker. + * Initialize the message broker * * @throws RuntimeException if the broker fails to initialize */ - public abstract void initialize() throws IllegalStateException; + public abstract void initialize() throws RuntimeException; /** - * Send a message to the broker. + * Send a message to the broker * * @param message the message to send * @param sender the sender of the message */ - protected abstract void send(@NotNull Message message, @NotNull OnlineUser sender); + protected abstract void send(@NotNull Message message, @Nullable OnlineUser sender); /** - * Send a message to the broker. (For Redis Only) - * - * @param message the message to send + * Terminate the broker */ - protected abstract void send(@NotNull Message message); + public abstract void close(); /** - * Move an {@link OnlineUser} to a new server on the proxy network. + * Get the sub-channel ID for broker communications * - * @param user the user to move - * @param server the server to move the user to + * @return the sub-channel ID + * @since 1.0 */ - public abstract void changeServer(@NotNull OnlineUser user, @NotNull String server); + @NotNull + protected String getSubChannelId() { + return plugin.getKey(plugin.getSettings().getCrossServer().getClusterId(), getFormattedVersion()).asString(); + } /** - * Terminate the broker. + * Return the server name + * + * @return the server name + * @since 1.0 */ - public abstract void close(); + protected String getServer() { + return plugin.getServerName(); + } - // Get the formatted channel ID for the broker + // Returns the formatted version of the plugin (format: x.x) @NotNull - protected String getSubChannelId() { - final String version = String.format( - "%s.%s", - plugin.getPluginVersion().getMajor(), - plugin.getPluginVersion().getMinor() - ); - try { - return plugin.getKey( - plugin.getSettings().getCrossServer().getClusterId().toLowerCase(Locale.ENGLISH), - version - ).asString(); - } catch (InvalidKeyException e) { - plugin.log(Level.SEVERE, "Cluster ID specified in config contains invalid characters"); - } - return plugin.getKey("main", version).asString(); + private String getFormattedVersion() { + return String.format("%s.%s", plugin.getPluginVersion().getMajor(), plugin.getPluginVersion().getMinor()); } - // Get the name of this server + // Return this broker instance @NotNull - protected String getServer() { - return plugin.getServerName(); + @Override + public Broker getBroker() { + return this; } /** - * Identifies types of message brokers. + * Identifies types of message brokers */ + @Getter public enum Type { PLUGIN_MESSAGE("Plugin Messages"), REDIS("Redis"); + @NotNull private final String displayName; Type(@NotNull String displayName) { this.displayName = displayName; } - - @NotNull - public String getDisplayName() { - return displayName; - } } } \ No newline at end of file diff --git a/common/src/main/java/net/william278/huskhomes/network/Message.java b/common/src/main/java/net/william278/huskhomes/network/Message.java index 96e3b3c0..5ac5cb4c 100644 --- a/common/src/main/java/net/william278/huskhomes/network/Message.java +++ b/common/src/main/java/net/william278/huskhomes/network/Message.java @@ -21,126 +21,79 @@ import com.google.gson.annotations.Expose; import com.google.gson.annotations.SerializedName; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NoArgsConstructor; import net.william278.huskhomes.user.OnlineUser; import org.jetbrains.annotations.NotNull; - -import java.util.UUID; +import org.jetbrains.annotations.Nullable; /** * Represents a message sent by a {@link Broker} cross-server. See {@link #builder()} for * a builder to create a message. */ +@Getter +@NoArgsConstructor public class Message { - /** - * Message target indicating all players. - */ public static final String TARGET_ALL = "ALL"; + @NotNull @Expose - private UUID id; - @Expose - private Type type; + private MessageType type; + @NotNull @Expose - private Scope scope; + @SerializedName("target_type") + private TargetType targetType; + @NotNull @Expose private String target; + @NotNull @Expose private Payload payload; + @NotNull @Expose private String sender; + @NotNull @Expose @SerializedName("source_server") private String sourceServer; - private Message(@NotNull Type type, @NotNull Scope scope, @NotNull String target, @NotNull Payload payload) { + private Message(@NotNull MessageType type, @NotNull String target, @NotNull TargetType targetType, + @NotNull Payload payload) { this.type = type; - this.scope = scope; this.target = target; + this.targetType = targetType; this.payload = payload; - this.id = UUID.randomUUID(); - } - - @SuppressWarnings("unused") - private Message() { } + @NotNull public static Builder builder() { return new Builder(); } - public void send(@NotNull Broker broker, @NotNull OnlineUser sender) { - this.sender = sender.getName(); + public void send(@NotNull Broker broker, @Nullable OnlineUser sender) { + this.sender = sender != null ? sender.getName() : broker.getServer(); this.sourceServer = broker.getServer(); broker.send(this, sender); } - public void send(@NotNull Broker broker, @NotNull String sender) { - this.sender = sender; - this.sourceServer = broker.getServer(); - broker.send(this); - } - - - @NotNull - public Type getType() { - return type; - } - - @NotNull - public Scope getScope() { - return scope; - } - - @NotNull - public String getTarget() { - return target; - } - - @NotNull - public Payload getPayload() { - return payload; - } - - @NotNull - public String getSender() { - return sender; - } - - @NotNull - public String getSourceServer() { - return sourceServer; - } - - @NotNull - public UUID getUuid() { - return id; - } - /** - * Builder for {@link Message}s. + * Builder for {@link Message}s */ + @NoArgsConstructor(access = AccessLevel.PRIVATE) public static class Builder { - private Type type; - private Scope scope = Scope.PLAYER; + private MessageType type; private Payload payload = Payload.empty(); + private TargetType targetType = TargetType.PLAYER; private String target; - private Builder() { - } - @NotNull - public Builder type(@NotNull Type type) { + public Builder type(@NotNull MessageType type) { this.type = type; return this; } - @NotNull - public Builder scope(@NotNull Scope scope) { - this.scope = scope; - return this; - } - @NotNull public Builder payload(@NotNull Payload payload) { this.payload = payload; @@ -148,55 +101,34 @@ public Builder payload(@NotNull Payload payload) { } @NotNull - public Builder target(@NotNull String target) { + public Builder target(@NotNull String target, @NotNull TargetType targetType) { this.target = target; + this.targetType = targetType; return this; } @NotNull public Message build() { - if (type == null) { - throw new IllegalStateException("Message type must be set"); - } - if (target == null) { - throw new IllegalStateException("Message target must be set"); + if (target == null || type == null) { + throw new IllegalStateException("Message not fully built. Type: " + type + ", Target: " + target); } - return new Message(type, scope, target, payload); + return new Message(type, target, targetType, payload); } } /** - * Different types of cross-server messages. + * Type of targets messages can be sent to + * + * @since 4.8 */ - public enum Type { - TELEPORT_TO_POSITION, - TELEPORT_TO_NETWORKED_POSITION, - TELEPORT_REQUEST, - TELEPORT_TO_NETWORKED_USER, - TELEPORT_REQUEST_RESPONSE, - REQUEST_PLAYER_LIST, - PLAYER_LIST, - UPDATE_HOME, - UPDATE_WARP, - UPDATE_CACHES, - REQUEST_RTP_LOCATION, - RTP_LOCATION, - } - - public enum Scope { - /** - * The target is a server name, or "all" to indicate all servers. - */ + public enum TargetType { SERVER("Forward"), - /** - * The target is a player name, or "all" to indicate all players. - */ PLAYER("ForwardToPlayer"); private final String pluginMessageChannel; - Scope(@NotNull String pluginMessageChannel) { + TargetType(@NotNull String pluginMessageChannel) { this.pluginMessageChannel = pluginMessageChannel; } @@ -206,4 +138,24 @@ public String getPluginMessageChannel() { } } + /** + * Different types of cross-server messages + * + * @since 4.8 + */ + public enum MessageType { + TELEPORT_TO_POSITION, + TELEPORT_TO_NETWORKED_POSITION, + TELEPORT_REQUEST, + TELEPORT_TO_NETWORKED_USER, + TELEPORT_REQUEST_RESPONSE, + REQUEST_USER_LIST, + UPDATE_USER_LIST, + UPDATE_HOME, + UPDATE_WARP, + UPDATE_CACHES, + REQUEST_RTP_LOCATION, + RTP_LOCATION, + } + } \ No newline at end of file diff --git a/common/src/main/java/net/william278/huskhomes/network/MessageHandler.java b/common/src/main/java/net/william278/huskhomes/network/MessageHandler.java new file mode 100644 index 00000000..2b90353a --- /dev/null +++ b/common/src/main/java/net/william278/huskhomes/network/MessageHandler.java @@ -0,0 +1,166 @@ +/* + * This file is part of HuskHomes, licensed under the Apache License 2.0. + * + * Copyright (c) William278 + * Copyright (c) contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.william278.huskhomes.network; + +import net.william278.huskhomes.HuskHomes; +import net.william278.huskhomes.position.Home; +import net.william278.huskhomes.position.Position; +import net.william278.huskhomes.position.Warp; +import net.william278.huskhomes.position.World; +import net.william278.huskhomes.teleport.Teleport; +import net.william278.huskhomes.user.OnlineUser; +import net.william278.huskhomes.user.User; +import net.william278.huskhomes.util.TransactionResolver; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +public interface MessageHandler { + + // Handle inbound user list requests + default void handleRequestUserList(@NotNull Message message, @Nullable OnlineUser receiver) { + if (receiver == null) { + return; + } + + Message.builder() + .type(Message.MessageType.UPDATE_USER_LIST) + .payload(Payload.userList(getPlugin().getOnlineUsers().stream().map(online -> (User) online).toList())) + .target(message.getSourceServer(), Message.TargetType.SERVER).build() + .send(getBroker(), receiver); + } + + // Handle inbound user list updates (returned from requests) + default void handleUpdateUserList(@NotNull Message message) { + message.getPayload().getUserList().ifPresent( + (players) -> getPlugin().setUserList(message.getSourceServer(), players) + ); + } + + default void handleTeleportToPosition(@NotNull Message message, @NotNull OnlineUser receiver) { + message.getPayload().getPosition().ifPresent( + (position) -> Teleport.builder(getPlugin()) + .teleporter(receiver) + .target(position) + .toTeleport() + .complete() + ); + } + + default void handleTeleportToNetworkedPosition(@NotNull Message message, @NotNull OnlineUser receiver) { + Message.builder() + .type(Message.MessageType.TELEPORT_TO_POSITION) + .target(message.getSender(), Message.TargetType.PLAYER) + .payload(Payload.position(receiver.getPosition())) + .build().send(getBroker(), receiver); + } + + default void handleTeleportToNetworkedUser(@NotNull Message message, @NotNull OnlineUser receiver) { + message.getPayload().getString().ifPresent( + (target) -> Message.builder() + .type(Message.MessageType.TELEPORT_TO_NETWORKED_POSITION) + .target(target, Message.TargetType.PLAYER) + .build().send(getBroker(), receiver) + ); + } + + default void handleTeleportRequest(@NotNull Message message, @NotNull OnlineUser receiver) { + message.getPayload().getTeleportRequest().ifPresent( + (request) -> getPlugin().getManager().requests() + .sendLocalTeleportRequest(request, receiver) + ); + } + + default void handleTeleportRequestResponse(@NotNull Message message, @NotNull OnlineUser receiver) { + message.getPayload().getTeleportRequest().ifPresent( + (request) -> getPlugin().getManager().requests().handleLocalRequestResponse(receiver, request) + ); + } + + default void handleUpdateHome(@NotNull Message message, @NotNull OnlineUser receiver) { + message.getPayload().getString() + .map(UUID::fromString) + .ifPresent(id -> { + final Optional optionalHome = getPlugin().getDatabase().getHome(id); + if (optionalHome.isPresent()) { + getPlugin().getManager().homes().cacheHome(optionalHome.get(), false); + } else { + getPlugin().getPlugin().getManager().homes().unCacheHome(id, false); + } + }); + } + + default void handleUpdateWarp(@NotNull Message message, @NotNull OnlineUser receiver) { + message.getPayload().getString() + .map(UUID::fromString) + .ifPresent(warpId -> { + final Optional optionalWarp = getPlugin().getDatabase().getWarp(warpId); + if (optionalWarp.isPresent()) { + getPlugin().getManager().warps().cacheWarp(optionalWarp.get(), false); + } else { + getPlugin().getManager().warps().unCacheWarp(warpId, false); + } + }); + } + + default void handleRtpRequestLocation(@NotNull Message message) { + final Optional requested = message.getPayload().getString().flatMap( + name -> getPlugin().getWorlds().stream().filter(w -> w.getName().equalsIgnoreCase(name)).findFirst()); + requested.map(world -> getPlugin().getRandomTeleportEngine().getRandomPosition(world, new String[0])) + .orElse(CompletableFuture.completedFuture(Optional.empty())) + .thenAccept( + (teleport) -> Message.builder() + .target(message.getSender(), Message.TargetType.PLAYER) + .payload(Payload.position(teleport.orElse(null))) + .build().send(getBroker(), null) + ); + + } + + default void handleRtpLocation(@NotNull Message message, @NotNull OnlineUser receiver) { + final Optional position = message.getPayload().getPosition(); + if (position.isEmpty()) { + getPlugin().getLocales().getLocale("error_rtp_randomization_timeout") + .ifPresent(receiver::sendMessage); + return; + } + + Teleport.builder(getPlugin()) + .teleporter(receiver) + .actions(TransactionResolver.Action.RANDOM_TELEPORT) + .target(position.get()) + .buildAndComplete(true); + } + + default void handleUpdateCaches() { + getPlugin().getManager().homes().updatePublicHomeCache(); + getPlugin().getManager().warps().updateWarpCache(); + } + + @NotNull + Broker getBroker(); + + @NotNull + HuskHomes getPlugin(); + +} diff --git a/common/src/main/java/net/william278/huskhomes/network/Payload.java b/common/src/main/java/net/william278/huskhomes/network/Payload.java index abd8f173..5d9b22c8 100644 --- a/common/src/main/java/net/william278/huskhomes/network/Payload.java +++ b/common/src/main/java/net/william278/huskhomes/network/Payload.java @@ -22,7 +22,6 @@ import com.google.gson.annotations.Expose; import com.google.gson.annotations.SerializedName; import lombok.NoArgsConstructor; -import lombok.Value; import net.william278.huskhomes.position.Position; import net.william278.huskhomes.position.World; import net.william278.huskhomes.teleport.TeleportRequest; @@ -39,207 +38,81 @@ @NoArgsConstructor public class Payload { + @Nullable + @Expose + private String string; @Nullable @Expose private Position position; - @Nullable @Expose private World world; - @Nullable @Expose - @SerializedName("teleport_request") private TeleportRequest teleportRequest; - - @Nullable - @Expose - @SerializedName("rtp_response") - private RTPResponse rtpResponse; - - @Nullable - @Expose - @SerializedName("rtp_request") - private RTPRequest rtpRequest; - - @Nullable - @Expose - private String string; - - @Nullable - @Expose - @SerializedName("string_list") - private List stringList; - @Nullable @Expose @SerializedName("user_list") private List userList; - /** - * Returns an empty cross-server message payload. - * - * @return an empty payload - */ @NotNull public static Payload empty() { return new Payload(); } - /** - * Returns a payload containing a {@link Position}. - * - * @param position the position to send - * @return a payload containing the position - */ - @NotNull - public static Payload withPosition(@NotNull Position position) { - final Payload payload = new Payload(); - payload.position = position; - return payload; - } - - /** - * Returns a payload containing a {@link World}. - * - * @param world the world to send - * @return a payload containing the world - */ @NotNull - public static Payload withWorld(@NotNull World world) { + public static Payload string(@Nullable String target) { final Payload payload = new Payload(); - payload.world = world; + payload.string = target; return payload; } - /** - * Returns a payload containing a {@link TeleportRequest}. - * - * @param teleportRequest the teleport to send - * @return a payload containing the teleport request - */ @NotNull - public static Payload withTeleportRequest(@NotNull TeleportRequest teleportRequest) { + public static Payload position(@Nullable Position position) { final Payload payload = new Payload(); - payload.teleportRequest = teleportRequest; + payload.position = position; return payload; } - /** - * A string field. - */ @NotNull - public static Payload withString(@NotNull String target) { + public static Payload world(@Nullable World world) { final Payload payload = new Payload(); - payload.string = target; + payload.world = world; return payload; } - /** - * A string list field. - */ @NotNull - public static Payload withStringList(@NotNull List target) { + public static Payload teleportRequest(@Nullable TeleportRequest teleportRequest) { final Payload payload = new Payload(); - payload.stringList = target; + payload.teleportRequest = teleportRequest; return payload; } - /** - * A usr list field. - */ @NotNull - public static Payload withUserList(@NotNull List target) { + public static Payload userList(@Nullable List target) { final Payload payload = new Payload(); payload.userList = target; return payload; } - /** - * An RTP Response field. - */ - @NotNull - public static Payload withRTPResponse(@NotNull RTPResponse rtpResponse) { - final Payload payload = new Payload(); - payload.rtpResponse = rtpResponse; - return payload; - } - - /** - * An RTP Request field. - */ - @NotNull - public static Payload withRTPRequest(@NotNull RTPRequest rtpRequest) { - final Payload payload = new Payload(); - payload.rtpRequest = rtpRequest; - return payload; + public Optional getString() { + return Optional.ofNullable(string); } - /** - * A position field. - */ public Optional getPosition() { return Optional.ofNullable(position); } - /** - * A world field. - */ public Optional getWorld() { return Optional.ofNullable(world); } - /** - * A teleport request field. - */ public Optional getTeleportRequest() { return Optional.ofNullable(teleportRequest); } - /** - * A string field. - */ - public Optional getString() { - return Optional.ofNullable(string); - } - - /** - * A string list field. - */ - public Optional> getStringList() { - return Optional.ofNullable(stringList); - } - - /** - * A string list field. - */ public Optional> getUserList() { return Optional.ofNullable(userList); } - /** - * An RTP response. - */ - public Optional getRTPResponse() { - return Optional.ofNullable(rtpResponse); - } - - /** - * An RTP request. - */ - public Optional getRTPRequest() { - return Optional.ofNullable(rtpRequest); - } - - @Value(staticConstructor = "of") - public static class RTPResponse { - @Expose String username; - @Expose Position position; - } - - @Value(staticConstructor = "of") - public static class RTPRequest { - @Expose String username; - @Expose String worldName; - } } diff --git a/common/src/main/java/net/william278/huskhomes/network/PluginMessageBroker.java b/common/src/main/java/net/william278/huskhomes/network/PluginMessageBroker.java index 1069fb1d..75bee9dc 100644 --- a/common/src/main/java/net/william278/huskhomes/network/PluginMessageBroker.java +++ b/common/src/main/java/net/william278/huskhomes/network/PluginMessageBroker.java @@ -19,12 +19,14 @@ package net.william278.huskhomes.network; +import com.google.common.base.Preconditions; import com.google.common.io.ByteArrayDataInput; import com.google.common.io.ByteArrayDataOutput; import com.google.common.io.ByteStreams; import net.william278.huskhomes.HuskHomes; import net.william278.huskhomes.user.OnlineUser; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.io.*; import java.util.logging.Level; @@ -75,9 +77,11 @@ public final void onReceive(@NotNull String channel, @NotNull OnlineUser user, b } @Override - protected void send(@NotNull Message message, @NotNull OnlineUser sender) { + protected void send(@NotNull Message message, @Nullable OnlineUser sender) { + Preconditions.checkNotNull(sender, "Sender cannot be null with a Plugin Message broker"); + final ByteArrayDataOutput messageWriter = ByteStreams.newDataOutput(); - messageWriter.writeUTF(message.getScope().getPluginMessageChannel()); + messageWriter.writeUTF(message.getTargetType().getPluginMessageChannel()); messageWriter.writeUTF(message.getTarget()); messageWriter.writeUTF(getSubChannelId()); @@ -96,17 +100,6 @@ protected void send(@NotNull Message message, @NotNull OnlineUser sender) { sender.sendPluginMessage(messageWriter.toByteArray()); } - /** - * Send a message to the broker. (For Redis Only) - * - * @param message the message to send - */ - @Override - protected void send(@NotNull Message message) { - throw new IllegalStateException("Tried to send a plugin message without a sender!"); - } - - @Override public void changeServer(@NotNull OnlineUser user, @NotNull String server) { user.dismount().thenRun(() -> { final ByteArrayDataOutput outputStream = ByteStreams.newDataOutput(); diff --git a/common/src/main/java/net/william278/huskhomes/network/RedisBroker.java b/common/src/main/java/net/william278/huskhomes/network/RedisBroker.java index 925a1e81..dc652ae7 100644 --- a/common/src/main/java/net/william278/huskhomes/network/RedisBroker.java +++ b/common/src/main/java/net/william278/huskhomes/network/RedisBroker.java @@ -24,6 +24,7 @@ import net.william278.huskhomes.user.OnlineUser; import org.jetbrains.annotations.Blocking; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import redis.clients.jedis.*; import redis.clients.jedis.exceptions.JedisException; import redis.clients.jedis.util.Pool; @@ -58,7 +59,7 @@ public void initialize() throws IllegalStateException { jedisPool.getResource().ping(); } catch (JedisException e) { throw new IllegalStateException("Failed to establish connection with Redis. " - + "Please check the supplied credentials in the config file", e); + + "Please check the supplied credentials in the config file", e); } // Subscribe using a thread (rather than a task) @@ -98,12 +99,7 @@ private static Pool getJedisPool(@NotNull RedisSettings settings) { } @Override - protected void send(@NotNull Message message, @NotNull OnlineUser sender) { - plugin.runAsync(() -> subscriber.send(message)); - } - - @Override - protected void send(@NotNull Message message) { + protected void send(@NotNull Message message, @Nullable OnlineUser sender) { plugin.runAsync(() -> subscriber.send(message)); } @@ -206,21 +202,21 @@ public void onMessage(@NotNull String channel, @NotNull String encoded) { return; } - if (message.getType() == Message.Type.REQUEST_RTP_LOCATION) { - broker.handleRTPRequest(message); + if (message.getType() == Message.MessageType.REQUEST_RTP_LOCATION) { + broker.handleRtpRequestLocation(message); return; } - if (message.getScope() == Message.Scope.PLAYER) { + if (message.getTargetType() == Message.TargetType.PLAYER) { broker.plugin.getOnlineUsers().stream() .filter(online -> message.getTarget().equals(Message.TARGET_ALL) - || online.getName().equals(message.getTarget())) + || online.getName().equals(message.getTarget())) .forEach(receiver -> broker.handle(receiver, message)); return; } if (message.getTarget().equals(broker.plugin.getServerName()) - || message.getTarget().equals(Message.TARGET_ALL)) { + || message.getTarget().equals(Message.TARGET_ALL)) { broker.plugin.getOnlineUsers().stream() .findAny() .ifPresent(receiver -> broker.handle(receiver, message)); diff --git a/common/src/main/java/net/william278/huskhomes/teleport/Teleport.java b/common/src/main/java/net/william278/huskhomes/teleport/Teleport.java index caffde61..a257debb 100644 --- a/common/src/main/java/net/william278/huskhomes/teleport/Teleport.java +++ b/common/src/main/java/net/william278/huskhomes/teleport/Teleport.java @@ -28,6 +28,7 @@ import net.william278.huskhomes.event.ITeleportEvent; import net.william278.huskhomes.network.Message; import net.william278.huskhomes.network.Payload; +import net.william278.huskhomes.network.PluginMessageBroker; import net.william278.huskhomes.position.Position; import net.william278.huskhomes.user.OnlineUser; import net.william278.huskhomes.util.TransactionResolver; @@ -106,8 +107,8 @@ private void executeLocal(@NotNull OnlineUser teleporter) throws TeleportationEx fireEvent((event) -> { performTransactions(); plugin.getBroker().ifPresent(b -> Message.builder() - .type(Message.Type.TELEPORT_TO_NETWORKED_POSITION) - .target(username.name()) + .type(Message.MessageType.TELEPORT_TO_NETWORKED_POSITION) + .target(username.name(), Message.TargetType.PLAYER) .build().send(b, executor)); }); return; @@ -132,7 +133,7 @@ private void executeLocal(@NotNull OnlineUser teleporter) throws TeleportationEx } plugin.getDatabase().setCurrentTeleport(teleporter, this); - plugin.getBroker().ifPresent(b -> b.changeServer(teleporter, target.getServer())); + plugin.getBroker().ifPresent(b -> ((PluginMessageBroker) b).changeServer(teleporter, target.getServer())); }); } @@ -146,17 +147,17 @@ private void executeRemote() throws TeleportationException { performTransactions(); if (target instanceof Username username) { Message.builder() - .type(Message.Type.TELEPORT_TO_NETWORKED_USER) - .target(teleporter.name()) - .payload(Payload.withString(username.name())) + .type(Message.MessageType.TELEPORT_TO_NETWORKED_USER) + .target(teleporter.name(), Message.TargetType.PLAYER) + .payload(Payload.string(username.name())) .build().send(b, executor); return; } Message.builder() - .type(Message.Type.TELEPORT_TO_POSITION) - .target(teleporter.name()) - .payload(Payload.withPosition((Position) target)) + .type(Message.MessageType.TELEPORT_TO_POSITION) + .target(teleporter.name(), Message.TargetType.PLAYER) + .payload(Payload.position((Position) target)) .build().send(b, executor); })); } diff --git a/common/src/test/java/net/william278/huskhomes/network/MessageSerializationTests.java b/common/src/test/java/net/william278/huskhomes/network/MessageSerializationTests.java index 71d22a59..7dd7c0ed 100644 --- a/common/src/test/java/net/william278/huskhomes/network/MessageSerializationTests.java +++ b/common/src/test/java/net/william278/huskhomes/network/MessageSerializationTests.java @@ -39,31 +39,31 @@ public class MessageSerializationTests { private static final List TEST_MESSAGES = List.of( Message.builder() - .type(Message.Type.REQUEST_PLAYER_LIST) + .type(Message.MessageType.REQUEST_USER_LIST) .target("TestTarget") .payload(Payload.empty()) .build(), Message.builder() - .type(Message.Type.TELEPORT_REQUEST) + .type(Message.MessageType.TELEPORT_REQUEST) .target("TestTarget") - .payload(Payload.withPosition( + .payload(Payload.position( Position.at(63.25, 127.43, -32, 180f, -94.3f, World.from("TestWorld", UUID.randomUUID()), "TestServer"))) .build(), Message.builder() - .type(Message.Type.TELEPORT_TO_NETWORKED_USER) + .type(Message.MessageType.TELEPORT_TO_NETWORKED_USER) .target("TestTarget") - .payload(Payload.withString("TestString")) + .payload(Payload.string("TestString")) .build(), Message.builder() - .type(Message.Type.PLAYER_LIST) + .type(Message.MessageType.UPDATE_USER_LIST) .target("TestTarget") .payload(Payload.withStringList(List.of("TestString1", "TestString2", "TestString3"))) .build(), Message.builder() - .type(Message.Type.TELEPORT_TO_POSITION) + .type(Message.MessageType.TELEPORT_TO_POSITION) .target("TestTarget") - .payload(Payload.withPosition( + .payload(Payload.position( Position.at(63.25, 127.43, -32, 180f, -94.3f, World.from("TestWorld", UUID.randomUUID()), "TestServer"))) .build() @@ -80,7 +80,7 @@ public void testMessageSerialization(@NotNull Message message, @SuppressWarnings final Message deserialized = gson.fromJson(serializedMessage, Message.class); Assertions.assertNotNull(deserialized); Assertions.assertEquals(message.getType(), deserialized.getType()); - Assertions.assertEquals(message.getScope(), deserialized.getScope()); + Assertions.assertEquals(message.getTargetType(), deserialized.getTargetType()); Assertions.assertEquals(message.getTarget(), deserialized.getTarget()); Assertions.assertEquals( message.getPayload().getPosition().isPresent(),