本文共 5758 字,大约阅读时间需要 19 分钟。
CAS(Compare and Swap)比较和交换,是java在处理并发问题时,使用最多的一种方式,简单说就是,指定一个对象V,给出他的期望值,及需要修改的值,如果期望值等同于内存中的值,那么就把这个对象修改成我们想要改变的值,否则修改失败。
CAS使用最佳实践
先看下我们下面的场景:
public class Case { public volatile int n; public void add() { n++; }}
int 类型的对象n, 我们使用volatile修饰,来保证了内存可见性,但是当多线程情况下,我们调用add()方法,由于该方法并不是线程安全的,并且n++这个操作并不是原子操作,(其实是三步1、从主内存获取n的值,2、n的值+1,3、将n的值写会至主内存)在以上三步中,如果多线程同时访问,那么就会造成n的值小于预期的值。我们怎么解决上面这个情况呢
最常见的办法就是 在add方法上添加synchronized关键字,当然可以保证多线程的安全性,但是并不是很推荐(synchronized是重量级锁,获取锁和释放锁,需要进行内核态和用户态之间的切换,后面会讨论synchronized和cas性能问题)
还有一种解决办法就是使用java中提供的CAS操作,在CAS操作当中,这个步骤是原子操作,不会被其他的线程访问,同时也不需要加锁。性能上可能会更加有优势,我们来看下java中提供对Integer值得增加,修改的原子操作类AtomicInteger
AtomicInteger底层是通过unsafe类来操作数据的,其内部维护了一个Integer类型的属性value
static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } }
1、这段代码时获取到value这个属性相对于AtomicInteger对象的内存偏移量,方便后续直接通过unsafe来修改内存当中的值
对于上面我们的n++操作,对应到AtomicInteger类当中
public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); }
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
通过对象和对象中属性的偏移量,来计算出当前偏移量下,存储的旧的数值,然后在旧值上增加新值,这一步需要注意的是,通过do-while这种方式,直到修改成功
最终调用的就是unsafe当中的compareAndSwapInt
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
其参数就是当前对象,var2:属性相对于当前对象的偏移量,var4 :期待值,var5:修改的新值
CAS操作容易出现的问题:
1、ABA问题,线程1 从内存当中获取到变量V的值是A,线程2也从内存当中获取到V的值A,然后线程2将V的值修改成B, 然后线程2又将变量V的值改成了A,这时候线程1 判断变量V的值也是A,然后就进行修改成功了。
这个时候可能就会有个疑问即使我中间变化了,可我最终还是把值给改成我所期待的值,并不会有影响,这种想法只是在修改一个Integer类型时,可能中间修改过多少次并没有太大的影响,但是如果是链表的话,那么这个影响是很大的
比如单向链表,A-->B--->null, 如图
加入此时线程1 想要把栈顶A 替换为B, head.comareAndSet(A,B),还没有执行成功
此时线程2 进入,将链表当中A后面的元素成功设置成了C和D,
此时线程1 回来执行compareAndSwap操作,发现栈顶还是A ,然后执行成功,但是此时链表变成了如下
此时栈顶中只有一个元素B,这种情况下就会把C、D元素丢掉了
使用AtomicStampedReference来解决ABA问题
AtomicStampedReference对象当中维护了内部类,在原有基础上维护了一个对象,及一个int类型的值(可以理解为版本号),在每次进行对比修改时,都会先判断要修改的值,和内存中的值是否相同,以及版本号是否相同
private static class Pair{ final T reference; final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static Pair of(T reference, int stamp) { return new Pair (reference, stamp); } }
以上是其内部类,我们在创建一个对象时,同时会出传递一个时间戳或者版本号,在每次进行修改时,都会修改原来的时间戳或者版本号
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Paircurrent = pair; return expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); }
在compareAndSet时,会传入期望值,新值,期望版本号,新的版本号,并且使用unsafe的方式修改object对象
private boolean casPair(Paircmp, Pair val) { return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val); }
CAS和Synchronized对比
在jdk1.7中,AtomicInteger的getAndIncrement方法实现如下
public final int getAndIncrement() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return current; }}
这种情况下,CAS操作适合于竞争不激烈的情况下,通过硬件来完成原子操作,不需要加锁,也需要进行用户态和核心态的一个切换,在竞争理解的情况下,线程进入自旋更新操作中,而synchronized是通过使线程阻塞,来阻止对值得修改,这时候我们查看cpu会发现,通过CAS操作,当线程达到一定数量之后,更加耗时,同时CPU占有率更高,而synchronized方式,他的CPU占有率不会增加,
但是在JDK 1.8当中,AtomicInteger进行了优化public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); }
经过测试,发现CAS耗时基本上和synchronized差不多,总体上CAS是低于synchronized的耗时
public class CASDemo { private final int THREAD_NUM = 800; private final int MAX_VALUE = 20000000; private AtomicInteger casI = new AtomicInteger(0); private int syncI = 0; private Object obj = new Object(); public static void main(String[] args) throws Exception { CASDemo casDemo = new CASDemo(); casDemo.casAdd(); casDemo.syncAdd(); } public void casAdd() throws Exception { long begin = System.currentTimeMillis(); Thread[] threads = new Thread[THREAD_NUM]; for (int i = 0; i < THREAD_NUM; i++) { threads[i] = new Thread(new Runnable() { @Override public void run() { while (casI.get() < MAX_VALUE) { casI.getAndIncrement(); } } }); threads[i].start(); } for (int i = 0; i < THREAD_NUM; i++) { threads[i].join(); } System.out.println("cas costs time :" + (System.currentTimeMillis() - begin)); } public void syncAdd() throws Exception { long begin = System.currentTimeMillis(); Thread[] threads = new Thread[THREAD_NUM]; for (int i = 0; i < THREAD_NUM; i++) { threads[i] = new Thread(new Runnable() { @Override public void run() { while (syncI < MAX_VALUE) { synchronized (obj) { ++syncI; } } } }); threads[i].start(); } for (int i = 0; i < THREAD_NUM; i++) { threads[i].join(); } System.out.println("synchronized cost:" + (System.currentTimeMillis() - begin)); }}
参考: