吳恩慈
(上海淇毓信息科技有限公司,上海 200120)
Spark 由加州大學(xué)伯克利分校AMP 實(shí)驗(yàn)室開(kāi)發(fā),是基于RDD 的內(nèi)存計(jì)算框架,在流式計(jì)算場(chǎng)景中表現(xiàn)良好.在Spark 分布式流計(jì)算集群的實(shí)踐過(guò)程中,往往出現(xiàn)數(shù)據(jù)分配不均勻的現(xiàn)象,某個(gè)分區(qū)的數(shù)據(jù)顯著多于其它分區(qū),該節(jié)點(diǎn)的計(jì)算速度成為整個(gè)集群的性能瓶頸.任務(wù)執(zhí)行慢的節(jié)點(diǎn)往往導(dǎo)致內(nèi)存溢出,服務(wù)器CPU 使用率在短時(shí)間內(nèi)急劇增加.
發(fā)生數(shù)據(jù)傾斜時(shí),某個(gè)任務(wù)處理的數(shù)據(jù)量遠(yuǎn)大于其他分片,從而增加了整個(gè)階段的完成時(shí)間.由于原始數(shù)據(jù)源的分布不均勻,每個(gè)Reducer 在分區(qū)映射過(guò)程中計(jì)算的數(shù)據(jù)量也不相同,任務(wù)執(zhí)行時(shí)間不同,增加寬依賴階段的延遲,降低集群作業(yè)執(zhí)行的效率.雖然支持用戶定義分區(qū),但真正的數(shù)據(jù)分布難以預(yù)測(cè),無(wú)法保證自定義分區(qū)功能的合理性和準(zhǔn)確性.數(shù)據(jù)傾斜問(wèn)題很難規(guī)避,為有效改善數(shù)據(jù)傾斜問(wèn)題,本文在分析和研究國(guó)內(nèi)外對(duì)該問(wèn)題的研究成果和實(shí)踐經(jīng)驗(yàn)的基礎(chǔ)上,主要做了以下工作.
1)研究了Spark Shuffle 設(shè)計(jì)和算法實(shí)現(xiàn),分析了哈希和排序兩類(lèi)Shuffle 機(jī)制的實(shí)現(xiàn)過(guò)程,深入分析在Shuffle 過(guò)程發(fā)生數(shù)據(jù)傾斜的本質(zhì)原因.
2)分析了Spark 流計(jì)算集群中,發(fā)生數(shù)據(jù)傾斜常見(jiàn)業(yè)務(wù)場(chǎng)景,以及數(shù)據(jù)傾斜問(wèn)題的原因和發(fā)生過(guò)程,提供了問(wèn)題定位的方法和步驟.
3)提出了Broadcast 機(jī)制避免某些場(chǎng)景下Shuffle導(dǎo)致的數(shù)據(jù)傾斜問(wèn)題的方法,給出廣播變量分發(fā)機(jī)制和算法實(shí)現(xiàn).通過(guò)Broadcast 實(shí)現(xiàn)Join 算子的實(shí)驗(yàn),驗(yàn)證了該方法在性能上有較大提升.
MapReduce 框架數(shù)據(jù)傾斜解決方案包括靜態(tài)和動(dòng)態(tài)自適應(yīng)調(diào)整.靜態(tài)自適應(yīng)是事先分析數(shù)據(jù)集中鍵的分布特征,選擇適當(dāng)?shù)姆謪^(qū)方法.該方法需要總結(jié)出一套和業(yè)務(wù)相關(guān)的數(shù)據(jù)集抽象規(guī)則算法,可以通過(guò)機(jī)器學(xué)習(xí)的方式來(lái)訓(xùn)練算法,提高對(duì)各種類(lèi)型數(shù)據(jù)集的適配度.文獻(xiàn)[1]提出的LEEN 方法,在Skew Reduce 的基礎(chǔ)上將分析過(guò)程移至Map 完成后,考慮Reduce 端輸入數(shù)據(jù)的公平性和數(shù)據(jù)本地性,LEEN 方法在緩解慢任務(wù)和Shuffle 引起的網(wǎng)絡(luò)擁塞方面有較大的性能提升[1].
動(dòng)態(tài)自適應(yīng)的思想是在應(yīng)用運(yùn)行時(shí),實(shí)時(shí)檢測(cè)當(dāng)前各個(gè)Partition 中的數(shù)據(jù)填充情況.如果發(fā)現(xiàn)存在嚴(yán)重的數(shù)據(jù)傾斜問(wèn)題,在下一步進(jìn)行調(diào)整,以避免原始MapReduce 任務(wù)之間的不能全局優(yōu)化的問(wèn)題.文獻(xiàn)[2]提出了Skew Tune 方法,推測(cè)當(dāng)前任務(wù)需要完成時(shí)間,確定最可能的慢任務(wù)[2].掃描慢任務(wù)要處理的輸入數(shù)據(jù),在數(shù)據(jù)偏斜場(chǎng)景中性能有較好的表現(xiàn).文獻(xiàn)[3]提出在執(zhí)行流程中插入一個(gè)Intermediate Reduce(IR)的新階段,用于并行處理動(dòng)態(tài)的中間結(jié)果,使所有鍵值對(duì)都能夠利用到所有的節(jié)點(diǎn)資源[3].文獻(xiàn)[4]提出Adaptive MapReduce 策略,動(dòng)態(tài)地調(diào)整每個(gè)Mapper 處理的輸入數(shù)據(jù)量,在Mapper 端使用一個(gè)固定大小的哈希表執(zhí)行數(shù)據(jù)本地聚合[4].文獻(xiàn)[5]提出了任務(wù)重分割方法,通過(guò)監(jiān)視任務(wù)的執(zhí)行進(jìn)度,根據(jù)預(yù)定策略直接切割任務(wù)本身[5].
本文與上述研究成果的不同之處在于通過(guò)Broadcast機(jī)制將數(shù)據(jù)分發(fā)到計(jì)算節(jié)點(diǎn)中,實(shí)現(xiàn)數(shù)據(jù)本地性,從根本上避免Shuffle 過(guò)程,不需要做額外的開(kāi)發(fā)和部署.能夠快速解決特定場(chǎng)景下的數(shù)據(jù)傾斜問(wèn)題,具有較高的實(shí)用性.
分區(qū)策略(Partition)的主要作用是確定Shuffle 過(guò)程中Reducer 的數(shù)量,以及Mapper 側(cè)數(shù)據(jù)應(yīng)該分配給哪個(gè)Reducer.分區(qū)程序可以間接確定RDD 中的分區(qū)數(shù)和分區(qū)中的內(nèi)部數(shù)據(jù)記錄數(shù).Spark 內(nèi)置了Hash Partition 和Range Partition,支持自定義分區(qū)器.Hash Partition 的算法實(shí)現(xiàn)是獲取Key 的HashCode 除以子RDD 分區(qū)的數(shù)量取余.哈希分區(qū)器易于實(shí)現(xiàn)并且運(yùn)行速度快.但有一明顯的缺點(diǎn)是不關(guān)心鍵值的分布情況,其散列到不同分區(qū)的概率會(huì)因數(shù)據(jù)而有差別.
Range Partition 在一定程度上避免了該問(wèn)題.范圍分區(qū)器根據(jù)父RDD 的數(shù)據(jù)特征確定子RDD 分區(qū)的邊界,通過(guò)對(duì)數(shù)據(jù)進(jìn)行采樣和排序,將父RDD 數(shù)據(jù)劃分為M個(gè)數(shù)據(jù)塊.如果數(shù)據(jù)均勻分布,則為每個(gè)分區(qū)提取的樣本大小都很接近.如果所有數(shù)據(jù)都分配給每個(gè)分區(qū),則每個(gè)分區(qū)都會(huì)提取相同的數(shù)據(jù)量,并統(tǒng)計(jì)每個(gè)分區(qū)的實(shí)際數(shù)據(jù)量.若出現(xiàn)數(shù)據(jù)傾斜的情況,則對(duì)個(gè)別分區(qū)重新采樣.
Spark 作為MapReduce 框架的一種實(shí)現(xiàn),本質(zhì)上將Mapper 端輸出的數(shù)據(jù)通過(guò)Partition 算法,確定發(fā)送到相應(yīng)的Reducer,該過(guò)程涉及網(wǎng)絡(luò)傳輸和磁盤(pán)讀寫(xiě).Spark提供了哈希和排序兩類(lèi)Shuffle 機(jī)制.ShuffleManager主要作用是提供Shuffle Writer 和Shuffle Reader 過(guò)程.
Hash-Based Shuffle 機(jī)制的實(shí)現(xiàn)過(guò)程包括聚合和計(jì)算,使用哈希表存儲(chǔ)所有聚合數(shù)據(jù)的處理結(jié)果.聚合和計(jì)算過(guò)程不進(jìn)行排序,分區(qū)內(nèi)部的數(shù)據(jù)是無(wú)序的,如果希望有序就需要調(diào)用排序操作.哈希Shuffle 強(qiáng)制要求在Map 端進(jìn)行聚合操作,對(duì)于某些鍵值重復(fù)率不高的數(shù)據(jù)會(huì)影響效率.
如圖1所示,哈希Shuffle 算法實(shí)現(xiàn)過(guò)程.每個(gè)Mapper 都會(huì)根據(jù)Reducer 的數(shù)量創(chuàng)建一個(gè)相應(yīng)的桶(Bucket).Mapper 生成的結(jié)果將根據(jù)設(shè)置的分區(qū)算法填充到每個(gè)桶中.當(dāng)Reducer 啟動(dòng)時(shí),它將根據(jù)自己的TaskId 和它所依賴的MapperId,從遠(yuǎn)程或本地Block Manager 獲取相應(yīng)的Bucket 作為Reducer 的輸入.Bucket 是一種抽象概念,可以對(duì)應(yīng)于某個(gè)文件或文件的一部分.哈希Shuffle 算法實(shí)現(xiàn)中,Mapper 會(huì)針對(duì)每個(gè)Reducer 生成一個(gè)數(shù)據(jù)文件,當(dāng)Mapper 和Reducer數(shù)量比較多的時(shí)候,生成大量的磁盤(pán)文件.
圖1 Hash-Based Shuffle 算法實(shí)現(xiàn)過(guò)程
開(kāi)啟consolidate 優(yōu)化機(jī)制后,Shuffle Writer 過(guò)程中Task 不是為下游Stage 的每個(gè)Task 創(chuàng)建一個(gè)文件.每個(gè)ShuffleFileGroup 對(duì)應(yīng)一批磁盤(pán)文件,磁盤(pán)文件數(shù)與下游Stage 中的Task 總數(shù)相同.Executor 分配多少個(gè)Core 就可以并行執(zhí)行多少個(gè)任務(wù),第一批并行執(zhí)行中的每一個(gè)任務(wù)都創(chuàng)建一個(gè)ShuffleFileGroup,并將數(shù)據(jù)寫(xiě)入相應(yīng)的磁盤(pán)文件.當(dāng)執(zhí)行器執(zhí)行下一批任務(wù)時(shí),復(fù)用以前存在的ShuffleFileGroup 的磁盤(pán)文件,Task 將數(shù)據(jù)寫(xiě)入現(xiàn)有磁盤(pán)文件.consolidate 機(jī)制允許不同的任務(wù)復(fù)用同一批磁盤(pán)文件,減少了文件數(shù)量并提高了Shuffle Writer 的性能.
如圖2所示,Sort-Based Shuffle 算法實(shí)現(xiàn)過(guò)程.排序Shuffle 相比于哈希Shuffle,兩者的Shuffle Reader 過(guò)程是一致的,區(qū)別在Shuffle Writer 過(guò)程.排序Shuffle 允許Map 端不進(jìn)行聚合操作,在不指定聚合操作的情況下,排序Shuffle 機(jī)制Mapper 端用數(shù)據(jù)緩存區(qū)(Buffer)存儲(chǔ)所有的數(shù)據(jù).對(duì)于指定聚合操作的情況下,排序Shuffle 仍然使用哈希表存儲(chǔ)數(shù)據(jù),聚合過(guò)程與哈希Shuffle 的基本一致.無(wú)論是Buffer 還是Hash Map,每更新一次都檢查是否需要將現(xiàn)有的數(shù)據(jù)溢存到磁盤(pán)中,需要對(duì)數(shù)據(jù)進(jìn)行排序,存儲(chǔ)到一個(gè)文件中.更新完所有數(shù)據(jù)后,將多個(gè)文件合并為一個(gè)文件,并確保每個(gè)分區(qū)的內(nèi)部數(shù)據(jù)都是有序的.
排序Shuffle 機(jī)制包括普通和bypass 兩種運(yùn)行模式.當(dāng)Shuffle Reader Task 的數(shù)量小于等于bypass Merge Threshold 的值時(shí)就會(huì)啟用bypass 模式.在普通操作模式中,首先將數(shù)據(jù)寫(xiě)入內(nèi)存數(shù)據(jù)結(jié)構(gòu),根據(jù)不同的Shuffle 運(yùn)算符選擇不同的數(shù)據(jù)結(jié)構(gòu).如ReduceByKey 之類(lèi)的聚合運(yùn)算符選擇哈希數(shù)據(jù)結(jié)構(gòu),Join 類(lèi)的普通運(yùn)算符使用數(shù)組數(shù)據(jù)結(jié)構(gòu).在將每條數(shù)據(jù)寫(xiě)入內(nèi)存之后,確定是否已達(dá)到臨界閾值.如果達(dá)到臨界閾值則將內(nèi)存中的數(shù)據(jù)寫(xiě)到磁盤(pán),然后清空內(nèi)存中的數(shù)據(jù).
在溢出到磁盤(pán)文件之前,根據(jù)Key 對(duì)內(nèi)存中已有的數(shù)據(jù)進(jìn)行排序,排序后數(shù)據(jù)將批量寫(xiě)入磁盤(pán)文件.排序后的數(shù)據(jù)以每批10 000 個(gè)批量寫(xiě)入磁盤(pán)文件,從而有效減少磁盤(pán)IO 數(shù)量.將Task 的所有數(shù)據(jù)寫(xiě)入內(nèi)存數(shù)據(jù)結(jié)構(gòu)的過(guò)程中,會(huì)發(fā)生多次磁盤(pán)溢出操作,生成多個(gè)臨時(shí)文件,最后合并所有先前的臨時(shí)磁盤(pán)文件.由于一個(gè)Task 僅對(duì)應(yīng)于一個(gè)磁盤(pán)文件,因此將單獨(dú)寫(xiě)入索引文件以標(biāo)識(shí)文件中每個(gè)下游Task 數(shù)據(jù)的開(kāi)始和結(jié)束位置,磁盤(pán)文件合并過(guò)程減少了文件數(shù)量.
在bypass 模式下,為每個(gè)下游Task 創(chuàng)建一個(gè)臨時(shí)磁盤(pán)文件,并根據(jù)Key 的HashCode 寫(xiě)入相應(yīng)的磁盤(pán)文件.寫(xiě)入磁盤(pán)文件時(shí)先寫(xiě)入內(nèi)存緩沖區(qū),然后在緩沖區(qū)滿后溢出到磁盤(pán)文件.它還將所有臨時(shí)磁盤(pán)文件合并到一個(gè)文件中,并創(chuàng)建一個(gè)單獨(dú)的索引文件.此過(guò)程的磁盤(pán)寫(xiě)入機(jī)制與未優(yōu)化的哈希Shuffle 相同,但是Shuffle Reader 性能會(huì)更好.這種機(jī)制和普通的排序Shuffle 之間的區(qū)別是磁盤(pán)寫(xiě)入機(jī)制不同,不會(huì)被排序.啟用bypass 機(jī)制的最大優(yōu)點(diǎn)是在Shuffle Writer 過(guò)程中不需要執(zhí)行數(shù)據(jù)排序操作,節(jié)省了部分性能開(kāi)銷(xiāo).
圖2 Sort-based Shuffle 算法實(shí)現(xiàn)
發(fā)生數(shù)據(jù)傾斜的原因主要包括輸入數(shù)據(jù)源分布不均勻,以及計(jì)算過(guò)程中數(shù)據(jù)拉取時(shí)間不均勻.輸入數(shù)據(jù)分布不均勻的一般表現(xiàn)為原始數(shù)據(jù),或者中間臨時(shí)數(shù)據(jù)中Key 分布不均勻[6].例如Spark Streaming 通過(guò)Direct Stream 讀取Kafka 數(shù)據(jù).由于Kafka 的每個(gè)分區(qū)都對(duì)應(yīng)于Spark 的相應(yīng)任務(wù),Kafka 相關(guān)主題的分區(qū)之間的數(shù)據(jù)是否平衡,直接決定Spark 在處理數(shù)據(jù)時(shí)是否會(huì)產(chǎn)生數(shù)據(jù)偏斜.如果使用隨機(jī)分區(qū),概率上分區(qū)之間的數(shù)據(jù)是平衡的,不會(huì)生成數(shù)據(jù)傾斜.但很多業(yè)務(wù)場(chǎng)景要求將具備同一特征的數(shù)據(jù)順序消費(fèi),需要將具有相同特征的數(shù)據(jù)放于同一個(gè)分區(qū)中.一個(gè)典型的場(chǎng)景是相同的用戶相關(guān)的PV 信息放置在相同的分區(qū)中,很容易導(dǎo)致數(shù)據(jù)傾斜.
數(shù)據(jù)拉取時(shí)間不均勻一般是硬件計(jì)算能力不均勻,或者網(wǎng)絡(luò)傳輸能力不均勻造成的[7].比如PageRank 算子分為三個(gè)Stage 運(yùn)行,由于第二個(gè)Stage 產(chǎn)生了Shuffle 是最容易發(fā)生數(shù)據(jù)傾斜,每個(gè)Task 處理分區(qū)數(shù)據(jù)綁定了各個(gè)頂點(diǎn)權(quán)重,然后收集其鄰接節(jié)點(diǎn)的權(quán)重.由于Executor 需要從非本地節(jié)點(diǎn)上拉取上一個(gè)Stage 中得到的節(jié)點(diǎn)信息.如果數(shù)據(jù)分布不均勻,某些節(jié)點(diǎn)會(huì)比其他節(jié)點(diǎn)承受更大的網(wǎng)絡(luò)流量和計(jì)算壓力.數(shù)據(jù)傾斜的計(jì)算時(shí)間主要花費(fèi)在Shuffle 上,提高Shuffle 性能有利于提高應(yīng)用程序的整體性能.
通過(guò)Spark 界面觀察每個(gè)階段任務(wù)當(dāng)前分配的數(shù)據(jù)量,進(jìn)一步確定數(shù)據(jù)的不均勻分布是否導(dǎo)致了數(shù)據(jù)傾斜[8].只要在代碼中看到Shuffle 類(lèi)操作符,或在Spark SQL 語(yǔ)句中看到導(dǎo)致Shuffle 的語(yǔ)句,就可以確定劃分Stage 的邊界.Stage 1 的每個(gè)任務(wù)開(kāi)始運(yùn)行時(shí),將首先執(zhí)行Shuffler Reader 操作,從Stage 0 的每個(gè)任務(wù)中提取需要處理的Key.比如Stage 1 在執(zhí)行ReduceByKey 操作符之后計(jì)算出最終RDD,然后執(zhí)行收集算子將所有數(shù)據(jù)拉到Driver 中.
數(shù)據(jù)傾斜往往發(fā)生在Shuffle 過(guò)程中,可能會(huì)觸發(fā)Shuffle 操作的算子包括GroupByKey、ReduceByKey和AggregateByKey 等.在執(zhí)行Shuffle 時(shí)必須將每個(gè)節(jié)點(diǎn)上的相同Key 拖動(dòng)到同一個(gè)節(jié)點(diǎn)上的Task 進(jìn)行處理.如果某個(gè)Key 對(duì)應(yīng)的數(shù)據(jù)量特別大就會(huì)發(fā)生數(shù)據(jù)傾斜,Job 運(yùn)行得非常緩慢,甚至可能因?yàn)槟硞€(gè)Task 處理的數(shù)據(jù)量過(guò)大導(dǎo)致內(nèi)存溢出.
通過(guò)廣播機(jī)制將只讀變量從一個(gè)節(jié)點(diǎn)發(fā)送到其他Executor 節(jié)點(diǎn),進(jìn)程內(nèi)運(yùn)行的任務(wù)屬于同一個(gè)應(yīng)用程序,在每個(gè)執(zhí)行器節(jié)點(diǎn)上放置廣播變量可以由該節(jié)點(diǎn)的所有任務(wù)共享[9].Torrent Broadcast 算法的基本思想是將廣播變量劃分為多個(gè)數(shù)據(jù)塊.當(dāng)某個(gè)執(zhí)行器獲得數(shù)據(jù)塊時(shí),當(dāng)前執(zhí)行器被視為數(shù)據(jù)服務(wù)器節(jié)點(diǎn).隨著越來(lái)越多的執(zhí)行器獲得數(shù)據(jù)塊,更多的數(shù)據(jù)服務(wù)器節(jié)點(diǎn)可用.廣播變量可以快速傳播到所有節(jié)點(diǎn).Torrent Broadcast 讀取數(shù)據(jù)的方式與讀取緩存類(lèi)似,使用Block Manager 自帶的NIO 通信方式傳遞數(shù)據(jù),存在的問(wèn)題是慢啟動(dòng)和占內(nèi)存[10].慢啟動(dòng)指的是剛開(kāi)始數(shù)據(jù)只在Driver 節(jié)點(diǎn)上,要等執(zhí)行器獲取多輪數(shù)據(jù)塊后,數(shù)據(jù)服務(wù)節(jié)點(diǎn)才會(huì)變得可觀,后面的獲取速度才會(huì)變快.執(zhí)行器在獲取完所有數(shù)據(jù)分塊后進(jìn)行反序列化時(shí),需要將近兩倍的內(nèi)存消耗.
Driver 先把廣播變量序列化為字節(jié)數(shù)組,然后切割成BLOCK_SIZE 大小的數(shù)據(jù)塊.在數(shù)據(jù)分區(qū)和切割之后,數(shù)據(jù)分區(qū)元信息作為全局變量被存儲(chǔ)在Driver節(jié)點(diǎn)的Block Manager 中.之后每個(gè)數(shù)據(jù)分塊都做相同的操作,Block Manager Master 可以被Driver 和所有Executor 訪問(wèn)到.執(zhí)行器反序列化Task 時(shí),先詢問(wèn)所在的Block Manager 是會(huì)否包含廣播變量,若存在就直接從本地Block Manager 讀取數(shù)據(jù).否則連接到Driver 節(jié)點(diǎn)的Block Manager Master 獲取數(shù)據(jù)塊的元信息.
廣播機(jī)制把只讀變量通過(guò)共享的方式有效的提高了集群的性能.大多數(shù)Spark 作業(yè)的性能主要消耗在Shuffle 環(huán)節(jié),該環(huán)節(jié)包含了大量的磁盤(pán)IO、序列化、網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)炔僮?通過(guò)廣播機(jī)制避免Shuffle,可顯著提高應(yīng)用運(yùn)行速度.例如普通Join 操作會(huì)產(chǎn)生Shuffle,RDD 中相同的Key 需要通過(guò)網(wǎng)絡(luò)拉取到同一個(gè)節(jié)點(diǎn)上.在算子函數(shù)使用廣播變量時(shí),首先會(huì)判斷當(dāng)前Task 所執(zhí)行器內(nèi)存中是否有變量副本.如果有則直接使用,如果沒(méi)有則從Driver 或者其他Executor 節(jié)點(diǎn)上遠(yuǎn)程拉取一份放到本地執(zhí)行器內(nèi)存中.廣播變量保證了每個(gè)執(zhí)行器內(nèi)存中只駐留一份變量副本,Executor中的Task 執(zhí)行時(shí)共享該變量副本,減少變量副本的數(shù)量和網(wǎng)絡(luò)傳輸?shù)男阅荛_(kāi)銷(xiāo),降低了執(zhí)行器內(nèi)存的開(kāi)銷(xiāo),降低GC 的頻率,會(huì)極大地提升集群性能.
如圖3所示,Broadcast Join 算子Scala 代碼實(shí)現(xiàn).當(dāng)在RDD 上使用Join 類(lèi)操作或者在Spark SQL 中使用聯(lián)接語(yǔ)句時(shí),普通聯(lián)接運(yùn)算符會(huì)產(chǎn)生Shuffle 過(guò)程,并將相同的Key 數(shù)據(jù)拉入Shuffle Reader Task 進(jìn)行聯(lián)接操作.如果連接操作中RDD 或表的數(shù)據(jù)量相對(duì)較小,則不使用連接運(yùn)算符而是使用廣播變量和映射類(lèi)運(yùn)算符來(lái)實(shí)現(xiàn)Join 操作,從而完全避免了Shuffle 過(guò)程和數(shù)據(jù)歪斜.較小的RDD 中的數(shù)據(jù)通過(guò)收集操作直接拉入Driver 節(jié)點(diǎn)的內(nèi)存中,創(chuàng)建廣播變量.然后從廣播變量中獲取較小RDD 的數(shù)據(jù),并在另一個(gè)RDD 上執(zhí)行映射類(lèi)運(yùn)算符.根據(jù)連接鍵比較當(dāng)前RDD 的每個(gè)數(shù)據(jù).如果連接鍵相同則將兩個(gè)RDD 的數(shù)據(jù)連接在一起,實(shí)現(xiàn)Join 操作的效果.該方法不足在于Driver 和Executor 節(jié)點(diǎn)都要存儲(chǔ)廣播變量的全部數(shù)據(jù),比較消耗內(nèi)存.
圖3 Broadcast Join 算子Scala 代碼實(shí)現(xiàn)
本文使用的實(shí)驗(yàn)環(huán)境是一個(gè)由四個(gè)節(jié)點(diǎn)組成的服務(wù)器集群.集群采用主-從體系結(jié)構(gòu),其中一個(gè)是主節(jié)點(diǎn),其他三個(gè)是從節(jié)點(diǎn).Spark Job 和Hadoop 文件系統(tǒng)部署在同一節(jié)點(diǎn)上.大部分的Spark 作業(yè)會(huì)從外部存儲(chǔ)系統(tǒng)讀取輸入數(shù)據(jù),比如Hadoop 文件系統(tǒng),將其與存儲(chǔ)系統(tǒng)放得越近越好.在相同的節(jié)點(diǎn)上安裝Spark Standalone 模式集群,并單獨(dú)配置Spark 和Hadoop 的內(nèi)存和CPU 使用以避免干擾[11].實(shí)驗(yàn)服務(wù)器集群環(huán)境的硬件配置根據(jù)數(shù)據(jù)量調(diào)整,參考官方建議初始配置如下.
單個(gè)節(jié)點(diǎn)服務(wù)器最初配置四個(gè)磁盤(pán).Spark 很多計(jì)算都在內(nèi)存中進(jìn)行,但當(dāng)數(shù)據(jù)在內(nèi)存中裝不下的時(shí)候,仍然使用本地磁盤(pán)來(lái)存儲(chǔ)數(shù)據(jù),以及在不同階段之間保留中間的輸出.在實(shí)驗(yàn)中每個(gè)計(jì)算節(jié)點(diǎn)有4-8 個(gè)磁盤(pán).集群網(wǎng)絡(luò)最初配置為萬(wàn)兆網(wǎng)卡.當(dāng)數(shù)據(jù)在內(nèi)存中時(shí),許多Spark 應(yīng)用程序都與網(wǎng)絡(luò)密切相關(guān).使用萬(wàn)兆或更高的網(wǎng)絡(luò)是使這些應(yīng)用程序更快的比較好的方法.分布式Reduce 應(yīng)用程序尤其適用,比如group-by、reduce-by 等操作.
單個(gè)節(jié)點(diǎn)服務(wù)器的初始內(nèi)存配置為16GB.Spark應(yīng)用分配75%的內(nèi)存,剩下的部分留給操作系統(tǒng)和緩沖區(qū)緩存.使用Spark 監(jiān)視UI 的Storage 選項(xiàng)卡查看內(nèi)存使用情況,內(nèi)存使用情況受存儲(chǔ)等級(jí)和序列化格式的影響很大[12].單個(gè)計(jì)算節(jié)點(diǎn)的CPU 核心數(shù)量最初配置8 個(gè)Core.Spark 在每臺(tái)機(jī)器上可以擴(kuò)展到數(shù)幾十個(gè)Core.測(cè)試階段在每臺(tái)機(jī)器上提供8-16 個(gè)內(nèi)核,根據(jù)服務(wù)器負(fù)載的消耗可以配置更多.
如圖4所示,集群運(yùn)行時(shí)軟件版本信息,圖5是集群系統(tǒng)配置信息,都是當(dāng)前業(yè)界使用的穩(wěn)定版本和配置.實(shí)驗(yàn)數(shù)據(jù)來(lái)自于某金融機(jī)構(gòu)的客戶消費(fèi)貸款逾期信息,包括逾期客戶還款信息,以及催收行動(dòng)信息.選取逾期客戶還款信息數(shù)據(jù)量相對(duì)較少,10 萬(wàn)條記錄,數(shù)據(jù)結(jié)構(gòu)中包括記錄ID、還款時(shí)間、還款金額、銀行編碼、還款卡號(hào)、客戶姓名、客戶ID 等信息.逾期客戶機(jī)構(gòu)催收行動(dòng)數(shù)據(jù)量相對(duì)較大,1000 萬(wàn)條記錄,數(shù)據(jù)結(jié)構(gòu)包括記錄ID、催收行動(dòng)碼、行動(dòng)描述、客戶電話、客戶關(guān)系、通話時(shí)長(zhǎng)、客戶ID、客戶姓名、行動(dòng)時(shí)間、催收員、催收機(jī)構(gòu)等信息.根據(jù)客戶ID 進(jìn)行Join 操作,計(jì)算指標(biāo)包括機(jī)構(gòu)催收員工作量和催收員工作業(yè)績(jī).
圖4 集群軟件版本信息
如圖6所示,Broadcast Join 算子DAG 視圖中不存在Shuffle,普通Join 算子DAG 視圖的復(fù)雜度明顯高于Broadcast Join 算子.圖6(a)是普通Join 操作的DAG 視圖,根據(jù)RDD 的寬依賴關(guān)系分為三個(gè)階段,有向無(wú)環(huán)圖描述了階段之間的依賴關(guān)系,當(dāng)前Stage 只能在父Stage 之后執(zhí)行.從DAG 視圖清晰的看到普通Join 算子存在Shuffle 過(guò)程.圖6(b)是Broadcast Join 算子有向無(wú)環(huán)圖只有一個(gè)階段,邏輯過(guò)程相對(duì)簡(jiǎn)單.
圖5 集群系統(tǒng)配置信息
圖6 Join 算子DAG 視圖
如圖7所示,Broadcast Join 算子中Task 統(tǒng)計(jì)數(shù)據(jù)表明性能上存在明顯的優(yōu)勢(shì).圖7(a)是普通Join 算子某階段Task 性能統(tǒng)計(jì)數(shù)據(jù),包括Task 持續(xù)時(shí)間、GC 執(zhí)行時(shí)間和Shuffle 數(shù)據(jù)量等3 個(gè)方面的統(tǒng)計(jì)信息.圖7(b)是Broadcast Join 算子Task 性能統(tǒng)計(jì)數(shù)據(jù),相比于普通Join 算子的Task 性能統(tǒng)計(jì)信息,在3 個(gè)方面都存在明顯的優(yōu)勢(shì).
如圖8所示,Broadcast Joins 算子各個(gè)Stage 的磁盤(pán)讀寫(xiě)和網(wǎng)絡(luò)流量、任務(wù)持續(xù)時(shí)長(zhǎng)等都存在明顯的優(yōu)勢(shì).圖8(a)是普通Join 算子各個(gè)階段執(zhí)行情況,共有3 個(gè)階段,整個(gè)過(guò)程耗時(shí)6-7 秒,Shuffle 并行度為9,涉及Shuffle Reader 數(shù)據(jù)量61.3MB,Shuffle Writer 數(shù)據(jù)量61.3 MB.如圖8(b)所示,Broadcast Join 算子只有1 個(gè)階段,沒(méi)有涉及Shuffle 數(shù)據(jù)讀寫(xiě)過(guò)程,數(shù)據(jù)輸入117.4 MB,Task 總數(shù)量明顯小于普通Join 算子的數(shù)量,執(zhí)行時(shí)間是3 秒,和普通Join 算子相比在性能上有較大的提升.
圖7 Join 操作Task 性能數(shù)據(jù)
圖8 Join 算子各個(gè)Stage 執(zhí)行情況
如圖9所示,Broadcast Joins 算子不存在數(shù)據(jù)傾斜問(wèn)題.圖9(a)是普通Join 操作任務(wù)數(shù)據(jù)分配和執(zhí)的詳細(xì)數(shù)據(jù).從列Shuffle Read Size 可以看出,任務(wù)分配出現(xiàn)了數(shù)據(jù)傾斜問(wèn)題,被分配數(shù)據(jù)量較大的Task 執(zhí)行時(shí)間明顯高于其他任務(wù)的持續(xù)時(shí)間,消耗更大的資源和網(wǎng)絡(luò)流量,其他已經(jīng)完成計(jì)算的節(jié)點(diǎn)處于等待狀態(tài).圖9(b)是Broadcast join 算子任務(wù)數(shù)據(jù)分配和執(zhí)行情況明細(xì),從持續(xù)時(shí)間列和Input Size 列看出,數(shù)據(jù)幾乎是均勻分配,8 個(gè)任務(wù)的持續(xù)時(shí)間是1 秒,1 個(gè)是0.9 秒,充分發(fā)揮了數(shù)據(jù)本地性特性,每個(gè)節(jié)點(diǎn)的計(jì)算資源都被有效利用.
如圖10所示,Broadcast Join 算子在高并發(fā)的應(yīng)用情況下性能上存在穩(wěn)定的提升.如圖10(a)所示,普通Join 算子壓測(cè)統(tǒng)計(jì),通過(guò)壓測(cè)工具10 萬(wàn)次的統(tǒng)計(jì)結(jié)果,統(tǒng)計(jì)了平均持續(xù)時(shí)間、中位數(shù)和偏差情況.從圖中看出,持續(xù)時(shí)間絕大部分相對(duì)集中和穩(wěn)定在7 秒左右.如圖10(b)所示,Broadcast join 算子壓測(cè)情況統(tǒng)計(jì),比較相同的統(tǒng)計(jì)指標(biāo)存在明顯的優(yōu)勢(shì),持續(xù)時(shí)間基本集中和穩(wěn)定在4 秒左右.
本文研究了Spark Shuffle 設(shè)計(jì)和算法實(shí)現(xiàn),分析了哈希和排序兩類(lèi)Shuffle 機(jī)制的實(shí)現(xiàn)過(guò)程,深入分析在Shuffle 過(guò)程發(fā)生數(shù)據(jù)傾斜的本質(zhì)原因.進(jìn)一步分析了Spark 流計(jì)算集群中,發(fā)生數(shù)據(jù)傾斜常見(jiàn)業(yè)務(wù)場(chǎng)景,分析數(shù)據(jù)傾斜問(wèn)題的原因和發(fā)生過(guò)程,提供了問(wèn)題定位的方法和步驟.提出了廣播機(jī)制避免某些場(chǎng)景下的數(shù)據(jù)傾斜問(wèn)題,給出廣播變量分發(fā)機(jī)制和算法實(shí)現(xiàn).通過(guò)Broadcast 實(shí)現(xiàn)Join 算子的實(shí)驗(yàn),相對(duì)于直接操作Join 算子,通過(guò)DAG 視圖、任務(wù)持續(xù)時(shí)間、Shuffle 讀寫(xiě)數(shù)據(jù)量等指標(biāo)的比較和分析,驗(yàn)證了廣播機(jī)制在性能上有較大提升,壓力測(cè)試進(jìn)一步驗(yàn)證了在大規(guī)模應(yīng)用的情況下性能有穩(wěn)定的改善.
圖9 Join 算子數(shù)據(jù)分配和執(zhí)行
圖10 Join 算子壓測(cè)數(shù)據(jù)統(tǒng)計(jì)