From f1232fcd4b87a10d92f49f92795b16e8747e68bf Mon Sep 17 00:00:00 2001 From: rkee Date: Sun, 7 Apr 2024 19:54:39 -0700 Subject: [PATCH 1/2] Extending recording/publishing infra * Add publish interface in EventRecorder * Allow DefaultEventBus to publish event through EventRecorder * Wrap source Event in JfrRecordableEvent --- .../io/lettuce/core/AbstractRedisClient.java | 3 +- .../core/cluster/RedisClusterClient.java | 3 +- .../lettuce/core/event/DefaultEventBus.java | 2 +- .../lettuce/core/event/RecordableEvent.java | 18 ++++++++ .../lettuce/core/event/jfr/EventRecorder.java | 13 +----- .../core/event/jfr/JfrEventRecorder.java | 44 ++++++++++++++++--- .../core/event/jfr/NoOpEventRecorder.java | 23 +++++++++- 7 files changed, 84 insertions(+), 22 deletions(-) create mode 100644 src/main/java/io/lettuce/core/event/RecordableEvent.java diff --git a/src/main/java/io/lettuce/core/AbstractRedisClient.java b/src/main/java/io/lettuce/core/AbstractRedisClient.java index 9adff50624..034bf5b306 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisClient.java +++ b/src/main/java/io/lettuce/core/AbstractRedisClient.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import reactor.core.publisher.Mono; +import io.lettuce.core.event.RecordableEvent; import io.lettuce.core.event.command.CommandListener; import io.lettuce.core.event.connection.ConnectEvent; import io.lettuce.core.event.connection.ConnectionCreatedEvent; @@ -400,7 +401,7 @@ protected > ConnectionFuture initia String uriString = connectionBuilder.getRedisURI().toString(); EventRecorder.getInstance().record(new ConnectionCreatedEvent(uriString, connectionBuilder.endpoint().getId())); - EventRecorder.RecordableEvent event = EventRecorder.getInstance() + RecordableEvent event = EventRecorder.getInstance() .start(new ConnectEvent(uriString, connectionBuilder.endpoint().getId())); channelReadyFuture.whenComplete((channel, throwable) -> { diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index a60dfd0d82..0123770bc5 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -54,6 +54,7 @@ import io.lettuce.core.cluster.topology.TopologyComparators; import io.lettuce.core.codec.RedisCodec; import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.event.RecordableEvent; import io.lettuce.core.event.jfr.EventRecorder; import io.lettuce.core.internal.Exceptions; import io.lettuce.core.internal.Futures; @@ -886,7 +887,7 @@ public CompletionStage refreshPartitionsAsync() { sources.add(redisURI); } - EventRecorder.RecordableEvent event = EventRecorder.getInstance().start(new TopologyRefreshEvent(sources)); + RecordableEvent event = EventRecorder.getInstance().start(new TopologyRefreshEvent(sources)); if (partitions == null) { return initializePartitions().thenAccept(Partitions::updateCache) diff --git a/src/main/java/io/lettuce/core/event/DefaultEventBus.java b/src/main/java/io/lettuce/core/event/DefaultEventBus.java index e94aecdcaa..c6b57cc370 100644 --- a/src/main/java/io/lettuce/core/event/DefaultEventBus.java +++ b/src/main/java/io/lettuce/core/event/DefaultEventBus.java @@ -33,7 +33,7 @@ public Flux get() { @Override public void publish(Event event) { - recorder.record(event); + recorder.publish(event); Sinks.EmitResult emitResult; diff --git a/src/main/java/io/lettuce/core/event/RecordableEvent.java b/src/main/java/io/lettuce/core/event/RecordableEvent.java new file mode 100644 index 0000000000..885daf00b2 --- /dev/null +++ b/src/main/java/io/lettuce/core/event/RecordableEvent.java @@ -0,0 +1,18 @@ +package io.lettuce.core.event; + +/** + * Interface defining a recordable event that is recorded on calling {@link #record()}. + */ +public interface RecordableEvent extends Event { + + /** + * Complete the event recording. + */ + void record(); + + /** + * Get the source event. + */ + Event getSource(); + +} diff --git a/src/main/java/io/lettuce/core/event/jfr/EventRecorder.java b/src/main/java/io/lettuce/core/event/jfr/EventRecorder.java index 718c132018..fbb1b5c4c8 100644 --- a/src/main/java/io/lettuce/core/event/jfr/EventRecorder.java +++ b/src/main/java/io/lettuce/core/event/jfr/EventRecorder.java @@ -1,6 +1,7 @@ package io.lettuce.core.event.jfr; import io.lettuce.core.event.Event; +import io.lettuce.core.event.RecordableEvent; /** * Event recorder that can delegate events from the {@link io.lettuce.core.event.EventBus} into a recording facility such as @@ -38,16 +39,6 @@ static EventRecorder getInstance() { */ RecordableEvent start(Event event); - /** - * Interface defining a recordable event that is recorded on calling {@link #record()}. - */ - interface RecordableEvent { - - /** - * Complete the event recording. - */ - void record(); - - } + void publish(Event event); } diff --git a/src/main/java/io/lettuce/core/event/jfr/JfrEventRecorder.java b/src/main/java/io/lettuce/core/event/jfr/JfrEventRecorder.java index 620e546d54..6d9162b35c 100644 --- a/src/main/java/io/lettuce/core/event/jfr/JfrEventRecorder.java +++ b/src/main/java/io/lettuce/core/event/jfr/JfrEventRecorder.java @@ -5,6 +5,7 @@ import java.util.Map; import io.lettuce.core.event.Event; +import io.lettuce.core.event.RecordableEvent; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.internal.LettuceClassUtils; @@ -16,7 +17,7 @@ *

* JFR event forwarding tries to detect a JFR event class that is co-located with the actual event type in the same package * whose simple name is prefixed with {@code Jfr} (e.g. {@code ConnectedEvent} translates to {@code JfrConnectedEvent}). JFR - * event implementations are expected to accept the originak event type as constructor argument. Implementations can be + * event implementations are expected to accept the original event type as constructor argument. Implementations can be * package-private. * * @author Mark Paluch @@ -42,16 +43,35 @@ public RecordableEvent start(Event event) { LettuceAssert.notNull(event, "Event must not be null"); - jdk.jfr.Event jfrEvent = createEvent(event); + JfrRecordableEvent jfrRecordableEvent = new JfrRecordableEvent(event); + jdk.jfr.Event jfrEvent = jfrRecordableEvent.getJfrEvent(); if (jfrEvent != null) { jfrEvent.begin(); - return new JfrRecordableEvent(jfrEvent); + return jfrRecordableEvent; } return NoOpEventRecorder.INSTANCE; } + /** + * When the given event is an instance of RecordableEvent, this method works the same as {@link #record}. + * Otherwise, do nothing. + * + * @param event the event to be published + */ + @Override + public void publish(Event event) { + + LettuceAssert.notNull(event, "Event must not be null"); + + if (event instanceof RecordableEvent) { + ((RecordableEvent) event).record(); + } else { + record(event); + } + } + private Constructor getEventConstructor(Event event) throws NoSuchMethodException { Constructor constructor; @@ -97,12 +117,15 @@ private jdk.jfr.Event createEvent(Event event) { } } - static class JfrRecordableEvent implements RecordableEvent { + class JfrRecordableEvent implements RecordableEvent { + + private final Event originalEvent; private final jdk.jfr.Event jfrEvent; - public JfrRecordableEvent(jdk.jfr.Event jfrEvent) { - this.jfrEvent = jfrEvent; + public JfrRecordableEvent(Event event) { + this.originalEvent = event; + this.jfrEvent = createEvent(event); } @Override @@ -111,6 +134,15 @@ public void record() { jfrEvent.commit(); } + @Override + public Event getSource() { + return originalEvent; + } + + public jdk.jfr.Event getJfrEvent() { + return jfrEvent; + } + } } diff --git a/src/main/java/io/lettuce/core/event/jfr/NoOpEventRecorder.java b/src/main/java/io/lettuce/core/event/jfr/NoOpEventRecorder.java index 8b5f7a9e50..b24c22090e 100644 --- a/src/main/java/io/lettuce/core/event/jfr/NoOpEventRecorder.java +++ b/src/main/java/io/lettuce/core/event/jfr/NoOpEventRecorder.java @@ -1,6 +1,7 @@ package io.lettuce.core.event.jfr; import io.lettuce.core.event.Event; +import io.lettuce.core.event.RecordableEvent; /** * No-op implementation. @@ -8,9 +9,18 @@ * @author Mark Paluch * @since 6.1 */ -enum NoOpEventRecorder implements EventRecorder, EventRecorder.RecordableEvent { +public final class NoOpEventRecorder implements EventRecorder, RecordableEvent { - INSTANCE; + public final static NoOpEventRecorder INSTANCE = new NoOpEventRecorder(); + + private Event originalEvent = null; + + private NoOpEventRecorder() { + } + + public NoOpEventRecorder(Event event) { + this.originalEvent = event; + } @Override public void record(Event event) { @@ -22,9 +32,18 @@ public RecordableEvent start(Event event) { return this; } + @Override + public void publish(Event event) { + + } + @Override public void record() { + } + @Override + public Event getSource() { + return originalEvent; } } From a8826478b2fed98487bdca0fcf9c0a7b432c0df4 Mon Sep 17 00:00:00 2001 From: Rkee Date: Sat, 1 Jun 2024 22:38:31 -0700 Subject: [PATCH 2/2] address commments from https://github.com/redis/lettuce/pull/2859#discussion_r1607784212 --- .../io/lettuce/core/AbstractRedisClient.java | 3 +- .../core/cluster/RedisClusterClient.java | 3 +- .../lettuce/core/event/DefaultEventBus.java | 8 +++-- .../lettuce/core/event/RecordableEvent.java | 18 ---------- .../lettuce/core/event/jfr/EventRecorder.java | 18 ++++++++-- .../core/event/jfr/JfrEventRecorder.java | 35 ++++++------------- .../core/event/jfr/NoOpEventRecorder.java | 8 +---- 7 files changed, 35 insertions(+), 58 deletions(-) delete mode 100644 src/main/java/io/lettuce/core/event/RecordableEvent.java diff --git a/src/main/java/io/lettuce/core/AbstractRedisClient.java b/src/main/java/io/lettuce/core/AbstractRedisClient.java index 034bf5b306..9adff50624 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisClient.java +++ b/src/main/java/io/lettuce/core/AbstractRedisClient.java @@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicInteger; import reactor.core.publisher.Mono; -import io.lettuce.core.event.RecordableEvent; import io.lettuce.core.event.command.CommandListener; import io.lettuce.core.event.connection.ConnectEvent; import io.lettuce.core.event.connection.ConnectionCreatedEvent; @@ -401,7 +400,7 @@ protected > ConnectionFuture initia String uriString = connectionBuilder.getRedisURI().toString(); EventRecorder.getInstance().record(new ConnectionCreatedEvent(uriString, connectionBuilder.endpoint().getId())); - RecordableEvent event = EventRecorder.getInstance() + EventRecorder.RecordableEvent event = EventRecorder.getInstance() .start(new ConnectEvent(uriString, connectionBuilder.endpoint().getId())); channelReadyFuture.whenComplete((channel, throwable) -> { diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index 0123770bc5..a60dfd0d82 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -54,7 +54,6 @@ import io.lettuce.core.cluster.topology.TopologyComparators; import io.lettuce.core.codec.RedisCodec; import io.lettuce.core.codec.StringCodec; -import io.lettuce.core.event.RecordableEvent; import io.lettuce.core.event.jfr.EventRecorder; import io.lettuce.core.internal.Exceptions; import io.lettuce.core.internal.Futures; @@ -887,7 +886,7 @@ public CompletionStage refreshPartitionsAsync() { sources.add(redisURI); } - RecordableEvent event = EventRecorder.getInstance().start(new TopologyRefreshEvent(sources)); + EventRecorder.RecordableEvent event = EventRecorder.getInstance().start(new TopologyRefreshEvent(sources)); if (partitions == null) { return initializePartitions().thenAccept(Partitions::updateCache) diff --git a/src/main/java/io/lettuce/core/event/DefaultEventBus.java b/src/main/java/io/lettuce/core/event/DefaultEventBus.java index c6b57cc370..257eda5404 100644 --- a/src/main/java/io/lettuce/core/event/DefaultEventBus.java +++ b/src/main/java/io/lettuce/core/event/DefaultEventBus.java @@ -33,11 +33,15 @@ public Flux get() { @Override public void publish(Event event) { - recorder.publish(event); + final Event sourceEvent = (event instanceof EventRecorder.RecordableEvent) + ? ((EventRecorder.RecordableEvent) event).getSource() + : event; + + recorder.record(sourceEvent); Sinks.EmitResult emitResult; - while ((emitResult = bus.tryEmitNext(event)) == Sinks.EmitResult.FAIL_NON_SERIALIZED) { + while ((emitResult = bus.tryEmitNext(sourceEvent)) == Sinks.EmitResult.FAIL_NON_SERIALIZED) { // busy-loop } diff --git a/src/main/java/io/lettuce/core/event/RecordableEvent.java b/src/main/java/io/lettuce/core/event/RecordableEvent.java deleted file mode 100644 index 885daf00b2..0000000000 --- a/src/main/java/io/lettuce/core/event/RecordableEvent.java +++ /dev/null @@ -1,18 +0,0 @@ -package io.lettuce.core.event; - -/** - * Interface defining a recordable event that is recorded on calling {@link #record()}. - */ -public interface RecordableEvent extends Event { - - /** - * Complete the event recording. - */ - void record(); - - /** - * Get the source event. - */ - Event getSource(); - -} diff --git a/src/main/java/io/lettuce/core/event/jfr/EventRecorder.java b/src/main/java/io/lettuce/core/event/jfr/EventRecorder.java index fbb1b5c4c8..fd2c4726a6 100644 --- a/src/main/java/io/lettuce/core/event/jfr/EventRecorder.java +++ b/src/main/java/io/lettuce/core/event/jfr/EventRecorder.java @@ -1,7 +1,6 @@ package io.lettuce.core.event.jfr; import io.lettuce.core.event.Event; -import io.lettuce.core.event.RecordableEvent; /** * Event recorder that can delegate events from the {@link io.lettuce.core.event.EventBus} into a recording facility such as @@ -39,6 +38,21 @@ static EventRecorder getInstance() { */ RecordableEvent start(Event event); - void publish(Event event); + /** + * Interface defining a recordable event that is recorded on calling {@link #record()}. + */ + interface RecordableEvent extends Event { + + /** + * Complete the event recording. + */ + void record(); + + /** + * Get the source event. + */ + Event getSource(); + + } } diff --git a/src/main/java/io/lettuce/core/event/jfr/JfrEventRecorder.java b/src/main/java/io/lettuce/core/event/jfr/JfrEventRecorder.java index 6d9162b35c..5cc96231eb 100644 --- a/src/main/java/io/lettuce/core/event/jfr/JfrEventRecorder.java +++ b/src/main/java/io/lettuce/core/event/jfr/JfrEventRecorder.java @@ -5,7 +5,6 @@ import java.util.Map; import io.lettuce.core.event.Event; -import io.lettuce.core.event.RecordableEvent; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.internal.LettuceClassUtils; @@ -31,10 +30,14 @@ public void record(Event event) { LettuceAssert.notNull(event, "Event must not be null"); - jdk.jfr.Event jfrEvent = createEvent(event); + if (event instanceof RecordableEvent) { + ((RecordableEvent) event).record(); + } else { + jdk.jfr.Event jfrEvent = createEvent(event); - if (jfrEvent != null) { - jfrEvent.commit(); + if (jfrEvent != null) { + jfrEvent.commit(); + } } } @@ -54,24 +57,6 @@ public RecordableEvent start(Event event) { return NoOpEventRecorder.INSTANCE; } - /** - * When the given event is an instance of RecordableEvent, this method works the same as {@link #record}. - * Otherwise, do nothing. - * - * @param event the event to be published - */ - @Override - public void publish(Event event) { - - LettuceAssert.notNull(event, "Event must not be null"); - - if (event instanceof RecordableEvent) { - ((RecordableEvent) event).record(); - } else { - record(event); - } - } - private Constructor getEventConstructor(Event event) throws NoSuchMethodException { Constructor constructor; @@ -119,12 +104,12 @@ private jdk.jfr.Event createEvent(Event event) { class JfrRecordableEvent implements RecordableEvent { - private final Event originalEvent; + private final Event sourceEvent; private final jdk.jfr.Event jfrEvent; public JfrRecordableEvent(Event event) { - this.originalEvent = event; + this.sourceEvent = event; this.jfrEvent = createEvent(event); } @@ -136,7 +121,7 @@ public void record() { @Override public Event getSource() { - return originalEvent; + return sourceEvent; } public jdk.jfr.Event getJfrEvent() { diff --git a/src/main/java/io/lettuce/core/event/jfr/NoOpEventRecorder.java b/src/main/java/io/lettuce/core/event/jfr/NoOpEventRecorder.java index b24c22090e..f1f75b04a6 100644 --- a/src/main/java/io/lettuce/core/event/jfr/NoOpEventRecorder.java +++ b/src/main/java/io/lettuce/core/event/jfr/NoOpEventRecorder.java @@ -1,7 +1,6 @@ package io.lettuce.core.event.jfr; import io.lettuce.core.event.Event; -import io.lettuce.core.event.RecordableEvent; /** * No-op implementation. @@ -9,7 +8,7 @@ * @author Mark Paluch * @since 6.1 */ -public final class NoOpEventRecorder implements EventRecorder, RecordableEvent { +public final class NoOpEventRecorder implements EventRecorder, EventRecorder.RecordableEvent { public final static NoOpEventRecorder INSTANCE = new NoOpEventRecorder(); @@ -32,11 +31,6 @@ public RecordableEvent start(Event event) { return this; } - @Override - public void publish(Event event) { - - } - @Override public void record() { }