侯偉凡,樊 瑋,張宇翔
(中國民航大學(xué) 計(jì)算機(jī)科學(xué)與技術(shù)學(xué)院,天津 300300)
改進(jìn)的Spark Shuffle內(nèi)存分配算法
侯偉凡*,樊 瑋,張宇翔
(中國民航大學(xué) 計(jì)算機(jī)科學(xué)與技術(shù)學(xué)院,天津 300300)
Shuffle性能是影響大數(shù)據(jù)集群性能的重要指標(biāo),Spark自身的Shuffle內(nèi)存分配算法試圖為內(nèi)存池中的每一個(gè)Task平均分配內(nèi)存,但是在實(shí)驗(yàn)中發(fā)現(xiàn),由于各Task對(duì)于內(nèi)存需求的不均衡導(dǎo)致了內(nèi)存的浪費(fèi)和運(yùn)行效率較低的問題。針對(duì)上述問題,提出一種改進(jìn)的Spark Shuffle內(nèi)存分配算法。該算法根據(jù)Task的內(nèi)存申請(qǐng)量和歷史運(yùn)行數(shù)據(jù)將Task按內(nèi)存需求分為大小兩類,對(duì)小內(nèi)存需求型Task作“分割化”處理,對(duì)大內(nèi)存需求型Task基于Task溢出次數(shù)和溢出后等待時(shí)間分配內(nèi)存。該算法充分利用內(nèi)存池的空閑內(nèi)存,可以在數(shù)據(jù)傾斜導(dǎo)致的Task內(nèi)存需求不均衡的情況下進(jìn)行Task內(nèi)存分配的自適應(yīng)調(diào)節(jié)。實(shí)驗(yàn)結(jié)果表明,改進(jìn)后算法較原算法降低了Task的溢出率,減少了Task的周轉(zhuǎn)時(shí)間,提高了集群的運(yùn)行性能。
Apache Spark;Shuffle;自適應(yīng);內(nèi)存分配;運(yùn)行性能
近年來,大數(shù)據(jù)已然成為科技領(lǐng)域和企業(yè)領(lǐng)域關(guān)注的熱點(diǎn),其中蘊(yùn)含的巨大價(jià)值成為存儲(chǔ)和處理大數(shù)據(jù)的巨大動(dòng)力[1]。在大數(shù)據(jù)發(fā)展的過程中,Hadoop[2]和Spark[3]計(jì)算引擎逐漸被大多數(shù)企業(yè)和機(jī)構(gòu)認(rèn)可,其中Spark因其基于內(nèi)存的計(jì)算框架,運(yùn)算速度更快,被愈發(fā)重視。
Shuffle性能問題是很多分布式系統(tǒng)所共有的問題,究其原因是Shuffle過程會(huì)對(duì)集群的CPU、內(nèi)存、磁盤和網(wǎng)絡(luò)造成巨大負(fù)擔(dān),其中的任何一個(gè)因素都有可能成為集群運(yùn)算的瓶頸,因此改進(jìn)集群的Shuffle性能成為很多研究人員研究的目標(biāo)。目前的研究主要分為底層優(yōu)化、Map和Reduce端優(yōu)化和對(duì)MapReduce模型的改進(jìn)。
1)底層優(yōu)化:Zaharia等[4]提出一種延遲調(diào)度的算法,該算法能有效提升數(shù)據(jù)的本地化程度,從而提高集群的工作效率。
2)Map和Reduce端的優(yōu)化:Guo等[5]提出一種獨(dú)立工作的Shuffle服務(wù)——iShuffle。它可以估計(jì)Map階段輸出partition的大小,并將Map輸出的partition自動(dòng)均衡放置到各個(gè)Reduce節(jié)點(diǎn)中;還可以減少任務(wù)的排定,使得reduce tasks的調(diào)度更加靈活。熊倩等[6]提出通過對(duì)Map節(jié)點(diǎn)上同一個(gè)作業(yè)的多個(gè)Map任務(wù)所產(chǎn)生的大量臨時(shí)結(jié)果數(shù)據(jù)作合并的機(jī)制,減少了Map節(jié)點(diǎn)的輸出結(jié)果數(shù)據(jù)量,從而減少了整個(gè)MapReduce作業(yè)執(zhí)行時(shí)間,進(jìn)而提高了性能。彭輔權(quán)等[7]分別從Map端數(shù)據(jù)壓縮、重構(gòu)遠(yuǎn)程數(shù)據(jù)拷貝傳輸協(xié)議等方面優(yōu)化和重構(gòu)Shuffle過程,使Shuffle階段的運(yùn)行效率得到提高。Davidson等[8]針對(duì)Spark平臺(tái)經(jīng)常會(huì)遇到的Shuffle性能瓶頸提出了幾種應(yīng)對(duì)的替代方案,以減輕處理這些瓶頸相關(guān)的系統(tǒng)開銷。
3)MapReduce模型的改進(jìn):李玉林等[9]提出一種改進(jìn)型的MapReduce模型——MBR(Map-Balance-Reduce)模型。通過增加Balance任務(wù),對(duì)Map任務(wù)處理完成的中間數(shù)據(jù)進(jìn)行均衡操作,從而保證Reduce任務(wù)的完成時(shí)間基本一致。
目前有很多針對(duì)Hadoop Shuffle階段的改進(jìn),Spark作為基于內(nèi)存的運(yùn)算框架,在其內(nèi)部實(shí)現(xiàn)上與Hadoop有很多不同。隨著Spark版本的不斷演變,對(duì)于Spark Shuffle數(shù)據(jù)持久化的問題已經(jīng)得到了很多的優(yōu)化,但是并沒有從Task內(nèi)存分配的角度做工作。因此本文通過研究源碼,從Shuffle過程中內(nèi)存分配的角度,針對(duì)Task的內(nèi)存分配進(jìn)行改進(jìn),使Shuffle效率得到提高。
Spark內(nèi)存分配如圖1所示。Spark默認(rèn)的JVM(Java Virtual Machine)堆大小為512 MB,可以通過spark.executor.memory參數(shù)進(jìn)行調(diào)整。這里的內(nèi)存基本上是由Executor內(nèi)部的所有任務(wù)所共享。為了避免內(nèi)存溢出,因此只使用90%,可通過spark.storage.safetyFraction調(diào)整。Spark將要處理的數(shù)據(jù)存儲(chǔ)在Storage部分,這個(gè)部分占Safe的60%,可以通過spark.storage.memoryFraction控制。Shuffle可用的內(nèi)存大小占Safe的20%,由spark.shuffle.memoryFraction控制。由于Shuffle數(shù)據(jù)的大小是估算出來的,一是為了降低開銷,二是為了降低估算誤差,因此使用spark.shuffle.safetyFraction作為一個(gè)保險(xiǎn)系數(shù),降低實(shí)際Shuffle使用的內(nèi)存閾值,起到一定的緩沖作用,降低實(shí)際內(nèi)存占用超過用戶配置值的概率。默認(rèn)情況下spark.shuffle.safetyFraction的值為0.8,由此可以得出,真正用于Shuffle的內(nèi)存大小為spark.executor.memory * spark.shuffle.safetyFraction * spark.shuffle.memoryFraction。Unroll memory用作數(shù)據(jù)的序列化和反序列化,由spark.storage.unrollFraction控制,默認(rèn)為0.2[10]。本文在此內(nèi)存分配框架下,研究圖1中用于Shuffle操作的部分。
圖1 Spark的內(nèi)存分配Fig. 1 Memory allocation of Spark
Spark Shuffle線程內(nèi)存管理器ShuffleMemoryManager負(fù)責(zé)管理Shuffle線程占有內(nèi)存的分配與釋放。Task通過ExternalAppendOnlyMap和ExternalSorter向ShuffleMemoryManager申請(qǐng)內(nèi)存,在運(yùn)行結(jié)束后由ShuffleMemoryManager回收。為了使每一個(gè)Task都能比較公平地獲取內(nèi)存,它采取的內(nèi)存分配策略是對(duì)于正在運(yùn)行的N個(gè)Task,每個(gè)Task至少可以申請(qǐng)總內(nèi)存的1/(2N),至多申請(qǐng)1/N。在運(yùn)行過程中,N會(huì)隨著內(nèi)存池中正在運(yùn)行Task的數(shù)目不斷變化。
現(xiàn)有Spark Shuffle內(nèi)存分配算法采取對(duì)內(nèi)存池中Task公平分配資源的策略,一定程度上保證了每個(gè)Task在被調(diào)度運(yùn)行時(shí)都能較為公平地獲取資源并運(yùn)行。但是它并未考慮每個(gè)Task所需的資源量,僅從Task的數(shù)量上考慮公平性,這將導(dǎo)致內(nèi)存的使用率較低,并且內(nèi)存需求較多的Task可能會(huì)頻繁溢出,甚至出現(xiàn)OutOfMemory錯(cuò)誤。
并且executor中能同時(shí)運(yùn)行的Task總數(shù)由CPU core數(shù)決定,當(dāng)內(nèi)存需求較大的Task被調(diào)度時(shí),其申請(qǐng)的內(nèi)存會(huì)因?yàn)閮?nèi)存不足而出現(xiàn)溢出,需要等待其他Task運(yùn)行完畢空出內(nèi)存后才能繼續(xù)運(yùn)行,這將大大影響整個(gè)作業(yè)的運(yùn)行效率。
此外,由于Spark Shuffle的Fetch過程是一邊Fetch數(shù)據(jù)一邊處理的,所以當(dāng)一個(gè)分區(qū)的全部數(shù)據(jù)在處理結(jié)束前是無法統(tǒng)計(jì)其數(shù)據(jù)大小的。
基于以上問題,首先將資源需求較少的Task作“分割化”處理,即將內(nèi)存資源需求較小的Task分成若干塊,這樣可使某一時(shí)刻內(nèi)存需求較少的Task所占有內(nèi)存量變少,因此內(nèi)存需求較大的Task被調(diào)度運(yùn)行時(shí)可以獲得更多內(nèi)存避免溢出。這樣做的代價(jià)是內(nèi)存需求較少Task的內(nèi)存申請(qǐng)次數(shù)會(huì)增加并有可能出現(xiàn)溢出,但其溢出量很少,處理速度很快,并且不會(huì)出現(xiàn)頻繁溢出的情況,相比于大規(guī)模的磁盤IO操作,仍然提升了Application的整體效率,尤其在數(shù)據(jù)集有數(shù)據(jù)傾斜的時(shí)候。然后本文認(rèn)為一個(gè)Task溢出次數(shù)越多且溢出后等待時(shí)間越長(zhǎng),那么它所需計(jì)算的原始數(shù)據(jù)量也就越大。基于此,本文提出一種小內(nèi)存需求型Task作“分割化”處理,大內(nèi)存需求型Task基于Task溢出次數(shù)和Task溢出后等待時(shí)間的Shuffle內(nèi)存分配算法來改進(jìn)源碼的公平分配算法。
由于要將內(nèi)存需求較小的Task作“分割化”處理,所以首先在Task申請(qǐng)內(nèi)存時(shí)需要一個(gè)指標(biāo)來區(qū)分Task是大內(nèi)存需求型Task還是小內(nèi)存需求型Task。定義該指標(biāo)為Memavg,如式(1)~(2)所示。其中:L為所有運(yùn)行結(jié)束時(shí)未發(fā)生溢出Task的集合;K為已被調(diào)入過內(nèi)存池的Task的集合。Memavg代表的是所有運(yùn)行結(jié)束時(shí)未發(fā)生溢出Task所占用內(nèi)存的平均值,當(dāng)一個(gè)新被調(diào)用Task的內(nèi)存請(qǐng)求比Memavg大時(shí),認(rèn)為該Task為大內(nèi)存需求型的Task;反之,認(rèn)為該Task為小內(nèi)存需求型的Task。
(1)
(2)
2.1.1 為小內(nèi)存需求型Task分配內(nèi)存
式(3)表明了小內(nèi)存需求型Task實(shí)際申請(qǐng)到的內(nèi)存,算法直接給予其申請(qǐng)內(nèi)存的一半與空閑內(nèi)存的最小值。
(3)
2.1.2 為大內(nèi)存需求型Task分配內(nèi)存
在源碼使用的公平分配算法中,擁有每個(gè)Task可使用內(nèi)存的最小保證。對(duì)于大內(nèi)存需求型Task,也給予它最小的內(nèi)存保證Memlow,保證其執(zhí)行效率并避免不必要的溢出操作,如式(4)所示。其中maxMemory為可使用的最大內(nèi)存,numActiveTasks是指當(dāng)前executor中正在運(yùn)行的Task數(shù)目。
(4)
公平分配算法中,有一個(gè)對(duì)Task可獲取最大內(nèi)存的限制,值為maxMemory/numActiveTasks。它不考慮Task內(nèi)存需求的已有表現(xiàn),因而不能根據(jù)不同Task的數(shù)據(jù)特征對(duì)內(nèi)存分配作出合理分配。而且它還有一個(gè)很嚴(yán)重的問題,當(dāng)一個(gè)executor中的Task滿載運(yùn)行,即numActiveTask為其最大值時(shí),如果有一個(gè)Task的內(nèi)存需求量大于maxMemory/numActiveTasks,那么它將不得不溢出,并等待大多數(shù)Task運(yùn)行完畢,即numActiveTasks變小時(shí)才能被再度運(yùn)行,這將嚴(yán)重拖延整個(gè)Application的運(yùn)行效率?;诖吮疚母鶕?jù)Task的溢出歷史和溢出后等待時(shí)間的表現(xiàn)來計(jì)算出它應(yīng)得的內(nèi)存,進(jìn)而自適應(yīng)的調(diào)整內(nèi)存分配。式(5)為空閑內(nèi)存的計(jì)算方法,其中taskMemory管理著內(nèi)存池中所有未執(zhí)行完成Task的id和已占有的內(nèi)存。
(5)
其中M為taskMemory中包含的元素集合。對(duì)空閑內(nèi)存的分配權(quán)重weight如式(6)所示,一個(gè)Task能從空閑內(nèi)存中得到的份額取決于其溢出次數(shù)占未運(yùn)行完成Task溢出總數(shù)的比值,和溢出后等待調(diào)用的時(shí)間與未運(yùn)行完成Task溢出后等待調(diào)用時(shí)間的比值,其中這兩部分的權(quán)重由a1和a2確定(a1+a2=1)。由于溢出的次數(shù)是影響Task運(yùn)行效率的主要因素,而溢出后等待時(shí)間取決于內(nèi)存池的整體狀態(tài),加上多次實(shí)驗(yàn)的經(jīng)驗(yàn),在實(shí)驗(yàn)中設(shè)a1=0.7、a2=0.3。
(6)
由式(5)~(6)可以得到Task可獲得的最大內(nèi)存閾值Memhigh,即式(7)。它由兩部分組成,至少可以獲得內(nèi)存池分給所有活動(dòng)狀態(tài)下Task的均值,此外它可額外獲得一部分空閑內(nèi)存,提高內(nèi)存的利用率,這部分的大小是通過溢出歷史和溢出后等待時(shí)間的歷史一起決定的,這使得溢出次數(shù)較多,等待時(shí)間較長(zhǎng)的Task有機(jī)會(huì)獲得更大內(nèi)存,以利于加快其執(zhí)行進(jìn)度。
(7)
此外,在公平分配算法中,若Task溢出到磁盤,那么它所占有的全部?jī)?nèi)存將被回收,若該Task是大內(nèi)存需求型Task,那么它下次被調(diào)度時(shí)很難獲得比溢出前更大的內(nèi)存資源,因此在本文中設(shè)定一個(gè)Memrelease,見式(8),它使得Task在溢出時(shí)不釋放所占有的全部?jī)?nèi)存,而是根據(jù)其溢出歷史釋放一部分內(nèi)存,這有利于該Task下次被調(diào)用時(shí)避免再次溢出,其釋放的內(nèi)存也有利于內(nèi)存池中其他Task的運(yùn)行。
taskMemory(j)
(8)
式(9)代表當(dāng)前Task所能獲得的最大內(nèi)存:
taskMemory(j)))
(9)
式(10)~(11)表明了大內(nèi)存需求型Task實(shí)際申請(qǐng)到的內(nèi)存。首先需要判斷該Task已占有的內(nèi)存是否已滿足Memlow,若已滿足則為Task分配min(maxToGrant,Memfree)的內(nèi)存,如式(10);若不滿足,則需要判斷空閑內(nèi)存能否滿足最小內(nèi)存保證Memlow,若不能滿足則需要等待內(nèi)存池中有足夠內(nèi)存時(shí)再調(diào)度運(yùn)行,若能滿足則為Task分配min(maxToGrant,Memfree)的內(nèi)存,如式(11)。
toGrantj=min(maxToGrantj,Memfree)
(10)
(11)
本文提出的算法中,使用HashMap和HashSet存放Task的各項(xiàng)歷史數(shù)據(jù):taskMemory存放未運(yùn)行完成Task的唯一編號(hào)和已占用的內(nèi)存容量;taskNotSpill記錄未發(fā)生溢出Task的編號(hào);waitingTime存放已溢出Task的編號(hào)及等待調(diào)用所耗費(fèi)的時(shí)間;spillCount記錄已溢出Task的編號(hào)及溢出次數(shù)。
如圖2為算法的運(yùn)行流程。當(dāng)Task需要內(nèi)存資源來保證其運(yùn)行時(shí),Task就會(huì)向ShuffleMemoryManager發(fā)出申請(qǐng)。ShuffleMemoryManager在接收到請(qǐng)求后首先查看TaskMemory是否有該Task的編號(hào),若沒有,則表示該Task為尚未運(yùn)行過,那么接下來將為它初始化各個(gè)變量。然后比較其Memapply和Memavg的大小來判斷該Task是大內(nèi)存需求型還是小內(nèi)存需求型,若是小內(nèi)存需求型,則直接賦予它min(Memapply/2,Memfree)的內(nèi)存,若是大內(nèi)存需求型,則首先需要判斷它目前已占用的內(nèi)存是否大于內(nèi)存池大小的1/(2N),若滿足則直接賦予它min(maxToGrant,Memfree)的內(nèi)存,若不能滿足則繼續(xù)判斷當(dāng)前Memfree與min(maxToGrant,maxMemory/(2*numActiveTasks)-Memused)的大小,若能滿足則賦予它min(maxToGrant,Memfree)的內(nèi)存,若不能滿足,由于其最小內(nèi)存保證,需要等待其他Task運(yùn)行完成空出內(nèi)存后繼續(xù)申請(qǐng)。
改進(jìn)的Spark Shuffle內(nèi)存分配算法的偽代碼如下。
輸入 Task申請(qǐng)的內(nèi)存值numBytes;
輸出 算法實(shí)際為Task分配的內(nèi)存值toGrant。
1)
if !taskMemory.contains(id) then
2)
初始化taskMemory(id),spillCount(id),waitingTime(id)為0,taskNotSpill.add(id)
3)
notifyAll
4)
end if
5)
while True do
6)
按式(1)、(5)~(7)更新numActiveTasks,memAvg,freeMemory,weight和memHigh的值
7)
ifnumBytes 8) taskMemory(id)←taskMemory(id)+min(numBytes/2,freeMemory) 9) return min(numByte/2,freeMemory) 10) end if /*7)~10)為小內(nèi)存需求型Task分配內(nèi)存*/ 11) maxToGrant←min(numBytes,max(0,memHigh- curMem)) /*curMem為Task當(dāng)前已占用的內(nèi)存*/ 12) ifcurMem 13) iffreeMemory≥min(maxToGrant,memLow-curMem) then 14) toGrant←min(maxToGrant,freeMemory) 15) taskMemory(id)←taskMemory(id)+toGrant 16) returntoGrant 17) else wait 18) end if 19) elsetoGrant←min(maxToGrant,freeMemory) 20) taskMemory(id) ←taskMemory(id)+toGrant 21) returntoGrant 22) end if /*11)~22)為大內(nèi)存需求型Task分配內(nèi)存*/ 23) end 算法的時(shí)間復(fù)雜度為O(n),其中n為一個(gè)Executor中所要運(yùn)行的Task數(shù)。 圖2 改進(jìn)算法流程Fig. 2 Flow chart of the improved algorithm Spark的編譯包括make-distribution.sh編譯、SBT編譯和Maven編譯三種方式,本文選擇make-distribution.sh的編譯方式。Spark源碼的編譯過程較為繁瑣,需要很多依賴才能完成,但是實(shí)踐后會(huì)對(duì)Spark的整體系統(tǒng)架構(gòu)有更深入理解。 為了驗(yàn)證本文提出的基于溢出次數(shù)和Spill Task等待時(shí)間改進(jìn)算法的正確性,本研究使用四臺(tái)服務(wù)器搭建Spark集群,其中:表1顯示各節(jié)點(diǎn)在集群中的角色,表2顯示了各節(jié)點(diǎn)的性能,表3顯示了各軟件版本信息。 本文選取“groupByKey”操作作為驗(yàn)證算法性能的實(shí)驗(yàn)案例。實(shí)驗(yàn)程序?yàn)樽约壕帉懙腤ordcount程序,實(shí)驗(yàn)數(shù)據(jù)集為安然郵件數(shù)據(jù)集和亞馬遜評(píng)論數(shù)據(jù)集。根據(jù)實(shí)驗(yàn)需要,生成了均勻的數(shù)據(jù)“mail”和不均勻的數(shù)據(jù)“mailSkew”,如表4所示。實(shí)驗(yàn)中還使用Shuffle階段可利用的內(nèi)存量“ShuffleMem”(由公式spark.executor.memroy*spark.shuffle.safetyFraction*spark.shuffle.memoryFraction計(jì)算所得)來比較兩個(gè)算法在不同內(nèi)存環(huán)境下的性能。 表1 實(shí)驗(yàn)集群角色Tab.1 Roles in experiment cluster 表2 集群內(nèi)各節(jié)點(diǎn)配置Tab. 2 Configuration of each node in the cluster 表3 軟件版本信息Tab. 3 Software version information 表4 實(shí)驗(yàn)數(shù)據(jù)說明Tab. 4 Experimental data description 本文使用以下參數(shù)評(píng)估算法性能:totalTime為完成Application所需時(shí)間;Max為耗時(shí)最長(zhǎng)的Task所用時(shí)間;ShuffleSpill為Application中所有Task的溢出量。 從圖3可以看出,兩種算法在均勻數(shù)據(jù)集上完成Application所需時(shí)間、Shuffle Task運(yùn)行最長(zhǎng)時(shí)間、和溢出量上相差不大,改進(jìn)算法略優(yōu)于源碼的公平算法。這主要是由于在均勻數(shù)據(jù)集下,Task需要處理的數(shù)據(jù)量比較平均,對(duì)于單個(gè)Task而言內(nèi)存的需求量較小,Task可以在未溢出的情況下運(yùn)行完成。圖3(c)、(d)最開始兩算法都有較大規(guī)模的溢出,這是因?yàn)閷?duì)整體內(nèi)存的配置過低,每個(gè)Task可以分配的內(nèi)存過小,導(dǎo)致溢出。 totalTime是完成整個(gè)Application所需要的時(shí)間,是衡量算法性能最直觀的指標(biāo)。從圖4(a)可以看出:改進(jìn)算法在所設(shè)置的內(nèi)存量上全面優(yōu)于源碼的公平算法,可以減少運(yùn)行時(shí)間的10%~12.5%。Max是Shuffle階段運(yùn)行時(shí)間最長(zhǎng)的Task所消耗的時(shí)間,該Task一般為處理傾斜數(shù)據(jù)的Task,因此Max值的大小將直接影響整個(gè)Application的運(yùn)行時(shí)間。從圖4(b)可以看出:改進(jìn)算法的Max值要明顯小于源碼的公平算法,尤其是在可分配內(nèi)存較小的情況下。這主要是因?yàn)椋创a的公平算法規(guī)定每一個(gè)Task最多可以獲得內(nèi)存池中內(nèi)存的1/N,然而這部分內(nèi)存并不能滿足處理傾斜數(shù)據(jù)Task的需求,因此該Task會(huì)頻繁地溢出直到對(duì)內(nèi)存需求小的Task運(yùn)行完畢,空出足夠的內(nèi)存才能成功運(yùn)行,這其中頻繁的溢出和等待調(diào)度都相當(dāng)耗時(shí)。而改進(jìn)算法將對(duì)內(nèi)存需求較小的Task進(jìn)行 “分割化”處理,每次只從內(nèi)存池中獲取較少的內(nèi)存,因此當(dāng)內(nèi)存需求較大的Task向內(nèi)存池申請(qǐng)內(nèi)存時(shí),內(nèi)存池就可以分配足夠多的內(nèi)存,從而避免了一些溢出,減少了Task的整體運(yùn)行時(shí)間。 圖3 處理mail文件時(shí)不同參數(shù)的對(duì)比Fig. 3 Comparison of different parameters in handling mail files Shufflespill是Application中所有Task的總溢出量。由圖4(c)~(d)可以看出改進(jìn)算法的溢出量顯著少于源碼的公平算法。因?yàn)楣剿惴ú荒芗皶r(shí)滿足需求量較大的Task的內(nèi)存需求,因此會(huì)造成Task的頻繁溢出,導(dǎo)致ShuffleSpill的量很大。改進(jìn)算法通過Task的溢出次數(shù)和溢出后的等待時(shí)間準(zhǔn)確捕捉到需要內(nèi)存較多的Task,并在其下次調(diào)度時(shí)從空閑內(nèi)存中多分配一部分資源,使其可以盡快運(yùn)行完成,從而減少了很多溢出量。 本文使用了兩種數(shù)據(jù)集,分別是均勻的數(shù)據(jù)集和不均勻的數(shù)據(jù)集,其中不均勻的數(shù)據(jù)集中傾斜的數(shù)據(jù)約占19%。實(shí)驗(yàn)在不同數(shù)據(jù)集上分別用源碼公平分配算法和改進(jìn)算法來執(zhí)行WordCount程序,并使用totalTime、Max、ShuffleSpill三個(gè)指標(biāo)來衡量?jī)蓚€(gè)算法的性能。從3.3節(jié)和3.4節(jié)的實(shí)驗(yàn)來看,改進(jìn)算法在均勻數(shù)據(jù)集上的性能略好于公平分配算法,在不均勻的數(shù)據(jù)集上可以看出經(jīng)過算法的自適應(yīng)調(diào)節(jié),Task的溢出量較公平分配算法有大幅度的減少,使集群整體性能提升10%~12.5%。 綜上所述,改進(jìn)算法能夠在有數(shù)據(jù)傾斜時(shí)更加高效地利用內(nèi)存,減少對(duì)內(nèi)存資源需求較大Task的溢出量,從而提升Application的整體運(yùn)行效率。 圖4 處理mailSkew文件時(shí)不同參數(shù)的對(duì)比Fig. 4 Comparison of different parameters in handling mailSkew files 本文對(duì)Spark Shuffle內(nèi)存分配算法進(jìn)行了研究,提出一種小內(nèi)存需求型Task作“分割化”處理、大內(nèi)存需求型Task基于Task溢出次數(shù)和Task溢出后等待時(shí)間的Shuffle內(nèi)存分配算法來改進(jìn)源碼的公平分配算法,對(duì)整個(gè)內(nèi)存池的使用率和各Task的運(yùn)行效率作出改進(jìn),并對(duì)源碼進(jìn)行重新編譯和部署。實(shí)驗(yàn)結(jié)果表明,改進(jìn)算法能有效提升Spark在處理傾斜數(shù)據(jù)時(shí)的運(yùn)行效率。Shuffle環(huán)節(jié)是大多數(shù)Spark作業(yè)的瓶頸,因此如何提升Shuffle階段的性能也成為眾多研究人員思考的問題。現(xiàn)實(shí)情況下,影響Shuffle性能的原因還有很多,如:代碼開發(fā)、資源參數(shù)和數(shù)據(jù)傾斜等。在本文研究的基礎(chǔ)之上,下一步將考慮其他對(duì)Shuffle性能影響的因素,與實(shí)際問題相結(jié)合,使Spark的Shuffle性能得到更大提升。 References) [1] 程學(xué)旗,靳小龍,王元卓,等.大數(shù)據(jù)系統(tǒng)和分析技術(shù)綜述[J].軟件學(xué)報(bào),2014,25(9):1889-1908.(CHENG X Q, JIN X L, WANG Y Z, et al. Survey on big data system and analytic technology [J]. Journal of Software, 2014, 25(9): 1889-1908.)[2] Apache. Apache Hadoop [2017- 04- 20]. http://apache.hadoop.org. [3] ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing [C]// Proceedings of the 2012 9th USENIX Conference on Networked Systems Design and Implementation. Berkeley, CA: USENIX Association, 2012: 2-2. [4] ZAHARIA M, BORTHAKUR D, SARMA J S, et al. Delay scheduling: a simple technique for achieving locality and fairness in cluster scheduling [C]// Proceedings of the 2010 5th European Conference on Computer Systems. New York: ACM, 2010: 265-278. [5] GUO Y F, RAO J, CHENG D Z, et al. iShuffle: improving Hadoop performance with shuffle-on-write [J]. IEEE Transactions on Parallel & Distributed Systems, 2017, 28(6): 1649-1662. [6] 熊倩,張龑,郭明,等.MapReduce Shuffle性能改進(jìn)[J].計(jì)算機(jī)應(yīng)用,2017,37(S1):58-62,67.(XIONG Q, ZHANG Y, GUO M, et al. Performance improvement of MapReduce Shuffle [J]. Journal of Computer Applications, 2017, 37(S1): 58-62, 67.) [7] 彭輔權(quán),金蒼宏,吳明暉,等.MapReduce中shuffle優(yōu)化與重構(gòu)[J].中國科技論文,2012,7(4):241-245.(PENG F Q, JIN C H, WU M H, et al. Optimization and reconstruction shuffle in MapReduce [J]. China Sciencepaper, 2012, 7(4): 241-245.) [8] DAVIDSON A, OR A. Optimizing shuffle performance in Spark [EB/OL]. [2017- 04- 12]. https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F13/projects/reports/project16_report.pdf. [9] 李玉林,董晶.基于Hadoop的MapReduce模型的研究與改進(jìn)[J].計(jì)算機(jī)工程與設(shè)計(jì),2012,33(8):3110-3116.(LI Y L, DONG J. Study and improvement of MapReduce based on Hadoop [J]. Computer Engineering and Design, 2012, 33(8): 3110-3116.) [10] GRISHCHENKO A. Distributed systems architecture [EB/OL]. [2017- 04- 12]. https://0x0fff.com/spark-architecture. [11] WANG J H, QIU M K, GUO B, et al. Phase-reconfigurable Shuffle optimization for Hadoop MapReduce [J]. IEEE Transactions on Cloud Computing, 2015, PP(99): 1-1. [12] LI J G, LIN X L, CUI X L, et al. Improving the Shuffle of Hadoop MapReduce [C]// Proceedings of the 2013 IEEE 5th International Conference on Cloud Computing Technology and Science. Piscataway, NJ: IEEE, 2013: 266-273 The work is partially supported by the National Natural Science Foundation of China (U1533104). HOUWeifan, born in 1992, M. S. candidate. His research interests include intelligent algorithm, data mining. FANFanwei, born in 1968, Ph. D., professor. His research interests include intelligent information processing, software engineering. ZHANGYuxiang, born in 1975, Ph. D., associate professor. His research interests include network data analysis, distributed network. ImprovedSparkShufflememoryallocationalgorithm HOU Weifan*, FAN Wei, ZHANG Yuxiang (CollegeofComputerScienceandTechnology,CivilAviationUniversityofChina,Tianjin300300,China) Shuffle performance is an important indicator of affecting cluster performance for big data frameworks. The Shuffle memory allocation algorithm of Spark itself tries to allocate memory evenly for every Task in the memory pool, but it is found in experiments that the memory was wasted and the efficiency was low due to the imbalance of memory requirements for each Task. In order to solve the problem, an improved Spark Shuffle memory allocation algorithm was proposed. According to the amount of memory applications and historical operating data, the Task was divided into two categories based on memory requirements. The “split”processing was carried out for the Task of small memory requirements, while the memory was allocated based on the number of Task overflows and the waiting time after overflow for the Task of large memory requirements. By taking full advantage of the free memory of memory pool, the adaptive adjustment of Task memory allocation could be realized under the condition of unbalanced Task memory requirements caused by the data skew. The experimental results show that, compared with the original algorithm, the improved algorithm can reduce the overflow rate of the Task, decrease the turnaround time of the Task, and improve the running performance of the cluster. Apache Spark; Shuffle; adaptive; memory allocation; running performance 2017- 05- 09 ; 2017- 07- 24。 國家自然科學(xué)基金資助項(xiàng)目(U1533104)。 侯偉凡(1992—),男,內(nèi)蒙古烏蘭浩特人,碩士研究生,主要研究方向:智能算法、數(shù)據(jù)挖掘; 樊瑋(1968—),男,陜西乾縣人,教授,博士,CCF會(huì)員,主要研究方向:智能信息處理、軟件工程; 張宇翔(1975—),男,山西五寨人, 副教授,博士,CCF會(huì)員,主要研究方向:網(wǎng)絡(luò)數(shù)據(jù)分析、分布式網(wǎng)絡(luò)。 1001- 9081(2017)12- 3401- 05 10.11772/j.issn.1001- 9081.2017.12.3401 (*通信作者電子郵箱rainingxa@126.com) TP311.5 A3 實(shí)驗(yàn)結(jié)果及分析
3.1 重新編譯源碼過程
3.2 實(shí)驗(yàn)環(huán)境
3.3 均勻數(shù)據(jù)集實(shí)驗(yàn)結(jié)果分析
3.4 不均勻數(shù)據(jù)集實(shí)驗(yàn)結(jié)果分析
3.5 實(shí)驗(yàn)結(jié)論
4 結(jié)語
——國外課堂互動(dòng)等待時(shí)間研究的現(xiàn)狀與啟示