专栏名称: 郭霖
Android技术分享平台,每天都有优质技术文章推送。你还可以向公众号投稿,将自己总结的技术心得分享给大家。
目录
相关文章推荐
郭霖  ·  一文了解 Gradle 插件 ·  昨天  
鸿洋  ·  ANR?谁控制了触发时间? ·  昨天  
郭霖  ·  使用 Jetpack Compose ... ·  4 天前  
51好读  ›  专栏  ›  郭霖

一起来造一个RxJava,揭秘RxJava的实现原理

郭霖  · 公众号  · android  · 2017-06-26 08:00

正文

今日科技快讯

北京时间6月22日下午,国家新闻出版广电总局发布通知称,已于近日责成属地管理部门采取有效措施关停微博等网站的视听节目服务,进行全面整改。按照《互联网视听节目服务管理规定》(广电总局、信息产业部第56号令),只有取得《信息网络传播视听节目许可证》的微博用户,才能上传视听节目,没有取得《信息网络传播视听节目许可证》的用户,不能上传视听节目。微博用户上传非节目类视频不受影响。微博将按照国家相关法律法规和管理要求,严格加强视听节目的管理,进一步规范视频服务,也会及时向社会和媒体公开,接受社会监督。

作者简介

周末的天气变化无常,小伙伴们是不是也宅了两天呢?新的一周也要继续努力学习!

本篇是 TellH 的投稿,分享了 揭秘RxJava的实现原理,希望能够帮助到大家。

TellH 的博客地址:

http://my.csdn.net/TellH

前言

RxJava 是一个神奇的框架,用法很简单,但内部实现有点复杂,代码逻辑有点绕。我读源码时,确实有点似懂非懂的感觉。网上关于RxJava源码分析的文章,源码贴了一大堆,代码逻辑绕来绕去的,让人看得云里雾里的。既然用拆轮子的方式来分析源码比较难啃,不如换种方式,以造轮子的方式,将源码中与性能、兼容性、扩展性有关的代码剔除,留下核心代码带大家揭秘 RxJava 的实现原理。

什么是响应式编程

首先,我们需要明确,RxJava 是 Reactive Programming 在 Java 中的一种实现。什么是响应式编程? 
用一个字来概括就是流( Stream )。Stream 就是一个按时间排序的 Events 序列,它可以放射三种不同的 Events:(某种类型的) Value、Error 或者一个” Completed ” Signal。通过分别为 Value、Error、” Completed ”定义事件处理函数,我们将会异步地捕获这些 Events。基于观察者模式,事件流将从上往下,从订阅源传递到观察者。

至于使用 Rx 框架的优点,它可以避免回调嵌套,更优雅地切换线程实现异步处理数据。配合一些操作符,可以让处理事件流的代码更加简洁,逻辑更加清晰。

搭建大体的框架

要造一座房子,首先要把大体的框架搭好。在 RxJava 里面,有两个必不可少的角色:Subscriber(观察者) 和 Observable(订阅源)。

Subscriber(观察者)

Subsribler 在 RxJava 里面是一个抽象类,它实现了 Observer 接口。

为了尽可能的简单,将 Subscriber 简化如下:

Observable(订阅源)

Observable(订阅源)在 RxJava 里面是一个大而杂的类,拥有很多工厂方法和各式各样的操作符。每个 Observable 里面有一个 OnSubscribe 对象,只有一个方法(

void call(Subscriber super T> subscriber);

),用来产生数据流,这是典型的命令模式。

实践

到此,一个小型的 RxJava 的雏形就出来了。不信?我们来实践一下吧。

添加操作符

其实,强大的 RxJava 的核心原理并没有想象中那么复杂和神秘,运用的就是典型的观察者模式。有了基本雏形之后,我们继续为这个框架添砖加瓦吧。RxJava 之所以强大好用,与其拥有丰富灵活的操作符是分不开的。那么我们就试着为这个框架添加一个最常用的操作符:map。

那么 RxJava 是如何实现操作符的呢?其实,每调用一次操作符的方法,就相当于在上层数据源和下层观察者之间桥接了一个新的 Observable。桥接的 Observable 内部会实例化有新的 OnSuscribe和Subscriber。OnSuscribe 负责接受目标 Subscriber 传来的订阅请求,并调用源Observable.OnSubscribe的subscribe方法。源 Observable.OnSubscribe将 Event 往下发送给桥接 Observable.Subscriber,最终桥接 Observable.Subscriber 将 Event 做相应处理后转发给目标Subscriber。流程如下图所示:

接着,我们用代码实现这一过程。在 Observable 类里面添加如下代码:

map 操作符的作用是将T类型的 Event 转化成 R 类型,转化策略抽象成 Transformer(RxJava 中用的是Func1,但为了便于理解,起了一个有意义的名字)这一个函数接口,由外部传入。

