本文基于compareandswap指令完成一个无等待并发算法。
根据维基百科,它的定义如下:An algorithm is wait-free if every operation has a bound on the number of steps the algorithm will take before the operation completes.
本文的方法参考了,,。
它的目标非常简单,就是把一个值通过原子指令CMPXCHG加N,然后返回原来的值。lock-free版本如下:public final int getAndIncrement(int add) { for (;;) { int current = get(); int next = current + add; if (compareAndSet(current, next)) return current; } }
那么,这里的方式是,compareAndSet失败了重试。并且导致它失败的那一方必定取得了进展。
这里的问题在于,究竟需要重试多少次compareAndSet才能成功呢?假如一直处于竞争环境下,这个重试次数是没有上限的。这里也就是饥饿的问题。有没有办法给出一个上限,哪怕这个上限比较大?这种并发的性质被称为wait-freedom。先给出一个我实现好了的代码,之后再来讲思路:public static int getAndIncrement(int index, int add) { //fast-path, 最多MAX次。 int count = MAX; for(;;) { ValueObj valueObj_ = valueObj; if(valueObj_.threadObj == null) { ValueObj valueObjNext = new ValueObj(valueObj_.value + add, null); if(casValueObj(valueObj_, valueObjNext)) { StateObj myState = states[index]; //前进一步,每assistStep,尝试一个帮助。 if(((++myState.steps) & myState.assistStep) == 0){ long helpThread = myState.index; help(helpThread); //下一个协助的对象。 ++myState.index; } return valueObj_.value; } Thread.yield();Thread.yield();Thread.yield();Thread.yield(); } else { helpTransfer(valueObj_); } if(--count == 0) break; }// System.out.println("here " + inter.incrementAndGet()); for(int j = 0; j < bigYields; ++j) Thread.yield(); //slow-path,将自己列为被帮助对象。 ThreadObj myselfObj = new ThreadObj(new ThreadObj.WrapperObj(null, false), add); setThreadObj(index, myselfObj); //开始帮助自己 ValueObj result = help(index); setThreadObj(index, null); return result.value; } // valueObj->threadObj->wrapperObj->valueObj。 // step 1-3,每一个步骤都不会阻塞其他步骤。 // 严格遵守以下顺序: // step 1: 通过将ValueObj指向ThreadObj: // atomic: (value, null)->(value, ThreadObj)来锚定该值 //确定该value归ThreadObj对应线程所有。 // step 2: 通过将ThreadObj包裹的WrapperObj, // atomic: 从(null, false)更新为(valueObj, true)来更新状态的同时传递value //对应线程通过isFinish判定操作已完成。 // step 3: 更新ValueObj,提升value,同时设置ThreadObj为null: // atomic: (value, ThreadObj)->(value+1, null)完成收尾动作 //此时value值回到了没有被线程锚定的状态,也可以看做step1之前的状态。 private static ValueObj help(long helpIndex) { helpIndex = helpIndex % N; ThreadObj helpObj = getThreadObj(helpIndex); ThreadObj.WrapperObj wrapperObj; if(helpObj == null || helpObj.wrapperObj == null) return null; //判定句,是否该线程对应的操作未完成,(先取valueObj,再取isFinish,这很重要)。 ValueObj valueObj_ = valueObj; while(!(wrapperObj = helpObj.wrapperObj).isFinish) { /*ValueObj valueObj_ = valueObj;*/ if(valueObj_.threadObj == null) { ValueObj intermediateObj = new ValueObj(valueObj_.value, helpObj); //step1 if(!casValueObj(valueObj_, intermediateObj)) { valueObj_ = valueObj; continue; } //step1: 锚定该ValueObj,接下来所有看到该valueObj的线程,都会一致地完成一系列操作. valueObj_ = intermediateObj; } //完成ValueObj、ThreadObj中的WrapperObj的状态迁移。 helpTransfer(valueObj_); valueObj_ = valueObj; } valueObj_ = wrapperObj.value; helpValueTransfer(valueObj_); //返回锚定的valueObj。 return valueObj_; } private static void helpTransfer(ValueObj valueObj_) { ThreadObj.WrapperObj wrapperObj = valueObj_.threadObj.wrapperObj; //step2: 先完成ThreadObj的状态迁移,WrapperObj(valueObj,true)分别表示(值,完成),原子地将这两个值喂给threadObj。 if(!wrapperObj.isFinish) { ThreadObj.WrapperObj wrapValueFiniash = new ThreadObj.WrapperObj(valueObj_, true); valueObj_.threadObj.casWrapValue(wrapperObj, wrapValueFiniash); } //step3: 最后完成ValueObj上的状态迁移 helpValueTransfer(valueObj_); } private static ValueObj helpValueTransfer(ValueObj valueObj_) { if(valueObj_ == valueObj) { ValueObj valueObjNext = new ValueObj(valueObj_.value + valueObj_.threadObj.add, null); casValueObj(valueObj_, valueObjNext); } return valueObj_; }
首先全局作用域有个值ValueObj,一种自然的想法是ValueObj应该值包含一个int值value,图[1]:
如图所示,无锁并发的版本无非就是从一个值原子地变化为另一个+1的值。但是我们要的是wait-free,势必涉及到线程之间的互助。如何在线程之间传递信息从而实现协助呢?我们给ValueObj增加一个字段threadObj,那么上图的版本现在可以视为,图[2]:那么现在增加了一个数据ThreadObj(后面会解释如何使用),ValueObj会有两种状态:- (Value, null)
- (Value, threadObj)
那么好,现在是两种情况:
- (value, null),代表这个value我可以去竞争。
- (value, threadObj),代表这个value已经被threadObj对应的线程预定了,剩余线程达成共识。
所以情况2我们能做什么呢?
这里就涉及到无等待算法的核心技术,help,通俗地说,就是多个线程间的相互协助。我们会先将value,原子地喂给threadObj,再将(value, threadObj)原子地迁移到(value+1, null)。这里的顺序很重要,因为假如线程A先迁移了(value, threadObj)->(value+1, nulll),那么剩下的其他所有线程都无法再看到threadObj,也就是说尽管value已经增加了,但是threadObj对应线程被锁住了。只能等待线程A将value喂给threadObj。ThreadObj中包裹着WrapperObj,WrapperObj中初始化为(null, false),将会原子地转变为(ValueObj, ture),从而ThreadObj拿到了ValueObj。所以这里一共是三个steps:step 1: (value, null) atomic-> (value, threadObj)step 2: threadObj.WrapperObj(null, false) atomic-> WrapperObj(valueObj, true)stpe 3: (value, trheadObj) atomic-> (value + 1, null)如图所示[3]:其中灰框黑字标识了关键的3个step.
图[3]所示是一种通用的无等待构造技术。对于其中的ThreadObj以及每个线程协助其他线程的策略states,可以通过数组来构造,具体可在代码里看到。最后我们的程序结构,如图[4]:最后给出完整的可执行代码 :or :
,每个loop,100个线程每个操作100000次。
参考文献:0:1:2:3: