diff --git a/common/src/main/java/net/william278/husksync/redis/RedisManager.java b/common/src/main/java/net/william278/husksync/redis/RedisManager.java index a3a4ca95..577045e7 100644 --- a/common/src/main/java/net/william278/husksync/redis/RedisManager.java +++ b/common/src/main/java/net/william278/husksync/redis/RedisManager.java @@ -41,12 +41,16 @@ public class RedisManager extends JedisPubSub { protected static final String KEY_NAMESPACE = "husksync:"; + private static final int RECONNECTION_TIME = 8000; private final HuskSync plugin; private final String clusterId; private Pool jedisPool; private final Map>> pendingRequests; + private boolean enabled; + private boolean reconnected; + public RedisManager(@NotNull HuskSync plugin) { this.plugin = plugin; this.clusterId = plugin.getSettings().getClusterId(); @@ -88,18 +92,53 @@ public void initialize() throws IllegalStateException { } // Subscribe using a thread (rather than a task) + enabled = true; new Thread(this::subscribe, "husksync:redis_subscriber").start(); } @Blocking private void subscribe() { - try (Jedis jedis = jedisPool.getResource()) { - jedis.subscribe( - this, - Arrays.stream(RedisMessage.Type.values()) - .map(type -> type.getMessageChannel(clusterId)) - .toArray(String[]::new) - ); + while (enabled && !Thread.interrupted() && jedisPool != null && !jedisPool.isClosed()) { + try (Jedis jedis = jedisPool.getResource()) { + if (reconnected) { + plugin.log(Level.INFO, "Redis connection is alive again"); + } + // Subscribe channels and lock the thread + jedis.subscribe( + this, + Arrays.stream(RedisMessage.Type.values()) + .map(type -> type.getMessageChannel(clusterId)) + .toArray(String[]::new) + ); + } catch (Throwable t) { + // Thread was unlocked due error + onThreadUnlock(t); + } + } + } + + private void onThreadUnlock(Throwable t) { + if (enabled) { + if (reconnected) { + plugin.log(Level.WARNING, + "Connection to the Redis server was lost. Attempting reconnection in 8 seconds...", t); + } + try { + this.unsubscribe(); + } catch (Throwable ignored) { + // empty catch + } + + // Make an instant subscribe if occurs any error on initialization + if (!reconnected) { + reconnected = true; + } else { + try { + Thread.sleep(RECONNECTION_TIME); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } } @@ -136,6 +175,16 @@ public void onMessage(@NotNull String channel, @NotNull String message) { } } + @Override + public void onSubscribe(String channel, int subscribedChannels) { + plugin.log(Level.INFO, "Redis subscribed to channel '" + channel + "'"); + } + + @Override + public void onUnsubscribe(String channel, int subscribedChannels) { + plugin.log(Level.INFO, "Redis unsubscribed from channel '" + channel + "'"); + } + @Blocking protected void sendMessage(@NotNull String channel, @NotNull String message) { try (Jedis jedis = jedisPool.getResource()) { @@ -329,6 +378,7 @@ public boolean getUserServerSwitch(@NotNull User user) { @Blocking public void terminate() { + enabled = false; if (jedisPool != null) { if (!jedisPool.isClosed()) { jedisPool.close();