专栏名称: 22earth
前端
目录
相关文章推荐
色影无忌  ·  松下G97是一款紧凑型M4/3中端相机 ·  3 天前  
摸摸艺术书  ·  400 个广告回顾过去 100 年的时尚 ·  3 天前  
摸摸艺术书  ·  《天外来客》大卫·鲍伊剧照摄影集 ·  3 天前  
摸摸艺术书  ·  “Home Made Russia” ... ·  1 周前  
51好读  ›  专栏  ›  22earth

动手实现并发请求控制

22earth  · 掘金  ·  · 2020-03-25 02:48

正文

阅读 41

动手实现并发请求控制

本文首发于 个人博客

假如我们需要写个下载工具或者爬虫。

在保证速度的前提下,我们会发起多个请求。但是一般网站都有请求数量的限制。为了避免触发网站的限制,我们需要对并发请求的数量进行控制。

那么如何对并发请求进行控制呢?

使用第三方库

对于这种异步控制的场景,现有的库有对应的方法。比如 RxJSAsync

let observables = [] // fill with observables
from(observables)
    .pipe(mergeMap(observable => observable, 3))
    .subscribe();
复制代码

上面的例子用到了 RxJS 操作符 mergeMap

RxJS - mergeMap

mergeMap 的第二个参数就是控制并发的 Observable 数量。

async.eachLimit 也可以做到,这里就不赘述了。

async - Documentation

实现并发请求控制

言归正传,我们开始实现一个这样的函数。

// iteratorFn 是一个返回 Promise 的函数
function eachLimit(limit, arr, iteratorFn) {
  const res = []
  for (const item of arr) {
    res.push(iteratorFn(item))
  }
  return Promise.all(res)
}

复制代码

上面的代码,不加任何限制。有多少次请求就发起多少次。

首先能想到的是直接把我们异步调用的对象,放进一个队列。队列的长度就是限制请求的大小: limit

function eachLimit(limit, arr, iteratorFn) {
  const res = [];
  const activeList = [];
  for (const item of arr) {
    const p = iteratorFn(item);
    res.push(p);
    activeList.push(p)
    while (activeList.length >= limit) {
      // 开始进行请求限制了
    }
  }
  return Promise.all(res);
}

复制代码

队列的控制

有了这个队列后,那么接下来就是更新队列。队列里面都是 Promise 对象,如何判断状态是已经完成呢,并且找到这个完成的 Promise 对象,把它踢出队列,以便于新的加入?

答案是 Promise.race

但是还有个问题是 Promise.race 返回的是 我们放进队列的 Promise 实例的 fullfilled 值。

async function eachLimit(limit, arr, iteratorFn) {
  const res = [];
  const activeList = [];
  for (const item of arr) {
    const p = iteratorFn(item);
    res.push(p);
    activeList.push(p)
    while (activeList.length >= limit) {
      const fastestResult = await Promise.race(activeList)
    }
  }
  return Promise.all(res);
}
复制代码

那么利用 fastestResult 怎么判断哪个 Promise 实例完成了?

额。。。其实我也不知道,然后找到了下面的说明。

The "remove from queue" step should happen by the completed promise itself (using then) instead of relying on the returned promise from Promise.race. It seems this is the only way around it.

通过查找资料,按照 stackoverflow 上面的说法,只能通过 Promise 完成状态进行处理。

async function eachLimit(limit, arr, iteratorFn) {
  const res = [];
  const activeList = [];
  for (const item of arr) {
    const p = iteratorFn(item);
    res.push(p);
    const e = p.then(() =>
      activeList.splice(activeList.indexOf(e), 1)
    );
    activeList.push(e)
    while (activeList.length >= limit) {
      await Promise.race(activeList)
    }
  }
  return Promise.all(res);
}
复制代码

上面便是完整的实现

测试代码

async function test() {
  const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
  const results = await eachLimit(2, [1000, 5000, 3000, 2000], timeout);
  console.log(results);
}
复制代码

换一个思路

上面的方法单个队列进行控制。接下来的思路是使用 limit 大小的序列进行处理。

假设目前是 A, B, C,D, E 5个请求要执行。其中要求 3个并发执行。

我们可以改造成 [A then D], [B then E] [C]

async function asyncLoop(limit, arr, iteratorFn) {
  const queues = new Array(limit).fill(0).map(() => Promise.resolve());
  let index = 0;
  const add = cb => {
    index = (index + 1) % limit;
    return queues[index] = queues[index].then(() => cb());
  };
  let results = [];
  for (let v of arr) {
    results.push(add(() => iteratorFn(v)));
  }
  return await Promise.all(results);
};
复制代码

总结

第一种方法: 主要是利用了 Promise.race , 以及包装原来的 promise 实例进一个新的promise 实例,在 then 里面处理队列,即是把完成的自己移除队列。

第二种方法: 使用了 N 个序列,每个序列都是 promise.then 的链式调用。

参考链接

rxaviers/async-pool: Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7

javascript - Get which promise completed in Promise.race - Stack Overflow