专栏名称: 老齐Py
Data Science
目录
相关文章推荐
养育男孩  ·  无数的男孩,因为这一点,总被误解伤害 ·  11 小时前  
养育男孩  ·  无数的男孩,因为这一点,总被误解伤害 ·  11 小时前  
班主任家园  ·  102岁杨振宁摔倒住院,小27岁岳母来医院探 ... ·  3 天前  
融媒吴江  ·  年底投用!苏大未来校区新进展! ·  3 天前  
51好读  ›  专栏  ›  老齐Py

【译】线程:概念和实现(4)

老齐Py  · 掘金  ·  · 2020-02-24 03:48

正文

阅读 11

【译】线程:概念和实现(4)

翻译:老齐

译者注:与本文相关图书推荐:《Python大学实用教程》《跟老齐学Python:轻松入门》


第四部分

将队列应用于PCP

如果你希望一次能够处理管道中的多个值,就需要一种针对管道的数据结构,它相当于 producer 的备份,能实现数量增加和减少。

Python标准库有一个 queue 模块,该模块有一个 Queue 类,下面将 Pipeline 改为 Queue ,就可以不再使用 Lock 锁定某些变量,此外,还将使用Python的 threading 模块中的 Event 来停止工作线程,这是一种与以往不同的方法。

Event 开始。当有很多线程等待 threading.Event 实例的时候,它能够将一个线程标记为一个事件。这段代码的关键是,等待事件的线程不一定需要停止它们正在做的事情,它们可以每隔一段时间检查一次 Event 的状态。

很多事情都可以触发 event 。在本例中,主线程将简单地休眠一段时间,然后运行 .set()

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)
    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)
        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()
复制代码

这里唯一的变化是创建了 event 对象,然后将 event 作为参数传给后面的 .submit 方法,在with语句中,有一句要sleep一秒钟,再记录日志信息,最后调用 event.set()

producer 也不需要改变太多:

def producer(pipeline, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")
    logging.info("Producer received EXIT event. Exiting")
复制代码

while 循环中不再为 pipeline 设置 SENTINEL 值。 consumer 需要相应做较大改动:

def consumer(pipeline, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get_message("Consumer")
        logging.info(
            "Consumer storing message: %s (queue size=%s)",
            message,
            pipeline.qsize(),
        )

    logging.info("Consumer received EXIT event. Exiting")
复制代码

必须删除 SENTINEL 值相关的代码, while 循环的条件也因此更复杂了一些,现在需要考虑 not event.is_set() not pipeline.empty() 两个条件,也就是未设置 event ,或者 pipeline 未清空时。

要确保在 consumer 进程结束是队列中已经是空的了,否则就会出现以下两种糟糕的情况。一是丢失了这些最终消息,但更严重的情况是第二种, producer 如果视图将信息添加到完整队列中,会被锁住,从而不能返回。这种事件会发生在 producer 验证 .is_set() 条件之后,调用 pipeline.set_message() 之前。

这种事件会发生在 producer 验证 .is_set() 条件之后,调用 pipeline.set_message() 之前。

如果发生这种情况, producer 可能会在队列仍然全满的情况下唤醒并退出。然后,调用 .set_message() .set_message() 将一直等到队列中有新信息的空间。若 consumer 已经退出,这种情况就不会发生,而且 producer 不会退出。

consumer 中的其他部分看起来应该很熟悉。

然而, Pipeline 还需要重写:

class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = self.get()
        logging.debug("%s:got %d from queue", name, value)
        return value

    def set_message(self, value, name):
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)
复制代码

上面的 Pipeline queue.Queue 的子类。 Queue 在初始化时指定一个可选参数,以指定队列的最大长度。

如果为 maxsize 指定一个正数,则该数字为队列元素个数的极限,如果达到该值, .put() 方法被锁定,直到元素的数量少于 maxsize 才解锁。如果不指定 maxsize ,则队列将增长到计算机内存的所许可的最值。

.get_message() .set_message() 两个方法代码更少了,它们基本上把 .get() .put() 封装在 Queue 中。你可能想知道防止线程发生竞态条件的锁都去了哪里。







请到「今天看啥」查看全文