Skip to content

Commit

Permalink
lock service
Browse files Browse the repository at this point in the history
  • Loading branch information
yfei-z committed Nov 20, 2024
1 parent 4ddc5db commit 8b1a20e
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 55 deletions.
62 changes: 37 additions & 25 deletions src/org/jgroups/raft/blocks/LockService.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -122,9 +124,9 @@ protected static class LockEntry {

protected UUID unlock() {
// make sure it's a consistent result for all nodes
var i = waiters.iterator();
Iterator<UUID> i = waiters.iterator();
if (!i.hasNext()) return holder = null;
var v = i.next(); i.remove(); return holder = v;
UUID v = i.next(); i.remove(); return holder = v;
}
}

Expand All @@ -144,21 +146,18 @@ public void readContentFrom(DataInput in) {
locks.put(id, lock);
bind(lock.holder, lock);
if (address.equals(lock.holder)) tmp.put(lock.id, HOLDING);
for (var waiter : lock.waiters) {
for (UUID waiter : lock.waiters) {
bind(waiter, lock);
if (address.equals(waiter)) tmp.put(lock.id, WAITING);
}
}

// notify base on local status
lockStatus.forEach((k, v) -> onStateChange(k, v, tmp.remove(k), true));
tmp.forEach((k, v) -> onStateChange(k, NONE, v, true));
tmp.forEach((k, v) -> onCommit(k, null, v, false));
}

