Queue: 应用于生产者-消费者模式的Python队列
图片来源于网络
版权声明
© 著作权归作者所有
允许自由转载,但请保持署名和原文链接。 不允许商业用途、盈利行为及衍生盈利行为。
什么是Queue?
Queue是Python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者()线程之间的信息传递。
为什么使用Queue,而不是list或者dict?
- list或者dict是非线程安全的,Queue是线程安全的
- 也即意味着:如果使用list或者dict,我们必须把它放到lock程序块中(acquire和release),以防止发生竞态条件。
- 使用list或者dict,需要考虑线程同步的问题,即需要额外考虑wait和notify。
- 生成者不能向满队列添加数据,如果使用list或者dict,需要额外的代码实现。
- Queue则封装了Condition行为,wait() notify() acquire() release() 满队列问题等等,无须额外考虑。
先来了解一些概念
生产者-消费者模式(Producer-Consumer)
Producer-Consumer模式是多线程编程中最常用的设计模式。生产者负责生产数据,并将数据存入队列,消费负责消费数据,不断从队列中取数据来使用。这里面有两个条件:
必须满足线程互斥条件:任何时候最多只允许一个线程访问数据,其他线程必须等待。这称为线程互斥。
必须满足线程同步条件:线程同步是指线程之间所具有的一种制约关系,一个线程的执行依赖另一个线程的消息,当它没有得到另一个线程的消息时应等待,直到消息到达时才被唤醒。举个例子:在线程方式下,生产者和消费者各自是一个线程。生产者把数据写入队列,消费者从队列读出数据。当队列为空,消费者就阻塞等待(稍事休息);当队列满(达到最大长度),生产者就阻塞等待。
线程阻塞
线程阻塞通常是指一个线程在执行过程中暂停,以等待某个条件的触发。比如一个线程原子操作下,其他的线程都是阻塞状态的;比如input语句等待用户的输入,线程也是阻塞状态;比如当队列任务为空的时候,线程等待新的任务,这时候线程也是阻塞的。
线程安全
比如一个 ArrayList 类,在添加一个元素的时候,它可能会有两步来完成:
- 在 Items[Size] 的位置存放此元素;
- 增大 Size 的值。
在单线程运行的情况下,如果 Size = 0,添加一个元素后,此元素在位置 0,而且 Size=1;而如果是在多线程情况下,比如有两个线程,线程 A 先将元素1存放在位置 0。但是此时 CPU调度线程A暂停,线程B得到运行的机会。线程B向此 ArrayList 添加元素2,因为此时 Size 仍然等于0(注意,我们假设的是添加一个元素是要两个步骤,而线程A仅仅完成了步骤1),所以线程B也将元素存放在位置0。然后线程A和线程B都继续运行,都增加Size的值,结果Size等于2。 那好,我们来看看 ArrayList的情况,期望的元素应该有2个,而实际元素是在0位置,造成丢失元素,而且Size 等于 2。这就是“线程不安全”了。
原子操作
原子(atom)本意是“不能被进一步分割的最小粒子”,而原子操作(atomic operation)意为"不可被中断的一个或一系列操作"。如变量的赋值,不可能一个线程在赋值,到一半切到另外一个线程工作去了,这是原子操作。但是一些数据结构的操作,比如上述ArrayList的例子,添加元素是分成两个步骤的,所以必须要加锁。加锁后的操作就可以认为是原子的了。
举个小栗子来加深理解:
原来在银行办理业务是排队的形式的,虽然也是多线程(多个窗口),但是经常出现有些柜台人多、有些柜台人少,或者有些柜台办理完了,有些柜台还排着长队,这时候就需要人工来干预,很麻烦,效率不高。
现在的银行都是叫号系统,这是一个典型的生产者-消费者模式。
银行提供四排座椅(队列),每人手上拿一个号,先来先办,后来后办(先进先出 First in First out);
由叫号机(生产者)来打印一个号,来一个顾客打印一个号,完了塞到队尾;当然可以设置一个队列最大数,比如100人,超过100人在队列里,叫号机就不打印,直到队列有空闲位置。
银行开多个窗口(多个消费者线程)从队列里叫号,办完一个,再叫一个。办理的业务是原子性的(存或者取都在一个人手上完成,中间流程不可分割)。也不会同时有多个柜台操作你的帐户,所以是线程安全的。
如果都办理完了,叫号机和柜台都陷入了等待状态,打一会儿瞌睡,线程阻塞,直到有新的顾客来办理业务。
三种形式的Queue:
FIFO队列
图片来源于网络
class Queue.Queue(maxsize=0)
FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
LIFO队列
图片来源于网络
class Queue.LifoQueue(maxsize=0)
LIFO即Last in First Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上
Priority队列
class Queue.PriorityQueue(maxsize=0)
构造一个优先队列。优先级级别越低的越先出来,maxsize用法同上。
常用方法
创建一个队列
# maxsize 表示队列长度,小于1表示队列长度无限。
import Queue
q = Queue.Queue(maxsize=10)
将一个值放入队列
# put(item[,block[,timeout]])
# 参数item为必选参数
# block 为可选参数,默认为True
# 如果队列为空且block=True,put()使得调用线程阻塞,直到空出一个数据单元。
# 如果队列为空且block=False,put()将抛出Full异常。
# 将10插入队尾
q.put(10)
将一个值从队列中取出
# get([block[,timeout]])
# 参数block为可选参数,默认为True
# 如果队列为空且block=True,get()使得调用线程阻塞,直到有新数据产生。
# 如果队列为空且block=False,get()将抛出Empty异常。
# 从对列头部取出一个数据
q.get()
获取队列的大小
q.qsize()
判断队列是否为空
# 队列为空返回True,反之返回False
q.empty()
判断队列是否已满
# 队列已满返回True,反之返回False
q.full()
task_done()
意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。
如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。
join()
阻塞调用线程,直到队列中的所有任务被处理掉。
只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。
其他
q.put_nowait(item) == q.put(item,False)
q.get_nowait() == q.get(False)
一个最简单的生产者-消费者模式
# coding: utf-8
# filename: queue.py
# author: caord@showwant.com
import time
import Queue
import threading
class Producer(threading.Thread):
def __init__(self, thread_name, queue):
threading.Thread.__init__(self, name=thread_name)
self.data = queue
def run(self):
for i in range(20):
print('%s:%s is producing %d to the queue!' % (time.ctime(), self.getName(), i))
self.data.put(i)
time.sleep(1)
print('%s: %s finished!' % (time.ctime(), self.getName()))
time.sleep(10)
for i in range(10):
self.data.put(i)
class Consumer(threading.Thread):
def __init__(self, thread_name, queue):
threading.Thread.__init__(self, name=thread_name)
self.data = queue
def run(self):
while 1:
try:
num = self.data.get()
print('%s: %s is consuming. %d in the queue is consumed!' % (time.ctime(), self.getName(), num))
except:
print('%s: %s finished!' % (time.ctime(), self.getName()))
#break
def main():
queue = Queue.Queue(maxsize=20)
producer = Producer('producer', queue)
consumer = Consumer('consumer', queue)
producer.start()
consumer.start()
producer.join()
consumer.join()
print('All thread finished')
if __name__ == '__main__':
main()
这个好,正需要,帮我解决了问题,感谢!
您好,您的网站做的很不错,很漂亮,我已经收藏了,方便我随时访问.
感谢分享!!
感谢分享
感谢博主分享的文章
谢谢博主的分享
文章不错支持一下吧