diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java index 8d82dada3..ce8ee65c9 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java @@ -270,13 +270,7 @@ private CompletableFuture handleServiceWithTopics( topicConnectionsRuntimeRegistryProvider .getTopicConnectionsRuntimeRegistry(), clusterRuntimeRegistry); - completableFuture.thenRunAsync( - () -> { - if (consumeGateway != null) { - consumeGateway.close(); - } - }, - consumeThreadPool); + completableFuture.thenRunAsync(consumeGateway::close, consumeThreadPool); final Gateway.ServiceOptions serviceOptions = authContext.gateway().getServiceOptions(); try { @@ -297,7 +291,7 @@ record -> { final AtomicBoolean stop = new AtomicBoolean(false); consumeGateway.startReadingAsync( consumeThreadPool, - () -> stop.get(), + stop::get, record -> { stop.set(true); completableFuture.complete(ResponseEntity.ok(record)); diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java index 29afd6aa7..4366b1efc 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java @@ -97,7 +97,8 @@ public ServletServerContainerFactoryBean createWebSocketContainer() { @PreDestroy public void onDestroy() { - consumeThreadPool.shutdown(); + log.info("Shutting down WebSocket"); + consumeThreadPool.shutdownNow(); clusterRuntimeRegistry.close(); } }