Skip to content

Commit

Permalink
Fix instance network, 1.3.2 for real
Browse files Browse the repository at this point in the history
  • Loading branch information
MATRIX-feather committed Nov 5, 2024
1 parent 09f618f commit 60a6a42
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,14 @@ public void onReload()

private boolean stopAll()
{
if (slaveInstance != null) return slaveInstance.stop();
if (masterInstance != null) return masterInstance.stop();
boolean success = true;

return true;
if (slaveInstance != null)
success = slaveInstance.stop();

if (masterInstance != null)
success = masterInstance.stop() && success;

return success;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ public final class InstanceServer extends WebSocketServer

private final IClientHandler clientHandler;

private void logServerInfo(String message)
{
logger.info("[S@%s] %s".formatted(Integer.toHexString(this.hashCode()), message));
}

private void logServerWarn(String message)
{
logger.warn("[S@%s] %s".formatted(Integer.toHexString(this.hashCode()), message));
}

public InstanceServer(XiaMoJavaPlugin plugin, InetSocketAddress address, IClientHandler iClientHandler)
{
super(address);
Expand All @@ -42,7 +52,7 @@ public List<WebSocket> getConnectedSockets()
@Override
public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake)
{
logger.info("[S] New connection opened: " + webSocket.getRemoteSocketAddress());
logServerInfo("New connection opened: " + webSocket.getRemoteSocketAddress());

connectedSockets.add(webSocket);
}
Expand All @@ -52,7 +62,7 @@ public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake)
@Override
public void stop(int timeout, String closeMessage) throws InterruptedException
{
logger.info("[S] Stopping instance server...");
logServerInfo("Stopping instance server...");
super.stop(timeout, closeMessage);

running = false;
Expand All @@ -61,7 +71,7 @@ public void stop(int timeout, String closeMessage) throws InterruptedException
@Override
public void onClose(WebSocket webSocket, int i, String s, boolean b)
{
logger.info("[S] Connection closed: " + webSocket.getRemoteSocketAddress());
logServerInfo("Connection closed: " + webSocket.getRemoteSocketAddress());

connectedSockets.remove(webSocket);
clientHandler.onConnectionClose(webSocket);
Expand All @@ -74,7 +84,7 @@ public record WsRecord(WebSocket socket, String rawMessage)
@Override
public void onMessage(WebSocket webSocket, String msg)
{
//logger.info("%s :: <- :: '%s'".formatted(webSocket.getRemoteSocketAddress(), msg));
//logServer("%s :: <- :: '%s'".formatted(webSocket.getRemoteSocketAddress(), msg));

clientHandler.onMessage(new WsRecord(webSocket, msg), this);
}
Expand All @@ -85,14 +95,14 @@ public void onError(@Nullable WebSocket webSocket, Exception e)
String socketAddress = "<unknown socket @ %s>".formatted(webSocket);
if (webSocket != null) socketAddress = webSocket.getRemoteSocketAddress().toString();

logger.warn("[S] An error occurred with socket '%s': %s".formatted(socketAddress, e.getMessage()));
logServerWarn("An error occurred with socket '%s': %s".formatted(socketAddress, e.getMessage()));
e.printStackTrace();
}

@Override
public void onStart()
{
logger.info("[S] Master websocket server started on " + this.getAddress().toString());
logServerInfo("Master websocket server started on " + this.getAddress().toString());

clientHandler.onServerStart(this);
running = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ public class MasterInstance extends MorphPluginObject implements IInstanceServic
@Resolved
private MorphConfigManager config;

private void logMasterInfo(String message)
{
logger.info("[Master@%s] %s".formatted(Integer.toHexString(this.hashCode()), message));
}

private void logMasterWarn(String message)
{
logger.warn("[Master@%s] %s".formatted(Integer.toHexString(this.hashCode()), message));
}

/**
* @return Success?
*/
Expand All @@ -51,8 +61,8 @@ private boolean stopServer()
{
if (bindingServer != null)
{
bindingServer.stop(1000, "Master instance shutting down");
bindingServer.dispose();
bindingServer.stop(0, "Master instance shutting down");
}

bindingServer = null;
Expand All @@ -64,7 +74,7 @@ private boolean stopServer()
}
catch (Throwable t)
{
logger.error("[S] Error occurred shutting down socket server: " + t.getMessage());
logMasterWarn("Error occurred shutting down socket server: " + t.getMessage());
t.printStackTrace();

return false;
Expand Down Expand Up @@ -99,7 +109,7 @@ private boolean prepareServer()
}
catch (Throwable t)
{
logger.warn("[S] Error occurred while setting up server:" + t.getMessage());
logMasterWarn("Error occurred while setting up server:" + t.getMessage());
t.printStackTrace();

return false;
Expand Down Expand Up @@ -150,13 +160,13 @@ private void onText(InstanceServer.WsRecord record)
var cmd = registries.createC2SCommand(text[0], text.length == 2 ? text[1] : "");
if (cmd == null)
{
logger.warn("[S] Unknown command: " + text[0]);
logMasterWarn("Unknown command: " + text[0]);
return;
}

if (!(cmd instanceof MIC2SCommand<?> mic2s))
{
logger.warn("[S] Command is not a MIC2S instance!");
logMasterWarn("Command is not a MIC2S instance!");
return;
}

Expand All @@ -183,7 +193,7 @@ private void sendCommand(WebSocket socket, MIS2CCommand<?> command)
{
if (!socket.isOpen())
{
logger.warn("[S] Not sending commands to a closed socket! %s".formatted(socket.getRemoteSocketAddress()));
logMasterWarn("Not sending commands to a closed socket! %s".formatted(socket.getRemoteSocketAddress()));
return;
}

Expand Down Expand Up @@ -228,7 +238,7 @@ public void onLoginCommand(MIC2SLoginCommand cProtocolCommand)
return;
}

logger.info("[S] '%s' is requesting a login".formatted(socket.getRemoteSocketAddress()));
logMasterInfo("'%s' is requesting a login".formatted(socket.getRemoteSocketAddress()));

this.switchState(socket, ProtocolState.LOGIN);

Expand All @@ -237,21 +247,21 @@ public void onLoginCommand(MIC2SLoginCommand cProtocolCommand)

if (!this.level.equals(cProtocolCommand.getVersion()))
{
logger.info("[S] Protocol mismatch! Disconnecting...");
logMasterInfo("Protocol mismatch! Disconnecting...");

this.disconnect(socket, "Protocol mismatch!");
return;
}

if (cProtocolCommand.getSecret() == null || !cProtocolCommand.getSecret().equals(this.secret.get()))
{
logger.info("[S] Invalid secret! Disconnecting...");
logMasterInfo("Invalid secret! Disconnecting...");

disconnect(socket, "Invalid secret '%s'".formatted(cProtocolCommand.getSecret()));
return;
}

logger.info("[S] '%s' logged in".formatted(socket.getRemoteSocketAddress()));
logMasterInfo("'%s' logged in".formatted(socket.getRemoteSocketAddress()));

sendCommand(socket, new MIS2CLoginResultCommand(true));
switchState(socket, ProtocolState.SYNC);
Expand Down Expand Up @@ -287,15 +297,15 @@ public void onDisguiseMetaCommand(MIC2SDisguiseMetaCommand cDisguiseMetaCommand)
var meta = cDisguiseMetaCommand.getMeta();
if (meta == null || !meta.isValid())
{
logger.warn("[S] Bad client implementation? Got invalid meta from '%s'".formatted(socket.getRemoteSocketAddress()));
logMasterWarn("Bad client implementation? Got invalid meta from '%s'".formatted(socket.getRemoteSocketAddress()));
return;
}

var state = getConnectionState(socket);

if (!state.loggedIn())
{
logger.warn("[S] Bad client implementation? They sent meta sync before they login! (%s)".formatted(socket.getRemoteSocketAddress()));
logMasterWarn("Bad client implementation? They sent meta sync before they login! (%s)".formatted(socket.getRemoteSocketAddress()));
return;
}

Expand Down Expand Up @@ -350,8 +360,6 @@ else if (operation == Operation.REMOVE)
@Override
public void onMessage(InstanceServer.WsRecord wsRecord, InstanceServer server)
{
if (!allowedSockets.containsKey(wsRecord.socket())) return;

this.addSchedule(() -> this.onText(wsRecord));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import java.net.ConnectException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class InstanceClient extends WebSocketClient
{
Expand All @@ -33,13 +35,32 @@ private void load()
{
}

private void logClientInfo(String message)
{
logger.info("[C@%s] %s".formatted(Integer.toHexString(this.hashCode()), message));
}

private void logClientWarn(String message)
{
logger.warn("[C@%s] %s".formatted(Integer.toHexString(this.hashCode()), message));
}

private final AtomicBoolean disposed = new AtomicBoolean(false);
public void dispose()
{
this.disposed.set(true);
}

//region WebSocket stuffs

private final AtomicBoolean connectionAlive = new AtomicBoolean(false);

@Override
public void onOpen(ServerHandshake serverHandshake)
{
logger.info("[C] Opened connection to the instance server.");
logClientInfo("Opened connection to the instance server.");
masterHandler.onConnectionOpen();
connectionAlive.set(true);
}

@Override
Expand All @@ -52,24 +73,55 @@ public void onMessage(String msg)
@Override
public void onClose(int code, String reason, boolean isFromRemote)
{
logger.info("[C] Connection closed with code '%s' and reason '%s'".formatted(code, reason));
logClientInfo("Connection closed with code '%s' and reason '%s'".formatted(code, reason));
this.connectionAlive.set(false);

boolean shouldRetry = true;
boolean shouldRetry = !reason.equalsIgnoreCase("NORETRY");

var waitingSecond = 20;
if (shouldRetry)
{
logger.info("[C] Retrying connect after %s seconds...".formatted(waitingSecond));
plugin.schedule(this::reconnect, waitingSecond * 20);
logClientInfo("Retrying connect after %s seconds...".formatted(waitingSecond));

var connectionId = this.connectionId.incrementAndGet();
plugin.schedule(() -> tryReconnect(connectionId), waitingSecond * 20);
}
else
{
logClientInfo("Not reconnecting because either the server or other sources declared NORETRY");
}

masterHandler.onConnectionClose(code);
}

private void tryReconnect(int connectId)
{
if (connectionId.get() != connectId)
{
logClientInfo("Not retrying because another connection is ongoing...");
return;
}

this.reconnect();
}

private final AtomicInteger connectionId = new AtomicInteger(0);

@Override
public void connect()
{
logger.info("[C] Connecting to the instance server...");
if (disposed.get()) return;

logClientInfo("Connecting to the instance server...");

if (this.connectionAlive.get())
{
logClientWarn("Already connected to the server!");
return;
}

connectionId.incrementAndGet();

super.connect();
}

Expand All @@ -82,19 +134,18 @@ public void onError(Exception e)
}
catch (Throwable t)
{
logger.warn("[C] Error occurred invoking onClientError(): " + t.getMessage());
logClientWarn("Error occurred invoking onClientError(): " + t.getMessage());
t.printStackTrace();
}

if (e instanceof ConnectException)
{
logger.info("[C] Can't reach the server, retrying after 30 seconds: " + e.getMessage());
plugin.schedule(this::reconnect, 30 * 20);
logClientInfo("Can't reach the server: " + e.getMessage());

return;
}

logger.error("Unknown error occurred with the client: " + e.getMessage());
logger.error("Unknown error occurred with the client %s: %s".formatted(Integer.toHexString(this.hashCode()), e.getMessage()));
e.printStackTrace();
}

Expand Down
Loading

0 comments on commit 60a6a42

Please sign in to comment.