亚洲免费av电影一区二区三区,日韩爱爱视频,51精品视频一区二区三区,91视频爱爱,日韩欧美在线播放视频,中文字幕少妇AV,亚洲电影中文字幕,久久久久亚洲av成人网址,久久综合视频网站,国产在线不卡免费播放

        ?

        基于RDD關鍵度的Spark檢查點管理策略

        2017-12-16 05:09:32英昌甜王維慶錢育蓉
        計算機研究與發(fā)展 2017年12期
        關鍵詞:檢查點內存分區(qū)

        英昌甜 于 炯 卞 琛 王維慶 魯 亮 錢育蓉

        1(新疆大學電氣工程博士后科研流動站 烏魯木齊 830046) 2(新疆大學軟件學院 烏魯木齊 830008) 3((新疆大學電氣工程學院 烏魯木齊 830046)

        基于RDD關鍵度的Spark檢查點管理策略

        英昌甜1,2于 炯2卞 琛2王維慶1,3魯 亮2錢育蓉2

        1(新疆大學電氣工程博士后科研流動站 烏魯木齊 830046)2(新疆大學軟件學院 烏魯木齊 830008)3((新疆大學電氣工程學院 烏魯木齊 830046)

        (yingct@xju.edu.com)

        Spark默認容錯機制由程序員設置檢查點,并利用彈性分布式數(shù)據(jù)集(resilient distributed dataset, RDD)的血統(tǒng)(lineage)進行計算.在應用程序復雜度高、迭代次數(shù)多以及數(shù)據(jù)量較大時,恢復過程需要耗費大量的計算開銷.同時,在執(zhí)行恢復任務時,僅考慮數(shù)據(jù)本地性選擇節(jié)點,并未考慮節(jié)點的計算能力,這都會導致恢復時間增加,無法最大化發(fā)揮集群的性能.因此,在建立Spark執(zhí)行模型、檢查點模型和RDD關鍵度模型的基礎上,提出一種基于關鍵度的檢查點管理(criticality checkpoint management, CCM)策略,其中包括檢查點設置算法、失效恢復算法和清理算法.其中檢查點設置算法通過分析作業(yè)中RDD的屬性以及對作業(yè)恢復時間的影響,選擇關鍵度大的RDD作為檢查點存儲;恢復算法根據(jù)各節(jié)點的計算能力做出決策,選擇合適的節(jié)點執(zhí)行恢復任務;清理算法在磁盤空間不足時,清除關鍵度較低的檢查點.實驗結果表明:該策略在略增加執(zhí)行時間的情況下,能夠選擇有備份價值的RDD作為檢查點,在節(jié)點失效時能夠有效地降低恢復開銷,提高節(jié)點的磁盤有效利用率.

        內存計算;Spark;檢查點管理;失效恢復;RDD屬性

        近年來,隨著互聯(lián)網(wǎng)的快速發(fā)展,特別是云計算的普及,全球數(shù)據(jù)量以每年約50%的速度遞增,大數(shù)據(jù)[1-3]日益受到人們的重視.2015年9月,國務院印發(fā)《促進大數(shù)據(jù)發(fā)展行動綱要》,確定了大數(shù)據(jù)發(fā)展的國家頂層設計,大數(shù)據(jù)與各行各業(yè)的結合已是行業(yè)未來發(fā)展的必然趨勢.隨著大數(shù)據(jù)時代的到來,數(shù)據(jù)每天都在急劇快速膨脹,發(fā)掘這些數(shù)據(jù)的價值,需要一種高效而穩(wěn)定的分布式計算框架和模型.在分布式計算框架中,Apache Spark[4-6]由于其基于內存的高性能計算模式以及豐富靈活的編程接口,得到了廣泛的支持和應用,大有逐漸取代Hadoop MapReduce成為新一代大數(shù)據(jù)計算引擎的趨勢.

        Spark采用了基于分布式共享內存的彈性分布式數(shù)據(jù)集(resilient distributed datasets, RDD)[7]作為數(shù)據(jù)結構,RDD是Spark對分布式內存的抽象,具有可重構性、不變性、分區(qū)局部性以及可序列化等特性[8-10].作為Apache頂級的開源項目,Spark基于自身的核心部分,在迭代計算、交互式查詢計算以及批量流計算等方面,開發(fā)了適應大數(shù)據(jù)處理的多種場景生態(tài)組件,如SQL處理引擎Spark SQL和Shark、流式處理引擎Spark Streaming、機器學習系統(tǒng)MLLib、圖計算框架GraphX、統(tǒng)計分析工具SparkR以及分布式內存文件系統(tǒng)Tachyon等.

        Spark是基于MapReduce算法實現(xiàn)的分布式計算框架,因此具有MapReduce的優(yōu)點[11-12].同時Spark在很多方面都彌補了MapReduce的不足,比MapReduce的通用性更好、迭代運算效率更高、作業(yè)延遲更低.Spark的主要優(yōu)勢包括:提供了一套支持DAG圖的分布式并行計算的編程框架,減少多次計算之間中間結果寫到HDFS的開銷;提供Cache機制來支持需要反復迭代計算或者多次數(shù)據(jù)共享,減少數(shù)據(jù)讀取的I/O開銷;使用多線程池模型來減少任務啟動開銷,shuffle過程中避免不必要的sort操作以及減少磁盤I/O操作;具有更廣泛的數(shù)據(jù)集操作類型.

        在Spark任務的執(zhí)行過程中,首先將HDFS中存儲的數(shù)據(jù)作為數(shù)據(jù)源加載到RDD中進行處理,再通過一系列的操作生成計算結果,每次操作所產生的中間結果都以RDD的形式存儲在內存中.Spark通過數(shù)據(jù)集的血統(tǒng)(lineage)實現(xiàn)容錯,當節(jié)點失效導致數(shù)據(jù)丟失時,Spark可以根據(jù)血統(tǒng)重新計算,以實現(xiàn)丟失數(shù)據(jù)的自動重建.為了避免錯誤恢復的代價與運行時間成正比增長,Spark提供了檢查點(checkpoint)[13]功能,通過設置檢查點來記錄中間狀態(tài),避免從頭開始計算的漫長恢復.

        然而,Spark集群默認的容錯機制將檢查點選擇和設置的權利交給編程人員,而程序員往往根據(jù)經(jīng)驗進行選擇,使結果充滿不確定性.數(shù)據(jù)丟失后恢復的效果好壞和效率高低,會隨著程序員的水平不同而出現(xiàn)巨大差異.錯誤的檢查點策略不僅會導致程序變慢,恢復效率降低,甚至會浪費持久化存儲(磁盤/固態(tài)硬盤等)的空間,影響其他作業(yè)執(zhí)行效率.此外,由于現(xiàn)有分布式框架中的檢查點策略,往往根據(jù)時間周期定時設置,并未對具體作業(yè)進行分析,考慮影響作業(yè)恢復效率的重要因素,因此選取的檢查點并不一定有效,反而可能影響整體系統(tǒng)的計算和容錯性能.

        為解決這一問題,本文主要做了3方面工作:

        1) 對內存計算框架的作業(yè)執(zhí)行機制進行分析,建立執(zhí)行效率模型,給出了RDD計算代價和任務執(zhí)行時間的定義.

        2) 通過分析RDD的恢復過程,建立了檢查點恢復模型,給出了RDD關鍵度、失效恢復比的定義,并證明這些定義與任務恢復效率的關系,為算法設計提供基礎模型.

        3) 在相關模型定義和證明的基礎上,提出了檢查點管理策略的問題定義,以此作為算法設計的主要依據(jù).

        1 相關工作

        檢查點/恢復策略是分布式計算廣泛使用的容錯技術,在國內外都有較多的相關研究[14-19].傳統(tǒng)的檢查點策略主要分為3類:應用級實現(xiàn)、用戶級實現(xiàn)和系統(tǒng)級實現(xiàn).

        1) 應用級實現(xiàn).由編程人員或自動程序將檢查點代碼與應用程序代碼整合,檢查點活動由應用程序自動執(zhí)行.將檢查點存儲到持久化存儲中,并且當節(jié)點故障后,從檢查點重啟.這種方法的挑戰(zhàn)在于需要編程人員對設置檢查點的應用有透徹的了解.

        2) 用戶級實現(xiàn).需要用戶級的數(shù)據(jù)庫來設置檢查點,并且應用程序與數(shù)據(jù)庫相鏈接,這種方法對用戶不透明,因為應用的修改、編譯都鏈接到檢查點數(shù)據(jù)庫.

        3) 系統(tǒng)級實現(xiàn).檢查點/恢復策略可以實現(xiàn)在系統(tǒng)級別,包括OS內核或硬件.當實現(xiàn)在系統(tǒng)級別,對用戶通常是透明的,并且對應用程序不需要修改.

        另外一些研究成果主要考慮分布式環(huán)境下并行計算框架的失效恢復策略.Storm[20]是一個實時流式處理平臺,運行在Java虛擬機之上;使用Clojure和Java編寫,并且支持多語言編程;它依靠Nimbus在集群分發(fā)代碼,并將任務分配給工作節(jié)點;由Zookeeper為其提供了容錯,并與Nimbus協(xié)調重新分配任務到其他可用節(jié)點;然而,沒有設置檢查點機制.Apache S4[21]是一個開源的流處理平臺,利用Zookeeper提供容錯.與Storm不同,它提供了檢查點機制.在一個節(jié)點出現(xiàn)故障時,故障轉移機制重新啟動一個新節(jié)點進行任務恢復.為了減少延遲,檢查點是異步的,這意味著每個獨立節(jié)點執(zhí)行檢查點,沒有全局一致性.文獻[22-23]提出了一種基于內存的分布式文件系統(tǒng)Tachyon,兼容包括Spark在內的多種計算框架.Tachyon本質上是將內存存儲功能從Spark或Yarn中分離出來,使上層計算框架實現(xiàn)更高的執(zhí)行效率.在Tachyon的實現(xiàn)中,文件采用兼容HDFS的Block方式進行管理,使用不同的存儲媒介對數(shù)據(jù)分層次緩存,但檢查點算法僅保存當前最新生成的RDD,與任務和RDD的特性無關.文獻[24]提出了內存文件系統(tǒng)RAMCloud,將內存作為文件的存儲介質,而磁盤則作為備份介質,為了提高效率,RAMCloud使用日志結構存儲,利用集群的并發(fā)能力和高速帶寬的Infiniband實現(xiàn)快速恢復,保證了數(shù)據(jù)的可用性和完整性,但Spark與RAMCloud并不兼容,因為RAMCloud的高內存占用率會影響Spark的計算性能.

        因此,為了解決檢查點設置的問題,其中的重點就是選擇哪些有價值的RDD作為檢查點存儲在磁盤中,以及在數(shù)據(jù)丟失后應選擇什么樣的節(jié)點進行恢復.在實時系統(tǒng)或數(shù)據(jù)庫系統(tǒng)中,對周期設置檢查點而言,所有數(shù)據(jù)的重要性相同,僅僅根據(jù)周期設置時的時間選擇最新生成的數(shù)據(jù)設置為檢查點.與此不同的是,由于Spark基本存儲單元為RDD,不同RDD所具有的屬性如操作復雜度、血統(tǒng)長度、計算代價和RDD數(shù)據(jù)量大小各不相同,對于作業(yè)恢復時所產生的恢復效果不同,因此在作業(yè)執(zhí)行過程中,異步存儲對恢復更有價值的RDD在磁盤中,在設置檢查點的同時盡量不影響作業(yè)執(zhí)行效率.通過對Spark作業(yè)執(zhí)行機制和執(zhí)行時間進行理論分析,在恢復時根據(jù)數(shù)據(jù)本地性、節(jié)點空閑情況和節(jié)點能力選擇節(jié)點,并執(zhí)行恢復任務,從而縮短恢復時間.

        2 問題的建模和分析

        本節(jié)通過分析Spark作業(yè)執(zhí)行機制,定義了RDD和作業(yè)的執(zhí)行開銷,提出了作業(yè)執(zhí)行模型、檢查點模型和關鍵度模型,并且在理論基礎上對目標問題進行了定義.

        2.1 Spark作業(yè)執(zhí)行機制

        在Spark應用中,整個執(zhí)行流程在邏輯上會形成有向無環(huán)圖(DAG).Action算子觸發(fā)之后,根據(jù)RDD的血統(tǒng),將所有累積的算子形成一個有向無環(huán)圖,然后由調度器調度該圖上的任務進行運算.Spark根據(jù)RDD之間不同的依賴關系切分形成不同的階段(stage),一個階段包含一系列函數(shù)執(zhí)行流水線.

        Spark作業(yè)DAG的典型示例如圖1所示,A,B,C,D,E,F(xiàn)和G分別代表不同的RDD,RDD內的方框代表分區(qū),虛線框為階段.數(shù)據(jù)從HDFS輸入Spark,形成RDDA和RDDC,RDDC上執(zhí)行map操作,轉換為RDDD,RDDD和RDDE執(zhí)行union操作,轉換為F,而在B和F通過join連接轉化為G的過程中會執(zhí)行Shuffle,最后RDDG輸出并保存到HDFS中.

        Fig. 1 Directed acyclic graph of Spark job圖1 Spark作業(yè)的有向無環(huán)圖

        Spark在未設置檢查點算法時,若數(shù)據(jù)失效,則需要利用血統(tǒng)重新計算來實現(xiàn)恢復.若所有需要的數(shù)據(jù)都丟失,則RDD必須重新計算生成,使任務的完成時間延長,還增加了額外的計算開銷.若能為有價值的RDD設置檢查點,則能有效縮減任務恢復時間.

        2.2 作業(yè)執(zhí)行模型

        定義1. RDD計算代價.Spark任務中,分區(qū)由父節(jié)點為輸入數(shù)據(jù)計算生成.設Parentsi jk為分區(qū)Pi jk的父節(jié)點集合.分區(qū)Pi jk的計算代價為數(shù)據(jù)讀取代價與數(shù)據(jù)處理代價之和,即:

        TPi jk=read(Parentsi jk)+proc(Parentsi jk).

        (1)

        RDD的所有分區(qū)由集群工作節(jié)點并行計算生成,因此其計算代價為所有分區(qū)計算代價的最大值,即:

        TRDDi j=max(TPi j1,TPi j2,…,TPi jk).

        (2)

        定義2. 作業(yè)執(zhí)行時間.如圖1所示,Spark以寬依賴為分界點,將作業(yè)劃分為多個階段執(zhí)行,因此階段分為窄依賴和寬依賴2類.

        由于Spark任務以分區(qū)為最小粒度單位,因此在任務分配時,分區(qū)計算時間與節(jié)點計算能力有關.設cpw為工作節(jié)點計算能力,單位是tuple/s;SPi jk為分區(qū)Pi jk的元組個數(shù).

        由于窄依賴的父分區(qū)為上一個RDD對應位置的一個分區(qū),則窄依賴分區(qū)計算時間:

        (3)

        對于窄依賴階段,每個階段包括多條流水線(每條流水線包括多個RDD的不同分區(qū)).記窄依賴Stage共包含m個RDD,RDD相同位置的分區(qū)組成流水線,所有RDD劃分為x條流水線,單條流水線的分區(qū)集合為pipei x={Pi 1x,Pi 2x,…,Pi jx},那么單條流水線的執(zhí)行時間可表示為

        (4)

        對于stagei,記其流水線集合為Pipesi={pipei 1,pipei 2,…,pipei x},那么該階段的執(zhí)行時間應為各流水線執(zhí)行時間最大值,即:

        NTstagei=max(Tpipei 1,Tpipei 2,…,Tpipei x)=

        寬依賴的父分區(qū)集合為上一個RDD的所有分區(qū),即整個RDD,且寬依賴RDD的分區(qū)計算時間大于窄依賴RDD的分區(qū)計算時間,則寬依賴分區(qū)計算時間:

        (6)

        由于寬依賴階段中僅包含1個RDD,則寬依賴階段的計算時間為

        WTstagei=max{WTPi j1,WTPi j2,…,WTPi jk}.

        (7)

        一個作業(yè)由若干個階段構成,由于Spark以寬依賴為界劃分階段,而作業(yè)的最后一個階段必為寬依賴(所有執(zhí)行均為寬依賴操作),記作業(yè)中寬、窄依賴的個數(shù)分別為m和n,那么作業(yè)執(zhí)行時間可表示為

        (8)

        2.3 檢查點模型

        在節(jié)點故障時,會使存儲在該節(jié)點內存上的多個RDD部分甚至所有分區(qū)不可用,這將導致作業(yè)無法繼續(xù)正常執(zhí)行,在未設置檢查點的情況下,作業(yè)需要回溯,直到找到可用的RDD.極端情況下,甚至需要重新調度,之前所計算的所有工作和耗費的系統(tǒng)資源都會浪費.因此,對于作業(yè)而言,有必要設置檢查點,從而降低節(jié)點宕機后產生的恢復開銷.

        定義3. 作業(yè)恢復代價.作業(yè)執(zhí)行到任意RDD,若集群中某個工作節(jié)點發(fā)生故障,作業(yè)將丟失該工作節(jié)點計算的所有中間結果.記故障的工作節(jié)點編號為k(工作節(jié)點編號對應流水線序號和寬依賴RDD中的分區(qū)編號),那么對于所有窄依賴階段,每個階段丟失一條流水線的數(shù)據(jù),則在不考慮恢復調度開銷時,窄依賴階段的恢復代價為丟失流水線的重新計算代價,即:

        (9)

        而對于寬依賴階段而言,RDD的一個分區(qū)丟失,因此在不考慮恢復調度開銷時,寬依賴階段的恢復代價為該分區(qū)的重新計算代價,即:

        WRi=WTPi jk.

        (10)

        由于工作節(jié)點故障為隨機事件,發(fā)生故障時當前計算的RDD可能位于任何階段內,因此,若故障發(fā)生在窄依賴階段,則作業(yè)恢復代價還應考慮丟失分區(qū)在流水線的前續(xù)節(jié)點的重新計算代價.而若故障發(fā)生在寬依賴階段,由于該階段僅包含一個RDD,無前續(xù)節(jié)點恢復代價,僅需要恢復該RDD.

        設當前作業(yè)已執(zhí)行的階段中共有x個窄依賴、y個寬依賴,正在執(zhí)行的階段共有m個RDD,當前正在計算RDDi h,在不考慮恢復調度開銷時作業(yè)恢復代價可表示為

        (11)

        即作業(yè)恢復代價等于前續(xù)寬、窄依賴階段的恢復代價之和,再加上丟失分區(qū)在當前階段中父分區(qū)的計算代價之和.

        定理1. 設Jobi={RDDi 1,RDDi 2,…,RDDi n},RDDi(j-1)∈Jobi且RDDi j∈Jobi.若RDDi j具有更長的血統(tǒng),并且RDDi(j-1)是RDDi j唯一的parentRDD.那么,在執(zhí)行Jobi的計算節(jié)點都失效時,恢復RDDi j的開銷較高,且為TRDDi j.

        證明. 若RDDi j具有更長的血統(tǒng),RDDi(j-1)是RDDi j的parentRDD.設RDDi(j-1)的血統(tǒng)長度為k,則RDDi j的血統(tǒng)長度為k+1.考慮2種情況:

        1) 若Jobi設置了檢查點,則只需要從檢查點處開始計算恢復.若恢復所需要的最新檢查點為RDDi p,恢復調度開銷為常量αi的情況下,則恢復開銷分別為

        2) 若Jobi未設置檢查點,在計算節(jié)點失效時,則所有RDD都需要重新計算,重新部署調度開銷為常量βi,則恢復開銷分別為

        無論是情況1或情況2,明顯可看出,RRDDi j>RRDDi(j-1)且RRDDi j-RRDDi(j-1)=TRDDi j.

        證畢.

        定理2. 設Jobi={RDDi 1,RDDi 2,…,RDDi n},RDDi(j-1)∈Jobi,且RDDi j∈Jobi.RDDi(j-1)為RDDi j的唯一parentRDD.若節(jié)點失效時,僅丟失了RDDi j的第l個分區(qū)Pi jl,那么在RDDi j的操作為窄依賴或寬依賴時,其恢復Pi jl的開銷不同,且寬依賴時恢復開銷為max(RPi j1,RPi j2,…,RPi jk).

        證明. 由于RDDi j的parentRDD為RDDi(j-1),則操作為窄依賴時可知丟失第l個分片Pi jl,只需通過RDDi(j-1)的對應父分片Pi j(l-1)通過流水線計算獲得,故恢復Pi j(l-1)開銷為

        RRDDi j(narrow)=RPTi jl=
        αi+read(PTi(j-1)l)+proc(PTi jl).

        操作為寬依賴時,丟失第l個分片Pi jl,需通過RDDi(j-1)所有的分片計算得到Pi jl,則恢復PTi j(l-1)的開銷為

        RRDDi j(wide)=RRDDi j=max(RPi j1,RPi j2,…,RPi jk).

        因此,RRDDi j(wide)≥RRDDi j(narrow).

        證畢.

        定理3. 設Jobi={RDDi 1,RDDi 2,…,RDDi n},RDDi(j-1),RDDi j,RDDi(j+1)∈Jobi.RDDi(j-1)是RDDi(j-1)和RDDi j的唯一parentRDD,RDDi(j-1)已備份為檢查點并存儲在正常運行節(jié)點,且計算代價procRDDi(j+1)≥procRDDi j.當計算節(jié)點失效時,RDDi j和RDDi(j+1)丟失,則從RDDi(j-1)檢查點進行恢復的代價RRDDi(j+1)≥RRDDi j.

        證明. 由于procRDDi(j+1)≥procRDDi j,則對于RDDi(j+1)中任意第l個分片的計算代價procPTi(j+1)l≥procPTi jl.

        當執(zhí)行恢復計算時,RDDi(j-1)已備份為檢查點,則恢復RDDi j的開銷為RRDDi j=max(RPi j1,RPi j2,…,RPi jk).對第l個分片而言,恢復該分片的開銷為

        RPi jl=TPi jl=αi+read(Pi(j-1)l)+proc(Pi jl).

        同樣,恢復RDDi(j+1)的開銷為RRDDi(j+1)=max(RPi(j+1)1,RPi(j+1)2,…,RPi(j+1)k).對第l個分片而言,恢復該分片的開銷為

        RPi(j+1)l=TPi(j+1)l=
        αi+read(Pi(j-1)l)+proc(Pi(j+1)l).

        由于procRDDi(j+1)≥procRDDi j,可知RPi(j+1)l≥RPi jl,同時RRDDi(j+1)≥RRDDi j.

        證畢.

        2.4 RDD關鍵度模型

        根據(jù)2.3節(jié)的分析,可以判斷出RDD與恢復開銷相關的因素有3個:

        1) RDD的血統(tǒng)長度.在作業(yè)中具有更長血統(tǒng)的RDD恢復開銷更大,證明詳見定理1.因此當設置檢查點時,應優(yōu)先選擇長血統(tǒng)的RDD,從而降低對恢復開銷的影響.

        2) RDD類型(指寬依賴或窄依賴).在寬依賴情況下,丟失1個子RDD分區(qū),需重算的每個父RDD的每個分區(qū)會產生冗余計算開銷,使寬依賴在恢復時開銷更大,證明詳見定理2.因此,應優(yōu)先選擇寬依賴的RDD作為檢查點,從而降低恢復開銷.

        3) RDD計算代價.計算代價越高的RDD恢復時開銷越大,應優(yōu)先備份計算代價高的RDD,證明詳見定理3.另外,RDD大小對數(shù)據(jù)備份時長和數(shù)據(jù)恢復讀取時長都會產生影響,從而也作為考慮的關鍵因素.

        定義4. RDD關鍵度.表示RDD對任務恢復效率的重要程度.在傳統(tǒng)的檢查點策略中,并未引進權重的概念,被選擇作為檢查點的數(shù)據(jù)在本質上與其他數(shù)據(jù)沒有區(qū)別,而在Spark中則不同,不同的RDD對恢復的重要程度不同.在RDD丟失時,需要重新計算,而不同的RDD恢復需要的計算開銷不同.

        綜合考慮相關因素,記LDRDDi j表示為LDRDDi j血統(tǒng)長度,TypeRDDi j為類型,CostRDDi j為計算代價,SizeRDDi j為數(shù)據(jù)量大小.則RDD的關鍵度表示為

        (12)

        式(12)表明,血統(tǒng)長度、RDD類型、計算代價與RDD關鍵度成正比關系,而容量大小則成反比關系.關鍵度越大,恢復時對恢復效率的影響越大,因此作為檢查點備份的必要性越大.

        定義5. 失效恢復比.用于表示恢復任務分配與節(jié)點計算能力的適應程度.記恢復計算節(jié)點集合Workers={w1,w2,…,wn},其計算能力C={c1,c2,…,cn},RT為恢復任務總量,則執(zhí)行恢復任務的時間開銷均值可定義為

        (13)

        對于任意計算節(jié)點wi,分配給其恢復任務RTi時,其執(zhí)行時間、均值的方差和失效恢復比分別為

        (14)

        Dwi=(Twi-E)2,

        (15)

        (16)

        其中,wi∈Workers.

        基于定義,從作業(yè)分配的角度來看,作業(yè)執(zhí)行時間也可表示為

        Tjob=max(Tw1,Tw2,…,Twn).

        (17)

        由于節(jié)點的失效恢復比與方差成反比,因此比值越大,方差越小,表示節(jié)點恢復作業(yè)完成時間越趨近均值,因此當所有工作節(jié)點的失效恢復比取最大值時恢復任務的執(zhí)行時間最短.

        2.5 檢查點管理策略問題定義

        2.1節(jié)至2.4節(jié)已經(jīng)對作業(yè)和檢查點機制作了比較詳細的闡述,基于這些定義,對我們的檢查點算法進行形式化表示.

        優(yōu)化目標:

        minRjob.

        (18)

        s.t.

        (19)

        其中,j∈jobs,wi∈Workers,r∈R.

        優(yōu)化目標:

        (20)

        (21)

        s.t.

        (22)

        其中,j∈jobs,RDDj k∈jobj,wi∈Workers,r∈R.

        目標是最大化RDD關鍵度和失效恢復比,約束條件同上.因此,在設置檢查點時,應選擇已生成的RDD中對恢復效率影響最大,即關鍵度最大的RDD作為檢查點進行設置.在節(jié)點失效進行恢復時,應選擇節(jié)點能力更強的節(jié)點執(zhí)行作業(yè)恢復.

        3 檢查點管理策略

        本節(jié)提出基于關鍵度的檢查點管理(criticality based checkpoint management,CCM)策略,其中包括檢查點設置算法、失效恢復算法和檢查點清理算法,具體流程圖如圖2所示.

        Fig. 2 Flow chart of checkpoint strategy圖2 檢查點策略流程圖

        作業(yè)在設置檢查點和恢復算法的粒度可以分為2個級別:1)RDD級別;2)分區(qū)級別.在設置檢查點時,檢查點選擇可以在RDD級別、分區(qū)級別以權重度、固定時間間隔等多種為算法以異步檢查點的方式進行設置.在算法恢復時,宕機節(jié)點上的任務需要進行恢復,以最新的檢查點序列為基礎,由Spark調度器重新調度執(zhí)行.根據(jù)集群資源情況,分配需要的資源.

        RDD級別以作業(yè)的RDD為粒度,根據(jù)RDD的重要度選擇更優(yōu)的檢查點,問題在于粒度較大,當某個節(jié)點宕機丟失某些RDD的分區(qū),需要重新計算所有的RDD,從磁盤中讀取最新檢查點信息的所有內容,浪費系統(tǒng)資源.分區(qū)級別以RDD的分區(qū)為粒度,根據(jù)分區(qū)的關鍵度,為每個分區(qū)選擇異步的檢查點,在丟失某些分區(qū)時不需要恢復整個RDD,只需恢復丟失的分區(qū),無論是備份檢查點時的磁盤開銷,還是恢復時的讀取開銷和執(zhí)行開銷,都比較少.但問題在于,以分區(qū)為粒度和異步備份檢查點時、若有多個任務同時在Spark集群中執(zhí)行,檢查點設置開銷和管理難度明顯增大.因此,采用以RDD為設置檢查點的基本單位,分區(qū)為恢復檢查點的基本單位.

        3.1 基礎數(shù)據(jù)構建

        基于關鍵度的檢查點算法需要構建的初始化參數(shù)與基礎數(shù)據(jù)如下:

        1) 生成RDD結構樹treeRDDs

        通過在Spark源碼中插入監(jiān)聽代碼,遍歷作業(yè)的DAG圖,獲取DAG圖中點和邊的信息,以及輸入RDDId、操作類型、輸出RDDId,并通過DAG生成RDD結構樹treeRDDs.

        2) 獲取RDD深度

        通過分析DAG圖即可獲得,實際運行時血統(tǒng)長度為迭代次數(shù)與血統(tǒng)長度相乘.

        3) 獲取RDD類型值

        寬依賴操作的RDD具有作為檢查點的意義,因此遍歷RDD結構樹,將操作為寬依賴的RDD寫入列表,并根據(jù)該RDD分片個數(shù)獲得類型值.

        4) RDD計算代價

        在實際的Spark環(huán)境中,作業(yè)的DAG圖生成后,管理節(jié)點監(jiān)控所有RDD的狀態(tài)變化,通過記錄各狀態(tài)變化的時間點即可得出起始時間和完成時間.

        5) RDD大小

        在作業(yè)執(zhí)行過程中,每計算完成一個RDD,即可根據(jù)其多個分區(qū)大小求和,進而獲得該RDD的大小.

        6) 計算關鍵度

        通過RDD關鍵度公式,為RDD計算并保存.

        3.2 檢查點設置算法

        如算法1所示,根據(jù)RDD的關鍵度,在作業(yè)執(zhí)行時間內選擇RDD作為檢查點進行設置.檢查點設置的時間從源RDD之后第1個生成的RDD開始作為檢查點開始,到終點RDD之前最后一個生成的RDD結束為結束.在作業(yè)執(zhí)行時,源RDD和終點RDD不需要進行檢查點設置.由于源RDD存儲在磁盤中,終點RDD為任務結束的RDD,會根據(jù)需要在結束后寫入磁盤,因此不需要考慮.在作業(yè)執(zhí)行過程中進行檢查點設置時對比最新生成的多個RDD,并選擇當前關鍵度最大的RDD.選擇好需要備份的RDD,執(zhí)行檢查點設置操作,將該RDD備份.計算任務首先將檢查點臨時緩存在局部內存中,然后由另一個獨立于計算任務的并行任務負責將緩存在內存中的檢查點數(shù)據(jù)文件拷貝到HDFS.

        算法1. 檢查點設置算法.

        輸入:結構樹treeRDDs;

        輸出:檢查點列表checkpointlist.

        初始化:checkpointlist←newArray;

        checkpoint←newHash;

        cutlist←newList;

        i←0;

        j←1;

        maxCR←0.

        ①checkpointlist←Null;

        ②checkpoint(treeRDDs[i]);

        ③checkpointlist.add(treeRDDs[i]);

        /*添加生成的第1個RDD作為檢查點*/

        ④ while(Cfinished=1 &&Tfinished=0)

        ⑤ fori=0 tonewgenerateRDD.lengthdo

        ⑥candidateslist←getnewRDD;

        ⑦candidates[i].cost←getRDDCost;

        ⑧calculate(candidates[i].CR);

        ⑨ if (candidates[i].CR>maxCR) then

        ⑩nextcheckpoint[j]←candidates[i];

        3.3 檢查點恢復算法

        管理節(jié)點在每一個固定的時間間隔要求工作節(jié)點來發(fā)送它們的當前狀態(tài).工作節(jié)點定期發(fā)送心跳給管理節(jié)點,如果在一個時間限制沒有收到工作節(jié)點的心跳,標記“失敗”.在這種情況下,管理節(jié)點指示工作節(jié)點利用最近的檢查點進行重新啟動恢復.如算法2所示,宕機后,由Spark執(zhí)行恢復操作時,從最后設置的檢查點RDD處進行恢復.恢復時,優(yōu)先選擇離執(zhí)行恢復的工作節(jié)點最近的檢查點副本,從而降低網(wǎng)絡開銷和恢復時延.將最新的檢查點讀入內存,并且可以根據(jù)需要將檢查點列表中的檢查點讀入,從而降低恢復和執(zhí)行開銷.當某個RDD需要恢復但未設置檢查點時,重新執(zhí)行血統(tǒng),通過其父節(jié)點恢復;若已將該RDD設置檢查點,可以將該RDD讀入內存.若有寬依賴,或丟失了RDD的所有分區(qū),則需讀取所需的RDD的所有分區(qū)到內存;若丟失了RDD的部分分區(qū),且恢復血統(tǒng)中無寬依賴,則只需讀取丟失分區(qū)所需檢查點到內存進行計算.

        算法2. 檢查點恢復算法.

        輸入:當前結構樹curtreeRDDs、檢查點列表checkpointlist、需要恢復的RDDrecoveryRDDs.

        初始化:parentsRDD←newHash;

        checkpointRDD←newHash;

        i,j,k←0;

        recoveryworker←assign.freeworker.

        ① fori=0 torecoveryRDDs.length-1 do

        ②parentsRDD←getLineage(

        recoveryRDDs[i],curtreeRDDs);

        /*獲取需要恢復RDD的父RDD*/

        ③ forj=0 toparentsRDD.length-1 do

        ④checkpointRDD←get(checkpointlist,

        parentsRDD);

        ⑤ fork=0 torecoveryRDDs.partitionnum-1 do

        ⑥ if (wideDependency(recoveryRDDs[i])∨lost(recoveryRDDs[i])

        ⑦read(parentsRDD[j]);

        /*操作為寬依賴或丟失所有分區(qū)*/

        ⑧ end if

        ⑩read(parentsRDD[j].partition[k]);

        /*操作為窄依賴且丟失部分分區(qū)*/

        /*根據(jù)血統(tǒng)進行恢復計算*/

        3.4 檢查點清理算法

        隨著任務執(zhí)行時間的增長,保存的檢查點恢復信息越來越多,其中有些恢復信息成為過時無用的恢復信息.檢查點清理就是刪除這些過時無用的恢復信息.檢查點清理策略對恢復協(xié)議性能有直接影響.不同的恢復策略,不同的應用所需要存儲的恢復信息量都會有不同,執(zhí)行檢查點清理會伴隨有開銷.因此有效的組織存儲恢復信息和執(zhí)行檢查點清理都會對系統(tǒng)性能產生影響,提高集群資源利用率.

        對Spark檢查點機制而言,檢查點存儲在持久化存儲磁盤或固態(tài)硬盤中.就磁盤空間而言,空間較大,并不是主要的瓶頸.因此,最簡單的做法就是在任務執(zhí)行完畢后,將該任務所設置的所有檢查點刪除.但每次檢查點備份時為3個副本,因此,當系統(tǒng)中并行執(zhí)行多個任務時,若磁盤空間有限,那么在備份過程中需要執(zhí)行清理算法,可以根據(jù)檢查點的關鍵度進行清理.

        檢查點清理算法,在不影響其他作業(yè)的情況下并行執(zhí)行.如算法3所示,清理的2個條件為:

        1) 當任務的磁盤備份空間受限時,在添加新檢查點時,對已有檢查點進行清理,以滿足新檢查點的空間需求;

        2) 系統(tǒng)設置檢查點清理周期,到達該時間周期時進行清理.

        清理算法的2個步驟:

        1) 新添加一個檢查點,磁盤空間不足,將已有的檢查點關鍵度進行過濾,將權重小于新檢查點的對象放入候選列表.

        2) 將候選列表按關鍵度從小到大的順序排列.搜索候選列表,若存在目標,其容量滿足新檢查點的要求則進行清理;否則,當用戶設定的清理時間到達時,才能將所有候列表中的檢查點進行清理.

        算法3. 檢查點清理算法.

        輸入:檢查點列表checkpointlist、當前需添加的檢查點RDD[i]、存儲空間空閑容量freecapacity.

        初始化:candidates←newHash;

        v←RDD[i].CR;

        s←RDD[i].size.

        ① forj=0 tocheckpointlist.length-1 do

        ② ifv>checkpointlist[j].CRthen

        ③candidates.add(checkpointlist[j]);

        ④ end if

        ⑤ end for

        ⑥cancadidates.orderbyCR();

        /*對關鍵度進行排序*/

        ⑦ forj=0 tocancadidates.length-1 do

        ⑧ if (freecapacity≤mincapacity∧

        cancadidates[j].size>s)

        ⑨clean(candidates[j]);

        ⑩checkpoint(RDDs[i]);

        /*選擇空間滿足大小的檢查點替換*/

        /*到達系統(tǒng)清理時間*/

        /*清理所有檢查點*/

        4 實驗與評價

        本節(jié)將通過實驗進行比較和評價,驗證檢查點自動選擇算法、恢復算法和檢查點清理算法的有效性.

        4.1 實驗環(huán)境

        實驗環(huán)境用1臺服務器和8個工作節(jié)點建立計算集群,服務器作為Spark的Master和Hadoop的NameNode.為體現(xiàn)工作節(jié)點的計算能力不同,8個工作節(jié)點由1個高效節(jié)點、6個普通節(jié)點和1個慢節(jié)點組成,其中普通節(jié)點的配置如表1所示,高效節(jié)點配備4核CPU,16 GB內存和4個千兆網(wǎng)卡,而慢任務節(jié)點僅有單核CPU,1 GB內存和1個百兆網(wǎng)卡.

        Table1 Configuration Parameters of Worker Node表1 Worker節(jié)點配置參數(shù)

        4.2 算法綜合評估測試

        對于本文提出CCM策略進行測試,實驗數(shù)據(jù)首先選取WordCount,TeraSort,K-Means,PageRank 4種算法作為作業(yè)進行分析.其中WordCount作業(yè)量為4.9 GB,TeraSort作業(yè)量為4.5 GB,K-Means輸入數(shù)據(jù)總樣本點為4 000 000,維度為20,點群個數(shù)為5,迭代1次.PageRank頁面?zhèn)€數(shù)為3 000 000,迭代3次.表2為不同算法在集群失效節(jié)點個數(shù)為1時執(zhí)行,且執(zhí)行到第100秒時關閉1個普通節(jié)點node3,對比3種策略對恢復時間(execution time, ET)以及與未優(yōu)化策略相比的加速時間(accelerate time, AT)和加速比(accelerate rate, AR)的影響.這3種策略分別為未優(yōu)化策略(Without opt)、Tachyon檢查點策略(Tachyon checkpoint)和CCM策略.

        Table 2 Recovery Efficiency with Different Strategies inSingle Node Failure

        在所有的作業(yè)中,CCM策略的作業(yè)完成時間都優(yōu)于傳統(tǒng)Spark,在設置檢查點的情況下,宕機恢復后,4類作業(yè)的總執(zhí)行時間要小于未設置檢查點的情況,從而證明了算法在多種類型作業(yè)下都具有良好的優(yōu)化效果.通過觀察數(shù)據(jù)發(fā)現(xiàn),不同作業(yè)對恢復效率的提高程度各不相同,這是因為不同類型作業(yè)寬依賴操作個數(shù)和數(shù)據(jù)大小各不相同,因此在作業(yè)類型不同的情況下算法的優(yōu)化效果無明顯規(guī)律.

        Fig. 3 Memory usage of different jobs on node3圖3 普通節(jié)點node3上不同作業(yè)的內存利用率

        Fig. 4 Disk I/O rate of different jobs on node3圖4 普通節(jié)點node3上不同作業(yè)的磁盤I/O速率

        Fig. 5 The network I/O rate of different jobs on node3圖5 普通節(jié)點node3上不同作業(yè)的網(wǎng)絡I/O速率

        同樣,對于內存利用率、磁盤I/O和網(wǎng)絡I/O的情況,在作業(yè)類型變化時也具有不同的特點.圖3~5分別為本文提出的CCM策略下監(jiān)控普通節(jié)點node3在執(zhí)行過程中的內存利用率、磁盤I/O速率和網(wǎng)絡I/O速率變化情況.在內存利用率上,與作業(yè)的類型和輸入數(shù)據(jù)的分布情況有關.對于相同的算法而言,所需處理的數(shù)據(jù)量越大,內存資源使用量越大.由圖3可知,WordCount和TeraSort隨著執(zhí)行時間的增加,具有相對穩(wěn)定的內存占用率;而K-Means和PageRank則隨著處理任務的階段不同,具有不同的內存占用率.

        在磁盤I/O速率方面,無論任務是處理本地數(shù)據(jù)還是網(wǎng)絡數(shù)據(jù),都會在某個節(jié)點上產生相應的本地數(shù)據(jù)讀取,消耗一定的磁盤I/O。若處理網(wǎng)絡數(shù)據(jù),還要產生額外的網(wǎng)絡I/O.由于作業(yè)需要從磁盤中讀取數(shù)據(jù),并在新的RDD生成時設置檢查點,因此產生了較為頻繁的磁盤I/O.由圖4可知,其中WordCount的磁盤I/O更為明顯,其他3類作業(yè)則頻率較低.K-Means在100 s后磁盤I/O有明顯增加,這是由于節(jié)點失效,分配給node3恢復任務,此時需要從磁盤中讀取相應檢查點,從而實現(xiàn)恢復.

        對于網(wǎng)絡I/O而言,其開銷大小與作業(yè)的類型和任務并行度有關.由于Spark具有數(shù)據(jù)本地性的特點,盡可能保證節(jié)點上執(zhí)行的任務處理本地數(shù)據(jù),因此網(wǎng)絡I/O主要源于寬依賴階段,另外小部分網(wǎng)絡I/O則是處理遠程數(shù)據(jù).由圖5可看出,對于4類不同的作業(yè),網(wǎng)絡I/O開銷相差不大.其中WordCount作業(yè)的寬依賴階段只需合并少量數(shù)據(jù),因此其網(wǎng)絡I/O開銷較小.

        另外,由表2可知,在設置檢查點的情況下,本文提出的CCM策略的K-Means和PageRank的恢復加速情況要好于Tachyon的檢查點策略,而WordCount和TeraSort與Tachyon類似,這是對比K-Means和PageRank,WordCount和TeraSort中的寬依賴操作較少,因此在使用基于lineage的檢查點策略也能夠較好地設置失效恢復所需檢查點.并且計算開銷與算法復雜度和輸入數(shù)據(jù)的大小相關,當作業(yè)一定時輸入數(shù)據(jù)越大,算法開銷越大,在此粗略地認為輸入數(shù)據(jù)越大,相應RDD的計算開銷也會相應增加,同時增加磁盤讀寫的開銷.對應不同的作業(yè),節(jié)點的處理速度也不同.

        4.3 檢查點設置算法

        為了進一步對比和分析CCM策略,我們選用PageRank進行性能測試、評價與比較.實驗數(shù)據(jù)選用SNAP[25]提供的有向圖數(shù)據(jù)集,數(shù)據(jù)集列表如表3所示.

        利用節(jié)點和連接數(shù)差異較大的2個數(shù)據(jù)集Web-Google和Wiki-Talk分別迭代1~10次對該算法性能驗證,并使用nmon監(jiān)測執(zhí)行時間和任務檢查點的大小.PageRank任務在多個數(shù)據(jù)集上執(zhí)行,使用PageRank有2方面的原因:1)PageRank主要用于有向圖的計算,是一個典型的迭代計算算法;2)PageRank是計算密集型算法,因此對檢查點系統(tǒng)更敏感,更利于驗證算法.對于數(shù)據(jù)密集型任務而言,代價較高,因為需在帶寬遠低于內存的集群網(wǎng)絡間拷貝大量的數(shù)據(jù),同時也將產生大量的存儲開銷.

        Table 3 Information of Datasets表3 測試數(shù)據(jù)集列表

        圖6表示與現(xiàn)有Spark內存系統(tǒng)使用不同策略的檢查點設置算法的執(zhí)行效率進行對比的情況.其中Web-Google和Wiki-Talk帶下標Tachyon,experience,CCM分別代表:1)Tachyon設置檢查點的策略,即僅考慮RDD的深度,即當選擇檢查點時,選擇最新生成的RDD進行持久化存儲;2)Spark程序員選取檢查點的經(jīng)驗,僅考慮RDD的操作復雜度,即當設置檢查點時選擇寬依賴的RDD進行持久化存儲;3)采用本文提出的基于關鍵度的檢查點設置算法,即當選擇檢查點時選擇最新生成的RDD中關鍵度最大的進行持久化存儲.

        Fig. 6 Efficiency of checkpoint algorithm with different parameters圖6 不同策略下檢查點設置算法的執(zhí)行效率

        由圖6可知,對比不同數(shù)據(jù)集,Wiki-Talk具有較大的連接數(shù)和節(jié)點數(shù),計算代價較高,因此無論采取什么樣的參數(shù)設置,都使其具有較大的檢查點大小,因此執(zhí)行時間較長,對應檢查點存儲設置的平均時間開銷也隨之增加,并且迭代次數(shù)的增加對檢查點平均時間開銷的影響較小.由于全局檢查點備選列表是在計算之前生成,因此對任務實際時間影響較小.隨著任務和輸入數(shù)據(jù)的規(guī)模不同,設置時間不同.檢查點選擇的大小,平均時間與所選的數(shù)據(jù)集有關.

        而對比不同的參數(shù)情況,與其他情況相比,僅考慮RDD計算代價的情況下設置的檢查點時間開銷更大,因為計算代價與RDD的大小和復雜度都相關,因此具有更大的檢查點大小.雖然可以用于宕機后RDD的恢復,但對比很長血統(tǒng)的RDD來說這樣的恢復耗時較長.因此,將某些RDD進行檢查點操作保存在穩(wěn)定存儲上是有幫助的.通常情況下,對于包含寬依賴的長血統(tǒng)的RDD設置檢查點操作是非常有用的.比如PageRank算法中的排名數(shù)據(jù)集.在這種情況下,集群中某個節(jié)點的故障會使從各個父RDD得出某些數(shù)據(jù)丟失,這時候就需要重算.相反,對于那些窄依賴與穩(wěn)定存儲上數(shù)據(jù)的父RDD來說,對其進行檢查點操作就不是必要的.

        4.4 檢查點恢復算法

        通過實驗在節(jié)點的失效率(failure rate,fr)分別取值為0.125,0.25,0.375的情況下,對比Web-Google和Wiki-Talk數(shù)據(jù)集得到PageRank在不同恢復算法下的執(zhí)行時間和恢復情況.

        由圖7可知,對比失效率不同的情況,隨著失效節(jié)點個數(shù)的增加,任務執(zhí)行時間也隨之增加.這是由于失效率越大,意味著失效的節(jié)點越多,因此要恢復的RDD越多,需要重新計算產生相應的時間開銷.對比不同數(shù)據(jù)集的情況,Wiki-Talk和Web-Google在不同算法下Wiki-Talk的時間開銷差距較大,這是由于計算量大小的區(qū)別.

        Fig. 7 Execution time of different datasets with different fr圖7 不同失效率時不同數(shù)據(jù)集的執(zhí)行時間對比

        對迭代次數(shù)1~10進行對比,由于未設置檢查點的Spark原生系統(tǒng)需要利用血統(tǒng)進行恢復丟失的RDD,因此當?shù)螖?shù)增加時時間開銷隨之增加.在節(jié)點失效時,同時會使運行在該節(jié)點的任務執(zhí)行失敗,同時丟失存儲在其內存上的已生成RDD分區(qū).在失效恢復時,Spark為了對丟失的RDD分區(qū)進行恢復,利用集群中其他主機重新執(zhí)行失效任務.這些任務需要將輸入數(shù)據(jù)重新讀取,并利用血統(tǒng)進行RDD重建.隨著作業(yè)迭代次數(shù)的增加,血統(tǒng)變長,此時需要計算RDD的時間越長,則恢復開銷越大.而失效恢復策略在恢復時間開銷方面沒有顯著的增加,因為該策略利用檢查點設置算法設置了檢查點,可以通過從HDFS中讀取檢查點進行恢復,從而減少RDD的重復計算,降低恢復時間開銷.

        4.5 檢查點清理算法

        定義6. 有效空間利用率.在群集工作結點的備份所有檢查點RDD之中,對恢復執(zhí)行效率有加速作用的容量之和占檢查點總容量的比率.

        檢查點清理(checkpoint cleaning, CC)算法是檢查點選擇算法的后續(xù)同步操作,同樣在磁盤趨近滿載時才能體現(xiàn)效能.圖8為對比Web-Google和Wiki-Talk數(shù)據(jù)集在使用檢查點清理算法策略時和未優(yōu)化時的有效空間利用率.如圖8所示,傳統(tǒng)Spark框架隨著并發(fā)應用數(shù)的增加,有效磁盤利用率的惡化程度也越來越高;而檢查點清理算法的有效磁盤利用率則較為穩(wěn)定.對比來看,采用檢查點清理算法的Spark框架,其有效磁盤利用率普遍高于傳統(tǒng)Spark框架.根據(jù)并行任務的特性,當其父RDD的所有子RDD都被設置檢查點時,則該父RDD在恢復時不會被使用到,因此存儲該RDD對作業(yè)恢復效率沒有影響.而檢查點清理算法在工作節(jié)點的某個RDD需要清理時,通知群集的其他工作節(jié)點清除該RDD的其他副本,因此能在不影響任務恢復效率的前提下提高了群集磁盤空間的有效利用率.

        Fig. 8 Valuable capacity rate of CC圖8 檢查點清理算法的有效空間利用率

        綜合比較表2以及圖3~8,在不同數(shù)據(jù)集、策略的情況下,對比任務的執(zhí)行時間和恢復時間可知,在提出失效恢復策略中,檢查點策略會增加少量的時間開銷,然而對比傳統(tǒng)Spark策略中,由程序員選擇檢查點的不確定因素甚至是異常風險,這些額外時間和空間開銷是有價值的.失效恢復算法基于檢查點設置算法,在設置檢查點時考慮RDD的血統(tǒng)長度、計算代價、操作復雜度和容量等因素.權重越大的RDD,重新計算的恢復成本也越高,優(yōu)先將這些恢復成本較高的RDD設置為檢查點,可以降低任務整體的重新計算代價.在執(zhí)行恢復算法時,選擇計算能力強的節(jié)點進行恢復,可以對任務的恢復效率進行有效的提高.

        5 總結與展望

        本文針對內存計算框架Spark的失效恢復問題,首先對內存計算框架的任務執(zhí)行機制進行分析,建立執(zhí)行模型和檢查點模型.通過分析檢查點恢復過程,給出了RDD關鍵度和失效恢復比的定義,并證明這些定義與任務恢復效率的關系,為算法設計提供基礎模型.在相關模型定義和證明的基礎上,提出了基于RDD關鍵度的檢查點管理策略問題定義,以此作為算法設計的主要依據(jù).通過算法的問題定義求解,計算RDD關鍵度,設計了檢查點設置算法、恢復算法和并行清理算法.最后,通過不同的實驗證明算法的有效性,實驗結果表明,該策略優(yōu)化了內存計算框架的檢查點管理,提高了作業(yè)的恢復效率.

        未來工作主要集中在3個方面:

        1) 分析內存計算框架不同類型操作資源需求的一般規(guī)律,設計適應作業(yè)負載和類型的檢查點策略;

        2) 對內存計算框架的內存管理進行研究,優(yōu)化Executor現(xiàn)有的任務內存分配策略;

        3) 通過分析計算節(jié)點性能和作業(yè)DAG圖,利用樣本和歷史記錄執(zhí)行預測RDD和作業(yè)的計算開銷,從而協(xié)助資源分配和并行任務調度方案做決策.

        [1]Walker S J. Big data: A revolution that will transform how we live, work, and think[J]. International Journal of Advertising, 2014, 17(1): 181-183

        [2] Meng Xiaofeng, Ci Xiang. Big data management: Concepts, techniques and challenges[J]. Journal of Computer Research and Development, 2013, 50(1): 146-169 (in Chinese)(孟小峰, 慈祥. 大數(shù)據(jù)管理: 概念、技術與挑戰(zhàn)[J]. 計算機研究與發(fā)展, 2013, 50(1): 146-169)

        [3] Chen C P, Zhang Chunyang. Data-intensive applications, challenges, techniques and technologies: A survey on big data[J]. Information Sciences, 2014, 275(11): 314-347

        [4] Kambatla K, Kollias G, Kumar V, et al. Trends in big data analytics[J]. Journal of Parallel and Distributed Computing, 2014, 74(7): 2561-2573

        [5] Zaharia M, Chowdhury M, Das T, et al. Fast and interactive analytics over Hadoop data with Spark[J]. ;Login:, 2012, 37(4): 45-51

        [6] Apache. Spark overview[EB/OL]. 2011 [2016-03-18]. http://spark.apache.org

        [7] Zaharia M, Chowdhury M, Das T, et al. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing[C] //Proc of the 9th USENIX Conf on Networked Systems Design and Implementation. Berkeley, CA: USENIX Association, 2012

        [8] SAP. HANA overview[EB/OL]. 2011 [2016-09-21]. http://hana.sap.com/abouthana.html

        [9] Dean J, Ghemawat S. MapReduce: Simplified data processing on large clusters[C] //Proc of the 6th Symp on Operating System Design and Implementation (OSDI). New York: ACM, 2004: 137-150

        [10] Apache. Spark machine learning library (MLlib)[EB/OL]. 2012 [2016-03-18]. http://spark.incubator.apache.org/docs/latest/mllib-guide.html

        [11] Lin Xiuqin, Wang Peng, Wu Bin. Log analysis in cloud computing environment with Hadoop and Spark[C] //Proc of the 5th IEEE Int Conf on Broadband Network & Multimedia Technology (IC-BNMT). Piscataway, NJ: IEEE, 2013: 273-276

        [12] Dong Xiangyu, Xie Yuan, Muralimanohar N, et al. Hybrid checkpointing using emerging nonvolatile memories for future exascale system[J]. ACM Trans on Architecture and Code Optimization, 2011, 8(2): Article No.6

        [13] Dimitriou I. A retrial queue for modeling fault-tolerant systems with checkpointing and rollback recovery[J]. Computers & Industrial Engineering, 2015, 79: 156-167

        [14] Ifeanyi P E, David L, Bran S, et al. A survey of fault tolerance mechanisms and checkpoint/restart implementations for high performance computing systems[J]. Journal of Supercomputing, 2013, 65(3): 1302-1326

        [15] Zhou Enqiang, Lu Yutong, Shen Zhiyu. Implementation of checkpoint system toward large scale parallel computing[J]. Journal of Computer Research and Development, 2005, 42(6): 987-992 (in Chinese)(周恩強, 盧宇彤, 沈志宇. 一個適合大規(guī)模集群并行計算的檢查點系統(tǒng)[J]. 計算機研究與發(fā)展, 2005, 42(6): 987-992)

        [16] Yi Huizhan, Wang Feng, Zuo Ke, et al. Asynchronous checkpoint/restart based on memory buffer[J]. Journal of Computer Research and Development, 2014, 51(6): 1229-1239 (in Chinese)(易會戰(zhàn), 王鋒, 左克, 等. 基于內存緩存的異步檢查點容錯技術[J].計算機研究與發(fā)展, 2014, 51(6): 1229-1239)

        [17] Wan Hu, Xu Yuanchao, Yan Junfeng, et al. Mitigating log cost through non-volatile memory and checkpoint optimization[J]. Journal of Computer Research and Development, 2015, 52(6): 1351-1361 (in Chinese)(萬虎, 徐遠超, 閆俊峰, 等. 通過非易失存儲和檢查點優(yōu)化緩解日志開銷[J]. 計算機研究與發(fā)展, 2015, 52(6): 1351-1361)

        [18] Cores I, Rodríguez G, Martín M J, et al. In-memory application-level checkpoint-based migration for MPI programs[J]. Journal of Supercomputing, 2014, 70(2): 660-670

        [19] Cao T, Vaz S M, Sowell B, et al. Fast checkpoint recovery algorithms for frequently consistent applications[C] //Proc of the 2011 ACM SIGMOD Int Conf on Management of Data. New York: ACM, 2011: 265-276

        [20] Chardonnens T, Cudre-Mauroux P, Grund M, et al. Big data analytics on high velocity streams: A case study[C] //Proc of 2013 IEEE Int Conf on Big Data. Piscataway, NJ: IEEE, 2013: 784-787

        [21] Neumeyer L, Robbins B, Nair A, et al. S4: Distributed stream computing platform[C] //Proc of the 10th IEEE Int Conf on Data Mining Workshops (ICDMW 2010). Piscataway, NJ: IEEE, 2010: 170-177

        [22] Li Haoyuan, Ghodsi A, Zaharia M, et al. Tachyon: Memory throughput I/O for cluster computing frameworks[C/OL]. 2013 [2016-03-18]. https://people.eecs.berkeley.edu/~alig/papers/tachyon-workshop.pdf

        [23] Li Haoyuan, Ghodsi A, Zaharia M, et al. Tachyon: Reliable, memory speed storage for cluster computing frameworks[C] //Proc of the 2014 ACM Symp on Cloud Computing. New York: ACM, 2014

        [24] Ongaro D, Rumble S M, Stutsman R, et al. Fast crash recovery in RAMCloud[C] //Proc of the 23rd ACM Symp on Operating Systems Principles. New York: ACM, 2011: 29-41

        [25] Jure L. Stanford network analysis project[EB/OL]. 2009 [2016-03-18]. http://snap.stanford.edu

        CriticalityCheckpointManagementStrategyBasedonRDDCharacteristicsinSpark

        Ying Changtian1,2, Yu Jiong2, Bian Chen2, Wang Weiqing1,3, Lu Liang2, and Qian Yurong2

        1(PostdoctoralResearchStationofElectricalEngineering,XinjiangUniversity,Urumqi830046)2(SchoolofSoftware,XinjiangUniversity,Urumqi830008)3(SchoolofElectricalEngineering,XinjiangUniversity,Urumqi830046)

        The default fault tolerance mechanism of Spark is setting the checkpoint by programmer. When facing data loss, Spark recomputes the tasks based on the RDD lineage to recovery the data. Meanwhile, in the circumstance of complicated application with multiple iterations and large amount of input data, the recovery process may cost a lot of computation time. In addition, the recompute task only considers the data locality by default regardless the computing capabilities of nodes, which increases the length of recovery time. To reduce recovery cost, we establish and demonstrate the Spark execution model, the checkpoint model and the RDD critically model. Based on the theory, the criticality checkpoint management (CCM) strategy is proposed, which includes the checkpoint algorithm, the failure recovery algorithm and the cleaning algorithm. The checkpoint algorithm is used to analyze the RDD charactersitics and its influence on the recovery time, and selects valuable RDDs as checkpoints. The failure recovery algorithm is used to choose the appropriate nodes to recompute the lost RDDs, and cleaning algorithm cleans checkpoints when the disk space becomes insufficient. Experimental results show that: the strategy can reduce the recovery overhead efficiently, select valuable RDDs as checkpoints, and increase the efficiency of disk usage on the nodes with sacrificing the execution time slightly.

        memory computing; Spark; checkpoint management; failure recovery; RDD characteristics

        2016-09-20;

        2017-07-04

        國家自然科學基金項目(61262088,61462079,61363083,61562086,51667020);新疆維吾爾自治區(qū)自然科學基金項目(2017D01A20);新疆維吾爾自治區(qū)高校科研計劃(XJEDU2016S106)

        This work was supported by the National Natural Science Foundation of China (61262088, 61462079, 61363083, 61562086, 51667020), the Natural Science Foundation of Xinjiang Uygur Autonomous Region of China (2017D01A20), and the Higher Education Research Program of Xinjiang Uygur Autonomous Region (XJEDU2016S106).

        于炯(yujiong@xju.edu.cn)

        TP311

        YingChangtian, born in 1989. PhD in Xinjiang University. Student member of CCF. Her main research interests include parallel computing, distributed system, and memory computing, etc.

        YuJiong, born in 1964. Professor and PhD supervisor. Senior member of CCF. His main research interests include grid computing, parallel computing, etc.

        BianChen, born in 1981. Associate professor and PhD. Senior member of CCF. His main research interests include parallel computing, distributed system, etc.

        WangWeiqing, born in 1959. Professor and PhD supervisor. His main research interests include power system relay protection, wind power generation control and grid connection technology (wwq59@xju.edu.cn).

        LuLiang, born in 1990. PhD candidate in Xinjiang University. Student member of CCF. His main research interests include flow processing, real-time computing.

        QianYurong, born in 1981. Professor and master supervisor. Senior member of CCF. Her main research interests include data mining.

        猜你喜歡
        檢查點內存分區(qū)
        Spark效用感知的檢查點緩存并行清理策略①
        上海實施“分區(qū)封控”
        免疫檢查點抑制劑相關內分泌代謝疾病
        “春夏秋冬”的內存
        當代陜西(2019年13期)2019-08-20 03:54:22
        免疫檢查點抑制劑在腫瘤治療中的不良反應及毒性管理
        浪莎 分區(qū)而治
        分布式任務管理系統(tǒng)中檢查點的設計
        基于SAGA聚類分析的無功電壓控制分區(qū)
        電測與儀表(2015年8期)2015-04-09 11:50:16
        基于多種群遺傳改進FCM的無功/電壓控制分區(qū)
        電測與儀表(2015年7期)2015-04-09 11:40:16
        基于內存的地理信息訪問技術
        国产一品二品三区在线观看| 伊人久久网国产伊人| 亚洲国产精品国自产拍久久蜜av | 国产综合精品久久久久成人| 日本在线一区二区免费| 久久精品亚洲熟女av麻豆| 国产免费三级av在线| 天堂国产一区二区三区| 亚洲色欲色欲www在线观看| 少妇厨房愉情理伦片bd在线观看| 久久久久久免费播放一级毛片| 日本一区二区三级免费| 国产麻豆剧果冻传媒一区| 国产va免费精品观看| 超碰Av一区=区三区| 一区二区三区在线观看高清视频| 一本色道久久亚洲综合| 亚洲欧美一区二区三区在线| 欧美俄罗斯乱妇| 亚洲欧美香港在线观看三级片| 亚洲不卡av一区二区三区四区 | 国产精品久久国产三级国不卡顿| 亚洲欧美日韩高清中文在线| 亚洲视频在线播放免费视频| 国产一区二区三区在线大屁股| 最美女人体内射精一区二区| 国产在线观看www污污污| 欧美午夜a级精美理论片| 免费在线观看一区二区| 日本岛国一区二区三区四区| 无码孕妇孕交在线观看| 丁香五香天堂网| 综合久久久久6亚洲综合| 亚洲精品综合一区二区| 偷拍一区二区三区四区视频| 国产一区二区三区四区五区加勒比| 狠狠色噜噜狠狠狠97影音先锋| 黄色中文字幕视频网站| 国产精品久久久久久av| 国产亚洲午夜高清国产拍精品| 国产AV边打电话边出轨|