翻译:老齐
译者注:与本文相关图书推荐:《Python大学实用教程》《跟老齐学Python:轻松入门》
第四部分
将队列应用于PCP
如果你希望一次能够处理管道中的多个值,就需要一种针对管道的数据结构,它相当于
producer
的备份,能实现数量增加和减少。
Python标准库有一个
queue
模块,该模块有一个
Queue
类,下面将
Pipeline
改为
Queue
,就可以不再使用
Lock
锁定某些变量,此外,还将使用Python的
threading
模块中的
Event
来停止工作线程,这是一种与以往不同的方法。
从
Event
开始。当有很多线程等待
threading.Event
实例的时候,它能够将一个线程标记为一个事件。这段代码的关键是,等待事件的线程不一定需要停止它们正在做的事情,它们可以每隔一段时间检查一次
Event
的状态。
很多事情都可以触发
event
。在本例中,主线程将简单地休眠一段时间,然后运行
.set()
:
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
# logging.getLogger().setLevel(logging.DEBUG)
pipeline = Pipeline()
event = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event)
time.sleep(0.1)
logging.info("Main: about to set event")
event.set()
复制代码
这里唯一的变化是创建了
event
对象,然后将
event
作为参数传给后面的
.submit
方法,在with语句中,有一句要sleep一秒钟,再记录日志信息,最后调用
event.set()
。
producer
也不需要改变太多:
def producer(pipeline, event):
"""Pretend we're getting a number from the network."""
while not event.is_set():
message = random.randint(1, 101)
logging.info("Producer got message: %s", message)
pipeline.set_message(message, "Producer")
logging.info("Producer received EXIT event. Exiting")
复制代码
while
循环中不再为
pipeline
设置
SENTINEL
值。
consumer
需要相应做较大改动:
def consumer(pipeline, event):
"""Pretend we're saving a number in the database."""
while not event.is_set() or not pipeline.empty():
message = pipeline.get_message("Consumer")
logging.info(
"Consumer storing message: %s (queue size=%s)",
message,
pipeline.qsize(),
)
logging.info("Consumer received EXIT event. Exiting")
复制代码
必须删除
SENTINEL
值相关的代码,
while
循环的条件也因此更复杂了一些,现在需要考虑
not event.is_set()
和
not pipeline.empty()
两个条件,也就是未设置
event
,或者
pipeline
未清空时。
要确保在
consumer
进程结束是队列中已经是空的了,否则就会出现以下两种糟糕的情况。一是丢失了这些最终消息,但更严重的情况是第二种,
producer
如果视图将信息添加到完整队列中,会被锁住,从而不能返回。这种事件会发生在
producer
验证
.is_set()
条件之后,调用
pipeline.set_message()
之前。
这种事件会发生在
producer
验证
.is_set()
条件之后,调用
pipeline.set_message()
之前。
如果发生这种情况,
producer
可能会在队列仍然全满的情况下唤醒并退出。然后,调用
.set_message()
,
.set_message()
将一直等到队列中有新信息的空间。若
consumer
已经退出,这种情况就不会发生,而且
producer
不会退出。
consumer
中的其他部分看起来应该很熟悉。
然而,
Pipeline
还需要重写:
class Pipeline(queue.Queue):
def __init__(self):
super().__init__(maxsize=10)
def get_message(self, name):
logging.debug("%s:about to get from queue", name)
value = self.get()
logging.debug("%s:got %d from queue", name, value)
return value
def set_message(self, value, name):
logging.debug("%s:about to add %d to queue", name, value)
self.put(value)
logging.debug("%s:added %d to queue", name, value)
复制代码
上面的
Pipeline
是
queue.Queue
的子类。
Queue
在初始化时指定一个可选参数,以指定队列的最大长度。
如果为
maxsize
指定一个正数,则该数字为队列元素个数的极限,如果达到该值,
.put()
方法被锁定,直到元素的数量少于
maxsize
才解锁。如果不指定
maxsize
,则队列将增长到计算机内存的所许可的最值。
.get_message()
和
.set_message()
两个方法代码更少了,它们基本上把
.get()
和
.put()
封装在
Queue
中。你可能想知道防止线程发生竞态条件的锁都去了哪里。