专栏名称: 挖地兔
金融数据采集与挖掘,开启量化金融的第一扇大门。
目录
相关文章推荐
环球物理  ·  【聚集高考】重磅!2025全国高考时间刚刚定了! ·  20 小时前  
环球物理  ·  【春晚考点】2025春晚15个热门考点汇总, ... ·  2 天前  
51好读  ›  专栏  ›  挖地兔

你真的懂线程吗?史上最全Python线程解析

挖地兔  · 公众号  ·  · 2019-04-14 19:00

正文


使用Python中的线程模块,能够同时运行程序的不同部分,并简化设计。 如果你已经入门Python,并且想用线程来提升程序运行速度的话,那本文就是为你准备的!

通过阅读本文,你将了解到:

· 什么是线程?

· 如何创建、执行线程?

· 如何使用线程池ThreadPoolExecutor?

· 如何避免资源竞争问题?

· 如何使用Python中线程模块threading提供的常用工具?


目录


1. 什么是线程

2. 创建线程

2.1. 守护线程

2.2. 加入线程

3. 多线程

4. 线程池

5. 竞态条件

5.1. 单线程

5.2. 两个线程

5.3. 示例的意义

6. 同步锁

7. 死锁

8. 生产者-消费者模型中的线程

8.1 在生产者-消费者模型中使用锁

8.2 在生产者-消费者模型中使用队列

9. 线程对象

9.1 信号量

9.2 定时器

9.3 栅栏



阅读提醒:

已掌握Python基本知识;

使用Python 3.6以上版本运行。



1. 什么是线程


线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。


在Python3中实现的大部分运行任务里,不同的线程实际上并没有同时运行:它们只是看起来像是同时运行的。


大家很容易认为线程化是在程序上运行两个(或多个)不同的处理器,每个处理器同时执行一个独立的任务。这种理解并不完全正确,线程可能会在不同的处理器上运行,但一次只能运行一个线程。


同时执行多个任务需要使用非标准的Python运行方式:用不同的语言编写一部分代码,或者使用多进程模块 multiprocessing ,但这么做会带来一些额外的开销。


由于Python默认的运行环境是CPython(C语言开发的Python),所以线程化可能不会提升所有任务的运行速度。这是因为和GIL(Global Interpreter Lock)的交互形成了限制:一次只能运行一个Python线程。


线程化的一般替代方法是:让各项任务花费大量时间等待外部事件。但问题是,如果想缩短等待时间,会需要大量的CPU计算,结果是程序的运行速度可能并不会提升。


当代码是用Python语言编写并在默认执行环境CPython上运行时,会出现这种情况。如果线程代码是用C语言写的,那它们就能够释放GIL并同时运行。如果是在别的Python执行环境(如IPython, PyPy,Jython, IronPython)上运行,请参考相关文档了解它们是如何处理线程的。


如果只用Python语言在默认的Python执行环境下运行,并且遇到CPU受限的问题,那就应该用多进程模块 multiprocessing 来解决。


在程序中使用线程也可以简化设计。本文中的大部分示例并不保证可以提升程序运行速度,其目的是使设计结构更加清晰、便于逻辑推理。


下面就来看看如何使用线程吧!



2. 创建线程


既然已经对什么是线程有了初步了解,下面让我们来学习如何创建一个线程。


Python标准库提供了 threading 模块,里面包含将在本文中介绍的大部分基本模块。在这个模块中, Thread 类很好地封装了有关线程的子类,为我们提供了干净的接口来使用它们。


要启动一个线程,需要创建一个 Thread 实例,然后调用 .start() 方法:


import logging
import threading
import time


def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)


if __name__ == "__main__":
     format = "%(asctime)s: %(message)s"
     logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S" )

     logging.info("Main    : before creating thread")
     x = threading.Thread(target=thread_function, args=(1,))
     logging.info("Main    : before running thread")
     x.start()
     logging.info("Main    : wait for the thread to finish")
     # x.join()
     logging.info("Main    : all done")


查看日志语句,可以看到 __main__ 部分正在创建并启动线程:


x = threading.Thread(target=thread_function, args=(1,))
x.start()


创建线程时,我们需要传递两个参数,第一个参数 target 是函数名,指定这个线程去哪个函数里面去执行代码,第二个参数 args 是一个元组类型,指定为这个函数传递的参数。在本例中, Thread 运行函数 thread_function() ,并将1作为参数传递给该函数。


