Skip to content

Commit d01bed4

Browse files
committed
add response interface and http,grpc implement
1 parent 6ddceb6 commit d01bed4

File tree

7 files changed

+267
-1
lines changed

7 files changed

+267
-1
lines changed

sdk/src/main/java/io/dapr/client/DaprClientGrpc.java

+24
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import io.dapr.client.domain.StateOptions;
3838
import io.dapr.client.domain.SubscribeConfigurationRequest;
3939
import io.dapr.client.domain.TransactionalStateOperation;
40+
import io.dapr.client.domain.response.DaprResponse;
41+
import io.dapr.client.domain.response.GrpcDaprResponse;
4042
import io.dapr.config.Properties;
4143
import io.dapr.exceptions.DaprException;
4244
import io.dapr.internal.opencensus.GrpcWrapper;
@@ -62,9 +64,13 @@
6264

6365
import java.io.Closeable;
6466
import java.io.IOException;
67+
import java.lang.reflect.ParameterizedType;
68+
import java.lang.reflect.Type;
6569
import java.util.Collections;
70+
import java.util.HashMap;
6671
import java.util.List;
6772
import java.util.Map;
73+
import java.util.Objects;
6874
import java.util.concurrent.ExecutionException;
6975
import java.util.function.Consumer;
7076
import java.util.stream.Collectors;
@@ -206,6 +212,11 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
206212
).flatMap(
207213
it -> {
208214
try {
215+
if (type.getType().getTypeName().startsWith(DaprResponse.class.getName())) {
216+
Map<String, String> headers = new HashMap<>();
217+
headers.put(io.dapr.client.domain.Metadata.CONTENT_TYPE,it.getContentType());
218+
return getMono(type, it.getData().getValue().toByteArray(),headers);
219+
}
209220
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type));
210221
} catch (IOException e) {
211222
throw DaprException.propagate(e);
@@ -217,6 +228,16 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
217228
}
218229
}
219230

