前言

最近由于Hystrix的配置问题导致在业务高峰期的时候直接阻断掉业务,趁这个机会我们好好分析一下Hystrix的配置。

@DefaultProperties(groupKey = HystrixGroupKey.DISTRIBUTE_LOCKER_COMMAND,
        commandProperties = {
                @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "300"),
                //由于redis为强依赖,所以这里不做熔断
                @HystrixProperty(name = "circuitBreaker.forceClosed", value = "true"),
        },
        threadPoolProperties = {
                @HystrixProperty(name = "coreSize", value = "20"),
                @HystrixProperty(name = "maxQueueSize", value = "200"),
                @HystrixProperty(name = "queueSizeRejectionThreshold", value = "15"),
        })
@Component
public class DistributLocker {

分析

TestCommand 是测试的Hystrix类。test方法模拟100线程并发测试。

public class HystrixTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(HystrixTest.class);
    @Test
    public void test() throws InterruptedException {
        int threadnum=100;
        CountDownLatch c=new CountDownLatch(threadnum);
        IntStream.rangeClosed(0, threadnum).forEach(i -> new TestCommand(c).queue());
        c.await();
    }

    public static class TestCommand extends HystrixCommand<Boolean> {
        private CountDownLatch c;
        public TestCommand(CountDownLatch c) {
            super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("accountBalanceLocalGroup")).
                    andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                            .withMaxQueueSize(200)//队列长度,
                            .withQueueSizeRejectionThreshold(15)//排队线程数量阈值,默认为5,达到时拒绝
                            .withCoreSize(20))//设置线程池的20
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                            .withExecutionTimeoutInMilliseconds(600)
                            .withFallbackIsolationSemaphoreMaxConcurrentRequests(100)));//调用线程允许请求HystrixCommand.GetFallback()的最大数量,默认10。超出时将会有异常抛出
            this.c=c;
        }
        @Override
        protected Boolean run() throws Exception {
            LOGGER.info("thread {} is doing...",Thread.currentThread().getId());
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            c.countDown();
            return true;
        }

        @Override
        protected Boolean getFallback() {
            Throwable e = getExecutionException();
            if(e!=null) {
                LOGGER.error(e.getMessage(), e);
            }
            c.countDown();
            return false;
        }
    }
}

测试结果我们可以发现有部分失败。我们可以看到主要是线程池队列拒绝。

java.util.concurrent.RejectedExecutionException: Rejected command because thread-pool queueSize is at rejection threshold.
	at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$HystrixContextSchedulerWorker.schedule(HystrixContextScheduler.java:103) ~[hystrix-core-1.5.12.jar:1.5.12]
	at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45) ~[rxjava-1.2.0.jar:1.2.0]
	at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30) ~[rxjava-1.2.0.jar:1.2.0]
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) [rxjava-1.2.0.jar:1.2.0]
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) [rxjava-1.2.0.jar:1.2.0]
	at rx.Observable.unsafeSubscribe(Observable.java:10151) [rxjava-1.2.0.jar:1.2.0]

参数调优

我们接下来看下线程池的一些核心参数。