在本文中,我们用连续整数为线程命名。虽然 threading.get_ident() 方法能够为每一个线程生成唯一的名称,但这些名称通常会比较长,而且可读性差。


这里的 thread_function() 函数本身没做什么,它只是简单地记录了一些信息,并用 time.sleep() 隔开。


运行程序(注释掉倒数第二行代码),结果如下:


$ ./single_thread.py
Main    : before creating thread
Main    : before running thread
Thread 1: starting
Main    : wait for the thread to finish
Main    : all done
Thread 1: finishing


可以看到,线程 Thread __ main__ 部分代码运行完后才结束。下一节会对这一现象做出解释,并讨论被注释掉那行代码。


2.1. 守护线程


在计算机科学中,守护进程 daemon 是一类在后台运行的特殊进程,用于执行特定的系统任务。


守护进程 daemon 在Python线程模块 threading 中有着特殊的含义。当程序退出时,守护线程将立即关闭。可以这么理解,守护线程是一个在后台运行,且不用费心去关闭它的线程,因为它会随程序自动关闭。


如果程序运行的线程是非守护线程,那么程序将等待所有线程结束后再终止。但如果运行的是守护线程,当程序退出时,守护线程会被自动杀死。


我们仔细研究一下上面程序运行的结果,注意看最后两行。当运行程序时,在 __main__ 部分打印完all done信息后、线程结束前,有一个大约2秒的停顿。


这时,Python在等待非守护线程完成运行。当Python程序结束时,关闭过程的一部分是清理线程。


查看Python线程模块的源代码,可以看到 thread ._shutdown() 方法遍历所有正在运行的线程,并在每个非守护线程上调用 .join() 函数,检查它们是否已经结束运行。


因此,程序退出时需要等待,因为守护线程本身会在休眠中等待其他非守护线程运行结束。一旦 thread ._shutdown() 运行完毕并打印出信息,程序就可以退出。


守护线程这种自动退出的特性很实用,但其实还有其他的方法能实现相同的功能。我们先用守护线程重复运行一下上面的程序,看看结果。只需在创建线程时,添加参数 daemon=True


x = threading.Thread(target=thread_function, args=(1,), daemon=True)


现在运行程序,结果如下:


$ ./single_thread.py
Main    : before creating thread
Main    : before running thread
Thread 1: starting
Main    : wait for the thread to finish
Main    : all done
Thread 1: finishing

添加参数daemon=True前


$ ./daemon_thread.py
Main    : before creating thread
Main    : before running thread
Thread 1: starting
Main    : wait for the thread to finish
Main    : all done

添加参数daemon=True后


不同的地方是,之前输出的最后一行不见了,说明 thread_function() 函数没有机会完成运行。这是一个守护线程,所以当 __main__ 部分运行完最后一行代码,程序终止,守护线程被杀死。


2.2. 加入一个线程


守护线程用起来很方便,但如果想让守护线程运行完毕后再结束程序该怎么办?或者想让守护线程运行完后不退出程序呢?


让我们来看一下刚刚注释掉的那行代码:


# x.join()




    


要让一个线程等待另一个线程完成,可以调用 .join() 函数。如果取消对这行代码的注释,主线程将会暂停,等待线程 x 完成运行。


这个功能在守护线程和非守护线程上同样适用。如果用 .join() 函数加入了一个线程,则主线程将一直等待,直到被加入的线程运行完成。



3. 多线程


到目前为止,示例代码中只用到了两个线程:主线程和一个 threading.Thread 线程对象。

通常,我们希望同时启动多个线程,让它们执行不同的任务。先来看看比较复杂的创建多线程的方法,然后再看简单的。


这个复杂的创建方法其实前面已经展示过了:


import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    threads = list()
    for index in range(3):
        logging.info("Main    : create and start thread %d."index)
        x = threading.Thread(target=thread_function, args=(index,))
        threads.append(x)
        x.start()

    for index, thread in enumerate(threads):
        logging.info("Main    : before joining thread %d."index)
        thread.join()
        logging.info("Main    : thread %d done"index)


这段代码和前面提到的创建单线程时的结构是一样的,创建线程对象,然后调用 .start() 方法。程序中会保存一个包含多个线程对象的列表,为稍后使用 .join() 函数做准备。


多次运行这段代码可能会产生一些有趣的结果:


