一、概要
上一篇文章介绍了Hytrix的熔断判断逻辑。里面涉及到一个很重要的组件就是HystrixCommandMetrics,请求的成功数,失败的数量就是使用这个组件来实现。那么接下来我们来重点分析下这个组件的实现方式。 PS:由于hystrix使用了rxjava来实现,阅读起来有时候会比较费力,这里的目标是为了让大家了解hytrix的统计逻辑实现,不会对rxjava的语法做过多的解析。
二、分析
1.AbstractCommand.java
这里尝试从入口方法去解析。
//这个是AbstractCommand的核心方法,执行HystrixCommand的execute最终回调用到这里。
public Observable<R> toObservable() {
...
--命令执行完成后会调用这个方法
//doOnCompleted handler already did all of the SUCCESS work
//doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
final Action0 terminateCommandCleanup = new Action0() {
@Override
public void call() {
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
handleCommandEnd(false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
handleCommandEnd(true); //user code did run
}
}
};
...
return afterCache
-- terminateCommandCleanup 订阅 afterCache的完成事件(完成后会执行terminateCommandCleanup的逻辑)
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
2.HystrixThreadEventStream
订阅了执行的完成事件后会把执行结果汇总到HystrixThreadEventStream。顾名思义就是一个事件流。
//onNext触发的行为,写入HystrixCommandCompletionStream,如果是使用线程池隔离的方式还会写入HystrixThreadPoolCompletionStream。
private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() {
@Override
public void call(HystrixCommandCompletion commandCompletion) {
HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());
commandStream.write(commandCompletion);
if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) {
HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey());
threadPoolStream.write(commandCompletion);
}
}
};
HystrixThreadEventStream(Thread thread) {
this.threadId = thread.getId();
this.threadName = thread.getName();
writeOnlyCommandStartSubject = PublishSubject.create();
writeOnlyCommandCompletionSubject = PublishSubject.create();
writeOnlyCollapserSubject = PublishSubject.create();
writeOnlyCommandStartSubject
.onBackpressureBuffer()
.doOnNext(writeCommandStartsToShardedStreams)
.unsafeSubscribe(Subscribers.empty());
//重点关注writeOnlyCommandCompletionSubject这个对象,触发onNext()方法的时候会调用writeCommandCompletionsToShardedStreams这个方法。
writeOnlyCommandCompletionSubject
.onBackpressureBuffer()
.doOnNext(writeCommandCompletionsToShardedStreams)
.unsafeSubscribe(Subscribers.empty());
writeOnlyCollapserSubject
.onBackpressureBuffer()
.doOnNext(writeCollapserExecutionsToShardedStreams)
.unsafeSubscribe(Subscribers.empty());
}
//executionResult为执行结果,commandKey为hystrix的命令key,主要是用来区分不同业务进行统计使用的
public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
//发送命令到writeOnlyCommandCompletionSubject这个subject(被观察者)
writeOnlyCommandCompletionSubject.onNext(event);
}
接下来的操作也比较容易猜到,我们需要一个订阅者来订阅这个事件来进行汇总。 最终会把处理的结果写入HystrixThreadPoolCompletionStream和HystrixThreadPoolCompletionStream这两个流里面。
3.HealthCountsStream(订阅者)
上面我们已经知道处理的结果会写到HystrixThreadPoolCompletionStream和HystrixThreadPoolCompletionStream。接下来我们来了解最核心的统计实现逻辑HealthCountsStream。
//这个构造函数会获取HystrixCommandCompletionStream这个流来进行消息订阅。
private HealthCountsStream(final HystrixCommandKey commandKey, final int numBuckets, final int bucketSizeInMs,
Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion) {
super(HystrixCommandCompletionStream.getInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, healthCheckAccumulator);
}
这里统计的逻辑是用到了滑动窗口来进行汇总统计。我们来看父类BucketedRollingCounterStream的方法。
protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
final Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
//这里是滑动窗口算法的核心实现逻辑,因为使用了rxjava,所以看起来比较费力。后面考虑单独花一个章节对这部分代码进行测试。
Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
@Override
public Observable<Output> call(Observable<Bucket> window) {
return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
}
};
this.sourceStream = bucketedStream //stream broken up into buckets
.window(numBuckets, 1) //emit overlapping windows of buckets
.flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share() //multiple subscribers should get same data
.onBackpressureDrop(); //if there are slow consumers, data should not buffer
}
public static HealthCountsStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) {
final int healthCountBucketSizeInMs = properties.metricsHealthSnapshotIntervalInMilliseconds().get();
if (healthCountBucketSizeInMs == 0) {
throw new RuntimeException("You have set the bucket size to 0ms. Please set a positive number, so that the metric stream can be properly consumed");
}
final int numHealthCountBuckets = properties.metricsRollingStatisticalWindowInMilliseconds().get() / healthCountBucketSizeInMs;
return getInstance(commandKey, numHealthCountBuckets, healthCountBucketSizeInMs);
}
healthCountBucketSizeInMs:这里是每个bucket的统计时间 numHealthCountBuckets:这里是滑动窗口每次统计的桶数量。
这里我们通过一个简单的demo来理解hystrix的滑动窗口。通过这个例子我们可以很直观的了解滑动窗口的实现原理。每个bucket作为统计的最小单位,通过不断地统计固定数量的buctet进行统计。每次统计完成后把bucket往后移一位。
package com.netflix.hystrix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import java.util.concurrent.TimeUnit;
public class RollingWindowTest {
private static final Logger logger = LoggerFactory.getLogger(RollingWindowTest.class);
public static final Func2<Integer, Integer, Integer> INTEGER_SUM =
(integer, integer2) -> integer + integer2;
public static final Func1<Observable<Integer>, Observable<Integer>> WINDOW_SUM =
window -> window.scan(0, INTEGER_SUM).skip(3);
public static final Func1<Observable<Integer>, Observable<Integer>> INNER_BUCKET_SUM =
integerObservable -> integerObservable.reduce(0, INTEGER_SUM);
public static void main(String[] args) throws InterruptedException {
PublishSubject<Integer> publishSubject = PublishSubject.create();
SerializedSubject<Integer, Integer> serializedSubject = publishSubject.toSerialized();
serializedSubject
.window(5, TimeUnit.SECONDS) // 5秒作为一个基本块
.flatMap(INNER_BUCKET_SUM) // 基本块内数据求和
.window(3, 1) // 3个块作为一个窗口,滚动布数为1
.flatMap(WINDOW_SUM) // 窗口数据求和
.subscribe((Integer integer) ->
logger.info("[{}] call ...... {}", // 输出统计数据到日志
Thread.currentThread().getName(), integer));
// 缓慢发送数据,观察效果
for (int i=0; i<100; ++i) {
if (i < 30) {
serializedSubject.onNext(1);
} else {
serializedSubject.onNext(2);
}
Thread.sleep(1000);
}
}
}
三、总结
这次尝试从入口方法去一步步理解hytrix的统计逻辑,但是对于最核心的滑动窗口的实现逻辑,这里需要进一步的测试和调试。这部分留到下一篇文章来仔细去了解。