From ea87260d00006168218be1ccc4c939ac09cb1d29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 22 Mar 2024 13:43:09 +0100 Subject: [PATCH] gateway: fix Pulsar topics with non default tenant (#761) --- .../apigateway/gateways/ConsumeGateway.java | 51 +++---- .../apigateway/gateways/ProduceGateway.java | 21 ++- .../apigateway/http/GatewayResource.java | 7 +- .../runner/ClusterRuntimeRegistryBean.java | 30 +++++ .../apigateway/websocket/WebSocketConfig.java | 6 + .../websocket/handlers/AbstractHandler.java | 26 +++- .../websocket/handlers/ChatHandler.java | 8 +- .../websocket/handlers/ConsumeHandler.java | 8 +- .../websocket/handlers/ProduceHandler.java | 8 +- .../apigateway/http/GatewayResourceTest.java | 16 ++- .../http/PulsarTenantGatewayResourceTest.java | 124 ++++++++++++++++++ .../handlers/PulsarContainerExtension.java | 23 +++- ...PulsarTenantProduceConsumeHandlerTest.java | 118 +++++++++++++++++ .../ai/langstream/api/model/Application.java | 11 ++ .../api/runtime/StreamingClusterRuntime.java | 4 +- .../impl/common/BasicClusterRuntime.java | 16 ++- .../runtime/KafkaStreamingClusterRuntime.java | 3 +- .../PravegaStreamingClusterRuntime.java | 6 +- .../pulsar/PulsarContainerExtension.java | 2 +- .../pulsar/PulsarStreamingClusterRuntime.java | 6 +- .../pravega/PulsarContainerExtension.java | 2 +- .../pravega/PulsarRunnerDockerTest.java | 110 ++++++++++++++-- .../application/ApplicationService.java | 2 +- 23 files changed, 525 insertions(+), 83 deletions(-) create mode 100644 langstream-api-gateway/src/main/java/ai/langstream/apigateway/runner/ClusterRuntimeRegistryBean.java create mode 100644 langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/PulsarTenantGatewayResourceTest.java create mode 100644 langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/PulsarTenantProduceConsumeHandlerTest.java diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java index 82ca0fcea..be5b6b689 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java @@ -17,6 +17,7 @@ import ai.langstream.api.model.Gateway; import ai.langstream.api.model.StreamingCluster; +import ai.langstream.api.model.TopicDefinition; import ai.langstream.api.runner.code.Header; import ai.langstream.api.runner.code.Record; import ai.langstream.api.runner.topics.TopicConnectionsRuntime; @@ -24,8 +25,10 @@ import ai.langstream.api.runner.topics.TopicOffsetPosition; import ai.langstream.api.runner.topics.TopicReadResult; import ai.langstream.api.runner.topics.TopicReader; +import ai.langstream.api.runtime.ClusterRuntimeRegistry; +import ai.langstream.api.runtime.StreamingClusterRuntime; +import ai.langstream.api.runtime.Topic; import ai.langstream.apigateway.api.ConsumePushMessage; -import ai.langstream.apigateway.api.ProduceResponse; import ai.langstream.apigateway.websocket.AuthenticatedGatewayRequestContext; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.ArrayList; @@ -42,44 +45,14 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; @Slf4j public class ConsumeGateway implements AutoCloseable { protected static final ObjectMapper mapper = new ObjectMapper(); - - @Getter - public static class ProduceException extends Exception { - - private final ProduceResponse.Status status; - - public ProduceException(String message, ProduceResponse.Status status) { - super(message); - this.status = status; - } - } - - public static class ProduceGatewayRequestValidator - implements GatewayRequestHandler.GatewayRequestValidator { - @Override - public List getAllRequiredParameters(Gateway gateway) { - return gateway.getParameters(); - } - - @Override - public void validateOptions(Map options) { - for (Map.Entry option : options.entrySet()) { - switch (option.getKey()) { - default -> throw new IllegalArgumentException( - "Unknown option " + option.getKey()); - } - } - } - } - private final TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry; + private final ClusterRuntimeRegistry clusterRuntimeRegistry; private volatile TopicConnectionsRuntime topicConnectionsRuntime; @@ -90,8 +63,11 @@ public void validateOptions(Map options) { private AuthenticatedGatewayRequestContext requestContext; private List> filters; - public ConsumeGateway(TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry) { + public ConsumeGateway( + TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry, + ClusterRuntimeRegistry clusterRuntimeRegistry) { this.topicConnectionsRuntimeRegistry = topicConnectionsRuntimeRegistry; + this.clusterRuntimeRegistry = clusterRuntimeRegistry; } public void setup( @@ -126,9 +102,16 @@ public void setup( default -> TopicOffsetPosition.absolute( Base64.getDecoder().decode(positionParameter)); }; + TopicDefinition topicDefinition = requestContext.application().resolveTopic(topic); + StreamingClusterRuntime streamingClusterRuntime = + clusterRuntimeRegistry.getStreamingClusterRuntime(streamingCluster); + Topic topicImplementation = + streamingClusterRuntime.createTopicImplementation( + topicDefinition, streamingCluster); + final String resolvedTopicName = topicImplementation.topicName(); reader = topicConnectionsRuntime.createReader( - streamingCluster, Map.of("topic", topic), position); + streamingCluster, Map.of("topic", resolvedTopicName), position); reader.start(); } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java index 175218f34..70ded0363 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java @@ -17,12 +17,16 @@ import ai.langstream.api.model.Gateway; import ai.langstream.api.model.StreamingCluster; +import ai.langstream.api.model.TopicDefinition; import ai.langstream.api.runner.code.Header; import ai.langstream.api.runner.code.Record; import ai.langstream.api.runner.code.SimpleRecord; import ai.langstream.api.runner.topics.TopicConnectionsRuntime; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; import ai.langstream.api.runner.topics.TopicProducer; +import ai.langstream.api.runtime.ClusterRuntimeRegistry; +import ai.langstream.api.runtime.StreamingClusterRuntime; +import ai.langstream.api.runtime.Topic; import ai.langstream.apigateway.api.ProduceRequest; import ai.langstream.apigateway.api.ProduceResponse; import ai.langstream.apigateway.websocket.AuthenticatedGatewayRequestContext; @@ -82,6 +86,7 @@ public void validateOptions(Map options) { } private final TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry; + private final ClusterRuntimeRegistry clusterRuntimeRegistry; private final TopicProducerCache topicProducerCache; private TopicProducer producer; private List
commonHeaders; @@ -89,8 +94,10 @@ public void validateOptions(Map options) { public ProduceGateway( TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry, + ClusterRuntimeRegistry clusterRuntimeRegistry, TopicProducerCache topicProducerCache) { this.topicConnectionsRuntimeRegistry = topicConnectionsRuntimeRegistry; + this.clusterRuntimeRegistry = clusterRuntimeRegistry; this.topicProducerCache = topicProducerCache; } @@ -116,6 +123,15 @@ public void start( } catch (JsonProcessingException e) { throw new RuntimeException(e); } + + TopicDefinition topicDefinition = requestContext.application().resolveTopic(topic); + StreamingClusterRuntime streamingClusterRuntime = + clusterRuntimeRegistry.getStreamingClusterRuntime(streamingCluster); + Topic topicImplementation = + streamingClusterRuntime.createTopicImplementation( + topicDefinition, streamingCluster); + final String resolvedTopicName = topicImplementation.topicName(); + // we need to cache the producer per topic and per config, since an application update could // change the configuration final TopicProducerCache.Key key = @@ -123,10 +139,11 @@ public void start( requestContext.tenant(), requestContext.applicationId(), requestContext.gateway().getId(), - topic, + resolvedTopicName, configString); producer = - topicProducerCache.getOrCreate(key, () -> setupProducer(topic, streamingCluster)); + topicProducerCache.getOrCreate( + key, () -> setupProducer(resolvedTopicName, streamingCluster)); } @AllArgsConstructor diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java index b824f5178..8d82dada3 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java @@ -19,6 +19,7 @@ import ai.langstream.api.model.Gateway; import ai.langstream.api.runner.code.Header; import ai.langstream.api.runner.code.Record; +import ai.langstream.api.runtime.ClusterRuntimeRegistry; import ai.langstream.api.storage.ApplicationStore; import ai.langstream.apigateway.api.ProduceRequest; import ai.langstream.apigateway.api.ProduceResponse; @@ -78,6 +79,7 @@ public class GatewayResource { protected static final ObjectMapper MAPPER = new ObjectMapper(); protected static final String SERVICE_REQUEST_ID_HEADER = "langstream-service-request-id"; private final TopicConnectionsRuntimeProviderBean topicConnectionsRuntimeRegistryProvider; + private final ClusterRuntimeRegistry clusterRuntimeRegistry; private final TopicProducerCache topicProducerCache; private final ApplicationStore applicationStore; private final GatewayRequestHandler gatewayRequestHandler; @@ -121,6 +123,7 @@ ProduceResponse produce( new ProduceGateway( topicConnectionsRuntimeRegistryProvider .getTopicConnectionsRuntimeRegistry(), + clusterRuntimeRegistry, topicProducerCache)) { final List
commonHeaders = ProduceGateway.getProducerCommonHeaders( @@ -259,12 +262,14 @@ private CompletableFuture handleServiceWithTopics( new ProduceGateway( topicConnectionsRuntimeRegistryProvider .getTopicConnectionsRuntimeRegistry(), + clusterRuntimeRegistry, topicProducerCache); ) { final ConsumeGateway consumeGateway = new ConsumeGateway( topicConnectionsRuntimeRegistryProvider - .getTopicConnectionsRuntimeRegistry()); + .getTopicConnectionsRuntimeRegistry(), + clusterRuntimeRegistry); completableFuture.thenRunAsync( () -> { if (consumeGateway != null) { diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/runner/ClusterRuntimeRegistryBean.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/runner/ClusterRuntimeRegistryBean.java new file mode 100644 index 000000000..8cb40e8d2 --- /dev/null +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/runner/ClusterRuntimeRegistryBean.java @@ -0,0 +1,30 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.apigateway.runner; + +import ai.langstream.api.runtime.ClusterRuntimeRegistry; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@Slf4j +public class ClusterRuntimeRegistryBean { + @Bean + public ClusterRuntimeRegistry registry() { + return new ClusterRuntimeRegistry(); + } +} diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java index f74390d06..29afd6aa7 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java @@ -16,6 +16,7 @@ package ai.langstream.apigateway.websocket; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; +import ai.langstream.api.runtime.ClusterRuntimeRegistry; import ai.langstream.api.storage.ApplicationStore; import ai.langstream.apigateway.gateways.GatewayRequestHandler; import ai.langstream.apigateway.gateways.TopicProducerCache; @@ -49,6 +50,7 @@ public class WebSocketConfig implements WebSocketConfigurer { private final ApplicationStore applicationStore; private final TopicConnectionsRuntimeProviderBean topicConnectionsRuntimeRegistryProvider; + private final ClusterRuntimeRegistry clusterRuntimeRegistry; private final GatewayRequestHandler gatewayRequestHandler; private final TopicProducerCache topicProducerCache; private final ExecutorService consumeThreadPool = @@ -64,12 +66,14 @@ public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { applicationStore, consumeThreadPool, topicConnectionsRuntimeRegistry, + clusterRuntimeRegistry, topicProducerCache), CONSUME_PATH) .addHandler( new ProduceHandler( applicationStore, topicConnectionsRuntimeRegistry, + clusterRuntimeRegistry, topicProducerCache), PRODUCE_PATH) .addHandler( @@ -77,6 +81,7 @@ public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { applicationStore, consumeThreadPool, topicConnectionsRuntimeRegistry, + clusterRuntimeRegistry, topicProducerCache), CHAT_PATH) .setAllowedOrigins("*") @@ -93,5 +98,6 @@ public ServletServerContainerFactoryBean createWebSocketContainer() { @PreDestroy public void onDestroy() { consumeThreadPool.shutdown(); + clusterRuntimeRegistry.close(); } } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/AbstractHandler.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/AbstractHandler.java index 64cd2355a..639460aa3 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/AbstractHandler.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/AbstractHandler.java @@ -20,12 +20,16 @@ import ai.langstream.api.events.GatewayEventData; import ai.langstream.api.model.Gateway; import ai.langstream.api.model.StreamingCluster; +import ai.langstream.api.model.TopicDefinition; import ai.langstream.api.runner.code.Header; import ai.langstream.api.runner.code.Record; import ai.langstream.api.runner.code.SimpleRecord; import ai.langstream.api.runner.topics.TopicConnectionsRuntime; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; import ai.langstream.api.runner.topics.TopicProducer; +import ai.langstream.api.runtime.ClusterRuntimeRegistry; +import ai.langstream.api.runtime.StreamingClusterRuntime; +import ai.langstream.api.runtime.Topic; import ai.langstream.api.storage.ApplicationStore; import ai.langstream.apigateway.api.ProduceResponse; import ai.langstream.apigateway.gateways.ConsumeGateway; @@ -52,14 +56,17 @@ public abstract class AbstractHandler extends TextWebSocketHandler { protected static final String ATTRIBUTE_PRODUCE_GATEWAY = "__produce_gateway"; protected static final String ATTRIBUTE_CONSUME_GATEWAY = "__consume_gateway"; protected final TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry; + protected final ClusterRuntimeRegistry clusterRuntimeRegistry; protected final ApplicationStore applicationStore; private final TopicProducerCache topicProducerCache; public AbstractHandler( ApplicationStore applicationStore, TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry, + ClusterRuntimeRegistry clusterRuntimeRegistry, TopicProducerCache topicProducerCache) { this.topicConnectionsRuntimeRegistry = topicConnectionsRuntimeRegistry; + this.clusterRuntimeRegistry = clusterRuntimeRegistry; this.applicationStore = applicationStore; this.topicProducerCache = topicProducerCache; } @@ -187,11 +194,20 @@ protected void sendEvent(EventRecord.Types type, AuthenticatedGatewayRequestCont topicConnectionsRuntime.init(streamingCluster); + TopicDefinition topicDefinition = + context.application().resolveTopic(gateway.getEventsTopic()); + StreamingClusterRuntime streamingClusterRuntime = + new ClusterRuntimeRegistry().getStreamingClusterRuntime(streamingCluster); + Topic topicImplementation = + streamingClusterRuntime.createTopicImplementation( + topicDefinition, streamingCluster); + final String resolvedTopicName = topicImplementation.topicName(); + try (final TopicProducer producer = topicConnectionsRuntime.createProducer( "langstream-events", streamingCluster, - Map.of("topic", gateway.getEventsTopic()))) { + Map.of("topic", resolvedTopicName))) { producer.start(); final EventSources.GatewaySource source = @@ -246,7 +262,8 @@ protected void setupReader( List> filters, AuthenticatedGatewayRequestContext context) throws Exception { - final ConsumeGateway consumeGateway = new ConsumeGateway(topicConnectionsRuntimeRegistry); + final ConsumeGateway consumeGateway = + new ConsumeGateway(topicConnectionsRuntimeRegistry, clusterRuntimeRegistry); try { consumeGateway.setup(topic, filters, context); } catch (Exception ex) { @@ -261,7 +278,10 @@ protected void setupProducer( String topic, List
commonHeaders, AuthenticatedGatewayRequestContext context) throws Exception { final ProduceGateway produceGateway = - new ProduceGateway(topicConnectionsRuntimeRegistry, topicProducerCache); + new ProduceGateway( + topicConnectionsRuntimeRegistry, + clusterRuntimeRegistry, + topicProducerCache); try { produceGateway.start(topic, commonHeaders, context); diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ChatHandler.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ChatHandler.java index 57e13cc82..b03cae36e 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ChatHandler.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ChatHandler.java @@ -21,6 +21,7 @@ import ai.langstream.api.runner.code.Header; import ai.langstream.api.runner.code.Record; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; +import ai.langstream.api.runtime.ClusterRuntimeRegistry; import ai.langstream.api.storage.ApplicationStore; import ai.langstream.apigateway.gateways.ConsumeGateway; import ai.langstream.apigateway.gateways.GatewayRequestHandler; @@ -47,8 +48,13 @@ public ChatHandler( ApplicationStore applicationStore, ExecutorService executor, TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry, + ClusterRuntimeRegistry clusterRuntimeRegistry, TopicProducerCache topicProducerCache) { - super(applicationStore, topicConnectionsRuntimeRegistry, topicProducerCache); + super( + applicationStore, + topicConnectionsRuntimeRegistry, + clusterRuntimeRegistry, + topicProducerCache); this.executor = executor; } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ConsumeHandler.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ConsumeHandler.java index 9b70d4fb0..d764cc548 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ConsumeHandler.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ConsumeHandler.java @@ -20,6 +20,7 @@ import ai.langstream.api.model.Gateway; import ai.langstream.api.runner.code.Record; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; +import ai.langstream.api.runtime.ClusterRuntimeRegistry; import ai.langstream.api.storage.ApplicationStore; import ai.langstream.apigateway.gateways.ConsumeGateway; import ai.langstream.apigateway.gateways.GatewayRequestHandler; @@ -44,8 +45,13 @@ public ConsumeHandler( ApplicationStore applicationStore, ExecutorService executor, TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry, + ClusterRuntimeRegistry clusterRuntimeRegistry, TopicProducerCache topicProducerCache) { - super(applicationStore, topicConnectionsRuntimeRegistry, topicProducerCache); + super( + applicationStore, + topicConnectionsRuntimeRegistry, + clusterRuntimeRegistry, + topicProducerCache); this.executor = executor; } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ProduceHandler.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ProduceHandler.java index 198574b8b..77d40081f 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ProduceHandler.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ProduceHandler.java @@ -20,6 +20,7 @@ import ai.langstream.api.model.Gateway; import ai.langstream.api.runner.code.Header; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; +import ai.langstream.api.runtime.ClusterRuntimeRegistry; import ai.langstream.api.storage.ApplicationStore; import ai.langstream.apigateway.gateways.GatewayRequestHandler; import ai.langstream.apigateway.gateways.ProduceGateway; @@ -38,8 +39,13 @@ public class ProduceHandler extends AbstractHandler { public ProduceHandler( ApplicationStore applicationStore, TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry, + ClusterRuntimeRegistry clusterRuntimeRegistry, TopicProducerCache topicProducerCache) { - super(applicationStore, topicConnectionsRuntimeRegistry, topicProducerCache); + super( + applicationStore, + topicConnectionsRuntimeRegistry, + clusterRuntimeRegistry, + topicProducerCache); } @Override diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index e6247794d..5bb3f8e81 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -23,12 +23,7 @@ import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; -import ai.langstream.api.model.Application; -import ai.langstream.api.model.ApplicationSpecs; -import ai.langstream.api.model.Gateway; -import ai.langstream.api.model.Gateways; -import ai.langstream.api.model.StoredApplication; -import ai.langstream.api.model.StreamingCluster; +import ai.langstream.api.model.*; import ai.langstream.api.runner.code.Record; import ai.langstream.api.runner.topics.TopicConnectionsRuntime; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; @@ -593,7 +588,8 @@ void testService() throws Exception { .get(2, TimeUnit.MINUTES); } - private void startTopicExchange(String fromTopic, String toTopic) throws Exception { + private void startTopicExchange(String logicalFromTopic, String logicalToTopic) + throws Exception { final CompletableFuture future = CompletableFuture.runAsync( () -> { @@ -606,6 +602,8 @@ private void startTopicExchange(String fromTopic, String toTopic) throws Excepti .getTopicConnectionsRuntime(streamingCluster) .asTopicConnectionsRuntime(); runtime.init(streamingCluster); + final String fromTopic = resolveTopicName(logicalFromTopic); + final String toTopic = resolveTopicName(logicalToTopic); try (final TopicConsumer consumer = runtime.createConsumer( null, @@ -686,4 +684,8 @@ private void prepareTopicsForTest(String... topic) throws Exception { deployer.createImplementation( "app", store.get("t", "app", false).getInstance())); } + + protected String resolveTopicName(String topic) { + return topic; + } } diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/PulsarTenantGatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/PulsarTenantGatewayResourceTest.java new file mode 100644 index 000000000..5d4de2f5e --- /dev/null +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/PulsarTenantGatewayResourceTest.java @@ -0,0 +1,124 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.apigateway.http; + +import ai.langstream.api.model.StreamingCluster; +import ai.langstream.api.storage.ApplicationStore; +import ai.langstream.apigateway.config.GatewayTestAuthenticationProperties; +import ai.langstream.apigateway.websocket.handlers.PulsarContainerExtension; +import java.util.HashSet; +import java.util.Map; +import java.util.function.Consumer; +import lombok.SneakyThrows; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; + +class PulsarTenantGatewayResourceTest extends GatewayResourceTest { + + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv( + Map.of( + "PULSAR_PREFIX_forceDeleteTenantAllowed", + "true", + "PULSAR_PREFIX_forceDeleteNamespaceAllowed", + "true")) + .withOnContainerReady( + new Consumer() { + @Override + @SneakyThrows + public void accept( + PulsarContainerExtension pulsarContainerExtension) { + try (PulsarAdmin admin = + PulsarAdmin.builder() + .serviceHttpUrl( + pulsarContainerExtension + .getHttpServiceUrl()) + .build(); ) { + + TenantInfo info = + TenantInfo.builder() + .allowedClusters( + new HashSet<>( + admin.clusters() + .getClusters())) + .build(); + admin.tenants().createTenant("mytenant", info); + admin.namespaces().createNamespace("mytenant/mynamespace"); + admin.tenants().deleteTenant("public", true); + } + } + }); + + @Override + protected StreamingCluster getStreamingCluster() { + return new StreamingCluster( + "pulsar", + Map.of( + "admin", + Map.of("serviceUrl", pulsarContainer.getHttpServiceUrl()), + "service", + Map.of("serviceUrl", pulsarContainer.getBrokerUrl()), + "default-tenant", + "mytenant", + "default-namespace", + "mynamespace")); + } + + @TestConfiguration + public static class WebSocketTestConfig { + + @Bean + @Primary + public ApplicationStore store() { + String instanceYaml = + """ + instance: + streamingCluster: + type: "pulsar" + configuration: + admin: + serviceUrl: "%s" + service: + serviceUrl: "%s" + default-tenant: "mytenant" + default-namespace: "mynamespace" + computeCluster: + type: "none" + """ + .formatted( + pulsarContainer.getHttpServiceUrl(), + pulsarContainer.getBrokerUrl()); + return getMockedStore(instanceYaml); + } + + @Bean + @Primary + public GatewayTestAuthenticationProperties gatewayTestAuthenticationProperties() { + return getGatewayTestAuthenticationProperties(); + } + } + + @Override + protected String resolveTopicName(String topic) { + return "mytenant/mynamespace/" + topic; + } +} diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/PulsarContainerExtension.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/PulsarContainerExtension.java index b91d94633..8fa01a685 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/PulsarContainerExtension.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/PulsarContainerExtension.java @@ -15,6 +15,8 @@ */ package ai.langstream.apigateway.websocket.handlers; +import java.util.Map; +import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.extension.AfterAllCallback; import org.junit.jupiter.api.extension.BeforeAllCallback; @@ -26,6 +28,8 @@ @Slf4j public class PulsarContainerExtension implements BeforeAllCallback, AfterAllCallback { private PulsarContainer pulsarContainer; + private Consumer onContainerReady; + private Map env = Map.of(); private Network network; @@ -43,16 +47,33 @@ public void afterAll(ExtensionContext extensionContext) { public void beforeAll(ExtensionContext extensionContext) { network = Network.newNetwork(); pulsarContainer = - new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:3.1.0")) + new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:3.2.1")) .withNetwork(network) + .withEnv(env) .withLogConsumer( outputFrame -> log.debug( "pulsar> {}", outputFrame.getUtf8String().trim())); // start Pulsar and wait for it to be ready to accept requests pulsarContainer.start(); + if (onContainerReady != null) { + onContainerReady.accept(this); + } + } + + public PulsarContainerExtension withOnContainerReady( + Consumer onContainerReady) { + this.onContainerReady = onContainerReady; + return this; } + public PulsarContainerExtension withEnv(Map env) { + this.env = env; + return this; + } + + protected void onContainerReady() {} + public String getBrokerUrl() { return pulsarContainer.getPulsarBrokerUrl(); } diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/PulsarTenantProduceConsumeHandlerTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/PulsarTenantProduceConsumeHandlerTest.java new file mode 100644 index 000000000..19c43166e --- /dev/null +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/PulsarTenantProduceConsumeHandlerTest.java @@ -0,0 +1,118 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.apigateway.websocket.handlers; + +import ai.langstream.api.model.StreamingCluster; +import ai.langstream.api.storage.ApplicationStore; +import ai.langstream.apigateway.config.GatewayTestAuthenticationProperties; +import java.util.HashSet; +import java.util.Map; +import java.util.function.Consumer; +import lombok.SneakyThrows; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; + +public class PulsarTenantProduceConsumeHandlerTest extends ProduceConsumeHandlerTest { + + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv( + Map.of( + "PULSAR_PREFIX_forceDeleteTenantAllowed", + "true", + "PULSAR_PREFIX_forceDeleteNamespaceAllowed", + "true")) + .withOnContainerReady( + new Consumer() { + @Override + @SneakyThrows + public void accept( + PulsarContainerExtension pulsarContainerExtension) { + try (PulsarAdmin admin = + PulsarAdmin.builder() + .serviceHttpUrl( + pulsarContainerExtension + .getHttpServiceUrl()) + .build(); ) { + + TenantInfo info = + TenantInfo.builder() + .allowedClusters( + new HashSet<>( + admin.clusters() + .getClusters())) + .build(); + admin.tenants().createTenant("mytenant", info); + admin.namespaces().createNamespace("mytenant/mynamespace"); + admin.tenants().deleteTenant("public", true); + } + } + }); + + @Override + protected StreamingCluster getStreamingCluster() { + return new StreamingCluster( + "pulsar", + Map.of( + "admin", + Map.of("serviceUrl", pulsarContainer.getHttpServiceUrl()), + "service", + Map.of("serviceUrl", pulsarContainer.getBrokerUrl()), + "default-tenant", + "mytenant", + "default-namespace", + "mynamespace")); + } + + @TestConfiguration + public static class WebSocketTestConfig { + + @Bean + @Primary + public ApplicationStore store() { + String instanceYaml = + """ + instance: + streamingCluster: + type: "pulsar" + configuration: + admin: + serviceUrl: "%s" + service: + serviceUrl: "%s" + default-tenant: "mytenant" + default-namespace: "mynamespace" + computeCluster: + type: "none" + """ + .formatted( + pulsarContainer.getHttpServiceUrl(), + pulsarContainer.getBrokerUrl()); + return getMockedStore(instanceYaml); + } + + @Bean + @Primary + public GatewayTestAuthenticationProperties gatewayTestAuthenticationProperties() { + return getGatewayTestAuthenticationProperties(); + } + } +} diff --git a/langstream-api/src/main/java/ai/langstream/api/model/Application.java b/langstream-api/src/main/java/ai/langstream/api/model/Application.java index be86787d0..ff7700968 100644 --- a/langstream-api/src/main/java/ai/langstream/api/model/Application.java +++ b/langstream-api/src/main/java/ai/langstream/api/model/Application.java @@ -37,4 +37,15 @@ public class Application { public Module getModule(String module) { return modules.computeIfAbsent(module, Module::new); } + + @JsonIgnore + public TopicDefinition resolveTopic(String input) { + for (Module module : modules.values()) { + if (module.getTopics().containsKey(input)) { + return module.resolveTopic(input); + } + } + throw new IllegalArgumentException( + "Topic " + input + " is not defined in any module of the application"); + } } diff --git a/langstream-api/src/main/java/ai/langstream/api/runtime/StreamingClusterRuntime.java b/langstream-api/src/main/java/ai/langstream/api/runtime/StreamingClusterRuntime.java index 1b4f3e769..2a1c160df 100644 --- a/langstream-api/src/main/java/ai/langstream/api/runtime/StreamingClusterRuntime.java +++ b/langstream-api/src/main/java/ai/langstream/api/runtime/StreamingClusterRuntime.java @@ -15,6 +15,7 @@ */ package ai.langstream.api.runtime; +import ai.langstream.api.model.StreamingCluster; import ai.langstream.api.model.TopicDefinition; import java.util.Map; @@ -28,10 +29,11 @@ public interface StreamingClusterRuntime extends AutoCloseable { * Map a Logical TopicDefinition to a Physical TopicImplementation * * @param topicDefinition the logical topic definition + * @param streamingCluster streaming cluster configuration * @return the physical topic implementation */ Topic createTopicImplementation( - TopicDefinition topicDefinition, ExecutionPlan applicationInstance); + TopicDefinition topicDefinition, StreamingCluster streamingCluster); /** * Create the configuration to consume from a topic. The contents of the map are specific to the diff --git a/langstream-core/src/main/java/ai/langstream/impl/common/BasicClusterRuntime.java b/langstream-core/src/main/java/ai/langstream/impl/common/BasicClusterRuntime.java index 0e91598f5..c526cd143 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/common/BasicClusterRuntime.java +++ b/langstream-core/src/main/java/ai/langstream/impl/common/BasicClusterRuntime.java @@ -86,7 +86,8 @@ protected void detectTopics( for (Module module : applicationInstance.getModules().values()) { for (TopicDefinition topic : module.getTopics().values()) { Topic topicImplementation = - streamingClusterRuntime.createTopicImplementation(topic, result); + streamingClusterRuntime.createTopicImplementation( + topic, result.getApplication().getInstance().streamingCluster()); result.registerTopic(topic, topicImplementation); } } @@ -340,7 +341,7 @@ private Topic buildImplicitTopicForDeadletterQueue( Topic connection, TopicDefinition inputTopicDefinition, StreamingClusterRuntime streamingClusterRuntime, - ExecutionPlan physicalApplicationInstance) { + ExecutionPlan executionPlan) { // connecting two agents requires an intermediate topic String name = inputTopicDefinition.getName() + "-deadletter"; log.info( @@ -364,8 +365,9 @@ private Topic buildImplicitTopicForDeadletterQueue( Map.of()); Topic topicImplementation = streamingClusterRuntime.createTopicImplementation( - topicDefinition, physicalApplicationInstance); - physicalApplicationInstance.registerTopic(topicDefinition, topicImplementation); + topicDefinition, + executionPlan.getApplication().getInstance().streamingCluster()); + executionPlan.registerTopic(topicDefinition, topicImplementation); return topicImplementation; } @@ -396,7 +398,11 @@ protected Topic buildImplicitTopicForAgent( Map.of()); Topic topicImplementation = streamingClusterRuntime.createTopicImplementation( - topicDefinition, physicalApplicationInstance); + topicDefinition, + physicalApplicationInstance + .getApplication() + .getInstance() + .streamingCluster()); physicalApplicationInstance.registerTopic(topicDefinition, topicImplementation); return topicImplementation; diff --git a/langstream-kafka/src/main/java/ai/langstream/kafka/runtime/KafkaStreamingClusterRuntime.java b/langstream-kafka/src/main/java/ai/langstream/kafka/runtime/KafkaStreamingClusterRuntime.java index b593864e4..34af7f271 100644 --- a/langstream-kafka/src/main/java/ai/langstream/kafka/runtime/KafkaStreamingClusterRuntime.java +++ b/langstream-kafka/src/main/java/ai/langstream/kafka/runtime/KafkaStreamingClusterRuntime.java @@ -19,7 +19,6 @@ import ai.langstream.api.model.TopicDefinition; import ai.langstream.api.runtime.AgentNode; import ai.langstream.api.runtime.ConnectionImplementation; -import ai.langstream.api.runtime.ExecutionPlan; import ai.langstream.api.runtime.StreamingClusterRuntime; import ai.langstream.api.runtime.Topic; import com.fasterxml.jackson.databind.ObjectMapper; @@ -40,7 +39,7 @@ public static KafkaClusterRuntimeConfiguration getKafkaClusterRuntimeConfigurati @Override public Topic createTopicImplementation( - TopicDefinition topicDefinition, ExecutionPlan applicationInstance) { + TopicDefinition topicDefinition, StreamingCluster streamingCluster) { String name = topicDefinition.getName(); String creationMode = topicDefinition.getCreationMode(); Map options = topicDefinition.getOptions(); diff --git a/langstream-pravega/src/main/java/ai/langstream/pravega/PravegaStreamingClusterRuntime.java b/langstream-pravega/src/main/java/ai/langstream/pravega/PravegaStreamingClusterRuntime.java index 42f2e864d..6ec736efc 100644 --- a/langstream-pravega/src/main/java/ai/langstream/pravega/PravegaStreamingClusterRuntime.java +++ b/langstream-pravega/src/main/java/ai/langstream/pravega/PravegaStreamingClusterRuntime.java @@ -19,7 +19,6 @@ import ai.langstream.api.model.TopicDefinition; import ai.langstream.api.runtime.AgentNode; import ai.langstream.api.runtime.ConnectionImplementation; -import ai.langstream.api.runtime.ExecutionPlan; import ai.langstream.api.runtime.StreamingClusterRuntime; import ai.langstream.api.runtime.Topic; import com.fasterxml.jackson.databind.ObjectMapper; @@ -32,10 +31,9 @@ public class PravegaStreamingClusterRuntime implements StreamingClusterRuntime { @Override public Topic createTopicImplementation( - TopicDefinition topicDefinition, ExecutionPlan applicationInstance) { + TopicDefinition topicDefinition, StreamingCluster streamingCluster) { final PravegaClusterRuntimeConfiguration config = - getPravegaClusterRuntimeConfiguration( - applicationInstance.getApplication().getInstance().streamingCluster()); + getPravegaClusterRuntimeConfiguration(streamingCluster); String name = topicDefinition.getName(); String creationMode = topicDefinition.getCreationMode(); diff --git a/langstream-pulsar-runtime/src/test/java/ai/langstream/pulsar/PulsarContainerExtension.java b/langstream-pulsar-runtime/src/test/java/ai/langstream/pulsar/PulsarContainerExtension.java index 7b35419b8..24e8dff8d 100644 --- a/langstream-pulsar-runtime/src/test/java/ai/langstream/pulsar/PulsarContainerExtension.java +++ b/langstream-pulsar-runtime/src/test/java/ai/langstream/pulsar/PulsarContainerExtension.java @@ -57,7 +57,7 @@ public void beforeAll(ExtensionContext extensionContext) throws PulsarClientException, PulsarAdminException { network = Network.newNetwork(); pulsarContainer = - new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:3.1.0")) + new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:3.2.1")) .withNetwork(network) .withLogConsumer( outputFrame -> diff --git a/langstream-pulsar/src/main/java/ai/langstream/pulsar/PulsarStreamingClusterRuntime.java b/langstream-pulsar/src/main/java/ai/langstream/pulsar/PulsarStreamingClusterRuntime.java index 80ea80055..58e143d70 100644 --- a/langstream-pulsar/src/main/java/ai/langstream/pulsar/PulsarStreamingClusterRuntime.java +++ b/langstream-pulsar/src/main/java/ai/langstream/pulsar/PulsarStreamingClusterRuntime.java @@ -20,7 +20,6 @@ import ai.langstream.api.model.TopicDefinition; import ai.langstream.api.runtime.AgentNode; import ai.langstream.api.runtime.ConnectionImplementation; -import ai.langstream.api.runtime.ExecutionPlan; import ai.langstream.api.runtime.StreamingClusterRuntime; import ai.langstream.api.runtime.Topic; import com.fasterxml.jackson.databind.ObjectMapper; @@ -33,10 +32,9 @@ public class PulsarStreamingClusterRuntime implements StreamingClusterRuntime { @Override public Topic createTopicImplementation( - TopicDefinition topicDefinition, ExecutionPlan applicationInstance) { + TopicDefinition topicDefinition, StreamingCluster streamingCluster) { final PulsarClusterRuntimeConfiguration config = - getPulsarClusterRuntimeConfiguration( - applicationInstance.getApplication().getInstance().streamingCluster()); + getPulsarClusterRuntimeConfiguration(streamingCluster); SchemaDefinition keySchema = topicDefinition.getKeySchema(); SchemaDefinition valueSchema = topicDefinition.getValueSchema(); diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pravega/PulsarContainerExtension.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pravega/PulsarContainerExtension.java index 0141a031f..ba0882e86 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pravega/PulsarContainerExtension.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pravega/PulsarContainerExtension.java @@ -57,7 +57,7 @@ public void beforeAll(ExtensionContext extensionContext) throws PulsarClientException, PulsarAdminException { network = Network.newNetwork(); pulsarContainer = - new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:3.1.0")) + new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:3.2.1")) .withNetwork(network) .withLogConsumer( outputFrame -> diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pravega/PulsarRunnerDockerTest.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pravega/PulsarRunnerDockerTest.java index 48dc32f5a..2110cbcf5 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pravega/PulsarRunnerDockerTest.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pravega/PulsarRunnerDockerTest.java @@ -22,19 +22,18 @@ import ai.langstream.AbstractApplicationRunner; import ai.langstream.kafka.AbstractKafkaApplicationRunner; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.awaitility.Awaitility; @@ -48,7 +47,7 @@ class PulsarRunnerDockerTest extends AbstractApplicationRunner { static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); @Test - public void testRunAITools() throws Exception { + public void simpleTest() throws Exception { String tenant = "tenant"; String[] expectedAgents = {"app-step1"}; String inputTopic = "input-topic-" + UUID.randomUUID(); @@ -79,7 +78,11 @@ public void testRunAITools() throws Exception { try (ApplicationRuntime applicationRuntime = deployApplication( - tenant, "app", application, buildInstanceYaml(), expectedAgents)) { + tenant, + "app", + application, + buildInstanceYaml("public", "default"), + expectedAgents)) { try (Producer producer = createProducer(inputTopic); Consumer consumer = createConsumer(outputTopic)) { @@ -98,6 +101,71 @@ public void testRunAITools() throws Exception { } } + @Test + public void simpleTestDifferentTenant() throws Exception { + String tenant = "tenant"; + String[] expectedAgents = {"app-step1"}; + String inputTopic = "input-topic-" + UUID.randomUUID(); + String outputTopic = "output-topic-" + UUID.randomUUID(); + + PulsarAdmin admin = pulsarContainer.getAdmin(); + + TenantInfo info = + TenantInfo.builder() + .allowedClusters(new HashSet<>(admin.clusters().getClusters())) + .build(); + admin.tenants().createTenant("mytenant", info); + admin.namespaces().createNamespace("mytenant/mynamespace"); + + Map application = + Map.of( + "module.yaml", + """ + module: "module-1" + id: "pipeline-1" + topics: + - name: "%s" + creation-mode: create-if-not-exists + - name: "%s" + creation-mode: create-if-not-exists + pipeline: + - name: "drop-description" + id: "step1" + type: "drop-fields" + input: "%s" + output: "%s" + configuration: + fields: + - "description" + """ + .formatted(inputTopic, outputTopic, inputTopic, outputTopic)); + + try (ApplicationRuntime applicationRuntime = + deployApplication( + tenant, + "app", + application, + buildInstanceYaml("mytenant", "mynamespace"), + expectedAgents)) { + try (Producer producer = createProducer("mytenant/mynamespace/" + inputTopic); + Consumer consumer = + createConsumer("mytenant/mynamespace/" + outputTopic)) { + + producer.newMessage() + .value("{\"name\": \"some name\", \"description\": \"some description\"}") + .property("header-key", "header-value") + .send(); + producer.flush(); + + executeAgentRunners(applicationRuntime); + + Message record = consumer.receive(30, TimeUnit.SECONDS); + assertEquals("{\"name\":\"some name\"}", record.getValue().getNativeObject()); + assertEquals("header-value", record.getProperties().get("header-key")); + } + } + } + @Test public void testTopicSchema() throws Exception { String tenant = "topic-schema"; @@ -132,7 +200,11 @@ public void testTopicSchema() throws Exception { try (ApplicationRuntime applicationRuntime = deployApplication( - tenant, "app", application, buildInstanceYaml(), expectedAgents)) { + tenant, + "app", + application, + buildInstanceYaml("public", "default"), + expectedAgents)) { try (Producer producer = createProducer(inputTopic); Consumer consumer = createConsumer(outputTopic)) { @@ -187,7 +259,11 @@ public void testKeyValueSchema() throws Exception { try (ApplicationRuntime applicationRuntime = deployApplication( - tenant, "app", application, buildInstanceYaml(), expectedAgents)) { + tenant, + "app", + application, + buildInstanceYaml("public", "default"), + expectedAgents)) { try (Producer> producer = createProducer( inputTopic, @@ -244,7 +320,11 @@ public void testDeadLetter() throws Exception { setMaxNumLoops(25); try (AbstractKafkaApplicationRunner.ApplicationRuntime applicationRuntime = deployApplication( - tenant, "app", application, buildInstanceYaml(), expectedAgents)) { + tenant, + "app", + application, + buildInstanceYaml("public", "default"), + expectedAgents)) { try (Producer producer = createProducer(inputTopic); Consumer consumer = createConsumer(outputTopic); Consumer consumerDeadletter = @@ -268,7 +348,7 @@ public void testDeadLetter() throws Exception { } } - private String buildInstanceYaml() { + private String buildInstanceYaml(String tenant, String namespace) { return """ instance: streamingCluster: @@ -278,12 +358,16 @@ private String buildInstanceYaml() { serviceUrl: "%s" service: serviceUrl: "%s" - default-tenant: "public" - default-namespace: "default" + default-tenant: "%s" + default-namespace: "%s" computeCluster: type: "kubernetes" """ - .formatted(pulsarContainer.getHttpServiceUrl(), pulsarContainer.getBrokerUrl()); + .formatted( + pulsarContainer.getHttpServiceUrl(), + pulsarContainer.getBrokerUrl(), + tenant, + namespace); } protected Producer createProducer(String topic) throws PulsarClientException { diff --git a/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationService.java b/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationService.java index 62befce59..765b88fc9 100644 --- a/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationService.java +++ b/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationService.java @@ -54,7 +54,7 @@ public class ApplicationService { private final ApplicationDeployer deployer = ApplicationDeployer.builder() - .registry(new ClusterRuntimeRegistry()) // TODO: add config + .registry(new ClusterRuntimeRegistry()) .pluginsRegistry(new PluginsRegistry()) .topicConnectionsRuntimeRegistry(new TopicConnectionsRuntimeRegistry()) .build();