訂閱
糾錯
加入自媒體

Spark調(diào)優(yōu)之RDD算子調(diào)優(yōu)(面試常問,建議收藏)

2021-03-13 08:59
園陌
關注

Spark調(diào)優(yōu)之RDD算子調(diào)優(yōu)

不廢話,直接進入正題!

1. RDD復用

在對RDD進行算子時,要避免相同的算子和計算邏輯之下對RDD進行重復的計算,如下圖所示:

RDD的重復計算

對上圖中的RDD計算架構進行修改,得到如下圖所示的優(yōu)化結果:

RDD架構優(yōu)化

2. 盡早filter

獲取到初始RDD后,應該考慮盡早地過濾掉不需要的數(shù)據(jù),進而減少對內(nèi)存的占用,從而提升Spark作業(yè)的運行效率。

3. 讀取大量小文件-用wholeTextFiles

當我們將一個文本文件讀取為 RDD 時,輸入的每一行都會成為RDD的一個元素。

也可以將多個完整的文本文件一次性讀取為一個pairRDD,其中鍵是文件名,值是文件內(nèi)容。

val input:RDD[String] = sc.textFile("dir.log")

如果傳遞目錄,則將目錄下的所有文件讀取作為RDD。文件路徑支持通配符。

但是這樣對于大量的小文件讀取效率并不高,應該使用 wholeTextFiles
返回值為RDD[(String, String)],其中Key是文件的名稱,Value是文件的內(nèi)容。

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])

wholeTextFiles讀取小文件:

val filesRDD: RDD[(String, String)] =
sc.wholeTextFiles("D:\data\files", minPartitions = 3)
val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\r\n"))
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)

4. mapPartition和foreachPartition

mapPartitions

map(_….)  表示每一個元素

mapPartitions(_….)  表示每個分區(qū)的數(shù)據(jù)組成的迭代器

普通的map算子對RDD中的每一個元素進行操作,而mapPartitions算子對RDD中每一個分區(qū)進行操作。

如果是普通的map算子,假設一個partition有1萬條數(shù)據(jù),那么map算子中的function要執(zhí)行1萬次,也就是對每個元素進行操作。

map 算子

如果是mapPartition算子,由于一個task處理一個RDD的partition,那么一個task只會執(zhí)行一次function,function一次接收所有的partition數(shù)據(jù),效率比較高。

mapPartition 算子

比如,當要把RDD中的所有數(shù)據(jù)通過JDBC寫入數(shù)據(jù),如果使用map算子,那么需要對RDD中的每一個元素都創(chuàng)建一個數(shù)據(jù)庫連接,這樣對資源的消耗很大,如果使用mapPartitions算子,那么針對一個分區(qū)的數(shù)據(jù),只需要建立一個數(shù)據(jù)庫連接。

mapPartitions算子也存在一些缺點:對于普通的map操作,一次處理一條數(shù)據(jù),如果在處理了2000條數(shù)據(jù)后內(nèi)存不足,那么可以將已經(jīng)處理完的2000條數(shù)據(jù)從內(nèi)存中垃圾回收掉;但是如果使用mapPartitions算子,但數(shù)據(jù)量非常大時,function一次處理一個分區(qū)的數(shù)據(jù),如果一旦內(nèi)存不足,此時無法回收內(nèi)存,就可能會OOM,即內(nèi)存溢出。

因此,mapPartitions算子適用于數(shù)據(jù)量不是特別大的時候,此時使用mapPartitions算子對性能的提升效果還是不錯的。(當數(shù)據(jù)量很大的時候,一旦使用mapPartitions算子,就會直接OOM)

在項目中,應該首先估算一下RDD的數(shù)據(jù)量、每個partition的數(shù)據(jù)量,以及分配給每個Executor的內(nèi)存資源,如果資源允許,可以考慮使用mapPartitions算子代替map。

foreachPartition

rrd.foreache(_….) 表示每一個元素

rrd.forPartitions(_….)  表示每個分區(qū)的數(shù)據(jù)組成的迭代器

在生產(chǎn)環(huán)境中,通常使用foreachPartition算子來完成數(shù)據(jù)庫的寫入,通過foreachPartition算子的特性,可以優(yōu)化寫數(shù)據(jù)庫的性能。

如果使用foreach算子完成數(shù)據(jù)庫的操作,由于foreach算子是遍歷RDD的每條數(shù)據(jù),因此,每條數(shù)據(jù)都會建立一個數(shù)據(jù)庫連接,這是對資源的極大浪費,因此,對于寫數(shù)據(jù)庫操作,我們應當使用foreachPartition算子。

與mapPartitions算子非常相似,foreachPartition是將RDD的每個分區(qū)作為遍歷對象,一次處理一個分區(qū)的數(shù)據(jù),也就是說,如果涉及數(shù)據(jù)庫的相關操作,一個分區(qū)的數(shù)據(jù)只需要創(chuàng)建一次數(shù)據(jù)庫連接,如下圖所示:

foreachPartition 算子

使用了foreachPartition 算子后,可以獲得以下的性能提升:

對于我們寫的function函數(shù),一次處理一整個分區(qū)的數(shù)據(jù);

對于一個分區(qū)內(nèi)的數(shù)據(jù),創(chuàng)建唯一的數(shù)據(jù)庫連接;

只需要向數(shù)據(jù)庫發(fā)送一次SQL語句和多組參數(shù);

在生產(chǎn)環(huán)境中,全部都會使用foreachPartition算子完成數(shù)據(jù)庫操作。foreachPartition算子存在一個問題,與mapPartitions算子類似,如果一個分區(qū)的數(shù)據(jù)量特別大,可能會造成OOM,即內(nèi)存溢出。

1  2  下一頁>  
聲明: 本文由入駐維科號的作者撰寫,觀點僅代表作者本人,不代表OFweek立場。如有侵權或其他問題,請聯(lián)系舉報。

發(fā)表評論

0條評論,0人參與

請輸入評論內(nèi)容...

請輸入評論/評論長度6~500個字

您提交的評論過于頻繁,請輸入驗證碼繼續(xù)

暫無評論

暫無評論

    掃碼關注公眾號
    OFweek人工智能網(wǎng)
    獲取更多精彩內(nèi)容
    文章糾錯
    x
    *文字標題:
    *糾錯內(nèi)容:
    聯(lián)系郵箱:
    *驗 證 碼:

    粵公網(wǎng)安備 44030502002758號