本文首发于 个人博客
假如我们需要写个下载工具或者爬虫。
在保证速度的前提下,我们会发起多个请求。但是一般网站都有请求数量的限制。为了避免触发网站的限制,我们需要对并发请求的数量进行控制。
那么如何对并发请求进行控制呢?
使用第三方库
对于这种异步控制的场景,现有的库有对应的方法。比如 RxJS
和 Async
let observables = [] // fill with observables
from(observables)
.pipe(mergeMap(observable => observable, 3))
.subscribe();
复制代码
上面的例子用到了 RxJS
操作符 mergeMap
mergeMap
的第二个参数就是控制并发的 Observable
数量。
async.eachLimit
也可以做到,这里就不赘述了。
实现并发请求控制
言归正传,我们开始实现一个这样的函数。
// iteratorFn 是一个返回 Promise 的函数
function eachLimit(limit, arr, iteratorFn) {
const res = []
for (const item of arr) {
res.push(iteratorFn(item))
}
return Promise.all(res)
}
复制代码
上面的代码,不加任何限制。有多少次请求就发起多少次。
首先能想到的是直接把我们异步调用的对象,放进一个队列。队列的长度就是限制请求的大小: limit
function eachLimit(limit, arr, iteratorFn) {
const res = [];
const activeList = [];
for (const item of arr) {
const p = iteratorFn(item);
res.push(p);
activeList.push(p)
while (activeList.length >= limit) {
// 开始进行请求限制了
}
}
return Promise.all(res);
}
复制代码
队列的控制
有了这个队列后,那么接下来就是更新队列。队列里面都是 Promise
对象,如何判断状态是已经完成呢,并且找到这个完成的 Promise
对象,把它踢出队列,以便于新的加入?
答案是 Promise.race
但是还有个问题是 Promise.race
返回的是 我们放进队列的 Promise
实例的 fullfilled 值。
async function eachLimit(limit, arr, iteratorFn) {
const res = [];
const activeList = [];
for (const item of arr) {
const p = iteratorFn(item);
res.push(p);
activeList.push(p)
while (activeList.length >= limit) {
const fastestResult = await Promise.race(activeList)
}
}
return Promise.all(res);
}
复制代码
那么利用 fastestResult 怎么判断哪个 Promise
实例完成了?
额。。。其实我也不知道,然后找到了下面的说明。
The "remove from queue" step should happen by the completed promise itself (using then) instead of relying on the returned promise from Promise.race. It seems this is the only way around it.
通过查找资料,按照 stackoverflow 上面的说法,只能通过 Promise
完成状态进行处理。
async function eachLimit(limit, arr, iteratorFn) {
const res = [];
const activeList = [];
for (const item of arr) {
const p = iteratorFn(item);
res.push(p);
const e = p.then(() =>
activeList.splice(activeList.indexOf(e), 1)
);
activeList.push(e)
while (activeList.length >= limit) {
await Promise.race(activeList)
}
}
return Promise.all(res);
}
复制代码
上面便是完整的实现
测试代码
async function test() {
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
const results = await eachLimit(2, [1000, 5000, 3000, 2000], timeout);
console.log(results);
}
复制代码
换一个思路
上面的方法单个队列进行控制。接下来的思路是使用 limit
大小的序列进行处理。
假设目前是 A, B, C,D, E 5个请求要执行。其中要求 3个并发执行。
我们可以改造成 [A then D], [B then E] [C]
async function asyncLoop(limit, arr, iteratorFn) {
const queues = new Array(limit).fill(0).map(() => Promise.resolve());
let index = 0;
const add = cb => {
index = (index + 1) % limit;
return queues[index] = queues[index].then(() => cb());
};
let results = [];
for (let v of arr) {
results.push(add(() => iteratorFn(v)));
}
return await Promise.all(results);
};
复制代码
总结
第一种方法: 主要是利用了 Promise.race
, 以及包装原来的 promise
实例进一个新的promise 实例,在 then
里面处理队列,即是把完成的自己移除队列。
第二种方法: 使用了 N 个序列,每个序列都是 promise.then 的链式调用。
参考链接
javascript - Get which promise completed in Promise.race - Stack Overflow