参数 描述 默认值
coreSize 线程池coreSize 默认值:10。
设置标准:qps*99meantime+breathing room
maximumSize 此属性设置最大线程池大小。 这是在不开始拒绝HystrixCommands的情况下可以支持的最大并发数。 请注意,此设置仅在您还设置allowMaximumSizeToDivergeFromCoreSize时才会生效。 默认值:10
maxQueueSize 请求等待队列 默认值:-1。
如果使用正数,队列将从SynchronizeQueue改为LinkedBlockingQueue
queueSizeRejectionThreshold 此属性设置队列大小拒绝阈值 - 即使未达到maxQueueSize也将发生拒绝的人为最大队列大小。 此属性存在,因为BlockingQueue的maxQueueSize不能动态更改,我们希望允许您动态更改影响拒绝的队列大小。 默认值:5。
注意:如果maxQueueSize == -1,则此属性不适用。
keepAliveTimeMinutes 此属性设置保持活动时间,以分钟为单位。 默认值:1
allowMaximumSizeToDivergeFromCoreSize 此属性允许maximumSize的配置生效。 那么该值可以等于或高于coreSize。 设置coreSize <maximumSize会创建一个线程池,该线程池可以支持maximumSize并发,但在相对不活动期间将向系统返回线程。 (以keepAliveTimeInMinutes为准) 默认值:false
metrics.rollingStats.timeInMilliseconds 此属性设置statistical rolling窗口的持续时间(以毫秒为单位)。 这是为线程池保留多长时间。 默认值:10000
metrics.rollingStats.numBuckets 此属性设置滚动统计窗口划分的桶数。
注意:以下必须为true - “metrics.rollingStats.timeInMilliseconds%metrics.rollingStats.numBuckets == 0” -否则将引发异常。
 

coreSize配置

消费接口的QPS我们期望能支撑1000QPS,接口99line约为100ms,6台写机器。 \(coreSize=1000*0.1/6=16.7\) 20个核心线程没有问题,问题是当QPS上涨会导致接口时延上涨,接口99line预计会上涨到300ms。 \(coreSize=1000*0.3/6=50\) 那么我们的核心线程数需要配到50。

maximumSize配置

默认值为10。且allowMaximumSizeToDivergeFromCoreSize配置没有打开。因此会导致线程池无法扩容。

queueSizeRejectionThreshold

这里只配置了5。虽然maxQueueSize配置了100,但实际上没有任何意义,只要队列到5的时候就会拒绝,100只是队列的大小(BlockingQueue的大小)。

HystrixThreadPool

@Override
        public boolean isQueueSpaceAvailable() {
            if (queueSize <= 0) {
                // we don't have a queue so we won't look for space but instead
                // let the thread-pool reject or not
                return true;
            } else {
                return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
            }
        }

因此我们可以考虑通过调整queueSizeRejectionThreshold的大小来保证请求入队成功(然而队列变长又会导致执行结果太长)。

信号量

线程池的模式本身是一个比较损耗资源的方式(需要增加一个额外的线程池),使用信号量是否一个更好的方式?

@Test
    public void testSemaphore(){
        StopWatch sw = new StopWatch("test");
        sw.start();
        Boolean execute = new SemaphoreCommand().execute();
        sw.stop();
        LOGGER.info("result:{}...耗时:{}",execute,sw.getTotalTimeMillis());


    }

    public static class SemaphoreCommand extends HystrixCommand<Boolean> {
        public SemaphoreCommand() {
            super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("accountBalanceLocalGroup"))
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                            .withExecutionTimeoutInMilliseconds(100)
                            .withFallbackIsolationSemaphoreMaxConcurrentRequests(100)));
        }
        @Override
        protected Boolean run() throws Exception {
            LOGGER.info("thread {} is doing...",Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return true;
        }

        @Override
        protected Boolean getFallback() {
            Throwable e = getExecutionException();
            if(e!=null) {
                LOGGER.error(e.getMessage(), e);
            }
            return false;
        }
    }

然而由于Hystrix的信号量模式无法严格限制线程的执行时间(由于线程共享的关系,一旦中断了当前线程,相当于工作线程也中断了),虽然超时是可以执行fallback的内容,但是还是需要等待run方法执行完成。

fallback其实任务在到达超时时间(也就是配置的150ms)时,由另外的线程去执行的fallback内容。但是main线程也只能在Hystrix run方法执行完成之后才能获取到fallback中的返回值。

参考

Hystrix使用说明,配置参数说明

聊聊hystrix的queueSizeRejectionThreshold参数

理解ThreadPoolExecutor线程池的corePoolSize、maximumPoolSize和poolSize

Hystrix信号量模式支持超时时间吗