/ 今日科技快讯 /
近日,Meta 携手斯坦福大学,推出全新 AI 模型系列 Apollo,显著提升机器对视频的理解能力。尽管人工智能在处理图像和文本方面取得了巨大进步,但让机器真正理解视频仍然是一个重大挑战。视频包含复杂的动态信息,人工智能更难处理这些信息,不仅需要更多的计算能力,而且如何设计最佳 AI 视频解读系统,也存在诸多困难。在视频处理方面,研究人员发现,保持每秒恒定的帧采样率能获得最佳结果。因此 Apollo 模型使用两个不同的组件,一个处理单独的视频帧,而另一个跟踪对象和场景如何随时间变化。此外,在处理后的视频片段之间添加时间戳,有助于模型理解视觉信息与文本描述之间的关系,保持时间感知。
/ 作者简介 /
本篇文章来自Sunday1990的投稿,文章主要分享了如何改造 EventBus,相信会对大家有所帮助!同时也感谢作者贡献的精彩文章。
Sunday1990的博客地址:
https://juejin.cn/user/3843548382274455/posts
/ 前言 /
大部分安卓开发者应该都用过大名鼎鼎的greenrobot/EventBus,既然他已经封装好了,为什么还有很多人要再封装?
可能是因为Kotlin Flow用的越来越多,人们发现用Flow可以轻松封装EventBus,而且可以和协程更好的配合使用。
为什么这么多人用Flow封装EventBus,我还要再写一个?因为我想要一个纯粹的,只有事件通知的EventBus。
/ 封装 /
1.0版本
object FEvent {
private val _map = mutableMapOf, MutableSharedFlow>()
// 发射事件
suspend fun emit(
event: T,
key: Class<T> = event.javaClass,
) {
val flow = _map[key] as? MutableSharedFlow
flow?.emit(event)
}
// 收集事件
suspend fun collect(
key: Class<T>,
block: suspend (T) -> Unit,
) {
val flow = _map.getOrPut(key) { MutableSharedFlow() } as MutableSharedFlow
flow.collect { block(it) }
}
}
把Class当作key,和事件流MutableSharedFlow做关联,emit发射事件,collect收集事件,整体代码比较简单。
这是一个比较粗略的版本,它还有可以改进的地方,例如:collect结束时,如果flow没有收集者了,应该要把它释放掉。
让我们优化一下,看看下面的版本。
1.1版本
object FEvent {
private val _map = mutableMapOf, MutableSharedFlow>()
suspend fun emit(
event: T,
key: Class<T> = event.javaClass,
) {
// 主线程执行
withContext(Dispatchers.Main) {
val flow = _map[key] as? MutableSharedFlow
flow?.emit(event)
}
}
suspend fun collect(
key: Class<T>,
block: suspend (T) -> Unit,
) {
// 主线程执行
withContext(Dispatchers.Main) {
val flow = _map.getOrPut(key) { MutableSharedFlow() } as MutableSharedFlow
try {
flow.collect { block(it) }
} finally {
if (flow.subscriptionCount.value == 0) {
_map.remove(key)
}
}
}
}
}
省略掉其他非关键代码,在flow.collect时使用try finally,并在finally中判断,如果当前flow没有收集者了,则把它从_map中移除。
这个版本的基本逻辑没什么问题,但它不是线程安全的,让我们继续优化。
1.2版本
在协程中处理并发问题,通常会用单线程调度器或者Mutex。
单线程调度器
把可能并发的代码都限制在同一个线程中执行,这样就不会有并发问题了,代码如下:
object FEvent {
private val _map = mutableMapOf, MutableSharedFlow>()
suspend fun emit(
event: T,
key: Class<T> = event.javaClass,
) {
// 主线程执行
withContext(Dispatchers.Main) {
val flow = _map[key] as? MutableSharedFlow
flow?.emit(event)
}
}
suspend fun collect(
key: Class<T>,
block: suspend (T) -> Unit,
) {
// 主线程执行
withContext(Dispatchers.Main) {
val flow = _map.getOrPut(key) { MutableSharedFlow() } as MutableSharedFlow
try {
flow.collect { block(it) }
} finally {
if (flow.subscriptionCount.value == 0) {
_map.remove(key)
}
}
}
}
}
通过withContext(Dispatchers.Main)把emit和collect都限制在主线程执行。
实际开发中,collect大多在主线程调用,选择主线程作为调度器,block也在主线程触发,比较符合直觉。
Mutex
如果用Mutex,该如何实现呢?
object FEvent {
// ......
private val _mutex = Mutex()
suspend fun emit(
event: T,
key: Class<T> = event.javaClass,
) {
_mutex.withLock {
// ......
}
}
suspend fun collect(
key: Class<T>,
block: suspend (T) -> Unit,
) {
_mutex.withLock {
// ......
flow.collect { block(it) }
// ......
}
}
}
直接把withContext替换为_mutex.withLock可以吗?
答案是不行!
因为flow.collect是一个挂起函数,在它还没结束之前,其他操作都做不了了,withLock已经被flow.collect占有。
那什么时候调用withLock?
object FEvent {
// ......
private val _mutex = Mutex()
suspend fun collect(
key: Class<T>,
block: suspend (T) -> Unit,
) {
// 1
val flow = _mutex.withLock {
_map.getOrPut(key) { MutableSharedFlow() } as MutableSharedFlow
}
try {
// 2
flow.collect { block(it) }
} finally {
// 3
_mutex.withLock {
if (flow.subscriptionCount.value == 0) {
_map.remove(key)
}
}
}
}
}
在注释1和注释3处调用可以吗?
也不行!因为_mutex.withLock是一个挂起函数,当finally被执行时,当前协程可能已经被取消。
要让一个已经取消的协程正常调用挂起函数,需要使用NonCancellable:
suspend fun collect(
key: Class<T>,
block: suspend (T) -> Unit,
) {
val flow = _mutex.withLock {
_map.getOrPut(key) { MutableSharedFlow() } as MutableSharedFlow
}
try {
flow.collect { block(it) }
} finally {
// 使用NonCancellable
withContext(NonCancellable) {
_mutex.withLock {
if (flow.subscriptionCount.value == 0) {
_map.remove(key)
}
}
}
}
}
对比之下,我更倾向第一种方案主线程调度,既解决了并发问题,又符合直觉。
/ 扩展 /
扩展一
有时候需要在非协程环境中发射事件,扩展一个非挂起函数:
@JvmOverloads
fun FEvent.post(
event: T,
key: Class<T> = event.javaClass,
) {
GlobalScope.launch(Dispatchers.Main) {
emit(event, key)
}
}
这里直接使用GlobalScope,不再单独创建一个CoroutineScope。
你可能会有疑问,emit函数已经使用Dispatchers.Main,为什么launch时还要再指定Dispatchers.Main?
其实这里指定Dispatchers.Main只是为了按调用顺序发射事件。
有些库会单独创建一个CoroutineScope,并指定调度器为Dispatchers.IO或者Dispatchers.Default。
实际上这是不安全的,这两个调度器,会通过线程池执行,可能存在并发,不能保证调用顺序和发射顺序一致。
例如:依次调用post函数发射A,B,C,实际上emit函数发射时顺序不一定是A,B,C。
扩展二
有时候我们希望收到事件之后,使用Flow操作符做一些变换,扩展一下:
fun FEvent.flowOf(key: Class): Flow = channelFlow {
collect(key) { send(it) }
}
使用channelFlow{}创建冷流,通过flowOf函数获取事件流。
这里不能使用flow{},因为collect中使用withContext(Dispatchers.Main)来切换调度器,这在使用flow{}时是不允许的。
/ 关于sticky /
是个EventBus都逃不过sticky,有时候我们把某个事件设置为sticky,允许它重播。实际上我们需要的是状态!在早期现代化架构还没普及时,这个所谓的状态,没有它的容身之处,而它刚好从事件而来,所以EventBus理所应当就成了状态容器。如果不是为了兼容老项目代码,我觉得它应该在Repository中以状态流的形式对外暴露比较合适,例如Flow。
/ 写在最后 /
这一次,不想让EventBus那么累了,做一个纯粹的事件总线或许才是它真正的使命!完整代码在这里:https://github.com/zj565061763/event
推荐阅读:
我的新书,《第一行代码 第3版》已出版!
HarmonyOS NEXT实战:自定义封装多种样式导航栏组件
Android Studio 中的 Gemini 迎来自发布以来最大的功能更新
欢迎关注我的公众号
学习技术或投稿
长按上图,识别图中二维码即可关注