前言
最近由于Hystrix的配置问题导致在业务高峰期的时候直接阻断掉业务,趁这个机会我们好好分析一下Hystrix的配置。
@DefaultProperties(groupKey = HystrixGroupKey.DISTRIBUTE_LOCKER_COMMAND,
commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "300"),
//由于redis为强依赖,所以这里不做熔断
@HystrixProperty(name = "circuitBreaker.forceClosed", value = "true"),
},
threadPoolProperties = {
@HystrixProperty(name = "coreSize", value = "20"),
@HystrixProperty(name = "maxQueueSize", value = "200"),
@HystrixProperty(name = "queueSizeRejectionThreshold", value = "15"),
})
@Component
public class DistributLocker {
分析
TestCommand
是测试的Hystrix类。test
方法模拟100线程并发测试。
public class HystrixTest {
private static final Logger LOGGER = LoggerFactory.getLogger(HystrixTest.class);
@Test
public void test() throws InterruptedException {
int threadnum=100;
CountDownLatch c=new CountDownLatch(threadnum);
IntStream.rangeClosed(0, threadnum).forEach(i -> new TestCommand(c).queue());
c.await();
}
public static class TestCommand extends HystrixCommand<Boolean> {
private CountDownLatch c;
public TestCommand(CountDownLatch c) {
super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("accountBalanceLocalGroup")).
andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
.withMaxQueueSize(200)//队列长度,
.withQueueSizeRejectionThreshold(15)//排队线程数量阈值,默认为5,达到时拒绝
.withCoreSize(20))//设置线程池的20
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(600)
.withFallbackIsolationSemaphoreMaxConcurrentRequests(100)));//调用线程允许请求HystrixCommand.GetFallback()的最大数量,默认10。超出时将会有异常抛出
this.c=c;
}
@Override
protected Boolean run() throws Exception {
LOGGER.info("thread {} is doing...",Thread.currentThread().getId());
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
c.countDown();
return true;
}
@Override
protected Boolean getFallback() {
Throwable e = getExecutionException();
if(e!=null) {
LOGGER.error(e.getMessage(), e);
}
c.countDown();
return false;
}
}
}
测试结果我们可以发现有部分失败。我们可以看到主要是线程池队列拒绝。
java.util.concurrent.RejectedExecutionException: Rejected command because thread-pool queueSize is at rejection threshold.
at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$HystrixContextSchedulerWorker.schedule(HystrixContextScheduler.java:103) ~[hystrix-core-1.5.12.jar:1.5.12]
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45) ~[rxjava-1.2.0.jar:1.2.0]
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30) ~[rxjava-1.2.0.jar:1.2.0]
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) [rxjava-1.2.0.jar:1.2.0]
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) [rxjava-1.2.0.jar:1.2.0]
at rx.Observable.unsafeSubscribe(Observable.java:10151) [rxjava-1.2.0.jar:1.2.0]
参数调优
我们接下来看下线程池的一些核心参数。
参数 | 描述 | 默认值 |
---|---|---|
coreSize | 线程池coreSize | 默认值:10。 设置标准:qps*99meantime+breathing room |
maximumSize | 此属性设置最大线程池大小。 这是在不开始拒绝HystrixCommands的情况下可以支持的最大并发数。 请注意,此设置仅在您还设置allowMaximumSizeToDivergeFromCoreSize时才会生效。 | 默认值:10 |
maxQueueSize | 请求等待队列 | 默认值:-1。 如果使用正数,队列将从SynchronizeQueue改为LinkedBlockingQueue |
queueSizeRejectionThreshold | 此属性设置队列大小拒绝阈值 - 即使未达到maxQueueSize也将发生拒绝的人为最大队列大小。 此属性存在,因为BlockingQueue的maxQueueSize不能动态更改,我们希望允许您动态更改影响拒绝的队列大小。 | 默认值:5。 注意:如果maxQueueSize == -1,则此属性不适用。 |
keepAliveTimeMinutes | 此属性设置保持活动时间,以分钟为单位。 | 默认值:1 |
allowMaximumSizeToDivergeFromCoreSize | 此属性允许maximumSize的配置生效。 那么该值可以等于或高于coreSize。 设置coreSize <maximumSize会创建一个线程池,该线程池可以支持maximumSize并发,但在相对不活动期间将向系统返回线程。 (以keepAliveTimeInMinutes为准) | 默认值:false |
metrics.rollingStats.timeInMilliseconds | 此属性设置statistical rolling窗口的持续时间(以毫秒为单位)。 这是为线程池保留多长时间。 | 默认值:10000 |
metrics.rollingStats.numBuckets | 此属性设置滚动统计窗口划分的桶数。 注意:以下必须为true - “metrics.rollingStats.timeInMilliseconds%metrics.rollingStats.numBuckets == 0” -否则将引发异常。 |
coreSize配置
消费接口的QPS我们期望能支撑1000QPS,接口99line约为100ms,6台写机器。 \(coreSize=1000*0.1/6=16.7\) 20个核心线程没有问题,问题是当QPS上涨会导致接口时延上涨,接口99line预计会上涨到300ms。 \(coreSize=1000*0.3/6=50\) 那么我们的核心线程数需要配到50。
maximumSize配置
默认值为10。且allowMaximumSizeToDivergeFromCoreSize
配置没有打开。因此会导致线程池无法扩容。
queueSizeRejectionThreshold
这里只配置了5。虽然maxQueueSize配置了100,但实际上没有任何意义,只要队列到5的时候就会拒绝,100只是队列的大小(BlockingQueue的大小)。
HystrixThreadPool
@Override
public boolean isQueueSpaceAvailable() {
if (queueSize <= 0) {
// we don't have a queue so we won't look for space but instead
// let the thread-pool reject or not
return true;
} else {
return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
}
}
因此我们可以考虑通过调整queueSizeRejectionThreshold的大小来保证请求入队成功(然而队列变长又会导致执行结果太长)。
信号量
线程池的模式本身是一个比较损耗资源的方式(需要增加一个额外的线程池),使用信号量是否一个更好的方式?
@Test
public void testSemaphore(){
StopWatch sw = new StopWatch("test");
sw.start();
Boolean execute = new SemaphoreCommand().execute();
sw.stop();
LOGGER.info("result:{}...耗时:{}",execute,sw.getTotalTimeMillis());
}
public static class SemaphoreCommand extends HystrixCommand<Boolean> {
public SemaphoreCommand() {
super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("accountBalanceLocalGroup"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
.withExecutionTimeoutInMilliseconds(100)
.withFallbackIsolationSemaphoreMaxConcurrentRequests(100)));
}
@Override
protected Boolean run() throws Exception {
LOGGER.info("thread {} is doing...",Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}
@Override
protected Boolean getFallback() {
Throwable e = getExecutionException();
if(e!=null) {
LOGGER.error(e.getMessage(), e);
}
return false;
}
}
然而由于Hystrix的信号量模式无法严格限制线程的执行时间(由于线程共享的关系,一旦中断了当前线程,相当于工作线程也中断了),虽然超时是可以执行fallback的内容,但是还是需要等待run方法执行完成。
fallback其实任务在到达超时时间(也就是配置的150ms)时,由另外的线程去执行的fallback内容。但是main线程也只能在Hystrix run方法执行完成之后才能获取到fallback中的返回值。
参考
聊聊hystrix的queueSizeRejectionThreshold参数
理解ThreadPoolExecutor线程池的corePoolSize、maximumPoolSize和poolSize