正文
摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-second-isolation-strategy/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Hystrix 1.5.X 版本
🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:
RocketMQ / MyCAT / Sharding-JDBC 所有 源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都 将得到认真 回复。甚至不知道如何读源码也可以请教噢 。
新的 源码解析文章实时 收到通知。每周更新一篇左右 。
认真的 源码交流微信群。
1. 概述
本文主要分享 Hystrix 命令执行(二)之执行隔离策略 。
建议 :对 RxJava 已经有一定的了解的基础上阅读本文。
Hystrix 提供两种执行隔离策略( ExecutionIsolationStrategy ) :
两种方式的优缺点比较 ,推荐阅读 《【翻译】Hystrix文档-实现原理》「依赖隔离」 。
推荐 Spring Cloud 书籍 :
2. HystrixThreadPoolProperties
com.netflix.hystrix.HystrixThreadPoolProperties
,Hystrix 线程池属性配置抽象类 ,点击 链接 查看,已添加中文注释说明。
com.netflix.hystrix.strategy.properties.HystrixPropertiesThreadPoolDefault
,Hystrix 线程池配置实现类 ,点击 链接 查看。实际上没什么内容,官方如是说 :
Default implementation of {@link HystrixThreadPoolProperties} using Archaius (https://github.com/Netflix/archaius)
3. HystrixThreadPoolKey
com.netflix.hystrix.HystrixThreadPoolKey
,Hystrix 线程池标识接口 。
FROM HystrixThreadPoolKey 接口注释
A key to represent a {@link HystrixThreadPool} for monitoring, metrics publishing, caching and other such uses.
This interface is intended to work natively with Enums so that implementing code can be an enum that implements this interface.
直白的说 ,希望通过相同的 name
( 标识 ) 获得同 HystrixThreadPoolKey 对象。通过在内部维持一个 name
与 HystrixThreadPoolKey 对象的映射,以达到枚举 的效果。
HystrixThreadPoolKey 代码如下 :
1 : public interface HystrixThreadPoolKey extends HystrixKey {
2 : class Factory {
3 : private Factory () {
4 : }
5 :
6 :
7 : private static final InternMap<String, HystrixThreadPoolKey> intern
8 : = new InternMap<String, HystrixThreadPoolKey>(
9 : new InternMap.ValueConstructor<String, HystrixThreadPoolKey>() {
10 : @Override
11 : public HystrixThreadPoolKey create (String key) {
12 : return new HystrixThreadPoolKeyDefault(key);
13 : }
14 : });
15 :
16 : public static HystrixThreadPoolKey asKey (String name) {
17 : return intern.interned(name);
18 : }
19 :
20 : private static class HystrixThreadPoolKeyDefault extends HystrixKeyDefault implements HystrixThreadPoolKey {
21 : public HystrixThreadPoolKeyDefault (String name) {
22 : super (name);
23 : }
24 : }
25 :
26 : static int getThreadPoolCount () {
27 : return intern.size();
28 : }
29 : }
30 : }
HystrixThreadPoolKey 实现 com.netflix.hystrix.HystrixKey
接口 ,点击 链接 查看。该接口定义的 #name()
方法,即是上文我们所说的标识( Key )。
intern
属性,name
与 HystrixThreadPoolKey 对象的映射,以达到枚举 的效果。
com.netflix.hystrix.util.InternMap
,点击 链接 查看带中文注释的代码。
#asKey(name)
方法,从 intern
获得 HystrixThreadPoolKey 对象。
#getThreadPoolCount()
方法,获得 HystrixThreadPoolKey 数量。
在 AbstractCommand 构造方法 里,初始化命令的 threadPoolKey
属性,代码如下 :
protected final HystrixThreadPoolKey threadPoolKey;
protected AbstractCommand (HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
this .commandGroup = initGroupKey(group);
this .commandKey = initCommandKey(key, getClass());
this .properties = initCommandProperties(this .commandKey, propertiesStrategy, commandPropertiesDefaults);
this .threadPoolKey = initThreadPoolKey(threadPoolKey, this .commandGroup, this .properties.executionIsolationThreadPoolKeyOverride().get());
}
调用 #initThreadPoolKey(...)
方法,创建最终的 threadPoolKey
属性。代码如下 :
private static HystrixThreadPoolKey initThreadPoolKey (HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
if (threadPoolKeyOverride == null ) {
if (threadPoolKey == null ) {
return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
} else {
return threadPoolKey;
}
} else {
return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
}
}
优先级 :threadPoolKeyOverride
> threadPoolKey
> groupKey
4. HystrixConcurrencyStrategy
com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy
,Hystrix 并发策略抽象类 。
HystrixConcurrencyStrategy#getThreadPool(...)
方法,代码如下 :
1 : public ThreadPoolExecutor getThreadPool (final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
2 : final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
3 :
4 : final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
5 : final int dynamicCoreSize = threadPoolProperties.coreSize().get();
6 : final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
7 : final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
8 :
9 : final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
10 :
11 : if (allowMaximumSizeToDivergeFromCoreSize) {
12 : final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
13 : if (dynamicCoreSize > dynamicMaximumSize) {
14 : logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
15 : dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
16 : dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value" );
17 : return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
18 : } else {
19 : return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
20 : }
21 : } else {
22 : return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
23 : }
24 : }
第 2 行 :调用 #getThreadFactory(...)
方法,获得 ThreadFactory 。点击 链接 查看方法代码。
PlatformSpecific#getAppEngineThreadFactory()
方法,无需细看,适用于 Google App Engine 场景。
第 4 至 7 行 :「2. HystrixThreadPoolProperties」 有详细解析。
第 9 行 :调用 #getBlockingQueue()
方法,获得线程池的阻塞队列。点击 链接 查看方法代码。
当 maxQueueSize <= 0
时( 默认值 :-1
) 时,使用 SynchronousQueue 。超过线程池的 maximumPoolSize
时,提交任务被拒绝 。
当 SynchronousQueue > 0
时,使用 LinkedBlockingQueue 。超过线程池的 maximumPoolSize
时,任务被拒绝。超过线程池的 maximumPoolSize
+ 线程池队列的 maxQueueSize
时,提交任务被阻塞等待 。
推荐 :《聊聊并发(三)——JAVA线程池的分析和使用》
推荐 :《聊聊并发(七)——Java中的阻塞队列》
第 11 至 23 行 :创建 ThreadPoolExecutor 。看起来代码比较多,根据 allowMaximumSizeToDivergeFromCoreSize
的情况,计算线程池的 maximumPoolSize
属性。计算的方式和 HystrixThreadPoolProperties#actualMaximumSize()
方法是一致的。
com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault
,Hystrix 并发策略实现类 。代码如下( 基本没做啥 ) :
public class HystrixConcurrencyStrategyDefault extends HystrixConcurrencyStrategy {
private static HystrixConcurrencyStrategyDefault INSTANCE = new HystrixConcurrencyStrategyDefault();
public static HystrixConcurrencyStrategy getInstance () {
return INSTANCE;
}
private HystrixConcurrencyStrategyDefault () {
}
}
在 AbstractCommand 构造方法 里,初始化命令的 threadPoolKey
属性,代码如下 :
protected final HystrixConcurrencyStrategy concurrencyStrategy;
protected AbstractCommand (HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
this .concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
}
HystrixPlugins ,Hystrix 插件 体系,github.com/Netflix/Hys… 有详细解析。
调用 HystrixPlugins#getConcurrencyStrategy()
获得 HystrixConcurrencyStrategy 对象。默认情况下,使用 HystrixConcurrencyStrategyDefault 。当然你也可以参考 Hystrix 插件体系,实现自定义 的 HystrixConcurrencyStrategy 实现,以达到覆写 #getThreadPool()
,#getBlockingQueue()
等方法。点击 链接 查看该方法代码。
5. HystrixThreadPool
com.netflix.hystrix.HystrixThreadPool
,Hystrix 线程池接口 。当 Hystrix 命令使用 THREAD
执行隔离策略时,HystrixCommand#run()
方法在线程池执行 。点击 链接 查看。HystrixThreadPool 定义接口如下 :
#getExecutor()
:获得 ExecutorService 。
#getScheduler()
/ #getScheduler(Func0<Boolean>)
:获得 RxJava Scheduler 。
#isQueueSpaceAvailable()
:线程池队列是否有空余 。
#markThreadExecution()
/ #markThreadCompletion()
/ #markThreadRejection()
:TODO 【2002】【metrics】
5.1 HystrixThreadPoolDefault
com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault
,Hystrix 线程池实现类 。
构造方法 ,代码如下 :
1 : private final HystrixThreadPoolProperties properties;
2 : private final BlockingQueue<Runnable> queue;
3 : private final ThreadPoolExecutor threadPool;
4 : private final HystrixThreadPoolMetrics metrics;
5 : private final int queueSize;
6 :
7 : public HystrixThreadPoolDefault (HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
8 :
9 : this .properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
10 :
11 : HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
12 :
13 : this .queueSize = properties.maxQueueSize().get();
14 :
15 :
16 : this .metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
17 : concurrencyStrategy.getThreadPool(threadPoolKey, properties),
18 : properties);
19 :
20 :
21 : this .threadPool = this .metrics.getThreadPool();
22 : this .queue = this .threadPool.getQueue();
23 :
24 :
25 :
26 : HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this .metrics, this .properties);
27 : }
第 9 行 :初始化 HystrixThreadPoolProperties 。
第 11 行 :初始化 HystrixConcurrencyStrategy 。
第 13 行 :初始化 queueSize
。
第 16 至 18 行 :TODO 【2002】【metrics】
第 17 行 :调用 HystrixConcurrencyStrategy#getThreadPool(...)
方法,初始化 ThreadPoolExecutor 。
第 21 行 :获得 ThreadPoolExecutor 。
第 22 行 :获得 ThreadPoolExecutor 的队列。
第 26 行 :TODO 【2002】【metrics】
#getExecutor()
方法,代码如下 :
@Override
public ThreadPoolExecutor getExecutor () {
touchConfig();
return threadPool;
}
调用 #touchConfig()
方法,动态 调整 threadPool
的 coreSize
/ maximumSize
/ keepAliveTime
参数。点击 链接 查看该方法。
#getScheduler()
/ #getScheduler(Func0<Boolean>)
方法,代码如下 :
@Override
public Scheduler getScheduler () {
return getScheduler(new Func0<Boolean>() {
@Override
public Boolean call () {
return true ;
}
});
}
@Override
public Scheduler getScheduler (Func0<Boolean> shouldInterruptThread) {
touchConfig();
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this , shouldInterruptThread);
}
#isQueueSpaceAvailable()
方法,代码如下 :
@Override
public boolean isQueueSpaceAvailable () {
if (queueSize <= 0 ) {
return true ;
} else {
return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
}
}
由于线程池的队列大小不能动态 调整,该方法的实现 通过 HystrixThreadPoolProperties.queueSizeRejectionThreshold 属性控制。
注意 queueSize
属性,决定了线程池的队列类型。
queueSize <= 0
时,#isQueueSpaceAvailable()
都返回 true
的原因是,线程池使用 SynchronousQueue 作为队列,不支持新 任务排队,任务超过线程池的 maximumPoolSize
时,新任务被拒绝。
queueSize > 0
时,#isQueueSpaceAvailable()
根据情况true
/false
的原因是,线程池使用 LinkedBlockingQueue 作为队列,支持一定数量 的阻塞 排队,但是这个数量无法调整。通过 #isQueueSpaceAvailable()
方法的判断,动态 调整。另外,初始配置 的 queueSize
要相对大 ,否则即使 queueSizeRejectionThreshold
配置的大于 queueSize
,实际提交任务到线程池,也会被拒绝 。
5.2 Factory
com.netflix.hystrix.HystrixThreadPool.Factory
,HystrixThreadPool 工厂类,不仅限于 HystrixThreadPool 的创建,也提供了 HystrixThreadPool 的管理( HystrixThreadPool 的容器 )。
threadPools
属性,维护创建的 HystrixThreadPool 对应的映射,代码如下 :
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
Key 为 HystrixThreadPoolKey#name()
,每个 HystrixThreadPoolKey 对应一个 HystrixThreadPool 对象。
#getInstance(...)
方法,获得 HystrixThreadPool 对象,代码如下 :
static HystrixThreadPool getInstance (HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
String key = threadPoolKey.name();
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null ) {
return previouslyCached;
}
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}
根据 threadPoolKey
先从 threadPool
获取已创建的 HystrixThreadPool ;获取不到,创建对应的 HystrixThreadPool 返回,并添加到 threadPool
。
#shutdown()
/ #shutdown(timeout, unit)
方法,比较易懂,点击 链接 查看。
5.3 初始化
在 AbstractCommand 构造方法 里,初始化命令的 threadPool
属性,代码如下 :
protected final HystrixThreadPool threadPool;
protected AbstractCommand (HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
this .threadPoolKey = initThreadPoolKey(threadPoolKey, this .commandGroup, this .properties.executionIsolationThreadPoolKeyOverride().get());
this .threadPool = initThreadPool(threadPool, this .threadPoolKey, threadPoolPropertiesDefaults);
}
调用 #initThreadPool(...)
方法,获得 HystrixThreadPool ,点击 链接 查看。
6. HystrixScheduler
Hystrix 实现了自定义的 RxJava Scheduler ,整体类图如下 :
HystrixContextScheduler ( 实现 RxJava Scheduler 抽象类 ),内嵌类型为 ThreadPoolScheduler ( 实现 RxJava Scheduler 抽象类 )的 actualScheduler
属性。
HystrixContextWorker ( 实现 RxJava Worker 抽象类 ),内嵌类型为 ThreadPoolWorker ( 实现 RxJava Worker 抽象类 )的 worker
属性。
6.1 HystrixContextScheduler
构造方法 ,代码如下 :
public class HystrixContextScheduler extends Scheduler {
private final HystrixConcurrencyStrategy concurrencyStrategy;
private final Scheduler actualScheduler;
private final HystrixThreadPool threadPool;
public HystrixContextScheduler (HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this .concurrencyStrategy = concurrencyStrategy;
this .threadPool = threadPool;
this .actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}
}
actualScheduler
属性,类型为 ThreadPoolScheduler 。
#createWorker()
方法,代码如下 :
@Override
public Worker createWorker () {
return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
使用 actualScheduler
创建 ThreadPoolWorker ,传参给 HystrixContextSchedulerWorker 。
6.2 HystrixContextSchedulerWorker
构造方法 ,代码如下 :
private class HystrixContextSchedulerWorker extends Worker {
private final Worker worker;
private HystrixContextSchedulerWorker (Worker actualWorker) {
this .worker = actualWorker;
}
}
worker
属性,类型为 ThreadPoolWorker 。
#schedule(Action0)
方法,代码如下 :
@Override
public Subscription schedule (Action0 action) {
if (threadPool != null ) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold." );
}
}
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
调用 ThreadPool#isQueueSpaceAvailable()
方法,判断线程池队列是否有空余 。这个就是 HystrixContextScheduler 的实际 用途。
#unsubscribe()
/ #isUnsubscribed()
方法,使用 worker
判断,点击 链接 查看。
6.3 ThreadPoolScheduler
ThreadPoolScheduler 比较简单,点击 链接 查看。
6.4 ThreadPoolWorker
构造方法 ,代码如下 :
private static class ThreadPoolWorker extends Worker {
private final HystrixThreadPool threadPool;
private final CompositeSubscription subscription = new CompositeSubscription();
private final Func0<Boolean> shouldInterruptThread;
public ThreadPoolWorker (HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this .threadPool = threadPool;
this .shouldInterruptThread = shouldInterruptThread;
}
}
#schedule(Action0)
方法,代码如下 :
1 : @Override
2 : public Subscription schedule (final Action0 action) {
3 :
4 : if (subscription.isUnsubscribed()) {
5 :
6 : return Subscriptions.unsubscribed();
7 : }
8 :
9 :
10 :
11 : ScheduledAction sa = new ScheduledAction(action);
12 :
13 :
14 : subscription.add(sa);
15 : sa.addParent(subscription);
16 :
17 :
18 : ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
19 : FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
20 : sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
21 :
22 : return sa;
23 : }
第 4 至 7 行 :未订阅,返回。
第 11 行 : 创建 ScheduledAction 。在 TODO 【2013】【ScheduledAction】 详细解析。
第 14 至 15 行 :添加到订阅( subscription
)。
第 18 至 20 行 :使用 threadPool
,提交任务,并创建 FutureCompleterWithConfigurableInterrupt 添加到订阅( sa
)。
第 22 行 :返回订阅( sa
)。整体订阅关系如下 :
#unsubscribe()
/ #isUnsubscribed()
方法,使用 subscription
判断,点击 链接 查看。
6.5 FutureCompleterWithConfigurableInterrupt
com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.FutureCompleterWithConfigurableInterrupt
,实现类似 rx.internal.schedulers.ScheduledAction.FutureCompleter
,在它的基础上,支持配置 FutureTask#cancel(Boolean)
是否可打断 运行( mayInterruptIfRunning
)。
构造方法 ,代码如下 :
private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
private final FutureTask<?> f;
private final Func0<Boolean> shouldInterruptThread;
private final ThreadPoolExecutor executor;
private FutureCompleterWithConfigurableInterrupt (FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
this .f = f;
this .shouldInterruptThread = shouldInterruptThread;
this .executor = executor;
}
}
当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe()
方法,取消执行 。
当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe()
方法,取消执行 。
当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe()
方法,取消执行 。
#unsubscribe()
方法,代码如下 :
@Override
public void unsubscribe () {
executor.remove(f);
if (shouldInterruptThread.call()) {
f.cancel(true );
} else {
f.cancel(false );
}
}
根据 shouldInterruptThread
方法,判断是否强制 取消。
shouldInterruptThread
对应的方法,实现代码如下 :
subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call () {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
当 executionIsolationThreadInterruptOnTimeout = true
时,命令可执行超时 。当命令可执行超时 时,强制 取消。
当使用 HystrixCommand.queue()
返回的 Future ,可以使用 Future#cancel(Boolean)
取消命令执行。从 shouldInterruptThread
对应的方法可以看到,如果此时不满足命令执行超时 的条件,命令执行取消的方式是非强制 的。此时当 executionIsolationThreadInterruptOnFutureCancel = true
时,并且调用 Future#cancel(Boolean)
传递 mayInterruptIfRunning = true
,强制取消命令执行。
666. 彩蛋
一边写一边想明白了 RxJava 的一些东西,挺舒服的赶脚。
继续 Go On ~ 周末嗨不停。
胖友,分享一波朋友圈可好!