From 6ada9926653d1ddf687980dbd288227b8599cfae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Mon, 20 May 2024 16:10:41 +0200 Subject: [PATCH] flaky test --- .../ai/langstream/apigateway/http/GatewayResource.java | 10 ++-------- .../apigateway/websocket/WebSocketConfig.java | 3 ++- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java index 8d82dada3..ce8ee65c9 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java @@ -270,13 +270,7 @@ private CompletableFuture handleServiceWithTopics( topicConnectionsRuntimeRegistryProvider .getTopicConnectionsRuntimeRegistry(), clusterRuntimeRegistry); - completableFuture.thenRunAsync( - () -> { - if (consumeGateway != null) { - consumeGateway.close(); - } - }, - consumeThreadPool); + completableFuture.thenRunAsync(consumeGateway::close, consumeThreadPool); final Gateway.ServiceOptions serviceOptions = authContext.gateway().getServiceOptions(); try { @@ -297,7 +291,7 @@ record -> { final AtomicBoolean stop = new AtomicBoolean(false); consumeGateway.startReadingAsync( consumeThreadPool, - () -> stop.get(), + stop::get, record -> { stop.set(true); completableFuture.complete(ResponseEntity.ok(record)); diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java index 29afd6aa7..4366b1efc 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java @@ -97,7 +97,8 @@ public ServletServerContainerFactoryBean createWebSocketContainer() { @PreDestroy public void onDestroy() { - consumeThreadPool.shutdown(); + log.info("Shutting down WebSocket"); + consumeThreadPool.shutdownNow(); clusterRuntimeRegistry.close(); } }