大家好,欢迎来到IT知识分享网。
在前边几篇关于多线程同步的文章中, 分别使用了线程锁,条件变量和多线程事件来实现生产者和消费者模型的线程同步。上边提到的几种方式,虽然都可以完美的解决这个问题,但都不是最优解。在线程锁中需要我们自定义在哪里上锁, 在哪里释放锁,稍有不慎可能导致上锁范围变大,让线程性能变慢。在条件变量和多线程事件中,我还需要关注锁的相互唤醒,如果某些条件导致线程未唤醒就会出现程序死锁的情况。本文将介绍 Python 中 Queue(同步序列) 库来解决这些问题。
Queue 模块
队列是计算机中基础的数据结构,各类编程语言中也都有相应的实现,如果有编程经历,这些会非常容易理解且容易上手。
Python 中 Queue 模块实现了生产者,消费者序列, 非常适用于多线程间进行安全的数据交换。模块中提供了 Queue(先进先出,类似于排队), lifoQueue(先进后出,类似于栈),PriortyQueue(根据优先级出),SimpleQueue(无界的先进先出, 无任务跟踪功能)4 种队列容器。以下是常用的方法:
from queue import Queue # 实例化一个队列, 并设置队列最大容量 # 如不设置, 默认为0, 则队列容量不限 que = Queue(maxsize = 2) # 获取当前队列的可用容量 que.qsize() # 获取队里是否为空 que.empty() # 获取队列是否已满 que.full() # 将数据放入队列, # block是否阻塞队列, timeout 阻塞超时时间,如果队列已满, 则忽略 que.put(item, block=True, timeout=None) # 不阻塞放入 que.put_nowait(item) # 从队列中取出元素,block为False而队列为空时,会引发Empty异常 que.get(block=True, timeout=False) # 相当于get(block=False) que.get_nowait() # 每个线程使用get方法从队列中获取一个元素, # 该线程通过调用task_done()表示该元素已处理完成。 que.task_done() # 阻塞主线程至队列中所有元素都被处理完成, # 即队列中所有元素都已被接收,且接收线程全已调用task_done()。 que.join()
task_done(): 每当queue.get()一次,无论是否从队列中获取到东西,都会产生一个任务。当完成这个任务后需要调用task_done()来告诉队列这个任务已经完成了(下方实例二)。如果线程里每从队列里get()一次,但没有执行task_done(),则join无法判断队列到底有没有结束,如果在最后执行join(),就会等不到结果的,会一直挂起(下方实例三)。
join(): 表示等待队列中任务全都执行完成, 因此如果需要确保队列在执行完成后才继续执行后续的逻辑, 每次使用 get 方式取出数据后, 都需要执行 task_done 来标记当前数据任务已经执行完成, 否则调用 join 会导致线程挂起(下方实例三)。
Queue 模块的使用
示例一:
理解 Queue 容器大小,容器大小指容器最大存放内容,当存入内容达到容量时就不可继续存入,线程阻塞并等待。 初始化一个指定 大小的 Queue 队列,并使用多线程设置值, 代码如下:
from queue import Queue from threading import current_thread, Thread from time import sleep # 初始化容量为5的队列 que = Queue(5) def producer(): count = 0 while True: # 放入容器 que.put(current_thread().name + str(count)) print(f"{current_thread().name} 放入 {current_thread().name[10:-1] + ' ' +str(count)}") count += 1 # 打印容器大小和是否已满 print(que.qsize(), que.full()) sleep(0.01) if __name__ == '__main__': # 启动3个线程,同时给队列放值 p1 = Thread(target=producer) p2 = Thread(target=producer) p3 = Thread(target=producer) p1.start() p2.start() p3.start()
从运行结果中可以看出, 如果容器只有 put 的时候,内容达到设置值后, 就会阻塞所有的线程,等待队列中的内容被取出
实例二:
容器使用,生产者和消费者模型
修改上述代码, 增加消费者,从队列中取出数据, 代码如下:
from queue import Queue from threading import current_thread, Thread from time import sleep que = Queue(5) # 生产者 def producer(): count = 0 while True: que.put(current_thread().name + str(count)) print(f"{current_thread().name} 放入 {current_thread().name[10:-1] + ' ' +str(count)}") count += 1 print("容器大小: " + str(que.qsize()), "容器是否已满:" + str(que.full())) sleep(0.01) # 消费者 def consumer(): while True: item = que.get() print(f"取出--->{item}") sleep(3) que.task_done() if __name__ == '__main__': # 创建3个生产者线程 for i in range(2): t = Thread(target=producer) t.start() # 创建消费者线程 c = Thread(target=consumer) c.start()
从运行结果中可以看到, 当有消费者消费队列信息后, 生产者才会被唤醒继续放入值, 在生产和消费者代码中, 只有自己的逻辑操作, 没有对线程的特别处理。
示例三:
理解 task_done 和 join 方法
task_done 用于标记当前数据被取出后, 任务已经完成, join 用于使主线程等待队列内容消费完成.改造上述产销代码, 使用 3 个线程分别放入数据, 消费者消费数据, 列表为空时结束
from queue import Queue from threading import current_thread, Thread from time import sleep que = Queue(5) def producer(): count = 0 que.put(current_thread().name + str(count)) print(f"{current_thread().name} 放入 {current_thread().name[10:-1] + ' ' +str(count)}") count += 1 print("容器大小: " + str(que.qsize()), "容器是否已满:" + str(que.full())) sleep(0.01) def consumer(): while True: item = que.get() print(f"取出--->{item}") sleep(2) que.task_done() if que.empty(): break if __name__ == '__main__': for i in range(3): t = Thread(target=producer) t.start() c = Thread(target=consumer) c.start() que.join() print("队列内容已经消费完成")
通过运行上述代码可以看到, 主线程在执行到 join 方法时, 等待子线程继续执行, 直到消费线程消费完队列数据后, 结束程序运行。
如果将上述代码中的 que.task_done()注释后, 程序将在队列消费完成后停止执行, 本质是因为 Queue 队列中任务系统无法确定当前队列中数据是否完成任务, 结果如下图所示:
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/94808.html