diff --git a/sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/action/Action.java b/sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/action/Action.java index db1335c27c..a92645864a 100644 --- a/sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/action/Action.java +++ b/sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/action/Action.java @@ -145,7 +145,7 @@ interface Builder { Effect reply(S message); /** - * Create a message reply. + * Create a message reply with custom Metadata. * * @param message The payload of the reply. * @param metadata The metadata for the message. @@ -203,6 +203,16 @@ interface Builder { */ Effect asyncReply(CompletionStage message); + /** + * Create a message reply from an async operation result with custom Metadata. + * + * @param message The future payload of the reply. + * @param The type of the message that must be returned by this call. + * @param metadata The metadata for the message. + * @return A message reply. + */ + Effect asyncReply(CompletionStage message, Metadata metadata); + /** * Create a reply from an async operation result returning an effect. * diff --git a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/action/ActionEffectImpl.scala b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/action/ActionEffectImpl.scala index cbc38b7fe9..0996b7151f 100644 --- a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/action/ActionEffectImpl.scala +++ b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/action/ActionEffectImpl.scala @@ -98,7 +98,9 @@ object ActionEffectImpl { def error[S](description: String, httpErrorCode: ErrorCode): Action.Effect[S] = error(description, StatusCodeConverter.toGrpcCode(httpErrorCode)) def asyncReply[S](futureMessage: CompletionStage[S]): Action.Effect[S] = - AsyncEffect(futureMessage.asScala.map(s => Builder.reply[S](s))(ExecutionContext.parasitic), Nil) + asyncReply(futureMessage, Metadata.EMPTY) + def asyncReply[S](futureMessage: CompletionStage[S], metadata: Metadata): Action.Effect[S] = + AsyncEffect(futureMessage.asScala.map(s => Builder.reply[S](s, metadata))(ExecutionContext.parasitic), Nil) def asyncEffect[S](futureEffect: CompletionStage[Action.Effect[S]]): Action.Effect[S] = AsyncEffect(futureEffect.asScala, Nil) def ignore[S](): Action.Effect[S] = diff --git a/sdk/java-sdk-spring/src/it/java/com/example/wiring/SpringSdkIntegrationTest.java b/sdk/java-sdk-spring/src/it/java/com/example/wiring/SpringSdkIntegrationTest.java index 1d78fe1b20..1550c9be26 100644 --- a/sdk/java-sdk-spring/src/it/java/com/example/wiring/SpringSdkIntegrationTest.java +++ b/sdk/java-sdk-spring/src/it/java/com/example/wiring/SpringSdkIntegrationTest.java @@ -65,6 +65,7 @@ import java.time.Duration; import java.time.Instant; import java.util.List; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -634,6 +635,31 @@ public void shouldPropagateMetadataWithHttpDeferredCall() { assertThat(actionResponse.text).isEqualTo(value); } + @Test + public void shouldSupportMetadataInReplies() { + String value = "someValue"; + + String headerInResponse = + webClient + .get() + .uri("/reply-meta/myKey/" + value) + .exchangeToMono(response -> Mono.just(Objects.requireNonNull( + response.headers().asHttpHeaders().getFirst("myKey")))) + .block(); + + assertThat(value).isEqualTo(headerInResponse); + + String headerInAyncResponse = + webClient + .get() + .uri("/reply-async-meta/myKey/" + value) + .exchangeToMono(response -> Mono.just(Objects.requireNonNull( + response.headers().asHttpHeaders().getFirst("myKey")))) + .block(); + + assertThat(value).isEqualTo(headerInAyncResponse); + } + @Test public void searchWithInstant() { diff --git a/sdk/java-sdk-spring/src/it/java/com/example/wiring/actions/echo/ActionWithMetadata.java b/sdk/java-sdk-spring/src/it/java/com/example/wiring/actions/echo/ActionWithMetadata.java index e588e9f727..9e3ebec7f4 100644 --- a/sdk/java-sdk-spring/src/it/java/com/example/wiring/actions/echo/ActionWithMetadata.java +++ b/sdk/java-sdk-spring/src/it/java/com/example/wiring/actions/echo/ActionWithMetadata.java @@ -22,6 +22,8 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; +import java.util.concurrent.CompletableFuture; + public class ActionWithMetadata extends Action { private ComponentClient componentClient; @@ -41,4 +43,16 @@ public Effect returnMeta(@PathVariable String key) { var metaValue = actionContext().metadata().get(key).get(); return effects().reply(new Message(metaValue)); } + + @GetMapping("/reply-meta/{key}/{value}") + public Effect returnAsMeta(@PathVariable String key, @PathVariable String value) { + var md = Metadata.EMPTY.add(key, value); + return effects().reply(new Message(value), md); + } + + @GetMapping("/reply-async-meta/{key}/{value}") + public Effect returnAsMetaAsync(@PathVariable String key, @PathVariable String value) { + var md = Metadata.EMPTY.add(key, value); + return effects().asyncReply(CompletableFuture.completedFuture(new Message(value)), md); + } } diff --git a/sdk/scala-sdk-protobuf/src/main/scala/kalix/scalasdk/action/Action.scala b/sdk/scala-sdk-protobuf/src/main/scala/kalix/scalasdk/action/Action.scala index 0cad28e411..e411d90f34 100644 --- a/sdk/scala-sdk-protobuf/src/main/scala/kalix/scalasdk/action/Action.scala +++ b/sdk/scala-sdk-protobuf/src/main/scala/kalix/scalasdk/action/Action.scala @@ -91,7 +91,7 @@ object Action { def reply[S](message: S): Action.Effect[S] /** - * Create a message reply. + * Create a message reply with custom Metadata. * * @param message * The payload of the reply. @@ -154,6 +154,20 @@ object Action { */ def asyncReply[S](message: Future[S]): Action.Effect[S] + /** + * Create a message reply from an async operation result with custom Metadata. + * + * @param message + * The future payload of the reply. + * @param metadata + * The metadata for the message. + * @return + * A message reply. + * @tparam S + * The type of the message that must be returned by this call. + */ + def asyncReply[S](message: Future[S], metadata: Metadata): Action.Effect[S] + /** * Create a reply from an async operation result returning an effect. * diff --git a/sdk/scala-sdk-protobuf/src/main/scala/kalix/scalasdk/impl/action/ActionEffectImpl.scala b/sdk/scala-sdk-protobuf/src/main/scala/kalix/scalasdk/impl/action/ActionEffectImpl.scala index f4b4e4263a..a99438f680 100644 --- a/sdk/scala-sdk-protobuf/src/main/scala/kalix/scalasdk/impl/action/ActionEffectImpl.scala +++ b/sdk/scala-sdk-protobuf/src/main/scala/kalix/scalasdk/impl/action/ActionEffectImpl.scala @@ -141,7 +141,9 @@ private[scalasdk] object ActionEffectImpl { else ErrorEffect(description, Some(statusCode), Nil) override def asyncReply[S](futureMessage: Future[S]): Action.Effect[S] = - AsyncEffect(futureMessage.map(s => Builder.reply[S](s))(ExecutionContext.parasitic), Nil) + asyncReply(futureMessage, Metadata.empty) + override def asyncReply[S](futureMessage: Future[S], metadata: Metadata): Action.Effect[S] = + AsyncEffect(futureMessage.map(s => Builder.reply[S](s, metadata))(ExecutionContext.parasitic), Nil) override def asyncEffect[S](futureEffect: Future[Action.Effect[S]]): Action.Effect[S] = AsyncEffect(futureEffect, Nil) override def ignore[S]: Action.Effect[S] =