基于链接节点的<mark>无界线程安全</mark>{@linkplain Queue队列}。

这个队列对元素<mark>FIFO</mark>(先进先出)进行排序。

队列的head是队列中存在时间最长的元素。

队列的tail是队列中时间最短的元素。

新元素插入到队列的尾部,队列检索操作获取队列头部的元素。

当许<mark>多线程共享对公共集合的访问</mark>时,{@code ConcurrentLinkedQueue}是一个合适的选择。

与大多数其他并发集合实现一样,该类<mark>不允许使用{@code null}元素</mark>。

迭代器是弱一致性的,返回反映队列在迭代器创建时或创建后某个点的状态的元素。

它们不会抛出{@link java.util。ConcurrentModificationException},并可以与其他操作并发进行。

自创建迭代器以来,队列中包含的元素将只返回一次。

注意,与大多数集合不同,{@code size}方法不是常量时间操作。

<mark>由于这些队列的异步性,确定当前元素的数量需要遍历元素,因此如果在遍历期间修改了这个集合,可能会报告不准确的结果</mark>。

此外,<mark>不能保证批量操作{@code addAll}、{@code removeAll}、{@code retainAll}、{@code containsAll}、{@code equals}和{@code toArray}是原子操作</mark>。

例如,与{@code addAll}操作并发操作的迭代器可能只查看一些之前添加过的元素。

这个类及其迭代器实现了{@link队列}和{@link迭代器}接口的所有可选方法。

内存一致性效应:与其他并发集合一样,在将对象放入ConcurrentLinkedQueue之前的线程中的操作发生在从另一个线程的ConcurrentLinkedQueue中访问或删除该元素之后的操作之前。

成员变量

//一定是头结点
private transient volatile Node<E> head;

//不一定就是尾节点
private transient volatile Node<E> tail;

Q&A:

-transient

禁止序列化

-volatile?

保证内存可见