Main    : create and start thread 0.
Thread 0starting
Main    : create and start thread 1.
Thread 1starting
Main    : create and start thread 2.
Thread 2starting
Main    : before joining thread 0.
Thread 2: finishing
Thread 1: finishing
Thread 0: finishing
Main    : thread 0 done
Main    : before joining thread  1.
Main    : thread 1 done
Main    : before joining thread 2.
Main    : thread 2 done


仔细看一下输出结果,三个线程都按照预想的顺序创建0,1,2,但它们的结束顺序却是相反的!多次运行将会生成不同的顺序。查看线程 Thread x: finish 中的信息,可以知道每个线程都在何时完成。


线程的运行顺序是由操作系统决定的,并且很难预测。很有可能每次运行所得到的顺序都不一样,所以在用线程设计算法时需要注意这一点。


幸运的是,Python中提供了几个基础模块,可以用来协调线程并让它们一起运行。在介绍这部分内容之前,让我们先看看如何更简单地创建一组线程。



4. 线程池


我们可以用一种更简单的方法来创建一组线程:线程池 ThreadPoolExecutor ,它是Python中 concurrent.futures 标准库的一部分。(Python 3.2 以上版本适用)。


最简单的方式是把它创建成上下文管理器,并使用 with 语句管理线程池的创建和销毁。


ThreadPoolExecutor 重写上例中的 __main__ 部分,代码如下:


import concurrent.futures

# [rest of code]

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))


这段代码创建一个线程池 ThreadPoolExecutor 作为上下文管理器,并传入需要的工作线程数量。然后使用 .map() 遍历可迭代对象,本例中是 range(3) ,每个对象生成池中的一个线程。

with 模块的结尾,会让线程池 ThreadPoolExecutor 对池中的每个线程调用 .join() 。强烈建议使用线程池 ThreadPoolExecutor 作为上下文管理器,因为这样就不会忘记写 .join()


注:

使用线程池 ThreadPoolExecutor 可能会报一些奇怪的错误。例如,调用一个没有参数的函数,但将参数传入.map()时,线程将抛出异常。


不幸的是,线程池 ThreadPoolExecutor 会隐藏该异常,程序会在没有任何输出的情况下终止。刚开始调试时,这会让人很头疼。


运行修改后的示例代码,结果如下:


$ ./executor.py
Thread 0: starting
Thread 1: starting
Thread 2: starting
Thread 1: finishing
Thread 0: finishing
Thread 2: finishing


再提醒一下,这里的线程1在线程0之前完成,这是因为线程的调度是由操作系统决定的,并不遵循一个特定的顺序。



5. 竞态条件


在继续介绍Python线程模块的一些其他特性之前,让我们先讨论一下在编写线程化程序时会遇到的一个更头疼的问题: 竞态条件。


我们先了解一下竞态条件的含义,然后看一个实例,再继续学习标准库提供的其他模块,来防止竞态条件的发生。


当两个或多个线程访问共享的数据或资源时,可能会出现竞态条件。在本例中,我们创建了一个每次都会发生的大型竞态条件,但请注意,大多数竞态条件不会如此频繁发生。通常情况下,它们很少发生,但一旦发生,会很难进行调试。


在本例中,我们会写一个更新数据库的类,但这里并不需要一个真正的数据库,只是一个虚拟的,因为这不是本文讨论的重点。


这个 FakeDatabase 类包括 .__init__() .update() 方法。


