From 15b49b32d381cd6279f6ad3f10b8ee8e3f62ac8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Mon, 20 May 2024 17:02:22 +0200 Subject: [PATCH] flaky test --- .../apigateway/gateways/LRUTopicProducerCache.java | 5 +++++ .../apigateway/gateways/TopicProducerCache.java | 5 ++++- .../gateways/TopicProducerCacheFactory.java | 13 ++++++++++++- .../apigateway/websocket/WebSocketConfig.java | 1 + 4 files changed, 22 insertions(+), 2 deletions(-) diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/LRUTopicProducerCache.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/LRUTopicProducerCache.java index f88b31f33..1fb4e0b61 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/LRUTopicProducerCache.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/LRUTopicProducerCache.java @@ -135,4 +135,9 @@ public TopicProducer getOrCreate( throw new RuntimeException(ex); } } + + @Override + public void close() { + cache.invalidateAll(); + } } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCache.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCache.java index 1b9f3ede6..0447e0eec 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCache.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCache.java @@ -18,7 +18,7 @@ import ai.langstream.api.runner.topics.TopicProducer; import java.util.function.Supplier; -public interface TopicProducerCache { +public interface TopicProducerCache extends AutoCloseable { record Key( String tenant, String application, @@ -27,4 +27,7 @@ record Key( String configString) {} TopicProducer getOrCreate(Key key, Supplier topicProducerSupplier); + + @Override + void close(); } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCacheFactory.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCacheFactory.java index 9ef1430da..0ae28aa67 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCacheFactory.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCacheFactory.java @@ -15,6 +15,7 @@ */ package ai.langstream.apigateway.gateways; +import ai.langstream.api.runner.topics.TopicProducer; import ai.langstream.apigateway.MetricsNames; import ai.langstream.apigateway.config.TopicProperties; import io.micrometer.core.instrument.Metrics; @@ -22,6 +23,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.function.Supplier; + @Configuration public class TopicProducerCacheFactory { @@ -34,7 +37,15 @@ public TopicProducerCache topicProducerCache(TopicProperties topicProperties) { Metrics.globalRegistry, cache.getCache(), MetricsNames.TOPIC_PRODUCER_CACHE); return cache; } else { - return (key, topicProducerSupplier) -> topicProducerSupplier.get(); + return new TopicProducerCache() { + @Override + public TopicProducer getOrCreate(Key key, Supplier topicProducerSupplier) { + return topicProducerSupplier.get(); + } + @Override + public void close() { + } + }; } } } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java index 4366b1efc..2fdbd12a3 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java @@ -100,5 +100,6 @@ public void onDestroy() { log.info("Shutting down WebSocket"); consumeThreadPool.shutdownNow(); clusterRuntimeRegistry.close(); + topicProducerCache.close(); } }