基于链接节点的<mark>无界线程安全</mark>{@linkplain Queue队列}。
这个队列对元素<mark>FIFO</mark>(先进先出)进行排序。
队列的head是队列中存在时间最长的元素。
队列的tail是队列中时间最短的元素。
新元素插入到队列的尾部,队列检索操作获取队列头部的元素。
当许<mark>多线程共享对公共集合的访问</mark>时,{@code ConcurrentLinkedQueue}是一个合适的选择。
与大多数其他并发集合实现一样,该类<mark>不允许使用{@code null}元素</mark>。
迭代器是弱一致性的,返回反映队列在迭代器创建时或创建后某个点的状态的元素。
它们不会抛出{@link java.util。ConcurrentModificationException},并可以与其他操作并发进行。
自创建迭代器以来,队列中包含的元素将只返回一次。
注意,与大多数集合不同,{@code size}方法不是常量时间操作。
<mark>由于这些队列的异步性,确定当前元素的数量需要遍历元素,因此如果在遍历期间修改了这个集合,可能会报告不准确的结果</mark>。
此外,<mark>不能保证批量操作{@code addAll}、{@code removeAll}、{@code retainAll}、{@code containsAll}、{@code equals}和{@code toArray}是原子操作</mark>。
例如,与{@code addAll}操作并发操作的迭代器可能只查看一些之前添加过的元素。
这个类及其迭代器实现了{@link队列}和{@link迭代器}接口的所有可选方法。
内存一致性效应:与其他并发集合一样,在将对象放入ConcurrentLinkedQueue之前的线程中的操作发生在从另一个线程的ConcurrentLinkedQueue中访问或删除该元素之后的操作之前。
成员变量
//一定是头结点
private transient volatile Node<E> head;
//不一定就是尾节点
private transient volatile Node<E> tail;
Q&A:
-transient?
禁止序列化
-volatile?
保证内存可见
Node是啥
private static class Node<E> {
//当前节点的值
volatile E item;
//下一个节点的引用,这里可以看到,引用是单向的,不像LinkedList是双向的
volatile Node<E> next;
/** * Constructs a new node. Uses relaxed write because item can * only be seen after publication via casNext.。 */
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
//cas更新item
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
//还有一个方法putObjectVolatile,所以下边这个方法是线程实时不可见的~(因为没有volatile修饰,也没有锁操作直接同步)
UNSAFE.putOrderedObject(this, nextOffset, val);
}
//cas更新next
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
//下面两个变量是两个内部属性的内存偏移地址 Unsafe类通过这个地址来对属性做一些操作
private static final long itemOffset;
private static final long nextOffset;
static {
//使用Unsafe,配合反射,拿取类属性的内存偏移地址,用于支持进行后续的赋值操作~
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
Unsafe干嘛的
跳过JVM进行内存赋值,简单了解下就可以
构造方法
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
/** * 这里为啥不加锁? * 一个new会做三件事情 * 1:分配内存空间 * 2:初始化对象 * 3:将分配的内存地址指向对象 * * 2、3满足as-if-serial语义,因此可以重排序为132 * * 如果c的数量很大,在所有的c构造为node之前,在queue中获取一个c中的值,而这个值还没有被添加到queue,那岂不是就获取不到了? * 那这个类岂不是很鸡肋? */
for (E e : c) {
//判空,因为不允许有空元素
checkNotNull(e);
//使用构造方法创建新节点,值就是当前c中的e值
Node<E> newNode = new Node<E>(e);
if (h == null)
//对头结点初始为null的情况进行赋值
h = t = newNode;
else {
//将当前tail的next节点设置为新创建的节点
t.lazySetNext(newNode);
//将当前tail节点指向新的节点
t = newNode;
}
}
//如果传进来的集合里边都是null,那么对head & tail 赋值null
if (h == null)
h = t = new Node<E>(null);
head = h;
tail = t;
}
Q&A:
-for为啥不加锁?
看完下边的东西,就知道了,这种队列本身就不保证读的原子性~~!
新增一个元素
//将指定的元素插入到此队列的末尾。
//由于队列是无界的,这个方法永远不会抛出{@link IllegalStateException}或返回{@code false}。
//但是如果e为null,就会抛出一个空指针
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
checkNotNull(e);
//创建入队节点
final Node<E> newNode = new Node<E>(e);
//死循环,入队不成功就反复入队
for (Node<E> t = tail, p = t; ; ) {
//Node p和t 都指向tail节点,q节点是p的next节点
Node<E> q = p.next;
// 判断p是不是尾节点,tail节点不一定是尾节点,判断是不是尾节点的依据是该节点的next是不是null
if (q == null) {
// q指向的是tail的next节点,如果为null,说明此时此刻没有别的线程新增节点成功
// p是尾节点,设置p的next节点指向新创建的节点(也叫入队节点)
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
//如果 p!=t , 则将入队节点设置成tail节点,更新失败也没关系,更新失败表示有其他线程更新了tail节点
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// cas更新p的下个节点失败,再次尝试
} else if (p == q)
/** * 这种情况就属于遇到了哨兵节点导致的。所谓的哨兵节点就是next指向了自己。 * 这种节点在队列中存在的价值不大,主要表示要删除的节点或者空节点,当遇到哨兵节点时,无法通过next取得后续的节点, * 很可能直接返回head,这样链表就会从头部开始遍历,重新查找到链表末尾。 * 当然也有可能返回t,这样就进行了一次“打赌”,使用新的tail作为链表尾部(即t),这样避免了重新查找tail的开销。 * * 这句代码虽然只有一行,但是包含的信息比较多。首先“!=”不是原子操作,它是可以被中断的,也就是说,在执行“!=”时, * 程序会先取得t的值,在执行t = tail,并取得新的t的值,然后比较这两个值是否相等。 * 在单线程中,t != t这种语句显然不会成立, * 但是在并发环境下,有可能在获得左边的t值后,右边的t值被其他线程修改,这样,t != t 就成立了。 * 这种情况表示在比较的过程中,tail被其他线程修改了,这时,我们就用新的tail为链表的尾,也就是等式右边的t, * 但如果tail没有被修改,则返回head,要求从头部开始,重新查找链表末尾。 */
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
Q&A:
为啥不把tail直接定义为尾节点?如果这样的话,代码其实很简单。
如果直接把tail设置为尾节点,那么上面你的入队列操作可以用下面代码实现:
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (;;) {
Node<E> t = tail;
if (t.casNext(null ,newNode) && casTail(t, newNode)) {
return true;
}
}
}
将tail节点永远作为队列尾节点,这样实现起来确实更简单,但是缺点在于每次tail节点的更新都需要使用CAS的方式来进行,这样会一定程度上降低入队的效率。
p == q ,也就是节点的next引用指向了自己,这种情况怎么出现的?
下边会说。说实话,这点要是没有看到下边代码,还真是想不清楚~
删除一个元素
/** * 删除第一个元素,并返回第一个元素的值 * @return */
public E poll() {
restartFromHead:
for (; ; ) {
for (Node<E> h = head, p = h, q; ; ) {
E item = p.item;
//p 指向 head节点,如果p的item不为null,则通过cas将其设置为null
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
//如果p 不是指向 head了,那么更新head
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
} else if ((q = p.next) == null) {
//如果P节点的下一个节点为null,说明这个队列已经为空了,直接更新头结点为p节点
updateHead(h, p);
return null;
} else if (p == q) {
// p == q,则使用新的head重新开始
continue restartFromHead;
}
// 如果下一个元素不为空,则将头节点的下一个节点设置成头节点
else
p = q;
}
}
}
//震惊!真相就在我们眼前!
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
//这里就是为什么会出现节点的next引用指向自己,也就是出现了p==q的情况
h.lazySetNext(h);
}
其他的一些方法
//读取队列的第一个元素的值(只读不删)
public E peek() {
restartFromHead:
for (; ; ) {
for (Node<E> h = head, p = h, q; ; ) {
E item = p.item;
if (item != null || (q = p.next) == null) {
//这里会改变head的指向,head指向队列中第一个具有非空元素的节点。默认的head节点中item为null
updateHead(h, p);
return item;
} else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
public boolean remove(Object o) {
if (o == null) return false;
Node<E> pred = null;
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
if (item != null && o.equals(item) && p.casItem(item, null)) {
Node<E> next = succ(p);
if (pred != null && next != null)
pred.casNext(p, next);
return true;
}
pred = p;
}
return false;
}
NOTE:
1:contains(Object o)、size()这两个方法并不能在并发下保证结果正确。
2:判断队列中是否还有元素,使用isEmpty()方法优于size()>0的方式,前者只是判断队列中第一个元素是否为null,后者需要遍历队列获取最终结果,而且后者在并发下并不能保证最红结果准确。