上面代码中使用到一些泛型的通配符,有些地方使用了 super,有些地方使用了 extends,其实这是有讲究的,传给 Transformer#call 方法的参数是T类型的,那么 call 方法的参数类型可以声明成是 T 的父类,Transformer#call 方法的返回值要求是 R 类型的,那么它的返回值类型应该声明成 R 的子类。如果大家不能理解,也可以不用在意这些细节。

那么我们一起来测试一下吧。

但是,我们看到 map() 方法内内部类有点多,代码缺少拓展性和可读性,我们应该进行适当地重构,将主要的逻辑抽离成独立的模块,并保证模块间尽量解耦,否则 Observable 只会越来越臃肿。

添加线程切换功能

RxJava 中最激动人心的功能是异步处理,能够自如地切换线程。利用 subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。 observeOn() 可以多次调用,实现了线程的多次切换,最终目标 Subscriber 的执行线程与最后一次 observeOn() 的调用有关。但 subscribeOn() 多次调用只有第一个 subscribeOn() 起作用。为什么呢?因为 observeOn() 作用的是 Subscriber,而 subscribeOn() 作用的是 OnSubscribe。

这里借用扔物线的图:

简单地调用一个方法就可以完成线程的切换,很神奇对吧。RxJava 是如何实现的呢?除了桥接 Observable 以外,RxJava 还用到一个很关键的类— Scheduler(调度器)。文档中给 Scheduler 的定义是:A Scheduler is an object that schedules units of work.,也就是进行任务的调度的一个东西。Scheduler 里面有一个重要的抽象方法:

public abstract Worker createWorker();

Worker 是 Scheduler 的内部类,它是具体任务的执行者。当要提交任务给 Worker 执行需要调用 Worker 的 schedule(Action0 aciton)方法。

public abstract Subscription schedule(Action0 action);

要获得一个 Scheduler 并不需要我们去 new,一般是调用 Schedulers 的工厂方法。

具体的 Scheduler 的实现类就不带大家一起看了,但我们需要知道,能做到线程切换的关键 Worker 的 schedule 方法,因为它会把传过来的任务放入线程池,或新线程中执行,这取决于具体 Scheduler 的实现。

自定义 Scheduler

那么,下面我们先来自定义一个简单的 Scheduler 和 Worker。

为了达到高仿效果,我们也提供相应的工厂方法。

实现 subscribeOn

subscribeOn 是作用于上层 OnSubscribe 的,可以让 OnSubscribe 的 call 方法在新线程中执行。

因此,在 Observable 类里面,添加如下代码:

测试一下:

实现 observerOn

subscribeOn 是作用于下层 Subscriber 的,需要让下层 Subscriber 的事件处理方法放到新线程中执行。

为此,在 Observable 类里面,添加如下代码:

测试一下:

在 Android 中切换线程

经过以上实践,我们终于知道了 RxJava 线程切换的核心原理了。下面我们顺便来看看 Android 里面是如何进行线程切换的。

首先找到 AndroidSchedulers,发现一个 Scheduler 的具体实现类:LooperScheduler。

LooperScheduler 的代码很清晰,内部持有一个 Handler,用于线程的切换。在 Worker的 schedule(Action0 action,...) 方法中,将 action 通过 Handler 切换到所绑定的线程中执行。

结语

就这样,以上用代码演示了 RxJava 一些核心功能是如何实现的,希望能给大家带来不一样的启发。但这只是一个小小的 Demo,离真正能运用于工程的 Rx 框架还差太远。这也让我们明白到,一个健壮的框架,需要考虑太多东西,比如代码的可拓展性和可读性,性能优化,可测试性,兼容性,极端情况等等。但有时要想深入理解一个复杂框架的实现原理,就需要剥离这些细节代码,多关注主干的调用逻辑,化繁为简。

Demo代码可到Github获取:

https://github.com/TellH/RxJavaDemo/tree/master/src/my_rxjava

参考&拓展

  • https://mp.weixin.qq.com/s__biz=MzI1MTA1MzM2Nw==&mid=2649796857&idx=1&sn=ed8325aeddac7fd2bd81a0717c010e98&scene=1&srcid=0817o3Xzkx4ILR6FKaR1M9LX#rd

  • https://gank.io/post/560e15be2dca930e00da1083

  • https://zhuanlan.zhihu.com/p/22338235

更多

每天学习累了,看些搞笑的段子放松一下吧。关注最具娱乐精神的公众号,每天都有好心情。

如果你有好的技术文章想和大家分享,欢迎向我的公众号投稿,投稿具体细节请在公众号主页点击“投稿”菜单查看。

欢迎长按下图 -> 识别图中二维码或者扫一扫关注我的公众号: