Shuffle核心概念、Shuffle調(diào)優(yōu)及故障排除
六、bypass機(jī)制開啟閾值
對于SortShuffleManager,如果shuffle reduce task的數(shù)量小于某一閾值則shuffle write過程中不會進(jìn)行排序操作,而是直接按照未經(jīng)優(yōu)化的HashShuffleManager的方式去寫數(shù)據(jù),但是最后會將每個task產(chǎn)生的所有臨時磁盤文件都合并成一個文件,并會創(chuàng)建單獨(dú)的索引文件。
當(dāng)你使用SortShuffleManager時,如果的確不需要排序操作,那么建議將這個參數(shù)調(diào)大一些,大于shuffle read task的數(shù)量,那么此時map-side就不會進(jìn)行排序了,減少了排序的性能開銷,但是這種方式下,依然會產(chǎn)生大量的磁盤文件,因此shuffle write性能有待提高。
SortShuffleManager排序操作閾值的設(shè)置可以通過spark.shuffle.sort.bypassMergeThreshold這一參數(shù)進(jìn)行設(shè)置,默認(rèn)值為200,該參數(shù)的設(shè)置方法如下:
reduce端拉取數(shù)據(jù)等待間隔配置:
val conf = new SparkConf()
.set("spark.shuffle.sort.bypassMergeThreshold", "400")
數(shù)據(jù)傾斜
就是數(shù)據(jù)分到各個區(qū)的數(shù)量不太均勻,可以自定義分區(qū)器,想怎么分就怎么分。
Spark中的數(shù)據(jù)傾斜問題主要指shuffle過程中出現(xiàn)的數(shù)據(jù)傾斜問題,是由于不同的key對應(yīng)的數(shù)據(jù)量不同導(dǎo)致的不同task所處理的數(shù)據(jù)量不同的問題。
例如,reduced端一共要處理100萬條數(shù)據(jù),第一個和第二個task分別被分配到了1萬條數(shù)據(jù),計算5分鐘內(nèi)完成,第三個task分配到了98萬數(shù)據(jù),此時第三個task可能需要10個小時完成,這使得整個Spark作業(yè)需要10個小時才能運(yùn)行完成,這就是數(shù)據(jù)傾斜所帶來的后果。
注意,要區(qū)分開數(shù)據(jù)傾斜與數(shù)據(jù)過量這兩種情況,數(shù)據(jù)傾斜是指少數(shù)task被分配了絕大多數(shù)的數(shù)據(jù),因此少數(shù)task運(yùn)行緩慢;數(shù)據(jù)過量是指所有task被分配的數(shù)據(jù)量都很大,相差不多,所有task都運(yùn)行緩慢。
數(shù)據(jù)傾斜的表現(xiàn):
Spark作業(yè)的大部分task都執(zhí)行迅速,只有有限的幾個task執(zhí)行的非常慢,此時可能出現(xiàn)了數(shù)據(jù)傾斜,作業(yè)可以運(yùn)行,但是運(yùn)行得非常慢;Spark作業(yè)的大部分task都執(zhí)行迅速,但是有的task在運(yùn)行過程中會突然報出OOM,反復(fù)執(zhí)行幾次都在某一個task報出OOM錯誤,此時可能出現(xiàn)了數(shù)據(jù)傾斜,作業(yè)無法正常運(yùn)行。定位數(shù)據(jù)傾斜問題:查閱代碼中的shuffle算子,例如reduceByKey、countByKey、groupByKey、join等算子,根據(jù)代碼邏輯判斷此處是否會出現(xiàn)數(shù)據(jù)傾斜;查看Spark作業(yè)的log文件,log文件對于錯誤的記錄會精確到代碼的某一行,可以根據(jù)異常定位到的代碼位置來明確錯誤發(fā)生在第幾個stage,對應(yīng)的shuffle算子是哪一個;1. 預(yù)聚合原始數(shù)據(jù)
1. 避免shuffle過程
絕大多數(shù)情況下,Spark作業(yè)的數(shù)據(jù)來源都是Hive表,這些Hive表基本都是經(jīng)過ETL之后的昨天的數(shù)據(jù)。為了避免數(shù)據(jù)傾斜,我們可以考慮避免shuffle過程,如果避免了shuffle過程,那么從根本上就消除了發(fā)生數(shù)據(jù)傾斜問題的可能。
如果Spark作業(yè)的數(shù)據(jù)來源于Hive表,那么可以先在Hive表中對數(shù)據(jù)進(jìn)行聚合,例如按照key進(jìn)行分組,將同一key對應(yīng)的所有value用一種特殊的格式拼接到一個字符串里去,這樣,一個key就只有一條數(shù)據(jù)了;之后,對一個key的所有value進(jìn)行處理時,只需要進(jìn)行map操作即可,無需再進(jìn)行任何的shuffle操作。通過上述方式就避免了執(zhí)行shuffle操作,也就不可能會發(fā)生任何的數(shù)據(jù)傾斜問題。
對于Hive表中數(shù)據(jù)的操作,不一定是拼接成一個字符串,也可以是直接對key的每一條數(shù)據(jù)進(jìn)行累計計算。要區(qū)分開,處理的數(shù)據(jù)量大和數(shù)據(jù)傾斜的區(qū)別。
2. 增大key粒度(減小數(shù)據(jù)傾斜可能性,增大每個task的數(shù)據(jù)量)
如果沒有辦法對每個key聚合出來一條數(shù)據(jù),在特定場景下,可以考慮擴(kuò)大key的聚合粒度。
例如,目前有10萬條用戶數(shù)據(jù),當(dāng)前key的粒度是(省,城市,區(qū),日期),現(xiàn)在我們考慮擴(kuò)大粒度,將key的粒度擴(kuò)大為(省,城市,日期),這樣的話,key的數(shù)量會減少,key之間的數(shù)據(jù)量差異也有可能會減少,由此可以減輕數(shù)據(jù)傾斜的現(xiàn)象和問題。(此方法只針對特定類型的數(shù)據(jù)有效,當(dāng)應(yīng)用場景不適宜時,會加重數(shù)據(jù)傾斜)
2. 預(yù)處理導(dǎo)致傾斜的key
1. 過濾
如果在Spark作業(yè)中允許丟棄某些數(shù)據(jù),那么可以考慮將可能導(dǎo)致數(shù)據(jù)傾斜的key進(jìn)行過濾,濾除可能導(dǎo)致數(shù)據(jù)傾斜的key對應(yīng)的數(shù)據(jù),這樣,在Spark作業(yè)中就不會發(fā)生數(shù)據(jù)傾斜了。
2. 使用隨機(jī)key
當(dāng)使用了類似于groupByKey、reduceByKey這樣的算子時,可以考慮使用隨機(jī)key實(shí)現(xiàn)雙重聚合,如下圖所示:
隨機(jī)key實(shí)現(xiàn)雙重聚合
首先,通過map算子給每個數(shù)據(jù)的key添加隨機(jī)數(shù)前綴,對key進(jìn)行打散,將原先一樣的key變成不一樣的key,然后進(jìn)行第一次聚合,這樣就可以讓原本被一個task處理的數(shù)據(jù)分散到多個task上去做局部聚合;隨后,去除掉每個key的前綴,再次進(jìn)行聚合。
此方法對于由groupByKey、reduceByKey這類算子造成的數(shù)據(jù)傾斜有比較好的效果,僅僅適用于聚合類的shuffle操作,適用范圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。
此方法也是前幾種方案沒有比較好的效果時要嘗試的解決方案。
3. sample采樣對傾斜key單獨(dú)進(jìn)行join
在Spark中,如果某個RDD只有一個key,那么在shuffle過程中會默認(rèn)將此key對應(yīng)的數(shù)據(jù)打散,由不同的reduce端task進(jìn)行處理。
所以當(dāng)由單個key導(dǎo)致數(shù)據(jù)傾斜時,可有將發(fā)生數(shù)據(jù)傾斜的key單獨(dú)提取出來,組成一個RDD,然后用這個原本會導(dǎo)致傾斜的key組成的RDD和其他RDD單獨(dú)join,此時,根據(jù)Spark的運(yùn)行機(jī)制,此RDD中的數(shù)據(jù)會在shuffle階段被分散到多個task中去進(jìn)行join操作。
傾斜key單獨(dú)join的流程如下圖所示:
傾斜key單獨(dú)join流程
適用場景分析:
對于RDD中的數(shù)據(jù),可以將其轉(zhuǎn)換為一個中間表,或者是直接使用countByKey()的方式,看一下這個RDD中各個key對應(yīng)的數(shù)據(jù)量,此時如果你發(fā)現(xiàn)整個RDD就一個key的數(shù)據(jù)量特別多,那么就可以考慮使用這種方法。
當(dāng)數(shù)據(jù)量非常大時,可以考慮使用sample采樣獲取10%的數(shù)據(jù),然后分析這10%的數(shù)據(jù)中哪個key可能會導(dǎo)致數(shù)據(jù)傾斜,然后將這個key對應(yīng)的數(shù)據(jù)單獨(dú)提取出來。
不適用場景分析:
如果一個RDD中導(dǎo)致數(shù)據(jù)傾斜的key很多,那么此方案不適用。
3. 提高reduce并行度
當(dāng)方案一和方案二對于數(shù)據(jù)傾斜的處理沒有很好的效果時,可以考慮提高shuffle過程中的reduce端并行度,reduce端并行度的提高就增加了reduce端task的數(shù)量,那么每個task分配到的數(shù)據(jù)量就會相應(yīng)減少,由此緩解數(shù)據(jù)傾斜問題。
1. reduce端并行度的設(shè)置
在大部分的shuffle算子中,都可以傳入一個并行度的設(shè)置參數(shù),比如reduceByKey(500),這個參數(shù)會決定shuffle過程中reduce端的并行度,在進(jìn)行shuffle操作的時候,就會對應(yīng)著創(chuàng)建指定數(shù)量的reduce task。對于Spark SQL中的shuffle類語句,比如group by、join等,需要設(shè)置一個參數(shù),即spark.sql.shuffle.partitions,該參數(shù)代表了shuffle read task的并行度,該值默認(rèn)是200,對于很多場景來說都有點(diǎn)過小。
增加shuffle read task的數(shù)量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數(shù)據(jù)。
舉例來說,如果原本有5個key,每個key對應(yīng)10條數(shù)據(jù),這5個key都是分配給一個task的,那么這個task就要處理50條數(shù)據(jù)。而增加了shuffle read task以后,每個task就分配到一個key,即每個task就處理10條數(shù)據(jù),那么自然每個task的執(zhí)行時間都會變短了。
2. reduce端并行度設(shè)置存在的缺陷
提高reduce端并行度并沒有從根本上改變數(shù)據(jù)傾斜的本質(zhì)和問題(方案一和方案二從根本上避免了數(shù)據(jù)傾斜的發(fā)生),只是盡可能地去緩解和減輕shuffle reduce task的數(shù)據(jù)壓力,以及數(shù)據(jù)傾斜的問題,適用于有較多key對應(yīng)的數(shù)據(jù)量都比較大的情況。
該方案通常無法徹底解決數(shù)據(jù)傾斜,因?yàn)槿绻霈F(xiàn)一些極端情況,比如某個key對應(yīng)的數(shù)據(jù)量有100萬,那么無論你的task數(shù)量增加到多少,這個對應(yīng)著100萬數(shù)據(jù)的key肯定還是會分配到一個task中去處理,因此注定還是會發(fā)生數(shù)據(jù)傾斜的。所以這種方案只能說是在發(fā)現(xiàn)數(shù)據(jù)傾斜時嘗試使用的一種手段,嘗試去用最簡單的方法緩解數(shù)據(jù)傾斜而已,或者是和其他方案結(jié)合起來使用。
在理想情況下,reduce端并行度提升后,會在一定程度上減輕數(shù)據(jù)傾斜的問題,甚至基本消除數(shù)據(jù)傾斜;但是,在一些情況下,只會讓原來由于數(shù)據(jù)傾斜而運(yùn)行緩慢的task運(yùn)行速度稍有提升,或者避免了某些task的OOM問題,但是,仍然運(yùn)行緩慢,此時,要及時放棄方案三,開始嘗試后面的方案。
4. 使用map join
正常情況下,join操作都會執(zhí)行shuffle過程,并且執(zhí)行的是reduce join,也就是先將所有相同的key和對應(yīng)的value匯聚到一個reduce task中,然后再進(jìn)行join。普通join的過程如下圖所示:
普通join過程
普通的join是會走shuffle過程的,而一旦shuffle,就相當(dāng)于會將相同key的數(shù)據(jù)拉取到一個shuffle read task中再進(jìn)行join,此時就是reduce join。但是如果一個RDD是比較小的,則可以采用廣播小RDD全量數(shù)據(jù)+map算子來實(shí)現(xiàn)與join同樣的效果,也就是map join,此時就不會發(fā)生shuffle操作,也就不會發(fā)生數(shù)據(jù)傾斜。
注意:RDD是并不能直接進(jìn)行廣播的,只能將RDD內(nèi)部的數(shù)據(jù)通過collect拉取到Driver內(nèi)存然后再進(jìn)行廣播。
1. 核心思路:
不使用join算子進(jìn)行連接操作,而使用broadcast變量與map類算子實(shí)現(xiàn)join操作,進(jìn)而完全規(guī)避掉shuffle類的操作,徹底避免數(shù)據(jù)傾斜的發(fā)生和出現(xiàn)。將較小RDD中的數(shù)據(jù)直接通過collect算子拉取到Driver端的內(nèi)存中來,然后對其創(chuàng)建一個broadcast變量;接著對另外一個RDD執(zhí)行map類算子,在算子函數(shù)內(nèi),從broadcast變量中獲取較小RDD的全量數(shù)據(jù),與當(dāng)前RDD的每一條數(shù)據(jù)按照連接key進(jìn)行比對,如果連接key相同的話,那么就將兩個RDD的數(shù)據(jù)用你需要的方式連接起來。
根據(jù)上述思路,根本不會發(fā)生shuffle操作,從根本上杜絕了join操作可能導(dǎo)致的數(shù)據(jù)傾斜問題。
當(dāng)join操作有數(shù)據(jù)傾斜問題并且其中一個RDD的數(shù)據(jù)量較小時,可以優(yōu)先考慮這種方式,效果非常好。
map join的過程如下圖所示:
map join過程
2. 不適用場景分析:
由于Spark的廣播變量是在每個Executor中保存一個副本,如果兩個RDD數(shù)據(jù)量都比較大,那么如果將一個數(shù)據(jù)量比較大的RDD做成廣播變量,那么很有可能會造成內(nèi)存溢出。
故障排除1. 避免OOM-out of memory
在Shuffle過程,reduce端task并不是等到map端task將其數(shù)據(jù)全部寫入磁盤后再去拉取,而是map端寫一點(diǎn)數(shù)據(jù),reduce端task就會拉取一小部分?jǐn)?shù)據(jù),然后立即進(jìn)行后面的聚合、算子函數(shù)的使用等操作。
reduce端task能夠拉取多少數(shù)據(jù),由reduce拉取數(shù)據(jù)的緩沖區(qū)buffer來決定,因?yàn)槔∵^來的數(shù)據(jù)都是先放在buffer中,然后再進(jìn)行后續(xù)的處理,buffer的默認(rèn)大小為48MB。
reduce端task會一邊拉取一邊計算,不一定每次都會拉滿48MB的數(shù)據(jù),可能大多數(shù)時候拉取一部分?jǐn)?shù)據(jù)就處理掉了。
雖然說增大reduce端緩沖區(qū)大小可以減少拉取次數(shù),提升Shuffle性能,但是有時map端的數(shù)據(jù)量非常大,寫出的速度非?,此時reduce端的所有task在拉取的時候,有可能全部達(dá)到自己緩沖的最大極限值,即48MB,此時,再加上reduce端執(zhí)行的聚合函數(shù)的代碼,可能會創(chuàng)建大量的對象,這可能會導(dǎo)致內(nèi)存溢出,即OOM。
如果一旦出現(xiàn)reduce端內(nèi)存溢出的問題,我們可以考慮減小reduce端拉取數(shù)據(jù)緩沖區(qū)的大小,例如減少為12MB。
在實(shí)際生產(chǎn)環(huán)境中是出現(xiàn)過這種問題的,這是典型的以性能換執(zhí)行的原理。reduce端拉取數(shù)據(jù)的緩沖區(qū)減小,不容易導(dǎo)致OOM,但是相應(yīng)的,reudce端的拉取次數(shù)增加,造成更多的網(wǎng)絡(luò)傳輸開銷,造成性能的下降。
注意,要保證任務(wù)能夠運(yùn)行,再考慮性能的優(yōu)化。
2. 避免GC導(dǎo)致的shuffle文件拉取失敗
在Spark作業(yè)中,有時會出現(xiàn)shuffle file not found的錯誤,這是非常常見的一個報錯,有時出現(xiàn)這種錯誤以后,選擇重新執(zhí)行一遍,就不再報出這種錯誤。
出現(xiàn)上述問題可能的原因是Shuffle操作中,后面stage的task想要去上一個stage的task所在的Executor拉取數(shù)據(jù),結(jié)果對方正在執(zhí)行GC,執(zhí)行GC會導(dǎo)致Executor內(nèi)所有的工作現(xiàn)場全部停止,比如BlockManager、基于netty的網(wǎng)絡(luò)通信等,這就會導(dǎo)致后面的task拉取數(shù)據(jù)拉取了半天都沒有拉取到,就會報出shuffle file not found的錯誤,而第二次再次執(zhí)行就不會再出現(xiàn)這種錯誤。
可以通過調(diào)整reduce端拉取數(shù)據(jù)重試次數(shù)和reduce端拉取數(shù)據(jù)時間間隔這兩個參數(shù)來對Shuffle性能進(jìn)行調(diào)整,增大參數(shù)值,使得reduce端拉取數(shù)據(jù)的重試次數(shù)增加,并且每次失敗后等待的時間間隔加長。
JVM GC導(dǎo)致的shuffle文件拉取失敗調(diào)整數(shù)據(jù)重試次數(shù)和reduce端拉取數(shù)據(jù)時間間隔:
val conf = new SparkConf()
.set("spark.shuffle.io.maxRetries", "6")
.set("spark.shuffle.io.retryWait", "60s")
3. YARN-CLIENT模式導(dǎo)致的網(wǎng)卡流量激增問題
在YARN-client模式下,Driver啟動在本地機(jī)器上,而Driver負(fù)責(zé)所有的任務(wù)調(diào)度,需要與YARN集群上的多個Executor進(jìn)行頻繁的通信。
假設(shè)有100個Executor,1000個task,那么每個Executor分配到10個task,之后,Driver要頻繁地跟Executor上運(yùn)行的1000個task進(jìn)行通信,通信數(shù)據(jù)非常多,并且通信品類特別高。這就導(dǎo)致有可能在Spark任務(wù)運(yùn)行過程中,由于頻繁大量的網(wǎng)絡(luò)通訊,本地機(jī)器的網(wǎng)卡流量會激增。
注意,YARN-client模式只會在測試環(huán)境中使用,而之所以使用YARN-client模式,是由于可以看到詳細(xì)全面的log信息,通過查看log,可以鎖定程序中存在的問題,避免在生產(chǎn)環(huán)境下發(fā)生故障。
在生產(chǎn)環(huán)境下,使用的一定是YARN-cluster模式。在YARN-cluster模式下,就不會造成本地機(jī)器網(wǎng)卡流量激增問題,如果YARN-cluster模式下存在網(wǎng)絡(luò)通信的問題,需要運(yùn)維團(tuán)隊進(jìn)行解決。
4. YARN-CLUSTER模式的JVM棧內(nèi)存溢出無法執(zhí)行問題
當(dāng)Spark作業(yè)中包含SparkSQL的內(nèi)容時,可能會碰到Y(jié)ARN-client模式下可以運(yùn)行,但是YARN-cluster模式下無法提交運(yùn)行(報出OOM錯誤)的情況。
YARN-client模式下,Driver是運(yùn)行在本地機(jī)器上的,Spark使用的JVM的PermGen的配置,是本地機(jī)器上的spark-class文件,JVM永久代的大小是128MB,這個是沒有問題的,但是在YARN-cluster模式下,Driver運(yùn)行在YARN集群的某個節(jié)點(diǎn)上,使用的是沒有經(jīng)過配置的默認(rèn)設(shè)置,PermGen永久代大小為82MB。
SparkSQL的內(nèi)部要進(jìn)行很復(fù)雜的SQL的語義解析、語法樹轉(zhuǎn)換等等,非常復(fù)雜,如果sql語句本身就非常復(fù)雜,那么很有可能會導(dǎo)致性能的損耗和內(nèi)存的占用,特別是對PermGen的占用會比較大。
所以,此時如果PermGen占用好過了82MB,但是又小于128MB,就會出現(xiàn)YARN-client模式下可以運(yùn)行,YARN-cluster模式下無法運(yùn)行的情況。
解決上述問題的方法是增加PermGen(永久代)的容量,需要在spark-submit腳本中對相關(guān)參數(shù)進(jìn)行設(shè)置,設(shè)置方法如下:
--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"
通過上述方法就設(shè)置了Driver永久代的大小,默認(rèn)為128MB,最大256MB,這樣就可以避免上面所說的問題。
5. 避免SparkSQL JVM棧內(nèi)存溢出
當(dāng)SparkSQL的sql語句有成百上千的or關(guān)鍵字時,就可能會出現(xiàn)Driver端的JVM棧內(nèi)存溢出。
JVM棧內(nèi)存溢出基本上就是由于調(diào)用的方法層級過多,產(chǎn)生了大量的,非常深的,超出了JVM棧深度限制的遞歸。(我們猜測SparkSQL有大量or語句的時候,在解析SQL時,例如轉(zhuǎn)換為語法樹或者進(jìn)行執(zhí)行計劃的生成的時候,對于or的處理是遞歸,or非常多時,會發(fā)生大量的遞歸)
此時,建議將一條sql語句拆分為多條sql語句來執(zhí)行,每條sql語句盡量保證100個以內(nèi)的子句。根據(jù)實(shí)際的生產(chǎn)環(huán)境試驗(yàn),一條sql語句的or關(guān)鍵字控制在100個以內(nèi),通常不會導(dǎo)致JVM棧內(nèi)存溢出。

請輸入評論內(nèi)容...
請輸入評論/評論長度6~500個字
最新活動更多
推薦專題
- 1 UALink規(guī)范發(fā)布:挑戰(zhàn)英偉達(dá)AI統(tǒng)治的開始
- 2 北電數(shù)智主辦酒仙橋論壇,探索AI產(chǎn)業(yè)發(fā)展新路徑
- 3 降薪、加班、裁員三重暴擊,“AI四小龍”已折戟兩家
- 4 “AI寒武紀(jì)”爆發(fā)至今,五類新物種登上歷史舞臺
- 5 國產(chǎn)智駕迎戰(zhàn)特斯拉FSD,AI含量差幾何?
- 6 光計算迎來商業(yè)化突破,但落地仍需時間
- 7 東陽光:2024年扭虧、一季度凈利大增,液冷疊加具身智能打開成長空間
- 8 地平線自動駕駛方案解讀
- 9 封殺AI“照騙”,“淘寶們”終于不忍了?
- 10 優(yōu)必選:營收大增主靠小件,虧損繼續(xù)又逢關(guān)稅,能否乘機(jī)器人東風(fēng)翻身?