Skip to content

Commit

Permalink
High-level event-publisher API
Browse files Browse the repository at this point in the history
Closes #23
Relates to #16
  • Loading branch information
miere committed Mar 6, 2022
1 parent 3572e85 commit ee4312e
Show file tree
Hide file tree
Showing 27 changed files with 806 additions and 161 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright 2019 Skullabs Contributors (https://github.com/skullabs)
# Copyright 2022 Skullabs Contributors (https://github.com/skullabs)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,6 +14,7 @@
# limitations under the License.
#
kos.apt.events.EventListenerKosProcessor
kos.apt.events.EventPublisherKosProcessor
kos.apt.validation.ValidatorProcessor
kos.apt.rest.RestApiProcessor
kos.apt.rest.RestClientProcessor
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class {{eventListenerClassName}} implements ConfigurationLoadedEventListe
final ImplementationLoader implementationLoader = event.getKosContext().getImplementationLoader();
// Auto-configure a message producer, if found in the classpath
final EventBusSinkManager messageProducerSyncManager = implementationLoader.instanceOfOrFail(EventBusSinkManager.class);
final EventBusSinkManager subscriptionManager = implementationLoader.instanceOfOrFail(EventBusSinkManager.class);
final Validation validation = event.getKosContext().getDefaultValidation();
final Vertx vertx = event.getKosContext().getDefaultVertx();
Expand All @@ -42,11 +42,7 @@ public class {{eventListenerClassName}} implements ConfigurationLoadedEventListe
{{/isAsync}}
};
{{/requiresValidation}}
EventBusSink.SubscriptionRequest subscriptionRequest{{uniqueIdentifier}} = new EventBusSink.SubscriptionRequest(
event.getApplicationConfig(), event.getKosContext(), "{{topicAddressName}}", {{{messageTypeWithTypeErasure}}}.class
);
messageProducerSyncManager.tryInitialise(subscriptionRequest{{uniqueIdentifier}});
vertx.eventBus().consumer("{{topicAddressName}}", EventHandler.async((Message<{{{messageType}}}> message) -> {
subscriptionManager.subscribe("{{topicAddressName}}", {{{messageTypeWithTypeErasure}}}.class, EventHandler.async((Message<{{{messageType}}}> message) -> {
{{{messageType}}} body = message.body();
{{#requiresValidation}}
return validation.validate(body, {{{messageType}}}.class)
Expand Down
43 changes: 43 additions & 0 deletions kos-annotations/resources/template-event-publisher-java.mustache
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package {{packageName}};

import injector.*;
import kos.api.*;
import kos.core.events.*;
import io.vertx.core.*;
import io.vertx.core.eventbus.*;

/**
* Auto generated event bus publisher for {@link {{packageName}}.{{eventPublisherInterfaceName}}}.
*/
@Singleton
@ExposedAs(ConfigurationLoadedEventListener.class)
@{{jdkGeneratedAnnotation}}("kos.apt.EventPublisherKosProcessor")
public class {{eventPublisherClassName}} implements ConfigurationLoadedEventListener, {{eventPublisherInterfaceName}} {
{{#methods}}
/**
* Message producer for {{name}}({{{messageType}}}).
*/
MessageProducer<{{{messageType}}}> {{targetMethodName}}Producer{{uniqueIdentifier}};

@Override
public Future<Void> {{targetMethodName}}({{{messageType}}} value) {
return {{targetMethodName}}Producer{{uniqueIdentifier}}.write(value);
}

{{/methods}}
@Override
public void on(ConfigurationLoadedEvent configurationLoadedEvent) {
final ImplementationLoader implementationLoader = configurationLoadedEvent.getKosContext().getImplementationLoader();
final EventBusSinkManager eventPublisherManager = implementationLoader.instanceOfOrFail(EventBusSinkManager.class);
{{#methods}}
{{targetMethodName}}Producer{{uniqueIdentifier}} = eventPublisherManager.createProducer("{{topicAddressName}}", {{{messageType}}}.class);
{{/methods}}
}

@Producer
public {{eventPublisherInterfaceName}} produce{{eventPublisherInterfaceName}}() {
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package kos.apt.events;

import generator.apt.SimplifiedAST;
import generator.apt.SimplifiedAbstractProcessor;
import kos.api.ConfigurationLoadedEventListener;
import kos.apt.ClassGenerator;
import kos.apt.spi.CustomInjectorProcessor;
import kos.apt.spi.SPIGenerator;
import kos.events.Publisher;
import lombok.val;

import javax.annotation.processing.ProcessingEnvironment;
import javax.annotation.processing.SupportedAnnotationTypes;
import java.io.IOException;
import java.util.Collection;
import java.util.List;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static kos.core.Lang.convert;

@SupportedAnnotationTypes( { "kos.events.*" } )
public class EventPublisherKosProcessor extends SimplifiedAbstractProcessor {

private final CustomInjectorProcessor injectorProcessor = new CustomInjectorProcessor();
private ClassGenerator classGenerator;
private SPIGenerator configurationLoadedSpiGenerator;

public EventPublisherKosProcessor() {
super(
emptyList(),
singletonList(Publisher.class),
emptyList()
);
}

@Override
public synchronized void init(ProcessingEnvironment processingEnv) {
super.init(processingEnv);
classGenerator = new ClassGenerator("template-event-publisher-java.mustache", processingEnv);
injectorProcessor.init(processingEnv);

val configurationLoadedSpiLocation = "META-INF/services/" + ConfigurationLoadedEventListener.class.getCanonicalName();
configurationLoadedSpiGenerator = new SPIGenerator(processingEnv, resourceLocator, configurationLoadedSpiLocation);
}

@Override
protected void process(Collection<SimplifiedAST.Type> types) {
try {
val eventPublisherTypes = convert(types, EventPublisherType::from);
classGenerator.generateClasses(eventPublisherTypes);
generateSpiDescriptors(eventPublisherTypes);
createSPIFileForInterfaces(eventPublisherTypes);
} catch (Throwable cause) {
throw new RuntimeException(cause);
}
}

/**
* Writes the SPI files for {@link ConfigurationLoadedEventListener}.
*/
private void generateSpiDescriptors(List<EventPublisherType> eventListenerTypes) throws IOException {
configurationLoadedSpiGenerator.flushSPIClasses();
configurationLoadedSpiGenerator.memorizeSPIFor(eventListenerTypes);
configurationLoadedSpiGenerator.generateSPIFiles();
}

/**
* Will generate SPI files for the just created Publisher types.
*/
private void createSPIFileForInterfaces(List<EventPublisherType> eventPublisherTypes) throws IOException {
for (val publisherType : eventPublisherTypes) {
val spiLocation = "META-INF/services/" + publisherType.packageName + "." + publisherType.eventPublisherInterfaceName;
val generator = new SPIGenerator(processingEnv, resourceLocator, spiLocation);
generator.flushSPIClasses();
generator.memorizeSPIFor(publisherType);
generator.generateSPIFiles();
}
}
}
111 changes: 111 additions & 0 deletions kos-annotations/source/kos/apt/events/EventPublisherType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package kos.apt.events;

import generator.apt.SimplifiedAST;
import io.vertx.core.Future;
import kos.apt.TypeUtils;
import kos.apt.spi.SpiClass;
import kos.core.Lang;
import kos.events.Publisher;
import kos.validation.Valid;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.val;

import java.util.List;
import java.util.Objects;

@Getter
@RequiredArgsConstructor
class EventPublisherType implements SpiClass {

final String jdkGeneratedAnnotation;
final String eventPublisherClassName;
final String eventPublisherInterfaceName;
final String packageName;

final List<EventPublisherMethod> methods;

static EventPublisherType from(SimplifiedAST.Type type)
{
if (!type.isInterface()) {
throw new UnsupportedOperationException("Cannot create Publisher for concrete classes.");
}

val methods = Lang.filter(
Lang.convertIndex(type.getMethods(), EventPublisherMethod::from),
Objects::nonNull
);

return new EventPublisherType(
type.getJdkGeneratedAnnotation(),
type.getSimpleName() + "Impl",
type.getSimpleName(),
type.getPackageName(),
methods
);
}

@Override
public String getClassCanonicalName() {
return getPackageName() + "." + this.getEventPublisherClassName();
}
}

@Getter
@RequiredArgsConstructor
class EventPublisherMethod {

private static final String VERTX_FUTURE_OF_VOID = Future.class.getCanonicalName() + "<java.lang.Void>";

final String topicAddressName;
final String messageType;
final boolean requiresValidation;
final boolean isAsync;
final String targetMethodName;
final int uniqueIdentifier;

public static EventPublisherMethod from(int counter, SimplifiedAST.Method targetMethod)
{
if (targetMethod.isConstructor()) return null;

val topicAddressName = extractTopicAddressNameFrom(targetMethod);
if (topicAddressName == null) return null;

if (targetMethod.getParameters().isEmpty()) {
throw new UnsupportedOperationException("Missing event type in publisher method.");
}

if (targetMethod.getParameters().size() != 1) {
throw new UnsupportedOperationException("Publisher methods cannot have more than one parameter.");
}

if (!targetMethod.getType().equals(VERTX_FUTURE_OF_VOID)) {
throw new UnsupportedOperationException("Publisher methods should return " + VERTX_FUTURE_OF_VOID + ".");
}

val parameter = targetMethod.getParameters().get(0);
val messageType = parameter.getType();

val erasedMessageType = TypeUtils.rawType(messageType).orElse(messageType);
if (erasedMessageType != messageType) {
throw new UnsupportedOperationException("Publisher does not support types with generics");
}

return new EventPublisherMethod(
topicAddressName,
messageType,
parameter.getAnnotation(Valid.class) != null,
!targetMethod.isVoidMethod(),
targetMethod.getName(),
counter
);
}

private static String extractTopicAddressNameFrom(SimplifiedAST.Method targetMethod) {
return Lang
.first(targetMethod.getAnnotations(), ann -> ann.getType().equals(Publisher.class.getCanonicalName()))
.map(SimplifiedAST.Annotation::getValue)
.map(TypeUtils::annotationValueAsString)
.orElse(null);
}
}
14 changes: 14 additions & 0 deletions kos-annotations/source/kos/events/Publisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kos.events;

import java.lang.annotation.*;

@Documented
@Retention(RetentionPolicy.SOURCE)
@Target(ElementType.METHOD)
public @interface Publisher {

/**
* The {@link io.vertx.core.eventbus.EventBus} topic address.
*/
String value();
}
43 changes: 43 additions & 0 deletions kos-annotations/tests/kos/apt/EventPublisherProcessorTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package kos.apt;

import kos.apt.events.EventListenerKosProcessor;
import kos.apt.events.EventPublisherKosProcessor;
import kos.sample.events.ListenerWithValidationAndBothAsyncAndSync;
import kos.sample.events.ListenerWithValidationAndBothAsyncAndSyncEventListenerConfiguration;
import kos.sample.events.PublisherWithMultipleMethods;
import kos.sample.events.PublisherWithMultipleMethodsImpl;
import lombok.val;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import javax.annotation.processing.Processor;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class EventPublisherProcessorTest {

Processor processor = new EventPublisherKosProcessor();

@Nested class ListenerMethodsCorrectlyWritten {

@Nested class WhenParametersIsValidated {

@Nested class WhenHasMultiplePublishers {

@DisplayName("should generate class as expected")
@Test void process()
{
val source = APT.asSource(APT.testFile(PublisherWithMultipleMethods.class));
APT.run(processor, source);

val generatedClassName = PublisherWithMultipleMethods.class.getCanonicalName() + "Impl";
val generatedClass = APT.readFileAsString(APT.outputGeneratedClass(generatedClassName));

val expectedClass = APT.readFileAsString(APT.testFile(PublisherWithMultipleMethodsImpl.class));
assertEquals(expectedClass, generatedClass);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public void on(ConfigurationLoadedEvent event) {
final ImplementationLoader implementationLoader = event.getKosContext().getImplementationLoader();

// Auto-configure a message producer, if found in the classpath
final EventBusSinkManager messageProducerSyncManager = implementationLoader.instanceOfOrFail(EventBusSinkManager.class);
final EventBusSinkManager subscriptionManager = implementationLoader.instanceOfOrFail(EventBusSinkManager.class);

final Validation validation = event.getKosContext().getDefaultValidation();
final Vertx vertx = event.getKosContext().getDefaultVertx();
Expand All @@ -31,11 +31,7 @@ public void on(ConfigurationLoadedEvent event) {
* - handler is async: true
* - requires validation: false
*/
EventBusSink.SubscriptionRequest subscriptionRequest1 = new EventBusSink.SubscriptionRequest(
event.getApplicationConfig(), event.getKosContext(), "gcp::pubsub::users::deleted", java.lang.String.class
);
messageProducerSyncManager.tryInitialise(subscriptionRequest1);
vertx.eventBus().consumer("gcp::pubsub::users::deleted", EventHandler.async((Message<java.lang.String> message) -> {
subscriptionManager.subscribe("gcp::pubsub::users::deleted", java.lang.String.class, EventHandler.async((Message<java.lang.String> message) -> {
java.lang.String body = message.body();
return listener.on(body);
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public void on(ConfigurationLoadedEvent event) {
final ImplementationLoader implementationLoader = event.getKosContext().getImplementationLoader();

// Auto-configure a message producer, if found in the classpath
final EventBusSinkManager messageProducerSyncManager = implementationLoader.instanceOfOrFail(EventBusSinkManager.class);
final EventBusSinkManager subscriptionManager = implementationLoader.instanceOfOrFail(EventBusSinkManager.class);

final Validation validation = event.getKosContext().getDefaultValidation();
final Vertx vertx = event.getKosContext().getDefaultVertx();
Expand All @@ -31,11 +31,7 @@ public void on(ConfigurationLoadedEvent event) {
* - handler is async: false
* - requires validation: false
*/
EventBusSink.SubscriptionRequest subscriptionRequest1 = new EventBusSink.SubscriptionRequest(
event.getApplicationConfig(), event.getKosContext(), "gcp::pubsub::users::deleted", java.lang.String.class
);
messageProducerSyncManager.tryInitialise(subscriptionRequest1);
vertx.eventBus().consumer("gcp::pubsub::users::deleted", EventHandler.async((Message<java.lang.String> message) -> {
subscriptionManager.subscribe("gcp::pubsub::users::deleted", java.lang.String.class, EventHandler.async((Message<java.lang.String> message) -> {
java.lang.String body = message.body();
listener.on(body);
return Future.succeededFuture();
Expand Down
Loading

0 comments on commit ee4312e

Please sign in to comment.