Skip to content

Commit

Permalink
feat: allow metadata in asyncReply (#1937)
Browse files Browse the repository at this point in the history
  • Loading branch information
efgpinto authored Dec 22, 2023
1 parent 634f4af commit cb30982
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ interface Builder {
<S> Effect<S> reply(S message);

/**
* Create a message reply.
* Create a message reply with custom Metadata.
*
* @param message The payload of the reply.
* @param metadata The metadata for the message.
Expand Down Expand Up @@ -203,6 +203,16 @@ interface Builder {
*/
<S> Effect<S> asyncReply(CompletionStage<S> message);

/**
* Create a message reply from an async operation result with custom Metadata.
*
* @param message The future payload of the reply.
* @param <S> The type of the message that must be returned by this call.
* @param metadata The metadata for the message.
* @return A message reply.
*/
<S> Effect<S> asyncReply(CompletionStage<S> message, Metadata metadata);

/**
* Create a reply from an async operation result returning an effect.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ object ActionEffectImpl {
def error[S](description: String, httpErrorCode: ErrorCode): Action.Effect[S] =
error(description, StatusCodeConverter.toGrpcCode(httpErrorCode))
def asyncReply[S](futureMessage: CompletionStage[S]): Action.Effect[S] =
AsyncEffect(futureMessage.asScala.map(s => Builder.reply[S](s))(ExecutionContext.parasitic), Nil)
asyncReply(futureMessage, Metadata.EMPTY)
def asyncReply[S](futureMessage: CompletionStage[S], metadata: Metadata): Action.Effect[S] =
AsyncEffect(futureMessage.asScala.map(s => Builder.reply[S](s, metadata))(ExecutionContext.parasitic), Nil)
def asyncEffect[S](futureEffect: CompletionStage[Action.Effect[S]]): Action.Effect[S] =
AsyncEffect(futureEffect.asScala, Nil)
def ignore[S](): Action.Effect[S] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -634,6 +635,31 @@ public void shouldPropagateMetadataWithHttpDeferredCall() {
assertThat(actionResponse.text).isEqualTo(value);
}

@Test
public void shouldSupportMetadataInReplies() {
String value = "someValue";

String headerInResponse =
webClient
.get()
.uri("/reply-meta/myKey/" + value)
.exchangeToMono(response -> Mono.just(Objects.requireNonNull(
response.headers().asHttpHeaders().getFirst("myKey"))))
.block();

assertThat(value).isEqualTo(headerInResponse);

String headerInAyncResponse =
webClient
.get()
.uri("/reply-async-meta/myKey/" + value)
.exchangeToMono(response -> Mono.just(Objects.requireNonNull(
response.headers().asHttpHeaders().getFirst("myKey"))))
.block();

assertThat(value).isEqualTo(headerInAyncResponse);
}

@Test
public void searchWithInstant() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

import java.util.concurrent.CompletableFuture;

public class ActionWithMetadata extends Action {

private ComponentClient componentClient;
Expand All @@ -41,4 +43,16 @@ public Effect<Message> returnMeta(@PathVariable String key) {
var metaValue = actionContext().metadata().get(key).get();
return effects().reply(new Message(metaValue));
}

@GetMapping("/reply-meta/{key}/{value}")
public Effect<Message> returnAsMeta(@PathVariable String key, @PathVariable String value) {
var md = Metadata.EMPTY.add(key, value);
return effects().reply(new Message(value), md);
}

@GetMapping("/reply-async-meta/{key}/{value}")
public Effect<Message> returnAsMetaAsync(@PathVariable String key, @PathVariable String value) {
var md = Metadata.EMPTY.add(key, value);
return effects().asyncReply(CompletableFuture.completedFuture(new Message(value)), md);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ object Action {
def reply[S](message: S): Action.Effect[S]

/**
* Create a message reply.
* Create a message reply with custom Metadata.
*
* @param message
* The payload of the reply.
Expand Down Expand Up @@ -154,6 +154,20 @@ object Action {
*/
def asyncReply[S](message: Future[S]): Action.Effect[S]

/**
* Create a message reply from an async operation result with custom Metadata.
*
* @param message
* The future payload of the reply.
* @param metadata
* The metadata for the message.
* @return
* A message reply.
* @tparam S
* The type of the message that must be returned by this call.
*/
def asyncReply[S](message: Future[S], metadata: Metadata): Action.Effect[S]

/**
* Create a reply from an async operation result returning an effect.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ private[scalasdk] object ActionEffectImpl {
else ErrorEffect(description, Some(statusCode), Nil)

override def asyncReply[S](futureMessage: Future[S]): Action.Effect[S] =
AsyncEffect(futureMessage.map(s => Builder.reply[S](s))(ExecutionContext.parasitic), Nil)
asyncReply(futureMessage, Metadata.empty)
override def asyncReply[S](futureMessage: Future[S], metadata: Metadata): Action.Effect[S] =
AsyncEffect(futureMessage.map(s => Builder.reply[S](s, metadata))(ExecutionContext.parasitic), Nil)
override def asyncEffect[S](futureEffect: Future[Action.Effect[S]]): Action.Effect[S] =
AsyncEffect(futureEffect, Nil)
override def ignore[S]: Action.Effect[S] =
Expand Down

0 comments on commit cb30982

Please sign in to comment.