Skip to content

Commit

Permalink
[improve][test] Add solution to PulsarMockBookKeeper for intercepting…
Browse files Browse the repository at this point in the history
… reads (apache#23875)

(cherry picked from commit 87fb442)
(cherry picked from commit 38f15bc)
  • Loading branch information
lhotari authored and nikhil-ctds committed Jan 31, 2025
1 parent 69e638a commit 30e25e8
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
Expand Down Expand Up @@ -96,6 +98,9 @@ public static Collection<BookieId> getMockEnsemble() {
final Queue<Long> addEntryResponseDelaysMillis = new ConcurrentLinkedQueue<>();
final List<CompletableFuture<Void>> failures = new ArrayList<>();
final List<CompletableFuture<Void>> addEntryFailures = new ArrayList<>();
@Setter
@Getter
private volatile PulsarMockReadHandleInterceptor readHandleInterceptor;

public PulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception {
this.orderedExecutor = orderedExecutor;
Expand Down Expand Up @@ -250,7 +255,8 @@ public CompletableFuture<ReadHandle> execute() {
return FutureUtils.exception(new BKException.BKUnauthorizedAccessException());
} else {
return FutureUtils.value(new PulsarMockReadHandle(PulsarMockBookKeeper.this, ledgerId,
lh.getLedgerMetadata(), lh.entries));
lh.getLedgerMetadata(), lh.entries,
PulsarMockBookKeeper.this::getReadHandleInterceptor));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
this.digest = digest;
this.passwd = Arrays.copyOf(passwd, passwd.length);

readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries);
readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries, bk::getReadHandleInterceptor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
Expand All @@ -40,28 +41,36 @@ class PulsarMockReadHandle implements ReadHandle {
private final long ledgerId;
private final LedgerMetadata metadata;
private final List<LedgerEntryImpl> entries;
private final Supplier<PulsarMockReadHandleInterceptor> readHandleInterceptorSupplier;

PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata,
List<LedgerEntryImpl> entries) {
List<LedgerEntryImpl> entries,
Supplier<PulsarMockReadHandleInterceptor> readHandleInterceptorSupplier) {
this.bk = bk;
this.ledgerId = ledgerId;
this.metadata = metadata;
this.entries = entries;
this.readHandleInterceptorSupplier = readHandleInterceptorSupplier;
}

@Override
public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
return bk.getProgrammedFailure().thenComposeAsync((res) -> {
log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size());
List<LedgerEntry> seq = new ArrayList<>();
long entryId = firstEntry;
while (entryId <= lastEntry && entryId < entries.size()) {
seq.add(entries.get((int) entryId++).duplicate());
}
log.debug("Entries read: {}", seq);

return FutureUtils.value(LedgerEntriesImpl.create(seq));
});
log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size());
List<LedgerEntry> seq = new ArrayList<>();
long entryId = firstEntry;
while (entryId <= lastEntry && entryId < entries.size()) {
seq.add(entries.get((int) entryId++).duplicate());
}
log.debug("Entries read: {}", seq);
LedgerEntriesImpl ledgerEntries = LedgerEntriesImpl.create(seq);
PulsarMockReadHandleInterceptor pulsarMockReadHandleInterceptor = readHandleInterceptorSupplier.get();
if (pulsarMockReadHandleInterceptor != null) {
return pulsarMockReadHandleInterceptor.interceptReadAsync(ledgerId, firstEntry, lastEntry,
ledgerEntries);
}
return FutureUtils.value(ledgerEntries);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.client;

import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.LedgerEntries;

/**
* Interceptor interface for intercepting read handle readAsync operations.
* This is useful for testing purposes, for example for introducing delays.
*/
public interface PulsarMockReadHandleInterceptor {
/**
* Intercepts the readAsync operation on a read handle.
*
* @param ledgerId ledger id
* @param firstEntry first entry to read
* @param lastEntry last entry to read
* @param entries entries that would be returned by the read operation
* @return CompletableFuture that will complete with the entries to return
*/
CompletableFuture<LedgerEntries> interceptReadAsync(long ledgerId, long firstEntry, long lastEntry,
LedgerEntries entries);
}

0 comments on commit 30e25e8

Please sign in to comment.