diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java index 015fd2957..e2d595d75 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java @@ -145,6 +145,7 @@ public void startReadingAsync( log.debug("[{}] Started reader", logRef); readMessages(stop, onMessage); } catch (Throwable ex) { + log.error("[{}] Error reading messages", logRef, ex); throw new RuntimeException(ex); } finally { closeReader(); diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java index 73f776b3a..765e3f7ae 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java @@ -144,13 +144,20 @@ protected TopicProducer setupProducer(String topic, StreamingCluster streamingCl } public void produceMessage(String payload) throws ProduceException { + final ProduceRequest produceRequest = parseProduceRequest(payload); + produceMessage(produceRequest); + } + + public static ProduceRequest parseProduceRequest(String payload) throws ProduceException { final ProduceRequest produceRequest; try { produceRequest = mapper.readValue(payload, ProduceRequest.class); } catch (JsonProcessingException err) { - throw new ProduceException(err.getMessage(), ProduceResponse.Status.BAD_REQUEST); + throw new ProduceException( + "Error while parsing JSON payload: " + err.getMessage(), + ProduceResponse.Status.BAD_REQUEST); } - produceMessage(produceRequest); + return produceRequest; } public void produceMessage(ProduceRequest produceRequest) throws ProduceException { diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java index c7d6d3a0c..b824f5178 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java @@ -36,10 +36,12 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -74,6 +76,7 @@ public class GatewayResource { protected static final String GATEWAY_SERVICE_PATH = "/service/{tenant}/{application}/{gateway}/**"; protected static final ObjectMapper MAPPER = new ObjectMapper(); + protected static final String SERVICE_REQUEST_ID_HEADER = "langstream-service-request-id"; private final TopicConnectionsRuntimeProviderBean topicConnectionsRuntimeRegistryProvider; private final TopicProducerCache topicProducerCache; private final ApplicationStore applicationStore; @@ -87,15 +90,13 @@ public class GatewayResource { Executors.newCachedThreadPool( new BasicThreadFactory.Builder().namingPattern("http-consume-%d").build()); - @PostMapping( - value = "/produce/{tenant}/{application}/{gateway}", - consumes = MediaType.APPLICATION_JSON_VALUE) + @PostMapping(value = "/produce/{tenant}/{application}/{gateway}", consumes = "*/*") ProduceResponse produce( WebRequest request, @NotBlank @PathVariable("tenant") String tenant, @NotBlank @PathVariable("application") String application, @NotBlank @PathVariable("gateway") String gateway, - @RequestBody ProduceRequest produceRequest) + @RequestBody String payload) throws ProduceGateway.ProduceException { final Map queryString = computeQueryString(request); @@ -125,11 +126,26 @@ ProduceResponse produce( ProduceGateway.getProducerCommonHeaders( context.gateway().getProduceOptions(), authContext); produceGateway.start(context.gateway().getTopic(), commonHeaders, authContext); + final ProduceRequest produceRequest = parseProduceRequest(request, payload); produceGateway.produceMessage(produceRequest); return ProduceResponse.OK; } } + private ProduceRequest parseProduceRequest(WebRequest request, String payload) + throws ProduceGateway.ProduceException { + final String contentType = request.getHeader("Content-Type"); + if (contentType == null || contentType.equals(MediaType.TEXT_PLAIN_VALUE)) { + return new ProduceRequest(null, payload, null); + } else if (contentType.equals(MediaType.APPLICATION_JSON_VALUE)) { + return ProduceGateway.parseProduceRequest(payload); + } else { + throw new ResponseStatusException( + HttpStatus.BAD_REQUEST, + String.format("Unsupported content type: %s", contentType)); + } + } + private Map computeHeaders(WebRequest request) { final Map headers = new HashMap<>(); request.getHeaderNames() @@ -187,7 +203,7 @@ private CompletableFuture handleServiceCall( String tenant, String application, String gateway) - throws IOException { + throws IOException, ProduceGateway.ProduceException { final Map queryString = computeQueryString(request); final Map headers = computeHeaders(request); final GatewayRequestContext context = @@ -225,24 +241,37 @@ public void validateOptions(Map options) {} throw new ResponseStatusException( HttpStatus.BAD_REQUEST, "Only POST method is supported"); } - final ProduceRequest produceRequest = - MAPPER.readValue(servletRequest.getInputStream(), ProduceRequest.class); + final String payload = + new String( + servletRequest.getInputStream().readAllBytes(), StandardCharsets.UTF_8); + final ProduceRequest produceRequest = parseProduceRequest(request, payload); return handleServiceWithTopics(produceRequest, authContext); } } private CompletableFuture handleServiceWithTopics( ProduceRequest produceRequest, AuthenticatedGatewayRequestContext authContext) { + + final String langstreamServiceRequestId = UUID.randomUUID().toString(); + final CompletableFuture completableFuture = new CompletableFuture<>(); - try (final ConsumeGateway consumeGateway = - new ConsumeGateway( - topicConnectionsRuntimeRegistryProvider - .getTopicConnectionsRuntimeRegistry()); - final ProduceGateway produceGateway = - new ProduceGateway( - topicConnectionsRuntimeRegistryProvider - .getTopicConnectionsRuntimeRegistry(), - topicProducerCache); ) { + try (final ProduceGateway produceGateway = + new ProduceGateway( + topicConnectionsRuntimeRegistryProvider + .getTopicConnectionsRuntimeRegistry(), + topicProducerCache); ) { + + final ConsumeGateway consumeGateway = + new ConsumeGateway( + topicConnectionsRuntimeRegistryProvider + .getTopicConnectionsRuntimeRegistry()); + completableFuture.thenRunAsync( + () -> { + if (consumeGateway != null) { + consumeGateway.close(); + } + }, + consumeThreadPool); final Gateway.ServiceOptions serviceOptions = authContext.gateway().getServiceOptions(); try { @@ -251,7 +280,15 @@ private CompletableFuture handleServiceWithTopics( serviceOptions.getHeaders(), authContext.userParameters(), authContext.principalValues()); - consumeGateway.setup(serviceOptions.getInputTopic(), messageFilters, authContext); + messageFilters.add( + record -> { + final Header header = record.getHeader(SERVICE_REQUEST_ID_HEADER); + if (header == null) { + return false; + } + return langstreamServiceRequestId.equals(header.valueAsString()); + }); + consumeGateway.setup(serviceOptions.getOutputTopic(), messageFilters, authContext); final AtomicBoolean stop = new AtomicBoolean(false); consumeGateway.startReadingAsync( consumeThreadPool, @@ -266,9 +303,18 @@ record -> { } final List
commonHeaders = ProduceGateway.getProducerCommonHeaders(serviceOptions, authContext); - produceGateway.start(serviceOptions.getOutputTopic(), commonHeaders, authContext); - produceGateway.produceMessage(produceRequest); + produceGateway.start(serviceOptions.getInputTopic(), commonHeaders, authContext); + + Map passedHeaders = produceRequest.headers(); + if (passedHeaders == null) { + passedHeaders = new HashMap<>(); + } + passedHeaders.put(SERVICE_REQUEST_ID_HEADER, langstreamServiceRequestId); + produceGateway.produceMessage( + new ProduceRequest( + produceRequest.key(), produceRequest.value(), passedHeaders)); } catch (Throwable t) { + log.error("Error on service gateway", t); completableFuture.completeExceptionally(t); } return completableFuture; diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/ResourceErrorsHandler.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/ResourceErrorsHandler.java index 90e5f93f9..a83998b39 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/ResourceErrorsHandler.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/ResourceErrorsHandler.java @@ -22,6 +22,7 @@ import org.springframework.http.ProblemDetail; import org.springframework.web.bind.annotation.ControllerAdvice; import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.context.request.async.AsyncRequestTimeoutException; import org.springframework.web.server.ResponseStatusException; @ControllerAdvice @@ -38,6 +39,11 @@ ProblemDetail handleAll(Throwable exception) { log.error("Bad request", exception); return ProblemDetail.forStatusAndDetail(HttpStatus.BAD_REQUEST, exception.getMessage()); } + if (exception instanceof AsyncRequestTimeoutException) { + log.error("Request timed out", exception); + return ProblemDetail.forStatusAndDetail( + HttpStatus.REQUEST_TIMEOUT, "Request timed out"); + } log.error("Internal error", exception); return ProblemDetail.forStatusAndDetail( HttpStatus.INTERNAL_SERVER_ERROR, exception.getMessage()); diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index 497dd4014..0408ff726 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -29,7 +29,11 @@ import ai.langstream.api.model.Gateways; import ai.langstream.api.model.StoredApplication; import ai.langstream.api.model.StreamingCluster; +import ai.langstream.api.runner.code.Record; +import ai.langstream.api.runner.topics.TopicConnectionsRuntime; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; +import ai.langstream.api.runner.topics.TopicConsumer; +import ai.langstream.api.runner.topics.TopicProducer; import ai.langstream.api.runtime.ClusterRuntimeRegistry; import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.api.storage.ApplicationStore; @@ -49,8 +53,10 @@ import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.nio.file.Path; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -95,6 +101,7 @@ abstract class GatewayResourceTest { protected static final ObjectMapper MAPPER = new ObjectMapper(); static List topics; + static List> futures = new ArrayList<>(); static Gateways testGateways; protected static ApplicationStore getMockedStore(String instanceYaml) { @@ -207,10 +214,14 @@ public static void afterAll() { @AfterEach public void afterEach() { Metrics.globalRegistry.clear(); + for (CompletableFuture future : futures) { + future.cancel(true); + } + futures.clear(); } @SneakyThrows - void produceAndExpectOk(String url, String content) { + void produceJsonAndExpectOk(String url, String content) { final HttpRequest request = HttpRequest.newBuilder(URI.create(url)) .header("Content-Type", "application/json") @@ -224,7 +235,19 @@ void produceAndExpectOk(String url, String content) { } @SneakyThrows - String produceAndGetBody(String url, String content) { + String produceTextAndGetBody(String url, String content) { + final HttpRequest request = + HttpRequest.newBuilder(URI.create(url)) + .POST(HttpRequest.BodyPublishers.ofString(content)) + .build(); + final HttpResponse response = + CLIENT.send(request, HttpResponse.BodyHandlers.ofString()); + assertEquals(200, response.statusCode()); + return response.body(); + } + + @SneakyThrows + String produceJsonAndGetBody(String url, String content) { final HttpRequest request = HttpRequest.newBuilder(URI.create(url)) .header("Content-Type", "application/json") @@ -237,7 +260,7 @@ String produceAndGetBody(String url, String content) { } @SneakyThrows - void produceAndExpectBadRequest(String url, String content, String errorMessage) { + void produceJsonAndExpectBadRequest(String url, String content, String errorMessage) { final HttpRequest request = HttpRequest.newBuilder(URI.create(url)) .header("Content-Type", "application/json") @@ -253,7 +276,7 @@ void produceAndExpectBadRequest(String url, String content, String errorMessage) } @SneakyThrows - void produceAndExpectUnauthorized(String url, String content) { + void produceJsonAndExpectUnauthorized(String url, String content) { final HttpRequest request = HttpRequest.newBuilder(URI.create(url)) .header("Content-Type", "application/json") @@ -287,9 +310,28 @@ void testSimpleProduce() throws Exception { "http://localhost:%d/api/gateways/produce/tenant1/application1/produce" .formatted(port); - produceAndExpectOk(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}"); - produceAndExpectOk(url, "{\"key\": \"my-key\"}"); - produceAndExpectOk(url, "{\"key\": \"my-key\", \"headers\": {\"h1\": \"v1\"}}"); + produceJsonAndExpectOk(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}"); + produceJsonAndExpectOk(url, "{\"key\": \"my-key\"}"); + produceJsonAndExpectOk(url, "{\"key\": \"my-key\", \"headers\": {\"h1\": \"v1\"}}"); + HttpRequest request = + HttpRequest.newBuilder(URI.create(url)) + .POST(HttpRequest.BodyPublishers.ofString("my-string")) + .build(); + HttpResponse response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString()); + assertEquals(200, response.statusCode()); + assertEquals(""" + {"status":"OK","reason":null}""", response.body()); + + request = + HttpRequest.newBuilder(URI.create(url)) + .header("Content-Type", "text/plain") + .POST(HttpRequest.BodyPublishers.ofString("my-string")) + .build(); + response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString()); + log.info("Response body: {}", response.body()); + assertEquals(200, response.statusCode()); + assertEquals(""" + {"status":"OK","reason":null}""", response.body()); } @Test @@ -324,10 +366,10 @@ void testSimpleProduceCacheProducer() throws Exception { "http://localhost:%d/api/gateways/produce/tenant1/application1/produce" .formatted(port); - produceAndExpectOk(url + "1", "{\"key\": \"my-key\", \"value\": \"my-value\"}"); - produceAndExpectOk(url + "2", "{\"key\": \"my-key\"}"); - produceAndExpectOk(url + "2", "{\"key\": \"my-key\"}"); - produceAndExpectOk(url, "{\"key\": \"my-key\", \"headers\": {\"h1\": \"v1\"}}"); + produceJsonAndExpectOk(url + "1", "{\"key\": \"my-key\", \"value\": \"my-value\"}"); + produceJsonAndExpectOk(url + "2", "{\"key\": \"my-key\"}"); + produceJsonAndExpectOk(url + "2", "{\"key\": \"my-key\"}"); + produceJsonAndExpectOk(url, "{\"key\": \"my-key\", \"headers\": {\"h1\": \"v1\"}}"); final String metrics = mockMvc.perform(get("/management/prometheus")) @@ -379,17 +421,17 @@ void testParametersRequired() throws Exception { "http://localhost:%d/api/gateways/produce/tenant1/application1/gw".formatted(port); final String content = "{\"value\": \"my-value\"}"; - produceAndExpectBadRequest(baseUrl, content, "missing required parameter session-id"); - produceAndExpectBadRequest( + produceJsonAndExpectBadRequest(baseUrl, content, "missing required parameter session-id"); + produceJsonAndExpectBadRequest( baseUrl + "?param:otherparam=1", content, "missing required parameter session-id"); - produceAndExpectBadRequest( + produceJsonAndExpectBadRequest( baseUrl + "?param:session-id=", content, "missing required parameter session-id"); - produceAndExpectBadRequest( + produceJsonAndExpectBadRequest( baseUrl + "?param:session-id=ok¶m:another-non-declared=y", content, "unknown parameters: [another-non-declared]"); - produceAndExpectOk(baseUrl + "?param:session-id=1", content); - produceAndExpectOk(baseUrl + "?param:session-id=string-value", content); + produceJsonAndExpectOk(baseUrl + "?param:session-id=1", content); + produceJsonAndExpectOk(baseUrl + "?param:session-id=string-value", content); } @Test @@ -436,10 +478,11 @@ void testAuthentication() throws Exception { "http://localhost:%d/api/gateways/produce/tenant1/application1/produce" .formatted(port); - produceAndExpectUnauthorized(baseUrl, "{\"value\": \"my-value\"}"); - produceAndExpectUnauthorized(baseUrl + "?credentials=", "{\"value\": \"my-value\"}"); - produceAndExpectUnauthorized(baseUrl + "?credentials=error", "{\"value\": \"my-value\"}"); - produceAndExpectOk( + produceJsonAndExpectUnauthorized(baseUrl, "{\"value\": \"my-value\"}"); + produceJsonAndExpectUnauthorized(baseUrl + "?credentials=", "{\"value\": \"my-value\"}"); + produceJsonAndExpectUnauthorized( + baseUrl + "?credentials=error", "{\"value\": \"my-value\"}"); + produceJsonAndExpectOk( baseUrl + "?credentials=test-user-password", "{\"value\": \"my-value\"}"); } @@ -484,11 +527,11 @@ void testTestCredentials() throws Exception { "http://localhost:%d/api/gateways/produce/tenant1/application1/produce" .formatted(port); - produceAndExpectUnauthorized( + produceJsonAndExpectUnauthorized( baseUrl + "?test-credentials=test", "{\"value\": \"my-value\"}"); - produceAndExpectOk( + produceJsonAndExpectOk( baseUrl + "?test-credentials=test-user-password", "{\"value\": \"my-value\"}"); - produceAndExpectUnauthorized( + produceJsonAndExpectUnauthorized( ("http://localhost:%d/api/gateways/produce/tenant1/application1/produce-no-test?test-credentials=test" + "-user-password") .formatted(port), @@ -497,8 +540,12 @@ void testTestCredentials() throws Exception { @Test void testService() throws Exception { - final String topic = genTopic(); - prepareTopicsForTest(topic); + final String inputTopic = genTopic(); + final String outputTopic = genTopic(); + prepareTopicsForTest(inputTopic, outputTopic); + + startTopicExchange(inputTopic, outputTopic); + testGateways = new Gateways( List.of( @@ -507,7 +554,7 @@ void testService() throws Exception { .type(Gateway.GatewayType.service) .serviceOptions( new Gateway.ServiceOptions( - null, topic, topic, List.of())) + null, inputTopic, outputTopic, List.of())) .build())); final String url = @@ -515,27 +562,89 @@ void testService() throws Exception { assertMessageContent( new MsgRecord("my-key", "my-value", Map.of()), - produceAndGetBody(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}")); + produceJsonAndGetBody(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}")); assertMessageContent( new MsgRecord("my-key2", "my-value", Map.of()), - produceAndGetBody(url, "{\"key\": \"my-key2\", \"value\": \"my-value\"}")); + produceJsonAndGetBody(url, "{\"key\": \"my-key2\", \"value\": \"my-value\"}")); + + assertMessageContent( + new MsgRecord(null, "my-text", Map.of()), produceTextAndGetBody(url, "my-text")); assertMessageContent( new MsgRecord("my-key2", "my-value", Map.of("header1", "value1")), - produceAndGetBody( + produceJsonAndGetBody( url, "{\"key\": \"my-key2\", \"value\": \"my-value\", \"headers\": {\"header1\":\"value1\"}}")); } + private void startTopicExchange(String fromTopic, String toTopic) throws Exception { + final CompletableFuture future = + CompletableFuture.runAsync( + () -> { + TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry = + topicConnectionsRuntimeProvider + .getTopicConnectionsRuntimeRegistry(); + final StreamingCluster streamingCluster = getStreamingCluster(); + final TopicConnectionsRuntime runtime = + topicConnectionsRuntimeRegistry + .getTopicConnectionsRuntime(streamingCluster) + .asTopicConnectionsRuntime(); + runtime.init(streamingCluster); + try (final TopicConsumer consumer = + runtime.createConsumer( + null, + streamingCluster, + Map.of( + "topic", + fromTopic, + "subscriptionName", + "s")); ) { + consumer.start(); + + try (final TopicProducer producer = + runtime.createProducer( + null, + streamingCluster, + Map.of("topic", toTopic)); ) { + + producer.start(); + while (true) { + final List records = consumer.read(); + if (records.isEmpty()) { + continue; + } + log.info( + "read {} records from {}: {}", + records.size(), + fromTopic, + records); + for (Record record : records) { + producer.write(record).get(); + } + consumer.commit(records); + log.info( + "written {} records to {}: {}", + records.size(), + toTopic, + records); + } + } + } catch (Throwable e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + }); + futures.add(future); + } + private record MsgRecord(Object key, Object value, Map headers) {} @SneakyThrows private void assertMessageContent(MsgRecord expected, String actual) { ConsumePushMessage consume = MAPPER.readValue(actual, ConsumePushMessage.class); + final Map headers = consume.record().headers(); + assertNotNull(headers.remove("langstream-service-request-id")); final MsgRecord actualMsgRecord = - new MsgRecord( - consume.record().key(), - consume.record().value(), - consume.record().headers()); + new MsgRecord(consume.record().key(), consume.record().value(), headers); assertEquals(expected, actualMsgRecord); } diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java index be56f9be8..bcccad9da 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java @@ -858,7 +858,7 @@ void testProduce() throws Exception { assertEquals(ProduceResponse.Status.BAD_REQUEST, response.status()); assertEquals( - "Unrecognized token 'invalid': was expecting (JSON String, Number, Array, Object or token " + "Error while parsing JSON payload: Unrecognized token 'invalid': was expecting (JSON String, Number, Array, Object or token " + "'null', 'true' or 'false')\n" + " at [Source: (String)\"invalid-json\"; line: 1, column: 8]", response.reason()); diff --git a/langstream-pulsar-runtime/src/main/java/ai/langstream/pravega/runner/PulsarTopicConnectionsRuntimeProvider.java b/langstream-pulsar-runtime/src/main/java/ai/langstream/pravega/runner/PulsarTopicConnectionsRuntimeProvider.java index 6540223f9..5de086f78 100644 --- a/langstream-pulsar-runtime/src/main/java/ai/langstream/pravega/runner/PulsarTopicConnectionsRuntimeProvider.java +++ b/langstream-pulsar-runtime/src/main/java/ai/langstream/pravega/runner/PulsarTopicConnectionsRuntimeProvider.java @@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import lombok.SneakyThrows; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -359,6 +360,7 @@ public void delete(ExecutionPlan applicationInstance) { } } + @ToString private static class PulsarConsumerRecord implements Record { private final Object finalKey; private final Object finalValue; @@ -584,6 +586,7 @@ private class PulsarTopicProducer implements TopicProducer { private final Map configuration; private final AtomicLong totalIn = new AtomicLong(); + String topic; Producer producer; Schema schema; @@ -614,6 +617,7 @@ public PulsarTopicProducer(Map configuration) { @Override @SneakyThrows public void start() { + topic = (String) configuration.remove("topic"); if (configuration.containsKey("valueSchema")) { SchemaDefinition valueSchemaDefinition = mapper.convertValue( @@ -633,9 +637,10 @@ public void start() { } else { schema = (Schema) valueSchema; } + producer = client.newProducer(schema) - .topic((String) configuration.remove("topic")) + .topic(topic) .loadConf(configuration) .create(); } @@ -664,6 +669,9 @@ private Schema getSchema(Class klass) { @Override public CompletableFuture write(Record r) { + if (topic == null) { + throw new RuntimeException("PulsarTopicProducer not started"); + } totalIn.addAndGet(1); if (schema == null) { try { @@ -686,7 +694,7 @@ public CompletableFuture write(Record r) { } producer = client.newProducer(schema) - .topic((String) configuration.remove("topic")) + .topic(topic) .loadConf(configuration) .create(); } catch (Exception e) {