专栏名称: 鸿洋
你好,欢迎关注鸿洋的公众号,每天为您推送高质量文章,让你每天都能涨知识。点击历史消息,查看所有已推送的文章,喜欢可以置顶本公众号。此外,本公众号支持投稿,如果你有原创的文章,希望通过本公众号发布,欢迎投稿。
目录
相关文章推荐
开发者全社区  ·  贾玲复胖,原因曝光 ·  12 小时前  
开发者全社区  ·  约会软件的秘密 ·  15 小时前  
郭霖  ·  Android 15新特性,预测性返回手势 ·  2 天前  
鸿洋  ·  基于协程的 Android 事务框架设计 ·  2 天前  
开发者全社区  ·  Meta 计划再次裁员 ... ·  3 天前  
51好读  ›  专栏  ›  鸿洋

基于协程的 Android 事务框架设计

鸿洋  · 公众号  · android  · 2025-01-17 08:35

主要观点总结

本文介绍了数据库事务的概念及在Android开发中的使用,包括事务的封装、执行方式(同步、异步、嵌套事务)、数据通信、事务一致性保证等。文中还涉及到了拦截器、协程场景中ThreadContextElement的作用以及异步嵌套场景需要注意的问题。

关键观点总结

关键观点1: 数据库事务概念及在Android开发中的应用

文章介绍了数据库事务的概念,以及在Android开发中如何更方便地使用。提到了需要着重考虑的问题,如简化补偿操作的辅助代码、数据传递、Transaction嵌套场景、同步/异步处理、Interceptor设计等。

关键观点2: 事务的封装和执行方式

文章描述了使用默认参数创建和执行事务的方法,以及transaction支持直接返回值的特点。还介绍了在同步场景中,事务在当前线程上执行,会阻塞应用程序的其他部分;如果需要异步执行事务,可以参考使用协程。

关键观点3: 协程与事务的一致性保证

文章阐述了在协程场景下,如何保证事务的一致性,基于ThreadContextElement + ThreadLocal进行传播和恢复线程的上下文信息。还介绍了新建CoroutineScope时ThreadContextElement的使用。

关键观点4: 异步嵌套场景的问题及解决建议

文章指出在异步嵌套场景中,如果内部事务执行没有使用await等待,外部事务无法确保内部事务的执行结果。建议在使用嵌套事务时,使用await等待内层事务的完成,以确保事务的原子性和数据一致性。


正文

背景

数据库事务(transaction)是访问并可能操作各种数据项的一个数据库操作序列,这些操作要么全部执行,要么全部不执行,是一个不可分割的工作单位。事务由事务开始与事务结束之间执行的全部数据库操作组成。
“要么全部成功,要么全部失败,那么在失败后,对于已成功的操作,是如何回滚的呢?” 数据库系统通常使用日志 (log) 来实现事务的回滚。在事务执行过程中,数据库会记录所有对数据的修改操作到日志中。当事务需要回滚时, 数据库会读取日志, 并执行逆向操作来撤销之前的修改。这种逆向操作被称之为“补偿操作(compensating operation)”

Android 使用思考

Transaction 在数据库操作中可以省去很多不必要的问题,那在 Android 开发中,我们如何更方便的使用呢?想要方便就需要尽可能多的减少辅助代码。那下面几个问题是我们需要着重考虑的。
  • 如何简化补偿操作的辅助代码?
  • 数据如何传递?
  • Transaction 嵌套场景
  • 同步/异步的处理
  • Interceptor 设计
  • Lifecycle 设计

接下来我们思考一个方便好用的 API 应该如何设计。

1
New transaction
事务封装了一组 DSL 操作。要使用默认参数创建和执行事务,只需将函数块传递给transaction函数:
transaction {
    // do something here...
}


事务在当前线程上同步执行,因此它们将阻塞应用程序的其他部分!如果您需要异步执行事务,可以参考章节「使用协程」。

2
Transaction 返回值
transaction 支持直接返回值:
val result = transaction {
    "some data"
}
println(result) // some data


3
使用嵌套事务


默认情况下,嵌套transaction块共享其父transaction块的交易资源,因此对子块的任何影响都会影响父块:
在这种场景下,事务A/B/C均为同一个事务。如果想要在一个事务中开启新事务,可以移步「使用协程」部分。
val finalResult = transaction { // 事务A
    var result = 0
    result += transaction { // 事务B
        1
    }
    result += transaction { // 事务C
        2
    }
    result
}
println(result) // 3


