From 1d2bb6a14da8354876b51e10411b3482e9982a8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 3 Nov 2023 11:03:39 +0100 Subject: [PATCH 1/7] Fix service gateway with topics --- .../apigateway/http/GatewayResource.java | 25 +++++++- .../http/ResourceErrorsHandler.java | 5 ++ .../apigateway/http/GatewayResourceTest.java | 57 +++++++++++++++++-- 3 files changed, 80 insertions(+), 7 deletions(-) diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java index c7d6d3a0c..4d676a86c 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java @@ -40,6 +40,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -74,6 +75,7 @@ public class GatewayResource { protected static final String GATEWAY_SERVICE_PATH = "/service/{tenant}/{application}/{gateway}/**"; protected static final ObjectMapper MAPPER = new ObjectMapper(); + protected static final String SERVICE_REQUEST_ID_HEADER = "langstream-service-request-id"; private final TopicConnectionsRuntimeProviderBean topicConnectionsRuntimeRegistryProvider; private final TopicProducerCache topicProducerCache; private final ApplicationStore applicationStore; @@ -233,6 +235,9 @@ public void validateOptions(Map options) {} private CompletableFuture handleServiceWithTopics( ProduceRequest produceRequest, AuthenticatedGatewayRequestContext authContext) { + + final String langstreamServiceRequestId = UUID.randomUUID().toString(); + final CompletableFuture completableFuture = new CompletableFuture<>(); try (final ConsumeGateway consumeGateway = new ConsumeGateway( @@ -251,7 +256,15 @@ private CompletableFuture handleServiceWithTopics( serviceOptions.getHeaders(), authContext.userParameters(), authContext.principalValues()); - consumeGateway.setup(serviceOptions.getInputTopic(), messageFilters, authContext); + messageFilters.add( + record -> { + final Header header = record.getHeader(SERVICE_REQUEST_ID_HEADER); + if (header == null) { + return false; + } + return langstreamServiceRequestId.equals(header.valueAsString()); + }); + consumeGateway.setup(serviceOptions.getOutputTopic(), messageFilters, authContext); final AtomicBoolean stop = new AtomicBoolean(false); consumeGateway.startReadingAsync( consumeThreadPool, @@ -266,8 +279,14 @@ record -> { } final List
commonHeaders = ProduceGateway.getProducerCommonHeaders(serviceOptions, authContext); - produceGateway.start(serviceOptions.getOutputTopic(), commonHeaders, authContext); - produceGateway.produceMessage(produceRequest); + produceGateway.start(serviceOptions.getInputTopic(), commonHeaders, authContext); + + Map passedHeaders = produceRequest.headers(); + if (passedHeaders == null) { + passedHeaders = new HashMap<>(); + } + passedHeaders.put(SERVICE_REQUEST_ID_HEADER, langstreamServiceRequestId); + produceGateway.produceMessage(new ProduceRequest(produceRequest.key(), produceRequest.value(), passedHeaders)); } catch (Throwable t) { completableFuture.completeExceptionally(t); } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/ResourceErrorsHandler.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/ResourceErrorsHandler.java index 90e5f93f9..9be249e1d 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/ResourceErrorsHandler.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/ResourceErrorsHandler.java @@ -22,6 +22,7 @@ import org.springframework.http.ProblemDetail; import org.springframework.web.bind.annotation.ControllerAdvice; import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.context.request.async.AsyncRequestTimeoutException; import org.springframework.web.server.ResponseStatusException; @ControllerAdvice @@ -38,6 +39,10 @@ ProblemDetail handleAll(Throwable exception) { log.error("Bad request", exception); return ProblemDetail.forStatusAndDetail(HttpStatus.BAD_REQUEST, exception.getMessage()); } + if (exception instanceof AsyncRequestTimeoutException) { + log.error("Request timed out", exception); + return ProblemDetail.forStatusAndDetail(HttpStatus.REQUEST_TIMEOUT, "Request timed out"); + } log.error("Internal error", exception); return ProblemDetail.forStatusAndDetail( HttpStatus.INTERNAL_SERVER_ERROR, exception.getMessage()); diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index 497dd4014..f6aed1af4 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -29,7 +29,10 @@ import ai.langstream.api.model.Gateways; import ai.langstream.api.model.StoredApplication; import ai.langstream.api.model.StreamingCluster; +import ai.langstream.api.runner.code.Record; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; +import ai.langstream.api.runner.topics.TopicConsumer; +import ai.langstream.api.runner.topics.TopicProducer; import ai.langstream.api.runtime.ClusterRuntimeRegistry; import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.api.storage.ApplicationStore; @@ -49,8 +52,11 @@ import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.nio.file.Path; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -95,6 +101,7 @@ abstract class GatewayResourceTest { protected static final ObjectMapper MAPPER = new ObjectMapper(); static List topics; + static List> futures = new ArrayList<>(); static Gateways testGateways; protected static ApplicationStore getMockedStore(String instanceYaml) { @@ -207,6 +214,10 @@ public static void afterAll() { @AfterEach public void afterEach() { Metrics.globalRegistry.clear(); + for (CompletableFuture future : futures) { + future.cancel(true); + } + futures.clear(); } @SneakyThrows @@ -497,8 +508,12 @@ void testTestCredentials() throws Exception { @Test void testService() throws Exception { - final String topic = genTopic(); - prepareTopicsForTest(topic); + final String inputTopic = genTopic(); + final String outputTopic = genTopic(); + prepareTopicsForTest(inputTopic, outputTopic); + + startTopicExchange(inputTopic, outputTopic); + testGateways = new Gateways( List.of( @@ -507,7 +522,7 @@ void testService() throws Exception { .type(Gateway.GatewayType.service) .serviceOptions( new Gateway.ServiceOptions( - null, topic, topic, List.of())) + null, inputTopic, outputTopic, List.of())) .build())); final String url = @@ -526,16 +541,50 @@ void testService() throws Exception { "{\"key\": \"my-key2\", \"value\": \"my-value\", \"headers\": {\"header1\":\"value1\"}}")); } + private void startTopicExchange(String fromTopic, String toTopic) throws Exception { + final CompletableFuture future = CompletableFuture.runAsync(() -> { + TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry = + topicConnectionsRuntimeProvider.getTopicConnectionsRuntimeRegistry(); + final StreamingCluster streamingCluster = getStreamingCluster(); + try (final TopicConsumer consumer = topicConnectionsRuntimeRegistry + .getTopicConnectionsRuntime(streamingCluster) + .asTopicConnectionsRuntime() + .createConsumer(null, streamingCluster, Map.of("topic", fromTopic));) { + consumer.start(); + + try (final TopicProducer producer = topicConnectionsRuntimeRegistry + .getTopicConnectionsRuntime(streamingCluster) + .asTopicConnectionsRuntime() + .createProducer(null, streamingCluster, Map.of("topic", toTopic));) { + + producer.start(); + while (true) { + + final List records = consumer.read(); + for (Record record : records) { + producer.write(record); + } + } + } + } catch (Exception e) { + e.printStackTrace(); + } + }); + futures.add(future); + } + private record MsgRecord(Object key, Object value, Map headers) {} @SneakyThrows private void assertMessageContent(MsgRecord expected, String actual) { ConsumePushMessage consume = MAPPER.readValue(actual, ConsumePushMessage.class); + final Map headers = consume.record().headers(); + assertNotNull(headers.remove("langstream-service-request-id")); final MsgRecord actualMsgRecord = new MsgRecord( consume.record().key(), consume.record().value(), - consume.record().headers()); + headers); assertEquals(expected, actualMsgRecord); } From 8d5356de4d30bb26cf30268ddca57e6f9cc01fcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 3 Nov 2023 11:05:26 +0100 Subject: [PATCH 2/7] fix --- .../apigateway/http/GatewayResource.java | 4 +- .../http/ResourceErrorsHandler.java | 3 +- .../apigateway/http/GatewayResourceTest.java | 73 ++++++++++--------- 3 files changed, 45 insertions(+), 35 deletions(-) diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java index 4d676a86c..7be508c26 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java @@ -286,7 +286,9 @@ record -> { passedHeaders = new HashMap<>(); } passedHeaders.put(SERVICE_REQUEST_ID_HEADER, langstreamServiceRequestId); - produceGateway.produceMessage(new ProduceRequest(produceRequest.key(), produceRequest.value(), passedHeaders)); + produceGateway.produceMessage( + new ProduceRequest( + produceRequest.key(), produceRequest.value(), passedHeaders)); } catch (Throwable t) { completableFuture.completeExceptionally(t); } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/ResourceErrorsHandler.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/ResourceErrorsHandler.java index 9be249e1d..a83998b39 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/ResourceErrorsHandler.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/ResourceErrorsHandler.java @@ -41,7 +41,8 @@ ProblemDetail handleAll(Throwable exception) { } if (exception instanceof AsyncRequestTimeoutException) { log.error("Request timed out", exception); - return ProblemDetail.forStatusAndDetail(HttpStatus.REQUEST_TIMEOUT, "Request timed out"); + return ProblemDetail.forStatusAndDetail( + HttpStatus.REQUEST_TIMEOUT, "Request timed out"); } log.error("Internal error", exception); return ProblemDetail.forStatusAndDetail( diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index f6aed1af4..c993d43f6 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -56,7 +56,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -542,34 +541,45 @@ void testService() throws Exception { } private void startTopicExchange(String fromTopic, String toTopic) throws Exception { - final CompletableFuture future = CompletableFuture.runAsync(() -> { - TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry = - topicConnectionsRuntimeProvider.getTopicConnectionsRuntimeRegistry(); - final StreamingCluster streamingCluster = getStreamingCluster(); - try (final TopicConsumer consumer = topicConnectionsRuntimeRegistry - .getTopicConnectionsRuntime(streamingCluster) - .asTopicConnectionsRuntime() - .createConsumer(null, streamingCluster, Map.of("topic", fromTopic));) { - consumer.start(); - - try (final TopicProducer producer = topicConnectionsRuntimeRegistry - .getTopicConnectionsRuntime(streamingCluster) - .asTopicConnectionsRuntime() - .createProducer(null, streamingCluster, Map.of("topic", toTopic));) { - - producer.start(); - while (true) { - - final List records = consumer.read(); - for (Record record : records) { - producer.write(record); - } - } - } - } catch (Exception e) { - e.printStackTrace(); - } - }); + final CompletableFuture future = + CompletableFuture.runAsync( + () -> { + TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry = + topicConnectionsRuntimeProvider + .getTopicConnectionsRuntimeRegistry(); + final StreamingCluster streamingCluster = getStreamingCluster(); + try (final TopicConsumer consumer = + topicConnectionsRuntimeRegistry + .getTopicConnectionsRuntime(streamingCluster) + .asTopicConnectionsRuntime() + .createConsumer( + null, + streamingCluster, + Map.of("topic", fromTopic)); ) { + consumer.start(); + + try (final TopicProducer producer = + topicConnectionsRuntimeRegistry + .getTopicConnectionsRuntime(streamingCluster) + .asTopicConnectionsRuntime() + .createProducer( + null, + streamingCluster, + Map.of("topic", toTopic)); ) { + + producer.start(); + while (true) { + + final List records = consumer.read(); + for (Record record : records) { + producer.write(record); + } + } + } + } catch (Exception e) { + e.printStackTrace(); + } + }); futures.add(future); } @@ -581,10 +591,7 @@ private void assertMessageContent(MsgRecord expected, String actual) { final Map headers = consume.record().headers(); assertNotNull(headers.remove("langstream-service-request-id")); final MsgRecord actualMsgRecord = - new MsgRecord( - consume.record().key(), - consume.record().value(), - headers); + new MsgRecord(consume.record().key(), consume.record().value(), headers); assertEquals(expected, actualMsgRecord); } From d41bf1263591a44f7d2468e3fdd5297420373175 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 3 Nov 2023 11:40:25 +0100 Subject: [PATCH 3/7] support plain text --- .../apigateway/gateways/ProduceGateway.java | 9 +- .../apigateway/http/GatewayResource.java | 26 +++++- .../apigateway/http/GatewayResourceTest.java | 92 +++++++++++++------ 3 files changed, 93 insertions(+), 34 deletions(-) diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java index 73f776b3a..156b16683 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java @@ -144,13 +144,18 @@ protected TopicProducer setupProducer(String topic, StreamingCluster streamingCl } public void produceMessage(String payload) throws ProduceException { + final ProduceRequest produceRequest = parseProduceRequest(payload); + produceMessage(produceRequest); + } + + public static ProduceRequest parseProduceRequest(String payload) throws ProduceException { final ProduceRequest produceRequest; try { produceRequest = mapper.readValue(payload, ProduceRequest.class); } catch (JsonProcessingException err) { - throw new ProduceException(err.getMessage(), ProduceResponse.Status.BAD_REQUEST); + throw new ProduceException("Error while parsing JSON payload: " + err.getMessage(), ProduceResponse.Status.BAD_REQUEST); } - produceMessage(produceRequest); + return produceRequest; } public void produceMessage(ProduceRequest produceRequest) throws ProduceException { diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java index 7be508c26..5ba23f236 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java @@ -36,6 +36,7 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -91,13 +92,13 @@ public class GatewayResource { @PostMapping( value = "/produce/{tenant}/{application}/{gateway}", - consumes = MediaType.APPLICATION_JSON_VALUE) + consumes = "*/*") ProduceResponse produce( WebRequest request, @NotBlank @PathVariable("tenant") String tenant, @NotBlank @PathVariable("application") String application, @NotBlank @PathVariable("gateway") String gateway, - @RequestBody ProduceRequest produceRequest) + @RequestBody String payload) throws ProduceGateway.ProduceException { final Map queryString = computeQueryString(request); @@ -127,11 +128,26 @@ ProduceResponse produce( ProduceGateway.getProducerCommonHeaders( context.gateway().getProduceOptions(), authContext); produceGateway.start(context.gateway().getTopic(), commonHeaders, authContext); + final ProduceRequest produceRequest = parseProduceRequest(request, payload); produceGateway.produceMessage(produceRequest); return ProduceResponse.OK; } } + private ProduceRequest parseProduceRequest(WebRequest request, String payload) + throws ProduceGateway.ProduceException { + final String contentType = request.getHeader("Content-Type"); + if (contentType == null || contentType.equals(MediaType.TEXT_PLAIN_VALUE)) { + return new ProduceRequest(null, payload, null); + } else if (contentType.equals(MediaType.APPLICATION_JSON_VALUE)) { + return ProduceGateway.parseProduceRequest(payload); + } else { + throw new ResponseStatusException( + HttpStatus.BAD_REQUEST, + String.format("Unsupported content type: %s", contentType)); + } + } + private Map computeHeaders(WebRequest request) { final Map headers = new HashMap<>(); request.getHeaderNames() @@ -189,7 +205,7 @@ private CompletableFuture handleServiceCall( String tenant, String application, String gateway) - throws IOException { + throws IOException, ProduceGateway.ProduceException { final Map queryString = computeQueryString(request); final Map headers = computeHeaders(request); final GatewayRequestContext context = @@ -227,8 +243,8 @@ public void validateOptions(Map options) {} throw new ResponseStatusException( HttpStatus.BAD_REQUEST, "Only POST method is supported"); } - final ProduceRequest produceRequest = - MAPPER.readValue(servletRequest.getInputStream(), ProduceRequest.class); + final String payload = new String(servletRequest.getInputStream().readAllBytes(), StandardCharsets.UTF_8); + final ProduceRequest produceRequest = parseProduceRequest(request, payload); return handleServiceWithTopics(produceRequest, authContext); } } diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index c993d43f6..ef4a4642a 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -220,7 +220,7 @@ public void afterEach() { } @SneakyThrows - void produceAndExpectOk(String url, String content) { + void produceJsonAndExpectOk(String url, String content) { final HttpRequest request = HttpRequest.newBuilder(URI.create(url)) .header("Content-Type", "application/json") @@ -233,8 +233,21 @@ void produceAndExpectOk(String url, String content) { {"status":"OK","reason":null}""", response.body()); } + + @SneakyThrows + String produceTextAndGetBody(String url, String content) { + final HttpRequest request = + HttpRequest.newBuilder(URI.create(url)) + .POST(HttpRequest.BodyPublishers.ofString(content)) + .build(); + final HttpResponse response = + CLIENT.send(request, HttpResponse.BodyHandlers.ofString()); + assertEquals(200, response.statusCode()); + return response.body(); + } + @SneakyThrows - String produceAndGetBody(String url, String content) { + String produceJsonAndGetBody(String url, String content) { final HttpRequest request = HttpRequest.newBuilder(URI.create(url)) .header("Content-Type", "application/json") @@ -247,7 +260,7 @@ String produceAndGetBody(String url, String content) { } @SneakyThrows - void produceAndExpectBadRequest(String url, String content, String errorMessage) { + void produceJsonAndExpectBadRequest(String url, String content, String errorMessage) { final HttpRequest request = HttpRequest.newBuilder(URI.create(url)) .header("Content-Type", "application/json") @@ -263,7 +276,7 @@ void produceAndExpectBadRequest(String url, String content, String errorMessage) } @SneakyThrows - void produceAndExpectUnauthorized(String url, String content) { + void produceJsonAndExpectUnauthorized(String url, String content) { final HttpRequest request = HttpRequest.newBuilder(URI.create(url)) .header("Content-Type", "application/json") @@ -297,9 +310,30 @@ void testSimpleProduce() throws Exception { "http://localhost:%d/api/gateways/produce/tenant1/application1/produce" .formatted(port); - produceAndExpectOk(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}"); - produceAndExpectOk(url, "{\"key\": \"my-key\"}"); - produceAndExpectOk(url, "{\"key\": \"my-key\", \"headers\": {\"h1\": \"v1\"}}"); + produceJsonAndExpectOk(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}"); + produceJsonAndExpectOk(url, "{\"key\": \"my-key\"}"); + produceJsonAndExpectOk(url, "{\"key\": \"my-key\", \"headers\": {\"h1\": \"v1\"}}"); + HttpRequest request = + HttpRequest.newBuilder(URI.create(url)) + .POST(HttpRequest.BodyPublishers.ofString("my-string")) + .build(); + HttpResponse response = + CLIENT.send(request, HttpResponse.BodyHandlers.ofString()); + assertEquals(200, response.statusCode()); + assertEquals(""" + {"status":"OK","reason":null}""", response.body()); + + request = + HttpRequest.newBuilder(URI.create(url)) + .header("Content-Type", "plain/text") + .POST(HttpRequest.BodyPublishers.ofString("my-string")) + .build(); + response = + CLIENT.send(request, HttpResponse.BodyHandlers.ofString()); + assertEquals(200, response.statusCode()); + assertEquals(""" + {"status":"OK","reason":null}""", response.body()); + } @Test @@ -334,10 +368,10 @@ void testSimpleProduceCacheProducer() throws Exception { "http://localhost:%d/api/gateways/produce/tenant1/application1/produce" .formatted(port); - produceAndExpectOk(url + "1", "{\"key\": \"my-key\", \"value\": \"my-value\"}"); - produceAndExpectOk(url + "2", "{\"key\": \"my-key\"}"); - produceAndExpectOk(url + "2", "{\"key\": \"my-key\"}"); - produceAndExpectOk(url, "{\"key\": \"my-key\", \"headers\": {\"h1\": \"v1\"}}"); + produceJsonAndExpectOk(url + "1", "{\"key\": \"my-key\", \"value\": \"my-value\"}"); + produceJsonAndExpectOk(url + "2", "{\"key\": \"my-key\"}"); + produceJsonAndExpectOk(url + "2", "{\"key\": \"my-key\"}"); + produceJsonAndExpectOk(url, "{\"key\": \"my-key\", \"headers\": {\"h1\": \"v1\"}}"); final String metrics = mockMvc.perform(get("/management/prometheus")) @@ -389,17 +423,17 @@ void testParametersRequired() throws Exception { "http://localhost:%d/api/gateways/produce/tenant1/application1/gw".formatted(port); final String content = "{\"value\": \"my-value\"}"; - produceAndExpectBadRequest(baseUrl, content, "missing required parameter session-id"); - produceAndExpectBadRequest( + produceJsonAndExpectBadRequest(baseUrl, content, "missing required parameter session-id"); + produceJsonAndExpectBadRequest( baseUrl + "?param:otherparam=1", content, "missing required parameter session-id"); - produceAndExpectBadRequest( + produceJsonAndExpectBadRequest( baseUrl + "?param:session-id=", content, "missing required parameter session-id"); - produceAndExpectBadRequest( + produceJsonAndExpectBadRequest( baseUrl + "?param:session-id=ok¶m:another-non-declared=y", content, "unknown parameters: [another-non-declared]"); - produceAndExpectOk(baseUrl + "?param:session-id=1", content); - produceAndExpectOk(baseUrl + "?param:session-id=string-value", content); + produceJsonAndExpectOk(baseUrl + "?param:session-id=1", content); + produceJsonAndExpectOk(baseUrl + "?param:session-id=string-value", content); } @Test @@ -446,10 +480,10 @@ void testAuthentication() throws Exception { "http://localhost:%d/api/gateways/produce/tenant1/application1/produce" .formatted(port); - produceAndExpectUnauthorized(baseUrl, "{\"value\": \"my-value\"}"); - produceAndExpectUnauthorized(baseUrl + "?credentials=", "{\"value\": \"my-value\"}"); - produceAndExpectUnauthorized(baseUrl + "?credentials=error", "{\"value\": \"my-value\"}"); - produceAndExpectOk( + produceJsonAndExpectUnauthorized(baseUrl, "{\"value\": \"my-value\"}"); + produceJsonAndExpectUnauthorized(baseUrl + "?credentials=", "{\"value\": \"my-value\"}"); + produceJsonAndExpectUnauthorized(baseUrl + "?credentials=error", "{\"value\": \"my-value\"}"); + produceJsonAndExpectOk( baseUrl + "?credentials=test-user-password", "{\"value\": \"my-value\"}"); } @@ -494,11 +528,11 @@ void testTestCredentials() throws Exception { "http://localhost:%d/api/gateways/produce/tenant1/application1/produce" .formatted(port); - produceAndExpectUnauthorized( + produceJsonAndExpectUnauthorized( baseUrl + "?test-credentials=test", "{\"value\": \"my-value\"}"); - produceAndExpectOk( + produceJsonAndExpectOk( baseUrl + "?test-credentials=test-user-password", "{\"value\": \"my-value\"}"); - produceAndExpectUnauthorized( + produceJsonAndExpectUnauthorized( ("http://localhost:%d/api/gateways/produce/tenant1/application1/produce-no-test?test-credentials=test" + "-user-password") .formatted(port), @@ -529,13 +563,17 @@ void testService() throws Exception { assertMessageContent( new MsgRecord("my-key", "my-value", Map.of()), - produceAndGetBody(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}")); + produceJsonAndGetBody(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}")); assertMessageContent( new MsgRecord("my-key2", "my-value", Map.of()), - produceAndGetBody(url, "{\"key\": \"my-key2\", \"value\": \"my-value\"}")); + produceJsonAndGetBody(url, "{\"key\": \"my-key2\", \"value\": \"my-value\"}")); + + assertMessageContent( + new MsgRecord(null, "my-text", Map.of()), + produceTextAndGetBody(url, "my-text")); assertMessageContent( new MsgRecord("my-key2", "my-value", Map.of("header1", "value1")), - produceAndGetBody( + produceJsonAndGetBody( url, "{\"key\": \"my-key2\", \"value\": \"my-value\", \"headers\": {\"header1\":\"value1\"}}")); } From 84f57b249403029b3cfdee8a0c2b4e2a00276c42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 3 Nov 2023 11:40:34 +0100 Subject: [PATCH 4/7] fix --- .../apigateway/gateways/ProduceGateway.java | 4 +++- .../apigateway/http/GatewayResource.java | 8 ++++---- .../apigateway/http/GatewayResourceTest.java | 14 +++++--------- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java index 156b16683..765e3f7ae 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java @@ -153,7 +153,9 @@ public static ProduceRequest parseProduceRequest(String payload) throws ProduceE try { produceRequest = mapper.readValue(payload, ProduceRequest.class); } catch (JsonProcessingException err) { - throw new ProduceException("Error while parsing JSON payload: " + err.getMessage(), ProduceResponse.Status.BAD_REQUEST); + throw new ProduceException( + "Error while parsing JSON payload: " + err.getMessage(), + ProduceResponse.Status.BAD_REQUEST); } return produceRequest; } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java index 5ba23f236..794fd5734 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java @@ -90,9 +90,7 @@ public class GatewayResource { Executors.newCachedThreadPool( new BasicThreadFactory.Builder().namingPattern("http-consume-%d").build()); - @PostMapping( - value = "/produce/{tenant}/{application}/{gateway}", - consumes = "*/*") + @PostMapping(value = "/produce/{tenant}/{application}/{gateway}", consumes = "*/*") ProduceResponse produce( WebRequest request, @NotBlank @PathVariable("tenant") String tenant, @@ -243,7 +241,9 @@ public void validateOptions(Map options) {} throw new ResponseStatusException( HttpStatus.BAD_REQUEST, "Only POST method is supported"); } - final String payload = new String(servletRequest.getInputStream().readAllBytes(), StandardCharsets.UTF_8); + final String payload = + new String( + servletRequest.getInputStream().readAllBytes(), StandardCharsets.UTF_8); final ProduceRequest produceRequest = parseProduceRequest(request, payload); return handleServiceWithTopics(produceRequest, authContext); } diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index ef4a4642a..7ffe4d185 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -233,7 +233,6 @@ void produceJsonAndExpectOk(String url, String content) { {"status":"OK","reason":null}""", response.body()); } - @SneakyThrows String produceTextAndGetBody(String url, String content) { final HttpRequest request = @@ -317,8 +316,7 @@ void testSimpleProduce() throws Exception { HttpRequest.newBuilder(URI.create(url)) .POST(HttpRequest.BodyPublishers.ofString("my-string")) .build(); - HttpResponse response = - CLIENT.send(request, HttpResponse.BodyHandlers.ofString()); + HttpResponse response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString()); assertEquals(200, response.statusCode()); assertEquals(""" {"status":"OK","reason":null}""", response.body()); @@ -328,12 +326,10 @@ void testSimpleProduce() throws Exception { .header("Content-Type", "plain/text") .POST(HttpRequest.BodyPublishers.ofString("my-string")) .build(); - response = - CLIENT.send(request, HttpResponse.BodyHandlers.ofString()); + response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString()); assertEquals(200, response.statusCode()); assertEquals(""" {"status":"OK","reason":null}""", response.body()); - } @Test @@ -482,7 +478,8 @@ void testAuthentication() throws Exception { produceJsonAndExpectUnauthorized(baseUrl, "{\"value\": \"my-value\"}"); produceJsonAndExpectUnauthorized(baseUrl + "?credentials=", "{\"value\": \"my-value\"}"); - produceJsonAndExpectUnauthorized(baseUrl + "?credentials=error", "{\"value\": \"my-value\"}"); + produceJsonAndExpectUnauthorized( + baseUrl + "?credentials=error", "{\"value\": \"my-value\"}"); produceJsonAndExpectOk( baseUrl + "?credentials=test-user-password", "{\"value\": \"my-value\"}"); } @@ -569,8 +566,7 @@ void testService() throws Exception { produceJsonAndGetBody(url, "{\"key\": \"my-key2\", \"value\": \"my-value\"}")); assertMessageContent( - new MsgRecord(null, "my-text", Map.of()), - produceTextAndGetBody(url, "my-text")); + new MsgRecord(null, "my-text", Map.of()), produceTextAndGetBody(url, "my-text")); assertMessageContent( new MsgRecord("my-key2", "my-value", Map.of("header1", "value1")), produceJsonAndGetBody( From 5f7e182bf31dca9ca5ee550b0fd58be13ccd2687 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 3 Nov 2023 11:52:31 +0100 Subject: [PATCH 5/7] fix tests --- .../ai/langstream/apigateway/http/GatewayResourceTest.java | 3 ++- .../websocket/handlers/ProduceConsumeHandlerTest.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index 7ffe4d185..fb03356c0 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -323,10 +323,11 @@ void testSimpleProduce() throws Exception { request = HttpRequest.newBuilder(URI.create(url)) - .header("Content-Type", "plain/text") + .header("Content-Type", "text/plain") .POST(HttpRequest.BodyPublishers.ofString("my-string")) .build(); response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString()); + log.info("Response body: {}", response.body()); assertEquals(200, response.statusCode()); assertEquals(""" {"status":"OK","reason":null}""", response.body()); diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java index be56f9be8..bcccad9da 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java @@ -858,7 +858,7 @@ void testProduce() throws Exception { assertEquals(ProduceResponse.Status.BAD_REQUEST, response.status()); assertEquals( - "Unrecognized token 'invalid': was expecting (JSON String, Number, Array, Object or token " + "Error while parsing JSON payload: Unrecognized token 'invalid': was expecting (JSON String, Number, Array, Object or token " + "'null', 'true' or 'false')\n" + " at [Source: (String)\"invalid-json\"; line: 1, column: 8]", response.reason()); From 6cfc1b8b859385f89addf2ac41fd9f2d6f63428b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 3 Nov 2023 12:48:55 +0100 Subject: [PATCH 6/7] Fix pulsar --- .../apigateway/gateways/ConsumeGateway.java | 1 + .../apigateway/http/GatewayResource.java | 16 ++++++++--- .../apigateway/http/GatewayResourceTest.java | 27 ++++++++++++------- ...PulsarTopicConnectionsRuntimeProvider.java | 12 +++++++-- 4 files changed, 40 insertions(+), 16 deletions(-) diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java index 015fd2957..e2d595d75 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java @@ -145,6 +145,7 @@ public void startReadingAsync( log.debug("[{}] Started reader", logRef); readMessages(stop, onMessage); } catch (Throwable ex) { + log.error("[{}] Error reading messages", logRef, ex); throw new RuntimeException(ex); } finally { closeReader(); diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java index 794fd5734..f7d9960b0 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java @@ -255,16 +255,23 @@ private CompletableFuture handleServiceWithTopics( final String langstreamServiceRequestId = UUID.randomUUID().toString(); final CompletableFuture completableFuture = new CompletableFuture<>(); - try (final ConsumeGateway consumeGateway = - new ConsumeGateway( - topicConnectionsRuntimeRegistryProvider - .getTopicConnectionsRuntimeRegistry()); + try ( final ProduceGateway produceGateway = new ProduceGateway( topicConnectionsRuntimeRegistryProvider .getTopicConnectionsRuntimeRegistry(), topicProducerCache); ) { + final ConsumeGateway consumeGateway = + new ConsumeGateway( + topicConnectionsRuntimeRegistryProvider + .getTopicConnectionsRuntimeRegistry()); + completableFuture.thenRunAsync(() -> { + if (consumeGateway != null) { + consumeGateway.close(); + } + }, consumeThreadPool); + final Gateway.ServiceOptions serviceOptions = authContext.gateway().getServiceOptions(); try { final List> messageFilters = @@ -306,6 +313,7 @@ record -> { new ProduceRequest( produceRequest.key(), produceRequest.value(), passedHeaders)); } catch (Throwable t) { + log.error("Error on service gateway", t); completableFuture.completeExceptionally(t); } return completableFuture; diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index fb03356c0..eecda5664 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -30,6 +30,7 @@ import ai.langstream.api.model.StoredApplication; import ai.langstream.api.model.StreamingCluster; import ai.langstream.api.runner.code.Record; +import ai.langstream.api.runner.topics.TopicConnectionsRuntime; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; import ai.langstream.api.runner.topics.TopicConsumer; import ai.langstream.api.runner.topics.TopicProducer; @@ -583,20 +584,20 @@ private void startTopicExchange(String fromTopic, String toTopic) throws Excepti topicConnectionsRuntimeProvider .getTopicConnectionsRuntimeRegistry(); final StreamingCluster streamingCluster = getStreamingCluster(); + final TopicConnectionsRuntime runtime = topicConnectionsRuntimeRegistry + .getTopicConnectionsRuntime(streamingCluster) + .asTopicConnectionsRuntime(); + runtime.init(streamingCluster); try (final TopicConsumer consumer = - topicConnectionsRuntimeRegistry - .getTopicConnectionsRuntime(streamingCluster) - .asTopicConnectionsRuntime() + runtime .createConsumer( null, streamingCluster, - Map.of("topic", fromTopic)); ) { + Map.of("topic", fromTopic, "subscriptionName", "s")); ) { consumer.start(); try (final TopicProducer producer = - topicConnectionsRuntimeRegistry - .getTopicConnectionsRuntime(streamingCluster) - .asTopicConnectionsRuntime() + runtime .createProducer( null, streamingCluster, @@ -604,15 +605,21 @@ private void startTopicExchange(String fromTopic, String toTopic) throws Excepti producer.start(); while (true) { - final List records = consumer.read(); + if (records.isEmpty()) { + continue; + } + log.info("read {} records from {}: {}", records.size(), fromTopic, records); for (Record record : records) { - producer.write(record); + producer.write(record).get(); } + consumer.commit(records); + log.info("written {} records to {}: {}", records.size(), toTopic, records); } } - } catch (Exception e) { + } catch (Throwable e) { e.printStackTrace(); + throw new RuntimeException(e); } }); futures.add(future); diff --git a/langstream-pulsar-runtime/src/main/java/ai/langstream/pravega/runner/PulsarTopicConnectionsRuntimeProvider.java b/langstream-pulsar-runtime/src/main/java/ai/langstream/pravega/runner/PulsarTopicConnectionsRuntimeProvider.java index 6540223f9..5de086f78 100644 --- a/langstream-pulsar-runtime/src/main/java/ai/langstream/pravega/runner/PulsarTopicConnectionsRuntimeProvider.java +++ b/langstream-pulsar-runtime/src/main/java/ai/langstream/pravega/runner/PulsarTopicConnectionsRuntimeProvider.java @@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import lombok.SneakyThrows; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -359,6 +360,7 @@ public void delete(ExecutionPlan applicationInstance) { } } + @ToString private static class PulsarConsumerRecord implements Record { private final Object finalKey; private final Object finalValue; @@ -584,6 +586,7 @@ private class PulsarTopicProducer implements TopicProducer { private final Map configuration; private final AtomicLong totalIn = new AtomicLong(); + String topic; Producer producer; Schema schema; @@ -614,6 +617,7 @@ public PulsarTopicProducer(Map configuration) { @Override @SneakyThrows public void start() { + topic = (String) configuration.remove("topic"); if (configuration.containsKey("valueSchema")) { SchemaDefinition valueSchemaDefinition = mapper.convertValue( @@ -633,9 +637,10 @@ public void start() { } else { schema = (Schema) valueSchema; } + producer = client.newProducer(schema) - .topic((String) configuration.remove("topic")) + .topic(topic) .loadConf(configuration) .create(); } @@ -664,6 +669,9 @@ private Schema getSchema(Class klass) { @Override public CompletableFuture write(Record r) { + if (topic == null) { + throw new RuntimeException("PulsarTopicProducer not started"); + } totalIn.addAndGet(1); if (schema == null) { try { @@ -686,7 +694,7 @@ public CompletableFuture write(Record r) { } producer = client.newProducer(schema) - .topic((String) configuration.remove("topic")) + .topic(topic) .loadConf(configuration) .create(); } catch (Exception e) { From 4bb545a1f6581d33fcb08e553e635bfe37acdeea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 3 Nov 2023 12:49:15 +0100 Subject: [PATCH 7/7] fix --- .../apigateway/http/GatewayResource.java | 23 ++++++----- .../apigateway/http/GatewayResourceTest.java | 41 ++++++++++++------- 2 files changed, 38 insertions(+), 26 deletions(-) diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java index f7d9960b0..b824f5178 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java @@ -255,22 +255,23 @@ private CompletableFuture handleServiceWithTopics( final String langstreamServiceRequestId = UUID.randomUUID().toString(); final CompletableFuture completableFuture = new CompletableFuture<>(); - try ( - final ProduceGateway produceGateway = - new ProduceGateway( - topicConnectionsRuntimeRegistryProvider - .getTopicConnectionsRuntimeRegistry(), - topicProducerCache); ) { + try (final ProduceGateway produceGateway = + new ProduceGateway( + topicConnectionsRuntimeRegistryProvider + .getTopicConnectionsRuntimeRegistry(), + topicProducerCache); ) { final ConsumeGateway consumeGateway = new ConsumeGateway( topicConnectionsRuntimeRegistryProvider .getTopicConnectionsRuntimeRegistry()); - completableFuture.thenRunAsync(() -> { - if (consumeGateway != null) { - consumeGateway.close(); - } - }, consumeThreadPool); + completableFuture.thenRunAsync( + () -> { + if (consumeGateway != null) { + consumeGateway.close(); + } + }, + consumeThreadPool); final Gateway.ServiceOptions serviceOptions = authContext.gateway().getServiceOptions(); try { diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index eecda5664..0408ff726 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -584,24 +584,27 @@ private void startTopicExchange(String fromTopic, String toTopic) throws Excepti topicConnectionsRuntimeProvider .getTopicConnectionsRuntimeRegistry(); final StreamingCluster streamingCluster = getStreamingCluster(); - final TopicConnectionsRuntime runtime = topicConnectionsRuntimeRegistry - .getTopicConnectionsRuntime(streamingCluster) - .asTopicConnectionsRuntime(); + final TopicConnectionsRuntime runtime = + topicConnectionsRuntimeRegistry + .getTopicConnectionsRuntime(streamingCluster) + .asTopicConnectionsRuntime(); runtime.init(streamingCluster); try (final TopicConsumer consumer = - runtime - .createConsumer( - null, - streamingCluster, - Map.of("topic", fromTopic, "subscriptionName", "s")); ) { + runtime.createConsumer( + null, + streamingCluster, + Map.of( + "topic", + fromTopic, + "subscriptionName", + "s")); ) { consumer.start(); try (final TopicProducer producer = - runtime - .createProducer( - null, - streamingCluster, - Map.of("topic", toTopic)); ) { + runtime.createProducer( + null, + streamingCluster, + Map.of("topic", toTopic)); ) { producer.start(); while (true) { @@ -609,12 +612,20 @@ private void startTopicExchange(String fromTopic, String toTopic) throws Excepti if (records.isEmpty()) { continue; } - log.info("read {} records from {}: {}", records.size(), fromTopic, records); + log.info( + "read {} records from {}: {}", + records.size(), + fromTopic, + records); for (Record record : records) { producer.write(record).get(); } consumer.commit(records); - log.info("written {} records to {}: {}", records.size(), toTopic, records); + log.info( + "written {} records to {}: {}", + records.size(), + toTopic, + records); } } } catch (Throwable e) {