博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
世界上最简单的无等待算法(getAndIncrement)
阅读量:6189 次
发布时间:2019-06-21

本文共 5725 字,大约阅读时间需要 19 分钟。

本文基于compareandswap指令完成一个无等待并发算法。

根据维基百科,它的定义如下:

An algorithm is wait-free if every operation has a bound on the number of steps the algorithm will take before the operation completes.

本文的方法参考了,,。

它的目标非常简单,就是把一个值通过原子指令CMPXCHG加N,然后返回原来的值。
lock-free版本如下:

public final int getAndIncrement(int add) {        for (;;) {            int current = get();            int next = current + add;            if (compareAndSet(current, next))                return current;        }    }

那么,这里的方式是,compareAndSet失败了重试。并且导致它失败的那一方必定取得了进展。

这里的问题在于,究竟需要重试多少次compareAndSet才能成功呢?
假如一直处于竞争环境下,这个重试次数是没有上限的。这里也就是饥饿的问题。
有没有办法给出一个上限,哪怕这个上限比较大
这种并发的性质被称为wait-freedom。
先给出一个我实现好了的代码,之后再来讲思路:

public static int getAndIncrement(int index, int add) {        //fast-path, 最多MAX次。        int count = MAX;        for(;;) {            ValueObj valueObj_ = valueObj;            if(valueObj_.threadObj == null) {                ValueObj valueObjNext = new ValueObj(valueObj_.value + add, null);                if(casValueObj(valueObj_, valueObjNext)) {                    StateObj myState = states[index];                    //前进一步,每assistStep,尝试一个帮助。                    if(((++myState.steps) & myState.assistStep) == 0){                        long helpThread = myState.index;                        help(helpThread);                        //下一个协助的对象。                        ++myState.index;                    }                    return valueObj_.value;                }                Thread.yield();Thread.yield();Thread.yield();Thread.yield();            } else {                helpTransfer(valueObj_);            }                        if(--count == 0)                break;        }//        System.out.println("here " + inter.incrementAndGet());        for(int j = 0; j < bigYields; ++j)            Thread.yield();                //slow-path,将自己列为被帮助对象。        ThreadObj myselfObj = new ThreadObj(new ThreadObj.WrapperObj(null, false), add);        setThreadObj(index, myselfObj);        //开始帮助自己        ValueObj result = help(index);        setThreadObj(index, null);        return result.value;    }        // valueObj->threadObj->wrapperObj->valueObj。    // step 1-3,每一个步骤都不会阻塞其他步骤。    // 严格遵守以下顺序:     // step 1: 通过将ValueObj指向ThreadObj:    //         atomic: (value, null)->(value, ThreadObj)来锚定该值                      //确定该value归ThreadObj对应线程所有。    // step 2: 通过将ThreadObj包裹的WrapperObj,    //         atomic: 从(null, false)更新为(valueObj, true)来更新状态的同时传递value    //对应线程通过isFinish判定操作已完成。    // step 3: 更新ValueObj,提升value,同时设置ThreadObj为null:    //         atomic: (value, ThreadObj)->(value+1, null)完成收尾动作                 //此时value值回到了没有被线程锚定的状态,也可以看做step1之前的状态。    private static ValueObj help(long helpIndex) {        helpIndex = helpIndex % N;        ThreadObj helpObj = getThreadObj(helpIndex);        ThreadObj.WrapperObj wrapperObj;        if(helpObj == null || helpObj.wrapperObj == null)            return null;        //判定句,是否该线程对应的操作未完成,(先取valueObj,再取isFinish,这很重要)。        ValueObj valueObj_ = valueObj;        while(!(wrapperObj = helpObj.wrapperObj).isFinish) {            /*ValueObj valueObj_ = valueObj;*/            if(valueObj_.threadObj == null) {                ValueObj intermediateObj = new ValueObj(valueObj_.value, helpObj);                //step1                if(!casValueObj(valueObj_, intermediateObj)) {                    valueObj_ = valueObj;                    continue;                }                //step1: 锚定该ValueObj,接下来所有看到该valueObj的线程,都会一致地完成一系列操作.                valueObj_ = intermediateObj;            }            //完成ValueObj、ThreadObj中的WrapperObj的状态迁移。            helpTransfer(valueObj_);            valueObj_ = valueObj;        }        valueObj_ = wrapperObj.value;        helpValueTransfer(valueObj_);        //返回锚定的valueObj。        return valueObj_;    }        private static void helpTransfer(ValueObj valueObj_) {        ThreadObj.WrapperObj wrapperObj = valueObj_.threadObj.wrapperObj;        //step2: 先完成ThreadObj的状态迁移,WrapperObj(valueObj,true)分别表示(值,完成),原子地将这两个值喂给threadObj。        if(!wrapperObj.isFinish) {            ThreadObj.WrapperObj wrapValueFiniash = new ThreadObj.WrapperObj(valueObj_, true);            valueObj_.threadObj.casWrapValue(wrapperObj, wrapValueFiniash);        }        //step3: 最后完成ValueObj上的状态迁移        helpValueTransfer(valueObj_);    }        private static ValueObj helpValueTransfer(ValueObj valueObj_) {        if(valueObj_ == valueObj) {            ValueObj valueObjNext = new ValueObj(valueObj_.value + valueObj_.threadObj.add, null);            casValueObj(valueObj_, valueObjNext);        }        return valueObj_;    }

首先全局作用域有个值ValueObj,一种自然的想法是ValueObj应该值包含一个int值value,图[1]:

图片描述
如图所示,无锁并发的版本无非就是从一个值原子地变化为另一个+1的值。
但是我们要的是wait-free,势必涉及到线程之间的互助。如何在线程之间传递信息从而实现协助呢?
我们给ValueObj增加一个字段threadObj,那么上图的版本现在可以视为,图[2]:
图片描述
那么现在增加了一个数据ThreadObj(后面会解释如何使用),ValueObj会有两种状态:

  1. (Value, null)
  2. (Value, threadObj)

那么好,现在是两种情况:

  1. (value, null),代表这个value我可以去竞争。
  2. (value, threadObj),代表这个value已经被threadObj对应的线程预定了,剩余线程达成共识。

所以情况2我们能做什么呢?

这里就涉及到无等待算法的核心技术,help,通俗地说,就是多个线程间的相互协助。
我们会先将value,原子地喂给threadObj,再将(value, threadObj)原子地迁移到(value+1, null)。
这里的顺序很重要,因为假如线程A先迁移了(value, threadObj)->(value+1, nulll),那么剩下的其他所有线程都无法再看到threadObj,也就是说尽管value已经增加了,但是threadObj对应线程被锁住了。只能等待线程A将value喂给threadObj。
ThreadObj中包裹着WrapperObj,WrapperObj中初始化为(null, false),将会原子地转变为(ValueObj, ture),从而ThreadObj拿到了ValueObj。
所以这里一共是三个steps:
step 1: (value, null) atomic-> (value, threadObj)
step 2: threadObj.WrapperObj(null, false) atomic-> WrapperObj(valueObj, true)
stpe 3: (value, trheadObj) atomic-> (value + 1, null)
如图所示[3]:
图片描述

其中灰框黑字标识了关键的3个step.

图[3]所示是一种通用的无等待构造技术。
对于其中的ThreadObj以及每个线程协助其他线程的策略states,可以通过数组来构造,具体可在代码里看到。
最后我们的程序结构,如图[4]:
图片描述
最后给出完整的可执行代码 :

or :

,每个loop,100个线程每个操作100000次。

参考文献:
0:
1:
2:
3:

转载地址:http://xroda.baihongyu.com/

你可能感兴趣的文章
基于OpenGL编写一个简易的2D渲染框架-04 绘制图片
查看>>
Markdown基本语法总结
查看>>
PCA and ZAC Whitening
查看>>
android raw与assets区别
查看>>
C语言内存分配
查看>>
C++知识点
查看>>
死锁、进程通信
查看>>
centos 7扩展磁盘分区容量
查看>>
github 新建一个分支
查看>>
[MEF] 学习之一 入门级的简单Demo
查看>>
2017全国卷1文科第9题高考真题的解法
查看>>
坐标系与参数方程习题01【中级高阶辅导】
查看>>
Java 可视化垃圾回收
查看>>
【10-25】intelliji ide 学习笔记
查看>>
JavaScript常用方法
查看>>
洛谷P2485 [SDOI2011]计算器(exgcd+BSGS)
查看>>
Katalon Recorder初探
查看>>
20160929001 Guid生成
查看>>
Java知多少(109)数据库更新
查看>>
WKWebView使用
查看>>