Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3stream): lock scope optimization and persistence settings encapsulation #1715

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 45 additions & 40 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ public class S3Storage implements Storage {
private static final FastReadFailFastException FAST_READ_FAIL_FAST_EXCEPTION = new FastReadFailFastException();

private static final int NUM_STREAM_CALLBACK_LOCKS = 128;
/**
* Stream callback locks. Used to ensure the stream callbacks will not be called concurrently.
*
* @see #handleAppendCallback
*/
private final static Lock[] STREAM_CALLBACK_LOCKS = IntStream.range(0, NUM_STREAM_CALLBACK_LOCKS).mapToObj(i -> new ReentrantLock()).toArray(Lock[]::new);
private final long maxDeltaWALCacheSize;
private final Config config;
private final WriteAheadLog deltaWAL;
Expand All @@ -89,7 +95,7 @@ public class S3Storage implements Storage {
*/
private final LogCache deltaWALCache;
/**
* WAL out of order callback sequencer. {@link #streamCallbackLocks} will ensure the memory safety.
* WAL out of order callback sequencer. {@link #STREAM_CALLBACK_LOCKS} will ensure the memory safety.
*/
private final WALCallbackSequencer callbackSequencer = new WALCallbackSequencer();
private final WALConfirmOffsetCalculator confirmOffsetCalculator = new WALConfirmOffsetCalculator();
Expand All @@ -112,13 +118,8 @@ public class S3Storage implements Storage {
private final ObjectManager objectManager;
private final ObjectStorage objectStorage;
private final S3BlockCache blockCache;

private final StorageFailureHandler storageFailureHandler;
/**
* Stream callback locks. Used to ensure the stream callbacks will not be called concurrently.
*
* @see #handleAppendCallback
*/
private final Lock[] streamCallbackLocks = IntStream.range(0, NUM_STREAM_CALLBACK_LOCKS).mapToObj(i -> new ReentrantLock()).toArray(Lock[]::new);
private final HashedWheelTimer timeoutDetect = new HashedWheelTimer(
ThreadUtils.createThreadFactory("storage-timeout-detect", true), 1, TimeUnit.SECONDS, 100);
private long lastLogTimestamp = 0L;
Expand Down Expand Up @@ -616,36 +617,35 @@ private void handleAppendRequest(WalWriteRequest request) {
}

private void handleAppendCallback(WalWriteRequest request) {
suppress(() -> handleAppendCallback0(request), LOGGER);
suppress(() -> {
final long startTime = System.nanoTime();
handleAppendCallback0(request);
StorageOperationStats.getInstance().appendCallbackStats.record(TimerUtil.durationElapsedAs(startTime, TimeUnit.NANOSECONDS));
}, LOGGER);
}

private void handleAppendCallback0(WalWriteRequest request) {
final long startTime = System.nanoTime();
List<WalWriteRequest> waitingAckRequests;
Lock lock = getStreamCallbackLock(request.record.getStreamId());
lock.lock();
try {
waitingAckRequests = callbackSequencer.after(request);
waitingAckRequests.forEach(r -> r.record.retain());
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
boolean full = deltaWALCache.put(waitingAckRequest.record);
waitingAckRequest.confirmed = true;
if (full) {
// cache block is full, trigger WAL upload.
uploadDeltaWAL();
}

List<WalWriteRequest> waitingAckRequests = callbackSequencer.after(request);
if (waitingAckRequests.isEmpty()) {
return;
}
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
waitingAckRequest.record.retain();
boolean full = deltaWALCache.put(waitingAckRequest.record);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, LogCache relies on external guarantees to put individual Stream Records in order, moving the lock to WALCallbackSequencer#after will have thread concurrency safety issues.

waitingAckRequest.confirmed = true;
if (full) {
// cache block is full, trigger WAL upload.
uploadDeltaWAL();
}
} finally {
lock.unlock();
}
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
waitingAckRequest.cf.complete(null);
}
StorageOperationStats.getInstance().appendCallbackStats.record(TimerUtil.durationElapsedAs(startTime, TimeUnit.NANOSECONDS));
}

private Lock getStreamCallbackLock(long streamId) {
return streamCallbackLocks[(int) ((streamId & Long.MAX_VALUE) % NUM_STREAM_CALLBACK_LOCKS)];
private static Lock getStreamCallbackLock(long streamId) {
return STREAM_CALLBACK_LOCKS[(int) ((streamId & Long.MAX_VALUE) % NUM_STREAM_CALLBACK_LOCKS)];
}

@SuppressWarnings("UnusedReturnValue")
Expand Down Expand Up @@ -974,7 +974,7 @@ public void before(WalWriteRequest request) {
* @return popped sequence persisted request.
*/
public List<WalWriteRequest> after(WalWriteRequest request) {
request.persisted = true;
request.persisted();

// Try to pop sequential persisted requests from the queue.
long streamId = request.record.getStreamId();
Expand All @@ -983,21 +983,26 @@ public List<WalWriteRequest> after(WalWriteRequest request) {
if (peek == null || peek.offset != request.offset) {
return Collections.emptyList();
}

LinkedList<WalWriteRequest> rst = new LinkedList<>();
WalWriteRequest poll = streamRequests.poll();
assert poll == peek;
rst.add(poll);

for (; ; ) {
peek = streamRequests.peek();
if (peek == null || !peek.persisted) {
break;
}
poll = streamRequests.poll();
Lock lock = getStreamCallbackLock(request.record.getStreamId());
lock.lock();
try {
WalWriteRequest poll = streamRequests.poll();
assert poll == peek;
assert poll.record.getBaseOffset() == rst.getLast().record.getLastOffset();
rst.add(poll);

for (; ; ) {
peek = streamRequests.peek();
if (peek == null || !peek.isPersisted()) {
break;
}
poll = streamRequests.poll();
assert poll == peek;
assert poll.record.getBaseOffset() == rst.getLast().record.getLastOffset();
rst.add(poll);
}
} finally {
lock.unlock();
}

return rst;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class WalWriteRequest implements Comparable<WalWriteRequest> {
*
* @see S3Storage.WALCallbackSequencer
*/
boolean persisted;
private volatile boolean persisted;

/**
* Whether the record has been put to the {@link LogCache}
Expand All @@ -54,6 +54,14 @@ public int compareTo(WalWriteRequest o) {
return record.compareTo(o.record);
}

public void persisted() {
this.persisted = true;
}

public boolean isPersisted() {
return this.persisted;
}

@Override
public String toString() {
return "WalWriteRequest{" +
Expand Down
Loading