背景
数据库事务(transaction)是访问并可能操作各种数据项的一个数据库操作序列,这些操作要么全部执行,要么全部不执行,是一个不可分割的工作单位。事务由事务开始与事务结束之间执行的全部数据库操作组成。“要么全部成功,要么全部失败,那么在失败后,对于已成功的操作,是如何回滚的呢?” 数据库系统通常使用日志 (log) 来实现事务的回滚。在事务执行过程中,数据库会记录所有对数据的修改操作到日志中。当事务需要回滚时, 数据库会读取日志, 并执行逆向操作来撤销之前的修改。这种逆向操作被称之为“补偿操作(compensating operation)”Android 使用思考
Transaction 在数据库操作中可以省去很多不必要的问题,那在 Android 开发中,我们如何更方便的使用呢?想要方便就需要尽可能多的减少辅助代码。那下面几个问题是我们需要着重考虑的。接下来我们思考一个方便好用的 API 应该如何设计。
事务封装了一组 DSL 操作。要使用默认参数创建和执行事务,只需将函数块传递给transaction函数:transaction {
// do something here...
}
事务在当前线程上同步执行,因此它们将阻塞应用程序的其他部分!如果您需要异步执行事务,可以参考章节「使用协程」。
val result = transaction {
"some data"
}
println(result) // some data
默认情况下,嵌套transaction块共享其父transaction块的交易资源,因此对子块的任何影响都会影响父块:在这种场景下,事务A/B/C均为同一个事务。如果想要在一个事务中开启新事务,可以移步「使用协程」部分。val finalResult = transaction { // 事务A
var result = 0
result += transaction { // 事务B
1
}
result += transaction { // 事务C
2
}
result
}
println(result) // 3
Transaction 支持指定 CoroutineContext 来使用协程,本质上,这个框架可以基于协程和 ThreadLocal 去封装的。4.1 suspendedTransaction
阻塞当前线程,使用协程开启一个新的事务,如果当前存在事务,则使用之前的事务。runBlocking {
val result = suspendedTransaction(Dispatchers.IO) {
println("Transaction # ${this.id}") // Transaction # 3
1
}
println("Result: $result") // Result: 1
}
在嵌套场景执行时,如果在其他事物中使用 suspendedTransaction,则使用之前的事务。runBlocking {
val result = suspendedTransaction(Dispatchers.IO) { // 事务A
suspendedTransaction { // 事务B
// do something
suspendedTransaction { // 事务C
// do something
}
}
}
}
在这种情况下:
事务 A (outerTransaction = null) = 事务B = 事务C
4.2 newSuspendedTransaction
阻塞当前线程,协程开启一个新的事务,如果当前存在事务,则新建一个。runBlocking {
val result = newSuspendedTransaction(Dispatchers.IO) {
println("Transaction # ${this.id}") // Transaction # 3
1
}
println("Result: $result") // Result: 1
}
在嵌套场景执行时,如果在其他事务中使用newSuspendedTransaction,则新建一个事务,使用链表进行管理事务嵌套场景的关联关系。runBlocking {
val result = newSuspendedTransaction(Dispatchers.IO) { // 事务A
newSuspendedTransaction { // 事务B
// do something
newSuspendedTransaction { // 事务C
// do something
}
}
}
}
在这种情况下:
事务 A (outerTransaction = null)
└─ 事务 B (outerTransaction = 事务 A)
└─ 事务 C (outerTransaction = 事务 B)
4.3 suspendedTransactionAsync
异步执行,返回值:Deferred,不阻塞当前线程,使用协程开启一个新的事务,如果当前存在事务,则使用之前的事务。runBlocking {
val result = suspendedTransactionAsync(Dispatchers.IO) {
println("Transaction # ${this.id}") // Transaction # 3
1
}.await()
println("Result: $result") // Result: 1
}
在嵌套场景执行时,如果在其他事物中使用suspendedTransactionAsync,则使用之前的事务。runBlocking {
val result = suspendedTransactionAsync(Dispatchers.IO) { // 事务A
suspendedTransactionAsync { // 事务B
// do something
suspendedTransactionAsync { // 事务C
// do something
}
}
}
}
在这种情况下:
事务 A (outerTransaction = null) = 事务B = 事务C
4.4 newSuspendedTransactionAsync
异步执行,不阻塞当前线程,使用协程开启一个新的事务,如果当前存在事务,则新建一个事务。runBlocking {
val result = newSuspendedTransactionAsync(Dispatchers.IO) {
println("Transaction # ${this.id}") // Transaction # 3
1
}.await()
println("Result: $result") // Result: 1
}
在嵌套场景执行时,如果在其他事物中使用newSuspendedTransactionAsync,则新建一个事务,使用链表进行管理事务嵌套场景的关联关系。建议异步处理时,遇到协程嵌套场景,请保证内部的逻辑都执行完毕,否则会导致生命周期的顺序不能得到保障。runBlocking {
val result = newSuspendedTransactionAsync(Dispatchers.IO) { // 事务A
newSuspendedTransactionAsync { // 事务B
// do something
newSuspendedTransactionAsync { // 事务C
// do something
}.await()
}.await()
}
}
在这种情况下:
事务 A (outerTransaction = null)
└─ 事务 B (outerTransaction = 事务 A)
└─ 事务 C (outerTransaction = 事务 B)
可以在事务中定义拦截器,用于在不同阶段可以处理更多的事情。必须要在 configuration 中定义拦截器,具体拦截器的用法如下:// 定义拦截器
class FooInterceptor: StatementInterceptor {
override fun beforeExecution(transaction: Transaction, context: StatementContext) {}
override fun afterExecution(transaction: Transaction, contexts: List<StatementContext>) {}
override fun beforeCommit(transaction: Transaction) {}
override fun afterCommit() {}
// 补偿操作
override fun beforeRollback(transaction: Transaction, e:Throwable?) {}
override fun afterRollback() {}
}
runBlocking {
suspendedTransaction(configuration = {
// 注册拦截器
registerInterceptor(FooInterceptor())
}){
// do something here
}
}
拦截器中提供了多个方法,具体的生命周期如下图所示。
runBlocking {
suspendedTransaction(configuration = {
putData("logger", Logger("test"))
}){
val logger:Logger = getData("logger")
suspendedTransaction {
val innerLogger:Logger = getData("logger")
assertEquals(logger, innerLogger)
}
}
}
通过 ThreadContextElement/ThreadLocal 保证事务唯一性,利用协程实现异步事务,并在 finally 块中确保提交或回滚操作。同时,通过拦截器机制提供事务生命周期钩子, 增强了事务框架的灵活性和可扩展性。7.1 同步场景
在同步场景里,整个事务的执行是会阻塞当前线程运行的,transaction 方法内会保证每个线程在事务里只有唯一一个事务实例。使用 ThreadLocal 变量来保存当前数据库连接以及事务上下文。这确保了每个线程在执行 transaction 函数时都拥有自己的事务上下文,避免了事务混乱。// 发生事务嵌套时,或者在最外层 transaction 执行的代码块内,只会保证有唯一一个 transaction
transaction {
val aTransaction = this
transaction {
Assert.assertEquals(aTransaction, this)
}
}
7.2 协程场景
在协程场景下,我们需要保证在每个协程中,事务的一致性,基于 ThreadContextElement + ThreadLocal 去进行传播和恢复线程的上下文信息。ThreadContextElement 工作原理:- 传播上下文: 当一个协程从一个线程切换到另一个线程时, 如果协程的 CoroutineContext 中包含 ThreadContextElement,那么 ThreadContextElement 的 updateThreadContext 方法会被调用。在这个方法中,ThreadContextElement 可以将一些线程相关的上下文信息存储到目标线程中。
- 恢复上下文: 当协程在目标线程上执行完成后,ThreadContextElement 的 restoreThreadContext 方法会被调用。在这个方法中,ThreadContextElement 可以将之前存储的上下文信息从目标线程中移除, 并将目标线程的上下文恢复到原来的状态。
知道了 ThreadContextElement 的作用后,我们只需要在每次 transaction 创建的时候,在 coroutineContext 中去使用 ThreadContextElement 即可。ThreadContextElement 的使用:
需要特别注意这种情况:内外均为异步事务,但是内部的事务执行没有 await,代码如下:runBlocking {
newSuspendedTransactionAsync { // TransactionA
newSuspendedTransactionAsync { // TransactionB
// do something...
}.await()
}
}
- beforeCommit (B) (如果事务 B 内部没有异常)
- afterCommit (B) (如果事务 B 内部没有异常)
- beforeCommit (A) (如果事务 A、B 内部没有异常)
- afterCommit (A) (如果事务 A、B 内部没有异常)
由于事务 B 没有使用 await 等待,因此事务 A 无法保证事务 B 的执行结果。如果事务 B 抛出异常,事务 A 可能会继续执行,导致数据不一致。建议在嵌套事务场景中,建议使用 await 等待内层事务的完成,以确保事务的原子性和数据一致性。newSuspendedTransactionAsync { // 事务 A
// ... 事务 A 的语句 1 ...
val deferredB = newSuspendedTransactionAsync { // 事务 B
// ... 事务 B 的语句 1 ...
// ... 事务 B 的语句 2 ...
}
// ... 事务 A 的语句 2 ...
deferredB.await() // 等待事务 B 完成
}
参考
https://github.com/JetBrains/Exposed
https://ktor.io/
扫一扫 关注我的公众号
如果你想要跟大家分享你的文章,欢迎投稿~
┏(^0^)┛明天见!