正文
简述
最近老大给了个新项目,我打算用Swift写.原来OC用的RAC,换到Swift自然框架也想试试新的,就用了RXSwift,对于这两个框架,我都是会用,但不解其中的原理,正好最近需求没下来,就研究了研究RXSwif,把自己的收获分享一下,文中要有不准确的地方还望大家多多指正~
关于RXSwift是什么和怎么用我就不废话了,网上资源很多,本文先从
Observable
实现原理入手,旨在以小见大,后面的
Single
什么的自然举一反三~
使用Demo
下面是一段简单使用
Observable
的代码
let numbers: Observable<Int> = Observable.create { observer -> Disposable in
observer.onNext(0)
observer.onNext(1)
observer.onCompleted()
return Disposables.create {
}
}
numbers.subscribe{
print ($0 )
}
demo实现的效果其实就是 将上一段闭包中输入的 产生的事件(0,1,Completed),在下一段闭包中提取出来.
这样就将 事件的产生 和 事件的处理 分开. 本文也就是分析这个效果怎么实现的
主要类
AnonymousObservable
匿名
可
观察者,存储产生事件的闭包 和激活处理事件闭包的入口
AnyObserver
任意观察者,用于存储事件 和 输出事件
AnonyObserver
匿名观察者,用于存储 处理事件的闭包
AnonymousObservableSink
将可观察者 和 观察者 链接,实现事件的传递
ObserverType
协议,将上面所有内容都包裹起来,将它们加以限制,便于有效的沟通~
Event
事件本身,是枚举,有 Error,Complete,Element(元素)
实现过程
存储
首先要说的是 ObserverType 定义的一些内容
associatedtype E
func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
E:为本次事件流中定义一个确定的
类型
,保证 产生的和处理的元素类型相同,否则无法传递
create方法
Observable<Int>.create { observer -> Disposable in ....}
对于
Observable
,它是一个抽象类,我们在实际使用中并不能使用它,在协议中有默认的实现
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
}
所以此处创建的是
AnonymousObservable
对象,我先称其为
A1
,
A1
将事件产生的闭包持有, 闭包中产生的事件 输入到
AnyObserver
结构体中.闭包我们成为
A2
这样 存储部分就好了~~
激活
激活 我们通过调用
A1
的订阅方法
subscribe
(也是协议中限定的方法),接下来看方法中的实现~
因为
Observable
是抽象类,所以这里也是协议默认的实现
public func subscribe(_ on: @escaping (Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
在这里就分两步了,一是观察者的实现,而是事件的传递
观察者
在这里很简单,也就是创建
AnonymousObserver
匿名观察者对象
B1
,
B1
将事件处理闭包持有,闭包我们成为
B2
传递
首先是
asObservable()
方法,因为 B1间接继承自
Observable
,所以也就是
return self
,应该是在处理其他类型的可观察物用到,在后续 如果碰到我会补充~
然后就是对
A1
的 另一个订阅方法(重载),将
B1
作为参数传入
细枝末节先不说,先把握主干~
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
//第一步
let disposer = SinkDisposer()
//第二步
let sinkAndSubscription = run(observer, cancel: disposer)
//第三步
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
//else 先不说~
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
第一步
SinkDisposer
对象是关于 传递结束后,处理资源回收的对象,叫它
C1
,用来处理
A1
create闭包返回的disposer闭包的~
第二步
调用了
run
方法,将
B1
对象传入
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
//2.1
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
//2.2
let subscription = sink.run(self)
//2.3
return (sink: sink, subscription: subscription)
}
2.1步
创建
AnonymousObservableSink
对象,我称它
D1
,它也是将
B1
对象和
C1
对象持有
2.2步
调用
D1
对象的
run
方法,将
A1
自身传入
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
在该方法中,就是将
A1
对象的
A2
闭包 调用,将
D1
对象化为
AnyObserver
结构体作为
A2
参数传入~
然后我们看
D1
对象 若何转换的
//结构体方法
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
在这里结构体 将
D1
持有的
B1
对象的on方法 作为属性持有~,将结构体成为
E1
再来看
E1
的
onNext....
方法
extension ObserverType {
//YSD
/// Convenience method equivalent to `on(.next(element: E))`
///
/// - parameter element: Next element to send to observer(s)
public func onNext(_ element: E) {
on(.next(element))
}
/// Convenience method equivalent to `on(.completed)`
public func onCompleted () {
on(.completed)
}
/// Convenience method equivalent to `on(.error(Swift.Error))`
/// - parameter error: Swift.Error to send to observer(s)
public func onError(_ error: Swift.Error) {
on(.error(error))
}
}
对应的其实是调用
B1
的
on
方法~~
func on(_ event: Event<E>) {
switch event {
case .next:
if _isStopped == 0 {
onCore(event)
}
case .error, .completed:
if AtomicCompareAndSwap(0, 1, &_isStopped) {
onCore(event)
}
}
}
对应的
B1
的
onCore
方法
override func onCore(_ event: Event<Element>) {
return _eventHandler(event)
}
也就是将
E1
从
A2
接收的事件 传入
B2
中,最终实现内容的传递~~ 然后再将
A1
中释放资源的闭包返回~
2.3
将
D1
和disposable闭包 作为元组返回~
第三步
C1
接收元组参数,调用
setSinkAndSubscription
方法~,然后将
SinkDisposer
对象返回,让用户选择是否释放~
图示
文字太抽象,画个图吧~ 画的有点丑(๑•ᴗ•๑)~
可以看到 A1 在这个过程中只持有了A2, 不会导致内存泄露~ 当然如果你dispose 使用不当 肯定有泄漏的~ 亲测(๑•ᴗ•๑)~
细枝末节
1
订阅2中的
if !CurrentThreadScheduler.isScheduleRequired
内容是这样的~
public static fileprivate(set ) var isScheduleRequired: Bool {
get {
//获取该指示值
return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
}
set (isScheduleRequired) {
//设置
//http://www.jianshu.com/p/d52c1ebf808a
// 成功返回0 true 设置no no设置为 true
if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
rxFatalError("pthread_setspecific failed" )
}
}
}
private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
//YSD
//https://onevcat.com/2015/01/swift-pointer/
//可变指针 pthread_key_t类型 分配空间
let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
defer {
key.deallocate(capacity: 1)
}
//创建线程安全的变量
guard pthread_key_create(key, nil) == 0 else {
rxFatalError("isScheduleRequired key creation failed" )
}
return key.pointee
}()
这里应该是为了保护,RXSwift在多线程操作下的数据安全~ 在本次事件流中只使用了get方法,并没使用set~,所以具体效果我不清楚~,以后碰到了 我在补充上吧~
SinkDisposer
就是释放资源部分~
fileprivate enum DisposeState: UInt32 {
case disposed = 1
case sinkAndSubscriptionSet = 2
}
// Jeej, swift API consistency rules
fileprivate enum DisposeStateInt32: Int32 {
case disposed = 1
case sinkAndSubscriptionSet = 2
}
private var _state: AtomicInt = 0
private var _sink: Disposable? = nil
private var _subscription: Disposable? = nil
func set SinkAndSubscription(sink: Disposable, subscription: Disposable) {
_sink = sink
_subscription = subscription
let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
rxFatalError("Sink and subscription were already set" )
}
if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
sink.dispose()
subscription.dispose()
_sink = nil
_subscription = nil
}
}
func dispose () {
let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)
if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
return
}
if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
guard let sink = _sink else {
rxFatalError("Sink not set" )
}
guard let subscription = _subscription else {
rxFatalError("Subscription not set" )
}
sink.dispose()
subscription.dispose()
_sink = nil
_subscription = nil
}
}
从输出崩溃提示哪里就可以得知~ 这里是为了防止dispose的多次调用~ 因为在整个事件流中,dipose闭包 可能是 产生Complete,Error或者用户手动调用的~
AtomicOr
方法其实调用的是
OSAtomicOr32OrigBarrier(A,&B)
该函数会将两个变量 线程安全的 按位或运算返回结果, 并为后者赋值=前者~ B=A
未调用dipose时 逻辑与运算 state = 2 previousState = 0 两个条件都不成立~ 所以此时是用户要手动dispose
之前调用过 也就是发生complete 或 Error(在上面的代码中也有保证,两者只发生一起~),则 state = 1当调用setSinkAndSubscription方法时 逻辑与运算 state = 2 previousState = 1 则第一个条件不成立 第二个成立~ 释放资源
当多次Complete时,则只会dipose一次~
当在外界多次调用时 则state = 2 previousState = 1 则第一个条件成立 崩溃~
当然这里实现这种效果的方案有很多种~ RSSwift的方案比较有逼格吧~
总结
看完这些源码,我的感觉是RXSwift对 设计模式 贯彻的很彻底~ 在时间富裕的情况下自己写的项目要向他靠拢,增强项目的延展性,这样项目经理让加啥也不会太头疼了~~