From f1c97c4f4071a20e2a36ab3d718559a5976b826a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Mon, 13 May 2024 19:08:46 +0200 Subject: [PATCH 1/5] Fix resource leak in gateway when using Pulsar --- .../apigateway/gateways/ConsumeGateway.java | 11 ++++- .../apigateway/gateways/ProduceGateway.java | 43 ++++++++++++++++++- .../apigateway/http/GatewayResourceTest.java | 33 ++++++++------ .../src/test/resources/application.properties | 2 +- 4 files changed, 72 insertions(+), 17 deletions(-) diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java index b8f1948c3..be5b6b689 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java @@ -54,6 +54,8 @@ public class ConsumeGateway implements AutoCloseable { private final TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry; private final ClusterRuntimeRegistry clusterRuntimeRegistry; + private volatile TopicConnectionsRuntime topicConnectionsRuntime; + private volatile TopicReader reader; private volatile boolean interrupted; private volatile String logRef; @@ -84,7 +86,7 @@ public void setup( final StreamingCluster streamingCluster = requestContext.application().getInstance().streamingCluster(); - final TopicConnectionsRuntime topicConnectionsRuntime = + topicConnectionsRuntime = topicConnectionsRuntimeRegistry .getTopicConnectionsRuntime(streamingCluster) .asTopicConnectionsRuntime(); @@ -203,6 +205,13 @@ private void closeReader() { log.warn("error closing reader", e); } } + if (topicConnectionsRuntime != null) { + try { + topicConnectionsRuntime.close(); + } catch (Exception e) { + log.warn("error closing runtime", e); + } + } } @Override diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java index 4fba7d0ad..342bcd7b1 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java @@ -19,6 +19,7 @@ import ai.langstream.api.model.StreamingCluster; import ai.langstream.api.model.TopicDefinition; import ai.langstream.api.runner.code.Header; +import ai.langstream.api.runner.code.Record; import ai.langstream.api.runner.code.SimpleRecord; import ai.langstream.api.runner.topics.TopicConnectionsRuntime; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; @@ -37,7 +38,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; + +import lombok.AllArgsConstructor; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; @@ -143,6 +147,43 @@ public void start( key, () -> setupProducer(resolvedTopicName, streamingCluster)); } + @AllArgsConstructor + static class TopicProducerAndRuntime implements TopicProducer { + private TopicProducer producer; + private TopicConnectionsRuntime runtime; + + @Override + public void start() { + producer.start(); + } + + @Override + public void close() { + producer.close(); + runtime.close(); + } + + @Override + public CompletableFuture write(Record record) { + return producer.write(record); + } + + @Override + public Object getNativeProducer() { + return producer.getNativeProducer(); + } + + @Override + public Object getInfo() { + return producer.getInfo(); + } + + @Override + public long getTotalIn() { + return producer.getTotalIn(); + } + } + protected TopicProducer setupProducer(String topic, StreamingCluster streamingCluster) { final TopicConnectionsRuntime topicConnectionsRuntime = @@ -157,7 +198,7 @@ protected TopicProducer setupProducer(String topic, StreamingCluster streamingCl null, streamingCluster, Map.of("topic", topic)); topicProducer.start(); log.debug("[{}] Started producer on topic {}", logRef, topic); - return topicProducer; + return new TopicProducerAndRuntime(topicProducer, topicConnectionsRuntime); } public void produceMessage(String payload) throws ProduceException { diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index 8e7db231b..444dfd3d9 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -304,6 +304,12 @@ void testSimpleProduce() throws Exception { final String url = "http://localhost:%d/api/gateways/produce/tenant1/application1/produce" .formatted(port); + while (true) { + produceJsonAndExpectOk(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}"); + if (url.contains("caiii")) { + break; + } + } produceJsonAndExpectOk(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}"); produceJsonAndExpectOk(url, "{\"key\": \"my-key\"}"); @@ -555,20 +561,19 @@ void testService() throws Exception { final String url = "http://localhost:%d/api/gateways/service/tenant1/application1/svc".formatted(port); - assertMessageContent( - new MsgRecord("my-key", "my-value", Map.of()), - produceJsonAndGetBody(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}")); - assertMessageContent( - new MsgRecord("my-key2", "my-value", Map.of()), - produceJsonAndGetBody(url, "{\"key\": \"my-key2\", \"value\": \"my-value\"}")); - - assertMessageContent( - new MsgRecord(null, "my-text", Map.of()), produceTextAndGetBody(url, "my-text")); - assertMessageContent( - new MsgRecord("my-key2", "my-value", Map.of("header1", "value1")), - produceJsonAndGetBody( - url, - "{\"key\": \"my-key2\", \"value\": \"my-value\", \"headers\": {\"header1\":\"value1\"}}")); + for (int i = 0; i < 100; i++) { + CompletableFuture.runAsync(() -> { + while (true) { + + assertMessageContent( + new MsgRecord("my-key", "my-value", Map.of()), + produceJsonAndGetBody(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}")); + } + }); + } + Thread.sleep(Long.MAX_VALUE); + + } private void startTopicExchange(String logicalFromTopic, String logicalToTopic) diff --git a/langstream-api-gateway/src/test/resources/application.properties b/langstream-api-gateway/src/test/resources/application.properties index 495a189b7..575cb1d35 100644 --- a/langstream-api-gateway/src/test/resources/application.properties +++ b/langstream-api-gateway/src/test/resources/application.properties @@ -2,5 +2,5 @@ application.gateways.code.path=target/agents management.endpoints.web.base-path=/management management.endpoints.web.exposure.include=configprops,env,health,info,logfile,loggers,threaddump,prometheus management.endpoint.health.probes.enabled=true -application.topics.producers-cache-enabled=true +application.topics.producers-cache-enabled=false application.topics.producers-cache-size=2 \ No newline at end of file From 2617d6026b4f186af34193948938f9dd2c257db9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Mon, 13 May 2024 19:13:18 +0200 Subject: [PATCH 2/5] add tests --- .../apigateway/http/GatewayResourceTest.java | 28 +++++++------------ .../src/test/resources/application.properties | 2 +- 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index 444dfd3d9..b7c7da712 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -304,13 +304,6 @@ void testSimpleProduce() throws Exception { final String url = "http://localhost:%d/api/gateways/produce/tenant1/application1/produce" .formatted(port); - while (true) { - produceJsonAndExpectOk(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}"); - if (url.contains("caiii")) { - break; - } - } - produceJsonAndExpectOk(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}"); produceJsonAndExpectOk(url, "{\"key\": \"my-key\"}"); produceJsonAndExpectOk(url, "{\"key\": \"my-key\", \"headers\": {\"h1\": \"v1\"}}"); @@ -561,19 +554,18 @@ void testService() throws Exception { final String url = "http://localhost:%d/api/gateways/service/tenant1/application1/svc".formatted(port); - for (int i = 0; i < 100; i++) { - CompletableFuture.runAsync(() -> { - while (true) { - - assertMessageContent( - new MsgRecord("my-key", "my-value", Map.of()), - produceJsonAndGetBody(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}")); - } + List> futures1 = new ArrayList<>(); + for (int i = 0; i < 30; i++) { + CompletableFuture future = CompletableFuture.runAsync(() -> { + for (int j = 0; j < 10; j++) { + assertMessageContent( + new MsgRecord("my-key", "my-value", Map.of()), + produceJsonAndGetBody(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}")); + } }); + futures1.add(future); } - Thread.sleep(Long.MAX_VALUE); - - + CompletableFuture.allOf(futures1.toArray(new CompletableFuture[]{})).get(2, TimeUnit.MINUTES); } private void startTopicExchange(String logicalFromTopic, String logicalToTopic) diff --git a/langstream-api-gateway/src/test/resources/application.properties b/langstream-api-gateway/src/test/resources/application.properties index 575cb1d35..495a189b7 100644 --- a/langstream-api-gateway/src/test/resources/application.properties +++ b/langstream-api-gateway/src/test/resources/application.properties @@ -2,5 +2,5 @@ application.gateways.code.path=target/agents management.endpoints.web.base-path=/management management.endpoints.web.exposure.include=configprops,env,health,info,logfile,loggers,threaddump,prometheus management.endpoint.health.probes.enabled=true -application.topics.producers-cache-enabled=false +application.topics.producers-cache-enabled=true application.topics.producers-cache-size=2 \ No newline at end of file From c7ec68362535a09041d970e0fb21d6b636472cb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Mon, 13 May 2024 19:14:39 +0200 Subject: [PATCH 3/5] add tests --- .../apigateway/http/GatewayResourceTest.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index b7c7da712..7031030f6 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -554,6 +554,21 @@ void testService() throws Exception { final String url = "http://localhost:%d/api/gateways/service/tenant1/application1/svc".formatted(port); + assertMessageContent( + new MsgRecord("my-key", "my-value", Map.of()), + produceJsonAndGetBody(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}")); + assertMessageContent( + new MsgRecord("my-key2", "my-value", Map.of()), + produceJsonAndGetBody(url, "{\"key\": \"my-key2\", \"value\": \"my-value\"}")); + + assertMessageContent( + new MsgRecord(null, "my-text", Map.of()), produceTextAndGetBody(url, "my-text")); + assertMessageContent( + new MsgRecord("my-key2", "my-value", Map.of("header1", "value1")), + produceJsonAndGetBody( + url, + "{\"key\": \"my-key2\", \"value\": \"my-value\", \"headers\": {\"header1\":\"value1\"}}")); + List> futures1 = new ArrayList<>(); for (int i = 0; i < 30; i++) { CompletableFuture future = CompletableFuture.runAsync(() -> { From 5c1c8e259eb3cdcffe09e7da2183113d55a032a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Mon, 20 May 2024 11:34:54 +0200 Subject: [PATCH 4/5] spotless --- .../apigateway/gateways/ProduceGateway.java | 1 - .../apigateway/http/GatewayResourceTest.java | 21 ++++++++++++------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java index 342bcd7b1..70ded0363 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java @@ -40,7 +40,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; - import lombok.AllArgsConstructor; import lombok.Getter; import lombok.extern.slf4j.Slf4j; diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index 7031030f6..5bb3f8e81 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -571,16 +571,21 @@ void testService() throws Exception { List> futures1 = new ArrayList<>(); for (int i = 0; i < 30; i++) { - CompletableFuture future = CompletableFuture.runAsync(() -> { - for (int j = 0; j < 10; j++) { - assertMessageContent( - new MsgRecord("my-key", "my-value", Map.of()), - produceJsonAndGetBody(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}")); - } - }); + CompletableFuture future = + CompletableFuture.runAsync( + () -> { + for (int j = 0; j < 10; j++) { + assertMessageContent( + new MsgRecord("my-key", "my-value", Map.of()), + produceJsonAndGetBody( + url, + "{\"key\": \"my-key\", \"value\": \"my-value\"}")); + } + }); futures1.add(future); } - CompletableFuture.allOf(futures1.toArray(new CompletableFuture[]{})).get(2, TimeUnit.MINUTES); + CompletableFuture.allOf(futures1.toArray(new CompletableFuture[] {})) + .get(2, TimeUnit.MINUTES); } private void startTopicExchange(String logicalFromTopic, String logicalToTopic) From b289ddd47f8c8370dcd2be43c81777228e652585 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Mon, 20 May 2024 14:04:39 +0200 Subject: [PATCH 5/5] fix --- .../ai/langstream/apigateway/http/GatewayResourceTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index 5bb3f8e81..7d1e83344 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -569,8 +569,11 @@ void testService() throws Exception { url, "{\"key\": \"my-key2\", \"value\": \"my-value\", \"headers\": {\"header1\":\"value1\"}}")); + // sorry but kafka can't keep up + final int numParallel = getStreamingCluster().type().equals("kafka") ? 5 : 30; + List> futures1 = new ArrayList<>(); - for (int i = 0; i < 30; i++) { + for (int i = 0; i < numParallel; i++) { CompletableFuture future = CompletableFuture.runAsync( () -> {