Skip to content

Commit

Permalink
refactor: rework networking system
Browse files Browse the repository at this point in the history
  • Loading branch information
WiIIiam278 committed Oct 31, 2024
1 parent aec6fdf commit 2d71e4a
Show file tree
Hide file tree
Showing 16 changed files with 367 additions and 514 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -795,11 +795,9 @@ public final void randomlyTeleportPlayer(@NotNull OnlineUser user, boolean timed
}

plugin.getBroker().ifPresent(b -> Message.builder()
.scope(Message.Scope.SERVER)
.target(randomServer)
.type(Message.Type.REQUEST_RTP_LOCATION)
.payload(Payload.withStringList(
List.of(user.getPosition().getWorld().getName(), user.getName())))
.target(randomServer, Message.TargetType.SERVER)
.type(Message.MessageType.REQUEST_RTP_LOCATION)
.payload(Payload.world(user.getPosition().getWorld()))
.build().send(b, user));
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,9 @@ private void executeRtp(@NotNull OnlineUser teleporter, @NotNull CommandUser exe
}

plugin.getBroker().ifPresent(b -> Message.builder()
.type(Message.Type.REQUEST_RTP_LOCATION)
.scope(Message.Scope.SERVER)
.target(randomServer)
.payload(Payload.withRTPRequest(Payload.RTPRequest.of(teleporter.getName(), world.getName())))
.type(Message.MessageType.REQUEST_RTP_LOCATION)
.target(randomServer, Message.TargetType.SERVER)
.payload(Payload.string(world.getName()))
.build().send(b, teleporter));
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ public void execute(@NotNull OnlineUser executor, @NotNull String[] args) {
}

plugin.getBroker().ifPresent(b -> Message.builder()
.target(Message.TARGET_ALL)
.type(Message.Type.TELEPORT_TO_POSITION)
.payload(Payload.withPosition(targetPosition))
.target(Message.TARGET_ALL, Message.TargetType.PLAYER)
.type(Message.MessageType.TELEPORT_TO_POSITION)
.payload(Payload.position(targetPosition))
.build().send(b, executor));

