一、概要

上一篇文章介绍了Hytrix的熔断判断逻辑。里面涉及到一个很重要的组件就是HystrixCommandMetrics,请求的成功数,失败的数量就是使用这个组件来实现。那么接下来我们来重点分析下这个组件的实现方式。 PS:由于hystrix使用了rxjava来实现,阅读起来有时候会比较费力,这里的目标是为了让大家了解hytrix的统计逻辑实现,不会对rxjava的语法做过多的解析。

二、分析

断路器整体架构

1.AbstractCommand.java

这里尝试从入口方法去解析。

AbstractCommand调用栈

//这个是AbstractCommand的核心方法,执行HystrixCommand的execute最终回调用到这里。
public Observable<R> toObservable() {
...
--命令执行完成后会调用这个方法
//doOnCompleted handler already did all of the SUCCESS work
        //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
        final Action0 terminateCommandCleanup = new Action0() {

            @Override
            public void call() {
                if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                    handleCommandEnd(false); //user code never ran
                } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                    handleCommandEnd(true); //user code did run
                }
            }
        };
...
return afterCache
-- terminateCommandCleanup 订阅 afterCache的完成事件(完成后会执行terminateCommandCleanup的逻辑)
                        .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                        .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                        .doOnCompleted(fireOnCompletedHook);
2.HystrixThreadEventStream

订阅了执行的完成事件后会把执行结果汇总到HystrixThreadEventStream。顾名思义就是一个事件流。

//onNext触发的行为,写入HystrixCommandCompletionStream,如果是使用线程池隔离的方式还会写入HystrixThreadPoolCompletionStream。
private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() {
        @Override
        public void call(HystrixCommandCompletion commandCompletion) {
            HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());
            commandStream.write(commandCompletion);

            if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) {
                HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey());
                threadPoolStream.write(commandCompletion);
            }
        }
    };

HystrixThreadEventStream(Thread thread) {
        this.threadId = thread.getId();
        this.threadName = thread.getName();
        writeOnlyCommandStartSubject = PublishSubject.create();
        writeOnlyCommandCompletionSubject = PublishSubject.create();
        writeOnlyCollapserSubject = PublishSubject.create();

        writeOnlyCommandStartSubject
                .onBackpressureBuffer()
                .doOnNext(writeCommandStartsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());
//重点关注writeOnlyCommandCompletionSubject这个对象,触发onNext()方法的时候会调用writeCommandCompletionsToShardedStreams这个方法。
        writeOnlyCommandCompletionSubject
                .onBackpressureBuffer()
                .doOnNext(writeCommandCompletionsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());

        writeOnlyCollapserSubject
                .onBackpressureBuffer()
                .doOnNext(writeCollapserExecutionsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());
    }

//executionResult为执行结果,commandKey为hystrix的命令key,主要是用来区分不同业务进行统计使用的
public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
        HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
//发送命令到writeOnlyCommandCompletionSubject这个subject(被观察者)
        writeOnlyCommandCompletionSubject.onNext(event);
    }

接下来的操作也比较容易猜到,我们需要一个订阅者来订阅这个事件来进行汇总。 最终会把处理的结果写入HystrixThreadPoolCompletionStream和HystrixThreadPoolCompletionStream这两个流里面。

HystrixThreadPoolCompletionStream

3.HealthCountsStream(订阅者)

上面我们已经知道处理的结果会写到HystrixThreadPoolCompletionStream和HystrixThreadPoolCompletionStream。接下来我们来了解最核心的统计实现逻辑HealthCountsStream。

HealthCountsStream类图

//这个构造函数会获取HystrixCommandCompletionStream这个流来进行消息订阅。
private HealthCountsStream(final HystrixCommandKey commandKey, final int numBuckets, final int bucketSizeInMs,
                               Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion) {
        super(HystrixCommandCompletionStream.getInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, healthCheckAccumulator);
    }

这里统计的逻辑是用到了滑动窗口来进行汇总统计。我们来看父类BucketedRollingCounterStream的方法。

protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                           final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           final Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
//这里是滑动窗口算法的核心实现逻辑,因为使用了rxjava,所以看起来比较费力。后面考虑单独花一个章节对这部分代码进行测试。
        Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
            @Override
            public Observable<Output> call(Observable<Bucket> window) {
                return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
            }
        };
        this.sourceStream = bucketedStream      //stream broken up into buckets
                .window(numBuckets, 1)          //emit overlapping windows of buckets
                .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()                        //multiple subscribers should get same data
                .onBackpressureDrop();          //if there are slow consumers, data should not buffer
    }
public static HealthCountsStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) {
        final int healthCountBucketSizeInMs = properties.metricsHealthSnapshotIntervalInMilliseconds().get();
        if (healthCountBucketSizeInMs == 0) {
            throw new RuntimeException("You have set the bucket size to 0ms.  Please set a positive number, so that the metric stream can be properly consumed");
        }
        final int numHealthCountBuckets = properties.metricsRollingStatisticalWindowInMilliseconds().get() / healthCountBucketSizeInMs;

        return getInstance(commandKey, numHealthCountBuckets, healthCountBucketSizeInMs);
    }

healthCountBucketSizeInMs:这里是每个bucket的统计时间 numHealthCountBuckets:这里是滑动窗口每次统计的桶数量。

这里我们通过一个简单的demo来理解hystrix的滑动窗口。通过这个例子我们可以很直观的了解滑动窗口的实现原理。每个bucket作为统计的最小单位,通过不断地统计固定数量的buctet进行统计。每次统计完成后把bucket往后移一位。

package com.netflix.hystrix;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;

import java.util.concurrent.TimeUnit;

public class RollingWindowTest {
    private static final Logger logger = LoggerFactory.getLogger(RollingWindowTest.class);

    public static final Func2<Integer, Integer, Integer> INTEGER_SUM =
            (integer, integer2) -> integer + integer2;

    public static final Func1<Observable<Integer>, Observable<Integer>> WINDOW_SUM =
            window -> window.scan(0, INTEGER_SUM).skip(3);

    public static final Func1<Observable<Integer>, Observable<Integer>> INNER_BUCKET_SUM =
            integerObservable -> integerObservable.reduce(0, INTEGER_SUM);

    public static void main(String[] args) throws InterruptedException {
        PublishSubject<Integer> publishSubject = PublishSubject.create();
        SerializedSubject<Integer, Integer> serializedSubject = publishSubject.toSerialized();

        serializedSubject
                .window(5, TimeUnit.SECONDS) // 5秒作为一个基本块
                .flatMap(INNER_BUCKET_SUM)           // 基本块内数据求和
                .window(3, 1)              // 3个块作为一个窗口,滚动布数为1
                .flatMap(WINDOW_SUM)                 // 窗口数据求和
                .subscribe((Integer integer) ->
                        logger.info("[{}] call ...... {}", // 输出统计数据到日志
                                Thread.currentThread().getName(), integer));

        // 缓慢发送数据,观察效果
        for (int i=0; i<100; ++i) {
            if (i < 30) {
                serializedSubject.onNext(1);
            } else {
                serializedSubject.onNext(2);
            }
            Thread.sleep(1000);
        }
    }
}

三、总结

这次尝试从入口方法去一步步理解hytrix的统计逻辑,但是对于最核心的滑动窗口的实现逻辑,这里需要进一步的测试和调试。这部分留到下一篇文章来仔细去了解。

四、参考文献

Hystrix核心基础 - 滑动窗口创建过程及demo