4
使用协程
Transaction 支持指定 CoroutineContext 来使用协程,本质上,这个框架可以基于协程和 ThreadLocal 去封装的。
目前一共支持四种方式去使用协程去开启一个事务。

4.1 suspendedTransaction

阻塞当前线程,使用协程开启一个新的事务,如果当前存在事务,则使用之前的事务。
runBlocking {
    val result = suspendedTransaction(Dispatchers.IO) {
        println("Transaction # ${this.id}"// Transaction # 3
        1
    }
    println("Result: $result"// Result: 1
}


在嵌套场景执行时,如果在其他事物中使用 suspendedTransaction,则使用之前的事务。
runBlocking {
    val result = suspendedTransaction(Dispatchers.IO) { // 事务A
        suspendedTransaction { // 事务B
            // do something
            suspendedTransaction { // 事务C
                // do something
            }
        }
    }
}


在这种情况下:

事务 A (outerTransaction = null) = 事务B = 事务C


4.2 newSuspendedTransaction

阻塞当前线程,协程开启一个新的事务,如果当前存在事务,则新建一个。
runBlocking {
    val result = newSuspendedTransaction(Dispatchers.IO) {
        println("Transaction # ${this.id}"// Transaction # 3
        1
    }
    println("Result: $result"// Result: 1
}


在嵌套场景执行时,如果在其他事务中使用newSuspendedTransaction,则新建一个事务,使用链表进行管理事务嵌套场景的关联关系。
runBlocking {
    val result = newSuspendedTransaction(Dispatchers.IO) { // 事务A
        newSuspendedTransaction { // 事务B
            // do something
            newSuspendedTransaction { // 事务C
                // do something
            }
        }
    }
}


在这种情况下:

事务 A (outerTransaction = null)
    └─ 事务 B (outerTransaction = 事务 A)
        └─ 事务 C (outerTransaction = 事务 B)


4.3 suspendedTransactionAsync

异步执行,返回值:Deferred,不阻塞当前线程,使用协程开启一个新的事务,如果当前存在事务,则使用之前的事务。
runBlocking {
    val result = suspendedTransactionAsync(Dispatchers.IO) {
        println("Transaction # ${this.id}"// Transaction # 3
        1
    }.await()
    println("Result: $result"// Result: 1
}


在嵌套场景执行时,如果在其他事物中使用suspendedTransactionAsync,则使用之前的事务。
runBlocking {
    val result = suspendedTransactionAsync(Dispatchers.IO) { // 事务A
        suspendedTransactionAsync { // 事务B
            // do something
            suspendedTransactionAsync { // 事务C
                // do something
            }
        }
    }
}


在这种情况下:

事务 A (outerTransaction = null) = 事务B = 事务C


4.4 newSuspendedTransactionAsync

异步执行,不阻塞当前线程,使用协程开启一个新的事务,如果当前存在事务,则新建一个事务。
runBlocking {
    val result = newSuspendedTransactionAsync(Dispatchers.IO) {
        println("Transaction # ${this.id}"// Transaction # 3
        1
    }.await()
    println("Result: $result"// Result: 1
}


在嵌套场景执行时,如果在其他事物中使用newSuspendedTransactionAsync,则新建一个事务,使用链表进行管理事务嵌套场景的关联关系。
建议异步处理时,遇到协程嵌套场景,请保证内部的逻辑都执行完毕,否则会导致生命周期的顺序不能得到保障。
runBlocking {
    val result = newSuspendedTransactionAsync(Dispatchers.IO) { // 事务A
        newSuspendedTransactionAsync { // 事务B
            // do something
            newSuspendedTransactionAsync { // 事务C
                // do something
            }.await()
        }.await()
    }
}


在这种情况下:

事务 A (outerTransaction = null)
    └─ 事务 B (outerTransaction = 事务 A)
        └─ 事务 C (outerTransaction = 事务 B)
   
5
Transaction interceptor
可以在事务中定义拦截器,用于在不同阶段可以处理更多的事情。必须要在 configuration 中定义拦截器,具体拦截器的用法如下:
// 定义拦截器
class FooInterceptorStatementInterceptor {
    override fun beforeExecution(transaction: Transaction, context: StatementContext) {}
    override fun afterExecution(transaction: Transaction, contexts: List<StatementContext>) {}
    override fun beforeCommit(transaction: Transaction) {}
    override fun afterCommit() {}
    // 补偿操作
    override fun beforeRollback(transaction: Transaction, e:Throwable?) {}
    override fun afterRollback() {}
}

runBlocking {
    suspendedTransaction(configuration = {
        // 注册拦截器
        registerInterceptor(FooInterceptor())
    }){
        // do something here
    }
}


拦截器中提供了多个方法,具体的生命周期如下图所示。


6
数据通信
可以在一个事务中去传递和设置数据,用法如下:
runBlocking {
    suspendedTransaction(configuration = {
        putData("logger", Logger("test"))
    }){
        val logger:Logger = getData("logger")
        suspendedTransaction {
            val innerLogger:Logger = getData("logger")
            assertEquals(logger, innerLogger)
        }
    }
}


7
事务一致性保证
通过 ThreadContextElement/ThreadLocal 保证事务唯一性,利用协程实现异步事务,并在 finally 块中确保提交或回滚操作。同时,通过拦截器机制提供事务生命周期钩子, 增强了事务框架的灵活性和可扩展性。

7.1 同步场景

在同步场景里,整个事务的执行是会阻塞当前线程运行的,transaction 方法内会保证每个线程在事务里只有唯一一个事务实例。
使用 ThreadLocal 变量来保存当前数据库连接以及事务上下文。这确保了每个线程在执行 transaction 函数时都拥有自己的事务上下文,避免了事务混乱。
// 发生事务嵌套时,或者在最外层 transaction 执行的代码块内,只会保证有唯一一个 transaction
transaction {
    val aTransaction = this
    transaction {
        Assert.assertEquals(aTransaction, this)
    }
}


核心代码如下:

7.2 协程场景

在协程场景下,我们需要保证在每个协程中,事务的一致性,基于 ThreadContextElement + ThreadLocal 去进行传播和恢复线程的上下文信息。
ThreadContextElement 工作原理:
  • 传播上下文: 当一个协程从一个线程切换到另一个线程时, 如果协程的 CoroutineContext 中包含 ThreadContextElement,那么 ThreadContextElement updateThreadContext 方法会被调用。在这个方法中,ThreadContextElement 可以将一些线程相关的上下文信息存储到目标线程中。
  • 恢复上下文: 当协程在目标线程上执行完成后,ThreadContextElement restoreThreadContext 方法会被调用。在这个方法中,ThreadContextElement 可以将之前存储的上下文信息从目标线程中移除, 并将目标线程的上下文恢复到原来的状态。
知道了 ThreadContextElement 的作用后,我们只需要在每次 transaction 创建的时候,在 coroutineContext 中去使用 ThreadContextElement 即可。
新建 CoroutineScope :
ThreadContextElement 的使用:


8
异步嵌套场景
需要特别注意这种情况:内外均为异步事务,但是内部的事务执行没有 await,代码如下:
runBlocking {
    newSuspendedTransactionAsync { // TransactionA
        newSuspendedTransactionAsync { // TransactionB
           // do something...
        }.await()
    }
}


生命周期可能会如下执行:
  • beforeExecution (A)
  • statement(A)
  • afterExecution (A)
  • 事务 B:
    • beforeExecution (B)
    • statement(B)
    • afterExecution (B)
    • afterExecution (B)
    • beforeCommit (B) (如果事务 B 内部没有异常)
    • afterCommit (B) (如果事务 B 内部没有异常)
  • afterExecution (A)
  • beforeCommit (A) (如果事务 A、B 内部没有异常)
  • afterCommit (A) (如果事务 A、B 内部没有异常)
由于事务 B 没有使用 await 等待,因此事务 A 无法保证事务 B 的执行结果。如果事务 B 抛出异常,事务 A 可能会继续执行,导致数据不一致。建议在嵌套事务场景中,建议使用 await 等待内层事务的完成,以确保事务的原子性和数据一致性。
newSuspendedTransactionAsync { // 事务 A
    // ... 事务 A 的语句 1 ...
    val deferredB = newSuspendedTransactionAsync { // 事务 B
        // ... 事务 B 的语句 1 ...
        // ... 事务 B 的语句 2 ...
    }
    // ... 事务 A 的语句 2 ...
    deferredB.await() // 等待事务 B 完成
}


参考

https://github.com/JetBrains/Exposed

https://ktor.io/


扫一扫 关注我的公众号

如果你想要跟大家分享你的文章,欢迎投稿~


┏(^0^)┛明天见!