Node是啥

    private static class Node<E> {
        //当前节点的值
        volatile E item;
        //下一个节点的引用,这里可以看到,引用是单向的,不像LinkedList是双向的
        volatile Node<E> next;

        /** * Constructs a new node. Uses relaxed write because item can * only be seen after publication via casNext.。 */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        //cas更新item
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
            //还有一个方法putObjectVolatile,所以下边这个方法是线程实时不可见的~(因为没有volatile修饰,也没有锁操作直接同步)
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
        
        //cas更新next
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;
        //下面两个变量是两个内部属性的内存偏移地址 Unsafe类通过这个地址来对属性做一些操作
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            //使用Unsafe,配合反射,拿取类属性的内存偏移地址,用于支持进行后续的赋值操作~
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

Unsafe干嘛的

跳过JVM进行内存赋值,简单了解下就可以

构造方法

    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

    public ConcurrentLinkedQueue(Collection<? extends E> c) {
        Node<E> h = null, t = null;
        /** * 这里为啥不加锁? * 一个new会做三件事情 * 1:分配内存空间 * 2:初始化对象 * 3:将分配的内存地址指向对象 * * 2、3满足as-if-serial语义,因此可以重排序为132 * * 如果c的数量很大,在所有的c构造为node之前,在queue中获取一个c中的值,而这个值还没有被添加到queue,那岂不是就获取不到了? * 那这个类岂不是很鸡肋? */
        for (E e : c) {
            //判空,因为不允许有空元素
            checkNotNull(e);
            //使用构造方法创建新节点,值就是当前c中的e值
            Node<E> newNode = new Node<E>(e);
            if (h == null)
                //对头结点初始为null的情况进行赋值
                h = t = newNode;
            else {
                //将当前tail的next节点设置为新创建的节点
                t.lazySetNext(newNode);
                //将当前tail节点指向新的节点
                t = newNode;
            }
        }
        //如果传进来的集合里边都是null,那么对head & tail 赋值null
        if (h == null)
            h = t = new Node<E>(null);
        head = h;
        tail = t;
    }

Q&A:

-for为啥不加锁?

看完下边的东西,就知道了,这种队列本身就不保证读的原子性~~!

新增一个元素

    //将指定的元素插入到此队列的末尾。
    //由于队列是无界的,这个方法永远不会抛出{@link IllegalStateException}或返回{@code false}。
    //但是如果e为null,就会抛出一个空指针
    public boolean add(E e) {
        return offer(e);
    }
    public boolean offer(E e) {
        checkNotNull(e);
        //创建入队节点
        final Node<E> newNode = new Node<E>(e);

        //死循环,入队不成功就反复入队
        for (Node<E> t = tail, p = t; ; ) {
            //Node p和t 都指向tail节点,q节点是p的next节点
            Node<E> q = p.next;
            // 判断p是不是尾节点,tail节点不一定是尾节点,判断是不是尾节点的依据是该节点的next是不是null
            if (q == null) {
                // q指向的是tail的next节点,如果为null,说明此时此刻没有别的线程新增节点成功
                // p是尾节点,设置p的next节点指向新创建的节点(也叫入队节点)
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    //如果 p!=t , 则将入队节点设置成tail节点,更新失败也没关系,更新失败表示有其他线程更新了tail节点
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // cas更新p的下个节点失败,再次尝试
            } else if (p == q)
            /** * 这种情况就属于遇到了哨兵节点导致的。所谓的哨兵节点就是next指向了自己。 * 这种节点在队列中存在的价值不大,主要表示要删除的节点或者空节点,当遇到哨兵节点时,无法通过next取得后续的节点, * 很可能直接返回head,这样链表就会从头部开始遍历,重新查找到链表末尾。 * 当然也有可能返回t,这样就进行了一次“打赌”,使用新的tail作为链表尾部(即t),这样避免了重新查找tail的开销。 * * 这句代码虽然只有一行,但是包含的信息比较多。首先“!=”不是原子操作,它是可以被中断的,也就是说,在执行“!=”时, * 程序会先取得t的值,在执行t = tail,并取得新的t的值,然后比较这两个值是否相等。 * 在单线程中,t != t这种语句显然不会成立, * 但是在并发环境下,有可能在获得左边的t值后,右边的t值被其他线程修改,这样,t != t 就成立了。 * 这种情况表示在比较的过程中,tail被其他线程修改了,这时,我们就用新的tail为链表的尾,也就是等式右边的t, * 但如果tail没有被修改,则返回head,要求从头部开始,重新查找链表末尾。 */
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

Q&A:

为啥不把tail直接定义为尾节点?如果这样的话,代码其实很简单

如果直接把tail设置为尾节点,那么上面你的入队列操作可以用下面代码实现:

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    
    for (;;) {
        Node<E> t = tail;
        
        if (t.casNext(null ,newNode) && casTail(t, newNode)) {
            return true;
        }
    }
}

将tail节点永远作为队列尾节点,这样实现起来确实更简单,但是缺点在于每次tail节点的更新都需要使用CAS的方式来进行,这样会一定程度上降低入队的效率。



p == q ,也就是节点的next引用指向了自己,这种情况怎么出现的?

下边会说。说实话,这点要是没有看到下边代码,还真是想不清楚~


删除一个元素

    /** * 删除第一个元素,并返回第一个元素的值 * @return */
    public E poll() {
        restartFromHead:
        for (; ; ) {
            for (Node<E> h = head, p = h, q; ; ) {
                E item = p.item;

                //p 指向 head节点,如果p的item不为null,则通过cas将其设置为null
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    //如果p 不是指向 head了,那么更新head
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                } else if ((q = p.next) == null) {
                    //如果P节点的下一个节点为null,说明这个队列已经为空了,直接更新头结点为p节点
                    updateHead(h, p);
                    return null;
                } else if (p == q) {
                    // p == q,则使用新的head重新开始
                    continue restartFromHead;
                }
                // 如果下一个元素不为空,则将头节点的下一个节点设置成头节点
                else
                    p = q;
            }
        }
    }
    
    //震惊!真相就在我们眼前!
    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            //这里就是为什么会出现节点的next引用指向自己,也就是出现了p==q的情况
            h.lazySetNext(h);
    }

其他的一些方法

    //读取队列的第一个元素的值(只读不删)
    public E peek() {
        restartFromHead:
        for (; ; ) {
            for (Node<E> h = head, p = h, q; ; ) {
                E item = p.item;
                if (item != null || (q = p.next) == null) {
                    //这里会改变head的指向,head指向队列中第一个具有非空元素的节点。默认的head节点中item为null
                    updateHead(h, p);
                    return item;
                } else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
    
     public boolean remove(Object o) {
        if (o == null) return false;
        Node<E> pred = null;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null && o.equals(item) && p.casItem(item, null)) {
                Node<E> next = succ(p);
                if (pred != null && next != null)
                    pred.casNext(p, next);
                return true;
            }
            pred = p;
        }
        return false;
    }

NOTE:

1:contains(Object o)、size()这两个方法并不能在并发下保证结果正确。

2:判断队列中是否还有元素,使用isEmpty()方法优于size()>0的方式,前者只是判断队列中第一个元素是否为null,后者需要遍历队列获取最终结果,而且后者在并发下并不能保证最红结果准确。


参考资料

【不看后悔系列】非阻塞算法在并发容器中的实现