专栏名称: 郭霖
Android技术分享平台,每天都有优质技术文章推送。你还可以向公众号投稿,将自己总结的技术心得分享给大家。
目录
相关文章推荐
郭霖  ·  这一次,让EventBus纯粹一些 ·  昨天  
郭霖  ·  HarmonyOS ... ·  2 天前  
鸿洋  ·  初识Android内存优化 ·  2 天前  
鸿洋  ·  Android AppOpsService是什么? ·  3 天前  
鸿洋  ·  打造Android热修复专家 ·  4 天前  
51好读  ›  专栏  ›  郭霖

这一次,让EventBus纯粹一些

郭霖  · 公众号  · android  · 2024-12-19 08:00

正文



/   今日科技快讯   /


近日,Meta 携手斯坦福大学,推出全新 AI 模型系列 Apollo,显著提升机器对视频的理解能力。尽管人工智能在处理图像和文本方面取得了巨大进步,但让机器真正理解视频仍然是一个重大挑战。视频包含复杂的动态信息,人工智能更难处理这些信息,不仅需要更多的计算能力,而且如何设计最佳 AI 视频解读系统,也存在诸多困难。在视频处理方面,研究人员发现,保持每秒恒定的帧采样率能获得最佳结果。因此 Apollo 模型使用两个不同的组件,一个处理单独的视频帧,而另一个跟踪对象和场景如何随时间变化。此外,在处理后的视频片段之间添加时间戳,有助于模型理解视觉信息与文本描述之间的关系,保持时间感知。


/   作者简介   /


本篇文章来自Sunday1990的投稿,文章主要分享了如何改造 EventBus,相信会对大家有所帮助!同时也感谢作者贡献的精彩文章。


Sunday1990的博客地址:

https://juejin.cn/user/3843548382274455/posts


/   前言   /


大部分安卓开发者应该都用过大名鼎鼎的greenrobot/EventBus,既然他已经封装好了,为什么还有很多人要再封装?


可能是因为Kotlin Flow用的越来越多,人们发现用Flow可以轻松封装EventBus,而且可以和协程更好的配合使用。


为什么这么多人用Flow封装EventBus,我还要再写一个?因为我想要一个纯粹的,只有事件通知的EventBus。


/   封装   /


1.0版本


object FEvent {
  private val _map = mutableMapOf, MutableSharedFlow>()

  // 发射事件
  suspend fun  emit(
    event: T,
    key: Class<T> = event.javaClass,
  )
 {
    val flow = _map[key] as? MutableSharedFlow
    flow?.emit(event)
  }

  // 收集事件
  suspend fun  collect(
    key: Class<T>,
    block: suspend (T) -> Unit,
  )
 {
    val flow = _map.getOrPut(key) { MutableSharedFlow() } as MutableSharedFlow
    flow.collect { block(it) }
  }
}


把Class当作key,和事件流MutableSharedFlow做关联,emit发射事件,collect收集事件,整体代码比较简单。


这是一个比较粗略的版本,它还有可以改进的地方,例如:collect结束时,如果flow没有收集者了,应该要把它释放掉。


让我们优化一下,看看下面的版本。


1.1版本


object FEvent {
  private val _map = mutableMapOf, MutableSharedFlow>()

  suspend fun  emit(
    event: T,
    key: Class<T> = event.javaClass,
  )
 {
    // 主线程执行
    withContext(Dispatchers.Main) {
      val flow = _map[key] as? MutableSharedFlow
      flow?.emit(event)
    }
  }

  suspend fun  collect(
    key: Class<T>,
    block: suspend (T) -> Unit,
  )
 {
    // 主线程执行
    withContext(Dispatchers.Main) {
      val flow = _map.getOrPut(key) { MutableSharedFlow() } as MutableSharedFlow
      try {
        flow.collect { block(it) }
      } finally {
        if (flow.subscriptionCount.value == 0) {
          _map.remove(key)
        }
      }
    }
  }
}


省略掉其他非关键代码,在flow.collect时使用try finally,并在finally中判断,如果当前flow没有收集者了,则把它从_map中移除。


这个版本的基本逻辑没什么问题,但它不是线程安全的,让我们继续优化。


1.2版本


在协程中处理并发问题,通常会用单线程调度器或者Mutex。


单线程调度器


把可能并发的代码都限制在同一个线程中执行,这样就不会有并发问题了,代码如下:


object FEvent {
  private val _map = mutableMapOf, MutableSharedFlow>()

  suspend fun  emit(
    event: T,
    key: Class<T> = event.javaClass,
  )
 {
    // 主线程执行
    withContext(Dispatchers.Main) {
      val flow = _map[key] as? MutableSharedFlow
      flow?.emit(event)
    }
  }

