翻译:老齐
译者注:与本文相关图书推荐:《Python大学实用教程》《跟老齐学Python:轻松入门》
第二部分
竞态条件
在讨论Python线程的其他特性之前,让我们先讨论一下编写线程程序时遇到的一个更困难的问题:竞态条件。
一旦你了解了什么是竞态条件,并看到了正在发生的情况,然后就使用标准库提供的模块,以防止这些竞态条件的出现。
当两个或多个线程访问共享数据或资源时,可能会出现竞态情况。在本例中,你将创建一个每次都发生的大型竞态条件,但请注意,大多数它并不是很明显。示例中的情况通常很少发生,而且会产生令人困惑的结果。可以想象,因为竞态条件而引起的bug很难被发现。
幸运的是,在下述示例中竞态问题每次都会发生,你将详细地了解它以便解释发生了什么。
对于本例,将编写一个更新数据库的类。你不会真的有一个数据库:你只是要伪造它,因为这不是本文的重点。
FakeDatabase
类中有
.__init__()
和
.update()
方法:
class FakeDatabase:
def __init__(self):
self.value = 0
def update(self, name):
logging.info("Thread %s: starting update", name)
local_copy = self.value
local_copy += 1
time.sleep(0.1)
self.value = local_copy
logging.info("Thread %s: finishing update", name)
复制代码
FakeDatabase
中的属性
.value
,用于作为竞态条件中共享的数据。
.__init__()
中将
.value
值初始化为
0.
,到目前为止,一切正常。
.update()
看起来有点奇怪,它模拟从数据库中读取一个值,对其进行一些计算,然后将一个新值写回数据库。
所谓从数据库中读取,即将
.value
的值复制到本地变量。计算就是在原值上加1,然后
.sleep()
一小会儿。最后,它通过将本地值复制回
.value
,将值写回去。
下面是
FakeDatabase
的使用方法:
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
database = FakeDatabase()
logging.info("Testing update. Starting value is %d.", database.value)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
for index in range(2):
executor.submit(database.update, index)
logging.info("Testing update. Ending value is %d.", database.value)
复制代码
程序中创建了两个
ThreadPoolExecutor
,然后对每个线程调用
.submit()
,告诉它们运行
database.update()
。
.submit()
有一个明显特征,它允许将位置参数和命名参数传给线程中运行的函数:
.submit(function, *args, **kwargs)
复制代码
在上面的用法中,
index
作为第一个也是唯一一个位置参数传给
database.update()
。你将在本文后面看到,可以用类似的方式传多个参数。
由于每个线程都运行
.update()
,而
.update()
会让
.value
的值加1,因此在最后打印时,你可能会希望
database.value
为2。但如果是这样的话,你就不会看这个例子了。如果运行上述代码,则输出如下:
$ ./racecond.py
Testing unlocked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing unlocked update. Ending value is 1.
复制代码
你可能已经预料到这种情况会发生,但是让我们来看看实际情况的细节,因为这将使这个问题的解决方案更容易理解。
单线程
在用两个线程深入讨论这个问题之前,让我们先退一步,谈谈线程工作流程的一些细节。
我们不会在这里深入讨论所有的细节,因为这种全面深入的讨论现在并不重要。我们还将简化一些事情,这种做法虽然在技术上并不准确,但会让你对正在发生的事情有正确的认识。
当你告诉
ThreadPoolExecutor
运行每个线程时,也就是告诉它要运行哪个函数以及要传给它的参数:
executor.submit(database.update, index)
。
其结果是线程池中的每个线程都将调用
database.update(index)
。注意,
database
是
__main__
中创建的
FakeDatabase
实例对象,调用它的方法
.update()
。
每个线程都将引用同一个
FakeDatabase
的实例
database
,每个线程还将有一个唯一的值
index
。为了让上述过程更容易理解,请看下图:
当某线程开始运行
.update()
时,它有此方法的本地的数据,即
.update()
中的
local_copy
。这绝对是件好事,否则,在两个线程中运行同一个函数就会互相干扰了。这意味着该函数的所有作用域(或本地)变量对于线程来说都是安全的。
现在,你已经理解,如果使用单个线程和对
.update()
的单个调用来运行上面的程序会发生什么情况。
如果只运行一个线程,如下图所示,会一步一步地执行
.update()
。下图中,语句显示在上面,下面用图示方式演示了线程中的
local_value
和共享的
database.value
中的值的变化:
按照时间顺序,从上到下观察上面的示意图,从创建线程
Thread 1
开始,到
Thread 1
结束终止。
Thread 1
启动时,
FakeDatabase.value
为零。方法中的第一行代码
local_copy=self.value
将0复制到局部变量。接下来,使用
local_copy+=1
语句增加
local_copy
的值。你可以看到
Thread 1
中的
.value
值为1。
然后,调用下一个
time.sleep()
,这将使当前线程暂停并允许其他线程运行。因为在这个例子中只有一个线程,所以这没有影响。
当
Thread 1
唤醒并继续时,它将新值从
local_copy
复制到
FakeDatabase.value
,然后线程完成。你可以看到
database.value
为1。
到目前为止,一切正常。你只运行了一次
.update()
并且将
FakeDatabase.value
递增为1。
两个线程
回到竞态条件,两个线程并行,但不是同时运行。每个线程都有自己的
local_copy
,并指向相同的
database
,正是这个共享数据库对象导致了这些问题。