Skip to content

Commit

Permalink
Extending recording/publishing infra
Browse files Browse the repository at this point in the history
* Add publish interface in EventRecorder
* Allow DefaultEventBus to publish event through EventRecorder
* Wrap source Event in JfrRecordableEvent
  • Loading branch information
rkee authored and Rkee committed May 20, 2024
1 parent cc9d240 commit f1232fc
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 22 deletions.
3 changes: 2 additions & 1 deletion src/main/java/io/lettuce/core/AbstractRedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import io.lettuce.core.event.RecordableEvent;
import io.lettuce.core.event.command.CommandListener;
import io.lettuce.core.event.connection.ConnectEvent;
import io.lettuce.core.event.connection.ConnectionCreatedEvent;
Expand Down Expand Up @@ -400,7 +401,7 @@ protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initia
String uriString = connectionBuilder.getRedisURI().toString();

EventRecorder.getInstance().record(new ConnectionCreatedEvent(uriString, connectionBuilder.endpoint().getId()));
EventRecorder.RecordableEvent event = EventRecorder.getInstance()
RecordableEvent event = EventRecorder.getInstance()
.start(new ConnectEvent(uriString, connectionBuilder.endpoint().getId()));

channelReadyFuture.whenComplete((channel, throwable) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.lettuce.core.cluster.topology.TopologyComparators;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.event.RecordableEvent;
import io.lettuce.core.event.jfr.EventRecorder;
import io.lettuce.core.internal.Exceptions;
import io.lettuce.core.internal.Futures;
Expand Down Expand Up @@ -886,7 +887,7 @@ public CompletionStage<Void> refreshPartitionsAsync() {
sources.add(redisURI);
}

EventRecorder.RecordableEvent event = EventRecorder.getInstance().start(new TopologyRefreshEvent(sources));
RecordableEvent event = EventRecorder.getInstance().start(new TopologyRefreshEvent(sources));

if (partitions == null) {
return initializePartitions().thenAccept(Partitions::updateCache)
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/event/DefaultEventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public Flux<Event> get() {
@Override
public void publish(Event event) {

recorder.record(event);
recorder.publish(event);

Sinks.EmitResult emitResult;

Expand Down
18 changes: 18 additions & 0 deletions src/main/java/io/lettuce/core/event/RecordableEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.lettuce.core.event;

/**
* Interface defining a recordable event that is recorded on calling {@link #record()}.
*/
public interface RecordableEvent extends Event {

/**
* Complete the event recording.
*/
void record();

/**
* Get the source event.
*/
Event getSource();

}
13 changes: 2 additions & 11 deletions src/main/java/io/lettuce/core/event/jfr/EventRecorder.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.lettuce.core.event.jfr;

import io.lettuce.core.event.Event;
import io.lettuce.core.event.RecordableEvent;

/**
* Event recorder that can delegate events from the {@link io.lettuce.core.event.EventBus} into a recording facility such as
Expand Down Expand Up @@ -38,16 +39,6 @@ static EventRecorder getInstance() {
*/
RecordableEvent start(Event event);

/**
* Interface defining a recordable event that is recorded on calling {@link #record()}.
*/
interface RecordableEvent {

/**
* Complete the event recording.
*/
void record();

}
void publish(Event event);

}
44 changes: 38 additions & 6 deletions src/main/java/io/lettuce/core/event/jfr/JfrEventRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Map;

import io.lettuce.core.event.Event;
import io.lettuce.core.event.RecordableEvent;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.internal.LettuceClassUtils;

Expand All @@ -16,7 +17,7 @@
* <p>
* JFR event forwarding tries to detect a JFR event class that is co-located with the actual event type in the same package
* whose simple name is prefixed with {@code Jfr} (e.g. {@code ConnectedEvent} translates to {@code JfrConnectedEvent}). JFR
* event implementations are expected to accept the originak event type as constructor argument. Implementations can be
* event implementations are expected to accept the original event type as constructor argument. Implementations can be
* package-private.
*
* @author Mark Paluch
Expand All @@ -42,16 +43,35 @@ public RecordableEvent start(Event event) {

LettuceAssert.notNull(event, "Event must not be null");

jdk.jfr.Event jfrEvent = createEvent(event);
JfrRecordableEvent jfrRecordableEvent = new JfrRecordableEvent(event);
jdk.jfr.Event jfrEvent = jfrRecordableEvent.getJfrEvent();

if (jfrEvent != null) {
jfrEvent.begin();
return new JfrRecordableEvent(jfrEvent);
return jfrRecordableEvent;
}

return NoOpEventRecorder.INSTANCE;
}

/**
* When the given event is an instance of RecordableEvent, this method works the same as {@link #record}.
* Otherwise, do nothing.
*
* @param event the event to be published
*/
@Override
public void publish(Event event) {

LettuceAssert.notNull(event, "Event must not be null");

if (event instanceof RecordableEvent) {
((RecordableEvent) event).record();
} else {
record(event);
}
}

private Constructor<?> getEventConstructor(Event event) throws NoSuchMethodException {

Constructor<?> constructor;
Expand Down Expand Up @@ -97,12 +117,15 @@ private jdk.jfr.Event createEvent(Event event) {
}
}

static class JfrRecordableEvent implements RecordableEvent {
class JfrRecordableEvent implements RecordableEvent {

private final Event originalEvent;

private final jdk.jfr.Event jfrEvent;

public JfrRecordableEvent(jdk.jfr.Event jfrEvent) {
this.jfrEvent = jfrEvent;
public JfrRecordableEvent(Event event) {
this.originalEvent = event;
this.jfrEvent = createEvent(event);
}

@Override
Expand All @@ -111,6 +134,15 @@ public void record() {
jfrEvent.commit();
}

@Override
public Event getSource() {
return originalEvent;
}

public jdk.jfr.Event getJfrEvent() {
return jfrEvent;
}

}

}
23 changes: 21 additions & 2 deletions src/main/java/io/lettuce/core/event/jfr/NoOpEventRecorder.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
package io.lettuce.core.event.jfr;

import io.lettuce.core.event.Event;
import io.lettuce.core.event.RecordableEvent;

/**
* No-op implementation.
*
* @author Mark Paluch
* @since 6.1
*/
enum NoOpEventRecorder implements EventRecorder, EventRecorder.RecordableEvent {
public final class NoOpEventRecorder implements EventRecorder, RecordableEvent {

INSTANCE;
public final static NoOpEventRecorder INSTANCE = new NoOpEventRecorder();

private Event originalEvent = null;

private NoOpEventRecorder() {
}

public NoOpEventRecorder(Event event) {
this.originalEvent = event;
}

@Override
public void record(Event event) {
Expand All @@ -22,9 +32,18 @@ public RecordableEvent start(Event event) {
return this;
}

@Override
public void publish(Event event) {

}

@Override
public void record() {
}

@Override
public Event getSource() {
return originalEvent;
}

}

0 comments on commit f1232fc

Please sign in to comment.