class




    
 FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info("Thread %s: starting update", name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info("Thread %s: finishing update", name)


FakeDatabase 类会一直跟踪一个值: .value ,它是共享数据,这里会出现竞态条件。


.__init__() 方法将 .value 的值初始化为0。 .update() 方法从数据库中读取一个值,对其进行一些计算,然后将新值写回数据库。


FakeDatabase 类的使用实例如下:


if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info("Testing update. Ending value is %d.", database.value)


该程序创建一个线程池 ThreadPoolExecutor ,里面包含两个线程,然后在每个线程上调用 .submit() 方法,告诉它们运行 database.update() 函数。


.submit() 允许将位置参数和关键字参数传递给正在线程中运行的函数:


.submit(function, *args, **kwargs)


示例代码中, index 作为唯一一个位置参数传递给 database.update() 函数,后面会介绍,也可以用类似的方式传递多个参数。


由于每个线程都会运行 .update() , 让 .value 的变量值加上1,所以最后打印出的 database.value 值应该是2。但如果是这样的话,举这个例子就没有什么意义了。


实际上,运行上面这段代码的输出如下:


$ ./racecond.py
Testing unlocked update. Starting value is 0.
Thread 0starting update
Thread 1starting update
Thread 0: finishing update
Thread 1: finishing update
Testing unlocked update. Ending value is 1.


我们来仔细研究一下这里究竟发生了什么,有助于更好地理解有关这个问题的解决方案。


5.1. 单线程


在深入研究上面有关两个线程的问题之前,我们先回过头看一下线程到底是如何工作的。


这里不会讨论所有的细节,因为在目前这个学习阶段还没必要掌握这么多内容。我们还将简化一些东西,虽然可能在技术上不够精确,但可以方便大家理解其中的原理。


当线程池 ThreadPoolExecutor 运行每个线程时,我们会指定运行哪个函数,以及传递给该函数的参数: executor.submit(database.update, index) ,这里是指运行 database.update 函数,并传入 index 参数。


这么做的结果是,线程池中的每个线程都将调用 database.update(index) 。注意,主线程 __main__ 中创建的 database 是对 FakeDatabase 对象的引用。在这个对象上调用 .update() ,会调用该对象的实例方法。


每个线程都将引用同一个 FakeDatabase 对象: database 。每个线程还有一个独特的 index 值,使得日志语句更易阅读:



当线程开始运行 .update() 函数时,它拥有局部变量 local_copy 。这绝对是一件好事,否则,运行相同函数的两个线程总是会相互混淆。也就是说,函数内定义的局部变量是线程安全的。

现在我们可以看一下,如果使用单线程、调用一次 .update() 函数运行上面的程序会发生什么。

下图展示了在只运行一个线程的情况下, .update() 函数是如何逐步执行的。代码显示在左上角,后面跟着一张图,显示线程中局部变量 local_value 和共享数据 database.value 的值:



这张图是这样布局的,从上至下时间增加,它以创建线程1开始,并在线程1终止时结束。


线程1启动时, FakeDatabase.value 的值为0。第一行代码将值0复制给局部变量 local_copy 。接下来, local_copy += 1 语句让 local_copy 的值增加1,可以看到线程1中的 .value 值变成了1。


然后调用 time.sleep() 方法,暂停当前线程,并允许其他线程运行。因为本例中只有一个线程,这里没什么影响。


当线程1被唤醒继续运行时,它将新值从局部变量 local_copy 复制到 FakeDatabase.value ,线程完成运行。可以看到 database.value 的值被设为1。


到目前为止,一切顺利。我们运行了一次 .update() 函数, FakeDatabase.value 值增加到1。


5.2. 两个线程


回到竞态条件,这两个线程会并发运行,但不会同时运行。它们都有各自的局部变量 local_copy ,并指向相同的 database 对象。正是 database 这个共享数据导致了这些问题。


程序创建线程1,运行update()函数:

当线程1调用 time.sleep() 方法时,它允许另一个线程开始运行。这时,线程2启动并执行相同的操作。它也将 database.value 的值复制给私有变量 local_copy ,但共享数据 database.value 的值还未更新,仍为0:



当线程2进入休眠状态时,共享数据 database.value 的值还是未被修改的0,而且两个线程中的私有变量 local_copy 的值都是1。


现在线程1被唤醒并保存其私有变量 local_copy 的值,然后终止,线程2继续运行。线程2在休眠的时候并不知道线程1已经运行完毕并更新了 database.value 中的值,当继续运行时, 它将自己私有变量 local_copy 的值存储到 database.value 中,也是1。



这两个线程交错访问同一个共享对象,覆盖了彼此的结果。当一个线程释放内存或在另一个线程完成访问之前关闭文件句柄时,可能会出现类似的竞争条件。


5.3. 示例的意义


上面的例子是为了确保每次运行程序时都发生竞态条件。因为操作系统可以在任何时候交换出一个线程,所以有可能在读取了x的值之后,像x = x + 1这样的语句会中断,导致写回数据库的值不是我们想要的。


这一过程中的细节非常有趣,但本文剩下部分的学习不需要了解具体细节,所以可以先跳过。


看完有关竞态条件的实例,让我们接下来看看如何解决它们!



6. 同步锁


有很多方法可以避免或解决竞态条件,这里不会介绍所有的解决方法,但会提到一些会经常用到的。让我们先从锁 Lock 开始学习。


要解决上述竞态条件问题,需要找到一种方法,每次只允许一个线程进入代码的 read-modify-write 部分。最常用就是Python中的锁。在一些其他语言中,同样的思想被称为互斥锁 mutex 。互斥锁 mutex 属于进程互斥 MUTual EXclusion 的一部分,它和锁所做的工作是一样的。


锁是一种类似于通行证的东西,每次只有一个线程可以拥有锁,任何其他想要获得锁的线程必须等待,直到该锁的所有者将它释放出来。


完成此任务的基本函数是 .acquire() .release ()。线程将调用 my_lock.acquire() 来获取锁。如果锁已经存在,则调用线程将会等待,直到锁被释放。这里有一点很重要,如果一个线程获得了锁,但从未释放,程序会被卡住。稍后会介绍更多关于这方面的内容。


幸运的是,Python的锁也将作为上下文管理器运行,所以可以在 with 语句中使用它,并且当 with 模块出于任何原因退出时,锁会自动释放。


让我们看看添加了锁的FakeDatabase,调用函数保持不变:


class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def locked_update(self, name):
        logging.info("Thread %s: starting update", name)
        logging.debug("Thread %s about to lock", name)
        with self._lock:
            logging.debug("Thread %s has lock", name)
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
            logging.debug("Thread %s about to release lock", name)
        logging.debug("Thread %s after release", name)
        logging.info("Thread %s: finishing update", name)


除了添加一些调试日志以便更清楚地查看锁的运行之外,这里最大的变化是添加了一个叫 ._lock 的成员,它是一个 thread . lock() 对象。这个 ._lock 在未锁定状态下被初始化,并由 with 语句锁定和释放。


值得注意的是,运行该函数的线程将一直持有这个锁,直到它完全更新完数据库。在本例中,这意味着它将在复制、更新、休眠并将值写回数据库的整个过程中持有锁。


日志设置为警告级别,运行程序,结果如下:


$ ./fixrace.py
Testing locked update. Starting value is 0.
Thread 0starting update
Thread 1starting update
Thread 0: finishing update
Thread 1: finishing update
Testing locked update. Ending value is 2.


在主线程 __main__ 中配置完日志输出后,将日志级别设置为 DEBUG 可以打开完整的日志:


logging.getLogger




    
().setLevel(logging.DEBUG)


用调试日志运行程序的结果如下:


$ ./fixrace.py
Testing locked update. Starting value is 0.
Thread 0starting update
Thread 0 about to lock
Thread 0 has lock
Thread 1starting update
Thread 1 about to lock
Thread 0 about to release lock
Thread 0 after release
Thread 0: finishing update
Thread 1 has lock
Thread 1 about to release lock
Thread 1 after release
Thread 1: finishing update
Testing locked update. Ending value is 2.


线程0获得锁,并且在它进入睡眠状态时仍然持有锁。然后线程1启动并尝试获取同一个锁,因为线程0仍然持有它,线程1就必须等待。这就是互斥锁。


本文其余部分的许多示例都有警告和调试级别的日志记录。我们通常只显示警告级别的输出,因为调试日志可能非常长。



7. 死锁


在继续学习之前,我们先看一下使用锁时会出现的常见问题。在上例中,如果锁已经被某个线程获取,那么第二次调用 .acquire() 时将一直等待,直到持有锁的线程调用 .release() 将锁释放。


思考一下,运行下面这段代码会得到什么结果:


import threading

l = threading.Lock()
print("before first acquire")
l.acquire()
print("before second acquire")
l.acquire()
print("acquired lock twice")


当程序第二次调用 l.acquire() 时,它需要等待锁被释放。在本例中,可以删除第二次调用修复死锁,但是死锁通常在以下两种情况下会发生:


① 锁没有被正确释放时会产生运行错误;

在一个实用程序函数需要被其他函数调用的地方会出现设计问题,这些函数可能已经拥有或者没有锁。


第一种情况有时会发生,但是使用锁作为上下文管理器可以大大减少这种情况发生的频率。建议充分利用上下文管理器来编写代码,因为它们有助于避免出现异常跳过 .release() 调用的情况。


在某些语言中,设计问题可能有点棘手。庆幸的是,Python的线程模块还提供了另一个锁对象 RLock 。它允许线程在调用 .release() 之前多次获取 .acquire() 锁,且程序不会阻塞。该线程仍需要保证 .release() .acquire() 的调用次数相同,但它是用了另一种方式而已。


Lock RLock 是线程化编程中用来防止竞争条件的两个基本工具,还有一些其他的工具。在研究它们之前,我们先转移到一个稍微不同的领域。



8. 生产者-消费者模型中的线程


生产者-消费者模型是一个标准的计算机科学领域的问题,用于解决线程同步或进程同步。我们先介绍一个它的变形,大致了解一下Python中的线程模块提供了哪些基础模块。


本例中,假设需要写一个从网络读取消息并将其写入磁盘的程序。该程序不会主动请求消息,它必须在消息传入时侦听并接受它们。而且这些消息不会以固定的速度传入,而是以突发的方式传入。这一部分程序叫做生产者。


另一方面,一旦传入了消息,就需要将其写入数据库。数据库访问很慢,但访问速度足以跟上消息传入的平均速度。但当大量消息同时传入时,速度会跟不上。这部分程序叫消费者。


在生产者和消费者之间,需要创建一个管道 Pipeline ,随着对不同同步对象的深入了解,我们需要对管道里面的内容进行修改。

这就是基本的框架。让我们看看使用 Lock 的解决方案。虽然它并不是最佳的解决方法,但它运用的是前面已经介绍过的工具,所以比较容易理解。


8.1. 在生产者-消费者模型中使用锁


既然这是一篇关于Python线程的文章, 而且刚刚已经阅读了有关锁的内容,所以让我们尝试用锁解决竞态条件问题。


先写一个生产者线程,从虚拟网络中读取消息并放入管道中:


SENTINEL = object()

def producer(pipeline):
    """Pretend we're getting a message from the network."""
    for index in range(10):
        message = random.randint(1101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    # Send a sentinel message to tell consumer we're done
    pipeline.set_message(SENTINEL, "Producer")


生产者获得一个介于1到100之间的随机数,作为生成的虚拟消息。它调用管道上的 .set_message() 方法将其发送给消费者。


生产者还用一个 SENTINEL 值来警告消费者,在它发送10个值之后停止。这有点奇怪,但不必担心,在完成本示例后,会介绍如何去掉这个 SENTINEL 值。


管道 pipeline 的另一端是消费者:


def consumer(pipeline):
    """Pretend we're saving a number in the database."""
    message = 0
    while message is not SENTINEL:
        message = pipeline.get_message("Consumer")
        if message is not SENTINEL:
            logging.info("Consumer storing message: %s", message)


消费者从管道中读取一条消息并将其写入虚拟数据库,在本例中,只是将其储存到磁盘中。如果消费者获取了 SENTINEL 值,线程会终止。


在研究管道 Pipeline 之前,先看一下生成这些线程的主线程 __main__ 部分:


if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)


看起来应该很熟悉,因为它和前面示例中介绍过的 __main__ 部分类似。


注意,打开调试日志可以查看所有的日志消息,方法是取消对这一行的注释:


logging




    
.getLogger().setLevel(logging.DEBUG)


我们有必要遍历调试日志消息,来查看每个线程是在何处获得和释放锁的。


现在让我们看一下将消息从生产者传递给消费者的管道 Pipeline :


class Pipeline:
    """
    Class to allow a single element pipeline between producer and consumer.
    "
""
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        logging.debug("%s:about to acquire getlock", name)
        self.consumer_lock.acquire()
        logging.debug("%s:have getlock", name)
        message = self.message
        logging.debug("%s:about to release setlock", name)
        self.producer_lock.release()
        logging.debug("%s:setlock released", name)
        return message

    def set_message(self, message, name):
        logging.debug("%s:about to acquire setlock", name)
        self.producer_lock.acquire()
        logging.debug("%s:have setlock", name)
        self.message = message
        logging.debug("%s:about to release getlock", name)
        self.consumer_lock.release()
        logging.debug("%s:getlock released", name)


好长一段代码!别害怕,大部分是日志语句,删除所有日志语句后的代码如下:


class Pipeline:
    """
    Class to allow a single element pipeline between producer and consumer.
    "
""
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        self.consumer_lock.acquire()
        message = self.message
        self.producer_lock.release()
        return message

    def set_message(self, message, name):
        self.producer_lock.acquire()
        self.message = message
        self.consumer_lock.release()


这样看起来更清晰,管道类中有三个成员:

.message 存储要传递的消息;

.producer_lock 是一个线程锁对象,限制生产者线程对消息的访问;

.consumer_lock 也是一个线程锁,限制消费者线程对消息的访问。


__init__() 初始化这三个成员,然后在 .consumer_lock 上调用 .acquire() ,消费者获得锁。生产者可以添加新消息,但消费者需要等待消息出现。


get_message() .set_messages() 几乎是相反的操作。 .get_message() consumer_lock 上调用 .acquire() ,这么做的目的是让消费者等待,直到有消息传入。


一旦消费者获得了锁 .consumer_lock ,它会将 self.message 的值复制给 .message ,然后在 .producer_lock 上调用 .release() 。释放此锁允许生产者在管道中插入下一条消息。


. get_message() 函数中有一些细节很容易被忽略。大家思考一下,为什么不把 message 变量删掉,直接返回 self.message 的值呢?


答案如下。

只要消费者调用 .producer_lock.release() ,它就被交换出去,生产者开始运行,这可能发生在锁被完全释放之前! 也就是说,存在一种微小的可能性,当函数返回 self.message 时,这个值是生产者生成的下一条消息,导致第一条消息丢失。这是竞态条件的另一个例子。


我们继续看事务的另一端: .set_message() 。生产者通过传入一条消息来调用该函数,获得锁 .producer_lock ,传入 .message 值,然后调用 consumer_lock.release () 释放锁,这将允许消费者读取该值。


运行代码,日志设置为警告级别,结果如下:


$ ./prodcom_lock.py
Producer got data 43
Producer got data 45
Consumer storing data43
Producer got data 86
Consumer storing data45
Producer got data 40
Consumer storing data86
Producer got data 62
Consumer storing data40
Producer got data 15
Consumer storing data62
Producer got data 16
Consumer storing data15
Producer got data 61
Consumer storing data16
Producer got data 73
Consumer storing data61
Producer got data 22
Consumer storing data73
Consumer storing data22


大家可能会觉得奇怪,生产者在消费者还没运行之前就获得了两条消息。回过头仔细看一下生产者和 .set_message() 函数,生产者先获取消息,打印出日志语句,然后试图将消息放入管道中,这时才需要等待锁。


当生产者试图传入第二条消息时,它会第二次调用 .set_message() ,发生阻塞。


操作系统可以在任何时候交换线程,但它通常会允许每个线程在交换之前有一段合理的运行时间。这就是为什么生产者会一直运行,直到第二次调用 .set_message() 时被阻塞。


一旦线程被阻塞,操作系统总是会把它交换出去,并找到另一个线程去运行。在本例中,就是消费者线程。


消费者调用 .get_message() 函数,它读取消息并在 .producer_lock 上调用 .release() 方法,释放锁,允许生产者再次运行。


注意,第一个值是43,正是消费者所读取的值,虽然生产者已经生成了新值45。


尽管使用锁的这种方法适用于本例,但对于常见的生产者-消费者模式问题,这不是一个很好的解决方法,因为它一次只允许管道中有一个值。当生产者收到大量值时,将无处安放。


让我们继续看一个更好的解决方法:使用队列 Queue .


8.2. 在生产者-消费者模型中使用队列


如果想在管道中一次处理多个值,我们需要为管道提供一个数据结构,当从生产者线程备份数据时,该结构允许管道中的数据量灵活变动,不再是单一值。


Python标准库中有一个模块叫队列 queue ,里面有一个类叫 Queue 。让我们用队列 Queue 改写一下上面受锁保护的管道。


此外,我们还会介绍另一种停止工作线程的方法,使用Python线程模块中的事件 Event 对象。


事件的触发机制可以是多种多样的。在本例中,主线程只是休眠一段时间,然后调用 event.set() 方法,通知所有处于等待阻塞状态的线程恢复运行状态:


1 if __name__ == "__main__":
2     format = "%(asctime)s: %(message)s"
3     logging.basicConfig(format=format, level=logging.INFO,
4                          datefmt="%H:%M:%S")
5     # logging.getLogger().setLevel(logging.DEBUG)
6
7     pipeline = Pipeline()
8     event = threading.Event()
9     with concurrent.futures.ThreadPoolExecutor(max_workers=2






请到「今天看啥」查看全文