Skip to content

Commit

Permalink
refactor: Reconnect to Redis when connection lost (#230)
Browse files Browse the repository at this point in the history
* Add redis reconnection

* Add separated method to handle thread unlock

* Add reconnection time constant
  • Loading branch information
Rubenicos authored Jan 22, 2024
1 parent 6641e11 commit 31920d0
Showing 1 changed file with 57 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,16 @@
public class RedisManager extends JedisPubSub {

protected static final String KEY_NAMESPACE = "husksync:";
private static final int RECONNECTION_TIME = 8000;

private final HuskSync plugin;
private final String clusterId;
private Pool<Jedis> jedisPool;
private final Map<UUID, CompletableFuture<Optional<DataSnapshot.Packed>>> pendingRequests;

private boolean enabled;
private boolean reconnected;

public RedisManager(@NotNull HuskSync plugin) {
this.plugin = plugin;
this.clusterId = plugin.getSettings().getClusterId();
Expand Down Expand Up @@ -88,18 +92,53 @@ public void initialize() throws IllegalStateException {
}

// Subscribe using a thread (rather than a task)
enabled = true;
new Thread(this::subscribe, "husksync:redis_subscriber").start();
}

@Blocking
private void subscribe() {
try (Jedis jedis = jedisPool.getResource()) {
jedis.subscribe(
this,
Arrays.stream(RedisMessage.Type.values())
.map(type -> type.getMessageChannel(clusterId))
.toArray(String[]::new)
);
while (enabled && !Thread.interrupted() && jedisPool != null && !jedisPool.isClosed()) {
try (Jedis jedis = jedisPool.getResource()) {
if (reconnected) {
plugin.log(Level.INFO, "Redis connection is alive again");
}
// Subscribe channels and lock the thread
jedis.subscribe(
this,
Arrays.stream(RedisMessage.Type.values())
.map(type -> type.getMessageChannel(clusterId))
.toArray(String[]::new)
);
} catch (Throwable t) {
// Thread was unlocked due error
onThreadUnlock(t);
}
}
}

private void onThreadUnlock(Throwable t) {
if (enabled) {
if (reconnected) {
plugin.log(Level.WARNING,
"Connection to the Redis server was lost. Attempting reconnection in 8 seconds...", t);
}
try {
this.unsubscribe();
} catch (Throwable ignored) {
// empty catch
}

// Make an instant subscribe if occurs any error on initialization
if (!reconnected) {
reconnected = true;
} else {
try {
Thread.sleep(RECONNECTION_TIME);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

Expand Down Expand Up @@ -136,6 +175,16 @@ public void onMessage(@NotNull String channel, @NotNull String message) {
}
}

@Override
public void onSubscribe(String channel, int subscribedChannels) {
plugin.log(Level.INFO, "Redis subscribed to channel '" + channel + "'");
}

@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
plugin.log(Level.INFO, "Redis unsubscribed from channel '" + channel + "'");
}

@Blocking
protected void sendMessage(@NotNull String channel, @NotNull String message) {
try (Jedis jedis = jedisPool.getResource()) {
Expand Down Expand Up @@ -329,6 +378,7 @@ public boolean getUserServerSwitch(@NotNull User user) {

@Blocking
public void terminate() {
enabled = false;
if (jedisPool != null) {
if (!jedisPool.isClosed()) {
jedisPool.close();
Expand Down

0 comments on commit 31920d0

Please sign in to comment.