关于CAS等原子操作
在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。有了这个原子操作,我们就可以用其来实现各种无锁(lock free)的数据结构。
这个操作用C语言来描述就是下面这个样子:意思就是说,看一看内存*reg里的值是不是oldval,如果是的话,则对其赋值newval。
int compare_and_swap (int* reg, int oldval, int newval)
{
int old_reg_val = *reg;
if (old_reg_val == oldval) {
*reg = newval;
}
return old_reg_val;
}我们可以看到,old_reg_val 总是返回,于是,我们可以在 compare_and_swap 操作之后对其进行测试,以查看它是否与 oldval相匹配,因为它可能有所不同,这意味着另一个并发线程已成功地竞争到 compare_and_swap 并成功将 reg 值从 oldval 更改为别的值了。
这个操作可以变种为返回bool值的形式(返回 bool值的好处在于,可以调用者知道有没有更新成功):
bool compare_and_swap (int *addr, int oldval, int newval)
{
if ( *addr != oldval ) {
return false;
}
*addr = newval;
return true;
}CAS的ABA问题
所谓ABA,问题基本是这个样子:
进程P1在共享变量中读到值为A
P1被抢占了,进程P2执行
P2把共享变量里的值从A改成了B,再改回到A,此时被P1抢占。
P1回来看到共享变量里的值没有被改变,于是继续执行。
虽然P1以为变量值没有改变,继续执行了,但是这个会引发一些潜在的问题。ABA问题最容易发生在lock free 的算法中的,CAS首当其冲,因为CAS判断的是指针的值。很明显,值是很容易又变成原样的。
你拿着一个装满钱的手提箱在飞机场,此时过来了一个***性感的美女,然后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个一模一样的手提箱和你那装满钱的箱子调了个包,然后就离开了,你看到你的手提箱还在那,于是就提着手提箱去赶飞机去了。解决ABA的问题
一次用CAS检查双倍长度的值,前半部是值,后半部分是一个计数器。
只有这两个都一样,才算通过检查,要吧赋新的值。并把计数器累加1。
这样一来,ABA发生时,虽然值一样,但是计数器就不一样无锁队列的链表实现
初始化一个队列的代码很简,初始化一个dummy结点(注:在链表操作中,使用一个dummy结点,可以少掉很多边界条件的判断),如下所示:
InitQueue(Q) { node = new node() node->next = NULL; Q->head = Q->tail = node; }我们先来看一下进队列用CAS实现的方式,基本上来说就是链表的两步操作:
第一步,把tail指针的next指向要加入的结点。tail->next = p;
第二步,把tail指针移到队尾。tail = p;
EnQueue(Q, data) //进队列
{
//准备新加入的结点数据
n = new node();
n->value = data;
n->next = NULL;
do {
p = Q->tail; //取链表尾指针的快照
} while( CAS(p->next, NULL, n) != TRUE);
//while条件注释:如果没有把结点链在尾指针上,再试
CAS(Q->tail, p, n); //置尾结点 tail = n;
}我们可以看到,程序中的那个 do-while 的 Retry-Loop 中的 CAS 操作:如果 p->next 是 NULL,那么,把新结点 n 加到队尾。如果不成功,则重新再来一次!
就是说,很有可能我在准备在队列尾加入结点时,别的线程已经加成功了,于是tail指针就变了,于是我的CAS返回了false,于是程序再试,直到试成功为止。这个很像我们的抢电话热线的不停重播的情况。
但是你会看到,为什么我们的“置尾结点”的操作不判断是否成功,因为:
如果有一个线程T1,它的while中的CAS如果成功的话,那么其它所有的 随后线程的CAS都会失败,然后就会再循环,
此时,如果T1 线程还没有更新tail指针,其它的线程继续失败,因为tail->next不是NULL了。
直到T1线程更新完 tail 指针,于是其它的线程中的某个线程就可以得到新的 tail 指针,继续往下走了。
所以,只要线程能从 while 循环中退出来,意味着,它已经“独占”了,tail 指针必然可以被更新。
这里有一个潜在的问题——如果T1线程在用CAS更新tail指针的之前,线程停掉或是挂掉了,那么其它线程就进入死循环了。下面是改良版的EnQueue()
EnQueue(Q, data) //进队列改良版 v1
{
n = new node();
n->value = data;
n->next = NULL;
p = Q->tail;
oldp = p
do {
while (p->next != NULL)
p = p->next;
} while( CAS(p.next, NULL, n) != TRUE); //如果没有把结点链在尾上,再试
CAS(Q->tail, oldp, n); //置尾结点
}我们让每个线程,自己fetch 指针 p 到链表尾。但是这样的fetch会很影响性能。而且,如果一个线程不断的EnQueue,会导致所有的其它线程都去 fetch 他们的 p 指针到队尾,能不能不要所有的线程都干同一个事?这样可以节省整体的时间?
比如:直接 fetch Q->tail 到队尾?因为,所有的线程都共享着 Q->tail,所以,一旦有人动了它后,相当于其它的线程也跟着动了,于是,我们的代码可以改进成如下的实现:
EnQueue(Q, data) //进队列改良版 v2
{
n = new node();
n->value = data;
n->next = NULL;
while(TRUE) {
//先取一下尾指针和尾指针的next
tail = Q->tail;
next = tail->next;
//如果尾指针已经被移动了,则重新开始
if ( tail != Q->tail ) continue;
//如果尾指针的 next 不为NULL,则 fetch 全局尾指针到next
if ( next != NULL ) {
CAS(Q->tail, tail, next);
continue;
}
//如果加入结点成功,则退出
if ( CAS(tail->next, next, n) == TRUE ) break;
}
CAS(Q->tail, tail, n); //置尾结点
}上述的代码还是很清楚的,相信你一定能看懂,而且,这也是 Java 中的 ConcurrentLinkedQueue 的实现逻辑,当然,我上面的这个版本比 Java 的好一点,因为没有 if 嵌套,嘿嘿。
好了,我们解决了EnQueue,我们再来看看DeQueue的代码:(很简单,我就不解释了)

京公网安备 11010502036488号