一、概要

hystrix的执行隔离策略有两种。一种是线程池的模式,另外一种是信号量的模式。hystrix默认的策略是线程池的模式。

对于每个command的执行会启用一个新的线程,同一个command key会使用同一个线程池,不同的command key会使用不同的线程池就进行隔离。 优点:由于command的执行隔离在不同的线程中,因此某一个command线程执行异常不会影响到其它command的执行。如果希望使用到异步的方式执行,只能选择这种模式。 缺点:由于多了线程的介入,相当于增加了线程间的上下文切换的成本。另外如果系统的command key很多,可能会影响到系统的整体性能。

个人理解跟信号量其实没有太大的关联性。本质上是跟主线程共用一个线程,只是这里增加了一个最大并发数的限制(信号量)。 优点:不涉及线程上下文切换。在大部分业务场景下其实是能满足的(eg.tomcat单请求单线程的业务,command key之间没有异步执行的需求) 缺点:某一个command执行过程中导致线程崩溃会影响到整个流程的执行(共享线程)。

二、源码分析

  1. 信号量的接口定义
    /* package */static interface TryableSemaphore {
    
         /**
          * Use like this:
          * <p>
          * 
          * <pre>
          * if (s.tryAcquire()) {
          * try {
          * // do work that is protected by 's'
          * } finally {
          * s.release();
          * }
          * }
          * </pre>
          * 
          * @return boolean
          */
         public abstract boolean tryAcquire();
    
         /**
          * ONLY call release if tryAcquire returned true.
          * <p>
          * 
          * <pre>
          * if (s.tryAcquire()) {
          * try {
          * // do work that is protected by 's'
          * } finally {
          * s.release();
          * }
          * }
          * </pre>
          */
         public abstract void release();
    
         public abstract int getNumberOfPermitsUsed();
    
     }
    

    接口的定义很简单,上层调用会先tryAcquire获得信号量,然后通过release方式释放信号量。

  2. 实现类 实现类有两种。TryableSemaphoreActual和TryableSemaphoreNoOp。
    • TryableSemaphoreActual

信号量的实现类。

这个实现类相当于不限制,线程池模式的实现类采用的方式就是这种。

/* package */static class TryableSemaphoreActual implements TryableSemaphore {
        protected final HystrixProperty<Integer> numberOfPermits;
        private final AtomicInteger count = new AtomicInteger(0);

        public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
            this.numberOfPermits = numberOfPermits;
        }

        @Override
        public boolean tryAcquire() {
            int currentCount = count.incrementAndGet();
            if (currentCount > numberOfPermits.get()) {
                count.decrementAndGet();
                return false;
            } else {
                return true;
            }
        }

        @Override
        public void release() {
            count.decrementAndGet();
        }

        @Override
        public int getNumberOfPermitsUsed() {
            return count.get();
        }

    }

这个实现类就不多加解释了,主要是用了AtomicInteger做加减的操作。 3.调用层

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
       ...

        /* determine if we're allowed to execute */
        if (circuitBreaker.allowRequest()) {
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
//释放信号量的操作
                        executionSemaphore.release();
                    }
                }
            };

            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };
//获取信号量
            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            return handleShortCircuitViaFallback();
        }
    }

三、总结

在实际的工作中对信号量和线程池的选择还是要慎重,事实上对于一般场景的单请求单线程的业务,信号量是能够满足需求的。 对于需要异步(command并行执行)的业务需要使用线程池的方式来实现。