  suspend fun  collect(
    key: Class<T>,
    block: suspend (T) -> Unit,
  )
 {
    // 主线程执行
    withContext(Dispatchers.Main) {
      val flow = _map.getOrPut(key) { MutableSharedFlow() } as MutableSharedFlow
      try {
        flow.collect { block(it) }
      } finally {
        if (flow.subscriptionCount.value == 0) {
          _map.remove(key)
        }
      }
    }
  }
}

通过withContext(Dispatchers.Main)把emit和collect都限制在主线程执行。


实际开发中,collect大多在主线程调用,选择主线程作为调度器,block也在主线程触发,比较符合直觉。


Mutex


如果用Mutex,该如何实现呢?


object FEvent {
  // ......
  private val _mutex = Mutex()

  suspend fun  emit(
    event: T,
    key: Class<T> = event.javaClass,
  )
 {
    _mutex.withLock {
      // ......
    }
  }

  suspend fun  collect(
    key: Class<T>,
    block: suspend (T) -> Unit,
  )
 {
    _mutex.withLock {
      // ......
      flow.collect { block(it) }
      // ......
    }
  }
}

直接把withContext替换为_mutex.withLock可以吗?


答案是不行!


因为flow.collect是一个挂起函数,在它还没结束之前,其他操作都做不了了,withLock已经被flow.collect占有。


那什么时候调用withLock?


object FEvent {
  // ......
  private val _mutex = Mutex()

  suspend fun  collect(
    key: Class<T>,
    block: suspend (T) -> Unit,
  )
 {
    // 1
    val flow = _mutex.withLock {
      _map.getOrPut(key) { MutableSharedFlow() } as MutableSharedFlow
    }
    try {
      // 2
      flow.collect { block(it) }
    } finally {
      // 3
      _mutex.withLock {
        if (flow.subscriptionCount.value == 0) {
          _map.remove(key)
        }
      }
    }
  }
}

在注释1和注释3处调用可以吗?


也不行!因为_mutex.withLock是一个挂起函数,当finally被执行时,当前协程可能已经被取消。


要让一个已经取消的协程正常调用挂起函数,需要使用NonCancellable:


suspend fun  collect(
    key: Class<T>,
    block: suspend (T) -> Unit,
  )
 {
    val flow = _mutex.withLock {
      _map.getOrPut(key) { MutableSharedFlow() } as MutableSharedFlow
    }
    try {
      flow.collect { block(it) }
    } finally {
      // 使用NonCancellable
      withContext(NonCancellable) {
        _mutex.withLock {
          if (flow.subscriptionCount.value == 0) {
            _map.remove(key)
          }
        }
      }
    }
  }

对比之下,我更倾向第一种方案主线程调度,既解决了并发问题,又符合直觉。


/   扩展   /


扩展一


有时候需要在非协程环境中发射事件,扩展一个非挂起函数:


@JvmOverloads
fun  FEvent.post(
  event: T,
  key: Class<T> = event.javaClass,
)
 {
  GlobalScope.launch(Dispatchers.Main) {
    emit(event, key)
  }
}


这里直接使用GlobalScope,不再单独创建一个CoroutineScope。


你可能会有疑问,emit函数已经使用Dispatchers.Main,为什么launch时还要再指定Dispatchers.Main?


其实这里指定Dispatchers.Main只是为了按调用顺序发射事件。


有些库会单独创建一个CoroutineScope,并指定调度器为Dispatchers.IO或者Dispatchers.Default。


实际上这是不安全的,这两个调度器,会通过线程池执行,可能存在并发,不能保证调用顺序和发射顺序一致。


例如:依次调用post函数发射A,B,C,实际上emit函数发射时顺序不一定是A,B,C。


扩展二


有时候我们希望收到事件之后,使用Flow操作符做一些变换,扩展一下:


fun  FEvent.flowOf(key: Class): Flow = channelFlow {
  collect(key) { send(it) }
}

使用channelFlow{}创建冷流,通过flowOf函数获取事件流。


这里不能使用flow{},因为collect中使用withContext(Dispatchers.Main)来切换调度器,这在使用flow{}时是不允许的。


/   关于sticky   /


是个EventBus都逃不过sticky,有时候我们把某个事件设置为sticky,允许它重播。实际上我们需要的是状态!在早期现代化架构还没普及时,这个所谓的状态,没有它的容身之处,而它刚好从事件而来,所以EventBus理所应当就成了状态容器。如果不是为了兼容老项目代码,我觉得它应该在Repository中以状态流的形式对外暴露比较合适,例如Flow。


/   写在最后   /


这一次,不想让EventBus那么累了,做一个纯粹的事件总线或许才是它真正的使命!完整代码在这里:

https://github.com/zj565061763/event


推荐阅读:

我的新书,《第一行代码 第3版》已出版!

HarmonyOS NEXT实战:自定义封装多种样式导航栏组件

Android Studio 中的 Gemini 迎来自发布以来最大的功能更新


欢迎关注我的公众号

学习技术或投稿



长按上图,识别图中二维码即可关注