李梓楊,于炯,,卞琛,張譯天,蒲勇霖,王躍飛,魯亮
(1.新疆大學(xué)信息科學(xué)與工程學(xué)院,新疆 烏魯木齊 830046;2.新疆大學(xué)軟件學(xué)院,新疆 烏魯木齊 830008;3.廣東金融學(xué)院互聯(lián)網(wǎng)金融與信息工程學(xué)院,廣東 廣州 510521;4.中國民航大學(xué)計(jì)算機(jī)科學(xué)與技術(shù)學(xué)院,天津 300300)
隨著云計(jì)算、物聯(lián)網(wǎng)、人工智能等新技術(shù)的不斷發(fā)展,自動(dòng)駕駛、智慧城市、智能工業(yè)等新產(chǎn)業(yè)和新服務(wù)模式的不斷興起和發(fā)展,人們的生活方式也變得更舒適、更便捷。根據(jù)希捷公司發(fā)布的調(diào)查結(jié)果,預(yù)測到2025 年全球數(shù)據(jù)量將高達(dá)163 ZB,其中25%的數(shù)據(jù)需要被實(shí)時(shí)計(jì)算和處理,這些數(shù)據(jù)主要應(yīng)用于物聯(lián)網(wǎng)[1]和人工智能等相關(guān)領(lǐng)域。
隨著數(shù)據(jù)規(guī)模和計(jì)算復(fù)雜度的提升,以MapReduce[2]和彈性分布式數(shù)據(jù)集(RDD,resilient distributed dataset)[3]編程模型為代表的批處理[4]模式已經(jīng)無法滿足應(yīng)用的實(shí)時(shí)性要求,大數(shù)據(jù)流式計(jì)算應(yīng)運(yùn)而生。流式計(jì)算將計(jì)算任務(wù)抽象為數(shù)據(jù)流模型,具有實(shí)時(shí)性、易失性、突發(fā)性、無序性和無限性的特征[5],能夠提供大規(guī)模數(shù)據(jù)的分布式實(shí)時(shí)處理,已經(jīng)分別得到學(xué)術(shù)界和產(chǎn)業(yè)界的廣泛關(guān)注。其中,Apache Flink 是目前應(yīng)用最廣泛的新興流式計(jì)算平臺。然而,面對不斷波動(dòng)變化的計(jì)算負(fù)載,F(xiàn)link平臺存在可擴(kuò)展性和可伸縮性較差,應(yīng)對負(fù)載波動(dòng)的能力不足,已經(jīng)成為制約平臺發(fā)展的嚴(yán)峻而又亟待解決的問題,受到開源社區(qū)的重點(diǎn)關(guān)注。
針對上述問題,本文提出基于流網(wǎng)絡(luò)的Flink平臺彈性資源調(diào)度策略(FAR-Flink,Flow-network based auto rescale strategy for Flink),并將其應(yīng)用于Flink 平臺。本文的主要貢獻(xiàn)如下。
1)提出基于流網(wǎng)絡(luò)對流式計(jì)算拓?fù)溥M(jìn)行建模的思想,并在此基礎(chǔ)上提出流網(wǎng)絡(luò)劃分等概念,定義了上下游節(jié)點(diǎn)的劃分方式,為提出彈性資源調(diào)度算法和彈性資源調(diào)度策略提供了模型的支撐。
2)提出流網(wǎng)絡(luò)的容量值構(gòu)建算法,采用初值定義和反饋調(diào)節(jié)相結(jié)合的方式,確定流網(wǎng)絡(luò)每條邊的容量值,為保障彈性資源調(diào)度算法的優(yōu)化效果奠定基礎(chǔ)。
3)在流網(wǎng)絡(luò)模型的基礎(chǔ)上提出彈性資源調(diào)度算法,在合理分配新增計(jì)算負(fù)載的同時(shí)定位集群性能瓶頸,生成彈性資源調(diào)度計(jì)劃,通過擴(kuò)充計(jì)算節(jié)點(diǎn)突破瓶頸,提高集群整體性能。
4)提出一種狀態(tài)數(shù)據(jù)分簇和分桶管理的思想,并在此基礎(chǔ)上提出狀態(tài)數(shù)據(jù)高效遷移策略,有效提高了集群執(zhí)行檢查點(diǎn)和數(shù)據(jù)恢復(fù)的效率,降低執(zhí)行彈性資源調(diào)度計(jì)劃的時(shí)間開銷。
面對大數(shù)據(jù)流式計(jì)算平臺存在突發(fā)性[6]問題,現(xiàn)有的流式計(jì)算平臺均未提供默認(rèn)的彈性資源調(diào)度機(jī)制,計(jì)算平臺存在可伸縮性和可擴(kuò)展性不足的問題,已經(jīng)得到學(xué)術(shù)界和產(chǎn)業(yè)界的廣泛關(guān)注。目前,現(xiàn)有的研究成果主要基于以下4 個(gè)出發(fā)點(diǎn)來解決該問題:1)通過彈性資源調(diào)度策略提高集群的可伸縮性;2)通過優(yōu)化任務(wù)調(diào)度策略提高集群的性能;3)通過優(yōu)化數(shù)據(jù)分配策略提高作業(yè)的執(zhí)行效率;4)通過減小網(wǎng)絡(luò)通信開銷以降低計(jì)算時(shí)延。
通過彈性資源調(diào)度策略提高集群資源的可伸縮性,是提高集群性能最有效的方法。其中,文獻(xiàn)[6]提出適用于Nephel[7]數(shù)據(jù)流處理平臺的響應(yīng)式資源調(diào)度策略,通過建立數(shù)學(xué)模型計(jì)算每個(gè)算子的并行度,并通過任務(wù)遷移實(shí)現(xiàn)集群資源的動(dòng)態(tài)伸縮,但其任務(wù)遷移過程中的網(wǎng)絡(luò)傳輸開銷較大。文獻(xiàn)[8]通過監(jiān)控集群的性能指標(biāo),建立針對Storm 平臺無狀態(tài)數(shù)據(jù)流的彈性資源調(diào)度策略。文獻(xiàn)[9]通過提出分布式彈性資源管理協(xié)議,實(shí)現(xiàn)集群規(guī)模對輸入負(fù)載的快速響應(yīng)。文獻(xiàn)[10]通過提出高效的狀態(tài)數(shù)據(jù)管理策略,實(shí)現(xiàn)集群在橫向和縱向上的可擴(kuò)展性。文獻(xiàn)[11]提出基于動(dòng)態(tài)參數(shù)優(yōu)化的彈性資源數(shù)據(jù)流處理平臺。此外,文獻(xiàn)[12-16]分別從資源分配、任務(wù)調(diào)度、負(fù)載優(yōu)化等不同的角度,提出數(shù)據(jù)流彈性資源調(diào)度策略。
優(yōu)化任務(wù)調(diào)度策略是另一種提高集群性能的有效方法。文獻(xiàn)[17]提出優(yōu)化流式作業(yè)的通用原則,實(shí)現(xiàn)了透明的拓?fù)鋬?yōu)化策略。文獻(xiàn)[18]提出對流式計(jì)算拓?fù)涞膱D優(yōu)化策略,在提高吞吐量的同時(shí)降低時(shí)延。文獻(xiàn)[19]通過對關(guān)鍵路徑節(jié)點(diǎn)進(jìn)行重分配,在降低計(jì)算時(shí)延的同時(shí)減少系統(tǒng)能耗。此外,文獻(xiàn)[20]通過優(yōu)化任務(wù)拓?fù)浣Y(jié)構(gòu)的方式,有效提高了集群性能。
優(yōu)化數(shù)據(jù)分配策略是提高作業(yè)執(zhí)行效率的有效方式,其中最典型的策略是負(fù)載均衡。文獻(xiàn)[21]通過實(shí)現(xiàn)上下游節(jié)點(diǎn)算子的靈活遷移和動(dòng)態(tài)鏈接[22],應(yīng)對內(nèi)存不足造成的背壓(backpressure)問題。文獻(xiàn)[23]提出自定義的代價(jià)模型,在鄰近代價(jià)閾值時(shí)啟動(dòng)分區(qū)映射算法[24],實(shí)現(xiàn)節(jié)點(diǎn)間計(jì)算負(fù)載的最優(yōu)分配。文獻(xiàn)[25]將流式計(jì)算拓?fù)涠x為流網(wǎng)絡(luò)模型并從中尋找優(yōu)化路徑,從而提高集群吞吐量。此外,文獻(xiàn)[26-27]也分別提出不同的負(fù)載均衡策略,以提升集群的整體性能。
通過降低網(wǎng)絡(luò)傳輸開銷以提高傳輸性能,也是提高作業(yè)執(zhí)行效率的有效方式。文獻(xiàn)[28]提出根據(jù)計(jì)算任務(wù)的不同類型動(dòng)態(tài)調(diào)節(jié)緩沖池的分區(qū)大小,有效降低傳輸開銷以提高傳輸性能。文獻(xiàn)[22]通過對并行度相等的上下游算子進(jìn)行動(dòng)態(tài)鏈接,有效減少了節(jié)點(diǎn)間的數(shù)據(jù)通信。文獻(xiàn)[29]通過將節(jié)點(diǎn)內(nèi)進(jìn)程間網(wǎng)絡(luò)通信轉(zhuǎn)化為線程間通信,有效降低了通信開銷。此外,文獻(xiàn)[30-31]分別從不同角度降低數(shù)據(jù)傳輸?shù)耐ㄐ砰_銷,提高集群性能。
其中,通過彈性資源調(diào)度策略提高集群的可伸縮性,能夠從根本上解決負(fù)載波動(dòng)導(dǎo)致集群響應(yīng)能力不足的問題,但現(xiàn)有研究成果多針對Storm 平臺提出,受平臺內(nèi)核之間的差異而無法直接移植于Flink。因此本文提出適用于Flink 平臺的彈性資源調(diào)度策略,與現(xiàn)有研究成果的不同之處介紹如下。
1)基于流網(wǎng)絡(luò)對集群拓?fù)浣Y(jié)構(gòu)進(jìn)行抽象,采用初值定義和反饋調(diào)節(jié)相結(jié)合的方式構(gòu)建模型,準(zhǔn)確評估了節(jié)點(diǎn)的計(jì)算能力和集群的性能瓶頸,并在此基礎(chǔ)上制定了最優(yōu)的彈性資源調(diào)度計(jì)劃。
2)重點(diǎn)考慮有狀態(tài)數(shù)據(jù)流的任務(wù)調(diào)度問題,提出狀態(tài)數(shù)據(jù)分桶管理的思想,有效提高了狀態(tài)數(shù)據(jù)的遷移效率。
3)通過分析Flink 平臺內(nèi)核的結(jié)構(gòu)和特點(diǎn),結(jié)合狀態(tài)數(shù)據(jù)管理和檢查點(diǎn)機(jī)制,克服平臺內(nèi)核之間的差異,提出了適用于Flink 的彈性資源調(diào)度策略。
4)在實(shí)驗(yàn)中,提出通過監(jiān)控?cái)?shù)據(jù)傳輸量評估數(shù)據(jù)遷移效率的方法,驗(yàn)證了狀態(tài)數(shù)據(jù)遷移算法的優(yōu)化效果。在此基礎(chǔ)上,通過對比實(shí)驗(yàn)得出主流彈性資源調(diào)度策略的優(yōu)缺點(diǎn)及其適用場景。
本節(jié)從有狀態(tài)流式計(jì)算的特點(diǎn)出發(fā),建立了流網(wǎng)絡(luò)模型和狀態(tài)數(shù)據(jù)管理模型。首先通過分析有向無環(huán)圖(DAG,directed acyclic graph)中節(jié)點(diǎn)容量與流量的數(shù)學(xué)關(guān)系,建立流網(wǎng)絡(luò)模型,為彈性資源調(diào)度算法提供了模型的支撐。此外,通過分析現(xiàn)有狀態(tài)數(shù)據(jù)管理策略的不足,指出影響狀態(tài)數(shù)據(jù)遷移效率的主要原因,為設(shè)計(jì)高效的狀態(tài)數(shù)據(jù)遷移算法提供了理論依據(jù)。
在分布式數(shù)據(jù)流處理平臺中,用戶定義的計(jì)算邏輯是封裝在算子中的,并由各工作節(jié)點(diǎn)并行化執(zhí)行算子的計(jì)算邏輯,各工作節(jié)點(diǎn)被稱為對應(yīng)算子的實(shí)例。由此可知,當(dāng)集群的計(jì)算資源總量和拓?fù)浣Y(jié)構(gòu)確定后,其所能承擔(dān)的最高計(jì)算負(fù)載也隨之確定,計(jì)算資源不足將導(dǎo)致集群性能急劇下降。
因此,為了解決集群資源不足導(dǎo)致的性能問題,將流式計(jì)算的拓?fù)浣Y(jié)構(gòu)定義為流網(wǎng)絡(luò)模型。在該模型中,將節(jié)點(diǎn)能夠處理的最高計(jì)算負(fù)載定義為容量c(vi,vj),當(dāng)前時(shí)刻節(jié)點(diǎn)實(shí)際處理的負(fù)載定義為流量f(vi,vj),通過構(gòu)建對應(yīng)的增進(jìn)網(wǎng)絡(luò)并尋找優(yōu)化路徑,實(shí)現(xiàn)計(jì)算負(fù)載的最優(yōu)分配,從而定位集群的性能瓶頸并制定相應(yīng)的彈性資源調(diào)度計(jì)劃。
定義1流網(wǎng)絡(luò)。如圖1 所示,設(shè)集群拓?fù)錇閱卧从邢驘o環(huán)圖G=(V,E),其中V={v1,v2,…,vn}是圖中所有節(jié)點(diǎn)的集合,s∈S是流網(wǎng)絡(luò)的源點(diǎn),ti∈T是匯點(diǎn),E={(vi,vj)|i,j∈[1,n],n=|V|}是有向邊的集合,(vi,vj)表示節(jié)點(diǎn)vi與vj間的傳輸鏈路。且?(vi,vj)∈E有c(vi,vj)≥0,表示邊(vi,vj)允許數(shù)據(jù)傳輸速率的最大值,稱為邊(vi,vj)的容量;f(vi,vj)表示邊(vi,vj)當(dāng)前時(shí)刻的實(shí)際傳輸速率,稱為邊(vi,vj)當(dāng)前的流量。此外,當(dāng)前時(shí)刻從數(shù)據(jù)源點(diǎn)向網(wǎng)絡(luò)輸入數(shù)據(jù)速率的和記為該網(wǎng)絡(luò)的流量,即
圖1 流網(wǎng)絡(luò)模型
在流網(wǎng)絡(luò)中,容量與流量的單位均為tuple/s,且有0≤f(vi,vj)≤c(vi,vj)恒成立。
通過定義流網(wǎng)絡(luò)模型,將流式計(jì)算的DAG 拓?fù)滢D(zhuǎn)化為單源有向無環(huán)圖,用容量c(vi,vj)表示邊(vi,vj)允許數(shù)據(jù)傳輸?shù)淖畲笾?,并用流量f(vi,vj)表示邊(vi,vj)實(shí)際的傳輸速率。其中,每條邊的容量值表示對應(yīng)節(jié)點(diǎn)的處理能力和數(shù)據(jù)鏈路的傳輸能力,只有準(zhǔn)確計(jì)算每條邊的容量值才能有效評估集群的性能指標(biāo),從而實(shí)現(xiàn)集群資源的最優(yōu)分配。因此,通過4.1 節(jié)提出的流網(wǎng)絡(luò)構(gòu)建算法,采用數(shù)學(xué)計(jì)算與反饋調(diào)節(jié)相結(jié)合的方式,計(jì)算網(wǎng)絡(luò)中每條邊的容量大小,構(gòu)建整個(gè)流網(wǎng)絡(luò)的形態(tài),對彈性資源調(diào)度算法的執(zhí)行是至關(guān)重要的。
定義2增進(jìn)網(wǎng)絡(luò)。設(shè)有流網(wǎng)絡(luò)G=(V,E),且f是G的一個(gè)流,則Gf=(Vf,Ef)是流網(wǎng)絡(luò)G由f產(chǎn)生的增進(jìn)網(wǎng)絡(luò),其形態(tài)如圖2 所示,其中?vi∈V有vi∈Vf,?(vi,vj)∈E在Gf中對應(yīng)的增進(jìn)容量函數(shù)cf(vi,vj)為
其中,c(vi,vj)為原網(wǎng)絡(luò)中邊的容量函數(shù),f(vi,vj)為原網(wǎng)絡(luò)中邊的流量函數(shù)
由定義2 可知,增進(jìn)網(wǎng)絡(luò)表示在原流網(wǎng)絡(luò)中,每條邊上流量提升的空間。因此在式(2)中,對于所有與原網(wǎng)絡(luò)同向的邊,其容量值為原網(wǎng)絡(luò)中容量與流量的差值;對于所有與原網(wǎng)絡(luò)反向的邊,其容量值與原網(wǎng)絡(luò)的流量值相等。這樣,在增進(jìn)網(wǎng)絡(luò)中尋找一條從源點(diǎn)到匯點(diǎn)的無環(huán)路徑,表示原流網(wǎng)絡(luò)中提升流量的空間,即提升集群吞吐量的空間。
定義3優(yōu)化路徑。設(shè)有流網(wǎng)絡(luò)G=(V,E),Gf=(Vf,Ef)是G由流f產(chǎn)生的增進(jìn)網(wǎng)絡(luò)。則在Gf中從源點(diǎn)s至任意匯點(diǎn)ti的一條無環(huán)路徑p:s→vi→ti都是原網(wǎng)絡(luò)G的一條優(yōu)化路徑。其中,集合P={(vi,vj)| 邊(vi,vj)在路徑p上}為優(yōu)化路徑上邊的集合,則該路徑對應(yīng)的遞增量為
其中,cf(vi,vj)為邊(vi,vj)在增進(jìn)網(wǎng)絡(luò)中的容量值。因此,將fp作用于f,得到流f在路徑p上的一個(gè)遞增流,記為f'=f↑fp,且對應(yīng)邊的流量為
其中,f(vi,vj)是原網(wǎng)絡(luò)中對應(yīng)邊的流量。由于遞增量表示優(yōu)化路徑上每條邊均能夠提升流量的空間,因此選取該路徑上增進(jìn)容量的最小值。
設(shè)流網(wǎng)絡(luò)G=(V,E)的一個(gè)流為f,Gf=(Vf,Ef)為G由流f產(chǎn)生的增進(jìn)網(wǎng)絡(luò),路徑p為Gf中的任意一條優(yōu)化路徑,則流f在路徑p上的遞增流f↑fp也是原網(wǎng)絡(luò)G的一個(gè)流,且其流量為
圖2 增進(jìn)網(wǎng)絡(luò)模型
由定義2 和定義3 可知,增進(jìn)網(wǎng)絡(luò)描述了在原網(wǎng)絡(luò)中,每條邊在容量限制下可能提升流量的空間,則優(yōu)化路徑表示提升網(wǎng)絡(luò)流量、合理分配堆積數(shù)據(jù)的有效方案。因此,當(dāng)源點(diǎn)產(chǎn)生堆積數(shù)據(jù)時(shí),沿著優(yōu)化路徑的方向分配負(fù)載,就一定能夠提高集群的吞吐量。當(dāng)增進(jìn)網(wǎng)絡(luò)中不存在優(yōu)化路徑時(shí),集群達(dá)到其吞吐量的峰值,則一定存在一個(gè)算子成為整個(gè)集群的性能瓶頸。
根據(jù)流網(wǎng)絡(luò)模型可知,通過準(zhǔn)確計(jì)算每條邊上容量與流量的取值,量化集群計(jì)算能力與計(jì)算負(fù)載的數(shù)學(xué)關(guān)系,從而分析集群當(dāng)前時(shí)刻的資源分配和使用情況。然而,當(dāng)集群因計(jì)算資源不足而導(dǎo)致性能下降時(shí),需要定義資源優(yōu)化配置模型,通過定義流網(wǎng)絡(luò)的劃分來尋找集群的性能瓶頸,從而制定最優(yōu)的彈性資源調(diào)度計(jì)劃。
定義4流網(wǎng)絡(luò)劃分。設(shè)有流網(wǎng)絡(luò)G=(V,E),其中s∈S是流網(wǎng)絡(luò)的源點(diǎn),ti∈T是匯點(diǎn)。則該網(wǎng)絡(luò)的一個(gè)劃分D=(X,Y)將節(jié)點(diǎn)集V分為X和Y這2個(gè)集合,其中Y=V-X,使s∈X,ti∈Y,且有X∩Y=?,X∪Y=V。 ?vi,v j∈Oi有vi,v j∈X或vi,v j∈Y。則該劃分D=(X,Y)對應(yīng)的容量記為
該劃分對應(yīng)的流量記為
其中,容量最小的劃分為該流網(wǎng)絡(luò)的最小劃分。
如圖3 所示,流網(wǎng)絡(luò)的一個(gè)劃分將數(shù)據(jù)源點(diǎn)和匯點(diǎn)分在2 個(gè)不同的集合中,且同一個(gè)算子的不同實(shí)例不橫跨任何一個(gè)劃分。因此,一個(gè)劃分中容量和流量的關(guān)系反映了不同算子之間在資源配置和計(jì)算性能上的差異,為定位整個(gè)集群的性能瓶頸提供了可行的方案。
設(shè)流網(wǎng)絡(luò)G=(V,E)的一個(gè)流為f,Gf=(Vf,Ef)是G由f產(chǎn)生的增進(jìn)網(wǎng)絡(luò)。當(dāng)Gf中不存在任何優(yōu)化路徑時(shí),f是G的最大流,則至少存在一個(gè)劃分D=(X,Y),使且D是G的最小劃分。
當(dāng)增進(jìn)網(wǎng)絡(luò)中不存在任何優(yōu)化路徑時(shí),原網(wǎng)絡(luò)的流量達(dá)到最大值,且當(dāng)前流量值與最小劃分的容量值相等,則最小劃分所對應(yīng)的算子成為集群的性能瓶頸。因此,F(xiàn)AR-Flink 策略的核心思想為:首先,通過計(jì)算DAG 模型中每條邊的容量大小,將其轉(zhuǎn)化為流網(wǎng)絡(luò)模型;其次,通過計(jì)算對應(yīng)的增進(jìn)網(wǎng)絡(luò)和優(yōu)化路徑,實(shí)現(xiàn)計(jì)算負(fù)載的最優(yōu)分配,當(dāng)集群性能達(dá)到瓶頸時(shí),再通過尋找流網(wǎng)絡(luò)的最小劃分,制定彈性資源調(diào)度計(jì)劃;最后,通過基于分簇和分桶的狀態(tài)數(shù)據(jù)管理模型,實(shí)現(xiàn)狀態(tài)數(shù)據(jù)高效遷移算法,并完成彈性資源調(diào)度計(jì)劃的執(zhí)行。
根據(jù)資源優(yōu)化配置模型,當(dāng)數(shù)據(jù)源點(diǎn)的輸入速率發(fā)生變化時(shí),可制定最優(yōu)的彈性資源調(diào)度策略。但由于Flink 是有狀態(tài)數(shù)據(jù)流處理平臺,每個(gè)節(jié)點(diǎn)會保存當(dāng)前的狀態(tài)數(shù)據(jù),為了實(shí)現(xiàn)高效的節(jié)點(diǎn)間狀態(tài)數(shù)據(jù)遷移策略,降低彈性資源調(diào)度計(jì)劃的執(zhí)行開銷,必須建立合理的狀態(tài)數(shù)據(jù)管理模型,并在此基礎(chǔ)上設(shè)計(jì)高效的數(shù)據(jù)遷移算法。在數(shù)據(jù)遷移過程中,盡可能降低對Hadoop 分布式文件系統(tǒng)(HDFS,Hadoop distributed file system)的訪問頻次,減少不必要的數(shù)據(jù)傳輸,從而有效提高動(dòng)態(tài)資源調(diào)度策略的執(zhí)行效率。
圖3 流網(wǎng)絡(luò)劃分示意
如圖4 所示,當(dāng)集群由v1,v2∈O1這2 個(gè)實(shí)例擴(kuò)充至v1,v2,v6∈O1這3 個(gè)實(shí)例時(shí),需要考慮原本由2 個(gè)節(jié)點(diǎn)維護(hù)的狀態(tài)數(shù)據(jù)d1和d2如何分配到3 個(gè)節(jié)點(diǎn)中,以實(shí)現(xiàn)節(jié)點(diǎn)間計(jì)算任務(wù)的最優(yōu)分配;此外,在對狀態(tài)數(shù)據(jù)執(zhí)行快照和重分配的過程中,都會頻繁地訪問HDFS 執(zhí)行數(shù)據(jù)讀寫的操作,這會占用大量的網(wǎng)絡(luò)傳輸資源,進(jìn)而影響節(jié)點(diǎn)間的數(shù)據(jù)傳輸性能。針對上述問題,提出高效的節(jié)點(diǎn)間狀態(tài)數(shù)據(jù)遷移算法,能夠有效提高動(dòng)態(tài)資源調(diào)度計(jì)劃的執(zhí)行效率。
圖4 狀態(tài)數(shù)據(jù)分配模型
設(shè)算子On待處理的數(shù)據(jù)為二元組tuplei=(key,value),則該元組對應(yīng)的簇為
即具有相同key 的hash 值的數(shù)據(jù)元組屬于同一個(gè)簇。在現(xiàn)有的Flink 平臺中,目前是按key 的hash值不同,分簇對狀態(tài)數(shù)據(jù)進(jìn)行管理的,稱為KeyedState,是目前狀態(tài)數(shù)據(jù)管理的主要策略。但由于數(shù)據(jù)元組有多種不同的key,hash 函數(shù)發(fā)生碰撞的概率較低,通常狀態(tài)數(shù)據(jù)的分布較為分散,在數(shù)據(jù)遷移過程中需要頻繁訪問HDFS,占用大量的網(wǎng)絡(luò)傳輸資源并引入較高的傳輸開銷,進(jìn)而影響了彈性資源調(diào)度算法的執(zhí)行效率。因此,通過提出狀態(tài)數(shù)據(jù)分桶管理的思想,并設(shè)計(jì)優(yōu)化的狀態(tài)數(shù)據(jù)遷移算法,能夠有效減少節(jié)點(diǎn)對HDFS 的訪問頻次,減少網(wǎng)絡(luò)傳輸開銷以提高算法性能。
在第3 節(jié)建立的相關(guān)模型基礎(chǔ)上,本節(jié)提出了FAR-Flink 策略,該策略分別包含流網(wǎng)絡(luò)構(gòu)建算法、彈性資源調(diào)度算法和狀態(tài)數(shù)據(jù)遷移算法。該策略主要分為以下3 個(gè)步驟,具體的執(zhí)行流程如圖5 所示。
步驟1通過流網(wǎng)絡(luò)構(gòu)建算法計(jì)算每條邊的容量大小,建立流網(wǎng)絡(luò)模型。
步驟2通過彈性資源調(diào)度算法定位性能瓶頸,制定調(diào)度策略。
步驟3通過狀態(tài)數(shù)據(jù)遷移算法執(zhí)行調(diào)度策略,實(shí)現(xiàn)集群規(guī)模的彈性伸縮。
圖5 FAR-Flink 策略執(zhí)行流程
根據(jù)流網(wǎng)絡(luò)模型可知,準(zhǔn)確評估網(wǎng)絡(luò)每條邊的容量大小對策略的執(zhí)行效果是至關(guān)重要的。經(jīng)實(shí)驗(yàn)分析得出,容量值與以下4 個(gè)因素有關(guān)。
1)節(jié)點(diǎn)間的網(wǎng)絡(luò)傳輸性能和傳輸資源占用情況,傳輸資源不足將導(dǎo)致容量值減小。
2)節(jié)點(diǎn)內(nèi)的計(jì)算性能和計(jì)算資源占用情況,計(jì)算資源不足導(dǎo)致容量值減小。
3)節(jié)點(diǎn)所承擔(dān)計(jì)算任務(wù)本身的復(fù)雜程度,計(jì)算任務(wù)越復(fù)雜則容量值越小。
4)下游節(jié)點(diǎn)的計(jì)算資源剩余情況,根據(jù)Netty組件的水位機(jī)制,下游節(jié)點(diǎn)的數(shù)據(jù)阻塞會引起反壓現(xiàn)象,導(dǎo)致上游節(jié)點(diǎn)容量值下降。
在流式計(jì)算平臺中,數(shù)據(jù)傳輸性能對計(jì)算的實(shí)時(shí)性影響較大,因網(wǎng)絡(luò)擁塞導(dǎo)致數(shù)據(jù)在節(jié)點(diǎn)緩存中被滯留是流式計(jì)算平臺面臨的主要性能瓶頸。當(dāng)一臺PC 機(jī)中關(guān)閉所有與Flink 無關(guān)的進(jìn)程時(shí),在純凈的網(wǎng)絡(luò)環(huán)境中,該節(jié)點(diǎn)實(shí)際可用于數(shù)據(jù)傳輸?shù)木W(wǎng)絡(luò)帶寬資源為
其中,Nvi(Data)為實(shí)際可用于節(jié)點(diǎn)間數(shù)據(jù)傳輸?shù)木W(wǎng)絡(luò)帶寬資源;為物理節(jié)點(diǎn)間原有的網(wǎng)絡(luò)帶寬資源總量,如節(jié)點(diǎn)間使用百兆帶寬的網(wǎng)絡(luò)連接,則為操作系統(tǒng)本身的靜態(tài)網(wǎng)絡(luò)傳輸開銷;為進(jìn)程間心跳信息的傳輸開銷;為節(jié)點(diǎn)間歇性執(zhí)行檢查點(diǎn)(checkpoint)時(shí)向HDFS 和Zookepper寫入狀態(tài)數(shù)據(jù)的傳輸開銷;為其他隨機(jī)因素可能產(chǎn)生的極少量傳輸開銷。單位均為MB/s。
因此,該節(jié)點(diǎn)對應(yīng)輸入鏈路的容量值為
其中,|Eji|為節(jié)點(diǎn)vi對應(yīng)輸入邊的數(shù)目,size(tuple.fi)為數(shù)據(jù)元組中每個(gè)字段所占空間的大小,單位為B。
根據(jù)上述分析,流網(wǎng)絡(luò)構(gòu)建算法的執(zhí)行過程如算法1 所示。
算法1流網(wǎng)絡(luò)構(gòu)建算法
輸入集群拓?fù)浣Y(jié)構(gòu)T,最高計(jì)算時(shí)延閾值θ
輸出流網(wǎng)絡(luò)G
在算法1 中,首先確定流網(wǎng)絡(luò)中頂點(diǎn)和邊的集合(步驟1)和步驟2));其次根據(jù)每個(gè)計(jì)算節(jié)點(diǎn)的網(wǎng)絡(luò)傳輸資源,計(jì)算節(jié)點(diǎn)對應(yīng)輸入邊的初始容量值(步驟3)~步驟6));最后在作業(yè)執(zhí)行過程中,根據(jù)計(jì)算時(shí)延的平均值與閾值之間的關(guān)系,對流網(wǎng)絡(luò)每條邊的容量值進(jìn)行反饋調(diào)節(jié)。當(dāng)平均計(jì)算時(shí)延大于設(shè)定的閾值,且對應(yīng)邊的流量值仍小于容量時(shí),表明設(shè)定的容量過大,則以η為步長減小對應(yīng)邊的容量;當(dāng)平均計(jì)算時(shí)延遠(yuǎn)小于設(shè)定的閾值,但流量值已經(jīng)接近容量時(shí),表明對應(yīng)邊的容量值過小,則以η為步長增大對應(yīng)邊的容量。實(shí)驗(yàn)數(shù)據(jù)表明,通過網(wǎng)絡(luò)開銷計(jì)算出的初始容量值往往是數(shù)據(jù)傳輸?shù)臉O限值,受計(jì)算開銷等其他因素的影響,實(shí)際合理的容量值應(yīng)小于初始值,因此需要通過反饋調(diào)節(jié)將容量值調(diào)整至合理的范圍內(nèi)。
通過流網(wǎng)絡(luò)構(gòu)建算法建立流網(wǎng)絡(luò)模型后,當(dāng)源點(diǎn)產(chǎn)生數(shù)據(jù)堆積時(shí),通過彈性資源調(diào)度算法合理分配計(jì)算負(fù)載、定位性能瓶頸并生成彈性資源調(diào)度計(jì)劃。
圖6 為彈性資源調(diào)度策略示意,假設(shè)圖中有劃分D=(X,Y),當(dāng)且僅當(dāng)
則Y集合中第一個(gè)算子O2有可能成為集群的性能瓶頸,需要增加資源并擴(kuò)大O2的并行度,其中λ是當(dāng)前劃分D=(X,Y)對應(yīng)流量與容量的比值,且有0.85≤λ≤1,即當(dāng)一個(gè)劃分對應(yīng)的流量達(dá)到容量值的85%時(shí),算法認(rèn)為該劃分對應(yīng)的算子可能成為集群的性能瓶頸。
圖6 彈性資源調(diào)度策略示意
基于資源優(yōu)化配置模型,彈性資源調(diào)度算法的執(zhí)行過程如算法2 所示。
算法2彈性資源調(diào)度算法
輸入流網(wǎng)絡(luò)G,流網(wǎng)絡(luò)的一個(gè)流f,數(shù)據(jù)源點(diǎn)每個(gè)分區(qū)的堆積值lags[]
輸出完成任務(wù)調(diào)度后新的流網(wǎng)絡(luò)模型G′
在算法2 中,根據(jù)增進(jìn)網(wǎng)絡(luò)與優(yōu)化路徑的定義,首先由流網(wǎng)絡(luò)G的一個(gè)流f生成對應(yīng)的增進(jìn)網(wǎng)絡(luò)Gf(步驟1)~步驟5)),當(dāng)數(shù)據(jù)源點(diǎn)產(chǎn)生堆積時(shí),在增進(jìn)網(wǎng)絡(luò)中尋找優(yōu)化路徑,并沿其方向提高原網(wǎng)絡(luò)的流量(步驟6)~步驟11))。當(dāng)增進(jìn)網(wǎng)絡(luò)中不存在優(yōu)化路徑時(shí),如果數(shù)據(jù)源點(diǎn)仍有堆積,則在流網(wǎng)絡(luò)中尋找流量與容量的比值大于λ(即滿足式(10))的劃分,增加其Y集合中第一個(gè)算子的并行度(步驟12)~步驟18)),從而得到需要擴(kuò)大并行度的算子。最后,通過調(diào)用狀態(tài)數(shù)據(jù)遷移算法(步驟19)),完成節(jié)點(diǎn)間的狀態(tài)數(shù)據(jù)遷移,從而實(shí)現(xiàn)計(jì)算資源的動(dòng)態(tài)調(diào)度。
在執(zhí)行彈性資源調(diào)度策略時(shí),節(jié)點(diǎn)間狀態(tài)數(shù)據(jù)遷移會產(chǎn)生大量網(wǎng)絡(luò)傳輸開銷,但Flink 狀態(tài)數(shù)據(jù)管理策略并不適用于高效的數(shù)據(jù)遷移。因此,通過提出狀態(tài)數(shù)據(jù)管理模型和遷移算法,降低數(shù)據(jù)傳輸開銷,提高遷移效率。
定義5桶。設(shè)算子On待處理的數(shù)據(jù)為一個(gè)二元組tuplei=(key,value),算子On共持有kn個(gè)桶,則該元組對應(yīng)的桶為
則對應(yīng)負(fù)責(zé)處理該元組的實(shí)例為
其中,vi為算子On的第i個(gè)實(shí)例,。這樣就建立了數(shù)據(jù)桶與計(jì)算節(jié)點(diǎn)間的映射關(guān)系,將桶作為計(jì)算節(jié)點(diǎn)維護(hù)狀態(tài)數(shù)據(jù)的基本單位。此外,參數(shù)kn是算子On所持有桶的數(shù)目,用戶可根據(jù)狀態(tài)數(shù)據(jù)的規(guī)模和復(fù)雜程度進(jìn)行適當(dāng)?shù)恼{(diào)整。
圖7 表示由狀態(tài)數(shù)據(jù)簇到分桶管理的映射方式,其中kn=10,算子并行度由|On|=3 擴(kuò)大至|On|=4。在狀態(tài)數(shù)據(jù)遷移算法中,結(jié)合Flink 支持的檢查點(diǎn)(checkpoint)機(jī)制,在執(zhí)行快照時(shí)將狀態(tài)數(shù)據(jù)以桶為單位發(fā)送至HDFS,同一個(gè)桶的數(shù)據(jù)存儲在相鄰的位置。當(dāng)算子的并行度改變時(shí),根據(jù)式(11)和式(12)重新計(jì)算每個(gè)桶到節(jié)點(diǎn)的映射關(guān)系,最后由節(jié)點(diǎn)從HDFS 中拉取對應(yīng)的狀態(tài)數(shù)據(jù),完成計(jì)算節(jié)點(diǎn)的狀態(tài)數(shù)據(jù)恢復(fù)。
結(jié)合狀態(tài)數(shù)據(jù)分桶管理的特點(diǎn),可提出狀態(tài)數(shù)據(jù)遷移算法如算法3 所示。
算法3狀態(tài)數(shù)據(jù)遷移算法
輸入需增加并行度的算子集 operator[]
輸出完成任務(wù)調(diào)度后新的流網(wǎng)絡(luò)模型G′
圖7 狀態(tài)數(shù)據(jù)遷移示意
在算法3 中,首先對每個(gè)節(jié)點(diǎn)執(zhí)行檢查點(diǎn)操作(步驟3)~步驟7)):將節(jié)點(diǎn)的狀態(tài)數(shù)據(jù)發(fā)送至HDFS,并將狀態(tài)數(shù)據(jù)的句柄信息保存在ZooKeeper中,記錄狀態(tài)數(shù)據(jù)對應(yīng)的桶。其次從資源池中獲取一個(gè)新的計(jì)算節(jié)點(diǎn),擴(kuò)大對應(yīng)算子的并行度,并由JobManager 重新計(jì)算狀態(tài)數(shù)據(jù)桶到計(jì)算節(jié)點(diǎn)的映射關(guān)系。再次根據(jù)新生成的集群拓?fù)浣Y(jié)果,對每個(gè)節(jié)點(diǎn)執(zhí)行數(shù)據(jù)恢復(fù)操作:從ZooKeeper 中獲取狀態(tài)數(shù)據(jù)對應(yīng)的句柄,并從HDFS 中拉取對應(yīng)的數(shù)據(jù)。最后再次執(zhí)行算法1,以修正每條邊對應(yīng)的容量值。
在流網(wǎng)絡(luò)構(gòu)建算法中,參數(shù)η是動(dòng)態(tài)調(diào)節(jié)容量值的步長。η過大會導(dǎo)致調(diào)整幅度過于劇烈,η過小會導(dǎo)致調(diào)整幅度不夠,2 種情況均會導(dǎo)致最終的容量值不準(zhǔn)確。因此參數(shù)η的計(jì)算方法如下。
根據(jù)定義1,c(vi,vj)是邊(vi,vj)每秒鐘允許傳輸?shù)淖畲笤M個(gè)數(shù),且1 s=1 000 ms。根據(jù)文獻(xiàn)[26]的時(shí)延統(tǒng)計(jì)算法,節(jié)點(diǎn)間傳輸數(shù)據(jù)的平均計(jì)算時(shí)延為
其中,vj.fi是數(shù)據(jù)元組到達(dá)節(jié)點(diǎn)vj的發(fā)現(xiàn)時(shí)間[26],vj.di是數(shù)據(jù)元組離開節(jié)點(diǎn)vj的完成時(shí)間[26]。則容量值為對應(yīng)節(jié)點(diǎn)在1 000 ms內(nèi)所能傳輸?shù)钠骄M數(shù)目,將式(13)代入得
由于η是調(diào)節(jié)容量值的步長,即容量函數(shù)的變化率,則對容量函數(shù)求導(dǎo)得
此外,根據(jù)容量值與計(jì)算時(shí)延的函數(shù)關(guān)系,通過記錄的平均計(jì)算時(shí)延,求得調(diào)節(jié)前與調(diào)節(jié)后的容量值偏差應(yīng)為
因此,為了避免調(diào)整過于劇烈導(dǎo)致容量值出現(xiàn)抖動(dòng)的現(xiàn)象,學(xué)習(xí)參數(shù)η取容量函數(shù)的導(dǎo)數(shù)與偏差的較小者,即
實(shí)驗(yàn)表明,通過式(17)分別計(jì)算流網(wǎng)絡(luò)中每條邊學(xué)習(xí)因子η,再通過算法1 調(diào)整對應(yīng)邊的容量函數(shù),能夠取得比較準(zhǔn)確的容量值,流網(wǎng)絡(luò)構(gòu)建算法有比較好的優(yōu)化效果。
由于流式計(jì)算對平臺的性能和實(shí)時(shí)性有很高的要求,因此彈性資源調(diào)度策略應(yīng)具備很高的性能,且不能引入過高的時(shí)間開銷。流網(wǎng)絡(luò)構(gòu)建算法首先遍歷流網(wǎng)絡(luò)的每條邊,以確定其容量的初始值;再遍歷每個(gè)節(jié)點(diǎn),從而對邊的容量值進(jìn)行反饋調(diào)節(jié),因此構(gòu)建算法的時(shí)間復(fù)雜度為T(n)=O(|V|+|E|)。
此外,彈性資源調(diào)度算法是基于廣度優(yōu)先搜索(BFS,breadth first search)實(shí)現(xiàn)的流量遞增算法,已知BFS 算法的時(shí)間復(fù)雜度為T(n)=O(|V|+|E|),且在算法2 中,3 次循環(huán)的最高循環(huán)次數(shù)分別為流網(wǎng)絡(luò)的節(jié)點(diǎn)數(shù)目、邊的數(shù)目以及當(dāng)前流量與彈性資源調(diào)度量的差值,因此彈性資源調(diào)度算法的時(shí)間復(fù)雜度為T(n)=O(|V|+(|V|+|E|)(fmax-f)+|D|),由于流網(wǎng)絡(luò)中邊的數(shù)目大于節(jié)點(diǎn)數(shù)目和劃分的數(shù)目,因此算法的時(shí)間復(fù)雜度為T(n)=O(|E|(fmax-f))。
數(shù)據(jù)遷移算法是典型的分布式并行化算法,其中的每個(gè)步驟都分別在不同的節(jié)點(diǎn)上執(zhí)行,節(jié)點(diǎn)之間通過ZooKeeper 進(jìn)行統(tǒng)一協(xié)調(diào),并且計(jì)算過程較為簡單,其時(shí)間開銷主要取決于狀態(tài)數(shù)據(jù)的規(guī)模和節(jié)點(diǎn)間網(wǎng)絡(luò)傳輸?shù)男阅?,因此沒有具體的數(shù)學(xué)計(jì)算式來表達(dá)其時(shí)間復(fù)雜度,本文5.4 節(jié)通過實(shí)驗(yàn)驗(yàn)證了算法的時(shí)間開銷較原系統(tǒng)有一定的下降。
為了驗(yàn)證FAR-Flink 策略的有效性,本文通過設(shè)置對比實(shí)驗(yàn),將FAR-Flink 與原生Flink 1.6.0 版本的系統(tǒng)形成對比,并與本文工作密切相關(guān)的Elastic Nephel[6]策略形成對比,通過執(zhí)行代表不同作業(yè)類型的典型Benchmark,驗(yàn)證了算法的優(yōu)化效果和執(zhí)行開銷,并分析了相關(guān)參數(shù)對算法的影響。
實(shí)驗(yàn)搭建的集群由21 臺同構(gòu)的PC 機(jī)組成。其中,F(xiàn)link 集群包含一個(gè)JobManager 節(jié)點(diǎn)、6 個(gè)TaskManager 節(jié)點(diǎn),資源池中包含4 個(gè)TaskManager備用節(jié)點(diǎn),在執(zhí)行彈性資源調(diào)度計(jì)劃動(dòng)態(tài)擴(kuò)展資源時(shí)啟動(dòng)備用節(jié)點(diǎn),并部署相應(yīng)的計(jì)算任務(wù)。其他相關(guān)組件包括3 個(gè)節(jié)點(diǎn)構(gòu)成的Hadoop 集群、3 個(gè)節(jié)點(diǎn)構(gòu)成的Kafka 集群和3 個(gè)節(jié)點(diǎn)構(gòu)成的ZooKeeper 集群。另外,由一個(gè)獨(dú)立的節(jié)點(diǎn)執(zhí)行流網(wǎng)絡(luò)構(gòu)建算法和彈性資源調(diào)度算法。其中,每個(gè)同構(gòu)節(jié)點(diǎn)的硬件配置和軟件配置分別如表1 和表2 所示。
表1 節(jié)點(diǎn)硬件配置參數(shù)
表2 節(jié)點(diǎn)軟件配置參數(shù)
同時(shí),為了使Flink 集群達(dá)到最優(yōu)的計(jì)算性能,根據(jù)現(xiàn)有的軟硬件環(huán)境,對Flink 的相關(guān)配置參數(shù)進(jìn)行了調(diào)整,其中重要的配置項(xiàng)及其參數(shù)值如表3所示。
表3 性能參數(shù)配置
在流網(wǎng)絡(luò)模型中,每條邊的容量值應(yīng)客觀反映節(jié)點(diǎn)的計(jì)算能力和節(jié)點(diǎn)間的傳輸能力,容量值評估不準(zhǔn)確可能會導(dǎo)致計(jì)算時(shí)延升高而無法滿足計(jì)算的實(shí)時(shí)性要求。為了探究容量函數(shù)與計(jì)算時(shí)延的關(guān)系,實(shí)驗(yàn)為每條邊設(shè)置相等的容量值,并不斷遞增。同時(shí),基于Flink Metrics 體系的Latency Tracking機(jī)制采集的計(jì)算時(shí)延如圖8 所示。
圖8 容量與計(jì)算時(shí)延關(guān)系
由圖8 可以看出,隨著時(shí)間的推移,容量值持續(xù)增大,每個(gè)TaskSlot 的計(jì)算時(shí)延均有一定的波動(dòng),但總體呈上升趨勢,甚至出現(xiàn)階躍上升的現(xiàn)象,且計(jì)算時(shí)延越高,其波動(dòng)變化越劇烈。這表明過高地估計(jì)節(jié)點(diǎn)的計(jì)算能力,會導(dǎo)致數(shù)據(jù)元組被阻塞、計(jì)算時(shí)延升高,但在滿足計(jì)算時(shí)延約束的前提下容量值總會被盡可能地增加,從而提高集群的吞吐量。因此,在當(dāng)前實(shí)驗(yàn)環(huán)境中,當(dāng)容量函數(shù)取30 000~40 000 tuple/s 時(shí),平均計(jì)算時(shí)延在400 ms 以下,這是比較合理的取值區(qū)間。
此外,在流網(wǎng)絡(luò)構(gòu)建算法中,參數(shù)η確定了調(diào)節(jié)容量函數(shù)的步長,對流網(wǎng)絡(luò)的穩(wěn)定性和收斂速率有決定性作用。如圖9 所示,當(dāng)使用固定的η取值執(zhí)行構(gòu)建算法時(shí),流網(wǎng)絡(luò)中存在收斂速率和穩(wěn)定性難以權(quán)衡的問題:較大的η值會導(dǎo)致容量值劇烈波動(dòng)而無法趨于穩(wěn)定,較小的η值會導(dǎo)致網(wǎng)絡(luò)收斂時(shí)間較長(約50~60 s)。但通過式(17)計(jì)算的動(dòng)態(tài)η取值能夠客觀反映當(dāng)前容量值與理想值的差,隨著η的不斷減小,流網(wǎng)絡(luò)的容量值迅速收斂于理想值,并逐漸趨于穩(wěn)定。
圖9 學(xué)習(xí)參數(shù)對比
文獻(xiàn)[6]提出針對Flink 平臺內(nèi)核的彈性資源調(diào)度策略,與本文工作的研究目標(biāo)最接近。為了驗(yàn)證FAR-Flink 的優(yōu)化效果,實(shí)驗(yàn)在Flink 原系統(tǒng)、Elastic Nephel[6]和FAR-Flink 上分別執(zhí)行WordCount、TwitterSentiment、IncrementalLearing 和Streaming-Benchmarks 代表不同作業(yè)類型的標(biāo)準(zhǔn)Benchmark,它們分別來自Flink 源碼中的示例和Intel 等公司在開源社區(qū)Github 上的貢獻(xiàn)。實(shí)驗(yàn)結(jié)果分別如圖10~圖13 所示。
采用與5.2 節(jié)相同的實(shí)驗(yàn)配置,分別在原系統(tǒng)和FAR-Flink 上執(zhí)行WordCount 作業(yè),并采集Kafka緩沖池中的數(shù)據(jù)堆積和計(jì)算時(shí)延,如圖10 所示。由圖10(a)和圖10(b)可知,計(jì)算時(shí)延與Kafka 中的數(shù)據(jù)堆積呈正相關(guān),即實(shí)驗(yàn)驗(yàn)證了數(shù)據(jù)堆積導(dǎo)致時(shí)延升高的理論推測是正確的。此外,由于原系統(tǒng)執(zhí)行默認(rèn)的資源調(diào)度策略,因資源不足導(dǎo)致計(jì)算時(shí)延不斷升高。FAR-Flink 通過彈性資源調(diào)度算法合理分配Kafka 中的堆積數(shù)據(jù),并動(dòng)態(tài)增加了算子OFlatMap的并行度。但由于執(zhí)行狀態(tài)數(shù)據(jù)遷移有一定的時(shí)間開銷,導(dǎo)致數(shù)據(jù)堆積有短暫的上升。從圖10(c)中可以看出,Elastic Nephel(EN)的時(shí)延上升時(shí)間較長。FAR-Flink 執(zhí)行任務(wù)遷移過程中,持續(xù)時(shí)間比EN 縮短了約40 s。
圖10 WordCount 作業(yè)執(zhí)行效率對比
TwitterSentiment 是在生產(chǎn)環(huán)境中實(shí)際應(yīng)用的標(biāo)準(zhǔn)Benchmark。實(shí)驗(yàn)采用與文獻(xiàn)[32]相同的實(shí)驗(yàn)配置,并以10 s 為周期采集節(jié)點(diǎn)的內(nèi)存使用率。得到如圖11 所示的實(shí)驗(yàn)結(jié)果。
由圖11 可知,隨著計(jì)算負(fù)載的不斷升高,EN和FAR-Flink 分別在第540 s 和第720 s 各增加了一個(gè)節(jié)點(diǎn),擴(kuò)大算子并行度并執(zhí)行狀態(tài)數(shù)據(jù)遷移。但由于作業(yè)執(zhí)行中產(chǎn)生狀態(tài)數(shù)據(jù)并占用大量內(nèi)存資源,狀態(tài)數(shù)據(jù)遷移過程產(chǎn)生了一定的時(shí)間開銷:EN系統(tǒng)的2 次遷移分別消耗約14 s 和18 s,F(xiàn)AR-Flink分別消耗約7 s 和13 s。但FAR-Flink 遷移過程中吞吐量較低且內(nèi)存資源消耗較高,這是因?yàn)閳?zhí)行restore 操作需要一定時(shí)間,且遷移過程中產(chǎn)生大量的HDFS 寫操作。但隨著計(jì)算資源的增加,系統(tǒng)的吞吐量有明顯的上升,基本滿足輸入負(fù)載的要求,且內(nèi)存資源占用降低至合理可接受的范圍內(nèi),這說明2 種彈性資源調(diào)度策略均有效提升集群性能。
圖11 TwitterSentiment 作業(yè)執(zhí)行情況對比
為了驗(yàn)證FAR-Flink 在高計(jì)算復(fù)雜度、高CPU資源占用場景下的優(yōu)化效果,實(shí)驗(yàn)運(yùn)行了IncrementalLearing 作業(yè),并以10 s 為周期采集節(jié)點(diǎn)的CPU 利用率,得到如圖12 所示的實(shí)驗(yàn)結(jié)果。
圖12 IncrementalLearing 作業(yè)執(zhí)行情況
由圖12 可知,集群執(zhí)行彈性資源調(diào)度策略計(jì)劃的過程需要一定的時(shí)間開銷,EN 和FAR-Flink都在第600 s 左右增加了一個(gè)計(jì)算節(jié)點(diǎn),其中EN執(zhí)行策略的時(shí)間開銷較高(約42 s),但執(zhí)行過程中其他節(jié)點(diǎn)計(jì)算性能下降,集群吞吐量驟減。FAR-Flink 執(zhí)行策略的時(shí)間開銷較低(約18 s),但執(zhí)行restore 過程中整個(gè)集群有極短暫的停滯,其中第600 s 時(shí)檢測輸出數(shù)據(jù)量為0 tuple/s,在短暫的時(shí)間內(nèi)恢復(fù)了任務(wù)所有計(jì)算節(jié)點(diǎn)的狀態(tài)數(shù)據(jù),因此在下一個(gè)階段的吞吐量值急劇回升。在資源利用方面,EN 在數(shù)據(jù)遷移過程中的CPU 利用率值劇烈抖動(dòng),F(xiàn)AR-Flink 的CPU 利用率值急劇下降后又快速回升,這都與其執(zhí)行調(diào)度策略的過程相關(guān),與吞吐量的檢測結(jié)果是一致的。總體上,2 種策略均通過增加計(jì)算資源提高了集群性能,F(xiàn)AR-Flink 有效縮短了EN 執(zhí)行調(diào)度策略的時(shí)間,但其執(zhí)行過程中計(jì)算任務(wù)有非常短暫的停滯。
為了進(jìn)一步驗(yàn)證FAR-Flink 在實(shí)際應(yīng)用場景下的性能,實(shí)驗(yàn)采用Yahoo 公司在GitHub 上開源的Streaming-Benchmarks[32],并分別從吞吐量、計(jì)算時(shí)延、Kafka 數(shù)據(jù)堆積、堆內(nèi)存利用率、CPU 利用率5 個(gè)不同的維度,監(jiān)測集群和作業(yè)的運(yùn)行情況,監(jiān)測結(jié)果如圖13 所示。
由圖13 可以看出,隨著計(jì)算負(fù)載的持續(xù)上升,EN 和FAR-Flink 系統(tǒng)都分別出現(xiàn)資源不足導(dǎo)致集群性能下降的問題。其中,EN 系統(tǒng)連續(xù)動(dòng)態(tài)增加了2 個(gè)計(jì)算節(jié)點(diǎn),由于計(jì)算復(fù)雜且節(jié)點(diǎn)狀態(tài)數(shù)據(jù)規(guī)模較大,數(shù)據(jù)遷移過程產(chǎn)生了較高的時(shí)間開銷,在第一次數(shù)據(jù)遷移后集群吞吐量有少量的回升就立刻進(jìn)入第二次遷移,整個(gè)遷移過程共持續(xù)約273 s。FAR-Flink 首先通過彈性資源調(diào)度算法合理分配計(jì)算負(fù)載,并在第600 s 和第780 s 時(shí)分別動(dòng)態(tài)增加一個(gè)計(jì)算節(jié)點(diǎn),其數(shù)據(jù)遷移過程分別持續(xù)了21.6 s和36.7 s,較EN 系統(tǒng)的數(shù)據(jù)遷移時(shí)間有明顯的下降。同時(shí),2 種算法均通過動(dòng)態(tài)增加計(jì)算資源有效提升了集群性能,其中EN 系統(tǒng)的數(shù)據(jù)遷移時(shí)間較長,但遷移過程中集群可保證較低的吞吐量。FAR-Flink 系統(tǒng)能夠在前期合理分配計(jì)算負(fù)載,且有效縮短了數(shù)據(jù)遷移過程的時(shí)間開銷,但遷移過程中作業(yè)有極短暫的停滯,遷移完成后集群性能有較明顯的上升。
綜上所述,實(shí)驗(yàn)在4 種資源調(diào)度策略下分別執(zhí)行不同的標(biāo)準(zhǔn)Benchmark,通過性能對比得出了不同算法的優(yōu)缺點(diǎn)及適用場景,實(shí)驗(yàn)結(jié)果如表4 所示。實(shí)驗(yàn)證明,F(xiàn)AR-Flink 通過合理分配計(jì)算負(fù)載、動(dòng)態(tài)增加計(jì)算資源、降低數(shù)據(jù)遷移開銷3種策略相結(jié)合的方式,有效提高集群性能。與原系統(tǒng)相比,在計(jì)算負(fù)載波動(dòng)上升期間,針對不同類型的Benchmark,集群吞吐量平均提高了27.61%,狀態(tài)數(shù)據(jù)遷移時(shí)間平均縮短了45.36%,具有一定的優(yōu)化效果。
圖13 Streaming-Benchmarks 作業(yè)執(zhí)行效率
表4 對比實(shí)驗(yàn)結(jié)果
為了準(zhǔn)確評估FAR-Flink 執(zhí)行數(shù)據(jù)遷移過程產(chǎn)生的網(wǎng)絡(luò)傳輸開銷,驗(yàn)證數(shù)據(jù)遷移不會長時(shí)間占用過多的網(wǎng)絡(luò)傳輸資源而影響集群性能,實(shí)驗(yàn)分別執(zhí)行TwitterSentiment 和Streaming-Benchmarks 作業(yè),并監(jiān)測網(wǎng)絡(luò)傳輸數(shù)據(jù)以評估網(wǎng)絡(luò)傳輸開銷,得到如圖14 所示的實(shí)驗(yàn)結(jié)果。
由圖14 可知,除去節(jié)點(diǎn)間可忽略的靜態(tài)數(shù)據(jù)傳輸外,計(jì)算節(jié)點(diǎn)會依據(jù)相關(guān)配置周期性地執(zhí)行狀態(tài)數(shù)據(jù)快照,并向HDFS 發(fā)送數(shù)據(jù),在執(zhí)行動(dòng)態(tài)資源調(diào)度計(jì)劃時(shí),節(jié)點(diǎn)會從HDFS 拉取相應(yīng)的數(shù)據(jù)并執(zhí)行狀態(tài)數(shù)據(jù)的恢復(fù)操作。在2 種策略執(zhí)行過程中,需要傳輸?shù)臄?shù)據(jù)總量是基本相同的。但由于EN 以塊(bulk)為單位從遠(yuǎn)端拉取數(shù)據(jù),其數(shù)據(jù)分布較為分散且存在大量碎片化數(shù)據(jù),因此單位時(shí)間內(nèi)的數(shù)據(jù)傳輸速率較低,傳輸時(shí)間較長。FAR-Flink 以桶(bucket)為單位從遠(yuǎn)端拉取數(shù)據(jù),其數(shù)據(jù)分布較為集中且?guī)缀鯖]有碎片化數(shù)據(jù),計(jì)算節(jié)點(diǎn)在短時(shí)間內(nèi)集中拉取需要的數(shù)據(jù),數(shù)據(jù)傳輸速率較高,傳輸時(shí)間較短。
綜上所述,通過基于分簇和分桶的狀態(tài)數(shù)據(jù)遷移算法,合理應(yīng)對碎片化數(shù)據(jù)傳輸?shù)膯栴},有效降低數(shù)據(jù)遷移的網(wǎng)絡(luò)傳輸開銷,通過提高傳輸效率縮短執(zhí)行動(dòng)態(tài)資源調(diào)度策略的時(shí)間。但實(shí)驗(yàn)結(jié)果表明,這種方式仍會產(chǎn)生一定的時(shí)間開銷,在合理可接受的范圍內(nèi)對執(zhí)行效率有一定的影響,如何進(jìn)一步提高數(shù)據(jù)遷移效率以縮短遷移時(shí)間,將是下一步研究工作的主要方向。
圖14 數(shù)據(jù)遷移的網(wǎng)絡(luò)傳輸開銷對比
作為集流處理與批處理為一體的統(tǒng)一大數(shù)據(jù)處理平臺,Apache Flink 得到學(xué)術(shù)界和產(chǎn)業(yè)界的廣泛關(guān)注,但其可擴(kuò)展性和可伸縮性不足的問題,已經(jīng)嚴(yán)重制約了平臺的發(fā)展。本文提出了基于流網(wǎng)絡(luò)的數(shù)據(jù)流動(dòng)態(tài)資源調(diào)度策略,通過合理分配負(fù)載、動(dòng)態(tài)增加資源、高效數(shù)據(jù)遷移3 種策略相結(jié)合的方式,從根本上解決了因計(jì)算資源不足而影響集群性能的問題,并有效降低了網(wǎng)絡(luò)傳輸?shù)臅r(shí)間開銷。
但本文算法也存在一定的局限性,首先,本文算法對ZooKeeper有很強(qiáng)的依賴性,需要ZooKeeper中保存相關(guān)的數(shù)據(jù)結(jié)構(gòu);其次,狀態(tài)數(shù)據(jù)遷移過程中作業(yè)會有極短暫的停滯(約2~3 s),但算法的執(zhí)行開銷在可接受的范圍內(nèi)。因此,未來的研究工作將主要集中于以下3 個(gè)方面。
1)降低資源調(diào)度策略對ZooKeeper 的依賴程度,在限制單點(diǎn)計(jì)算和傳輸負(fù)載的前提下,嘗試由JobManager 統(tǒng)一提供資源調(diào)度和數(shù)據(jù)共享服務(wù)。
2)通過提出新的狀態(tài)數(shù)據(jù)管理和遷移策略,降低數(shù)據(jù)遷移開銷,縮短調(diào)度計(jì)劃的執(zhí)行時(shí)間,提高執(zhí)行效率。
3)通過提出計(jì)算任務(wù)的熱部署方案和節(jié)點(diǎn)間狀態(tài)數(shù)據(jù)實(shí)時(shí)共享策略,實(shí)現(xiàn)對用戶作業(yè)無感知的、完全透明的在線資源調(diào)度策略。