本文共 6898 字,大约阅读时间需要 22 分钟。
阻塞队列:
-
什么是阻塞队列:
- 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列,这两个附加的操作支持阻塞的插入和移除方法
- 支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满
- 支持阻塞的移除方法:当队列为空时,获取元素的线程会等待队列变为非空
- 阻塞队列常用于生产者和消费者场景,生产者是向队列里添加元素的线程,消费者是从队列里获取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器
-
为什么要使用阻塞队列: 就是适用在不得不阻塞的场景 如上面所说生产者 和 消费者场景中 要是队列中为空 消费者不得不进行阻塞 队列满也是同样的道理 而且好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了 在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度
-
阻塞队列的使用:
- 继承关系:与List、Set类似,都是继承Collection 所以以后不要说Collection中只知道List 和 Set两种了
- 两个附加操作的4种处理方式:
法/处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
插入方法 | add(e) | offer(e) | put(e) | offer(time, unit) |
移除方法 | remove() | poll() | take() | poll(time, unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
- 抛出异常: 当队列满时,如果再往队列里插入元素(再执行add(e)操作),会抛出IllegalArgumentException Queue full异常。当队列空时,从队列里获取元素(执行element()操作)会抛出NoSuchElementException异常
- 返回特殊值: 当往队列插入元素时(执行offer(e)操作),会返回元素是否插入成功,成功返回true。如果是移除方法(执行poll()操作或者peek()操作),则是从队列里取出一个元素,如果没有则返回null。
- 一直阻塞: 当队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
- 超时退出: 当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。当队列为空时,队列会阻塞消费者线程一段时间,如果超过一定的时间,消费者线程会退出。
- 【注意】NoSuchElementException异常是由element()方法抛出的,它将peek()方法进行了封装。peek()方法要么返回队列头部元素,要么返回null。当peek()方法返回null时,element()返回NoSuchElementException异常。
-
阻塞队列实现类的使用:
塞队列的是西安类 | 描述 |
ArrayBlockingQueue | 由数组结构组成的有界阻塞队列 需要手动的传入初始长度 |
LinkedBlockingQueue | 有链表结构组成的有界阻塞队列 (但是默认的初始值为Integer.MAX_VALUE) |
PriorityBlockingQueue | 支持优先级排序的无界阻塞队列 |
DelayQueue | 使用优先级队列实现的延迟无界队列 |
SynchronousQueue | 不存储元素的阻塞队列 也就是单个元素的队列 |
LinkedTransferQueue | 有链表组成的无界阻塞队列 |
LinkedBlockingDeque | 有链表组成的双向阻塞队列 |
- 阻塞队列详解:
-
ArrayBlockingQueue: 是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO) 的原则对元素进行排序。构造方法如下:
public ArrayBlockingQueue(int capacity) { }// 传入的是队列的初始值public ArrayBlockingQueue(int capacity, boolean fair) { }public ArrayBlockingQueue(int capacity, boolean fair, Collection c) { }
- 传入的两个参数:
- 公平访问队列/非公平访问队列:
- 所谓公平访问是指阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。
- 非公平是对先等待的线程是非公平的。当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。
- 为什么默认的是使用非公平的形式进行队列的访问:为了保证公平性,通常会降低吞吐量。因此,参数fair的默认值为false
- 公平性是如何实现的:使用的是ReentrantLock 通过可重入锁ReentrantLock 不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()加锁,最后在finally语句块中通过lock.unlock()解锁 至于ReentrrantLock 加锁 和 锁的释放 在之前 的博客中有详细的解释
-
LinkedBlockingQueue 是一个用链表实现的有界阻塞队列,此队列按照先进先出(FIFO) 的原则对元素进行排序 此队列的默认和最大容量为Integer.MAX_VALUE,实际上被看做是无界阻塞队列(但是这个队列实际上的使用时非常危险的 会产生OOM 在线程池中会讲解)构造方法如下:
public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}// 默认的是创建一个初始值为Integer.MAX_VALUE的队列public LinkedBlockingQueue(int capacity) { }public LinkedBlockingQueue(Collection c) { }
- 传入的参数详解:
- 无参构造函数创建的LinkedBlockingQueue,capacity为默认值Integer.MAX_VALUE。
- 也可以指定LinkedBlockingQueue的容量,也可以在创建时就向LinkedBlockingQueue中添加元素
- 队列中锁的使用:
- LinkedBlockingQueue有putLock和takeLock两个锁,都是可重入锁ReentrantLock。当往队列中添加元素时,使用putLock;从队列中获取元素时,使用takeLock。都是先进行加锁(lock()),再在finally方法中进行解锁(unlock())
-
PriorityBlockingQueue是一个支持优先级的无界阻塞队列,默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来进行排序。但是不能保证同优先级元素的顺序,在ScheduledFutureTask中为了避免这种情况,除了依靠time进行优先级排序,还依靠sequenceNumber进行优先级排序构造方法如下:
public PriorityBlockingQueue() { }public PriorityBlockingQueue(int initialCapacity){ }public PriorityBlockingQueue(Collection c) { }public PriorityBlockingQueue(int initialCapacity, Comparator comparator) {
- 传入的参数:
- 使用无参构造函数创建时,默认初始化大小
DEFAULT_INITIAL_CAPACITY = 11
- 无界阻塞队列,为何可以指定大小原因 使用size 记录中的元素个数,当size >= queue.length时,会使用tryGrow()方法进行扩容 这就像是ArrayList一样 防止内存的浪费 这样的设计也确实合理
- 队列中锁的使用:
- 只有一个锁:在进行存取使用的时候都是需要进行加锁 和 锁的释放
-
DelayQueue 是一个支持延时获取元素的无界阻塞队列,队列使用 PriorityQueue 来实现 (不是非常的常用)构造方法如下:
public DelayQueue() { }public DelayQueue(Collection c) { }
- 传入的参数:
- DelayQueue的核心属性为PriorityQueue的实例q,用它可以按照某种优先级对元素排序。
- DelayQueue使用无参构造函数创建对象时,是有默认大小的。这是由实例q决定的,DEFAULT_INITIAL_CAPACITY = 11。
- 当向DelayQueue中添加元素时,实际是向PriorityQueue中添加元素,会调用PriorityQueue的相应方法。当遇到size >= queue.length时,会使用grow(size+1)方法对队列进行扩容
- 队列中锁的使用:
- 只有一个锁lock,是一种可重入锁ReentrantLock。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()加锁,最后在finally语句块中通过lock.unlock()解锁
-
SynchronousQueue 是一个不存储元素的阻塞队列 也就是说队列中的元素个数最多只有一个构造函数如下:
public SynchronousQueue() { this(false);}public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue () : new TransferStack ();}
- 传入的参数:
- 使用无参构造函数,创建的 SynchronousQueue 采用默认的非公平访问策略。当有元素可获取时,所有被阻塞的线程都有机会争抢获取元素的资格,可能会导致先阻塞的线程最后才获取到元素。
- 也可以指定是否公平访问队列。如果是true,则transferer属性赋值为TransferQueue对象;否则,transferer属性赋值为TransferStack对象
- 详解:
- 每一个添加元素的操作必须等待另一个线程获取元素的操作,否则不能继续添加元素。反之亦然。
- SynchronousQueue 本身并不存储任何元素。只是负责把生产者线程处理的数据直接传递给消费者线程,非常适合于传递性场景。
- 支持公平访问队列,默认情况下采用非公平访问策略。
- SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue
- 锁的使用:
- transferer属性是一个volatitle变量, SynchronousQueue 没有使用到可重入锁ReentrantLock
-
LinkedTransferQueue 是一个用链表实现的的无界阻塞 TransferQueue 队列相对于其他阻塞队列LinkedTransferQueue 多了 transfer()和 tryTransfer()方法 构造方法如下:
public LinkedTransferQueue() { }public LinkedTransferQueue(Collection c) { }
- transfer()方法:
- 如果有正在等待接收元素的消费者,transfer()方法将生产者产生的元素立刻传输给消费者。
- 如果没有正在等待接收元素的消费者,transfer()方法将生产这产生的元素放到队列尾部,直到有消费者消费了该元素才返回。
- tryTransfer()方法:
- 有两种形式,一种是不带时间参数的tryTransfer(E e)方法,一种是带时间参数的tryTransfer(E e, long timeout, TimeUnit unit)方法。
- 不带时间参数的tryTransfer(e)方法:如果有正在等待接收元素的消费者,则立即将生产者产生的元素传递给消费者;如果没有正在接收获取元素的消费者,则直接返回false。
- 带时间参数的tryTransfer(E e, long timeout, TimeUnit unit)方法:与tryTransfer()方法不同的是,如果没有正在等待接收元素的消费者,它会等待指定的时间再返回。如果超时还没有被消费者消费,则返回false;如果在等待的时间内被消费者消费,则返回true。
-
LinkedBlockingDeque 是一个用链表实现的双向有界阻塞队列,可以从队列的两端插入和移除元素LinkedBlockingDeque
可以运用在工作窃取模式中 与LinkedBlockingQueue一样,默认的长度和最大容量为Integer.MAX_VALUE。双向队列由于存在两个出入口,可以减少多线程同时入队的竞争 构造方法如下:
public LinkedBlockingDeque() { }public LinkedBlockingDeque(int capacity) { }public LinkedBlockingDeque(Collection c) { }
- 使用默认的无参构造函数,创建的LinkedBlockingDeque的容量为默认值Integer.MAX_VALUE。
- 为了防止LinkedBlockingDeque过度膨胀,可以使用待capacity参数的构造函数,设置容量。
- 相比其他的阻塞队列,LinkedBlockingDeque 多了 addFirst、addLast、offerFirst、offerLast、peekFirst、peekLast 等方法。以 First 单词结尾的方法,表示插入、获取、移除队列中的第一个元素(队列头部);以 Last 单词结尾方法,表示插入、获取、移除队列中的最后一个元素(队列尾部)。
- LinkedBlockingDeque 也保留了原始的add、remove等方法,但是默认的方向具有差异:有的默认操作队列头部,有的默认操作队列尾部。所以使用时,最好使用以Last和First结尾的方法,指明操作队列的方向。
- 只有一个锁lock,是一种可重入锁ReentrantLock。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()加锁,最后在finally语句块中通过lock.unlock()解锁
5.阻塞队列的总结
-
有界阻塞队列:ArrayBlockingQueue、LinkedBlockingQueue(经常被看做无界)、LinkedBlockingDeque(经常被看做无界)
-
无界阻塞队列:DelayQueue(可以指定大小,因为可以扩容)、PriorityBlockingQueue(可以指定大小,因为可以扩容)
-
不存储元素的阻塞队列:SynchronousQueue
-
支持公平访问的队列有:ArrayBlockingQueue、SynchronousQueue
-
有一个可重入锁的队列:ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue、LinkedBlockingDeque
-
有两个可重入锁的队列:LinkedBlockingQueue
-
不带可重入锁的队列:SynchronousQueue(transfer属性是volatile变量)
转载地址:http://ofxrn.baihongyu.com/