相关阅读:
吊炸天!74款APP完整源码!
安居客Android项目架构演进
[干货收藏]Android-skin-support 一款用心去做的Android 换肤框架
来源:http://www.qingjingjie.com/blogs/23
我们知道程序分为同步风格和异步风格。
-
可以写成同步风格用多个线程来并发执行。
-
也可以写成异步风格以支持更为灵活的调度。
-
异步更适合并发编程。
为什么要异步
异步的目的:充分利用计算资源。
同步使线程阻塞,导致等待。
异步是非阻塞的,无需等待。
如果发生了不必要的等待,就会浪费资源,使程序变慢。
比如这样的程序:
val res1 = get("http://server1")
val res2 = get("http://server2")
compute(res1, res2)
按照同步编程风格,一定要先拿到res1,才能开始拿res2。
按照异步编程风格,res1和res2互不依赖,发起对res1的获取后,不必等待结果,而是马上发起对res2的获取,到了compute的时候,才需要阻塞等待两个数据。
这是一种“顺序解耦”。有时候我们并不要求某些操作按顺序执行!那么为什么要强制其顺序呢?异步风格让我们能放弃强制,解放资源,减少不必要的等待。
如果异步操作能并行,程序性能就提升了,如果不能并行,程序性能就没有提升。在当今的硬件条件下,一般都能并行,所以异步成为了趋势。
怎么个并行法?这要从计算机架构说起了。让我们把任何有处理能力的硬件看做一个处理单元——CPU显然是主要的处理单元,I/O设备也是处理单元,比如说网卡、内存控制器、硬盘控制器。CPU可以向一或多个I/O设备发出请求,当设备在准备数据时,CPU可以做其他事情(设备就绪后会用中断通知CPU),这时就有n个硬件在并行了!况且CPU本就是多核的,能做并行计算。除此之外,在分布式系统中,能同时调动多台计算机配合完成任务,也是并行。
因此,让CPU等待、每次只请求一个I/O设备、不利用多核、不利用其他空闲的计算机,都是比较浪费的。
下面我们来分析常见的并发编程模型。
基本模型
Thread
这是最简单的模型,创建线程来执行一个任务,完毕后销毁线程。当任务数量大时,会创建大量的线程。
大家都知道大量的线程会降低性能,但是你真的清楚性能开销在哪里吗?我试列举一下:
创建一个线程是比较耗时间的。需要请求操作系统、分配栈空间、初始化等工作。
大家都知道的,操作系统基本概念,不再赘述。值得注意的是,WAITING状态的线程(多见于I/O等待)几乎不会被调度,因此并不导致过多的上下文切换。
大量线程频繁切换,势必要访问不同的数据,打乱了空间局部性,导致CPU cache miss增加,需要经常访问更慢的内存,会明显影响CPU密集型程序的性能,这点大家恐怕没想到吧。
线程会增加内存占用,线程的栈空间通常占1MB,1000个就是1GB。而且在栈上引用了很多对象,暂时不能回收,你说有多少个GB?
一些有限的资源,如锁、数据库连接、文件句柄等,当线程被挂起或阻塞,就暂时无人可用了,浪费!还有死锁风险!
那么分配多少线程好呢?
-
对于I/O密集型程序:一个经验数值是两倍于数据库连接数,例如你有30个数据库连接,就开60个线程;我还有个经验数值是500以下,超过500就慢一些,如果调用栈特别深,这个数值还要下调。
-
对于CPU密集型程序:我的经验数值是略多于CPU核心数 (理论上是等于,但你难免有几个阻塞操作)。除了核心数,还要考虑CPU cache的大小,最好实际测试一下。举个例子,某司内部的自动重构程序在Intel i7 3630QM CPU上测试,3~4个线程效果最好。
传统的网络程序是每个会话占用一个连接、一个线程。I/O多路复用(I/O multiplexing:多个会话共用一个连接)是应C10K问题而生的,C10K就是1万个连接。1万个连接是很耗系统资源的,何况还有1万个线程。从上文的分析可知,C1K的时候就可以开始运用I/O多路复用了。
Thread Pool
预留一些可反复使用的线程在一个池里,反复地接受任务。线程数量可能是固定的,也可能是一定范围内变动的,依所选择的线程池的实现而定。
这个模型是极其常用的,例如Tomcat就是用线程池来处理请求的。
注意——尽量不要阻塞任务线程;若实在无法避免,多开一些线程——每阻塞一个线程,线程池就少一个可用的线程。
Java典型的线程池有Executors.newFixedThreadPool Executors.newFixedThreadPool Executors.newFixedThreadPool Executors.newScheduledThreadPool等等,也可以直接new ThreadPoolExecutor(可指定线程数的上限和下限)。
Scala没有增加新的线程池种类,但有个blocking方法能告诉线程池某个调用会阻塞,需要临时增加1个线程。
Future
Future是一个未来将会有值的对象,相当于一个占位符(提货凭证!)。
将任务投入线程池执行时,可为任务绑定一个Future,凭此Future即可在未来取得任务执行结果。未来是什么时候呢?要通过检查Future内部的状态来获知——任务完成时会修改这个状态,将执行结果存进去。
最初的代码示例可改写为:
// 两个future是并行的
val f1 = Future { get("http://server1") }
val f2 = Future { get("http://server2") }
compute(f1.get(), f2.get())
高级模型
Rx
Rx (Reactive Extensions)是响应式编程的一种具体形式。响应式编程是一种面向数据流和变化传播的编程模式。
我们知道Java 8提供了Stream类型,代表一个有限或无限的数据流,可应用map, filter, collect等操作。Rx类似于Stream,也是有限或无限的数据流,只不过数据操作可以委托给线程池异步执行。(Rx也像是生产者/消费者模型的延伸,增加了分发和转换的能力。对数据流进行连接组合,这边生产,那边分发和转换,源源不断交给消费者。)
以RxJava为例:
Flowable.just("file.txt")
.map(name -> Files.readLines(name))
.subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);
以Reactor为例:
Flux.fromIterable(getSomeLongList())
.mergeWith(Flux.interval(100))
.doOnNext(serviceA::someObserver)
.map(d -> d * 2)
.take(3)
.onErrorResumeWith(errorHandler::fallback)
.doAfterTerminate(serviceM::incrementTerminate)
.subscribe(System.out::println);
由代码可见,对数据流的操作很像是对集合的函数式操作,subscribe就是异步的forEach,doOnNext就是有返回值的异步的forEach。
主流实现有RxJava、Reactor、Akka Streams,API各有不同。但是它们都在靠拢Reactive Streams规范,想必会变得越来越相似。
async-await
async-await是一种特殊语法,能自动把同步风格代码转换成异步风格代码。正确运用,就能使代码在阻塞时自动让出控制权。
C#内置的async-await是最完整的实现。Scala通过Async库提供这个语法,代码大概是这样:
val future = async {
println("Begin blocking")
await {
async {Thread.sleep(1000)}
}
println("End blocking")
}
代码会被自动转换成多种future的组合形式。无需特意处理,能并行的部分都会自动并行。
Fiber
Fiber是协程的仿制品。一般多线程是抢占式调度,你一个任务跑得好好的突然把你暂停;协程是协作式的,你一个任务阻塞或完成时要主动让出控制权,让调度器换入另一个任务。
async-await自动把代码转换成可自动让出控制权的形式,已经有协程的雏形了。Fiber更加智能,连async-await语法都不用了,只要把代码写在Fiber里面,就像写在Thread里面一样,自动异步化了。
async-await只能暂存当前作用域(转换成闭包),Fiber则能暂存整个执行栈(每个作用域只是一个栈帧)。当然了,运用嵌套的async-await也能暂存整个执行栈,我更赞同如此,因为能更好地控制内存占用。
JVM上主流的实现是Quasar,通过java-agent改写字节码来实现,在需要让出控制权时抛出异常打断控制流(不必担心异常方面的性能开销),保存执行栈,然后换入另一个任务。
Java示例:
new Fiber
() {
@Override
protected V run() throws SuspendExecution, InterruptedException {
// your code
}
}.start();
Kotlin示例:
fiber @Suspendable {
// your code
}
代码中调用的任何会阻塞的方法都要标记@Suspendable,让Quasar知道调这个方法时要暂停当前Fiber并执行另一个Fiber,同时用另外的线程池执行会阻塞的方法。
Actor
起源于电信领域的Erlang的编程模型。actor是任务处理单元:每个actor只处理一个任务,每个任务同时只有一个actor处理(如果有大任务,就要分解成小任务),actor之间用消息来通信。
在Erlang中,每个actor是一个轻量级进程,有独立的内存空间(所以通信只能靠消息),因此有独立的垃圾回收,不会stop the world。
actor可以发了消息就不管了(tell),这是典型的异步;也可以发了消息等回应(ask),返回值是一个Future,实际上是创建了一个新的actor在悄悄等待回应,仍然是异步。
actor可以透明地分布在不同机器上,消息可以发给本机的actor,也可以发给远程的actor。
JVM上唯一成熟的实现是Akka,JVM不能给每个actor独立的内存,垃圾回收仍可能stop the world。
actor显然是一个对象,拥有状态和行为。 actor也可被视为一个闭包,拥有函数和上下文(整个对象的状态都是上下文)。 actor每次能接收并处理一个消息,处理中可以发送消息给自己或另一个actor,然后挂起或结束。 为什么要发送消息给自己呢?因为正在处理消息时是不能挂起的,只能在“一个消息之后,下一个消息之前”的间隙中挂起。 假设你收到一个A消息,执行前半段业务逻辑,要做一次I/O再执行后半段业务逻辑。做I/O时应当结束当前处理,当IO完成时给自己发一个B消息,下次再让你在处理B消息时完成剩余业务逻辑。前后逻辑要分开写,共享变量要声明为actor的对象字段。 伪代码如下:
class MyActor extends BasicActor {
var halfDoneResult: XXX = None
def receive(): Receive = {
case A => {
halfDoneResult = 前半段逻辑()
doIO(halfDoneResult).onComplete {
self ! B()
}
}
case B => 后半段逻辑(halfDoneResult)
}
}
当actor的状态要彻底改变时,可以用become操作彻底改变actor的行为。从面向对象编程的设计模式来看,这是state pattern,从函数式编程来看,这是把一个函数变换成另一个函数。
由此可见,actor模型就是把函数表示成了更容易控制的对象,以便于满足一些并发或分布式方面的架构约束。
这段逻辑假如改写成async-await或fiber,伪代码如下所示,简单多了:
def logicInAsync() = async {
val halfDoneResult = 前半段逻辑()
await { doIO(halfDoneResult) }