@Override
public void writeContentTo(DataOutput out) {
writeInt((int) locks.values().stream().filter(t -> t.holder != null).count(), out);
for (var lock : locks.values()) {
for (LockEntry lock : locks.values()) {
if (lock.holder == null) continue;
writeLong(lock.id, out);
writeUuid(lock.holder, out);
Expand Down Expand Up @@ -243,7 +242,7 @@ protected LockStatus doLock(long lockId, UUID member, boolean trying) {
}
if (prev != next) bind(member, lock);
if (address.equals(member)) {
onStateChange(lockId, prev, next, false);
onCommit(lockId, prev, next, false);
}
if (log.isTraceEnabled()) {
log.trace("[%s] %s lock %s, prev: %s, next: %s", address, member, lockId, prev, next);
Expand All @@ -257,15 +256,15 @@ protected void doUnlock(long lockId, UUID member) {
}

protected void doUnlock(UUID member, Set<UUID> unlocking) {
Set<LockEntry> set = memberLocks.get(member); if (set == null) return;
set.removeIf(t -> doUnlock(member, t, unlocking));
if (set.isEmpty()) memberLocks.remove(member);
Set<LockEntry> set = memberLocks.remove(member); if (set == null) return;
for (LockEntry lock : set) doUnlock(member, lock, unlocking);
}

protected boolean doUnlock(UUID member, LockEntry lock, Set<UUID> unlocking) {
LockStatus prev = HOLDING;
UUID holder = null;
List<UUID> waiters = null;
boolean reset = unlocking != null;
if (member.equals(lock.holder)) {
do {
if (holder != null) {
Expand All @@ -278,9 +277,9 @@ protected boolean doUnlock(UUID member, LockEntry lock, Set<UUID> unlocking) {
prev = lock.waiters.remove(member) ? WAITING : NONE;
}
if (address.equals(member)) {
onStateChange(lock.id, prev, NONE, false);
onCommit(lock.id, prev, NONE, reset);
} else if (address.equals(holder)) {
onStateChange(lock.id, WAITING, HOLDING, false);
onCommit(lock.id, WAITING, HOLDING, reset);
}
if (log.isTraceEnabled()) {
log.trace("[%s] %s unlock %s, prev: %s", address, member, lock.id, prev);
Expand All @@ -290,7 +289,7 @@ protected boolean doUnlock(UUID member, LockEntry lock, Set<UUID> unlocking) {
if (waiters != null) for (UUID waiter : waiters) {
unbind(waiter, lock);
if (address.equals(waiter)) {
onStateChange(lock.id, WAITING, NONE, false);
onCommit(lock.id, WAITING, NONE, true);
}
if (log.isTraceEnabled()) {
log.trace("[%s] %s unlock %s, prev: %s", address, waiter, lock.id, WAITING);
Expand All @@ -300,12 +299,12 @@ protected boolean doUnlock(UUID member, LockEntry lock, Set<UUID> unlocking) {
}

protected void doReset(List<UUID> members) {
Set<UUID> prev = new LinkedHashSet<>(memberLocks.keySet());
Set<UUID> prev = new HashSet<>(memberLocks.keySet());
if (log.isTraceEnabled()) {
log.trace("[%s] reset %s to %s", address, prev, members);
}
for (var id : members) prev.remove(id);
for (var id : prev) doUnlock(id, prev);
for (UUID member : members) prev.remove(member);
for (UUID member : prev) doUnlock(member, prev);
}

protected LockStatus doQuery(Long lockId, UUID member) {
Expand All @@ -315,7 +314,7 @@ protected LockStatus doQuery(Long lockId, UUID member) {
else if (lock.holder.equals(member)) status = HOLDING;
else status = lock.waiters.contains(member) ? WAITING : NONE;
if (log.isTraceEnabled()) {
log.trace("[%s] query %s: %s", address, member, status);
log.trace("[%s] %s query %s, status: %s", address, member, lockId, status);
}
return status;
}
Expand All @@ -330,13 +329,20 @@ protected void unbind(UUID member, LockEntry lock) {
});
}

protected void onStateChange(long lockId, LockStatus prev, LockStatus curr, boolean snapshot) {
if (raft.leader() == null) return; // initStateMachineFromLog
if (curr == null) curr = NONE; if (prev == curr) return;
protected void onCommit(long lockId, LockStatus prev, LockStatus curr, boolean reset) {
if (prev == curr) return;
// In followers, logs and responses of multiple commands can be included in one batch message, since RAFT is
// below REDIRECT in protocol stack, the commands applying may occur before the responses completing.
if (curr == HOLDING && (prev == WAITING || snapshot)) {
if (curr == HOLDING && (prev == WAITING || prev == null && lockStatus.get(lockId) != HOLDING)) {
query(lockId);
} else if (reset) {
LockStatus status = lockStatus.get(lockId);
if (curr == NONE && status != null || curr == HOLDING && status != HOLDING) {
if (log.isTraceEnabled()) {
log.trace("[%s] Suspicious status %s, lockId: %s, expected: %s", address, status, lockId, curr);
}
query(lockId);
}
}
}

Expand All @@ -363,6 +369,9 @@ protected void handleView(View next) {
}

protected void resign() {
if (log.isTraceEnabled()) {
log.trace("[%s] resign from %s", address, lockStatus);
}
lockStatus.forEach((k, v) -> notifyListeners(k, NONE));
}

Expand Down Expand Up @@ -390,7 +399,10 @@ protected void query(long lockId) {
writeLong(lockId, out);
writeUuid(address(), out);
assert out.position() <= 26;
invoke(out).thenApply(t -> notifyListeners(lockId, LockStatus.values()[t[0]]));
invoke(out).whenComplete((r, e) -> {
if (e != null) log.error("Fail to query on " + lockId, e);
else notifyListeners(lockId, LockStatus.values()[r[0]]);
});
}

/**
Expand All @@ -413,7 +425,7 @@ protected void query(long lockId) {
* @return lock status
*/
public LockStatus lockStatus(long lockId) {
var v = lockStatus.get(lockId); return v == null ? NONE : v;
LockStatus v = lockStatus.get(lockId); return v == null ? NONE : v;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -100,7 +99,7 @@ public void onStatusChange(long key, LockStatus prev, LockStatus curr) {

protected void assertEmpty() { assertThat(queue).isEmpty(); }
protected Event next(int secs) throws InterruptedException { return queue.poll(secs, SECONDS); }
protected Event next() throws InterruptedException { return next(3); }
protected Event next() throws InterruptedException { return next(5); }
protected Batch batch(int count) throws InterruptedException {
List<Event> list = new ArrayList<>(count);
for (int i = 0; i < count; i++) list.add(next());
Expand All @@ -111,6 +110,10 @@ protected Batch batch(int count) throws InterruptedException {
protected static class Service extends LockService {
TestRaft raft;

static {
// log.setLevel("trace");
}

public Service(JChannel channel) { super(channel); }

@Override
Expand Down Expand Up @@ -684,32 +687,6 @@ public void mutex_inconsistency() throws Exception {
f.get(5, SECONDS); // check error
}

public void fast_response_and_slow_log() throws TimeoutException {
Mutex b = service_b.mutex(101); // must be a follower

b.service().addListener((lockId, prev, next) -> {
LockSupport.parkNanos(10_000_000); // slow down the log applying
});
AtomicInteger locked = new AtomicInteger(), unlocked = new AtomicInteger();
b.setUnexpectedLockHandler(t -> locked.incrementAndGet());
b.setUnexpectedUnlockHandler(t -> unlocked.incrementAndGet());

for (int i = 0; i < 10; i++) {
b.lock(); b.unlock();
}
waitUntilNodesApplyAllLogs();

for (int i = 0; i < 10; i++) {
b.lock(); b.unlock();
}
b.lock();
waitUntilNodesApplyAllLogs();
b.unlock();

assertEquals(unlocked.get(), 0);
assertEquals(locked.get(), 0);
}

public void mutex_exception() {
Mutex a = service_a.mutex(101);

Expand Down Expand Up @@ -789,8 +766,20 @@ private void waitUntilNodesApplyAllLogs(int... indexes) throws TimeoutException
});
}

private void waitUntilLeaderElected(int... indexes) {
private void waitUntilLeaderElected(int... indexes) throws TimeoutException {
RAFT[] rafts = IntStream.of(indexes).mapToObj(this::raft).toArray(RAFT[]::new);
BaseRaftElectionTest.waitUntilLeaderElected(rafts, 10_000);
Util.waitUntil(30_000, 1000, () -> {
Address leader = null; long term = 0;
for (RAFT raft : rafts) {
Address a = raft.leader();
if (a == null) return false;
if (leader == null) {
leader = a; term = raft.currentTerm();
} else if (!leader.equals(a) || term != raft.currentTerm()) {
return false;
}
}
return true;
});
}
}

0 comments on commit 8b1a20e

Please sign in to comment.