Spark調(diào)優(yōu)之RDD算子調(diào)優(yōu)(面試常問,建議收藏)
Spark調(diào)優(yōu)之RDD算子調(diào)優(yōu)
不廢話,直接進入正題!
1. RDD復用
在對RDD進行算子時,要避免相同的算子和計算邏輯之下對RDD進行重復的計算,如下圖所示:
RDD的重復計算
對上圖中的RDD計算架構進行修改,得到如下圖所示的優(yōu)化結果:
RDD架構優(yōu)化
2. 盡早filter
獲取到初始RDD后,應該考慮盡早地過濾掉不需要的數(shù)據(jù),進而減少對內(nèi)存的占用,從而提升Spark作業(yè)的運行效率。
3. 讀取大量小文件-用wholeTextFiles
當我們將一個文本文件讀取為 RDD 時,輸入的每一行都會成為RDD的一個元素。
也可以將多個完整的文本文件一次性讀取為一個pairRDD,其中鍵是文件名,值是文件內(nèi)容。
val input:RDD[String] = sc.textFile("dir.log")
如果傳遞目錄,則將目錄下的所有文件讀取作為RDD。文件路徑支持通配符。
但是這樣對于大量的小文件讀取效率并不高,應該使用 wholeTextFiles
返回值為RDD[(String, String)],其中Key是文件的名稱,Value是文件的內(nèi)容。
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])
wholeTextFiles讀取小文件:
val filesRDD: RDD[(String, String)] =
sc.wholeTextFiles("D:\data\files", minPartitions = 3)
val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\r\n"))
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
4. mapPartition和foreachPartition
mapPartitions
map(_….) 表示每一個元素
mapPartitions(_….) 表示每個分區(qū)的數(shù)據(jù)組成的迭代器
普通的map算子對RDD中的每一個元素進行操作,而mapPartitions算子對RDD中每一個分區(qū)進行操作。
如果是普通的map算子,假設一個partition有1萬條數(shù)據(jù),那么map算子中的function要執(zhí)行1萬次,也就是對每個元素進行操作。
map 算子
如果是mapPartition算子,由于一個task處理一個RDD的partition,那么一個task只會執(zhí)行一次function,function一次接收所有的partition數(shù)據(jù),效率比較高。
mapPartition 算子
比如,當要把RDD中的所有數(shù)據(jù)通過JDBC寫入數(shù)據(jù),如果使用map算子,那么需要對RDD中的每一個元素都創(chuàng)建一個數(shù)據(jù)庫連接,這樣對資源的消耗很大,如果使用mapPartitions算子,那么針對一個分區(qū)的數(shù)據(jù),只需要建立一個數(shù)據(jù)庫連接。
mapPartitions算子也存在一些缺點:對于普通的map操作,一次處理一條數(shù)據(jù),如果在處理了2000條數(shù)據(jù)后內(nèi)存不足,那么可以將已經(jīng)處理完的2000條數(shù)據(jù)從內(nèi)存中垃圾回收掉;但是如果使用mapPartitions算子,但數(shù)據(jù)量非常大時,function一次處理一個分區(qū)的數(shù)據(jù),如果一旦內(nèi)存不足,此時無法回收內(nèi)存,就可能會OOM,即內(nèi)存溢出。
因此,mapPartitions算子適用于數(shù)據(jù)量不是特別大的時候,此時使用mapPartitions算子對性能的提升效果還是不錯的。(當數(shù)據(jù)量很大的時候,一旦使用mapPartitions算子,就會直接OOM)
在項目中,應該首先估算一下RDD的數(shù)據(jù)量、每個partition的數(shù)據(jù)量,以及分配給每個Executor的內(nèi)存資源,如果資源允許,可以考慮使用mapPartitions算子代替map。
foreachPartition
rrd.foreache(_….) 表示每一個元素
rrd.forPartitions(_….) 表示每個分區(qū)的數(shù)據(jù)組成的迭代器
在生產(chǎn)環(huán)境中,通常使用foreachPartition算子來完成數(shù)據(jù)庫的寫入,通過foreachPartition算子的特性,可以優(yōu)化寫數(shù)據(jù)庫的性能。
如果使用foreach算子完成數(shù)據(jù)庫的操作,由于foreach算子是遍歷RDD的每條數(shù)據(jù),因此,每條數(shù)據(jù)都會建立一個數(shù)據(jù)庫連接,這是對資源的極大浪費,因此,對于寫數(shù)據(jù)庫操作,我們應當使用foreachPartition算子。
與mapPartitions算子非常相似,foreachPartition是將RDD的每個分區(qū)作為遍歷對象,一次處理一個分區(qū)的數(shù)據(jù),也就是說,如果涉及數(shù)據(jù)庫的相關操作,一個分區(qū)的數(shù)據(jù)只需要創(chuàng)建一次數(shù)據(jù)庫連接,如下圖所示:
foreachPartition 算子
使用了foreachPartition 算子后,可以獲得以下的性能提升:
對于我們寫的function函數(shù),一次處理一整個分區(qū)的數(shù)據(jù);
對于一個分區(qū)內(nèi)的數(shù)據(jù),創(chuàng)建唯一的數(shù)據(jù)庫連接;
只需要向數(shù)據(jù)庫發(fā)送一次SQL語句和多組參數(shù);
在生產(chǎn)環(huán)境中,全部都會使用foreachPartition算子完成數(shù)據(jù)庫操作。foreachPartition算子存在一個問題,與mapPartitions算子類似,如果一個分區(qū)的數(shù)據(jù)量特別大,可能會造成OOM,即內(nèi)存溢出。

請輸入評論內(nèi)容...
請輸入評論/評論長度6~500個字
最新活動更多
推薦專題
- 1 UALink規(guī)范發(fā)布:挑戰(zhàn)英偉達AI統(tǒng)治的開始
- 2 北電數(shù)智主辦酒仙橋論壇,探索AI產(chǎn)業(yè)發(fā)展新路徑
- 3 降薪、加班、裁員三重暴擊,“AI四小龍”已折戟兩家
- 4 “AI寒武紀”爆發(fā)至今,五類新物種登上歷史舞臺
- 5 國產(chǎn)智駕迎戰(zhàn)特斯拉FSD,AI含量差幾何?
- 6 光計算迎來商業(yè)化突破,但落地仍需時間
- 7 東陽光:2024年扭虧、一季度凈利大增,液冷疊加具身智能打開成長空間
- 8 地平線自動駕駛方案解讀
- 9 封殺AI“照騙”,“淘寶們”終于不忍了?
- 10 優(yōu)必選:營收大增主靠小件,虧損繼續(xù)又逢關稅,能否乘機器人東風翻身?