更新時間:2022-08-25 來源:黑馬程序員 瀏覽量:
并發(fā)計數(shù)器各個方案介紹
方案概述
1. jdk5提供的原子更新長整型類 AtomicLong
2. synchronized
3. jdk8提供的 LongAdder 【單機(jī)推薦】
4. Redisson分布式累加器【分布式推薦】
方案介紹
jdk5提供的原子更新長整型類 AtomicLong
在JDK1.5開始就新增了并發(fā)的Integer/Long的操作工具類AtomicInteger和AtomicLong。
AtomicLong 利用底層操作系統(tǒng)的CAS來保證原子性,在一個死循環(huán)內(nèi)不斷執(zhí)行CAS操作,直到操作成功。不過,CAS操作的一個問題是在并發(fā)量比較大的時候,可能很多次的執(zhí)行CAS操作都不成功,這樣性能就受到較大影響。
示例代碼
AtomicLong value = new AtomicLong(0); //定義 incrementAndGet(); //遞增1 ```
synchronized
synchronized是一個重量級鎖,主要是因?yàn)榫€程競爭鎖會引起操作系統(tǒng)用戶態(tài)和內(nèi)核態(tài)切換,浪費(fèi)資源效率不高,在jdk1.5之前,synchronized沒有做任何優(yōu)化,但在jdk1.6做了性能優(yōu)化,它會經(jīng)歷偏向鎖,輕量級鎖,最后才到重量級鎖這個過程,在性能方面有了很大的提升,在jdk1.7的ConcurrentHashMap是基于ReentrantLock的實(shí)現(xiàn)了鎖,但在jdk1.8之后又替換成了synchronized,就從這一點(diǎn)可以看出JVM團(tuán)隊對synchronized的性能還是挺有信心的。下面我們分別來介紹下無鎖,偏向鎖,輕量級鎖,重量級鎖。
jdk8提供的 LongAdder 【單機(jī)推薦】
在JDK8中又新增了LongAdder,這是一個針對Long類型的數(shù)據(jù)的操作工具類。
那我們知道,在ConcurrentHashMap中,對Map分割成多個segment,這樣多個Segment的操作就可以并行執(zhí)行,從而可以提高性能。在JDK8中,LongAdder與ConcurrentHashMap類似,將內(nèi)部操作數(shù)據(jù)value分離成一個Cell數(shù)組,每個線程訪問時,通過Hash等算法映射到其中一個Cell上。
計算最終的數(shù)據(jù)結(jié)果,則是各個Cell數(shù)組的累計求和。
LongAddr常用api方法
add(): //增加指定的數(shù)值; increament(): //增加1; decrement(): //減少1; intValue(); //intValue();/floatValue()/doubleValue():得到最終計數(shù)后的結(jié)果 sum()://求和,得到最終計數(shù)結(jié)果 sumThenReset()://求和得到最終計數(shù)結(jié)果,并重置value。 ```
Redisson分布式累加器【分布式推薦】
基于Redis的Redisson分布式整長型累加器(LongAdder)采用了與java.util.concurrent.atomic.LongAdder類似的接口。通過利用客戶端內(nèi)置的LongAdder對象,為分布式環(huán)境下遞增和遞減操作提供了很高得性能。據(jù)統(tǒng)計其性能最高比分布式AtomicLong對象快 10000 倍以上。
RLongAddr itheimaLongAddr = redission.getLongAddr("itheimaLongAddr"); itheimaLongAddr.add(100); //添加指定數(shù)量 itheimaLongAddr.increment(); //遞增1 itheimaLongAddr.increment(); //遞減1 itheimaLongAddr.sum(); //聚合求和 ```
基于Redis的Redisson分布式雙精度浮點(diǎn)累加器(DoubleAdder)采用了與java.util.concurrent.atomic.DoubleAdder類似的接口。通過利用客戶端內(nèi)置的DoubleAdder對象,為分布式環(huán)境下遞增和遞減操作提供了很高得性能。據(jù)統(tǒng)計其性能最高比分布式AtomicDouble對象快 12000 倍。
示例代碼
RLongDouble itheimaDouble = redission.getLongDouble("itheimaLongDouble"); itheimaDouble.add(100); //添加指定數(shù)量 itheimaDouble.increment(); //遞增1 itheimaDouble.increment(); //遞減1 itheimaDouble.sum(); //聚合求和 ```
以上【整長型累加器】和【雙精度浮點(diǎn)累加器】完美適用于分布式統(tǒng)計計量場景。
各個方案性能測試
測試代碼
``` package com.itheima._01性能比較; import org.junit.Test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; /** * @author 黑馬程序員 */ public class CountTest { private int count = 0; @Test public void startCompare() { compareDetail(1, 100 * 10000); compareDetail(20, 100 * 10000); compareDetail(30, 100 * 10000); compareDetail(40, 100 * 10000); compareDetail(100, 100 * 10000); } /** * @param threadCount 線程數(shù) * @param times 每個線程增加的次數(shù) */ public void compareDetail(int threadCount, int times) { try { System.out.println(String.format("threadCount: %s, times: %s", threadCount, times)); long start = System.currentTimeMillis(); testSynchronized(threadCount, times); System.out.println("testSynchronized cost: " + (System.currentTimeMillis() - start)); start = System.currentTimeMillis(); testAtomicLong(threadCount, times); System.out.println("testAtomicLong cost: " + (System.currentTimeMillis() - start)); start = System.currentTimeMillis(); testLongAdder(threadCount, times); System.out.println("testLongAdder cost: " + (System.currentTimeMillis() - start)); System.out.println(); } catch (Exception e) { e.printStackTrace(); } } public void testSynchronized(int threadCount, int times) throws InterruptedException { List<Thread> threadList = new ArrayList<>(); for (int i = 0; i < threadCount; i++) { threadList.add(new Thread(()-> { for (int j = 0; j < times; j++) { add(); } })); } for (Thread thread : threadList) { thread.start(); } for (Thread thread : threadList) { thread.join(); } } public synchronized void add() { count++; } public void testAtomicLong(int threadCount, int times) throws InterruptedException { AtomicLong count = new AtomicLong(); List<Thread> threadList = new ArrayList<>(); for (int i = 0; i < threadCount; i++) { threadList.add(new Thread(()-> { for (int j = 0; j < times; j++) { count.incrementAndGet(); } })); } for (Thread thread : threadList) { thread.start(); } for (Thread thread : threadList) { thread.join(); } } public void testLongAdder(int threadCount, int times) throws InterruptedException { LongAdder count = new LongAdder(); List<Thread> threadList = new ArrayList<>(); for (int i = 0; i < threadCount; i++) { threadList.add(new Thread(()-> { for (int j = 0; j < times; j++) { count.increment(); } })); } for (Thread thread : threadList) { thread.start(); } for (Thread thread : threadList) { thread.join(); } } } ```
運(yùn)行結(jié)果
threadCount: 1, times: 1000000 testSynchronized cost: 69 testAtomicLong cost: 16 testLongAdder cost: 15 threadCount: 20, times: 1000000 testSynchronized cost: 639 testAtomicLong cost: 457 testLongAdder cost: 59 threadCount: 30, times: 1000000 testSynchronized cost: 273 testAtomicLong cost: 538 testLongAdder cost: 70 threadCount: 40, times: 1000000 testSynchronized cost: 312 testAtomicLong cost: 717 testLongAdder cost: 81 threadCount: 100, times: 1000000 testSynchronized cost: 719 testAtomicLong cost: 2098 testLongAdder cost: 225 ```
結(jié)論
并發(fā)量比較低的時候AtomicLong優(yōu)勢比較明顯,因?yàn)锳tomicLong底層是一個樂觀鎖,不用阻塞線程,不斷cas即可。但是在并發(fā)比較高的時候用synchronized比較有優(yōu)勢,因?yàn)榇罅烤€程不斷cas,會導(dǎo)致cpu持續(xù)飆高,反而會降低效率
LongAdder無論并發(fā)量高低,優(yōu)勢都比較明顯。且并發(fā)量越高,優(yōu)勢越明顯
原理分析
AtomicLong 實(shí)現(xiàn)原子操作原理
非原子操作示例代碼
package com.itheima._02Unsafe測試; import java.util.ArrayList; import java.util.List; /** * @author 黑馬程序員 */ public class Test1 { private int value = 0; public static void main(String[] args) throws InterruptedException { Test1 test1 = new Test1(); test1.increment(); System.out.println("期待值:" + 100 * 100 + ",最終結(jié)果值:" + test1.value); //結(jié)果,期待值:10000,最終結(jié)果值:xxxx } private void increment() throws InterruptedException { List<Thread> list = new ArrayList<>(); //啟動100個線程,每個線程對value進(jìn)行累加100次 for (int i = 0; i < 100; i++) { Thread t = new Thread(() -> { for (int j = 0; j < 100; j++) { value++; } }); list.add(t); t.start(); } //保證所有線程運(yùn)行完成 for (Thread thread : list) { thread.join(); } } } ```
運(yùn)行效果
結(jié)論
> 可以發(fā)現(xiàn)輸出的結(jié)果值錯誤,這是因?yàn)?`value++` 不是一個原子操作,它將 `value++` 拆分成了 3 個步驟 `load、add、store`,多線程并發(fā)有可能上一個線程 add 過后還沒有 store 下一個線程又執(zhí)行了 load 了這種重復(fù)造成得到的結(jié)果可能比最終值要小。
AtomicLong是JDK1.5提供的原子操作示例代碼
package com.itheima._03AtomicLong的CAS原子操作示例; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** * @author 黑馬程序員 */ public class Test2 { private AtomicLong value = new AtomicLong(0); public static void main(String[] args) throws InterruptedException { Test2 test1 = new Test2(); test1.increment(); System.out.println("期待值:" + 100 * 100 + ",最終結(jié)果值:" + test1.value); //結(jié)果,期待值:10000,最終結(jié)果值:10000 } private void increment() throws InterruptedException { List<Thread> list = new ArrayList<>(); //啟動100個線程,每個線程對value進(jìn)行累加100次 for (int i = 0; i < 100; i++) { Thread t = new Thread(() -> { for (int j = 0; j < 100; j++) { value.incrementAndGet(); } }); list.add(t); t.start(); } //保證所有線程運(yùn)行完成 for (Thread thread : list) { thread.join(); } } } ```
運(yùn)行效果
AtomicLong CAS原理介紹
1.使用volatile保證內(nèi)存可見性,獲取主存中最新的操作數(shù)據(jù)
2.使用CAS(Compare-And-Swap)操作保證數(shù)據(jù)原子性
CAS算法是jdk對并發(fā)操作共享數(shù)據(jù)的支持,包含了3個操作數(shù)
第一個操作數(shù):內(nèi)存值value(V)
第二個操作數(shù):預(yù)估值expect(O)
第三個操作數(shù):更新值new(N)
含義:CAS比較交換的過程可以通俗的理解為CAS(V,O,N),包含三個值分別為:V 內(nèi)存地址(主存)存放的實(shí)際值;O 預(yù)期的值(舊值);N 更新的新值。當(dāng)V和O相同時,也就是說舊值和內(nèi)存中實(shí)際的值相同表明該值沒有被其他線程更改過,即該舊值O就是目前來說最新的值了,自然而然可以將新值N賦值給V;當(dāng)V和O不相同時,會一致循環(huán)下去直至修改成功。
AtomicLong底層CAS實(shí)現(xiàn)原子操作原理
查看incrementAndGet()方法源碼
public final long incrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L; } ``` getAndAddLong方法源碼 ```java public final long getAndAddLong(Object var1, long var2, long var4) { long var6; do { var6 = this.getLongVolatile(var1, var2); } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4)); return var6; } ```
> 這里是一個循環(huán)CAS操作
compareAndSwapLong方法源碼
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6); ```
我們發(fā)現(xiàn)調(diào)用的是 native 的 `unsafe.compareAndSwapLong(Object obj, long valueOffset, Long expect, Long update)`,我們翻看 Hotspot 源碼發(fā)現(xiàn)在 unsafe.cpp 中定義了這樣一段代碼
> Unsafe中基本都是調(diào)用native方法,那么就需要去JVM里面找對應(yīng)的實(shí)現(xiàn)。
>
> 到`http://hg.openjdk.java.net/` 進(jìn)行一步步選擇下載對應(yīng)的hotspot版本,我這里下載的是`http://hg.openjdk.java.net/jdk8u/jdk8u60/hotspot/archive/tip.tar.gz`,
>
> 然后解hotspot目錄,發(fā)現(xiàn) `\src\share\vm\prims\unsafe.cpp`,這個就是對應(yīng)jvm相關(guān)的c++實(shí)現(xiàn)類了。
>
> 比如我們對CAS部分的實(shí)現(xiàn)很感興趣,就可以在該文件中搜索compareAndSwapInt,此時可以看到對應(yīng)的JNI方法為`Unsafe_CompareAndSwapInt`
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong e, jlong x)) UnsafeWrapper("Unsafe_CompareAndSwapLong"); Handle p (THREAD, JNIHandles::resolve(obj)); jlong* addr = (jlong*)(index_oop_from_field_offset_long(p(), offset)); #ifdef SUPPORTS_NATIVE_CX8 return (jlong)(Atomic::cmpxchg(x, addr, e)) == e; #else if (VM_Version::supports_cx8()) return (jlong)(Atomic::cmpxchg(x, addr, e)) == e; else { jboolean success = false; MutexLockerEx mu(UnsafeJlong_lock, Mutex::_no_safepoint_check_flag); jlong val = Atomic::load(addr); if (val == e) { Atomic::store(x, addr); success = true; } return success; } #endif UNSAFE_END ```
Atomic::cmpxchg c++源碼
可以看到調(diào)用了“Atomic::cmpxchg”方法,“Atomic::cmpxchg”方法在linux_x86和windows_x86的實(shí)現(xiàn)如下。
linux_x86的實(shí)現(xiàn):
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { int mp = os::is_MP(); __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)" : "=a" (exchange_value) : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp) : "cc", "memory"); return exchange_value; } ```
windows_x86的實(shí)現(xiàn)(c++源文件):
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { // alternative for InterlockedCompareExchange int mp = os::is_MP(); __asm { mov edx, dest mov ecx, exchange_value mov eax, compare_value LOCK_IF_MP(mp) cmpxchg dword ptr [edx], ecx } } ```
Atomic::cmpxchg方法解析:
mp是“os::is_MP()”的返回結(jié)果,“os::is_MP()”是一個內(nèi)聯(lián)函數(shù),用來判斷當(dāng)前系統(tǒng)是否為多處理器。
如果當(dāng)前系統(tǒng)是多處理器,該函數(shù)返回1。
否則,返回0。
LOCK_IF_MP(mp)會根據(jù)mp的值來決定是否為cmpxchg指令添加lock前綴。
如果通過mp判斷當(dāng)前系統(tǒng)是多處理器(即mp值為1),則為cmpxchg指令添加lock前綴。
否則,不加lock前綴。
這是一種優(yōu)化手段,認(rèn)為單處理器的環(huán)境沒有必要添加lock前綴,只有在多核情況下才會添加lock前綴,因?yàn)閘ock會導(dǎo)致性能下降。cmpxchg是匯編指令,作用是比較并交換操作數(shù)。
> 底層會調(diào)用cmpxchg匯編指令,如果是多核處理器會加鎖實(shí)現(xiàn)原子操作
反匯編指令查詢
查看java程序運(yùn)行的匯編指令資料
將上圖2個文件拷貝到j(luò)re\bin目錄下,如下圖
配置運(yùn)行參數(shù)
```
-server -Xcomp -XX:+UnlockDiagnosticVMOptions -XX:+PrintAssembly -XX:CompileCommand=compileonly,*
```
運(yùn)行Test2效果
synchronized 實(shí)現(xiàn)同步操作原理
鎖對象
java中任何一個對象都可以稱為鎖對象,原因在于java對象在內(nèi)存中存儲結(jié)構(gòu),如下圖所示:
在對象頭中主要存儲的主要是一些運(yùn)行時的數(shù)據(jù),如下所示:
其中 在Mark Work中存儲著該對象作為鎖時的一些信息,如下所示是Mark Work中在64位系統(tǒng)中詳細(xì)信息:
偏向鎖
在無競爭環(huán)境中(沒有并發(fā))使用一種鎖
> 偏向鎖的作用是當(dāng)有線程訪問同步代碼或方法時,線程只需要判斷對象頭的Mark Word中判斷一下是否有偏向鎖指向線程ID.
>
> 偏向鎖記錄過程
>
> - 線程搶到了對象的同步鎖(鎖標(biāo)志為01參考上圖即無其他線程占用)
> - 對象Mark World 將是否偏向標(biāo)志位設(shè)置為1
> - 記錄搶到鎖的線程ID
> - 進(jìn)入偏向狀態(tài)
輕量級鎖
當(dāng)有另外一個線程競爭獲取這個鎖時,由于該鎖已經(jīng)是偏向鎖,當(dāng)發(fā)現(xiàn)對象頭 Mark Word 中的線程 ID 不是自己的線程 ID,就會進(jìn)行 CAS 操作獲取鎖,**如果獲取成功**,直接替換 Mark Word 中的線程 ID 為自己的 ID,該鎖會保持偏向鎖狀態(tài);**如果獲取鎖失敗**,代表當(dāng)前鎖有一定的競爭,偏向鎖將升級為輕量級鎖。
- 舉個例子來說明一下什么時候需要升級偏向鎖
假設(shè)A線程 持有鎖 X(此時X是偏向鎖) 這是有個B線程也同樣用到了鎖X,而B線程在檢查鎖對象的Mark World時發(fā)現(xiàn)偏向鎖的線程ID已經(jīng)指向了線程A。這時候就需要升級鎖X為輕量級鎖。輕量級鎖意味著標(biāo)示該資源現(xiàn)在處于競爭狀態(tài)。
當(dāng)有其他線程想訪問加了輕量級鎖的資源時,會使用自旋鎖優(yōu)化,來進(jìn)行資源訪問。
> 自旋策略
>
> JVM 提供了一種自旋鎖,可以通過自旋方式不斷嘗試獲取鎖,從而避免線程被掛起阻塞。這是基于大多數(shù)情況下,線程持有鎖的時間都不會太長,畢竟線程被掛起阻塞可能會得不償失。
>
> 從 JDK1.7 開始,自旋鎖默認(rèn)啟用,自旋次數(shù)由 JVM 設(shè)置決定,這里我不建議設(shè)置的重試次數(shù)過多,因?yàn)?CAS 重試操作意味著長時間地占用 CPU。自旋鎖重試之后如果搶鎖依然失敗,同步鎖就會升級至重量級鎖,鎖標(biāo)志位改為 10。在這個狀態(tài)下,未搶到鎖的線程都會進(jìn)入 Monitor,之后會被阻塞在 _WaitSet 隊列中。
重量級鎖
自旋失敗,很大概率 再一次自選也是失敗,因此直接升級成重量級鎖,進(jìn)行線程阻塞,減少cpu消耗。
當(dāng)鎖升級為重量級鎖后,未搶到鎖的線程都會被阻塞,進(jìn)入阻塞隊列。
重量級鎖在高并發(fā)下性能就會變慢,因?yàn)樗袥]有獲取鎖的線程會進(jìn)行阻塞等待,到獲取鎖的時候被喚醒,這些操作都是消耗很多資源。
輕量級鎖膨脹流程圖
LongAdder 實(shí)現(xiàn)原子操作原理
LongAdder實(shí)現(xiàn)高并發(fā)計數(shù)實(shí)現(xiàn)思路
LongAdder實(shí)現(xiàn)高并發(fā)的秘密就是用空間換時間,對一個值的cas操作,變成對多個值的cas操作,當(dāng)獲取數(shù)量的時候,對這多個值加和即可。
測試代碼
``` package com.itheima._04LongAddr使用測試; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder; import java.util.function.LongBinaryOperator; /** * @author 黑馬程序員 */ public class Test3 { private LongAdder value = new LongAdder(); //默認(rèn)初始值0 public static void main(String[] args) throws InterruptedException { Test3 test1 = new Test3(); test1.increment(); System.out.println("期待值:" + 100 * 100 + ",最終結(jié)果值:" + test1.value.sum()); //結(jié)果,期待值:10000,最終結(jié)果值:10000 } private void increment() throws InterruptedException { List<Thread> list = new ArrayList<>(); //啟動100個線程,每個線程對value進(jìn)行累加100次 for (int i = 0; i < 100; i++) { Thread t = new Thread(() -> { for (int j = 0; j < 100; j++) { value.increment(); } }); list.add(t); t.start(); } //保證所有線程運(yùn)行完成 for (Thread thread : list) { thread.join(); } } } ```
源碼分析
1. 先對base變量進(jìn)行cas操作,cas成功后返回
2. 對線程獲取一個hash值(調(diào)用getProbe),hash值對數(shù)組長度取模,定位到cell數(shù)組中的元素,對數(shù)組中的元素進(jìn)行cas
增加數(shù)量源碼
public void increment() { add(1L); } ``` ```java public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } } ```
當(dāng)數(shù)組不為空,并且根據(jù)線程hash值定位到數(shù)組某個下標(biāo)中的元素不為空,對這個元素cas成功則直接返回,否則進(jìn)入longAccumulate方法
1. cell數(shù)組已經(jīng)初始化完成,主要是在cell數(shù)組中放元素,對cell數(shù)組進(jìn)行擴(kuò)容等操作
2. cell數(shù)組沒有初始化,則對數(shù)組進(jìn)行初始化
3. cell數(shù)組正在初始化,這時其他線程利用cas對baseCount進(jìn)行累加操作
完整代碼
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) collide = false; // At max size or stale else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } } ```
獲取計算數(shù)量源碼
public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; } ```
需要注意的是,調(diào)用sum()返回的數(shù)量有可能并不是當(dāng)前的數(shù)量,因?yàn)樵谡{(diào)用sum()方法的過程中,可能有其他數(shù)組對base變量或者cell數(shù)組進(jìn)行了改動,所以需要確保所有線程運(yùn)行完再獲取就是準(zhǔn)確值
LongAdder 的前世今生
其實(shí)在 Jdk1.7 時代,LongAdder 還未誕生時,就有一些人想著自己去實(shí)現(xiàn)一個高性能的計數(shù)器了,比如一款 Java 性能監(jiān)控框架 dropwizard/metrics 就做了這樣事,在早期版本中,其優(yōu)化手段并沒有 Jdk1.8 的 LongAdder 豐富,而在 metrics 的最新版本中,其已經(jīng)使用 Jdk1.8 的 LongAdder 替換掉了自己的輪子。在最后的測評中,我們將 metrics 版本的 LongAdder 也作為一個參考對象。
應(yīng)用場景
AtomicLong等原子類的使用
并發(fā)少競爭少(讀多寫少)的計數(shù)原子操作
LongAdder 的使用
高性能計數(shù)器的首選方案, 單體項(xiàng)目建議使用LongAddr,分布式環(huán)境建議使用Redisson分布式累加器
應(yīng)用場景功能:獲取全局自增id值
Synchronized與Lock的使用比較
Synchronized 適合少量的同步并發(fā)競爭
Lock 適合大量的同步并發(fā)競爭
總結(jié)
并發(fā)情況優(yōu)化鎖思路:
互斥鎖 -> 樂觀鎖 -> 鎖的粒度控制
在Java中對應(yīng)的實(shí)現(xiàn)方式:
ReentrantLock或者Syschronized -> CAS + Volatile -> 拆分競爭點(diǎn)(longAddr,分布式累加器,ConcurrentHashMap等)
ReentrantLock或者Syschronized 在高并發(fā)時都存在獲取鎖等待、阻塞、喚醒等操作,所以在使用的使用注意拆分競爭點(diǎn)。
AtomicLong
1. 并發(fā)量非常高,可能導(dǎo)致都在不停的爭搶該值,可能導(dǎo)致很多線程一致處于循環(huán)狀態(tài)而無法更新數(shù)據(jù),從而導(dǎo)致 CPU 資源的消耗過高。解決這個問題需要使用LongAdder
2. ABA 問題,比如說上一個線程增加了某個值,又改變了某個值,然后后面的線程以為數(shù)據(jù)沒有發(fā)生過變化,其實(shí)已經(jīng)被改動了。解決這個問題請參考《擴(kuò)展:原子更新字段類-ABA問題解決》
synchronized
synchronized鎖升級實(shí)際上是把本來的悲觀鎖變成了 在一定條件下 使用無所(同樣線程獲取相同資源的偏向鎖),以及使用樂觀(自旋鎖 cas)和一定條件下悲觀(重量級鎖)的形式。
偏向鎖:適用于單線程適用鎖的情況
輕量級鎖:適用于競爭較不激烈的情況(這和樂觀鎖的使用范圍類似)
重量級鎖:適用于競爭激烈的情況
LongAdder
- AtomicLong :并發(fā)場景下讀性能優(yōu)秀,寫性能急劇下降,不適合作為高性能的計數(shù)器方案。內(nèi)需求量少。
- LongAdder :并發(fā)場景下寫性能優(yōu)秀,讀性能由于組合求值的原因,不如直接讀值的方案,但由于計數(shù)器場景寫多讀少的緣故,整體性能在幾個方案中最優(yōu),是高性能計數(shù)器的首選方案。由于 Cells 數(shù)組以及緩存行填充的緣故,占用內(nèi)存較大。
最佳方案
高性能計數(shù)器的首選方案, 單體項(xiàng)目建議使用LongAddr,分布式環(huán)境建議使用Redisson分布式累加器
應(yīng)用場景功能:獲取全局自增id值