Skip to content

Commit

Permalink
Merge pull request #1340 from slievrly/master_0408
Browse files Browse the repository at this point in the history
optimize: optimize SeataHystrixConcurrencyStrategy
  • Loading branch information
mercyblitz authored Apr 10, 2020
2 parents a3299d5 + 6c4d602 commit c83b8fe
Showing 1 changed file with 82 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,93 @@

package com.alibaba.cloud.seata.feign.hystrix;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariable;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
import com.netflix.hystrix.strategy.properties.HystrixProperty;
import io.seata.core.context.RootContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

/**
* @author xiaojing
*/
public class SeataHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

private final Logger logger = LoggerFactory.getLogger(SeataHystrixConcurrencyStrategy.class);
private HystrixConcurrencyStrategy delegate;

public SeataHystrixConcurrencyStrategy() {
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
try {
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
if (this.delegate instanceof SeataHystrixConcurrencyStrategy) {
return;
}
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher, propertiesStrategy);
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
} catch (Exception ex) {
logger.error("Failed to register Seata Hystrix Concurrency Strategy", ex);
}
}

private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
HystrixMetricsPublisher metricsPublisher,
HystrixPropertiesStrategy propertiesStrategy) {
if (logger.isDebugEnabled()) {
logger.debug("Current Hystrix plugins configuration is [" + "concurrencyStrategy [" + this.delegate + "],"
+ "eventNotifier [" + eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
+ "propertiesStrategy [" + propertiesStrategy + "]," + "]");
logger.debug("Registering Seata Hystrix Concurrency Strategy.");
}
}

@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize,
HystrixProperty<Integer> maximumPoolSize,
HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue);
}

@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixThreadPoolProperties threadPoolProperties) {
return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
}

@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return this.delegate.getBlockingQueue(maxQueueSize);
}

@Override
public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) {
return this.delegate.getRequestVariable(rv);
}

@Override
Expand All @@ -44,15 +114,14 @@ public <K> Callable<K> wrapCallable(Callable<K> c) {
Callable<K> wrappedCallable;
if (this.delegate != null) {
wrappedCallable = this.delegate.wrapCallable(c);
}
else {
} else {
wrappedCallable = c;
}
if (wrappedCallable instanceof SeataContextCallable) {
return wrappedCallable;
}

return new SeataContextCallable<>(wrappedCallable);
return new SeataContextCallable<>(wrappedCallable, RequestContextHolder.getRequestAttributes());
}

private static class SeataContextCallable<K> implements Callable<K> {
Expand All @@ -61,19 +130,23 @@ private static class SeataContextCallable<K> implements Callable<K> {

private final String xid;

SeataContextCallable(Callable<K> actual) {
private final RequestAttributes requestAttributes;

SeataContextCallable(Callable<K> actual, RequestAttributes requestAttribute) {
this.actual = actual;
this.requestAttributes = requestAttribute;
this.xid = RootContext.getXID();
}

@Override
public K call() throws Exception {
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
RootContext.bind(xid);
return actual.call();
}
finally {
} finally {
RootContext.unbind();
RequestContextHolder.resetRequestAttributes();
}
}

Expand Down

0 comments on commit c83b8fe

Please sign in to comment.