231+
private <T> Mono<T> getMono(TypeRef<T> type, byte[] data, Map<String,String> headers) {
232+
if (type.getType() instanceof ParameterizedType) {
233+
Type[] actualTypeArguments = ((ParameterizedType) type.getType()).getActualTypeArguments();
234+
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
235+
type = TypeRef.get(actualTypeArguments[0]);
236+
}
237+
}
238+
return (Mono<T>) Mono.just(new GrpcDaprResponse<T>(data, headers,objectSerializer, type));
239+
}
240+
220241
/**
221242
* {@inheritDoc}
222243
*/
@@ -253,6 +274,9 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
253274
).flatMap(
254275
it -> {
255276
try {
277+
if (type.getType().getTypeName().startsWith(DaprResponse.class.getName())) {
278+
return getMono(type, it.getData().toByteArray(),it.getMetadataMap());
279+
}
256280
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type));
257281
} catch (IOException e) {
258282
throw DaprException.propagate(e);

sdk/src/main/java/io/dapr/client/DaprClientHttp.java

+14
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import io.dapr.client.domain.SubscribeConfigurationRequest;
3737
import io.dapr.client.domain.TransactionalStateOperation;
3838
import io.dapr.client.domain.TransactionalStateRequest;
39+
import io.dapr.client.domain.response.DaprResponse;
40+
import io.dapr.client.domain.response.HttpDaprResponse;
3941
import io.dapr.config.Properties;
4042
import io.dapr.exceptions.DaprException;
4143
import io.dapr.serializer.DaprObjectSerializer;
@@ -46,13 +48,16 @@
4648
import reactor.core.publisher.Mono;
4749

4850
import java.io.IOException;
51+
import java.lang.reflect.ParameterizedType;
52+
import java.lang.reflect.Type;
4953
import java.util.ArrayList;
5054
import java.util.Arrays;
5155
import java.util.Collections;
5256
import java.util.HashMap;
5357
import java.util.Iterator;
5458
import java.util.List;
5559
import java.util.Map;
60+
import java.util.Objects;
5661
import java.util.Optional;
5762
import java.util.stream.Collectors;
5863

@@ -221,6 +226,15 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
221226

222227
private <T> Mono<T> getMono(TypeRef<T> type, DaprHttp.Response r) {
223228
try {
229+
if (type.getType().getTypeName().startsWith(DaprResponse.class.getName())) {
230+
if (type.getType() instanceof ParameterizedType) {
231+
Type[] actualTypeArguments = ((ParameterizedType) type.getType()).getActualTypeArguments();
232+
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
233+
type = TypeRef.get(actualTypeArguments[0]);
234+
}
235+
}
236+
return (Mono<T>) Mono.just(new HttpDaprResponse<T>(r, objectSerializer, type));
237+
}
224238
T object = objectSerializer.deserialize(r.getBody(), type);
225239
if (object == null) {
226240
return Mono.empty();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2021 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.client.domain.response;
15+
16+
import java.io.IOException;
17+
import java.util.Map;
18+
19+
/**
20+
* Response.
21+
*/
22+
public interface DaprResponse<T> {
23+
24+
/**
25+
* get response code.
26+
* @return response code
27+
*/
28+
int getCode();
29+
30+
/**
31+
* get response data.
32+
* @return response data
33+
*/
34+
T getData() throws IOException;
35+
36+
/**
37+
* get response header.
38+
* @return response header
39+
*/
40+
Map<String,String> getHeaders();
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2021 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.client.domain.response;
15+
16+
import io.dapr.serializer.DaprObjectSerializer;
17+
import io.dapr.utils.TypeRef;
18+
19+
import java.io.IOException;
20+
import java.util.Map;
21+
22+
/**
23+
* GrpcDaprResponse.
24+
*/
25+
public class GrpcDaprResponse<T> implements DaprResponse<T> {
26+
27+
private final DaprObjectSerializer serializer;
28+
29+
private final TypeRef<T> type;
30+
31+
private final byte[] data;
32+
33+
private final Map<String,String> headers;
34+
35+
/**
36+
* build grpc dapr response.
37+
* @param data grpc invoke response data
38+
* @param headers grpc invoke headers
39+
* @param serializer objectSerializer
40+
* @param type type
41+
*/
42+
public GrpcDaprResponse(byte[] data, Map<String,String> headers, DaprObjectSerializer serializer, TypeRef<T> type) {
43+
this.data = data;
44+
this.headers = headers;
45+
this.serializer = serializer;
46+
this.type = type;
47+
}
48+
49+
@Override
50+
public int getCode() {
51+
// InvokeResponse didn't have it.
52+
return 200;
53+
}
54+
55+
@Override
56+
public T getData() throws IOException {
57+
return serializer.deserialize(data, type);
58+
}
59+
60+
@Override
61+
public Map<String, String> getHeaders() {
62+
return this.headers;
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2021 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.client.domain.response;
15+
16+
import io.dapr.client.DaprHttp;
17+
import io.dapr.serializer.DaprObjectSerializer;
18+
import io.dapr.utils.TypeRef;
19+
20+
import java.io.IOException;
21+
import java.util.Map;
22+
23+
/**
24+
* HttpDaprResponse.
25+
*/
26+
public class HttpDaprResponse<T> implements DaprResponse<T> {
27+
28+
private final DaprHttp.Response response;
29+
30+
private final DaprObjectSerializer serializer;
31+
32+
private final TypeRef<T> type;
33+
34+
/**
35+
* build http dapr response.
36+
* @param response http invoke response
37+
* @param serializer serializer
38+
* @param type type
39+
*/
40+
public HttpDaprResponse(DaprHttp.Response response, DaprObjectSerializer serializer, TypeRef<T> type) {
41+
this.response = response;
42+
this.serializer = serializer;
43+
this.type = type;
44+
}
45+
46+
@Override
47+
public int getCode() {
48+
return response.getStatusCode();
49+
}
50+
51+
@Override
52+
public T getData() throws IOException {
53+
byte[] data = response.getBody();
54+
if (type.getType() == String.class) {
55+
return (T) new String(data);
56+
}
57+
return serializer.deserialize(data, type);
58+
}
59+
60+
@Override
61+
public Map<String, String> getHeaders() {
62+
return response.getHeaders();
63+
}
64+
}

sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java

+17
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.dapr.client.domain.State;
2626
import io.dapr.client.domain.StateOptions;
2727
import io.dapr.client.domain.TransactionalStateOperation;
28+
import io.dapr.client.domain.response.DaprResponse;
2829
import io.dapr.config.Properties;
2930
import io.dapr.serializer.DaprObjectSerializer;
3031
import io.dapr.serializer.DefaultObjectSerializer;
@@ -633,6 +634,22 @@ public void invokeServiceTest() {
633634
assertEquals(expected, strOutput);
634635
}
635636

637+
@Test
638+
public void invokeServiceTestReturnResponse() throws IOException {
639+
String expected = "Value";
640+
doAnswer((Answer<Void>) invocation -> {
641+
StreamObserver<CommonProtos.InvokeResponse> observer = (StreamObserver<CommonProtos.InvokeResponse>) invocation.getArguments()[1];
642+
observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build());
643+
observer.onCompleted();
644+
return null;
645+
}).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any());
646+
647+
Mono<DaprResponse<String>> result = client.invokeMethod("appId", "method", "request", HttpExtension.NONE, null, new TypeRef<DaprResponse<String>>() {});
648+
DaprResponse<String> res = result.block();
649+
650+
assertEquals(expected, res.getData());
651+
}
652+
636653
@Test
637654
public void invokeServiceObjectTest() throws Exception {
638655
MyObject object = new MyObject(1, "Value");

sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java

+43-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.dapr.client.domain.State;
2525
import io.dapr.client.domain.StateOptions;
2626
import io.dapr.client.domain.TransactionalStateOperation;
27+
import io.dapr.client.domain.response.DaprResponse;
2728
import io.dapr.config.Properties;
2829
import io.dapr.exceptions.DaprException;
2930
import io.dapr.serializer.DaprObjectSerializer;
@@ -39,6 +40,7 @@
3940
import okio.BufferedSink;
4041
import org.junit.Before;
4142
import org.junit.Test;
43+
import org.junit.jupiter.api.Assertions;
4244
import org.mockito.Mockito;
4345
import reactor.core.publisher.Mono;
4446
import reactor.util.context.Context;
@@ -417,7 +419,47 @@ public void invokeServiceWithContext() {
417419
}
418420

419421
@Test
420-
public void invokeBinding() {
422+
public void invokeServiceReturnResponse() throws IOException {
423+
String resultString = "request success";
424+
String resultHeaderName = "test-header";
425+
String resultHeaderValue = "1";
426+
mockInterceptor.addRule()
427+
.post("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder")
428+
.respond(resultString)
429+
.addHeader(resultHeaderName,resultHeaderValue);
430+
431+
InvokeMethodRequest req = new InvokeMethodRequest("41", "neworder")
432+
.setBody("request")
433+
.setHttpExtension(HttpExtension.POST);
434+
Mono<DaprResponse<String>> result = daprClientHttp.invokeMethod(req, new TypeRef<DaprResponse<String>>() {});
435+
DaprResponse<String> response = result.block();
436+
Assertions.assertNotNull(response);
437+
Assertions.assertEquals(200, response.getCode());
438+
Assertions.assertEquals(resultString,response.getData());
439+
Assertions.assertEquals(resultHeaderValue,response.getHeaders().get(resultHeaderName));
440+
}
441+
442+
@Test
443+
public void invokeBinding() throws IOException {
444+
String resultString = "request success";
445+
String resultHeaderName = "test-header";
446+
String resultHeaderValue = "1";
447+
Map<String, String> map = new HashMap<>();
448+
mockInterceptor.addRule()
449+
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
450+
.respond(resultString)
451+
.addHeader(resultHeaderName,resultHeaderValue);
452+
453+
Mono<DaprResponse<String>> mono = daprClientHttp.invokeBinding("sample-topic", "myoperation", "", new TypeRef<DaprResponse<String>>() {});
454+
DaprResponse<String> response = mono.block();
455+
Assertions.assertNotNull(response);
456+
Assertions.assertEquals(200, response.getCode());
457+
Assertions.assertEquals(resultString,response.getData());
458+
Assertions.assertEquals(resultHeaderValue,response.getHeaders().get(resultHeaderName));
459+
}
460+
461+
@Test
462+
public void invokeBindingReturnResponse() {
421463
Map<String, String> map = new HashMap<>();
422464
mockInterceptor.addRule()
423465
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")

0 commit comments

Comments
 (0)