Skip to content

Commit

Permalink
Simplify ConcurrentPool (mongodb#907)
Browse files Browse the repository at this point in the history
  • Loading branch information
stIncMale authored Apr 11, 2022
1 parent f201248 commit af39665
Showing 1 changed file with 25 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class ConcurrentPool<T> implements Pool<T> {
private final int maxSize;
private final ItemFactory<T> itemFactory;

private final ConcurrentLinkedDeque<T> available = new ConcurrentLinkedDeque<T>();
private final ConcurrentLinkedDeque<T> available = new ConcurrentLinkedDeque<>();
private final StateAndPermits stateAndPermits;
private final String poolClosedMessage;

Expand Down Expand Up @@ -162,8 +162,6 @@ public T get() {
*/
@Override
public T get(final long timeout, final TimeUnit timeUnit) {
stateAndPermits.throwIfClosedOrPaused();

if (!stateAndPermits.acquirePermit(timeout, timeUnit)) {
throw new MongoTimeoutException(String.format("Timeout waiting for a pooled item after %d %s", timeout, timeUnit));
}
Expand All @@ -183,7 +181,6 @@ public T get(final long timeout, final TimeUnit timeUnit) {
*/
@Nullable
T getImmediateUnfair() {
stateAndPermits.throwIfClosedOrPaused();
T element = null;
if (stateAndPermits.acquirePermitImmediateUnfair()) {
element = available.pollLast();
Expand Down Expand Up @@ -221,7 +218,6 @@ public void prune() {
* Otherwise, the action must {@linkplain #release(Object) release} the item.
*/
public void ensureMinSize(final int minSize, final Consumer<T> initAndRelease) {
stateAndPermits.throwIfClosedOrPaused();
while (getCount() < minSize) {
if (!stateAndPermits.acquirePermit(0, TimeUnit.MILLISECONDS)) {
break;
Expand Down Expand Up @@ -284,12 +280,9 @@ public int getCount() {
}

public String toString() {
StringBuilder buf = new StringBuilder();
buf.append("pool: ")
.append(" maxSize: ").append(sizeToString(maxSize))
.append(" availableCount ").append(getAvailableCount())
.append(" inUseCount ").append(getInUseCount());
return buf.toString();
return "pool: maxSize: " + sizeToString(maxSize)
+ " availableCount " + getAvailableCount()
+ " inUseCount " + getInUseCount();
}

/**
Expand Down Expand Up @@ -332,7 +325,7 @@ static boolean isPoolClosedException(final Throwable e) {
@ThreadSafe
private static final class StateAndPermits {
private final Supplier<MongoServerUnavailableException> poolClosedExceptionSupplier;
private final ReentrantReadWriteLock lock;
private final ReentrantLock lock;
private final Condition permitAvailableOrClosedOrPausedCondition;
private volatile boolean paused;
private volatile boolean closed;
Expand Down Expand Up @@ -373,8 +366,8 @@ private static final class StateAndPermits {

StateAndPermits(final int maxPermits, final Supplier<MongoServerUnavailableException> poolClosedExceptionSupplier) {
this.poolClosedExceptionSupplier = poolClosedExceptionSupplier;
lock = new ReentrantReadWriteLock(true);
permitAvailableOrClosedOrPausedCondition = lock.writeLock().newCondition();
lock = new ReentrantLock(true);
permitAvailableOrClosedOrPausedCondition = lock.newCondition();
paused = false;
closed = false;
this.maxPermits = maxPermits;
Expand All @@ -388,7 +381,7 @@ int permits() {
}

boolean acquirePermitImmediateUnfair() {
lockUnfair(lock.writeLock());
lockUnfair(lock);
try {
throwIfClosedOrPaused();
if (permits > 0) {
Expand All @@ -399,7 +392,7 @@ boolean acquirePermitImmediateUnfair() {
return false;
}
} finally {
lock.writeLock().unlock();
lock.unlock();
}
}

Expand All @@ -412,9 +405,9 @@ boolean acquirePermitImmediateUnfair() {
boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInterruptedException {
long remainingNanos = unit.toNanos(timeout);
if (waitersEstimate.get() == 0) {
lockInterruptiblyUnfair(lock.writeLock());
lockInterruptiblyUnfair(lock);
} else {
lockInterruptibly(lock.writeLock());
lockInterruptibly(lock);
}
try {
while (permits == 0
Expand All @@ -440,43 +433,43 @@ boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInter
permits--;
return true;
} finally {
lock.writeLock().unlock();
lock.unlock();
}
}

void releasePermit() {
lockUnfair(lock.writeLock());
lockUnfair(lock);
try {
assertTrue(permits < maxPermits);
//noinspection NonAtomicOperationOnVolatileField
permits++;
permitAvailableOrClosedOrPausedCondition.signal();
} finally {
lock.writeLock().unlock();
lock.unlock();
}
}

void pause(final Supplier<MongoException> causeSupplier) {
lockUnfair(lock.writeLock());
lockUnfair(lock);
try {
if (!paused) {
this.paused = true;
permitAvailableOrClosedOrPausedCondition.signalAll();
}
this.causeSupplier = assertNotNull(causeSupplier);
} finally {
lock.writeLock().unlock();
lock.unlock();
}
}

void ready() {
if (paused) {
lockUnfair(lock.writeLock());
lockUnfair(lock);
try {
this.paused = false;
this.causeSupplier = null;
} finally {
lock.writeLock().unlock();
lock.unlock();
}
}
}
Expand All @@ -486,21 +479,23 @@ void ready() {
*/
boolean close() {
if (!closed) {
lockUnfair(lock.writeLock());
lockUnfair(lock);
try {
if (!closed) {
closed = true;
permitAvailableOrClosedOrPausedCondition.signalAll();
return true;
}
} finally {
lock.writeLock().unlock();
lock.unlock();
}
}
return false;
}

/**
* This method must be called by a {@link Thread} that holds the {@link #lock}.
*
* @return {@code false} which means that the method did not throw.
* The method returns to allow using it conveniently as part of a condition check when waiting on a {@link Condition}.
* Short-circuiting operators {@code &&} and {@code ||} must not be used with this method to ensure that it is called.
Expand All @@ -514,14 +509,7 @@ boolean throwIfClosedOrPaused() {
throw poolClosedExceptionSupplier.get();
}
if (paused) {
lock.readLock().lock();
try {
if (paused) {
throw assertNotNull(assertNotNull(causeSupplier).get());
}
} finally {
lock.readLock().unlock();
}
throw assertNotNull(assertNotNull(causeSupplier).get());
}
return false;
}
Expand All @@ -546,9 +534,9 @@ static void lockInterruptibly(final Lock lock) throws MongoInterruptedException
}
}

private static void lockInterruptiblyUnfair(final ReentrantReadWriteLock.WriteLock lock) throws MongoInterruptedException {
private static void lockInterruptiblyUnfair(final ReentrantLock lock) throws MongoInterruptedException {
throwIfInterrupted();
// `WriteLock.tryLock` is unfair
// `ReentrantLock.tryLock` is unfair
if (!lock.tryLock()) {
try {
lock.lockInterruptibly();
Expand All @@ -566,13 +554,6 @@ static void lockUnfair(final ReentrantLock lock) {
}
}

private static void lockUnfair(final ReentrantReadWriteLock.WriteLock lock) {
// `WriteLock.tryLock` is unfair
if (!lock.tryLock()) {
lock.lock();
}
}

private static void throwIfInterrupted() throws MongoInterruptedException {
if (Thread.currentThread().isInterrupted()) {
throw new MongoInterruptedException(null, new InterruptedException());
Expand Down

0 comments on commit af39665

Please sign in to comment.