plugin.getLocales().getLocale("teleporting_all_players")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ protected final void handlePlayerJoin(@NotNull OnlineUser onlineUser) {
this.handleInboundTeleport(onlineUser);

// Synchronize the global player list
plugin.runSyncDelayed(() -> this.synchronizeGlobalPlayerList(
onlineUser, plugin.getOnlineUsers().stream().map(User::getName).toList()),
onlineUser, 40L
plugin.runSyncDelayed(() -> this.updateUserList(
onlineUser,
plugin.getOnlineUsers().stream().map(u -> (User) u).toList()),
onlineUser,
40L
);

// Request updated player lists from other servers
Expand Down Expand Up @@ -99,34 +101,33 @@ protected final void handlePlayerJoin(@NotNull OnlineUser onlineUser) {
/**
* Handle when a {@link OnlineUser} leaves the server.
*
* @param onlineUser the leaving {@link OnlineUser}
* @param online the leaving {@link OnlineUser}
*/
protected final void handlePlayerLeave(@NotNull OnlineUser onlineUser) {
onlineUser.removeInvulnerabilityIfPermitted();
protected final void handlePlayerLeave(@NotNull OnlineUser online) {
online.removeInvulnerabilityIfPermitted();
plugin.runAsync(() -> {
// Set offline position
plugin.getDatabase().setOfflinePosition(onlineUser, onlineUser.getPosition());
plugin.getDatabase().setOfflinePosition(online, online.getPosition());

// Remove this user's home cache
plugin.getManager().homes().removeUserHomes(onlineUser);
plugin.getManager().homes().removeUserHomes(online);

// Update global lists
if (plugin.getSettings().getCrossServer().isEnabled()) {
final List<String> localPlayerList = plugin.getOnlineUsers().stream().map(User::getName)
.filter(player -> !player.equals(onlineUser.getName())).toList();
final List<User> users = plugin.getOnlineUsers().stream().map(u -> (User) u).toList();
if (plugin.getSettings().getCrossServer().getBrokerType() == Broker.Type.REDIS) {
this.synchronizeGlobalPlayerList(onlineUser, localPlayerList);
this.updateUserList(online, users);
return;
}

plugin.getOnlineUsers().stream()
.filter(user -> !user.equals(onlineUser))
.filter(user -> !user.equals(online))
.findAny()
.ifPresent(player -> this.synchronizeGlobalPlayerList(player, localPlayerList));
.ifPresent(player -> this.updateUserList(player, users));
}

// Remove from user map
plugin.getOnlineUserMap().remove(onlineUser.getUuid());
plugin.getOnlineUserMap().remove(online.getUuid());
});
}

Expand Down Expand Up @@ -192,24 +193,22 @@ private void handleInboundRespawn(@NotNull OnlineUser teleporter) {
plugin.getDatabase().setRespawnPosition(teleporter, bedPosition.orElse(null));
}

// Synchronize the global player list TODO CHECK
private void synchronizeGlobalPlayerList(@NotNull OnlineUser user, @NotNull List<String> localPlayerList) {
// Synchronize the global player list
private void updateUserList(@NotNull OnlineUser user, @NotNull List<User> localPlayerList) {
plugin.getBroker().ifPresent(broker -> {
// Send this server's player list to all servers
Message.builder()
.type(Message.Type.PLAYER_LIST)
.scope(Message.Scope.SERVER)
.target(Message.TARGET_ALL)
.payload(Payload.withStringList(localPlayerList))
.type(Message.MessageType.UPDATE_USER_LIST)
.target(Message.TARGET_ALL, Message.TargetType.SERVER)
.payload(Payload.userList(localPlayerList))
.build().send(broker, user);

// Clear cached global player lists and request updated lists from all servers
if (plugin.getOnlineUsers().size() == 1) {
plugin.getGlobalUserList().clear();
Message.builder()
.type(Message.Type.REQUEST_PLAYER_LIST)
.scope(Message.Scope.SERVER)
.target(Message.TARGET_ALL)
.type(Message.MessageType.REQUEST_USER_LIST)
.target(Message.TARGET_ALL, Message.TargetType.SERVER)
.build().send(broker, user);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,9 @@ public void unCacheHome(@NotNull UUID homeId, boolean propagate) {
private void propagateCacheUpdate(@NotNull UUID homeId) {
plugin.getBroker().ifPresent(b -> plugin.getOnlineUsers().stream().findAny()
.ifPresent(user -> Message.builder()
.type(Message.Type.UPDATE_HOME)
.scope(Message.Scope.SERVER)
.target(Message.TARGET_ALL)
.payload(Payload.withString(homeId.toString()))
.type(Message.MessageType.UPDATE_HOME)
.target(Message.TARGET_ALL, Message.TargetType.SERVER)
.payload(Payload.string(homeId.toString()))
.build().send(b, user)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,8 @@ public RequestsManager requests() {
protected void propagateCacheUpdate() {
plugin.getBroker().ifPresent(b -> plugin.getOnlineUsers().stream()
.findAny().ifPresent(user -> Message.builder()
.type(Message.Type.UPDATE_CACHES)
.scope(Message.Scope.SERVER)
.target(Message.TARGET_ALL)
.type(Message.MessageType.UPDATE_CACHES)
.target(Message.TARGET_ALL, Message.TargetType.SERVER)
.build().send(b, user)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ public void sendTeleportAllRequest(@NotNull OnlineUser requester) {
}

plugin.getBroker().ifPresent(b -> Message.builder()
.type(Message.Type.TELEPORT_REQUEST)
.payload(Payload.withTeleportRequest(request))
.target(Message.TARGET_ALL)
.type(Message.MessageType.TELEPORT_REQUEST)
.payload(Payload.teleportRequest(request))
.target(Message.TARGET_ALL, Message.TargetType.PLAYER)
.build().send(b, requester));
}

Expand Down Expand Up @@ -168,9 +168,9 @@ public void sendTeleportRequest(@NotNull OnlineUser requester, @NotNull String t
plugin.fireEvent(
plugin.getSendTeleportRequestEvent(requester, request),
(event -> plugin.getBroker().ifPresent(b -> Message.builder()
.type(Message.Type.TELEPORT_REQUEST)
.payload(Payload.withTeleportRequest(request))
.target(targetUser)
.type(Message.MessageType.TELEPORT_REQUEST)
.payload(Payload.teleportRequest(request))
.target(targetUser, Message.TargetType.PLAYER)
.build().send(b, requester)))
);
return;
Expand Down Expand Up @@ -303,11 +303,10 @@ private void handleRequestResponse(@NotNull TeleportRequest request, @NotNull On
handleLocalRequestResponse(localRequester.get(), request);
} else if (plugin.getSettings().getCrossServer().isEnabled()) {
plugin.getBroker().ifPresent(b -> Message.builder()
.type(Message.Type.TELEPORT_REQUEST_RESPONSE)
.payload(Payload.withTeleportRequest(request))
.target(request.getRequesterName())
.build()
.send(b, recipient));
.type(Message.MessageType.TELEPORT_REQUEST_RESPONSE)
.payload(Payload.teleportRequest(request))
.target(request.getRequesterName(), Message.TargetType.PLAYER)
.build().send(b, recipient));
} else {
plugin.getLocales().getLocale("error_teleport_request_sender_not_online")
.ifPresent(recipient::sendMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,9 @@ public void unCacheWarp(@NotNull UUID warpId, boolean propagate) {
private void propagateCacheUpdate(@NotNull UUID warpId) {
plugin.getOnlineUsers().stream().findAny().ifPresent(user -> plugin.getBroker()
.ifPresent(b -> Message.builder()
.type(Message.Type.UPDATE_WARP)
.scope(Message.Scope.SERVER)
.target(Message.TARGET_ALL)
.payload(Payload.withString(warpId.toString()))
.type(Message.MessageType.UPDATE_WARP)
.target(Message.TARGET_ALL, Message.TargetType.SERVER)
.payload(Payload.string(warpId.toString()))
.build().send(b, user)));
}

Expand Down
Loading

0 comments on commit 2d71e4a

Please sign in to comment.