俞華鋒
(浙江經(jīng)濟(jì)職業(yè)技術(shù)學(xué)院,浙江 杭州 310018)
在大數(shù)據(jù)處理領(lǐng)域中,Spark 云平臺越來越受到歡迎, 現(xiàn)在已經(jīng)演變成一個高速發(fā)展應(yīng)用廣泛的計(jì)算平臺,在各大電子商務(wù)網(wǎng)站都有使用。 Spark 云平臺適用于大數(shù)據(jù)處理的各個場合,與Hadoop 平臺有相似的地方, 但優(yōu)于它的是,Spark 計(jì)算的中間結(jié)果可以保存在內(nèi)存中, 從而不再需要讀寫HDFS, 因此Spark 能更好地適用于大數(shù)據(jù)領(lǐng)域的離線批處理、 數(shù)據(jù)挖掘、 機(jī)器學(xué)習(xí)、SQL 類處理、流式/實(shí)時計(jì)算、圖計(jì)算等各種不同類型的需要迭代計(jì)算的地方。
通過Spark 云平臺可以使得處理大數(shù)據(jù)的任務(wù)執(zhí)行的很快, 處理性能和效率很高。 當(dāng)然如果我們要使用Spark 開發(fā)出高效率和高性能的云計(jì)算平臺, 就必須對其各個方面進(jìn)行合理的設(shè)置和優(yōu)化, 否則Spark平臺的執(zhí)行效率可能會很低。 因此,如果要發(fā)揮Spark本身的優(yōu)勢, 就必須對其各個方面進(jìn)行綜合分析,并進(jìn)行合理的設(shè)置和優(yōu)化, 才能提高其性能。 本文主要探討如何設(shè)置和優(yōu)化Spark 平臺,來提高其性能,從而來提高大數(shù)據(jù)計(jì)算作業(yè)的執(zhí)行速度和執(zhí)行效率。
Spark 云平臺是處理Stream 數(shù)據(jù)的框架, 它是將數(shù)據(jù)分割成很小的時間片斷,以batch 批量處理的方式來處理Stream 數(shù)據(jù)。 這種批量處理的方式使得它可以同時兼顧實(shí)時和批量數(shù)據(jù)處理的邏輯和算法, 方便了需要將歷史和實(shí)時數(shù)據(jù)進(jìn)行挖掘和分析的應(yīng)用場合。Spark 云平臺通過序列化及類加載機(jī)制, 運(yùn)行在JAVA虛擬機(jī)上, 采用分布式方式執(zhí)行各個任務(wù)。 執(zhí)行任務(wù)的流程如圖1 所示。
圖1 執(zhí)行任務(wù)的流程圖
在Client 端機(jī)子上通過spark-submit 命令提交任務(wù)給 Master 機(jī)子后, 就會啟動一個屬于該任務(wù)的Driver進(jìn)程。 Driver 進(jìn)程根據(jù)部署模式,可能在本地機(jī)子上啟動,也可能在分布式集群中的某個Worker 工作節(jié)點(diǎn)上啟動。 Driver 進(jìn)程啟動以后,它向集群管理器申請執(zhí)行Spark 任務(wù)所需的資源Executor 進(jìn)程。 集群管理器收到Driver 進(jìn)程的申請后, 會根據(jù)申請的參數(shù), 在不同的Worker 節(jié)點(diǎn)上,啟動相應(yīng)數(shù)量的Executor 進(jìn)程。
Driver 進(jìn)程申請到了所需的資源之后,便開始調(diào)度和執(zhí)行任務(wù)代碼。 Driver 進(jìn)程首先將Spark 的任務(wù)代碼拆分成很多部分,每一部分稱之為stage,即每個stage對應(yīng)一部分代碼。 同時每個stage 新建一批子任務(wù),然后由各個Worker 節(jié)點(diǎn)上的Executor 進(jìn)程來執(zhí)行這些子任務(wù)。 第一個stage 的所有子任務(wù)執(zhí)行完畢之后,就將中間計(jì)算結(jié)果存儲到Worker 節(jié)點(diǎn)的本地磁盤中。 一個stag 執(zhí)行完以后,Driver 進(jìn)程就會執(zhí)行下一個stage,一直執(zhí)行到全部任務(wù)執(zhí)行完畢, 并且計(jì)算完所有的數(shù)據(jù)才停止。
Spark 的性能優(yōu)化, 需要我們根據(jù)不同的大數(shù)據(jù)應(yīng)用場景,對Spark 的各項(xiàng)任務(wù)進(jìn)行綜合的考慮,并提供多方面的技術(shù)解決方案來進(jìn)行優(yōu)化, 才能獲得最佳性能。 本文主要從開發(fā)Spark 任務(wù)時的優(yōu)化、資源調(diào)度時的優(yōu)化設(shè)置和數(shù)據(jù)傾斜時的優(yōu)化處理這三個方面進(jìn)行探討。
在開發(fā)Spark 任務(wù)時,應(yīng)根據(jù)具體的業(yè)務(wù)以及實(shí)際的應(yīng)用場景, 將一些性能優(yōu)化的基本原則靈活地運(yùn)用到Spark 任務(wù)中,例如,避免RDD 的重復(fù)設(shè)計(jì),合理的配置Spark 的各個參數(shù)以及一些特殊操作的優(yōu)化等等。
在開發(fā)一個Spark 任務(wù)時,首先根據(jù)任務(wù)相對應(yīng)的數(shù)據(jù)源創(chuàng)建一個初始的彈性分布式數(shù)據(jù)集RDD,然后對創(chuàng)建的這個RDD 執(zhí)行映射或歸約的操作,得到下一個中間的彈性分布式數(shù)據(jù)集RDD,然后對中間的RDD再執(zhí)行映射或歸約的操作, 直到計(jì)算出最終的結(jié)果。在上述的循環(huán)往復(fù)的操作過程中,不同的 RDD 會通過映射或歸約操作得到一系列的RDD 串。對于同一份數(shù)據(jù)源一般只應(yīng)該創(chuàng)建一個RDD, 如果創(chuàng)建了多個RDD, Spark 云平臺會對不同的RDD 分別進(jìn)行計(jì)算,得到的結(jié)果相似,失去了參考價值,因此增加了Spark 任務(wù)的資源開銷。
在設(shè)計(jì)RDD 時, 除了上述原則外, 還要在對不同的數(shù)據(jù)執(zhí)行映射或歸約的操作時, 盡量地復(fù)用同一個已經(jīng)存在的RDD。 例如,已經(jīng)創(chuàng)建了一個
根據(jù)具體的業(yè)務(wù)開發(fā)好Spark 任務(wù)代碼后,就應(yīng)該為相應(yīng)的任務(wù)配置相應(yīng)的資源。 我們可以通過sparksubmit 命令來設(shè)置特定任務(wù)的資源參數(shù)。 如果資源參數(shù)設(shè)置不合理, 就會導(dǎo)致集群的資源沒有發(fā)揮應(yīng)有的性能, 任務(wù)執(zhí)行會比較緩慢。 如果設(shè)置的資源參數(shù)過大,超過了集群能夠提供的極限,就會出現(xiàn)各種異常??傊灾?,資源參數(shù)要設(shè)置合理,否則就會導(dǎo)致Spark任務(wù)的執(zhí)行效率低下, 無法達(dá)到預(yù)期的性能。 因此我們需要對資源參數(shù)進(jìn)行設(shè)置和優(yōu)化處理。
怎么樣對Spark 的資源參數(shù)進(jìn)行優(yōu)化配置呢?主要是通過調(diào)節(jié)和優(yōu)化num-executors 和executor-memory 等參數(shù),來提高資源使用的效率,發(fā)揮集群的優(yōu)勢,從而提升Spark 任務(wù)的執(zhí)行性能。
num-executors 參數(shù)的作用是,設(shè)置Spark 執(zhí)行一個任務(wù)時需要執(zhí)行多少個Executor 進(jìn)程。 Driver 進(jìn)程在向Spark 云平臺申請資源時,系統(tǒng)會按照num-executors 參數(shù)設(shè)置的數(shù)量,在各個worker 工作節(jié)點(diǎn)上,啟動numexecutors 個Executor 進(jìn)程。 這個參數(shù)如果不設(shè)置的話,系統(tǒng)只會啟動很少量的Executor 進(jìn)程, 這樣就會導(dǎo)致運(yùn)行效率非常低,速度非常慢。num-executors 參數(shù)一般設(shè)置50~100 比較合適,執(zhí)行任務(wù)時集群管理器會啟動50~100 個左右的Executor 進(jìn)程, 大部分隊(duì)列可以得到充分的資源,達(dá)到性能最優(yōu)化。 當(dāng)然如果設(shè)置的太少,就發(fā)揮不了集群資源的優(yōu)勢,造成資源浪費(fèi)。
executor-memory 參數(shù)用于設(shè)置Executor 進(jìn)程的內(nèi)存。 Executor 內(nèi)存的大小很大程度上影響著Spark 任務(wù)執(zhí)行的速度。 我們可以把Executor 進(jìn)程的內(nèi)存大小設(shè)置為4-8G, 具體設(shè)置多少還得根據(jù)資源隊(duì)列的最大內(nèi)存限制是多少。 num-executors 和executor-memory 的積就是某個Spark 任務(wù)執(zhí)行的總內(nèi)存量, 如果超過了隊(duì)列的最大內(nèi)存量,性能也會下降。
在大數(shù)據(jù)業(yè)務(wù)處理中經(jīng)常會遇到的問題是數(shù)據(jù)傾斜。 例如, 在進(jìn)行shuffle 操作時, 可能會出現(xiàn)這種情況,大部分key 對應(yīng)幾條數(shù)據(jù),系統(tǒng)很快就處理完了,但是個別key 可能對應(yīng)了百萬級別的數(shù)據(jù), 系統(tǒng)可能需要花費(fèi)很長時間來處理。 而最長的task 花費(fèi)的時間決定了整個Spark 任務(wù)的執(zhí)行時間,此時的Spark 任務(wù)的執(zhí)行時間會很長。 數(shù)據(jù)傾斜的優(yōu)化處理就是使用各種解決方案來解決數(shù)據(jù)傾斜的問題, 以縮短任務(wù)的執(zhí)行時間,從而保證Spark 的執(zhí)行效率和性能。
由于大數(shù)據(jù)計(jì)算業(yè)務(wù)的需要, 經(jīng)常會對Hive 數(shù)據(jù)源執(zhí)行分析操作。 由于Hive 數(shù)據(jù)源中的數(shù)據(jù)不均勻,出現(xiàn)數(shù)據(jù)傾斜的幾率非常大, 在這種情況下, 我們可以先對數(shù)據(jù)根據(jù)key 進(jìn)行聚合操作, 即所謂的ETL 預(yù)處理, 然后,Spark 再對ETL 預(yù)處理之后的數(shù)據(jù)進(jìn)行處理。 由于Spark 處理的數(shù)據(jù)是聚合后的數(shù)據(jù),它就不需要使用原先的shuffle 操作了,也不會發(fā)生數(shù)據(jù)傾斜了。
如果在shuffle 操作時, 就少數(shù)幾個鍵值會造成數(shù)據(jù)傾斜, 當(dāng)然這少數(shù)幾個鍵值對任務(wù)本身的影響不大的話, 我們可以過濾掉這幾個鍵值。 因?yàn)檫@些鍵值被丟棄了,就不參加運(yùn)算了,也就不會產(chǎn)生數(shù)據(jù)傾斜。 例如, 我們可以使用where 子句過濾掉上述的鍵值,在Spark Core 中對RDD 也執(zhí)行相同的過濾操作, 過濾掉產(chǎn)生數(shù)據(jù)傾斜的鍵值。
本文首先闡述了Spark 云平臺性能優(yōu)化的意義,然后闡述了Spark 運(yùn)行的基本原理,最后探討了Spark 云平臺性能的優(yōu)化方法。 希望能對Spark 云平臺的研究提供一定的參考。 當(dāng)然本文只是簡單的提出了性能優(yōu)化的一些方法, 具體的實(shí)現(xiàn)和優(yōu)化處理的方法有待進(jìn)一步的研究與完善。