Skip to content

Commit

Permalink
*增加跨线程传递数据功能
Browse files Browse the repository at this point in the history
Signed-off-by: provenceee <[email protected]>
  • Loading branch information
provenceee committed Jan 9, 2024
1 parent 9d1f090 commit 2e8f2eb
Show file tree
Hide file tree
Showing 79 changed files with 3,208 additions and 334 deletions.
1 change: 1 addition & 0 deletions .github/actions/scenarios/spring/lane/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ runs:
shell: bash
env:
SPRING_CLOUD_VERSION: ${{ matrix.springCloudVersion }}
SPRING_BOOT_VERSION: ${{ matrix.springBootVersion }}
run: mvn test -Dsermant.integration.test.type=LANE --file sermant-integration-tests/spring-test/pom.xml
- name: exit
if: always()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public class LaneServiceImpl implements LaneService {
public Map<String, Object> getLaneByDubbo(String name, LaneTestEntity laneTestEntity, String[] arr,
List<Integer> list, Map<String, Object> map) {
RpcContext.getContext().setAttachment(Constant.TAG_KEY, Constant.TAG);

// dubbo 2.5.0 - 2.5.3存在bug,会传递异步调用,所以需要手动设置为同步
RpcContext.getContext().setAttachment("async", "false");
Map<String, Object> result = new HashMap<>(fooService.getAttachments());
result.put(applicationName, getMetadata());
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import javax.annotation.Resource;

Expand All @@ -50,13 +52,17 @@ public class LaneController {
*
* @param entity entity
* @return map
* @throws ExecutionException ExecutionException
* @throws InterruptedException InterruptedException
*/
@GetMapping("/getLaneByDubbo")
public Map<String, Object> getLaneByDubbo(TestEntity entity) {
public Map<String, Object> getLaneByDubbo(TestEntity entity) throws ExecutionException, InterruptedException {
RpcContext.getContext().setAttachment(Constant.LANE_TEST_USER_ID, String.valueOf(entity.getId()));
return laneService.getLaneByDubbo(entity.getName(), new LaneTestEntity(entity.getLaneId(), entity.isEnabled()),
laneService.getLaneByDubbo(entity.getName(), new LaneTestEntity(entity.getLaneId(), entity.isEnabled()),
new String[]{entity.getArrName()}, Collections.singletonList(entity.getListId()),
Collections.singletonMap("name", entity.getMapName()));
Future<Map<String, Object>> future = RpcContext.getContext().getFuture();
return future.get();
}

/**
Expand All @@ -66,13 +72,18 @@ public Map<String, Object> getLaneByDubbo(TestEntity entity) {
* @param id id
* @param enabled enabled
* @return map
* @throws ExecutionException ExecutionException
* @throws InterruptedException InterruptedException
*/
@GetMapping("/getLaneByFeign")
public Map<String, Object> getLaneByFeign(@RequestParam(value = "name", defaultValue = "") String name,
@RequestParam(value = "id", defaultValue = "0") int id,
@RequestParam(value = "enabled", defaultValue = "false") boolean enabled) {
@RequestParam(value = "enabled", defaultValue = "false") boolean enabled)
throws ExecutionException, InterruptedException {
RpcContext.getContext().setAttachment(Constant.LANE_TEST_USER_ID, String.valueOf(id));
return laneService.getLaneByFeign(name, new LaneTestEntity(id, enabled));
laneService.getLaneByFeign(name, new LaneTestEntity(id, enabled));
Future<Map<String, Object>> future = RpcContext.getContext().getFuture();
return future.get();
}

/**
Expand All @@ -82,12 +93,17 @@ public Map<String, Object> getLaneByFeign(@RequestParam(value = "name", defaultV
* @param id id
* @param enabled enabled
* @return map
* @throws ExecutionException ExecutionException
* @throws InterruptedException InterruptedException
*/
@GetMapping("/getLaneByRest")
public Map<String, Object> getLaneByRest(@RequestParam(value = "name", defaultValue = "") String name,
@RequestParam(value = "id", defaultValue = "0") int id,
@RequestParam(value = "enabled", defaultValue = "false") boolean enabled) {
@RequestParam(value = "enabled", defaultValue = "false") boolean enabled)
throws ExecutionException, InterruptedException {
RpcContext.getContext().setAttachment(Constant.LANE_TEST_USER_ID, String.valueOf(id));
return laneService.getLaneByRest(name, new LaneTestEntity(id, enabled));
laneService.getLaneByRest(name, new LaneTestEntity(id, enabled));
Future<Map<String, Object>> future = RpcContext.getContext().getFuture();
return future.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
interface="com.huaweicloud.integration.service.MetadataService" timeout="10000"/>

<dubbo:reference id="laneService" check="false"
interface="com.huaweicloud.integration.service.LaneService" timeout="10000"/>
interface="com.huaweicloud.integration.service.LaneService" timeout="10000" async="true"/>
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import javax.annotation.Resource;

Expand All @@ -49,13 +51,17 @@ public class LaneController {
*
* @param entity entity
* @return map
* @throws ExecutionException ExecutionException
* @throws InterruptedException InterruptedException
*/
@GetMapping("/getLaneByDubbo")
public Map<String, Object> getLaneByDubbo(TestEntity entity) {
public Map<String, Object> getLaneByDubbo(TestEntity entity) throws ExecutionException, InterruptedException {
RpcContext.getContext().setAttachment(Constant.LANE_TEST_USER_ID, String.valueOf(entity.getId()));
return laneService.getLaneByDubbo(entity.getName(), new LaneTestEntity(entity.getLaneId(), entity.isEnabled()),
laneService.getLaneByDubbo(entity.getName(), new LaneTestEntity(entity.getLaneId(), entity.isEnabled()),
new String[]{entity.getArrName()}, Collections.singletonList(entity.getListId()),
Collections.singletonMap("name", entity.getMapName()));
Future<Map<String, Object>> future = RpcContext.getContext().getFuture();
return future.get();
}

/**
Expand All @@ -65,13 +71,18 @@ public Map<String, Object> getLaneByDubbo(TestEntity entity) {
* @param id id
* @param enabled enabled
* @return map
* @throws ExecutionException ExecutionException
* @throws InterruptedException InterruptedException
*/
@GetMapping("/getLaneByFeign")
public Map<String, Object> getLaneByFeign(@RequestParam(value = "name", defaultValue = "") String name,
@RequestParam(value = "id", defaultValue = "0") int id,
@RequestParam(value = "enabled", defaultValue = "false") boolean enabled) {
@RequestParam(value = "enabled", defaultValue = "false") boolean enabled)
throws ExecutionException, InterruptedException {
RpcContext.getContext().setAttachment(Constant.LANE_TEST_USER_ID, String.valueOf(id));
return laneService.getLaneByFeign(name, new LaneTestEntity(id, enabled));
laneService.getLaneByFeign(name, new LaneTestEntity(id, enabled));
Future<Map<String, Object>> future = RpcContext.getContext().getFuture();
return future.get();
}

/**
Expand All @@ -81,12 +92,17 @@ public Map<String, Object> getLaneByFeign(@RequestParam(value = "name", defaultV
* @param id id
* @param enabled enabled
* @return map
* @throws ExecutionException ExecutionException
* @throws InterruptedException InterruptedException
*/
@GetMapping("/getLaneByRest")
public Map<String, Object> getLaneByRest(@RequestParam(value = "name", defaultValue = "") String name,
@RequestParam(value = "id", defaultValue = "0") int id,
@RequestParam(value = "enabled", defaultValue = "false") boolean enabled) {
@RequestParam(value = "enabled", defaultValue = "false") boolean enabled)
throws ExecutionException, InterruptedException {
RpcContext.getContext().setAttachment(Constant.LANE_TEST_USER_ID, String.valueOf(id));
return laneService.getLaneByRest(name, new LaneTestEntity(id, enabled));
laneService.getLaneByRest(name, new LaneTestEntity(id, enabled));
Future<Map<String, Object>> future = RpcContext.getContext().getFuture();
return future.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
interface="com.huaweicloud.integration.service.MetadataService" timeout="10000"/>

<dubbo:reference id="laneService" check="false"
interface="com.huaweicloud.integration.service.LaneService" timeout="10000"/>
interface="com.huaweicloud.integration.service.LaneService" timeout="10000" async="true"/>
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@
<version>1.0.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
</dependencies>
<build>
<finalName>feign-consumer</finalName>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.reactive.function.client.WebClient;

/**
* 启动类
Expand All @@ -31,9 +34,9 @@
* @since 2022-07-29
*/
@SpringBootApplication(scanBasePackages = {
"com.huaweicloud.spring.feign.consumer.controller",
"com.huaweicloud.spring.common.loadbalancer.feign",
"com.huaweicloud.spring.feign.api.configuration"
"com.huaweicloud.spring.feign.consumer.controller",
"com.huaweicloud.spring.common.loadbalancer.feign",
"com.huaweicloud.spring.feign.api.configuration"
})
@EnableFeignClients(basePackages = "com.huaweicloud.spring.feign.api")
public class FeignConsumerApplication {
Expand All @@ -55,4 +58,27 @@ public static void main(String[] args) {
public ErrorDecoder errorDecoder() {
return new FeignErrorDecoder();
}

/**
* webclient,需要懒加载以避免spring cloud Finchley.x的bug
*
* @param builder 构造器
* @return webclient
*/
@Bean
@Lazy
public WebClient webClient(WebClient.Builder builder) {
return builder.build();
}

/**
* 构造器
*
* @return builder
*/
@Bean
@LoadBalanced
public WebClient.Builder builder() {
return WebClient.builder();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@
import com.huaweicloud.spring.feign.api.BootRegistryService;
import com.huaweicloud.spring.feign.api.FeignService;

import reactor.core.publisher.Mono;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -38,6 +45,8 @@
@RestController
@RequestMapping("/router")
public class RouterController {
private static final String PROVIDER_URL = "http://feign-provider/lane";

@Value("${spring.application.name}")
private String applicationName;

Expand All @@ -50,6 +59,10 @@ public class RouterController {
@Autowired
private FeignService feignService;

@Autowired
@Lazy
private WebClient client;

/**
* 获取区域
*
Expand Down Expand Up @@ -89,6 +102,28 @@ public Map<String, Object> getLaneByCloud(@RequestParam(value = "name", defaultV
return result;
}

/**
* 获取泳道信息
*
* @param name name
* @param id id
* @param enabled enabled
* @return 泳道信息
*/
@GetMapping("/cloud/getLaneByWebclient")
public Mono<Map<String, Object>> getLaneByCloudWebclient(
@RequestParam(value = "name", defaultValue = "") String name,
@RequestParam(value = "id", defaultValue = "0") int id,
@RequestParam(value = "enabled", defaultValue = "false") boolean enabled) {
RequestHeadersSpec<?> requestHeadersSpec = client.get().uri(PROVIDER_URL);
ResponseSpec spec = requestHeadersSpec.retrieve();
return spec.bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {
}).map(map -> {
map.put(applicationName, getMetadata());
return map;
});
}

private Map<String, Object> getMetadata() {
Map<String, Object> meta = new HashMap<>();
meta.put("version", version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ hystrix:
execution:
isolation:
thread:
timeoutInMilliseconds: 10000
timeoutInMilliseconds: 50000

ribbon:
ConnectTimeout: 10000
Expand Down
Loading

0 comments on commit 2e8f2eb

Please sign in to comment.