阻塞队列

WangScaler: 一个用心创作的作者。

声明:才疏学浅,如有错误,恳请指正。

  • ArrayBlockingQueue:是一个用数组实现的有界阻塞队列。
  • LinkedBlockingQueue:一个由链表结构组成的有界队列。
  • PriorityBlockingQueue:一个支持线程优先级排序的无界队列。
  • DelayQueue:一个实现PriorityBlockingQueue实现延迟获取的无界队列。
  • SynchronousQueue: 一个不存储元素的阻塞队列。只能容纳一个元素,即当插入元素之后,必须等待移除方法将元素移除之后,才能继续插入。同样的队列为空使,移除方法只能阻塞等待,直到插入方法执行之后才会被唤醒。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

阻塞队列:当队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。比如我们去面包房买面包,当货架上没有时,我们只能等待货架有面包才能买;当货架满的时候,面包机造出来的面包也不能直接放在货架上,也得等待。此时我们买面包的就是消费者,面包机就是生产者,而货架就是我们的阻塞队列

阻塞队列的处理方法

阻塞队列提供了四种处理方法:

处理方法 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用
  • 抛出异常:当队列满时插入抛出异常java.lang.IllegalStateException: Queue full;当队列空时移除抛出异常。java.util.NoSuchElementException;当检查时队首空时抛出异常java.util.NoSuchElementException
  • 返回特殊值:当队列满时插入返回特殊值,插入成功/失败时返回true/false;当队列为空移除成功/失败时返回 移除的元素/null。当队列空时检查元素返回null。
  • 一直阻塞:当队列满或者空时阻塞等待队列成功插入、移除或者响应中断。
  • 超时退出: 当队列满或者空时阻塞一段时间,超过这段时间之后线程退出。

从源码理解ArrayBlockingQueue

用数组实现的阻塞队列,所以要指定长度,按照先进先出的原则,支持公平锁和非公平锁。

我们从源码来看一看插入的方法

  • add

    public boolean add(E e) {
        return super.add(e);
    }
    复制代码

    调用的父类的add方法,而父类的add方法通过offer方法进行插入,如果插入成功则返回true,否则抛出异常IllegalStateException("Queue full");

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
    复制代码

    那么插入的重点在offer,我们接下来看看。

  • offer

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    复制代码

    首先检查传入的参数并加锁,避免其他线程进行操作。如果队列已经满了返回false,否则的话执行enqueue方法,返回true并解锁。那么看来插入的过程在enqueue方法,继续往下看

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
    复制代码

    将元素x放入数组的putIndex位置,然后putIndex进行自增,如果自增后的值超过数组的长度,则重新赋值为0。count表示数组中元素的数量进行加一操作。最后发出队列不为空的信号,因为有的读操作的线程在阻塞等待队列,当队列不为空时唤醒读取队列,这一步就是为了唤醒那些阻塞的队列。

  • put

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    复制代码

    put方法同样也是先获得锁,如果队列满的话进入阻塞,队列不满或者被唤醒时,使用enqueue进行插入,上边已经讲过enqueue方法。

接下来看看移除的方法

  • poll

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
    复制代码

    获得锁,如果队列是空的则返回null,如果不为空则执行dequeue(),接下来继续看看dequeue方法

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
    复制代码

    获取当前队首的元素,然后将队首的元素置为null,将队首的指针takeIndex进行自增操作后移,如果后移之后为数组的长度,则重新赋值为0,然后将队列的总数进行自减操作。然后发出队列不满的信号,让处于阻塞中的插入队列唤醒,进行插入操作。最后返回读取的值。

  • take

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    复制代码

    原理都是一样的,如果队列为空,则进入阻塞等待,不为空则执行dequeue方法进行读取。

总结

从上述可以看出,就是循环利用这个数组。画个图加深一下理解

先将数据放入数组putIndex的位置,加下来putIndex自增变成3==数组的长度,难不成执行数组外?肯定不行,最好的办法是循环使用。

同样的删除操作也是如此。

先读数组takeIndex位置的元素,然后将值改为null

题外话

历时五个月,在EMQ的问答社区,收获两枚徽章,在此纪念一下,做物联网的兄弟们,应该知道EMQX吧。