本文首发于
知乎
本文分为以下几个部分
- 两个线程抓10个网页
- 线程数量试验
- 参考资料
- 线程数量控制
- 线程池
两个线程抓10个网页
之前我们有过循环抓取10页豆瓣电影数据的例子,当时是每次循环都新建了一个线程,但是如果我们想要只用两个线程怎么办呢?
首先声明,1个线程分5个的想法是不行的,因为每个线程运行时间带有随机性,如果任务平均分配,则很可能出现一个线程还在苦苦工作,而另一个线程已经完成,却无法帮助前一个线程分担的情况,这无疑会降低运行效率。
所以比较好的方法是维护一个队列,两个线程都从中获取任务,直到把这个队列中的任务都做完。这个过程其实就是特殊的生产消费模式,只不过没有生产者,任务量是固定的而已。
import threading
import requests
from bs4 import BeautifulSoup
from queue import Queue
class MyThread(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while not self.queue.empty(): # 如果while True 线程永远不会终止
url = self.queue.get()
print(self.name, url)
url_queue.task_done()
r = requests.get(url)
soup = BeautifulSoup(r.content, 'html.parser')
lis = soup.find('ol', class_='grid_view').find_all('li')
for li in lis:
title = li.find('span', class_="title").text
print(title)
url_queue = Queue()
for i in range(10):
url = 'https://movie.douban.com/top250?start={}&filter='.format(i*25)
url_queue.put(url)
th1 = MyThread(url_queue)
th2 = MyThread(url_queue)
th1.start()
th2.start()
th1.join()
th2.join()
url_queue.join()
print('finish')
这里注意几点
-
Queue.empty()
表示如果队列是空则为True
,否则是False
-
Queue.join()
和Queue.task_done()
是相互配合使用的。这里的join
和线程的join
作用是类似的,它表示直到队列全部操作完成再执行后面的代码,而只有前面每次操作队列都运行一次Queue.task_done()
,join
才能通过 -
不过
Queue.join()
和Queue.task_done()
一起删除不会影响当前程序,只是为了更安全规范最好都加上 -
如果不是每次循环新建一个线程,则运行的函数经常以
while
或while True
开始,因为一个线程要处理多个任务,它对应的函数需要是能不断去获取任务的,必须是一个循环
在使用多线程时,我们现在看到了两种形式
-
用
for
循环,每一项开启一个线程 - 构建队列,开启少量线程,每个线程从队列中获取任务
我们可以对比一下这两种形式,第一种开启了更多线程,速度会更快。但是当任务成千上万的时候,还可以用第一种吗?线程数量有没有限制?
我们首先自己测试一下,然后再查阅资料
线程数量试验
首先用一个简单的函数,运行时显示当前的线程数量
import time
import threading
import random
thread_num = 1000
def run():
print('first, there are', threading.activeCount(), 'threads running')
time.sleep(thread_num/1000 * random.random())
print('second, there are ', threading.activeCount(), 'threads running')
for i in range(thread_num):
th = threading.Thread(target = run)
th.start()
thread_num
变量表示开启线程数量,通过
time.sleep
延长程序运行时间。同时运行的线程数量应该比
thread_num
小,因为有些线程结束时,有些线程还没开始。
这个值我测到100000都没有出什么问题,只是CPU运行全满,不敢再加大。
下面我们测试一下网页抓取
import threading
import requests
import json
thread_num = 100
def run():
print('first, there are', threading.activeCount(), 'threads running')
r = requests.post("http://httpbin.org/post",
data = 'second there are {} threading running'.format(threading.activeCount()))
print(r.json()['data'])
for i in range(thread_num):
th = threading.Thread(target = run)
th.start()
这个爬虫的功能也是请求之前输出当前线程数量,抓取结束返回(请求当时的)线程数量(主要是为了保证抓取是成功的)。线程数测试到1000也没出什么问题,最多有400多个线程同时运行,1000次抓取几秒就跑完了,感觉比不使用多线程时抓取10个页面还快。所以说只要对面网站不因为你请求过快把你封掉,你对每次循环开一个多线程是没有问题的。
参考资料
这个问题在stackoverflow上有讨论,见 这里 ,最后这个问题因为没有一个统一标准答案而被关闭。从回答者的观点来看,不要揣测线程的最大承受数量,可以切实去尝试,去触碰它的上界。也就是通过试验,找到运行最快,又不出任何问题的那个数量,即使这个数量非常大,也不用无畏地担心。
线程数量控制
使用
threading.Semaphore
可以控制最多允许多少个线程同时进行,超出的部分自动等待
import threading
import requests
from bs4 import BeautifulSoup
class MyThread(threading.Thread):
def __init__(self, i):
threading.Thread.__init__(self)
self.i = i
def run(self):
with thread_max_num:
print(self.name, 'start')
url = 'https://movie.douban.com/top250?start={}&filter='