From a20462768d60a5726ddfa63fc446d3ae561cd8ba Mon Sep 17 00:00:00 2001 From: Zhang Yifei Date: Fri, 25 Oct 2024 12:11:24 +0800 Subject: [PATCH 01/16] lock service --- src/org/jgroups/raft/blocks/LockService.java | 778 ++++++++++++++++++ .../jgroups/tests/blocks/LockServiceTest.java | 682 +++++++++++++++ 2 files changed, 1460 insertions(+) create mode 100644 src/org/jgroups/raft/blocks/LockService.java create mode 100644 tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java diff --git a/src/org/jgroups/raft/blocks/LockService.java b/src/org/jgroups/raft/blocks/LockService.java new file mode 100644 index 00000000..9817a94e --- /dev/null +++ b/src/org/jgroups/raft/blocks/LockService.java @@ -0,0 +1,778 @@ +package org.jgroups.raft.blocks; + +import static org.jgroups.raft.blocks.LockService.LockStatus.HOLDING; +import static org.jgroups.raft.blocks.LockService.LockStatus.NONE; +import static org.jgroups.raft.blocks.LockService.LockStatus.WAITING; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; + +import org.jgroups.Address; +import org.jgroups.ChannelListener; +import org.jgroups.Event; +import org.jgroups.JChannel; +import org.jgroups.MergeView; +import org.jgroups.Message; +import org.jgroups.UpHandler; +import org.jgroups.View; +import org.jgroups.logging.Log; +import org.jgroups.logging.LogFactory; +import org.jgroups.protocols.raft.RAFT; +import org.jgroups.protocols.raft.Role; +import org.jgroups.raft.RaftHandle; +import org.jgroups.raft.StateMachine; +import org.jgroups.util.ByteArrayDataInputStream; +import org.jgroups.util.ByteArrayDataOutputStream; +import org.jgroups.util.ExtendedUUID; +import org.jgroups.util.UUID; + +/** + * @author Zhang Yifei + */ +public class LockService { + protected static final Log log = LogFactory.getLog(LockService.class); + + protected static final byte LOCK = 1, TRY_LOCK = 2, UNLOCK = 3, UNLOCK_ALL = 4, RESET = 5; + + protected final RaftHandle raft; + protected final Map locks = new HashMap<>(); + protected final Map> memberLocks = new LinkedHashMap<>(); + + protected volatile View view; + protected ExtendedUUID address; + + protected final ConcurrentMap lockStatus = new ConcurrentHashMap<>(); + protected final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); + protected final ConcurrentMap mutexes = new ConcurrentHashMap<>(); // could be weak value reference + + public LockService(JChannel channel) { + if (channel.isConnecting() || channel.isConnected()) { + throw new IllegalStateException("Illegal channel state " + channel.getState()); + } + Hook hook = createHook(); + raft = createRaft(channel, hook); + channel.setUpHandler(hook).addChannelListener(hook); + 
raft.addRoleListener(hook); + } + + protected Hook createHook() { + return new Hook(); + } + + protected RaftHandle createRaft(JChannel ch, StateMachine sm) { + return new RaftHandle(ch, sm); + } + + protected static class LockEntry { + public final long id; + public final LinkedHashSet waiters = new LinkedHashSet<>(); + public UUID holder; + + protected LockEntry(long id) {this.id = id;} + + protected UUID unlock() { + // make sure it's a consistent result for all nodes + var i = waiters.iterator(); + if (!i.hasNext()) return holder = null; + var v = i.next(); i.remove(); return holder = v; + } + } + + protected class Hook implements StateMachine, RAFT.RoleChange, UpHandler, ChannelListener { + + @Override + public void readContentFrom(DataInput in) { + Map tmp = new HashMap<>(); + locks.clear(); memberLocks.clear(); + for (int i = 0, l = readInt(in); i < l; i++) { + long id = readLong(in); + LockEntry lock = new LockEntry(id); + lock.holder = readUuid(in); + for (int t = 0, m = readInt(in); t < m; t++) { + lock.waiters.add(readUuid(in)); + } + locks.put(id, lock); + bind(lock.holder, lock); + if (address.equals(lock.holder)) tmp.put(lock.id, HOLDING); + for (var waiter : lock.waiters) { + bind(waiter, lock); + if (address.equals(waiter)) tmp.put(lock.id, WAITING); + } + } + + // notify base on local status + lockStatus.forEach((k, v) -> notifyListeners(k, v, tmp.remove(k), false)); + tmp.forEach((k, v) -> notifyListeners(k, NONE, v, false)); + } + + @Override + public void writeContentTo(DataOutput out) { + writeInt((int) locks.values().stream().filter(t -> t.holder != null).count(), out); + for (var lock : locks.values()) { + if (lock.holder == null) continue; + writeLong(lock.id, out); + writeUuid(lock.holder, out); + writeInt(lock.waiters.size(), out); + for (UUID t : lock.waiters) { + writeUuid(t, out); + } + } + } + + @Override + public byte[] apply(byte[] data, int offset, int length, boolean serialize_response) throws Exception { + var in = new ByteArrayDataInputStream(data, offset, length); + LockStatus status = null; + switch (in.readByte()) { + case LOCK: + status = doLock(readLong(in), readUuid(in), false); + break; + case TRY_LOCK: + status = doLock(readLong(in), readUuid(in), true); + break; + case UNLOCK: + LockEntry lock = locks.computeIfAbsent(readLong(in), LockEntry::new); + doUnlock(readUuid(in), lock, null); + break; + case UNLOCK_ALL: + doUnlock(readUuid(in), null); + break; + case RESET: + int len = readInt(in); + List members = new ArrayList<>(len); + for (int i = 0; i < len; i++) { + members.add(readUuid(in)); + } + doReset(members); + break; + } + return serialize_response && status != null ? new byte[] {(byte) status.ordinal()} : null; + } + + @Override + public void roleChanged(Role role) { + if (role == Role.Leader) { + try { + // Reset after the leader is elected + View v = view; + boolean clear = false; + if (v instanceof MergeView) { + int majority = raft.raft().majority(); + clear = ((MergeView) v).getSubgroups().stream().allMatch(t -> t.size() < majority); + } + reset(clear ? 
null : v); + } catch (Throwable e) { + log.error("Fail to send reset command", e); + } + } + } + + @Override + public UpHandler setLocalAddress(Address a) { + address = (ExtendedUUID) a; return this; + } + + @Override + public Object up(Event evt) { + if (evt.getType() == Event.VIEW_CHANGE) { + handleView(evt.arg()); + } + return null; + } + + @Override + public Object up(Message msg) {return null;} + + @Override + public void channelDisconnected(JChannel channel) { + cleanup(); + } + } + + protected LockStatus doLock(long lockId, UUID member, boolean trying) { + LockEntry lock = locks.computeIfAbsent(lockId, LockEntry::new); + LockStatus prev = NONE, next = HOLDING; + if (lock.holder == null) { + lock.holder = member; + } else if (lock.holder.equals(member)) { + prev = HOLDING; + } else if (trying) { + prev = next = lock.waiters.contains(member) ? WAITING : NONE; + } else { + if (!lock.waiters.add(member)) prev = WAITING; + next = WAITING; + } + if (prev != next) bind(member, lock); + if (address.equals(member)) { + notifyListeners(lockId, prev, next, false); + } + if (log.isTraceEnabled()) { + log.trace("[%s] %s lock %s, prev: %s, next: %s", address, member, lockId, prev, next); + } + return next; + } + + protected void doUnlock(UUID member, Set unlocking) { + Set set = memberLocks.get(member); if (set == null) return; + for (LockEntry lock : set.toArray(LockEntry[]::new)) { + doUnlock(member, lock, unlocking); + } + } + + protected void doUnlock(UUID member, LockEntry lock, Set unlocking) { + LockStatus prev = HOLDING; + UUID holder = null; + List waiters = null; + if (member.equals(lock.holder)) { + do { + if (holder != null) { + if (waiters == null) waiters = new ArrayList<>(unlocking.size()); + waiters.add(holder); + } + holder = lock.unlock(); + } while (holder != null && unlocking != null && unlocking.contains(holder)); + } else { + prev = lock.waiters.remove(member) ? WAITING : NONE; + } + if (prev != NONE) unbind(member, lock); + if (address.equals(member)) { + notifyListeners(lock.id, prev, NONE, false); + } else if (address.equals(holder)) { + notifyListeners(lock.id, WAITING, HOLDING, false); + } + if (log.isTraceEnabled()) { + log.trace("[%s] %s unlock %s, prev: %s", address, member, lock.id, prev); + if (holder != null) + log.trace("[%s] %s lock %s, prev: %s, next: %s", address, holder, lock.id, WAITING, HOLDING); + } + if (waiters != null) for (UUID waiter : waiters) { + unbind(waiter, lock); + if (address.equals(waiter)) { + notifyListeners(lock.id, WAITING, NONE, false); + } + if (log.isTraceEnabled()) { + log.trace("[%s] %s unlock %s, prev: %s", address, waiter, lock.id, WAITING); + } + } + } + + protected void doReset(List members) { + Set prev = new LinkedHashSet<>(memberLocks.keySet()); + if (log.isTraceEnabled()) { + log.trace("[%s] reset %s to %s", address, prev, members); + } + for (var id : members) prev.remove(id); + for (var id : prev) doUnlock(id, prev); + } + + protected void bind(UUID member, LockEntry lock) { + memberLocks.computeIfAbsent(member, k -> new LinkedHashSet<>()).add(lock); + } + + protected void unbind(UUID member, LockEntry lock) { + memberLocks.computeIfPresent(member, (k, v) -> { + v.remove(lock); return v.isEmpty() ? null : v; + }); + } + + protected void notifyListeners(long lockId, LockStatus prev, LockStatus curr, boolean force) { + if (!force && raft.leader() == null) return; + if (prev == null) prev = NONE; + if (curr == null) curr = NONE; + LockStatus local = curr == NONE ? 
lockStatus.remove(lockId) : lockStatus.put(lockId, curr); + if (prev == curr) { + prev = local == null ? NONE : local; + if (prev == curr) return; + } + Mutex mutex = mutexes.get(lockId); + if (mutex != null) mutex.onStatusChange(lockId, prev, curr); + for (Listener listener : listeners) { + try { + listener.onStatusChange(lockId, prev, curr); + } catch (Throwable e) { + log.error("Fail to notify listener, lock: %s, prev: %s, curr: %s", lockId, prev, curr, e); + } + } + } + + protected void handleView(View next) { + View prev = this.view; this.view = next; + if (log.isTraceEnabled()) { + log.trace("[%s] View accepted: %s, prev: %s, leader: %s", address, next, prev, raft.leader()); + } + + if (prev != null) { + int majority = raft.raft().majority(); + if (prev.size() >= majority && next.size() < majority) { // lost majority + // In partition case if majority is still working, it will be forced to unlock by reset command. + cleanup(); + } else if (!next.containsMembers(prev.getMembersRaw()) && raft.isLeader()) { // member left + try { + reset(next); + } catch (Throwable e) { + log.error("Fail to send reset command", e); + } + } + } + } + + protected void cleanup() { + lockStatus.forEach((k, v) -> notifyListeners(k, v, NONE, true)); + } + + protected void reset(View view) { + Address[] members = view != null ? view.getMembersRaw() : new Address[0]; + int len = members.length; + var out = new ByteArrayDataOutputStream(6 + len * 16); + out.writeByte(RESET); + writeInt(len, out); + for (Address member : members) { + writeUuid((UUID) member, out); + } + assert out.position() <= 6 + len * 16; + invoke(out).thenApply(t -> null).exceptionally(e -> { + log.error("Fail to reset to " + view, e); return null; + }); + } + + /** + * Add listener + * @param listener listener for the status change. + * @return true if added, otherwise false. + */ + public boolean addListener(Listener listener) { return listeners.addIfAbsent(listener); } + + /** + * Remove listener + * @param listener listener for removing + * @return true if removed, otherwise false. + */ + public boolean removeListener(Listener listener) { return listeners.remove(listener); } + + /** + * Get this member's lock status from local state. + * @param lockId the lock's id + * @return lock status + */ + public LockStatus lockStatus(long lockId) { + var v = lockStatus.get(lockId); return v == null ? NONE : v; + } + + /** + * Acquire the lock, will join the waiting queue if the lock is held by another member currently. + * @param lockId the lock's id + * @return HOLDING if hold the lock, WAITING if in the waiting queue. + */ + public CompletableFuture lock(long lockId) { + var out = new ByteArrayDataOutputStream(26); + out.writeByte(LOCK); + writeLong(lockId, out); + writeUuid(address(), out); + assert out.position() <= 26; + return invoke(out).thenApply(t -> LockStatus.values()[t[0]]); + } + + /** + * Try to acquire the lock, won't join the waiting queue. + * @param lockId the lock's id + * @return HOLDING if hold the lock, NONE if the lock is held by another member. + */ + public CompletableFuture tryLock(long lockId) { + var out = new ByteArrayDataOutputStream(26); + out.writeByte(TRY_LOCK); + writeLong(lockId, out); + writeUuid(address(), out); + assert out.position() <= 26; + return invoke(out).thenApply(t -> LockStatus.values()[t[0]]); + } + + /** + * Release the lock if it's the holder, and take next waiting member from the queue to be the new holder if there + * is one. Remove from waiting queue if it's waiting. 
Do nothing if neither of them. + * @param lockId the lock's id + * @return async completion + */ + public CompletableFuture unlock(long lockId) { + var out = new ByteArrayDataOutputStream(26); + out.writeByte(UNLOCK); + writeLong(lockId, out); + writeUuid(address(), out); + assert out.position() <= 26; + return invoke(out).thenApply(t -> null); + } + + /** + * Release all related locks for this member. + * @return async completion + */ + public CompletableFuture unlock() { + var out = new ByteArrayDataOutputStream(17); + out.writeByte(UNLOCK_ALL); + writeUuid(address(), out); + assert out.position() <= 17; + return invoke(out).thenApply(t -> null); + } + + protected UUID address() { + return Objects.requireNonNull(address); + } + + protected CompletableFuture invoke(ByteArrayDataOutputStream out) { + try { + return raft.setAsync(out.buffer(), 0, out.position()); + } catch (Throwable e) { + throw new RaftException("Fail to execute command", e); + } + } + + /** + * Get the mutex for the specified id. + * @param lockId the id related to the mutex + * @return mutex instance + */ + public Mutex mutex(long lockId) { + return mutexes.computeIfAbsent(lockId, Mutex::new); + } + + /** + * The member's lock status + */ + public enum LockStatus { + HOLDING, WAITING, NONE + } + + /** + * Listen on the lock status changes + */ + public interface Listener { + void onStatusChange(long key, LockStatus prev, LockStatus curr); + } + + /** + * Exception for the raft cluster errors + */ + public static class RaftException extends RuntimeException { + public RaftException(String message) { super(message); } + public RaftException(Throwable cause) { super(cause); } + public RaftException(String message, Throwable cause) { super(message, cause); } + } + + /** + * A distributed lock that backed on the lock service. + */ + public class Mutex implements Lock { + private final long key; + private volatile LockStatus status = NONE; + private volatile Thread holder; + private final AtomicInteger acquirers = new AtomicInteger(); + private final ReentrantLock delegate = new ReentrantLock(); + private final Condition notWaiting = delegate.newCondition(); + private Consumer lockHandler, unlockHandler; + private long timeout = 8000; + + Mutex(long key) {this.key = key;} + + /** + * Set the timeout for the command executing in the lock service. + * @param timeout in milliseconds + */ + public void setTimeout(long timeout) {this.timeout = timeout;} + + /** + * The lock status in the lock service + * @return lock status of the key + */ + public LockStatus getStatus() {return status;} + + /** + * The current holder of this mutex + * @return the thread which holding this mutex + */ + public Thread getHolder() {return holder;} + + /** + * Register a handler for the unexpected unlocking in the lock service. + * @param handler callback with this mutex + */ + public void setUnexpectedUnlockHandler(Consumer handler) {unlockHandler = handler;} + + /** + * Register a handler for the unexpected locking in the lock service. + * @param handler callback with this mutex + */ + public void setUnexpectedLockHandler(Consumer handler) {lockHandler = handler;} + + /** + * Get the lock service + * @return the underlying lock service + */ + public LockService service() {return LockService.this;} + + /** + * @throws RaftException if exception happens during sending or executing commands in the lock service. 
+ */ + @Override + public void lock() { + delegate.lock(); + acquirers.incrementAndGet(); + while (status != HOLDING) { + try { + if (status == WAITING) notWaiting.awaitUninterruptibly(); + else status = join(LockService.this.lock(key)); + } catch (Throwable e) { + rethrow(unlock(e)); + } + } + holder = Thread.currentThread(); + } + + /** + * @throws RaftException if exception happens during sending or executing commands in the lock service. + */ + @Override + public void lockInterruptibly() throws InterruptedException { + delegate.lockInterruptibly(); + acquirers.incrementAndGet(); + while (status != HOLDING) { + try { + if (status == WAITING) notWaiting.await(); + else status = join(LockService.this.lock(key)); + } catch (InterruptedException e) { + throw unlock(e); + } catch (Throwable e) { + rethrow(unlock(e)); + } + } + holder = Thread.currentThread(); + } + + /** + * @throws RaftException if exception happens during sending or executing commands in the lock service. + */ + @Override + public boolean tryLock() { + if (!delegate.tryLock()) return false; + acquirers.incrementAndGet(); + if (status == NONE) { + try { + status = join(LockService.this.tryLock(key)); + } catch (Throwable ignored) { + } + } + if (status == HOLDING) { + holder = Thread.currentThread(); return true; + } + unlock(); return false; + } + + /** + * @throws RaftException if exception happens during sending or executing commands in the lock service. + */ + @Override + public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { + long deadline = System.nanoTime() + unit.toNanos(timeout), ns; + if (!delegate.tryLock(timeout, unit)) return false; + acquirers.incrementAndGet(); + while (status != HOLDING && (ns = deadline - System.nanoTime()) > 0) { + try { + if (status == WAITING) notWaiting.awaitNanos(ns); + else status = join(LockService.this.lock(key)); + } catch (InterruptedException e) { + throw unlock(e); + } catch (Throwable e) { + rethrow(unlock(e)); + } + } + if (status == HOLDING) { + holder = Thread.currentThread(); return true; + } + unlock(); return false; + } + + /** + * @throws RaftException if exception happens during sending or executing commands in the lock service. 
+ */ + @Override + public void unlock() { + if (!delegate.isHeldByCurrentThread()) return; + assert holder == null || holder == Thread.currentThread(); + if (delegate.getHoldCount() == 1) holder = null; + try { + if (acquirers.decrementAndGet() == 0 && status != NONE) { + join(LockService.this.unlock(key)); + status = NONE; + } + } catch (Throwable e) { + rethrow(e); + } finally { + delegate.unlock(); + } + } + + /** + * Unsupported + */ + @Override + public Condition newCondition() { + throw new UnsupportedOperationException(); + } + + private T unlock(T error) { + try { + unlock(); + } catch (Throwable e) { + error.addSuppressed(e); + } + return error; + } + + private T join(CompletableFuture future) throws ExecutionException, TimeoutException { + long nanos = TimeUnit.MILLISECONDS.toNanos(timeout), deadline = System.nanoTime() + nanos; + boolean interrupted = Thread.interrupted(); + try { + do { + try { + return future.get(nanos, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + interrupted = true; + } + } while ((nanos = deadline - System.nanoTime()) > 0); + throw new TimeoutException(); + } finally { + if (interrupted) Thread.currentThread().interrupt(); + } + } + + void onStatusChange(long key, LockStatus prev, LockStatus curr) { + if (key != this.key) return; + if (curr != HOLDING && holder != null) { + status = curr; + var handler = unlockHandler; + if (handler != null) try { + handler.accept(this); + } catch (Throwable e) { + log.error("Error occurred on unlock handler", e); + } + } else if (curr != NONE && acquirers.get() == 0) { + status = curr; + var handler = lockHandler; + if (handler != null) try { + handler.accept(this); + } catch (Throwable e) { + log.error("Error occurred on lock handler", e); + } + } else if (prev == WAITING) { + delegate.lock(); + try { + if (status == WAITING) { + status = curr; + notWaiting.signalAll(); + } + } finally { + delegate.unlock(); + } + } + } + } + + private static T rethrow(Throwable e) { + if (e instanceof RaftException) throw (RaftException) e; + if (e instanceof CompletionException) { + Throwable cause = e.getCause(); + throw cause != null ? 
new RaftException(e) : (CompletionException) e; + } + if (e instanceof ExecutionException) throw new RaftException(e.getCause()); + if (e instanceof TimeoutException) throw new RaftException("Execute command timeout", e); + throw new RaftException("Unknown exception", e); + } + + private static void writeInt(int value, DataOutput out) { + try { + for (; (value & ~0x7F) != 0; value >>>= 7) { + out.writeByte(0x80 | (value & 0x7F)); + } + out.writeByte(value); + } catch (IOException e) { + throw new RaftException("Fail to write", e); + } + } + + private static int readInt(DataInput in) { + try { + int v = in.readByte(); if (v >= 0) return v; + if ((v ^= in.readByte() << 7) < 0) return v ^ 0xFFFFFF80; + if ((v ^= in.readByte() << 14) >= 0) return v ^ 0x00003F80; + if ((v ^= in.readByte() << 21) < 0) return v ^ 0xFFE03F80; + return v ^ in.readByte() << 28 ^ 0x0FE03F80; + } catch (IOException e) { + throw new RaftException("Fail to read", e); + } + } + + private static void writeLong(long value, DataOutput out) { + try { + for (int i = 0; i < 8 && (value & ~0x7FL) != 0; i++) { + out.writeByte(0x80 | ((int) value & 0x7F)); + value >>>= 7; + } + out.writeByte((int) value); + } catch (IOException e) { + throw new RaftException("Fail to write", e); + } + } + + private static long readLong(DataInput in) { + try { + long v = in.readByte(); if (v >= 0) return v; + if ((v ^= (long) in.readByte() << 7) < 0L) return v ^ 0xFFFFFFFFFFFFFF80L; + if ((v ^= (long) in.readByte() << 14) >= 0L) return v ^ 0x0000000000003F80L; + if ((v ^= (long) in.readByte() << 21) < 0L) return v ^ 0xFFFFFFFFFFE03F80L; + if ((v ^= (long) in.readByte() << 28) >= 0L) return v ^ 0x000000000FE03F80L; + if ((v ^= (long) in.readByte() << 35) < 0L) return v ^ 0xFFFFFFF80FE03F80L; + if ((v ^= (long) in.readByte() << 42) >= 0L) return v ^ 0x000003F80FE03F80L; + if ((v ^= (long) in.readByte() << 49) < 0L) return v ^ 0xFFFE03F80FE03F80L; + return v ^ (long) in.readByte() << 56 ^ 0x00FE03F80FE03F80L; + } catch (IOException e) { + throw new RaftException("Fail to read", e); + } + } + + private static void writeUuid(UUID id, DataOutput out) { + try { + out.writeLong(id.getMostSignificantBits()); + out.writeLong(id.getLeastSignificantBits()); + } catch (IOException e) { + throw new RaftException("Fail to write", e); + } + } + + private static UUID readUuid(DataInput in) { + try { + return new UUID(in.readLong(), in.readLong()); + } catch (IOException e) { + throw new RaftException("Fail to read", e); + } + } +} diff --git a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java new file mode 100644 index 00000000..b3d00af6 --- /dev/null +++ b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java @@ -0,0 +1,682 @@ +package org.jgroups.tests.blocks; + +import static java.util.Arrays.stream; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toSet; +import static java.util.stream.Stream.concat; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.data.Offset.offset; +import static 
org.jgroups.raft.blocks.LockService.LockStatus.HOLDING; +import static org.jgroups.raft.blocks.LockService.LockStatus.NONE; +import static org.jgroups.raft.blocks.LockService.LockStatus.WAITING; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.LockSupport; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import org.assertj.core.data.Offset; +import org.jgroups.Address; +import org.jgroups.Global; +import org.jgroups.JChannel; +import org.jgroups.View; +import org.jgroups.protocols.pbcast.GMS; +import org.jgroups.protocols.raft.ELECTION2; +import org.jgroups.protocols.raft.RAFT; +import org.jgroups.protocols.raft.election.BaseElection; +import org.jgroups.raft.Options; +import org.jgroups.raft.RaftHandle; +import org.jgroups.raft.StateMachine; +import org.jgroups.raft.blocks.LockService; +import org.jgroups.raft.blocks.LockService.LockStatus; +import org.jgroups.raft.blocks.LockService.Mutex; +import org.jgroups.tests.harness.BaseRaftChannelTest; +import org.jgroups.tests.harness.BaseRaftElectionTest; +import org.jgroups.util.UUID; +import org.jgroups.util.Util; +import org.testng.annotations.Test; + +/** + * @author Zhang Yifei + */ +@Test(groups = Global.FUNCTIONAL, singleThreaded = true) +public class LockServiceTest extends BaseRaftChannelTest { + + protected Service service_a, service_b, service_c, service_d, service_e; + protected Events events_a, events_b, events_c, events_d, events_e; + + { + clusterSize = 5; + recreatePerMethod = true; + } + + protected static class Event { + final long key; final LockStatus prev, curr; + Event(long key, LockStatus prev, LockStatus curr) { this.key = key; this.prev = prev; this.curr = curr; } + + protected void assertEq(long key, LockStatus prev, LockStatus curr) { + assertThat(this).usingRecursiveComparison().isEqualTo(new Event(key, prev, curr)); + } + } + + protected static class Events implements LockService.Listener { + final BlockingQueue queue = new LinkedBlockingQueue<>(); + + @Override + public void onStatusChange(long key, LockStatus prev, LockStatus curr) { + queue.offer(new Event(key, prev, curr)); + } + + protected Event next(int secs) throws InterruptedException { return queue.poll(secs, SECONDS); } + protected Event next() throws InterruptedException { return next(3); } + } + + protected static class Service extends LockService { + TestRaft raft; + + public Service(JChannel channel) { super(channel); } + + @Override + protected RaftHandle createRaft(JChannel ch, StateMachine sm) { return raft = new TestRaft(ch, sm); } + + Map> dumpState() { + assert locks.values().stream().filter(t -> t.holder != null) + .flatMap(t -> concat(Stream.of(t.holder), t.waiters.stream()).map(m -> Map.entry(m, t))) + .collect(groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet()))).equals(memberLocks); + return locks.values().stream().filter(t -> t.holder != null) + 
.collect(Collectors.toMap(t -> t.id, t -> { + List list = new ArrayList<>(t.waiters.size() + 1); + list.add(t.holder); list.addAll(t.waiters); return list; + })); + } + } + + protected static class TestRaft extends RaftHandle { + Callable> interceptor; + + public TestRaft(JChannel ch, StateMachine sm) { super(ch, sm); } + + @Override + public CompletableFuture setAsync(byte[] buf, int offset, int length, Options options) throws Exception { + if (interceptor != null) return interceptor.call(); + return super.setAsync(buf, offset, length, options); + } + + void throwingInterceptor(Exception e) { interceptor = () -> { throw e; }; } + void errorInterceptor(Throwable e) { interceptor = () -> CompletableFuture.failedFuture(e); } + void voidInterceptor() { interceptor = CompletableFuture::new; } + void removeInterceptor() { interceptor = null; } + } + + @Override + protected void beforeChannelConnection(JChannel ch) { + switch (ch.name()) { + case "A": service_a = new Service(ch); break; + case "B": service_b = new Service(ch); break; + case "C": service_c = new Service(ch); break; + case "D": service_d = new Service(ch); break; + case "E": service_e = new Service(ch); break; + } + } + + @Override + protected void afterClusterCreation() { + RAFT[] rafts = stream(channels()).map(this::raft).toArray(RAFT[]::new); + BaseRaftElectionTest.waitUntilAllHaveLeaderElected(rafts, 15_000); + } + + protected void enableEvents() { + service_a.addListener(events_a = new Events()); + service_b.addListener(events_b = new Events()); + service_c.addListener(events_c = new Events()); + service_d.addListener(events_d = new Events()); + service_e.addListener(events_e = new Events()); + } + + public void lock() throws Exception { + enableEvents(); + + // lock 101 + assertEquals(service_a.lock(101L).get(3, SECONDS), HOLDING); + events_a.next().assertEq(101L, NONE, HOLDING); + assertEquals(service_a.lockStatus(101L), HOLDING); + + assertEquals(service_b.lock(101L).get(3, SECONDS), WAITING); + events_b.next().assertEq(101L, NONE, WAITING); + assertEquals(service_b.lockStatus(101L), WAITING); + + assertEquals(service_c.lock(101L).get(3, SECONDS), WAITING); + events_c.next().assertEq(101L, NONE, WAITING); + assertEquals(service_c.lockStatus(101L), WAITING); + + // lock 102 + assertEquals(service_b.lock(102L).get(3, SECONDS), HOLDING); + events_b.next().assertEq(102L, NONE, HOLDING); + assertEquals(service_b.lockStatus(102L), HOLDING); + + assertEquals(service_a.lock(102L).get(3, SECONDS), WAITING); + events_a.next().assertEq(102L, NONE, WAITING); + assertEquals(service_a.lockStatus(102L), WAITING); + + assertEquals(service_c.lock(102L).get(3, SECONDS), WAITING); + events_c.next().assertEq(102L, NONE, WAITING); + assertEquals(service_c.lockStatus(102L), WAITING); + + // lock 103 + assertEquals(service_c.lock(103L).get(3, SECONDS), HOLDING); + events_c.next().assertEq(103L, NONE, HOLDING); + assertEquals(service_c.lockStatus(103L), HOLDING); + + assertEquals(service_a.lock(103L).get(3, SECONDS), WAITING); + events_a.next().assertEq(103L, NONE, WAITING); + assertEquals(service_a.lockStatus(103L), WAITING); + + assertEquals(service_b.lock(103L).get(3, SECONDS), WAITING); + events_b.next().assertEq(103L, NONE, WAITING); + assertEquals(service_b.lockStatus(103L), WAITING); + + // unlock 101 + service_a.unlock(101L).get(3, SECONDS); + + events_a.next().assertEq(101L, HOLDING, NONE); + assertEquals(service_a.lockStatus(101L), NONE); + + events_b.next().assertEq(101L, WAITING, HOLDING); + assertEquals(service_b.lockStatus(101L), 
HOLDING); + + // unlock 102 + service_b.unlock(102L).get(3, SECONDS); + + events_b.next().assertEq(102L, HOLDING, NONE); + assertEquals(service_b.lockStatus(102L), NONE); + + events_a.next().assertEq(102L, WAITING, HOLDING); + assertEquals(service_a.lockStatus(102L), HOLDING); + + // unlock 103 + service_c.unlock(103L).get(3, SECONDS); + + events_c.next().assertEq(103L, HOLDING, NONE); + assertEquals(service_c.lockStatus(103L), NONE); + assertEquals(service_c.lockStatus(102L), WAITING); + + events_a.next().assertEq(103L, WAITING, HOLDING); + assertEquals(service_a.lockStatus(103L), HOLDING); + + // unlock a + service_a.unlock().get(3, SECONDS); + + events_a.next().assertEq(102L, HOLDING, NONE); + assertEquals(service_a.lockStatus(102L), NONE); + + events_a.next().assertEq(103L, HOLDING, NONE); + assertEquals(service_a.lockStatus(103L), NONE); + + events_c.next().assertEq(102L, WAITING, HOLDING); + assertEquals(service_c.lockStatus(102L), HOLDING); + + events_b.next().assertEq(103L, WAITING, HOLDING); + assertEquals(service_b.lockStatus(103L), HOLDING); + + // unlock b + service_b.unlock().get(3, SECONDS); + + events_b.next().assertEq(101L, HOLDING, NONE); + assertEquals(service_b.lockStatus(101L), NONE); + + events_b.next().assertEq(103L, HOLDING, NONE); + assertEquals(service_b.lockStatus(103L), NONE); + + events_c.next().assertEq(101L, WAITING, HOLDING); + assertEquals(service_c.lockStatus(101L), HOLDING); + + // unlock c + service_c.unlock().get(3, SECONDS); + + events_c.next().assertEq(101L, HOLDING, NONE); + assertEquals(service_c.lockStatus(101L), NONE); + + events_c.next().assertEq(102L, HOLDING, NONE); + assertEquals(service_c.lockStatus(102L), NONE); + } + + public void tryLock() throws Exception { + enableEvents(); + + assertEquals(service_a.tryLock(101L).get(3, SECONDS), HOLDING); + events_a.next().assertEq(101L, NONE, HOLDING); + assertEquals(service_a.lockStatus(101L), HOLDING); + assertEquals(service_a.tryLock(101L).get(3, SECONDS), HOLDING); + + assertEquals(service_b.tryLock(101L).get(3, SECONDS), NONE); + assertNull(events_b.next(1)); + assertEquals(service_b.lockStatus(101L), NONE); + + assertEquals(service_c.lock(101L).get(3, SECONDS), WAITING); + events_c.next().assertEq(101L, NONE, WAITING); + assertEquals(service_c.lockStatus(101L), WAITING); + assertEquals(service_c.tryLock(101L).get(3, SECONDS), WAITING); + + service_a.unlock(101L); + events_a.next().assertEq(101L, HOLDING, NONE); + events_c.next().assertEq(101L, WAITING, HOLDING); + service_c.unlock(); + events_c.next().assertEq(101L, HOLDING, NONE); + assertNull(events_b.next(1)); + } + + public void reset_by_disconnect() throws Exception { + enableEvents(); + + assertEquals(service_a.lock(101L).get(3, SECONDS), HOLDING); + events_a.next().assertEq(101L, NONE, HOLDING); + assertEquals(service_b.lock(101L).get(3, SECONDS), WAITING); + events_b.next().assertEq(101L, NONE, WAITING); + assertEquals(service_c.lock(101L).get(3, SECONDS), WAITING); + events_c.next().assertEq(101L, NONE, WAITING); + assertEquals(service_d.lock(101L).get(3, SECONDS), WAITING); + events_d.next().assertEq(101L, NONE, WAITING); + assertEquals(service_e.lock(101L).get(3, SECONDS), WAITING); + events_e.next().assertEq(101L, NONE, WAITING); + + // disconnect the coordinator/leader/holder + channel(0).disconnect(); + // reset to [B,C,D,E] + events_a.next().assertEq(101L, HOLDING, NONE); + events_b.next().assertEq(101L, WAITING, HOLDING); + + // disconnect a participant + channel(2).disconnect(); + // reset to [B,D,E] + 
events_c.next().assertEq(101L, WAITING, NONE); + + service_b.unlock(101L); + events_b.next().assertEq(101L, HOLDING, NONE); + events_d.next().assertEq(101L, WAITING, HOLDING); + + // disconnect the holder and lost majority + channel(3).disconnect(); + // no reset, notify NONE to all. + events_d.next().assertEq(101L, HOLDING, NONE); + events_e.next().assertEq(101L, WAITING, NONE); + + // reconnect the previous holder and reach majority + service_d = reconnect(channel(3), events_d); + // reset to [B,E,D], and D with new address. + assertNull(events_d.next()); + events_e.next().assertEq(101L, WAITING, HOLDING); + } + + public void reset_by_partition() throws Exception { + enableEvents(); + + assertEquals(service_d.lock(101L).get(3, SECONDS), HOLDING); + events_d.next().assertEq(101L, NONE, HOLDING); + assertEquals(service_e.lock(102L).get(3, SECONDS), HOLDING); + events_e.next().assertEq(102L, NONE, HOLDING); + + assertEquals(service_a.lock(101L).get(3, SECONDS), WAITING); + assertEquals(service_a.lock(102L).get(3, SECONDS), WAITING); + events_a.next().assertEq(101L, NONE, WAITING); + events_a.next().assertEq(102L, NONE, WAITING); + + // partition into a majority subgroup and minority subgroup + partition(new int[]{0, 1, 2}, new int[]{3, 4}); + + events_d.next().assertEq(101L, HOLDING, NONE); + events_e.next().assertEq(102L, HOLDING, NONE); + + events_a.next().assertEq(101L, WAITING, HOLDING); + events_a.next().assertEq(102L, WAITING, HOLDING); + + merge(0, 3); + + events_d.next().assertEq(101L, HOLDING, NONE); + events_e.next().assertEq(102L, HOLDING, NONE); + + assertEquals(service_b.lock(101L).get(3, SECONDS), WAITING); + assertEquals(service_c.lock(102L).get(3, SECONDS), WAITING); + events_b.next().assertEq(101L, NONE, WAITING); + events_c.next().assertEq(102L, NONE, WAITING); + + assertEquals(service_a.lockStatus(101L), HOLDING); + assertEquals(service_a.lockStatus(102L), HOLDING); + + // partition into subgroups without majority + partition(new int[]{0, 1}, new int[]{2}, new int[]{3, 4}); + + events_a.next().assertEq(101L, HOLDING, NONE); + events_a.next().assertEq(102L, HOLDING, NONE); + events_b.next().assertEq(101L, WAITING, NONE); + events_c.next().assertEq(102L, WAITING, NONE); + + merge(0, 2, 3); + waitUntilLeaderElected(0, 1, 2, 3, 4); + + events_a.next().assertEq(101L, HOLDING, NONE); + events_a.next().assertEq(102L, HOLDING, NONE); + events_b.next().assertEq(101L, WAITING, NONE); + events_c.next().assertEq(102L, WAITING, NONE); + + assertEquals(service_a.lock(101L).get(3, SECONDS), HOLDING); + events_a.next().assertEq(101L, NONE, HOLDING); + assertEquals(service_e.lock(101L).get(3, SECONDS), WAITING); + events_e.next().assertEq(101L, NONE, WAITING); + + channel(0).disconnect(); + + events_a.next().assertEq(101L, HOLDING, NONE); + events_e.next().assertEq(101L, WAITING, HOLDING); + } + + public void snapshot() throws Exception { + enableEvents(); + + assertEquals(service_a.lock(101L).get(3, SECONDS), HOLDING); + events_a.next().assertEq(101L, NONE, HOLDING); + assertEquals(service_b.lock(101L).get(3, SECONDS), WAITING); + events_b.next().assertEq(101L, NONE, WAITING); + + partition(new int[]{0, 2, 3, 4}, new int[]{1}); + + events_b.next().assertEq(101, WAITING, NONE); + + service_a.unlock(101).get(3, SECONDS); + assertEquals(service_c.lock(101).get(3, SECONDS), HOLDING); + assertEquals(service_d.lock(101).get(3, SECONDS), WAITING); + service_c.unlock(101).get(3, SECONDS); + service_d.unlock(101).get(3, SECONDS); + + List services = List.of(service_a, service_c, service_d, 
service_e); + for (int i = 0; i < 100; i++) { + long key = -(i + 1); + for (int t = 0, len = services.size(); t < len; t++) { + services.get((i + t) % len).lock(key).get(3, SECONDS); + } + services.get(i % services.size()).unlock(key).get(3, SECONDS); + } + + service_c.unlock(); + + leader().snapshotAsync().get(3, SECONDS); + + merge(0, 1); + + waitUntilNodesApplyAllLogs(); + assertTrue(events_b.queue.isEmpty()); + + Map> state = service_a.dumpState(); + assertEquals(service_b.dumpState(), state); + assertEquals(service_c.dumpState(), state); + assertEquals(service_d.dumpState(), state); + assertEquals(service_e.dumpState(), state); + } + + public void mutex_atomicity() throws Exception { + Lock a = service_a.mutex(101); + Lock b = service_b.mutex(101); + Lock c = service_c.mutex(101); + Lock d = service_d.mutex(101); + Lock e = service_e.mutex(101); + + class MutableInt { + int value; + } + MutableInt count = new MutableInt(); + List threads = Stream.of(a, b, c, d, e).flatMap(t -> Stream.of(t, t)).map(t -> new Thread(() -> { + for (int i = 0; i < 100; i++) { + t.lock(); + try { + int v = count.value; + LockSupport.parkNanos(10); + count.value = v + 1; + } finally { + t.unlock(); + } + } + })).collect(toList()); + + threads.forEach(Thread::start); + for (Thread t : threads) t.join(); + + assertEquals(count.value, 1000); + } + + public void mutex_interruption() throws InterruptedException { + Mutex a = service_a.mutex(101); + Mutex b = service_b.mutex(101); + a.lock(); + List> list; + try { + CompletableFuture.runAsync(() -> { + interruptAfter(1); + assertThrows(InterruptedException.class, a::lockInterruptibly); + + interruptAfter(1); + assertThrows(InterruptedException.class, b::lockInterruptibly); + + interruptAfter(1); + assertThrows(InterruptedException.class, () -> a.tryLock(30, SECONDS)); + + interruptAfter(1); + assertThrows(InterruptedException.class, () -> b.tryLock(30, SECONDS)); + }).join(); + + BlockingQueue> interrupted = new LinkedBlockingQueue<>(); + list = Stream.of(a, b).map(t -> CompletableFuture.runAsync(() -> { + interrupted.add(interruptAfter(1)); + t.lock(); + try { + assertTrue(Thread.currentThread().isInterrupted()); + } finally { + t.unlock(); + } + assertTrue(Thread.currentThread().isInterrupted()); + })).collect(toList()); + for (int i = 0, l = list.size(); i < l; i++) interrupted.take().join(); + } finally { + a.unlock(); + } + list.forEach(CompletableFuture::join); + } + + public void mutex_timeout() { + Mutex a = service_a.mutex(101); + Mutex b = service_b.mutex(101); + a.lock(); + try { + CompletableFuture.runAsync(() -> { + try { + long timeout = SECONDS.toNanos(1); + Offset error = offset(MILLISECONDS.toNanos(100)); + + long begin = System.nanoTime(); + assertFalse(a.tryLock(timeout, NANOSECONDS)); + assertThat(System.nanoTime() - begin).isCloseTo(timeout, error); + + begin = System.nanoTime(); + assertFalse(b.tryLock(timeout, NANOSECONDS)); + assertThat(System.nanoTime() - begin).isCloseTo(timeout, error); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }).join(); + } finally { + a.unlock(); + } + } + + public void mutex_race() { + Mutex a = service_a.mutex(101); + Mutex b = service_b.mutex(101); + Mutex c = service_c.mutex(101); + + List.of(Stream.of(a, b), Stream.of(b, c)).forEach(stream -> { + stream.map(t -> CompletableFuture.runAsync(() -> { + for (int i = 0; i < 3000; i++) { + t.lock(); t.unlock(); + } + })).collect(toList()).forEach(t -> { + try { + t.get(10, SECONDS); + } catch (Exception e) { + throw new 
RuntimeException(e); + } + }); + }); + } + + public void mutex_inconsistency() throws Exception { + Mutex a = service_a.mutex(101); + Mutex b = service_b.mutex(101); + + for (Mutex t : List.of(a, b)) { + CompletableFuture unlocked = new CompletableFuture<>(); + CompletableFuture locked = new CompletableFuture<>(); + t.setUnexpectedUnlockHandler(unlocked::complete); + t.setUnexpectedLockHandler(locked::complete); + + t.lock(); + try { + t.service().unlock(101).join(); + assertSame(unlocked.get(3, SECONDS), t); + assertSame(t.getHolder(), Thread.currentThread()); + assertEquals(t.getStatus(), NONE); + } finally { + t.unlock(); + } + + t.service().lock(101).join(); + assertSame(locked.get(3, SECONDS), t); + assertEquals(t.getStatus(), HOLDING); + + if (t.tryLock()) t.unlock(); + assertEquals(t.getStatus(), NONE); + } + + a.lock(); CompletableFuture f; + try { + f = CompletableFuture.runAsync(() -> { + b.lock(); + try { + assertEquals(service_b.lockStatus(101), HOLDING); + } finally { + b.unlock(); + } + }); + Util.waitUntil(5000, 1000, () -> service_b.lockStatus(101) == WAITING); + service_b.unlock(101).get(3, SECONDS); + waitUntilNodesApplyAllLogs(); + Util.waitUntil(5000, 1000, () -> service_b.lockStatus(101) == WAITING); + } finally { + a.unlock(); + } + f.get(5, SECONDS); + } + + public void mutex_exception() { + Mutex a = service_a.mutex(101); + + service_a.raft.throwingInterceptor(new Exception("thrown error")); + assertThatThrownBy(a::lock).isInstanceOf(LockService.RaftException.class) + .cause().isInstanceOf(Exception.class).hasMessage("thrown error"); + + service_a.raft.errorInterceptor(new Exception("returned error")); + assertThatThrownBy(a::lock).isInstanceOf(LockService.RaftException.class) + .cause().isInstanceOf(Exception.class).hasMessage("returned error"); + + service_a.raft.voidInterceptor(); + a.setTimeout(1000); + assertThatThrownBy(a::lock).isInstanceOf(LockService.RaftException.class) + .cause().isInstanceOf(TimeoutException.class); + + service_a.raft.removeInterceptor(); + a.lock(); + try { + service_a.raft.throwingInterceptor(new Exception("thrown error")); + } finally { + assertThatThrownBy(a::unlock).isInstanceOf(LockService.RaftException.class) + .cause().isInstanceOf(Exception.class).hasMessage("thrown error"); + } + + a.lock(); + service_a.raft.removeInterceptor(); + a.unlock(); + } + + private CompletableFuture interruptAfter(int delay) { + CompletableFuture done = new CompletableFuture<>(); + Thread thread = Thread.currentThread(); + CompletableFuture.delayedExecutor(delay, SECONDS).execute(() -> { + thread.interrupt(); done.complete(null); + }); + return done; + } + + private Service reconnect(JChannel ch, LockService.Listener listener) throws Exception { + Service service = new Service(ch); + if (listener != null) service.addListener(listener); + ch.connect(clusterName()); return service; + } + + private void partition(int[]... partitions) throws TimeoutException { + List> parts = stream(partitions).map(t -> stream(t).mapToObj(this::channel).collect(toList())) + .collect(toList()); + for (List p : parts) { + var s = parts.stream().filter(t -> t != p).flatMap(t -> t.stream().map(JChannel::address)).collect(toList()); + p.forEach(t -> t.stack().getBottomProtocol().up(new org.jgroups.Event(org.jgroups.Event.SUSPECT, s))); + Util.waitUntilAllChannelsHaveSameView(30_000, 1000, p.toArray(JChannel[]::new)); + } + } + + private void merge(int... 
coordinators) throws TimeoutException { + List coords = stream(coordinators).mapToObj(this::channel).collect(toList()); + Map views = coords.stream().collect(toMap(JChannel::address, JChannel::view)); + coords.forEach(t -> t.stack().getBottomProtocol().up(new org.jgroups.Event(org.jgroups.Event.MERGE, views))); + for (JChannel ch : coords) { + GMS gms = ch.stack().findProtocol(GMS.class); + Util.waitUntil(30_000, 1000, () -> !gms.isMergeTaskRunning()); + } + } + + private void waitUntilNodesApplyAllLogs(int... indexes) throws TimeoutException { + RAFT[] rafts = indexes.length > 0 ? IntStream.of(indexes).mapToObj(this::raft).toArray(RAFT[]::new) : + stream(channels()).map(this::raft).toArray(RAFT[]::new); + Util.waitUntil(30_000, 1000, () -> { + long last = -1; + for (RAFT raft : rafts) { + if (last == -1) last = raft.lastAppended(); + else if (raft.lastAppended() != last) return false; + if (raft.commitIndex() != last) return false; + } + return true; + }); + } + + private void waitUntilLeaderElected(int... indexes) { + RAFT[] rafts = IntStream.of(indexes).mapToObj(this::raft).toArray(RAFT[]::new); + BaseRaftElectionTest.waitUntilLeaderElected(rafts, 10_000); + } +} From 996b7f6114d996eb7bd73dffc129ca8d83fefc7f Mon Sep 17 00:00:00 2001 From: Zhang Yifei Date: Tue, 5 Nov 2024 17:53:25 +0800 Subject: [PATCH 02/16] lock service --- src/org/jgroups/raft/blocks/LockService.java | 47 +++++++++++++++----- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/src/org/jgroups/raft/blocks/LockService.java b/src/org/jgroups/raft/blocks/LockService.java index 9817a94e..996f75df 100644 --- a/src/org/jgroups/raft/blocks/LockService.java +++ b/src/org/jgroups/raft/blocks/LockService.java @@ -49,6 +49,31 @@ import org.jgroups.util.UUID; /** + * A state machine that maintains the holder and waiters for a specified lockId. For a lockId, it could have only one + * holder in same time, other acquirers will be queued as waiters, when the holder unlock from the lockId, the next + * holder will be polled from the waiting queue if there is a waiter. + *

+ * <p>
+ * The {@link Address} of the member is used to identify the acquirers (holders and waiters). That means a newly
+ * connected member has a new identity, so a disconnected member is automatically unlocked from all related lockIds.
+ * If members leave the cluster, the leader unlocks those members in the state machine. A newly started cluster
+ * unlocks all previous members in the state machine. Another scenario is the cluster resuming from multiple minority
+ * partitions: since each {@link Address} remains unchanged, the new leader will force unlock for all existing
+ * members in the state machine.
+ * <p>
+ * The {@link LockStatus} represents the member's locking status:
+ * <ul>
+ * <li>{@link LockStatus#HOLDING HOLDING} - the current member holds the lock. It can be returned by
+ * {@link LockService#lock(long)} or {@link LockService#tryLock(long)}, or notified as a change from
+ * {@link LockStatus#WAITING WAITING} via the {@link Listener}.
+ * <li>{@link LockStatus#WAITING WAITING} - the current member is in the waiting queue because the lock is currently
+ * held by another member. It can be returned by {@link LockService#lock(long)}.
+ * <li>{@link LockStatus#NONE NONE} - the current member is neither holding nor waiting for the lock.
+ * </ul>
+ * <p>
+ * The {@link Mutex} is a distributed implementation of {@link Lock}. It based on the lock service, a thread is holding + * the mutex also means the member is holding the lock in the lock service. + * * @author Zhang Yifei */ public class LockService { @@ -467,7 +492,7 @@ public enum LockStatus { * Listen on the lock status changes */ public interface Listener { - void onStatusChange(long key, LockStatus prev, LockStatus curr); + void onStatusChange(long lockId, LockStatus prev, LockStatus curr); } /** @@ -483,7 +508,7 @@ public static class RaftException extends RuntimeException { * A distributed lock that backed on the lock service. */ public class Mutex implements Lock { - private final long key; + private final long lockId; private volatile LockStatus status = NONE; private volatile Thread holder; private final AtomicInteger acquirers = new AtomicInteger(); @@ -492,7 +517,7 @@ public class Mutex implements Lock { private Consumer lockHandler, unlockHandler; private long timeout = 8000; - Mutex(long key) {this.key = key;} + Mutex(long lockId) {this.lockId = lockId;} /** * Set the timeout for the command executing in the lock service. @@ -502,7 +527,7 @@ public class Mutex implements Lock { /** * The lock status in the lock service - * @return lock status of the key + * @return lock status of the lockId */ public LockStatus getStatus() {return status;} @@ -540,7 +565,7 @@ public void lock() { while (status != HOLDING) { try { if (status == WAITING) notWaiting.awaitUninterruptibly(); - else status = join(LockService.this.lock(key)); + else status = join(LockService.this.lock(lockId)); } catch (Throwable e) { rethrow(unlock(e)); } @@ -558,7 +583,7 @@ public void lockInterruptibly() throws InterruptedException { while (status != HOLDING) { try { if (status == WAITING) notWaiting.await(); - else status = join(LockService.this.lock(key)); + else status = join(LockService.this.lock(lockId)); } catch (InterruptedException e) { throw unlock(e); } catch (Throwable e) { @@ -577,7 +602,7 @@ public boolean tryLock() { acquirers.incrementAndGet(); if (status == NONE) { try { - status = join(LockService.this.tryLock(key)); + status = join(LockService.this.tryLock(lockId)); } catch (Throwable ignored) { } } @@ -598,7 +623,7 @@ public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException while (status != HOLDING && (ns = deadline - System.nanoTime()) > 0) { try { if (status == WAITING) notWaiting.awaitNanos(ns); - else status = join(LockService.this.lock(key)); + else status = join(LockService.this.lock(lockId)); } catch (InterruptedException e) { throw unlock(e); } catch (Throwable e) { @@ -621,7 +646,7 @@ public void unlock() { if (delegate.getHoldCount() == 1) holder = null; try { if (acquirers.decrementAndGet() == 0 && status != NONE) { - join(LockService.this.unlock(key)); + join(LockService.this.unlock(lockId)); status = NONE; } } catch (Throwable e) { @@ -665,8 +690,8 @@ private T join(CompletableFuture future) throws ExecutionException, Timeo } } - void onStatusChange(long key, LockStatus prev, LockStatus curr) { - if (key != this.key) return; + void onStatusChange(long lockId, LockStatus prev, LockStatus curr) { + if (lockId != this.lockId) return; if (curr != HOLDING && holder != null) { status = curr; var handler = unlockHandler; From f2e9021ba21eeb8fe6cc0597b6d331770dcb26b9 Mon Sep 17 00:00:00 2001 From: Zhang Yifei Date: Wed, 6 Nov 2024 18:33:04 +0800 Subject: [PATCH 03/16] lock service --- doc/design/LockService.adoc | 40 ++++++++++++++++++++ 
src/org/jgroups/raft/blocks/LockService.java | 7 +++- 2 files changed, 46 insertions(+), 1 deletion(-) create mode 100644 doc/design/LockService.adoc diff --git a/doc/design/LockService.adoc b/doc/design/LockService.adoc new file mode 100644 index 00000000..305fbf4f --- /dev/null +++ b/doc/design/LockService.adoc @@ -0,0 +1,40 @@ += Lock Service Design +Zhang Yifei; + +Lock service maintains the holder and waiters of a specified lockId, lockId could be seen as the identity of a lock, +and a lock could have only one holder and multiple waiters at same time. +The waiters will be queued, the first waiter could be changed to holder by the unlocking operation of last holder. + +== Holder Identity +The identity of a holder or waiter has to be a member of the RAFT cluster, because of there are server initiated +messages. currently the clients are stateless to the server after reply, so there is no way to send the server initiated +message to the client. +I have considered to create a new protocol to maintain sessions for clients, but it will be a lot of work to do, for +example, the session's creation and destruction has to be recorded in the RAFT log, and the liveness check of sessions +etc. +Holders and waiters in server are represented by the address(UUID) of the channel, the advantage of doing so is +the server can clear those disconnected holders and waiters base on the view of the cluster. + +== Holding Status +The holding status is only for connected members. Disconnected members can assume that they have released all locks, +because the leader of the cluster will clear those leaving members from the locking status when the view change event +arrived. +For the partition, members are in a minority subgroup will also being cleared by the leader if majority subgroup still +present, if all subgroups are minority, the new elected leader will force clear all previous locking status after cluster +resumed. +A new started cluster will clear all previous locking status as well, because of all members have a new address. +Since the locking status has the same lifecycle as the cluster, the log storage could be in memory implementation. + +== Mutex +With the lock service and the ReentrantLock could implement an exclusive lock cross JVMs. + +=== Command executing +The mutex's methods involve executing commands in the lock service, RaftException will be thrown when the command fails +to execute. +The command executing process is uninterruptible to avoid the inconsistent state, but a timeout could be set to control +the waiting time. + +=== Unexpected status +Many factors can cause unexpected unlocking or locking status, for example, disconnect the channel, network partition, +even calling the lock service with the same lockId, so handlers could be set to handle the unexpected status, let users +know the risks and decide how to deal with them, the RaftException also comes from the same idea. diff --git a/src/org/jgroups/raft/blocks/LockService.java b/src/org/jgroups/raft/blocks/LockService.java index 996f75df..221dff4f 100644 --- a/src/org/jgroups/raft/blocks/LockService.java +++ b/src/org/jgroups/raft/blocks/LockService.java @@ -71,8 +71,13 @@ *

 * <li>{@link LockStatus#NONE NONE} - the current member is neither holding nor waiting for the lock.
 * </ul>
 * <p>
+ * Listeners can be registered to get notifications of lock status changes. The order of notifications is the same as
+ * the order of the commands executed. Don't do any heavy work or block the calling thread in the listener.
+ * <p>
    * The {@link Mutex} is a distributed implementation of {@link Lock}. It based on the lock service, a thread is holding - * the mutex also means the member is holding the lock in the lock service. + * the mutex also means the member is holding the lock in the lock service. There is only one {@link Mutex} instance + * for each lockId in a given lock service, {@link LockService#mutex(long)} method will create the instance if absent, + * otherwise return the existing one. * * @author Zhang Yifei */ From 70a2692e3aa0b9c06a4a9fa61a379dcaaea77e58 Mon Sep 17 00:00:00 2001 From: Zhang Yifei Date: Thu, 7 Nov 2024 14:15:00 +0800 Subject: [PATCH 04/16] lock service --- doc/design/LockService.adoc | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/doc/design/LockService.adoc b/doc/design/LockService.adoc index 305fbf4f..dc305f76 100644 --- a/doc/design/LockService.adoc +++ b/doc/design/LockService.adoc @@ -6,8 +6,8 @@ and a lock could have only one holder and multiple waiters at same time. The waiters will be queued, the first waiter could be changed to holder by the unlocking operation of last holder. == Holder Identity -The identity of a holder or waiter has to be a member of the RAFT cluster, because of there are server initiated -messages. currently the clients are stateless to the server after reply, so there is no way to send the server initiated +The identity of a holder or waiter has to be a member of the RAFT cluster, because of there are server-initiated +messages. currently the clients are stateless to the server after reply, so there is no way to send the server-initiated message to the client. I have considered to create a new protocol to maintain sessions for clients, but it will be a lot of work to do, for example, the session's creation and destruction has to be recorded in the RAFT log, and the liveness check of sessions @@ -23,7 +23,16 @@ For the partition, members are in a minority subgroup will also being cleared by present, if all subgroups are minority, the new elected leader will force clear all previous locking status after cluster resumed. A new started cluster will clear all previous locking status as well, because of all members have a new address. -Since the locking status has the same lifecycle as the cluster, the log storage could be in memory implementation. +Since the locking status has the same lifecycle as the cluster, the log storage could be in-memory implementation. + +== Waiting Status +Waiting status is treated the same as holding status in the case of disconnection and partitioning. +The tricky part is how to let the waiter know that it has become the holder, this is the server-initiated message +mentioned earlier. As members of the cluster, leader can send messages to any lock service, but in what way? +Those messages must be in order and can't be lost or duplicated, assume a dedicated message to do this, leader will +send them after logs are applied, and the sending process could be async, what if the leader left, the new leader can't +ensure those messages are not lost or duplicated. +Base on the log applying process of each member is a reliable choice, although it's not perfect. == Mutex With the lock service and the ReentrantLock could implement an exclusive lock cross JVMs. 
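The waiter-promotion flow described in the patch above is driven by each member applying the UNLOCK entry from the log locally, so a waiter learns that it has become the holder through the LockService.Listener callback. A minimal sketch of that pattern follows; the latch-based helper is illustrative only and not part of this series:

import java.util.concurrent.CountDownLatch;

import org.jgroups.raft.blocks.LockService;
import org.jgroups.raft.blocks.LockService.LockStatus;

// Waits until this member is promoted from WAITING to HOLDING for one lockId.
final class AwaitHolding implements LockService.Listener {
    private final long lockId;
    private final CountDownLatch promoted = new CountDownLatch(1);

    AwaitHolding(long lockId) { this.lockId = lockId; }

    @Override
    public void onStatusChange(long lockId, LockStatus prev, LockStatus curr) {
        // Invoked locally, in log-apply order, whenever a command changes this member's status.
        if (lockId == this.lockId && curr == LockStatus.HOLDING) promoted.countDown();
    }

    void awaitHolding() throws InterruptedException { promoted.await(); }
}

Registering such a listener before calling lock(long) ensures the HOLDING notification cannot be missed; the Mutex in this series implements the same pattern internally, with a Condition instead of a latch.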
From 3d7130c1487761b68d94ccbb3313c3ebb3c9ca85 Mon Sep 17 00:00:00 2001 From: Zhang Yifei Date: Thu, 7 Nov 2024 15:02:59 +0800 Subject: [PATCH 05/16] lock service --- doc/design/LockService.adoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/design/LockService.adoc b/doc/design/LockService.adoc index dc305f76..36a52d08 100644 --- a/doc/design/LockService.adoc +++ b/doc/design/LockService.adoc @@ -10,8 +10,8 @@ The identity of a holder or waiter has to be a member of the RAFT cluster, becau messages. Currently the clients are stateless to the server after reply, so there is no way to send the server-initiated message to the client. I have considered creating a new protocol to maintain sessions for clients, but it would be a lot of work to do; for -example, the session's creation and destruction has to be recorded in the RAFT log, and the liveness check of sessions -etc. +example, the session's creation and destruction needs to be recorded in the RAFT log, sessions need to be available +in the new leader if leadership changes, and the client actually keeps connections to all members. Holders and waiters in the server are represented by the address (UUID) of the channel; the advantage of doing so is that the server can clear those disconnected holders and waiters based on the view of the cluster. From 826ea6b363bf12c9cccc53ef9df53f6e328e760d Mon Sep 17 00:00:00 2001 From: Zhang Yifei Date: Wed, 13 Nov 2024 15:58:30 +0800 Subject: [PATCH 06/16] lock service --- doc/design/LockService.adoc | 68 ++++++++++++++--- src/org/jgroups/raft/blocks/LockService.java | 44 ++++++------ .../jgroups/tests/blocks/LockServiceTest.java | 49 ++++++++----- 3 files changed, 112 insertions(+), 49 deletions(-) diff --git a/doc/design/LockService.adoc b/doc/design/LockService.adoc index 36a52d08..fc05483f 100644 --- a/doc/design/LockService.adoc +++ b/doc/design/LockService.adoc @@ -1,27 +1,27 @@ = Lock Service Design -Zhang Yifei; +Zhang Yifei Lock service maintains the holder and waiters of a specified lockId; lockId could be seen as the identity of a lock, -and a lock could have only one holder and multiple waiters at the same time. +and a lock could have only one holder and multiple waiters at the same time. + The waiters will be queued; the first waiter could be changed to holder by the unlocking operation of the last holder. == Holder Identity The identity of a holder or waiter has to be a member of the RAFT cluster, because there are server-initiated messages. Currently the clients are stateless to the server after reply, so there is no way to send the server-initiated -message to the client. +message to the client. + I have considered creating a new protocol to maintain sessions for clients, but it would be a lot of work to do; for example, the session's creation and destruction needs to be recorded in the RAFT log, sessions need to be available -in the new leader if leadership changes, and the client actually keeps connections to all members. +in the new leader if leadership changes, and the client actually keeps connections to all members. + Holders and waiters in the server are represented by the address (UUID) of the channel; the advantage of doing so is that the server can clear those disconnected holders and waiters based on the view of the cluster. == Holding Status The holding status is only for connected members.
Disconnected members can assume that they have released all locks, because the leader of the cluster will clear those leaving members from the locking status when the view change event -arrived. +arrived. + For the partition case, members in a minority subgroup will also be cleared by the leader if a majority subgroup is still present; if all subgroups are minority, the newly elected leader will force-clear all previous locking status after the cluster -resumed. +resumed. + A new started cluster will clear all previous locking status as well, because of all members have a new address. Since the locking status has the same lifecycle as the cluster, the log storage could be an in-memory implementation. @@ -31,15 +31,67 @@ The tricky part is how to let the waiter know that it has become the holder; thi mentioned earlier. As members of the cluster, the leader can send messages to any lock service, but in what way? Those messages must be in order and can't be lost or duplicated. Assume a dedicated message to do this: the leader will send them after logs are applied, and the sending process could be async; but what if the leader left? The new leader can't -ensure those messages are not lost or duplicated. +ensure those messages are not lost or duplicated. + Basing on the log applying process of each member is a reliable choice, although it's not perfect. +== Commands +LOCK:: +With the UUID of the member and the lockId. Hold the lock if possible, otherwise join the waiting queue. +TRY_LOCK:: +With the UUID of the member and the lockId. Hold the lock if possible. +UNLOCK:: +With the UUID of the member and the lockId. If the member is the holder, remove it from the holder status +and make the first waiter the next holder; if the member is a waiter, remove it from the waiting queue. +UNLOCK_ALL:: +With the UUID of the member. Remove the member from all holding and waiting status. +RESET:: +With the UUIDs of the members that are currently connected. Check all holders and waiters against the list; +if one is not in it, remove it from all holding and waiting status. Note that a waiter being promoted to the holder during +unlocking should be in the list as well. It's an internal command and is not exposed to users. + +=== Reset +Members will resign from holder and waiter status when they are disconnected or in a minority subgroup of a partition; they +notify listeners that they have unlocked from all locks, but in the state machine, unlocking hasn't really happened yet. +Unlocking will happen immediately via the reset command if the leader is still present, or will happen eventually after a new +leader is present. + +There are two types of reset: one resets with the list of current members, and the other resets with an empty +list, which means all state will be cleared. + +The first one is used when the leader finds members leaving, or a new leader is elected because of the previous leader +leaving. + +The second one is used when a new leader is elected, but not because the previous leader left. + + +Scenarios for electing a new leader:: +. Majority is just reached. +** New member connected +** Disconnected member reconnected +** Merging views (no subgroup has majority members) +. Leader leaves, and majority is still there. +. There is a leader, but view merging cause the coordinator changed and the new coordinator started a new term voting +before knowing the existence of leader, and the new coordinator become next leader. +Above scenario 1 use second way to reset, because potentially all members have resigned. +
+Above scenario 2 will use first way to reset, because the cluster has majority members all the time, these members +won't resign. + +The scenario 3 won't happen I think, because the existing leader will always have longer log because of the reset +command. + +== Listener +Listeners could be registered to listen on the status change of the lock. + +The listener is notified by the RAFT working thread after state is updated while RAFT log applying, that means the +listener shouldn't have heavy work, put those work in a separate thread may be. + +I don't make any assumptions about how users will use this listener, because there are so many possibilities, +maybe just simply set a flag, maybe start/stop a service and its working thread, maybe start/stop a job in a thread +pool, the job may be serial or parallel, so I choose not to assume that just simply notify it in a timely and orderly +manner, completely consistent with the local state machine. + +The notification to the listener may be earlier or later than the command execution result, it depends on the member is +the leader or a follower. + == Mutex With the lock service and the ReentrantLock we could implement an exclusive lock across JVMs. === Command executing The mutex's methods involve executing commands in the lock service; RaftException will be thrown when the command fails -to execute. +to execute. + The command executing process is uninterruptible to avoid inconsistent state, but a timeout could be set to control the waiting time. diff --git a/src/org/jgroups/raft/blocks/LockService.java b/src/org/jgroups/raft/blocks/LockService.java index 221dff4f..1b9ace00 100644 --- a/src/org/jgroups/raft/blocks/LockService.java +++ b/src/org/jgroups/raft/blocks/LockService.java @@ -33,7 +33,6 @@ import org.jgroups.ChannelListener; import org.jgroups.Event; import org.jgroups.JChannel; -import org.jgroups.MergeView; import org.jgroups.Message; import org.jgroups.UpHandler; import org.jgroups.View; @@ -49,9 +48,9 @@ import org.jgroups.util.UUID; /** - * A state machine that maintains the holder and waiters for a specified lockId. For a lockId, it could have only one - * holder in same time, other acquirers will be queued as waiters, when the holder unlock from the lockId, the next - * holder will be polled from the waiting queue if there is a waiter. + * A state machine that maintains the holder and waiters for a specified lockId. For a lockId, it has only one holder + * at a time; other acquirers will be queued as waiters. When the holder unlocks the lockId, the next holder will + * be polled from the waiting queue if there is a waiter. *
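The deterministic promotion just described (the "first waiter" must come out the same on every node) rests on an insertion-ordered waiter set; a small sketch of that property, with u1 and u2 standing in for member UUIDs:

[source,java]
----
// LinkedHashSet iterates in insertion order, and re-adding an existing element
// does not move it, so all nodes derive the same next holder.
LinkedHashSet<UUID> waiters = new LinkedHashSet<>();
waiters.add(u1);
waiters.add(u2);
waiters.add(u1);                              // duplicate: position unchanged
Iterator<UUID> it = waiters.iterator();
UUID next = it.hasNext() ? it.next() : null;  // deterministic next holder
if (next != null) it.remove();                // promoted waiter leaves the queue
----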

* The {@link Address} of the member will be used to identify the acquirers (holders and waiters), that means a new * connected member will have a new identity, so a disconnected member will unlock from all related lockId @@ -71,7 +70,7 @@ *

• {@link LockStatus#NONE NONE} - current member is neither holding nor waiting for the lock. * *

    - * Listeners could be registered to get the notification of lock statue change, The order of notifications is the same + * Listeners could be registered to get the notification of lock status change. The order of notifications is the same * as the order of commands executed. Don't do any heavy job or block the calling thread in the listener. *
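One way to honor the warning above (an illustration, not part of the patch; startJob and stopJob are placeholder methods) is to hand the real work to a single-threaded executor, which also preserves the notification order:

[source,java]
----
ExecutorService worker = Executors.newSingleThreadExecutor();
service.addListener((lockId, prev, curr) -> worker.execute(() -> {
    // runs off the notifying thread; one worker thread keeps events in order
    if (curr == LockService.LockStatus.HOLDING)      startJob(lockId);
    else if (prev == LockService.LockStatus.HOLDING) stopJob(lockId);
}));
----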

* The {@link Mutex} is a distributed implementation of {@link Lock}. It is based on the lock service: a thread holding @@ -91,7 +90,7 @@ public class LockService { protected final Map> memberLocks = new LinkedHashMap<>(); protected volatile View view; - protected ExtendedUUID address; + protected volatile boolean inTransition; + protected volatile ExtendedUUID address; protected final ConcurrentMap lockStatus = new ConcurrentHashMap<>(); protected final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); @@ -205,14 +205,7 @@ public byte[] apply(byte[] data, int offset, int length, boolean serialize_respo public void roleChanged(Role role) { if (role == Role.Leader) { try { - // Reset after the leader is elected - View v = view; - boolean clear = false; - if (v instanceof MergeView) { - int majority = raft.raft().majority(); - clear = ((MergeView) v).getSubgroups().stream().allMatch(t -> t.size() < majority); - } - reset(clear ? null : v); + reset(inTransition ? view : null); } catch (Throwable e) { log.error("Fail to send reset command", e); } @@ -237,7 +230,7 @@ public Object up(Event evt) { @Override public void channelDisconnected(JChannel channel) { - cleanup(); + resign(); view = null; inTransition = false; } } @@ -337,7 +330,7 @@ protected void notifyListeners(long lockId, LockStatus prev, LockStatus curr, bo if (prev == curr) return; } Mutex mutex = mutexes.get(lockId); - if (mutex != null) mutex.onStatusChange(lockId, prev, curr); + if (mutex != null) mutex.onStatusChange(prev, curr); for (Listener listener : listeners) { try { listener.onStatusChange(lockId, prev, curr); @@ -349,15 +342,19 @@ protected void notifyListeners(long lockId, LockStatus prev, LockStatus curr, bo protected void handleView(View next) { View prev = this.view; this.view = next; + Address leader = raft.leader(); if (log.isTraceEnabled()) { - log.trace("[%s] View accepted: %s, prev: %s, leader: %s", address, next, prev, raft.leader()); + log.trace("[%s] View accepted: %s, prev: %s, leader: %s", address, next, prev, leader); } if (prev != null) { int majority = raft.raft().majority(); + inTransition = prev.size() >= majority && next.size() >= majority + && leader != null && !next.containsMember(leader); + if (prev.size() >= majority && next.size() < majority) { // lost majority - // In partition case if majority is still working, it will be forced to unlock by reset command. - cleanup(); + // In partition case if majority is still working, it will be unlocked by reset command. + resign(); } else if (!next.containsMembers(prev.getMembersRaw()) && raft.isLeader()) { // member left try { reset(next); @@ -368,7 +365,7 @@ protected void handleView(View next) { } } - protected void cleanup() { + protected void resign() { lockStatus.forEach((k, v) -> notifyListeners(k, v, NONE, true)); } @@ -382,7 +379,7 @@ protected void reset(View view) { writeUuid((UUID) member, out); } assert out.position() <= 6 + len * 16; - invoke(out).thenApply(t -> null).exceptionally(e -> { + invoke(out).exceptionally(e -> { log.error("Fail to reset to " + view, e); return null; }); } @@ -457,7 +454,7 @@ public CompletableFuture unlock(long lockId) { * Release all related locks for this member.
* @return async completion */ - public CompletableFuture unlock() { + public CompletableFuture unlockAll() { var out = new ByteArrayDataOutputStream(17); out.writeByte(UNLOCK_ALL); writeUuid(address(), out); @@ -695,8 +692,7 @@ private T join(CompletableFuture future) throws ExecutionException, Timeo } } - void onStatusChange(long lockId, LockStatus prev, LockStatus curr) { - if (lockId != this.lockId) return; + void onStatusChange(LockStatus prev, LockStatus curr) { if (curr != HOLDING && holder != null) { status = curr; var handler = unlockHandler; diff --git a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java index b3d00af6..99eddd70 100644 --- a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java +++ b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java @@ -43,9 +43,7 @@ import org.jgroups.JChannel; import org.jgroups.View; import org.jgroups.protocols.pbcast.GMS; -import org.jgroups.protocols.raft.ELECTION2; import org.jgroups.protocols.raft.RAFT; -import org.jgroups.protocols.raft.election.BaseElection; import org.jgroups.raft.Options; import org.jgroups.raft.RaftHandle; import org.jgroups.raft.StateMachine; @@ -226,7 +224,7 @@ public void lock() throws Exception { assertEquals(service_a.lockStatus(103L), HOLDING); // unlock a - service_a.unlock().get(3, SECONDS); + service_a.unlockAll().get(3, SECONDS); events_a.next().assertEq(102L, HOLDING, NONE); assertEquals(service_a.lockStatus(102L), NONE); @@ -241,7 +239,7 @@ public void lock() throws Exception { assertEquals(service_b.lockStatus(103L), HOLDING); // unlock b - service_b.unlock().get(3, SECONDS); + service_b.unlockAll().get(3, SECONDS); events_b.next().assertEq(101L, HOLDING, NONE); assertEquals(service_b.lockStatus(101L), NONE); @@ -253,7 +251,7 @@ public void lock() throws Exception { assertEquals(service_c.lockStatus(101L), HOLDING); // unlock c - service_c.unlock().get(3, SECONDS); + service_c.unlockAll().get(3, SECONDS); events_c.next().assertEq(101L, HOLDING, NONE); assertEquals(service_c.lockStatus(101L), NONE); @@ -282,7 +280,7 @@ public void tryLock() throws Exception { service_a.unlock(101L); events_a.next().assertEq(101L, HOLDING, NONE); events_c.next().assertEq(101L, WAITING, HOLDING); - service_c.unlock(); + service_c.unlockAll(); events_c.next().assertEq(101L, HOLDING, NONE); assertNull(events_b.next(1)); } @@ -302,14 +300,14 @@ public void reset_by_disconnect() throws Exception { events_e.next().assertEq(101L, NONE, WAITING); // disconnect the coordinator/leader/holder - channel(0).disconnect(); - // reset to [B,C,D,E] + channel(0).disconnect(); // [B,C,D,E] + // reset to [B,C,D,E], notified by reset command. events_a.next().assertEq(101L, HOLDING, NONE); events_b.next().assertEq(101L, WAITING, HOLDING); // disconnect a participant - channel(2).disconnect(); - // reset to [B,D,E] + channel(2).disconnect(); // [B,D,E] + // reset to [B,D,E], notified by reset command. events_c.next().assertEq(101L, WAITING, NONE); service_b.unlock(101L); @@ -317,16 +315,33 @@ public void reset_by_disconnect() throws Exception { events_d.next().assertEq(101L, WAITING, HOLDING); // disconnect the holder and lost majority - channel(3).disconnect(); - // no reset, notify NONE to all. + channel(3).disconnect(); // [B,E] + // no reset, notify base on local status. 
events_d.next().assertEq(101L, HOLDING, NONE); events_e.next().assertEq(101L, WAITING, NONE); // reconnect the previous holder and reach majority - service_d = reconnect(channel(3), events_d); - // reset to [B,E,D], and D with new address. - assertNull(events_d.next()); - events_e.next().assertEq(101L, WAITING, HOLDING); + service_d = reconnect(channel(3), events_d); // [B,E,D] + assertNull(events_d.next()); // D has a new address + // reset to clear all previous status, notified by reset command. + events_e.next().assertEq(101L, WAITING, NONE); // duplicated + + assertEquals(service_d.lock(101L).get(3, SECONDS), HOLDING); + assertEquals(service_e.lock(101L).get(3, SECONDS), WAITING); + events_d.next().assertEq(101L, NONE, HOLDING); + events_e.next().assertEq(101L, NONE, WAITING); + + // disconnect to lost majority + channel(1).disconnect(); // [E,D] + // no reset, notify base on local status. + events_d.next().assertEq(101L, HOLDING, NONE); + events_e.next().assertEq(101L, WAITING, NONE); + + // reconnect to reach majority + service_b = reconnect(channel(1), events_b); // [E,D,B] + // reset to clear all previous status, notified by reset command. + events_d.next().assertEq(101L, HOLDING, NONE); // duplicated + events_e.next().assertEq(101L, WAITING, NONE); // duplicated } public void reset_by_partition() throws Exception { @@ -418,7 +433,7 @@ public void snapshot() throws Exception { services.get(i % services.size()).unlock(key).get(3, SECONDS); } - service_c.unlock(); + service_c.unlockAll(); leader().snapshotAsync().get(3, SECONDS); From cd0e196d6a65192733ad74f07a7b6baadd4e8a75 Mon Sep 17 00:00:00 2001 From: Zhang Yifei Date: Wed, 13 Nov 2024 17:16:05 +0800 Subject: [PATCH 07/16] lock service --- doc/design/LockService.adoc | 2 +- .../jgroups/tests/blocks/LockServiceTest.java | 48 ++++++++++++------- 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/doc/design/LockService.adoc b/doc/design/LockService.adoc index fc05483f..79585490 100644 --- a/doc/design/LockService.adoc +++ b/doc/design/LockService.adoc @@ -76,7 +76,7 @@ The scenario 3 won't happen I think, because the existing leader will always hav command. == Listener -Listeners could be registered to listen on the status change of the lock. + +Listeners could be registered to listen on the status change of locks. + The listener is notified by the RAFT working thread after state is updated while RAFT log applying, that means the listener shouldn't have heavy work, put those work in a separate thread may be. + I don't make any assumptions about how users will use this listener, because there are so many possibilities, diff --git a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java index 99eddd70..aa88dbbe 100644 --- a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java +++ b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java @@ -299,49 +299,53 @@ public void reset_by_disconnect() throws Exception { assertEquals(service_e.lock(101L).get(3, SECONDS), WAITING); events_e.next().assertEq(101L, NONE, WAITING); - // disconnect the coordinator/leader/holder + // Disconnect the coordinator/leader/holder channel(0).disconnect(); // [B,C,D,E] - // reset to [B,C,D,E], notified by reset command. + // Resigned because of disconnection events_a.next().assertEq(101L, HOLDING, NONE); + // Reset to [B,C,D,E], notified by reset command. 
events_b.next().assertEq(101L, WAITING, HOLDING); - // disconnect a participant + // Disconnect a participant channel(2).disconnect(); // [B,D,E] - // reset to [B,D,E], notified by reset command. + // Resigned because of disconnection events_c.next().assertEq(101L, WAITING, NONE); + // Reset to [B,D,E] service_b.unlock(101L); events_b.next().assertEq(101L, HOLDING, NONE); events_d.next().assertEq(101L, WAITING, HOLDING); - // disconnect the holder and lost majority + // Disconnect the holder and lost majority channel(3).disconnect(); // [B,E] - // no reset, notify base on local status. + // Resigned because of disconnection events_d.next().assertEq(101L, HOLDING, NONE); + // Resigned because of lost majority events_e.next().assertEq(101L, WAITING, NONE); - // reconnect the previous holder and reach majority + // Reconnect the previous holder and reach majority. service_d = reconnect(channel(3), events_d); // [B,E,D] - assertNull(events_d.next()); // D has a new address - // reset to clear all previous status, notified by reset command. - events_e.next().assertEq(101L, WAITING, NONE); // duplicated + // Nothing to do with D because it has a new address + assertNull(events_d.next()); + // Reset to clear all previous status, notified by reset command. + events_e.next().assertEq(101L, WAITING, NONE); // duplicate notification assertEquals(service_d.lock(101L).get(3, SECONDS), HOLDING); assertEquals(service_e.lock(101L).get(3, SECONDS), WAITING); events_d.next().assertEq(101L, NONE, HOLDING); events_e.next().assertEq(101L, NONE, WAITING); - // disconnect to lost majority + // Disconnect to lost majority channel(1).disconnect(); // [E,D] - // no reset, notify base on local status. + // Resigned because of lost majority events_d.next().assertEq(101L, HOLDING, NONE); events_e.next().assertEq(101L, WAITING, NONE); - // reconnect to reach majority + // Reconnect to reach majority service_b = reconnect(channel(1), events_b); // [E,D,B] - // reset to clear all previous status, notified by reset command. - events_d.next().assertEq(101L, HOLDING, NONE); // duplicated - events_e.next().assertEq(101L, WAITING, NONE); // duplicated + // Reset to clear all previous status, notified by reset command. + events_d.next().assertEq(101L, HOLDING, NONE); // duplicate notification + events_e.next().assertEq(101L, WAITING, NONE); // duplicate notification } public void reset_by_partition() throws Exception { @@ -357,17 +361,20 @@ public void reset_by_partition() throws Exception { events_a.next().assertEq(101L, NONE, WAITING); events_a.next().assertEq(102L, NONE, WAITING); - // partition into a majority subgroup and minority subgroup + // Partition into a majority subgroup and a minority subgroup partition(new int[]{0, 1, 2}, new int[]{3, 4}); + // Resigned because of lost majority events_d.next().assertEq(101L, HOLDING, NONE); events_e.next().assertEq(102L, HOLDING, NONE); + // Reset to [A,B,C], notified by reset command. events_a.next().assertEq(101L, WAITING, HOLDING); events_a.next().assertEq(102L, WAITING, HOLDING); merge(0, 3); + // Reset to [A,B,C], notified by reset command. 
events_d.next().assertEq(101L, HOLDING, NONE); events_e.next().assertEq(102L, HOLDING, NONE); @@ -379,9 +386,10 @@ public void reset_by_partition() throws Exception { assertEquals(service_a.lockStatus(101L), HOLDING); assertEquals(service_a.lockStatus(102L), HOLDING); - // partition into subgroups without majority + // Partition into subgroups without majority partition(new int[]{0, 1}, new int[]{2}, new int[]{3, 4}); + // Resigned because of lost majority events_a.next().assertEq(101L, HOLDING, NONE); events_a.next().assertEq(102L, HOLDING, NONE); events_b.next().assertEq(101L, WAITING, NONE); @@ -390,6 +398,7 @@ public void reset_by_partition() throws Exception { merge(0, 2, 3); waitUntilLeaderElected(0, 1, 2, 3, 4); + // Reset to clear all previous status, notified by reset command. events_a.next().assertEq(101L, HOLDING, NONE); events_a.next().assertEq(102L, HOLDING, NONE); events_b.next().assertEq(101L, WAITING, NONE); @@ -402,7 +411,10 @@ public void reset_by_partition() throws Exception { channel(0).disconnect(); + // Resigned because of disconnection events_a.next().assertEq(101L, HOLDING, NONE); + + // Reset to [B,C,D,E], notified by reset command. events_e.next().assertEq(101L, WAITING, HOLDING); } From 64d4c25189808e7916ba05bf3c660bffdbc39a70 Mon Sep 17 00:00:00 2001 From: Zhang Yifei Date: Wed, 13 Nov 2024 17:38:41 +0800 Subject: [PATCH 08/16] lock service --- src/org/jgroups/raft/blocks/LockService.java | 31 +++++++++---------- .../jgroups/tests/blocks/LockServiceTest.java | 2 ++ 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/src/org/jgroups/raft/blocks/LockService.java b/src/org/jgroups/raft/blocks/LockService.java index 1b9ace00..67edbd8a 100644 --- a/src/org/jgroups/raft/blocks/LockService.java +++ b/src/org/jgroups/raft/blocks/LockService.java @@ -177,26 +177,20 @@ public byte[] apply(byte[] data, int offset, int length, boolean serialize_respo LockStatus status = null; switch (in.readByte()) { case LOCK: - status = doLock(readLong(in), readUuid(in), false); - break; + status = doLock(readLong(in), readUuid(in), false); break; case TRY_LOCK: - status = doLock(readLong(in), readUuid(in), true); - break; + status = doLock(readLong(in), readUuid(in), true); break; case UNLOCK: - LockEntry lock = locks.computeIfAbsent(readLong(in), LockEntry::new); - doUnlock(readUuid(in), lock, null); - break; + doUnlock(readLong(in), readUuid(in)); break; case UNLOCK_ALL: - doUnlock(readUuid(in), null); - break; + doUnlock(readUuid(in), null); break; case RESET: int len = readInt(in); List members = new ArrayList<>(len); for (int i = 0; i < len; i++) { members.add(readUuid(in)); } - doReset(members); - break; + doReset(members); break; } return serialize_response && status != null ? 
new byte[] {(byte) status.ordinal()} : null; } @@ -257,14 +251,19 @@ protected LockStatus doLock(long lockId, UUID member, boolean trying) { return next; } + protected void doUnlock(long lockId, UUID member) { + LockEntry lock = locks.get(lockId); if (lock == null) return; + LockStatus prev = doUnlock(member, lock, null); + if (prev != NONE) unbind(member, lock); + } + protected void doUnlock(UUID member, Set unlocking) { Set set = memberLocks.get(member); if (set == null) return; - for (LockEntry lock : set.toArray(LockEntry[]::new)) { - doUnlock(member, lock, unlocking); - } + set.removeIf(t -> doUnlock(member, t, unlocking) != NONE); + if (set.isEmpty()) memberLocks.remove(member); } - protected void doUnlock(UUID member, LockEntry lock, Set unlocking) { + protected LockStatus doUnlock(UUID member, LockEntry lock, Set unlocking) { LockStatus prev = HOLDING; UUID holder = null; List waiters = null; @@ -279,7 +278,6 @@ protected void doUnlock(UUID member, LockEntry lock, Set unlocking) { } else { prev = lock.waiters.remove(member) ? WAITING : NONE; } - if (prev != NONE) unbind(member, lock); if (address.equals(member)) { notifyListeners(lock.id, prev, NONE, false); } else if (address.equals(holder)) { @@ -299,6 +297,7 @@ protected void doUnlock(UUID member, LockEntry lock, Set unlocking) { log.trace("[%s] %s unlock %s, prev: %s", address, waiter, lock.id, WAITING); } } + return prev; } protected void doReset(List members) { diff --git a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java index aa88dbbe..580150b2 100644 --- a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java +++ b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java @@ -373,6 +373,8 @@ public void reset_by_partition() throws Exception { events_a.next().assertEq(102L, WAITING, HOLDING); merge(0, 3); + waitUntilLeaderElected(0, 1, 2, 3, 4); + assertTrue(raft(0).isLeader()); // Reset to [A,B,C], notified by reset command. events_d.next().assertEq(101L, HOLDING, NONE); From 1de6cdcad16036d0c7a89c2652ce5d21ba257c1a Mon Sep 17 00:00:00 2001 From: Zhang Yifei Date: Thu, 14 Nov 2024 14:25:46 +0800 Subject: [PATCH 09/16] lock service --- src/org/jgroups/raft/blocks/LockService.java | 14 +++++++------- .../org/jgroups/tests/blocks/LockServiceTest.java | 5 +++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/org/jgroups/raft/blocks/LockService.java b/src/org/jgroups/raft/blocks/LockService.java index 67edbd8a..8a90d58f 100644 --- a/src/org/jgroups/raft/blocks/LockService.java +++ b/src/org/jgroups/raft/blocks/LockService.java @@ -90,7 +90,7 @@ public class LockService { protected final Map> memberLocks = new LinkedHashMap<>(); protected volatile View view; - protected volatile boolean inTransition; + protected volatile Address lastLeader; protected volatile ExtendedUUID address; protected final ConcurrentMap lockStatus = new ConcurrentHashMap<>(); @@ -199,7 +199,7 @@ public byte[] apply(byte[] data, int offset, int length, boolean serialize_respo public void roleChanged(Role role) { if (role == Role.Leader) { try { - reset(inTransition ? view : null); + reset(lastLeader != null ? 
view : null); } catch (Throwable e) { log.error("Fail to send reset command", e); } @@ -224,7 +224,7 @@ public Object up(Event evt) { @Override public void channelDisconnected(JChannel channel) { - resign(); view = null; inTransition = false; + resign(); view = null; lastLeader = null; address = null; } } @@ -341,16 +341,13 @@ protected void notifyListeners(long lockId, LockStatus prev, LockStatus curr, bo protected void handleView(View next) { View prev = this.view; this.view = next; - Address leader = raft.leader(); + Address leader = raft.leader(); lastLeader = leader; if (log.isTraceEnabled()) { log.trace("[%s] View accepted: %s, prev: %s, leader: %s", address, next, prev, leader); } if (prev != null) { int majority = raft.raft().majority(); - inTransition = prev.size() >= majority && next.size() >= majority - && leader != null && !next.containsMember(leader); - if (prev.size() >= majority && next.size() < majority) { // lost majority // In partition case if majority is still working, it will be unlocked by reset command. resign(); @@ -369,6 +366,9 @@ protected void resign() { } protected void reset(View view) { + if (log.isTraceEnabled()) { + log.trace("[%s] Send reset command: %s", address, view); + } Address[] members = view != null ? view.getMembersRaw() : new Address[0]; int len = members.length; var out = new ByteArrayDataOutputStream(6 + len * 16); diff --git a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java index 580150b2..4ef0c449 100644 --- a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java +++ b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java @@ -362,7 +362,8 @@ public void reset_by_partition() throws Exception { events_a.next().assertEq(102L, NONE, WAITING); // Partition into a majority subgroup and a minority subgroup - partition(new int[]{0, 1, 2}, new int[]{3, 4}); + // Put the minority subgroup first, otherwise it will receive the AppendEntriesRequest of reset command. 
+ partition(new int[]{3, 4}, new int[]{0, 1, 2}); // Resigned because of lost majority events_d.next().assertEq(101L, HOLDING, NONE); @@ -428,7 +429,7 @@ public void snapshot() throws Exception { assertEquals(service_b.lock(101L).get(3, SECONDS), WAITING); events_b.next().assertEq(101L, NONE, WAITING); - partition(new int[]{0, 2, 3, 4}, new int[]{1}); + partition(new int[]{1}, new int[]{0, 2, 3, 4}); events_b.next().assertEq(101, WAITING, NONE); From f22d7045ad6d9c27a1d28bda7b4f58ccf8b0337d Mon Sep 17 00:00:00 2001 From: Zhang Yifei Date: Thu, 14 Nov 2024 14:57:15 +0800 Subject: [PATCH 10/16] lock service --- src/org/jgroups/raft/blocks/LockService.java | 2 +- .../jgroups/tests/blocks/LockServiceTest.java | 28 +++++++++++++------ 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/org/jgroups/raft/blocks/LockService.java b/src/org/jgroups/raft/blocks/LockService.java index 8a90d58f..68219950 100644 --- a/src/org/jgroups/raft/blocks/LockService.java +++ b/src/org/jgroups/raft/blocks/LockService.java @@ -87,7 +87,7 @@ public class LockService { protected final RaftHandle raft; protected final Map locks = new HashMap<>(); - protected final Map> memberLocks = new LinkedHashMap<>(); + protected final Map> memberLocks = new HashMap<>(); protected volatile View view; protected volatile Address lastLeader; diff --git a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java index 4ef0c449..f4ef3c42 100644 --- a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java +++ b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java @@ -74,11 +74,21 @@ protected static class Event { final long key; final LockStatus prev, curr; Event(long key, LockStatus prev, LockStatus curr) { this.key = key; this.prev = prev; this.curr = curr; } - protected void assertEq(long key, LockStatus prev, LockStatus curr) { + void assertEq(long key, LockStatus prev, LockStatus curr) { assertThat(this).usingRecursiveComparison().isEqualTo(new Event(key, prev, curr)); } } + protected static class Batch { + final List events; + Batch(List events) {this.events = events;} + + Batch assertContains(long key, LockStatus prev, LockStatus curr) { + assertThat(events).usingRecursiveFieldByFieldElementComparator().contains(new Event(key, prev, curr)); + return this; + } + } + protected static class Events implements LockService.Listener { final BlockingQueue queue = new LinkedBlockingQueue<>(); @@ -89,6 +99,11 @@ public void onStatusChange(long key, LockStatus prev, LockStatus curr) { protected Event next(int secs) throws InterruptedException { return queue.poll(secs, SECONDS); } protected Event next() throws InterruptedException { return next(3); } + protected Batch batch(int count) throws InterruptedException { + List list = new ArrayList<>(count); + for (int i = 0; i < count; i++) list.add(next()); + return new Batch(list); + } } protected static class Service extends LockService { @@ -370,8 +385,7 @@ public void reset_by_partition() throws Exception { events_e.next().assertEq(102L, HOLDING, NONE); // Reset to [A,B,C], notified by reset command. 
- events_a.next().assertEq(101L, WAITING, HOLDING); - events_a.next().assertEq(102L, WAITING, HOLDING); + events_a.batch(2).assertContains(101L, WAITING, HOLDING).assertContains(102L, WAITING, HOLDING); merge(0, 3); waitUntilLeaderElected(0, 1, 2, 3, 4); From ca650f75561de22e07d1fc2c6b67eaa5dabb85c5 Mon Sep 17 00:00:00 2001 From: Zhang Yifei Date: Thu, 14 Nov 2024 17:00:23 +0800 Subject: [PATCH 11/16] lock service --- doc/design/LockService.adoc | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/doc/design/LockService.adoc b/doc/design/LockService.adoc index 79585490..b3cb5c78 100644 --- a/doc/design/LockService.adoc +++ b/doc/design/LockService.adoc @@ -62,18 +62,21 @@ The second one is used when a new leader is elected, but not because the previ Scenarios for electing a new leader:: . Majority is just reached. -** New member connected -** Disconnected member reconnected -** Merging views (no subgroup has majority members) +.. New member connected +.. Disconnected member reconnected +.. Merging views (no subgroup has majority members) . Leader leaves, and majority is still there. -. There is a leader, but view merging cause the coordinator changed and the new coordinator started a new term voting -before knowing the existence of leader, and the new coordinator become next leader. +. There is a leader in the majority subgroup, but view merging causes the coordinator to change, and the new coordinator +starts a new term of voting before knowing of the existing leader. +.. The new coordinator is elected to be the new leader. +.. The existing leader is re-elected to be the leader of the next term. -Above scenario 1 use second way to reset, because potentially all members have resigned. + -Above scenario 2 will use first way to reset, because the cluster has majority members all the time, these members +Above scenario 1 will reset to empty, because potentially all members have resigned. + +Above scenario 2 will reset to current members, because the cluster has majority members all the time; these members won't resign. + -The scenario 3 won't happen I think, because the existing leader will always have longer log because of the reset +Scenario 3.1 won't happen, I think, because the existing leader will always have a longer log due to the reset command. +Scenario 3.2 will reset to current members. == Listener Listeners could be registered to listen on the status change of locks.
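The decision these scenarios motivate is compact in code; restated from the roleChanged() hook introduced earlier in this series, with explanatory comments added here:

[source,java]
----
// A member that just became leader picks the reset flavor:
//  - lastLeader != null: a leader was known before and majority never went
//    away (scenario 2), so reset with the current view, keeping its members.
//  - lastLeader == null: fresh start or recovery from minority partitions
//    (scenario 1), so reset with an empty list, clearing all previous status.
@Override
public void roleChanged(Role role) {
    if (role == Role.Leader) {
        try {
            reset(lastLeader != null ? view : null);
        } catch (Throwable e) {
            log.error("Fail to send reset command", e);
        }
    }
}
----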
+ From 60f2bac65631f514d3a1f203372c3dc46c78d719 Mon Sep 17 00:00:00 2001 From: Zhang Yifei Date: Fri, 15 Nov 2024 18:11:51 +0800 Subject: [PATCH 12/16] lock service --- src/org/jgroups/raft/blocks/LockService.java | 9 +-- .../jgroups/tests/blocks/LockServiceTest.java | 81 ++++++++++++++----- 2 files changed, 66 insertions(+), 24 deletions(-) diff --git a/src/org/jgroups/raft/blocks/LockService.java b/src/org/jgroups/raft/blocks/LockService.java index 68219950..ff632426 100644 --- a/src/org/jgroups/raft/blocks/LockService.java +++ b/src/org/jgroups/raft/blocks/LockService.java @@ -253,17 +253,16 @@ protected LockStatus doLock(long lockId, UUID member, boolean trying) { protected void doUnlock(long lockId, UUID member) { LockEntry lock = locks.get(lockId); if (lock == null) return; - LockStatus prev = doUnlock(member, lock, null); - if (prev != NONE) unbind(member, lock); + if (doUnlock(member, lock, null)) unbind(member, lock); } protected void doUnlock(UUID member, Set unlocking) { Set set = memberLocks.get(member); if (set == null) return; - set.removeIf(t -> doUnlock(member, t, unlocking) != NONE); + set.removeIf(t -> doUnlock(member, t, unlocking)); if (set.isEmpty()) memberLocks.remove(member); } - protected LockStatus doUnlock(UUID member, LockEntry lock, Set unlocking) { + protected boolean doUnlock(UUID member, LockEntry lock, Set unlocking) { LockStatus prev = HOLDING; UUID holder = null; List waiters = null; @@ -297,7 +296,7 @@ protected LockStatus doUnlock(UUID member, LockEntry lock, Set unlocking) log.trace("[%s] %s unlock %s, prev: %s", address, waiter, lock.id, WAITING); } } - return prev; + return prev != NONE; } protected void doReset(List members) { diff --git a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java index f4ef3c42..4ac810bb 100644 --- a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java +++ b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java @@ -31,6 +31,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; @@ -389,17 +391,17 @@ public void reset_by_partition() throws Exception { merge(0, 3); waitUntilLeaderElected(0, 1, 2, 3, 4); - assertTrue(raft(0).isLeader()); + assertTrue(raft(0).isLeader()); // A has longer log // Reset to [A,B,C], notified by reset command. 
events_d.next().assertEq(101L, HOLDING, NONE); events_e.next().assertEq(102L, HOLDING, NONE); + // Holder is A all the time assertEquals(service_b.lock(101L).get(3, SECONDS), WAITING); assertEquals(service_c.lock(102L).get(3, SECONDS), WAITING); events_b.next().assertEq(101L, NONE, WAITING); events_c.next().assertEq(102L, NONE, WAITING); - assertEquals(service_a.lockStatus(101L), HOLDING); assertEquals(service_a.lockStatus(102L), HOLDING); @@ -408,6 +410,8 @@ public void reset_by_partition() throws Exception { // Resigned because of lost majority events_a.batch(2).assertContains(101L, HOLDING, NONE).assertContains(102L, HOLDING, NONE); + events_b.next().assertEq(101L, WAITING, NONE); + events_c.next().assertEq(102L, WAITING, NONE); merge(0, 2, 3); waitUntilLeaderElected(0, 1, 2, 3, 4); @@ -417,9 +421,10 @@ public void reset_by_partition() throws Exception { events_b.next().assertEq(101L, WAITING, NONE); events_c.next().assertEq(102L, WAITING, NONE); + // There is no holder after reset assertEquals(service_a.lock(101L).get(3, SECONDS), HOLDING); - events_a.next().assertEq(101L, NONE, HOLDING); assertEquals(service_e.lock(101L).get(3, SECONDS), WAITING); + events_a.next().assertEq(101L, NONE, HOLDING); events_e.next().assertEq(101L, NONE, WAITING); channel(0).disconnect(); @@ -592,32 +597,40 @@ public void mutex_inconsistency() throws Exception { Mutex a = service_a.mutex(101); Mutex b = service_b.mutex(101); - for (Mutex t : List.of(a, b)) { + for (Mutex mutex : List.of(a, b)) { CompletableFuture unlocked = new CompletableFuture<>(); CompletableFuture locked = new CompletableFuture<>(); - t.setUnexpectedUnlockHandler(unlocked::complete); - t.setUnexpectedLockHandler(locked::complete); + mutex.setUnexpectedUnlockHandler(unlocked::complete); + mutex.setUnexpectedLockHandler(locked::complete); - t.lock(); + mutex.lock(); try { - t.service().unlock(101).join(); - assertSame(unlocked.get(3, SECONDS), t); - assertSame(t.getHolder(), Thread.currentThread()); - assertEquals(t.getStatus(), NONE); + // Unexpected unlock + mutex.service().unlock(101).join(); + // Callback + assertSame(unlocked.get(3, SECONDS), mutex); + // Inconsistent state + assertSame(mutex.getHolder(), Thread.currentThread()); + assertEquals(mutex.getStatus(), NONE); } finally { - t.unlock(); + mutex.unlock(); } - t.service().lock(101).join(); - assertSame(locked.get(3, SECONDS), t); - assertEquals(t.getStatus(), HOLDING); - - if (t.tryLock()) t.unlock(); - assertEquals(t.getStatus(), NONE); + // Unexpected lock + mutex.service().lock(101).join(); + // Callback + assertSame(locked.get(3, SECONDS), mutex); + // Inconsistent state + assertEquals(mutex.getStatus(), HOLDING); + // Fix it with mutex instance + if (mutex.tryLock()) mutex.unlock(); + assertEquals(mutex.getStatus(), NONE); } + // Unexpected unlock for waiting status, it will retry to lock instead of calling the handler. a.lock(); CompletableFuture f; try { + // lock async f = CompletableFuture.runAsync(() -> { b.lock(); try { @@ -626,14 +639,44 @@ public void mutex_inconsistency() throws Exception { b.unlock(); } }); + // Make sure the thread is blocked in WAITING status Util.waitUntil(5000, 1000, () -> service_b.lockStatus(101) == WAITING); + // Successfully unlock with lock service service_b.unlock(101).get(3, SECONDS); + // Make sure all logs is applied which means mutex has been notified all status changes waitUntilNodesApplyAllLogs(); + // The thread is awakened by unlocking, and lock again. 
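A defensive usage sketch building on the handlers exercised above (illustration only; doWork() is a placeholder): an out-of-band unlock marks the guarded work as invalid instead of silently continuing without the lock.

[source,java]
----
Mutex mutex = service.mutex(101);
AtomicBoolean lost = new AtomicBoolean();
mutex.setUnexpectedUnlockHandler(m -> lost.set(true)); // surprise unlock seen
mutex.lock();
try {
    doWork();
    if (lost.get())
        throw new IllegalStateException("lock was lost during the work");
} finally {
    mutex.unlock();
}
----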
Util.waitUntil(5000, 1000, () -> service_b.lockStatus(101) == WAITING); } finally { a.unlock(); } - f.get(5, SECONDS); + f.get(5, SECONDS); // check error + } + + public void fast_response_and_slow_log() throws TimeoutException { + Mutex b = service_b.mutex(101); // must be a follower + + b.service().addListener((lockId, prev, next) -> { + LockSupport.parkNanos(10_000_000); // slow down the log applying (notification) + }); + AtomicInteger locked = new AtomicInteger(), unlocked = new AtomicInteger(); + b.setUnexpectedLockHandler(t -> locked.incrementAndGet()); + b.setUnexpectedUnlockHandler(t -> unlocked.incrementAndGet()); + + for (int i = 0; i < 10; i++) { + b.lock(); b.unlock(); + } + waitUntilNodesApplyAllLogs(); + + for (int i = 0; i < 10; i++) { + b.lock(); b.unlock(); + } + b.lock(); + waitUntilNodesApplyAllLogs(); + b.unlock(); + + assertEquals(unlocked.get(), 0); + assertEquals(locked.get(), 0); } public void mutex_exception() { From 559af989608a72cdd46c81f44bc2cff48873be99 Mon Sep 17 00:00:00 2001 From: Zhang Yifei Date: Fri, 15 Nov 2024 18:15:23 +0800 Subject: [PATCH 13/16] lock service --- src/org/jgroups/raft/blocks/LockService.java | 1 - .../org/jgroups/tests/blocks/LockServiceTest.java | 1 - 2 files changed, 2 deletions(-) diff --git a/src/org/jgroups/raft/blocks/LockService.java b/src/org/jgroups/raft/blocks/LockService.java index ff632426..14f8213f 100644 --- a/src/org/jgroups/raft/blocks/LockService.java +++ b/src/org/jgroups/raft/blocks/LockService.java @@ -9,7 +9,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; diff --git a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java index 4ac810bb..58cfe4b6 100644 --- a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java +++ b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java @@ -31,7 +31,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.LockSupport; From 7804d666dbe3fa0cfc488dddf965d0ad40e2c18d Mon Sep 17 00:00:00 2001 From: Zhang Yifei Date: Tue, 19 Nov 2024 12:41:16 +0800 Subject: [PATCH 14/16] lock service --- doc/design/LockService.adoc | 16 +-- src/org/jgroups/raft/blocks/LockService.java | 104 ++++++++++++------ .../jgroups/tests/blocks/LockServiceTest.java | 69 ++++++------ 3 files changed, 108 insertions(+), 81 deletions(-) diff --git a/doc/design/LockService.adoc b/doc/design/LockService.adoc index b3cb5c78..54f6e221 100644 --- a/doc/design/LockService.adoc +++ b/doc/design/LockService.adoc @@ -21,8 +21,7 @@ because the leader of the cluster will clear those leaving members from the lock arrived. + For the partition case, members in a minority subgroup will also be cleared by the leader if a majority subgroup is still present; if all subgroups are minority, the newly elected leader will force-clear all previous locking status after the cluster -resumed. + -A new started cluster will clear all previous locking status as well, because of all members have a new address. +resumed. A newly started cluster will clear all previous locking status as well.
+ Since the locking status has the same lifecycle as the cluster, the log storage could be an in-memory implementation. == Waiting Status @@ -48,6 +47,8 @@ RESET:: With the UUIDs of the members that are currently connected. Check all holders and waiters against the list; if one is not in it, remove it from all holding and waiting status. Note that a waiter being promoted to the holder during unlocking should be in the list as well. It's an internal command and is not exposed to users. +QUERY:: +With the UUID of the member and the lockId. It's a read-only command that returns the current lock status. === Reset Members will resign from holder and waiter status when they are disconnected or in a minority subgroup of a partition; they @@ -79,15 +80,8 @@ command. Scenario 3.2 will reset to current members. == Listener -Listeners could be registered to listen on the status change of locks. + -The listener is notified by the RAFT working thread after state is updated while RAFT log applying, that means the -listener shouldn't have heavy work, put those work in a separate thread may be. + -I don't make any assumptions about how users will use this listener, because there are so many possibilities, -maybe just simply set a flag, maybe start/stop a service and its working thread, maybe start/stop a job in a thread -pool, the job may be serial or parallel, so I choose not to assume that just simply notify it in a timely and orderly -manner, completely consistent with the local state machine. + -The notification to the listener may be earlier or later than the command execution result, it depends on the member is -the leader or a follower. +Listeners could be registered to listen on the status change of locks. In the leader node, listeners are notified by +the RAFT working thread, and in followers, they are notified by the thread that delivers the response message. == Mutex With the lock service and the ReentrantLock we could implement an exclusive lock across JVMs. diff --git a/src/org/jgroups/raft/blocks/LockService.java b/src/org/jgroups/raft/blocks/LockService.java index 14f8213f..b50c5617 100644 --- a/src/org/jgroups/raft/blocks/LockService.java +++ b/src/org/jgroups/raft/blocks/LockService.java @@ -54,10 +54,9 @@ * The {@link Address} of the member will be used to identify the acquirers (holders and waiters), that means a new * connected member will have a new identity, so a disconnected member will unlock from all related lockId * automatically. - * For the cluster if there are members who left then the leader will unlock for those members in the state machine. - * A new started cluster will unlock for all previous members in the state machine. Another scenario is the cluster - * resume from multiple minority partitions, since the {@link Address} remain unchanged, the new leader will force - * unlock for all existing members in the state machine. + * For the cluster, if there are leaving members, the leader will unlock for those members in the state machine. + * A newly started cluster or cluster resuming from multiple minority partitions will unlock for all previous members + * in the state machine. *

* The {@link LockStatus} represents the member's locking status. *

      @@ -82,7 +81,7 @@ public class LockService { protected static final Log log = LogFactory.getLog(LockService.class); - protected static final byte LOCK = 1, TRY_LOCK = 2, UNLOCK = 3, UNLOCK_ALL = 4, RESET = 5; + protected static final byte LOCK = 1, TRY_LOCK = 2, UNLOCK = 3, UNLOCK_ALL = 4, RESET = 5, QUERY = 6; protected final RaftHandle raft; protected final Map locks = new HashMap<>(); @@ -152,8 +151,8 @@ public void readContentFrom(DataInput in) { } // notify base on local status - lockStatus.forEach((k, v) -> notifyListeners(k, v, tmp.remove(k), false)); - tmp.forEach((k, v) -> notifyListeners(k, NONE, v, false)); + lockStatus.forEach((k, v) -> onStateChange(k, v, tmp.remove(k), true)); + tmp.forEach((k, v) -> onStateChange(k, NONE, v, true)); } @Override @@ -171,7 +170,7 @@ public void writeContentTo(DataOutput out) { } @Override - public byte[] apply(byte[] data, int offset, int length, boolean serialize_response) throws Exception { + public byte[] apply(byte[] data, int offset, int length, boolean response) throws Exception { var in = new ByteArrayDataInputStream(data, offset, length); LockStatus status = null; switch (in.readByte()) { @@ -190,8 +189,10 @@ public byte[] apply(byte[] data, int offset, int length, boolean serialize_respo members.add(readUuid(in)); } doReset(members); break; + case QUERY: + if (response) status = doQuery(readLong(in), readUuid(in)); break; } - return serialize_response && status != null ? new byte[] {(byte) status.ordinal()} : null; + return response && status != null ? new byte[] {(byte) status.ordinal()} : null; } @Override @@ -242,7 +243,7 @@ protected LockStatus doLock(long lockId, UUID member, boolean trying) { } if (prev != next) bind(member, lock); if (address.equals(member)) { - notifyListeners(lockId, prev, next, false); + onStateChange(lockId, prev, next, false); } if (log.isTraceEnabled()) { log.trace("[%s] %s lock %s, prev: %s, next: %s", address, member, lockId, prev, next); @@ -277,9 +278,9 @@ protected boolean doUnlock(UUID member, LockEntry lock, Set unlocking) { prev = lock.waiters.remove(member) ? WAITING : NONE; } if (address.equals(member)) { - notifyListeners(lock.id, prev, NONE, false); + onStateChange(lock.id, prev, NONE, false); } else if (address.equals(holder)) { - notifyListeners(lock.id, WAITING, HOLDING, false); + onStateChange(lock.id, WAITING, HOLDING, false); } if (log.isTraceEnabled()) { log.trace("[%s] %s unlock %s, prev: %s", address, member, lock.id, prev); @@ -289,7 +290,7 @@ protected boolean doUnlock(UUID member, LockEntry lock, Set unlocking) { if (waiters != null) for (UUID waiter : waiters) { unbind(waiter, lock); if (address.equals(waiter)) { - notifyListeners(lock.id, WAITING, NONE, false); + onStateChange(lock.id, WAITING, NONE, false); } if (log.isTraceEnabled()) { log.trace("[%s] %s unlock %s, prev: %s", address, waiter, lock.id, WAITING); @@ -307,6 +308,18 @@ protected void doReset(List members) { for (var id : prev) doUnlock(id, prev); } + protected LockStatus doQuery(Long lockId, UUID member) { + LockEntry lock = locks.get(lockId); + LockStatus status; + if (lock == null || lock.holder == null) status = NONE; + else if (lock.holder.equals(member)) status = HOLDING; + else status = lock.waiters.contains(member) ? 
WAITING : NONE; + if (log.isTraceEnabled()) { + log.trace("[%s] query %s: %s", address, member, status); + } + return status; + } + protected void bind(UUID member, LockEntry lock) { memberLocks.computeIfAbsent(member, k -> new LinkedHashSet<>()).add(lock); } @@ -317,23 +330,13 @@ protected void unbind(UUID member, LockEntry lock) { }); } - protected void notifyListeners(long lockId, LockStatus prev, LockStatus curr, boolean force) { - if (!force && raft.leader() == null) return; - if (prev == null) prev = NONE; - if (curr == null) curr = NONE; - LockStatus local = curr == NONE ? lockStatus.remove(lockId) : lockStatus.put(lockId, curr); - if (prev == curr) { - prev = local == null ? NONE : local; - if (prev == curr) return; - } - Mutex mutex = mutexes.get(lockId); - if (mutex != null) mutex.onStatusChange(prev, curr); - for (Listener listener : listeners) { - try { - listener.onStatusChange(lockId, prev, curr); - } catch (Throwable e) { - log.error("Fail to notify listener, lock: %s, prev: %s, curr: %s", lockId, prev, curr, e); - } + protected void onStateChange(long lockId, LockStatus prev, LockStatus curr, boolean snapshot) { + if (raft.leader() == null) return; // initStateMachineFromLog + if (curr == null) curr = NONE; if (prev == curr) return; + // In followers, logs and responses of multiple commands can be included in one batch message, since RAFT is + // below REDIRECT in protocol stack, the commands applying may occur before the responses completing. + if (curr == HOLDING && (prev == WAITING || snapshot)) { + query(lockId); } } @@ -360,7 +363,7 @@ protected void handleView(View next) { } protected void resign() { - lockStatus.forEach((k, v) -> notifyListeners(k, v, NONE, true)); + lockStatus.forEach((k, v) -> notifyListeners(k, NONE)); } protected void reset(View view) { @@ -381,6 +384,15 @@ protected void reset(View view) { }); } + protected void query(long lockId) { + var out = new ByteArrayDataOutputStream(26); + out.writeByte(QUERY); + writeLong(lockId, out); + writeUuid(address(), out); + assert out.position() <= 26; + invoke(out).thenApply(t -> notifyListeners(lockId, LockStatus.values()[t[0]])); + } + /** * Add listener * @param listener listener for the status change. @@ -408,6 +420,7 @@ public LockStatus lockStatus(long lockId) { * Acquire the lock, will join the waiting queue if the lock is held by another member currently. * @param lockId the lock's id * @return HOLDING if hold the lock, WAITING if in the waiting queue. + * @throws RaftException if exception happens during sending or executing commands in the lock service. */ public CompletableFuture lock(long lockId) { var out = new ByteArrayDataOutputStream(26); @@ -415,13 +428,14 @@ public CompletableFuture lock(long lockId) { writeLong(lockId, out); writeUuid(address(), out); assert out.position() <= 26; - return invoke(out).thenApply(t -> LockStatus.values()[t[0]]); + return invoke(out).thenApply(t -> notifyListeners(lockId, LockStatus.values()[t[0]])); } /** * Try to acquire the lock, won't join the waiting queue. * @param lockId the lock's id * @return HOLDING if hold the lock, NONE if the lock is held by another member. + * @throws RaftException if exception happens during sending or executing commands in the lock service. 
     * @param lockId the id related to the mutex
@@ -706,7 +738,7 @@ void onStatusChange(LockStatus prev, LockStatus curr) {
                } catch (Throwable e) {
                    log.error("Error occurred on lock handler", e);
                }
-            } else if (prev == WAITING) {
+            } else if (prev == WAITING && acquirers.get() > 0) {
                delegate.lock();
                try {
                    if (status == WAITING) {
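Taken together, the LockService hunks above give the service a response-driven notification path (notifyListeners) backed by a consensus-side reconciliation path (QUERY). A minimal usage sketch of the resulting API — the stack configuration and cluster name are placeholders, everything else comes from this class:

    import java.util.concurrent.TimeUnit;

    import org.jgroups.JChannel;
    import org.jgroups.raft.blocks.LockService;
    import org.jgroups.raft.blocks.LockService.LockStatus;

    public class LockServiceExample {
        public static void main(String[] args) throws Exception {
            JChannel ch = new JChannel("raft.xml");     // placeholder configuration
            LockService service = new LockService(ch); // must wrap the channel before connecting
            ch.connect("lock-cluster");                 // placeholder cluster name

            service.addListener((id, prev, curr) ->
                System.out.printf("lock %d: %s -> %s%n", id, prev, curr));

            // lock() commits a LOCK command; the future resolves to HOLDING or WAITING
            LockStatus status = service.lock(101L).get(3, TimeUnit.SECONDS);
            if (status == LockStatus.HOLDING) {
                try {
                    // critical section
                } finally {
                    service.unlock(101L).get(3, TimeUnit.SECONDS);
                }
            }
            ch.close();
        }
    }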
diff --git a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java
index 58cfe4b6..8d4afe57 100644
--- a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java
+++ b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java
@@ -98,6 +98,7 @@ public void onStatusChange(long key, LockStatus prev, LockStatus curr) {
        queue.offer(new Event(key, prev, curr));
    }
 
+    protected void assertEmpty() { assertThat(queue).isEmpty(); }
    protected Event next(int secs) throws InterruptedException { return queue.poll(secs, SECONDS); }
    protected Event next() throws InterruptedException { return next(3); }
    protected Batch batch(int count) throws InterruptedException {
@@ -326,8 +327,7 @@ public void reset_by_disconnect() throws Exception {
        channel(2).disconnect(); // [B,D,E]
        // Resigned because of disconnection
        events_c.next().assertEq(101L, WAITING, NONE);
-
-        // Reset to [B,D,E]
+        // Reset to [B,D,E], D is the next waiter.
        service_b.unlock(101L);
        events_b.next().assertEq(101L, HOLDING, NONE);
        events_d.next().assertEq(101L, WAITING, HOLDING);
@@ -341,27 +341,31 @@ public void reset_by_disconnect() throws Exception {
        // Reconnect the previous holder and reach majority.
        service_d = reconnect(channel(3), events_d); // [B,E,D]
-        // Nothing to do with D because it has a new address
-        assertNull(events_d.next());
-        // Reset to clear all previous status, notified by reset command.
-        events_e.next().assertEq(101L, WAITING, NONE); // duplicate notification
-
-        assertEquals(service_d.lock(101L).get(3, SECONDS), HOLDING);
-        assertEquals(service_e.lock(101L).get(3, SECONDS), WAITING);
-        events_d.next().assertEq(101L, NONE, HOLDING);
-        events_e.next().assertEq(101L, NONE, WAITING);
+        waitUntilLeaderElected(1, 3, 4);
+        // Reset to empty; there is no holder or waiter.
+        assertEquals(service_b.lock(101L).get(3, SECONDS), HOLDING);
+        events_b.next().assertEq(101L, NONE, HOLDING);
+        assertEquals(service_d.lock(101L).get(3, SECONDS), WAITING);
+        events_d.next().assertEq(101L, NONE, WAITING);
+        assertEquals(service_e.lockStatus(101L), NONE);
+        events_e.assertEmpty();
 
        // Disconnect to lose majority
-        channel(1).disconnect(); // [E,D]
+        channel(4).disconnect(); // [B,D]
        // Resigned because of lost majority
-        events_d.next().assertEq(101L, HOLDING, NONE);
-        events_e.next().assertEq(101L, WAITING, NONE);
+        events_b.next().assertEq(101L, HOLDING, NONE);
+        events_d.next().assertEq(101L, WAITING, NONE);
 
        // Reconnect to reach majority
-        service_b = reconnect(channel(1), events_b); // [E,D,B]
-        // Reset to clear all previous status, notified by reset command.
-        events_d.next().assertEq(101L, HOLDING, NONE); // duplicate notification
-        events_e.next().assertEq(101L, WAITING, NONE); // duplicate notification
+        service_e = reconnect(channel(4), events_e); // [B,D,E]
+        waitUntilLeaderElected(1, 3, 4);
+        // Reset to empty; there is no holder or waiter.
+        assertEquals(service_e.lock(101L).get(3, SECONDS), HOLDING);
+        events_e.next().assertEq(101L, NONE, HOLDING);
+        assertEquals(service_b.lockStatus(101L), NONE);
+        events_b.assertEmpty();
+        assertEquals(service_d.lockStatus(101L), NONE);
+        events_d.assertEmpty();
    }
 
    public void reset_by_partition() throws Exception {
@@ -392,17 +396,15 @@ public void reset_by_partition() throws Exception {
        waitUntilLeaderElected(0, 1, 2, 3, 4);
        assertTrue(raft(0).isLeader()); // A has a longer log
 
-        // Reset to [A,B,C], notified by reset command.
-        events_d.next().assertEq(101L, HOLDING, NONE);
-        events_e.next().assertEq(102L, HOLDING, NONE);
-
-        // Holder is A all the time
+        // Reset to [A,B,C]; the holder is A the whole time.
        assertEquals(service_b.lock(101L).get(3, SECONDS), WAITING);
-        assertEquals(service_c.lock(102L).get(3, SECONDS), WAITING);
        events_b.next().assertEq(101L, NONE, WAITING);
+        assertEquals(service_c.lock(102L).get(3, SECONDS), WAITING);
        events_c.next().assertEq(102L, NONE, WAITING);
        assertEquals(service_a.lockStatus(101L), HOLDING);
        assertEquals(service_a.lockStatus(102L), HOLDING);
+        assertEquals(service_d.lockStatus(101L), NONE); events_d.assertEmpty();
+        assertEquals(service_e.lockStatus(102L), NONE); events_e.assertEmpty();
 
        // Partition into subgroups without majority
        partition(new int[]{0, 1}, new int[]{2}, new int[]{3, 4});
@@ -415,21 +417,20 @@ public void reset_by_partition() throws Exception {
        merge(0, 2, 3);
        waitUntilLeaderElected(0, 1, 2, 3, 4);
 
-        // Reset to clear all previous status, notified by reset command.
-        events_a.batch(2).assertContains(101L, HOLDING, NONE).assertContains(102L, HOLDING, NONE);
-        events_b.next().assertEq(101L, WAITING, NONE);
-        events_c.next().assertEq(102L, WAITING, NONE);
-
-        // There is no holder after reset
-        assertEquals(service_a.lock(101L).get(3, SECONDS), HOLDING);
+        // Reset to empty; there is no holder after the reset.
+        assertEquals(service_d.lock(101L).get(3, SECONDS), HOLDING);
+        events_d.next().assertEq(101L, NONE, HOLDING);
        assertEquals(service_e.lock(101L).get(3, SECONDS), WAITING);
-        events_a.next().assertEq(101L, NONE, HOLDING);
        events_e.next().assertEq(101L, NONE, WAITING);
+        assertEquals(service_a.lockStatus(101L), NONE);
+        assertEquals(service_a.lockStatus(102L), NONE); events_a.assertEmpty();
+        assertEquals(service_b.lockStatus(101L), NONE); events_b.assertEmpty();
+        assertEquals(service_c.lockStatus(102L), NONE); events_c.assertEmpty();
 
-        channel(0).disconnect();
+        channel(3).disconnect();
        // Resigned because of disconnection
-        events_a.next().assertEq(101L, HOLDING, NONE);
+        events_d.next().assertEq(101L, HOLDING, NONE);
        // Reset to [B,C,D,E], notified by reset command.
        events_e.next().assertEq(101L, WAITING, HOLDING);
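All of the assertions above drain a per-node event queue fed by a status listener. Condensed, the harness pattern looks like the sketch below (the queue and the poll timeout mirror the test class; the StatusEvents/Event holder types are illustrative):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    import org.jgroups.raft.blocks.LockService;

    class StatusEvents {
        static final class Event {
            final long lockId;
            final LockService.LockStatus prev, curr;
            Event(long id, LockService.LockStatus p, LockService.LockStatus c) { lockId = id; prev = p; curr = c; }
        }

        final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();

        StatusEvents(LockService service) {
            // every status transition becomes one queue entry, in arrival order
            service.addListener((id, prev, curr) -> queue.offer(new Event(id, prev, curr)));
        }

        // block up to 3s for the next transition; null means nothing arrived
        Event next() throws InterruptedException {
            return queue.poll(3, TimeUnit.SECONDS);
        }
    }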

From 4ddc5db73aaf8ad5f35b38852b8d83bde4ccbdc5 Mon Sep 17 00:00:00 2001
From: Zhang Yifei
Date: Tue, 19 Nov 2024 18:25:52 +0800
Subject: [PATCH 15/16] lock service

---
 .../jgroups/tests/blocks/LockServiceTest.java | 37 +++++++++++++++++--
 1 file changed, 34 insertions(+), 3 deletions(-)

diff --git a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java
index 8d4afe57..43697ab9 100644
--- a/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java
+++ b/tests/junit-functional/org/jgroups/tests/blocks/LockServiceTest.java
@@ -593,6 +593,39 @@ public void mutex_race() {
        });
    }
 
+    public void mutex_reentrant() throws InterruptedException {
+        Mutex a = service_a.mutex(101);
+        Mutex b = service_b.mutex(101);
+
+        for (int i = 0; i < 10; i++) {
+            b.lock();
+            assertEquals(b.getHolder(), Thread.currentThread());
+        }
+
+        for (int i = 0; i < 10; i++) {
+            assertTrue(b.tryLock());
+            assertEquals(b.getHolder(), Thread.currentThread());
+        }
+
+        for (int i = 0; i < 10; i++) {
+            assertTrue(b.tryLock(1, SECONDS));
+            assertEquals(b.getHolder(), Thread.currentThread());
+        }
+
+        for (int i = 0; i < 30; i++) {
+            assertEquals(b.getHolder(), Thread.currentThread());
+            assertFalse(CompletableFuture.supplyAsync(b::tryLock).join());
+            assertFalse(a.tryLock());
+            b.unlock();
+        }
+
+        assertNull(b.getHolder());
+        assertTrue(b.tryLock());
+        assertEquals(b.getHolder(), Thread.currentThread());
+        b.unlock();
+        assertNull(b.getHolder());
+    }
+
    public void mutex_inconsistency() throws Exception {
        Mutex a = service_a.mutex(101);
        Mutex b = service_b.mutex(101);
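mutex_reentrant pins down the Lock-style reentrancy contract: the holder thread may re-acquire freely, other threads and other members fail tryLock, and only the last unlock releases the distributed lock. In application code the contract reads like a plain ReentrantLock — a sketch, where service and the lock id are placeholders and Mutex is the nested type returned by LockService.mutex:

    LockService.Mutex m = service.mutex(101);
    m.lock();                 // first acquisition goes through the raft log
    try {
        m.lock();             // reentrant: the holder thread re-enters locally
        try {
            // critical section
        } finally {
            m.unlock();       // still held after the inner unlock
        }
    } finally {
        m.unlock();           // last unlock releases the distributed lock
    }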
@@ -643,8 +676,6 @@ public void mutex_inconsistency() throws Exception {
            Util.waitUntil(5000, 1000, () -> service_b.lockStatus(101) == WAITING);
            // Successfully unlock with the lock service
            service_b.unlock(101).get(3, SECONDS);
-            // Make sure all logs is applied which means mutex has been notified all status changes
-            waitUntilNodesApplyAllLogs();
            // The thread is awakened by unlocking, and locks again.
            Util.waitUntil(5000, 1000, () -> service_b.lockStatus(101) == WAITING);
        } finally {
@@ -657,7 +688,7 @@ public void fast_response_and_slow_log() throws TimeoutException {
        Mutex b = service_b.mutex(101); // must be a follower
 
        b.service().addListener((lockId, prev, next) -> {
-            LockSupport.parkNanos(10_000_000); // slow down the log applying (notification)
+            LockSupport.parkNanos(10_000_000); // slow down the log applying
        });
        AtomicInteger locked = new AtomicInteger(), unlocked = new AtomicInteger();
        b.setUnexpectedLockHandler(t -> locked.incrementAndGet());

From 8b1a20ea9def65be76e90fb8289905bf635a0241 Mon Sep 17 00:00:00 2001
From: Zhang Yifei
Date: Wed, 20 Nov 2024 16:38:52 +0800
Subject: [PATCH 16/16] lock service

---
 src/org/jgroups/raft/blocks/LockService.java  | 62 +++++++++++--------
 .../jgroups/tests/blocks/LockServiceTest.java | 49 ++++++---------
 2 files changed, 56 insertions(+), 55 deletions(-)

diff --git a/src/org/jgroups/raft/blocks/LockService.java b/src/org/jgroups/raft/blocks/LockService.java
index b50c5617..5b91a1f4 100644
--- a/src/org/jgroups/raft/blocks/LockService.java
+++ b/src/org/jgroups/raft/blocks/LockService.java
@@ -9,6 +9,8 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -122,9 +124,9 @@ protected static class LockEntry {
 
        protected UUID unlock() {
            // make sure it's a consistent result for all nodes
-            var i = waiters.iterator();
+            Iterator<UUID> i = waiters.iterator();
            if (!i.hasNext()) return holder = null;
-            var v = i.next(); i.remove(); return holder = v;
+            UUID v = i.next(); i.remove(); return holder = v;
        }
    }
 
@@ -144,21 +146,18 @@ public void readContentFrom(DataInput in) {
                locks.put(id, lock);
                bind(lock.holder, lock);
                if (address.equals(lock.holder)) tmp.put(lock.id, HOLDING);
-                for (var waiter : lock.waiters) {
+                for (UUID waiter : lock.waiters) {
                    bind(waiter, lock);
                    if (address.equals(waiter)) tmp.put(lock.id, WAITING);
                }
            }
-
-            // notify based on local status
-            lockStatus.forEach((k, v) -> onStateChange(k, v, tmp.remove(k), true));
-            tmp.forEach((k, v) -> onStateChange(k, NONE, v, true));
+            tmp.forEach((k, v) -> onCommit(k, null, v, false));
        }
 
        @Override
        public void writeContentTo(DataOutput out) {
            writeInt((int) locks.values().stream().filter(t -> t.holder != null).count(), out);
-            for (var lock : locks.values()) {
+            for (LockEntry lock : locks.values()) {
                if (lock.holder == null) continue;
                writeLong(lock.id, out);
                writeUuid(lock.holder, out);
@@ -243,7 +242,7 @@ protected LockStatus doLock(long lockId, UUID member, boolean trying) {
        }
        if (prev != next) bind(member, lock);
        if (address.equals(member)) {
-            onStateChange(lockId, prev, next, false);
+            onCommit(lockId, prev, next, false);
        }
        if (log.isTraceEnabled()) {
            log.trace("[%s] %s lock %s, prev: %s, next: %s", address, member, lockId, prev, next);
@@ -257,15 +256,15 @@ protected void doUnlock(long lockId, UUID member) {
    }
 
    protected void doUnlock(UUID member, Set<UUID> unlocking) {
-        Set<LockEntry> set = memberLocks.get(member); if (set == null) return;
-        set.removeIf(t -> doUnlock(member, t, unlocking));
-        if (set.isEmpty()) memberLocks.remove(member);
+        Set<LockEntry> set = memberLocks.remove(member); if (set == null) return;
+        for (LockEntry lock : set) doUnlock(member, lock, unlocking);
    }
 
    protected boolean doUnlock(UUID member, LockEntry lock, Set<UUID> unlocking) {
        LockStatus prev = HOLDING;
        UUID holder = null;
        List<UUID> waiters = null;
+        boolean reset = unlocking != null;
        if (member.equals(lock.holder)) {
            do {
                if (holder != null) {
@@ -278,9 +277,9 @@ protected boolean doUnlock(UUID member, LockEntry lock, Set<UUID> unlocking) {
            prev = lock.waiters.remove(member) ? WAITING : NONE;
        }
        if (address.equals(member)) {
-            onStateChange(lock.id, prev, NONE, false);
+            onCommit(lock.id, prev, NONE, reset);
        } else if (address.equals(holder)) {
-            onStateChange(lock.id, WAITING, HOLDING, false);
+            onCommit(lock.id, WAITING, HOLDING, reset);
        }
        if (log.isTraceEnabled()) {
            log.trace("[%s] %s unlock %s, prev: %s", address, member, lock.id, prev);
@@ -290,7 +289,7 @@ protected boolean doUnlock(UUID member, LockEntry lock, Set<UUID> unlocking) {
        if (waiters != null) for (UUID waiter : waiters) {
            unbind(waiter, lock);
            if (address.equals(waiter)) {
-                onStateChange(lock.id, WAITING, NONE, false);
+                onCommit(lock.id, WAITING, NONE, true);
            }
            if (log.isTraceEnabled()) {
                log.trace("[%s] %s unlock %s, prev: %s", address, waiter, lock.id, WAITING);
@@ -300,12 +299,12 @@ protected boolean doUnlock(UUID member, LockEntry lock, Set<UUID> unlocking) {
    }
 
    protected void doReset(List<UUID> members) {
-        Set<UUID> prev = new LinkedHashSet<>(memberLocks.keySet());
+        Set<UUID> prev = new HashSet<>(memberLocks.keySet());
        if (log.isTraceEnabled()) {
            log.trace("[%s] reset %s to %s", address, prev, members);
        }
-        for (var id : members) prev.remove(id);
-        for (var id : prev) doUnlock(id, prev);
+        for (UUID member : members) prev.remove(member);
+        for (UUID member : prev) doUnlock(member, prev);
    }
 
@@ -315,7 +314,7 @@ protected LockStatus doQuery(Long lockId, UUID member) {
        else if (lock.holder.equals(member)) status = HOLDING;
        else status = lock.waiters.contains(member) ? WAITING : NONE;
        if (log.isTraceEnabled()) {
-            log.trace("[%s] query %s: %s", address, member, status);
+            log.trace("[%s] %s query %s, status: %s", address, member, lockId, status);
        }
        return status;
    }
@@ -330,13 +329,20 @@ protected void unbind(UUID member, LockEntry lock) {
        });
    }
 
-    protected void onStateChange(long lockId, LockStatus prev, LockStatus curr, boolean snapshot) {
-        if (raft.leader() == null) return; // initStateMachineFromLog
-        if (curr == null) curr = NONE; if (prev == curr) return;
+    protected void onCommit(long lockId, LockStatus prev, LockStatus curr, boolean reset) {
+        if (prev == curr) return;
        // In followers, the logs and responses of multiple commands can be included in one batch message. Since
        // RAFT sits below REDIRECT in the protocol stack, commands may be applied before their responses complete.
-        if (curr == HOLDING && (prev == WAITING || snapshot)) {
+        if (curr == HOLDING && (prev == WAITING || prev == null && lockStatus.get(lockId) != HOLDING)) {
            query(lockId);
+        } else if (reset) {
+            LockStatus status = lockStatus.get(lockId);
+            if (curr == NONE && status != null || curr == HOLDING && status != HOLDING) {
+                if (log.isTraceEnabled()) {
+                    log.trace("[%s] Suspicious status %s, lockId: %s, expected: %s", address, status, lockId, curr);
+                }
+                query(lockId);
+            }
        }
    }
@@ -363,6 +369,9 @@ protected void handleView(View next) {
    }
 
    protected void resign() {
+        if (log.isTraceEnabled()) {
+            log.trace("[%s] resign from %s", address, lockStatus);
+        }
        lockStatus.forEach((k, v) -> notifyListeners(k, NONE));
    }
 
@@ -390,7 +399,10 @@ protected void query(long lockId) {
        writeLong(lockId, out);
        writeUuid(address(), out);
        assert out.position() <= 26;
-        invoke(out).thenApply(t -> notifyListeners(lockId, LockStatus.values()[t[0]]));
+        invoke(out).whenComplete((r, e) -> {
+            if (e != null) log.error("Failed to query lock " + lockId, e);
+            else notifyListeners(lockId, LockStatus.values()[r[0]]);
+        });
    }
 
    /**
@@ -413,7 +425,7 @@ protected void query(long lockId) {
     * @return lock status
     */
    public LockStatus lockStatus(long lockId) {
-        var v = lockStatus.get(lockId); return v == null ? NONE : v;
+        LockStatus v = lockStatus.get(lockId); return v == null ? NONE : v;
    }
 
    /**
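With this patch, every query() call is a reconciliation round, fired either for a batched WAITING -> HOLDING promotion or for a reset that contradicts the local cache, and a failed round is now logged instead of silently dropped. Because the class keeps these methods protected, the reconciliation rate can be observed without touching the rules themselves; a hypothetical subclass (InstrumentedLockService and its counter are not part of the patch):

    import java.util.concurrent.atomic.AtomicLong;

    import org.jgroups.JChannel;
    import org.jgroups.raft.blocks.LockService;

    public class InstrumentedLockService extends LockService {
        private final AtomicLong reconciliations = new AtomicLong(); // hypothetical metric

        public InstrumentedLockService(JChannel ch) { super(ch); }

        @Override
        protected void query(long lockId) {
            reconciliations.incrementAndGet(); // one QUERY command per suspicious transition
            super.query(lockId);
        }

        public long reconciliations() { return reconciliations.get(); }
    }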
b.unlock(); - } - b.lock(); - waitUntilNodesApplyAllLogs(); - b.unlock(); - - assertEquals(unlocked.get(), 0); - assertEquals(locked.get(), 0); - } - public void mutex_exception() { Mutex a = service_a.mutex(101); @@ -789,8 +766,20 @@ private void waitUntilNodesApplyAllLogs(int... indexes) throws TimeoutException }); } - private void waitUntilLeaderElected(int... indexes) { + private void waitUntilLeaderElected(int... indexes) throws TimeoutException { RAFT[] rafts = IntStream.of(indexes).mapToObj(this::raft).toArray(RAFT[]::new); - BaseRaftElectionTest.waitUntilLeaderElected(rafts, 10_000); + Util.waitUntil(30_000, 1000, () -> { + Address leader = null; long term = 0; + for (RAFT raft : rafts) { + Address a = raft.leader(); + if (a == null) return false; + if (leader == null) { + leader = a; term = raft.currentTerm(); + } else if (!leader.equals(a) || term != raft.currentTerm()) { + return false; + } + } + return true; + }); } }