阻塞队列
WangScaler: 一个用心创作的作者。
声明:才疏学浅,如有错误,恳请指正。
- ArrayBlockingQueue:是一个用数组实现的有界阻塞队列。
- LinkedBlockingQueue:一个由链表结构组成的有界队列。
- PriorityBlockingQueue:一个支持线程优先级排序的无界队列。
- DelayQueue:一个实现PriorityBlockingQueue实现延迟获取的无界队列。
- SynchronousQueue: 一个不存储元素的阻塞队列。只能容纳一个元素,即当插入元素之后,必须等待移除方法将元素移除之后,才能继续插入。同样的队列为空使,移除方法只能阻塞等待,直到插入方法执行之后才会被唤醒。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
阻塞队列:当队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。比如我们去面包房买面包,当货架上没有时,我们只能等待货架有面包才能买;当货架满的时候,面包机造出来的面包也不能直接放在货架上,也得等待。此时我们买面包的就是消费者,面包机就是生产者,而货架就是我们的阻塞队列
阻塞队列的处理方法
阻塞队列提供了四种处理方法:
处理方法 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
- 抛出异常:当队列满时插入抛出异常
java.lang.IllegalStateException: Queue full
;当队列空时移除抛出异常。java.util.NoSuchElementException
;当检查时队首空时抛出异常java.util.NoSuchElementException
。
- 返回特殊值:当队列满时插入返回特殊值,插入成功/失败时返回true/false;当队列为空移除成功/失败时返回 移除的元素/null。当队列空时检查元素返回null。
- 一直阻塞:当队列满或者空时阻塞等待队列成功插入、移除或者响应中断。
- 超时退出: 当队列满或者空时阻塞一段时间,超过这段时间之后线程退出。
从源码理解ArrayBlockingQueue
用数组实现的阻塞队列,所以要指定长度,按照先进先出的原则,支持公平锁和非公平锁。
我们从源码来看一看插入的方法
-
add
public boolean add(E e) { return super.add(e); } 复制代码
调用的父类的add方法,而父类的add方法通过offer方法进行插入,如果插入成功则返回true,否则抛出异常IllegalStateException("Queue full");
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } 复制代码
那么插入的重点在offer,我们接下来看看。
-
offer
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } 复制代码
首先检查传入的参数并加锁,避免其他线程进行操作。如果队列已经满了返回false,否则的话执行enqueue方法,返回true并解锁。那么看来插入的过程在enqueue方法,继续往下看
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } 复制代码
将元素x放入数组的putIndex位置,然后putIndex进行自增,如果自增后的值超过数组的长度,则重新赋值为0。count表示数组中元素的数量进行加一操作。最后发出队列不为空的信号,因为有的读操作的线程在阻塞等待队列,当队列不为空时唤醒读取队列,这一步就是为了唤醒那些阻塞的队列。
-
put
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } 复制代码
put方法同样也是先获得锁,如果队列满的话进入阻塞,队列不满或者被唤醒时,使用enqueue进行插入,上边已经讲过enqueue方法。
接下来看看移除的方法
-
poll
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } 复制代码
获得锁,如果队列是空的则返回null,如果不为空则执行dequeue(),接下来继续看看dequeue方法
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; } 复制代码
获取当前队首的元素,然后将队首的元素置为null,将队首的指针takeIndex进行自增操作后移,如果后移之后为数组的长度,则重新赋值为0,然后将队列的总数进行自减操作。然后发出队列不满的信号,让处于阻塞中的插入队列唤醒,进行插入操作。最后返回读取的值。
-
take
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } 复制代码
原理都是一样的,如果队列为空,则进入阻塞等待,不为空则执行dequeue方法进行读取。
总结
从上述可以看出,就是循环利用这个数组。画个图加深一下理解
先将数据放入数组putIndex的位置,加下来putIndex自增变成3==数组的长度,难不成执行数组外?肯定不行,最好的办法是循环使用。
同样的删除操作也是如此。
先读数组takeIndex位置的元素,然后将值改为null
题外话
历时五个月,在EMQ的问答社区,收获两枚徽章,在此纪念一下,做物联网的兄弟们,应该知道EMQX吧。