From 30e25e8b7eb37bd4d42101471b1ee272d65fd8f9 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 Jan 2025 22:36:40 -0800 Subject: [PATCH] [improve][test] Add solution to PulsarMockBookKeeper for intercepting reads (#23875) (cherry picked from commit 87fb442c223d47d8a426b44575981345d7a23481) (cherry picked from commit 38f15bce242541ce33a97ff132d8cfacfee5c16e) --- .../client/PulsarMockBookKeeper.java | 8 +++- .../client/PulsarMockLedgerHandle.java | 2 +- .../client/PulsarMockReadHandle.java | 31 +++++++++----- .../PulsarMockReadHandleInterceptor.java | 40 +++++++++++++++++++ 4 files changed, 68 insertions(+), 13 deletions(-) create mode 100644 testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java index 620b1c6fb6a2a..f249a0f7231cf 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -40,6 +40,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import lombok.Setter; import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; @@ -96,6 +98,9 @@ public static Collection getMockEnsemble() { final Queue addEntryResponseDelaysMillis = new ConcurrentLinkedQueue<>(); final List> failures = new ArrayList<>(); final List> addEntryFailures = new ArrayList<>(); + @Setter + @Getter + private volatile PulsarMockReadHandleInterceptor readHandleInterceptor; public PulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception { this.orderedExecutor = orderedExecutor; @@ -250,7 +255,8 @@ public CompletableFuture execute() { return FutureUtils.exception(new BKException.BKUnauthorizedAccessException()); } else { return FutureUtils.value(new PulsarMockReadHandle(PulsarMockBookKeeper.this, ledgerId, - lh.getLedgerMetadata(), lh.entries)); + lh.getLedgerMetadata(), lh.entries, + PulsarMockBookKeeper.this::getReadHandleInterceptor)); } }); } diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index aa61e541d0d6b..d30684e604670 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -73,7 +73,7 @@ public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, this.digest = digest; this.passwd = Arrays.copyOf(passwd, passwd.length); - readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries); + readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries, bk::getReadHandleInterceptor); } @Override diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java index a4361f62254e4..9f3f4969199ce 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; @@ -40,28 +41,36 @@ class PulsarMockReadHandle implements ReadHandle { private final long ledgerId; private final LedgerMetadata metadata; private final List entries; + private final Supplier readHandleInterceptorSupplier; PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata, - List entries) { + List entries, + Supplier readHandleInterceptorSupplier) { this.bk = bk; this.ledgerId = ledgerId; this.metadata = metadata; this.entries = entries; + this.readHandleInterceptorSupplier = readHandleInterceptorSupplier; } @Override public CompletableFuture readAsync(long firstEntry, long lastEntry) { return bk.getProgrammedFailure().thenComposeAsync((res) -> { - log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size()); - List seq = new ArrayList<>(); - long entryId = firstEntry; - while (entryId <= lastEntry && entryId < entries.size()) { - seq.add(entries.get((int) entryId++).duplicate()); - } - log.debug("Entries read: {}", seq); - - return FutureUtils.value(LedgerEntriesImpl.create(seq)); - }); + log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size()); + List seq = new ArrayList<>(); + long entryId = firstEntry; + while (entryId <= lastEntry && entryId < entries.size()) { + seq.add(entries.get((int) entryId++).duplicate()); + } + log.debug("Entries read: {}", seq); + LedgerEntriesImpl ledgerEntries = LedgerEntriesImpl.create(seq); + PulsarMockReadHandleInterceptor pulsarMockReadHandleInterceptor = readHandleInterceptorSupplier.get(); + if (pulsarMockReadHandleInterceptor != null) { + return pulsarMockReadHandleInterceptor.interceptReadAsync(ledgerId, firstEntry, lastEntry, + ledgerEntries); + } + return FutureUtils.value(ledgerEntries); + }); } @Override diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java new file mode 100644 index 0000000000000..acee87b0f77f4 --- /dev/null +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.client; + +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.client.api.LedgerEntries; + +/** + * Interceptor interface for intercepting read handle readAsync operations. + * This is useful for testing purposes, for example for introducing delays. + */ +public interface PulsarMockReadHandleInterceptor { + /** + * Intercepts the readAsync operation on a read handle. + * + * @param ledgerId ledger id + * @param firstEntry first entry to read + * @param lastEntry last entry to read + * @param entries entries that would be returned by the read operation + * @return CompletableFuture that will complete with the entries to return + */ + CompletableFuture interceptReadAsync(long ledgerId, long firstEntry, long lastEntry, + LedgerEntries entries); +}