2.1 Task
proc 函数(redux-saga 的源码)用于运行一个迭代器,并返回一个 Task 对象。Task 对象描述了该迭代器的运行状态,我们首先来看看 Task 的接口(使用 TypeScript 来表示类型信息)。在 little-saga 中,我们将使用类似的 Task 接口。(注意是类似的接口,而不是相同的接口)
type Callback = (result: any, isErr: boolean) => void
type Joiner = { task: Task; cb: Callback }
interface Task {
cancel(): void
toPromise(): Promise
result: any
error: Error
isRunning: boolean
isCancelled: boolean
isAborted: boolean
}
Task 对象包含了
toPromise
()
方法,该方法会返回 saga 实例对应的 promise。
cancel
()
方法使得 saga 实例允许被取消;
isXXX
等字段反映了 saga 实例的运行状态;
result
/
error
字段可以记录了 saga 实例的运行结果。
async 函数被调用后,内部逻辑也许非常复杂,最后返回一个 Promise 对象来表示异步结果。而 saga 实例除了异步结果,还包含了额外的功能:「取消」以及「查询运行状态」。所以 Task 接口是要比 Promise 接口复杂的,内部实现也需要更多的数据结构和逻辑。
2.2 fork model
redux-saga 提供了 fork effect 来进行非阻塞调用,
yield
fork
(...)
会返回一个 Task 对象,用于表示在后台执行的 saga 实例。在更普遍的情况下,一个 saga 实例在运行的时候会多次 yield fork effect,那么一个 parent-saga 实例就会有多个 child-saga。rootSaga 通过
sagaMiddleware
.
run
()
开始运行,在 rootSaga 运行过程中,会 fork 得到若干个 child-saga,每一个 child-saga 又会 fork 得到若干个 grandchild-saga,如果我们将所有的 parent-child 关系绘制出来的话,我们可以得到类似于下图这样的一棵 saga 树。
redux-saga 的文档也对 fork model 进行了详细的说明,下面我做一点简单的翻译:
当一个节点的所有子节点完成时,且自身迭代器代码执行完毕时,该节点才算完成。
当一个节点发生错误时,错误会沿着树向根节点向上传播,直到某个节点捕获该错误。
取消一个节点时,该节点对应的整个子树都将被取消。
2.3 类
ForkQueue
类
ForkQueue
是 fork model 的具体实现。redux-saga 使用了 函数 forkQueue 来实现,在 little-saga 中我们使用 class 语法定义了
ForkQueue
类。
每一个 saga 实例可以用一个 Task 对象进行描述,为了实现 fork model,每一个 saga 实例开始运行时,我们需要用一个数组来保存 child-tasks。我们来看看 forkQueue 的接口:
interface ForkQueue {
constructor(mainTask: MainTask)
// cont: Callback 这是一个私有的字段
addTask(task: Task): void
cancelAll(): void
abort(err: Error): void
}
ForkQueue 的构造函数接受一个参数
mainTask
,该参数代表当前迭代器自身代码的执行状态,forkQueue.cont 会在 ForkQueue 被构造之后进行设置。当所有的 child-task 以及 mainTask 都完成时,我们需要调用 forkQueue.cont 来通知其 parent-saga(对应于 2.2 fork model 中的「完成」)。
ForkQueue 对象包含三个方法。方法
addTask
用于添加新的 child-task;方法
cancelAll
用于取消所有的 child-task;而方法
abort
不仅会取消所有的 child-task,还会调用 forkQueue.cont 向 parent-task 通知错误。
little-saga 的 ForkQueue 实现如下:
class ForkQueue {
tasks = []
result = undefined
// 使用 completed 变量来保证「完成」和「出错」的互斥
completed = false
// cont will be set after calling constructor()
cont = undefined
constructor(mainTask
) {
this.mainTask = mainTask
// mainTask 一开始就会被添加到数组中
this.addTask(this.mainTask)
}
// 取消所有的 child-task,并向上层通知错误
abort(err) {
this.cancelAll()
this.cont(err, true)
}
addTask(task) {
this.tasks.push(task)
// 指定 child-task 完成时的行为
task.cont =
(res, isErr) => {
if (this.completed) {
return
}
// 移除 child-task
remove(this.tasks, task)
// 清空child-task完成时的行为
task.cont = noop
if (isErr) {
// 某一个 child-task 发生了错误,调用 abort 来进行「错误向上传播」
this.abort(res)
} else {
// 如果是 mainTask 完成的话,记录其结果
if (task ===
this.mainTask) {
this.result = res
}
if (this.tasks.length === 0) {
// 满足了 task 完成的两个条件
this.completed = true
this.cont(this.result)
}
}
}
}
cancelAll() {
if (this.completed) {
return
}
this.completed = true
// 依次调用 child-task 的 cancel 方法,进行「级联向下取消」,并清空 child-task 完成时的行为
this.tasks.forEach(t => {
t.cont = noop
t.cancel()
})
this.tasks = []
}
}
2.4 task context
每一个 task 都有其对应的 context 对象,用于保存该 task 运行时的上下文信息。在 redux-saga 中我们可以使用 getContext/setContext 读写该对象。context 的一大特性是 child-task 会使用原型链的方式继承 parent-task context。当尝试访问 context 中的某个属性时,不仅会在当前 task context 对象中搜寻该属性,也会在 parent-task context 对象进行搜索,以及 parent-task 的 parent-task,依次层层往上搜索,直到找到该属性或是到达 rootSaga。该继承机制在 redux-saga 中的实现也非常简单,只有一行代码:
const
taskContext
=
Object
.
create
(
parentContext
)
。
context 是一个强大的机制,例如在 React 中,React context 用途非常广泛,react-redux / react-router 等相关类库都是基于该机制实现的。然而在 redux-saga 中,context 似乎很少被提起。
在 little-saga 中,我们将充分利用 context 机制,并使用该机制实现「effect 类型拓展」、「连接 redux store」等功能。这些机制的实现会在本文后面提到。
2.5 effect 类型拓展
在 1.9 proc 初步实现中,函数
runEffect
遇到未知 effect 类型便会抛出异常。这里我们对该处做一些修改,以实现 effect 的类型拓展。当遇到未知的 effect 类型时,我们将使用
ctx
.
translator
的
getRunner
方法来获取该 effect 对应的 effectRunner,然后调用该 effectRunner。只要我们提前设置好
ctx
.
translator
,就能在后续的代码中使用拓展类型。为了方便设置
ctx
.
translator
,little-saga 中新增了 def 类型的 effect,用来关联拓展类型与其对应的 effectRunner。
根据 context 的特性,child-task 会继承 parent-saga 的 context,故在 parent-task 中定义的拓展类型也能用于 child-task。在 little-saga 中,race/all/take/put 等类型将使用该拓展机制进行实现。
function runEffect(effect, currCb) {
const effectType = effect[0]
if (effectType === 'promise') {
resolvePromise(effect, ctx, currCb)
} else if (effectType === 'iterator') {
resolveIterator(iterator, ctx, currCb)
} else if (effectType ===
'def') {
runDefEffect(effect, ctx, currCb)
/* 其他已知类型的 effect */
/* .................... */
} else { // 未知类型的effect
const effectRunner = ctx.translator.getRunner(effect)
if (effectRunner == null) {
const error = new Error(`Cannot resolve effect-runner for type: ${effectType}`)
error.effect = effect
currCb(error, true)
} else {
effectRunner(effect,
ctx, currCb, { digestEffect })
}
}
}
function runDefEffect([_, name, handler], ctx, cb) {
def(ctx, name, handler)
cb()
}
// def 定义在其他文件
function def(ctx, type, handler) {
const old = ctx.translator
// 替换 ctx.translator
ctx.translator = {
getRunner
(effect) {
return effect[0] === type ? handler : old.getRunner(effect)
}
},
}
2.6 little-saga 核心部分的完整实现
little-saga 核心部分的实现代码位于 /src/core 文件夹内。完整实现的代码较多,相比于 1.9 proc 初步实现,完整实现添加了 context、task、fork model、effect 类型拓展、错误处理等功能,完善了 Task 生命周期(启动/完成/出错/取消)。
redux-saga 对应的实现代码全部都位于 proc.js 一个文件中,导致该文件很大;而 little-saga 则是将实现分成了若干个文件,下面我们一个一个来进行分析。
2.6.1 整体思路
在 2.6 后面的内容中,我们将使用以下的代码作为例子。该例子中,Parent 会 fork Child1 和 Child2,分别需要 100ms 和 300ms 来完成。Parent 迭代器本身的代码(mainTask)需要 200ms 完成。而整个 Parent Task 则需要 300ms 才能完成。
function* Parent() {
const Child1 = yield fork(api.xxxx) // LINE-1 需要 100ms 才能完成
const Child2 = yield fork(api.yyyy) // LINE-2 需要 300ms 才能完成
yield delay(200) // LINE-3 需要 200 ms 才能完成
}
下图展示了 Parent 运行时,各个 Task / mainTask / ForkQueue 之间的相互关系。图中的实线箭头表示两个对象之间的后继关系(cont):「A 指向 B」意味着「当 A 完成时,需要将结果传递给 B」。
在 1.7 cancellation 中我们知道 cancellation 的顺序和 cont 恰好是相反的,在具体代码实现时,我们不仅需要构建下图中的 cont 关系,还需要构建反向的 cancellation 关系。
本小节中的代码比较复杂,如果觉得理解起来比较困难的话,可以和 2.7 Task 状态变化举例 对照着看。
2.6.2 函数
proc
函数 proc 是运行 saga 实例的入口函数。结合上图,函数 proc 的作用是创建图中各个 Task/mainTask 对象,并建立对象之间的后继关系(cont)和取消关系(cancellation)。代码如下:
// /src/core/proc.js
function proc(iterator, parentContext, cont) {
// 初始化当前 task 的 context
const ctx = Object.create(parentContext)
// mainTask 用来跟踪当前迭代器的语句执行状态
const mainTask = {
// cont: **will be set when passed to ForkQueue**
isRunning: true,
isCancelled: false,
cancel() {
if (mainTask.isRunning && !mainTask.isCancelled)
{
mainTask.isCancelled = true
next(TASK_CANCEL)
}
},
}
// 创建 ForkQueue 对象和 Task 对象,这两个类的代码在后面会写出来
const taskQueue = new ForkQueue(mainTask)
const task = new Task(taskQueue)
// 设置后继关系
taskQueue.cont = task.end
task.cont = cont
// 设置取消关系
cont.cancel = task.cancel
next()
return task
// 以下代码均为函数定义
// 在图中驱动函数只和 mainTask 有联系
// 然后我们也可以发现下面 next 函数的代码中,也只调用了 mainTask 的接口
// 即 next 函数中的代码不会引用 task 和 taskQueue 对象
function next(arg, isErr) {
console.assert(mainTask.isRunning, 'Trying to resume an already finished generator')
try {
let result
if (isErr) {
result = iterator.throw(arg)
} else
if (arg === TASK_CANCEL) {
mainTask.isCancelled = true
next.cancel() // 取消当前执行的 effect
// 跳转到迭代器的 finally block,执行清理逻辑
result = iterator.return(TASK_CANCEL)
} else {
result = iterator.next(arg)
}
if (!result.done) {
digestEffect(result.value, next)
} else {
mainTask.isRunning = false
mainTask.cont(result.value)
}
} catch (error) {
if (!mainTask.isRunning) {
throw error
}
if (mainTask.isCancelled) {
// 在执行 cancel 逻辑时发生错误,在 3.4 其他问题与细节 中说明
console.error(error)
}
mainTask.isRunning = false
mainTask.cont(error, true)
}
}
// function digestEffect(rawEffect, cb) { /* ...... */ }
// function runEffect(effect, currCb) { /* ...... */ }
// function resolvePromise([effectType, promise], ctx, cb) { /* ... */ }
// function resolveIterator([effectType, iterator], ctx, cb) { /* ... */ }
// ...... 各种内置类型的 effect-runner
// fork-model 中新增了 fork/spawn/join/cancel/cancelled
// 这五种类型的 effec-runner 代码见下方
}
2.6.3 fork-model 相关的 effect-runner
函数
runForkEffect
用来执行 fork 类型的 effect,并向调用者返回一个 subTask 对象。该函数需要注意的是,有的时候调用者会使用 fork 来执行一些同步的任务,所以调用
proc
(
iterator
,
ctx
,
noop
)
可能会返回一个已经完成或是已经发生错误的 subTask,此时我们不需要将 subTask 放入 fork-queue 中,而是需要执行其他操作。
// /src/core/proc.js
function runForkEffect([effectType, fn, ...args], ctx, cb) {
const iterator = createTaskIterator(fn
, args)
try {
suspend() // 见 3.4 scheduler
const subTask = proc(iterator, ctx, noop)
if (subTask.isRunning) {
task.taskQueue.addTask(subTask)
cb(subTask)
} else if (subTask.error) {
task.taskQueue.abort(subTask.error)
} else {
cb(subTask)
}
} finally {
flush() // 见 3.4 scheduler
}
}
剩下四个类型(spawn / join / cancel / cancelled)的 effect-runner 比较简单,这里就不再进行介绍。
2.6.4 类
Task
类
Task
是 2.1 Task 的具体实现。
// /src/core/Task.js
class Task {
isRunning = true
isCancelled = false
isAborted = false
result = undefined
error = undefined
joiners = []
// cont will be set after calling constructor()
cont = undefined
constructor
(taskQueue) {
this.taskQueue = taskQueue
}
// 调用 cancel 函数来取消该 Task,这将取消所有当前正在执行的 child-task 和 mainTask
// cancellation 会向下传播,意味着该 Task 对应的 saga-tree 子树都将会被取消
// 同时 cancellation 也会传递给该 Task 的所有 joiners
cancel = () => {
// 如果该 Task 已经完成或是已经被取消,则跳过
if (this.isRunning && !this.isCancelled) {
this.isCancelled = true
this.taskQueue.cancelAll()
// 将 TASK_CANCEL 传递给所有 joiners
this.end(TASK_CANCEL)
}
}
// 结束当前 Task
// 设置 Task 的 result/error,然后调用 task.cont,最后将结果传递给 joiners
// 当该 Task 的 child-task 和 mainTask 都完成时(即 fork-queue 完成时),该函数将被调用
end = (result, isErr) => {
this.isRunning = false
if (!isErr) {
this.result = result
} else {
this.error = result
this.isAborted = true
}
this.cont(result
, isErr)
this.joiners.forEach(j => j.cb(result, isErr))
this.joiners = null
}
toPromise() {
// 获取 task 对应的 promise 对象,这里省略了代码
}
}
2.6.5 小节
这一节中代码较多,而且代码的逻辑密度很高。想要完全理解 little-saga 的实现思路,还是需要仔细阅读源代码才行。
2.7 Task 状态变化举例
下面的代码是 2.6 中的例子。
function* Parent() {
const Child1 = yield fork(api.xxxx) // LINE-1 需要 100ms 才能完成
const Child2 = yield fork(api.yyyy
) // LINE-2 需要 300ms 才能完成
yield delay(200) // LINE-3 需要 200 ms 才能完成
}
下表展示了这个例子在一些关键时间点的执行情况与相应的状态变化。注意在下表中,如果没有指定 task/forkQueue/mainTask 是属于 Parent 还是 Child1/Child2,默认都是属于 Parent 的。下表只展示了这个例子正常完成的过程,我们也可以思考一下在 t=50 / t=150 / t=250 / t=350 等不同时间点,如果 Parent 被取消了,代码又会怎么执行。
2.8 类
Env
类
Env
的作用是在运行 rootSaga 之前,对 root Task 的运行环境进行配置。Env 采用了链式调用风格的 API,方便将多个配置串联起来。
我们可以利用 Env 来预先添加一些常见的 effect 类型,例如 all/race/take/put 等,这样后续所有的 saga 函数都可以直接使用这些 effect 类型。例如下面的代码在运行 rootSaga 之前定义了 delay 和 echo 两种 effect。
new Env()
.def('delay', ([_, timeout], _ctx, cb) => setTimeout(cb, timeout))
.def('echo', ([_, arg], _ctx, cb) => cb(arg))
.run(rootSaga)
function* rootSaga() {
yield ['delay', 500] // 500ms之后 yield 才会返回
yield ['echo', 'hello'] // yield 返回字符串 'hello'
}
2.9 第二部分小节
至此,little-saga 的核心部分实现完毕。核心部分实现了 fork model,实现了 fork/join/cancel/promise/iterator 等内置类型的 effect-runner,并预留了拓展接口。第三部分中,我们将使用该拓展接口来实现 redux-saga 中剩下的那些 effect 类型(all/race/put/take 等)。
3.1 commonEffects 拓展
all-effect 的行为与 Promise#all 非常类似:all-effect 在构造时接受一些 effects 作为 sub-effects,当所有 sub-effects 完成时,all-effect 才算完成;当其中之一 sub-effect 抛出错误时,all-effect 会立即抛出错误。
有了 def effect,拓展 effect 就简单多了。redux-saga 中 runAllEffect 用于运行 all 类型的 effect,我们拷贝该代码,并简单修改,使其符合 effectRunner 接口,即可在 little-saga 中实现 all effect。little-sage 中实现 all effect 的代码如下:
// 这里该函数是一个简化版,省略了 all-effect 被取消的处理代码
// 这里假设 effects 是一个对象,实际版本中还需要考虑 effects为数组的情况
function all([_, effects], ctx, cb, { digestEffect }) {
const keys = Object.keys(effects)