diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java index c328ea104..bfb324b48 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java @@ -59,6 +59,16 @@ public void testProcessor() { "{\"record\":{\"key\":null,\"value\":\"my-value!!super secret value\"," + "\"headers\":{\"langstream-client-session-id\":\"s1\"}}")); + output = + executeCommandOnClient( + ("bin/langstream gateway consume %s topic-producer --position earliest -n 1 --connect-timeout 30") + .formatted(applicationId) + .split(" ")); + log.info("Output2: {}", output); + Assertions.assertTrue( + output.contains( + "{\"record\":{\"key\":null,\"value\":\"my-value test-topic-producer\",\"headers\":{}}")); + updateLocalApplicationAndAwaitReady( tenant, applicationId, @@ -77,7 +87,7 @@ public void testProcessor() { + "30 -p sessionId=s2") .formatted(applicationId) .split(" ")); - log.info("Output2: {}", output); + log.info("Output3: {}", output); Assertions.assertTrue( output.contains( "{\"record\":{\"key\":null,\"value\":\"my-value!!super secret value - changed\"," @@ -89,6 +99,7 @@ public void testProcessor() { log.info("all topics: {}", topics); Assertions.assertTrue(topics.contains("ls-test-topic0")); Assertions.assertFalse(topics.contains("ls-test-topic1")); + Assertions.assertTrue(topics.contains("ls-test-topic-producer")); } @Test diff --git a/langstream-e2e-tests/src/test/resources/apps/python-processor/gateways.yaml b/langstream-e2e-tests/src/test/resources/apps/python-processor/gateways.yaml index 1977390d9..da2c2c443 100644 --- a/langstream-e2e-tests/src/test/resources/apps/python-processor/gateways.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/python-processor/gateways.yaml @@ -33,4 +33,7 @@ gateways: filters: headers: - key: langstream-client-session-id - value-from-parameters: sessionId \ No newline at end of file + value-from-parameters: sessionId + - id: topic-producer + type: consume + topic: ls-test-topic-producer diff --git a/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml b/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml index 5add9e10e..b0e3bc16e 100644 --- a/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml @@ -31,6 +31,12 @@ topics: type: string keySchema: type: string + - name: ls-test-topic-producer + creation-mode: create-if-not-exists + schema: + type: string + keySchema: + type: string pipeline: - name: "Process using Python" resources: @@ -44,4 +50,4 @@ pipeline: output: ls-test-topic1 configuration: secret_value: "${secrets.secret1.value-key}" - className: example.Exclamation \ No newline at end of file + className: example.Exclamation diff --git a/langstream-e2e-tests/src/test/resources/apps/python-processor/python/example.py b/langstream-e2e-tests/src/test/resources/apps/python-processor/python/example.py index a87a6fe73..5627de32c 100644 --- a/langstream-e2e-tests/src/test/resources/apps/python-processor/python/example.py +++ b/langstream-e2e-tests/src/test/resources/apps/python-processor/python/example.py @@ -26,6 +26,7 @@ def init(self, config, context: AgentContext): def process(self, record): logging.info("Processing record" + str(record)) + self.context.get_topic_producer().write("ls-test-topic-producer", {"value": record.value() + " test-topic-producer"}).result() directory = self.context.get_persistent_state_directory() counter_file = os.path.join(directory, "counter.txt") counter = 0 diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py index 8518f6dbc..e5a964f71 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py @@ -91,9 +91,9 @@ async def agent_info(self, _, __): async def poll_topic_producer_records(self, context): while True: topic, record, future = await self.topic_producer_records.get() - schemas, grpc_record = self.to_grpc_record(record) + schemas, grpc_record = self.to_grpc_record(wrap_in_record(record)) for schema in schemas: - await context.write(TopicProducerResponse(schema=schema)) + await contextµ.write(TopicProducerResponse(schema=schema)) self.topic_producer_record_id += 1 self.topic_producer_records_pending[self.topic_producer_record_id] = future grpc_record.record_id = self.topic_producer_record_id