梁毅 程石帆 常世祿 劉飛
摘要:分布式內(nèi)存計(jì)算平臺(tái)Spark是海量數(shù)據(jù)處理領(lǐng)域的最新技術(shù)進(jìn)展。動(dòng)態(tài)資源分配下Spark可根據(jù)應(yīng)用的負(fù)載情況動(dòng)態(tài)地追增、關(guān)閉任務(wù)執(zhí)行器。然而,關(guān)閉任務(wù)執(zhí)行器會(huì)造成緩存數(shù)據(jù)丟失,導(dǎo)致不必要的重計(jì)算開(kāi)銷,該情況在Spark交互式數(shù)據(jù)查詢應(yīng)用中尤為常見(jiàn)。為盡量減少任務(wù)執(zhí)行器關(guān)閉以提升查詢效率,設(shè)計(jì)實(shí)現(xiàn)一種基于預(yù)測(cè)的Spark動(dòng)態(tài)資源分配策略。該策略基于馬爾科夫理論構(gòu)建Spark交互式數(shù)據(jù)查詢應(yīng)用的非活躍期持續(xù)時(shí)間預(yù)測(cè)模型,并依據(jù)預(yù)測(cè)結(jié)果確定任務(wù)執(zhí)行器的關(guān)閉時(shí)機(jī)。試驗(yàn)結(jié)果表明,相比既有的Spark動(dòng)態(tài)資源分配策略,采用基于預(yù)測(cè)的資源分配策略可使Spark交互式數(shù)據(jù)查詢效率平均提升59.34%。
關(guān)鍵詞:分布式計(jì)算平臺(tái);Spark;大數(shù)據(jù)處理技術(shù);動(dòng)態(tài)資源分配;數(shù)據(jù)查詢
DOIDOI:10.11907/rjdk.181493
中圖分類號(hào):TP3-05
文獻(xiàn)標(biāo)識(shí)碼:A文章編號(hào)文章編號(hào):1672?7800(2018)012?0043?05
Prediction?based Dynamic Resource Allocation Strategy for Spark Platform
LIANG Yi, CHENG Shi?fan, CHANG Shi?lu, LIU Fei
(Computer Academy,Beijing University of Technology,Beijing 100124,China)
Abstract:The distributed in?memory computing framework Spark is the latest technological advancement in the field of massive data processing. Under dynamic resource allocation, Spark can dynamically increase and close executors according to the workload of the application. However, removing executors would result in the loss of cached data and lead to unnecessary recomputing cost. This situation is particularly common in Spark interactive data query applications. Therefore, it is necessary to minimize the closing of the executors to improve the query efficiency. This paper designs and implements a prediction?based dynamic resource allocation strategy for Spark platform. This strategy constructs a non?active duration prediction model of Spark interactive data query application based on Markov theory, and determines the closing time of executors according to the prediction result. The experimental results show that compared with Sparks dynamic resource allocation strategy, the efficiency of Sparks interactive data query can be improved by 59.34%.
Key Words:distributed comuting platform; Spark; big data processing technology; dynamic resource allocation; data query
0?引言
隨著互聯(lián)網(wǎng)蓬勃發(fā)展,當(dāng)今社會(huì)已進(jìn)入大數(shù)據(jù)時(shí)代[1]。與傳統(tǒng)數(shù)據(jù)不同,大數(shù)據(jù)時(shí)代的數(shù)據(jù)具有4個(gè)顯著特征:規(guī)模性、多樣性、高速性和價(jià)值性。為了應(yīng)對(duì)該新特征,利用多個(gè)計(jì)算節(jié)點(diǎn)協(xié)同計(jì)算以增強(qiáng)數(shù)據(jù)處理能力的分布式數(shù)據(jù)處理技術(shù)受到學(xué)術(shù)界和工業(yè)界廣泛關(guān)注[2]。Spark是繼Hadoop之后的下一代大數(shù)據(jù)核心處理技術(shù),是海量數(shù)據(jù)處理領(lǐng)域的最新技術(shù)進(jìn)展[3]。
Spark平臺(tái)所有任務(wù)均在任務(wù)執(zhí)行器中執(zhí)行,任務(wù)執(zhí)行器是包含CPU資源和內(nèi)存資源的載體。為了充分利用平臺(tái)資源,Spark提供動(dòng)態(tài)資源分配技術(shù)。動(dòng)態(tài)資源分配技術(shù)可根據(jù)Spark應(yīng)用負(fù)載到達(dá)強(qiáng)度,追增或關(guān)閉任務(wù)執(zhí)行器。如果任務(wù)執(zhí)行器閑置時(shí)間超過(guò)用戶設(shè)定的閾值,則會(huì)關(guān)閉該任務(wù)執(zhí)行器。同樣地,在交互式數(shù)據(jù)查詢應(yīng)用(下稱“應(yīng)用”)下,如果連續(xù)兩個(gè)查詢間隔時(shí)間超過(guò)了用戶設(shè)定的閾值,也會(huì)關(guān)閉該應(yīng)用任務(wù)執(zhí)行器,造成緩存數(shù)據(jù)丟失。下次查詢到來(lái)時(shí),如果使用丟失的緩存數(shù)據(jù)就會(huì)帶來(lái)重計(jì)算開(kāi)銷,影響查詢的響應(yīng)時(shí)間。因此,優(yōu)化海量數(shù)據(jù)處理平臺(tái)下的動(dòng)態(tài)資源分配方式受到學(xué)術(shù)界廣泛關(guān)注。Hadoop平臺(tái)動(dòng)態(tài)資源分配優(yōu)化主要是解決Map和Reduce階段的數(shù)據(jù)傾斜問(wèn)題[4?5]以及任務(wù)執(zhí)行本地化問(wèn)題[6?7],通常依據(jù)對(duì)任務(wù)執(zhí)行特征和數(shù)據(jù)分布特征調(diào)整不同節(jié)點(diǎn)的資源。也有一些動(dòng)態(tài)資源分配研究是關(guān)于流式處理平臺(tái)Storm[8]或Spark Streaming[9]的,主要針對(duì)流式處理中數(shù)據(jù)到達(dá)率的不同,對(duì)流式處理應(yīng)用所占用資源進(jìn)行動(dòng)態(tài)增減[10?11]。還有一些針對(duì)云環(huán)境下的動(dòng)態(tài)資源分配,面向云環(huán)境下不同計(jì)算框架對(duì)計(jì)算資源進(jìn)行追增或減少[12?13]。
上述既有動(dòng)態(tài)資源策略的優(yōu)化方法不能平移到既有Spark平臺(tái)的動(dòng)態(tài)資源分配上。因?yàn)镾park以任務(wù)執(zhí)行器作為任務(wù)載體,不存在Map和Reduce階段,也不存在多計(jì)算框架共用資源,而Spark應(yīng)用中除了流式應(yīng)用,還包含在線交互式數(shù)據(jù)查詢等多種應(yīng)用。
本文分析了既有Spark動(dòng)態(tài)資源分配的不足,提出基于馬爾可夫預(yù)測(cè)結(jié)果的Spark動(dòng)態(tài)資源分配策略。該策略可先根據(jù)應(yīng)用歷史非活躍期的持續(xù)時(shí)間預(yù)測(cè)應(yīng)用下次處于非活躍期的時(shí)間變化,再根據(jù)預(yù)測(cè)結(jié)果決定應(yīng)用下次處于非活躍期時(shí)是否關(guān)閉任務(wù)執(zhí)行器。
1?相關(guān)技術(shù)
1.1?Spark系統(tǒng)簡(jiǎn)介
Apache Spark是一個(gè)圍繞速度、易用性和復(fù)雜分析構(gòu)建的大數(shù)據(jù)處理框架。最初在2009年由加州大學(xué)伯克利分校AMPLab開(kāi)發(fā),并于2010年成為Apache的開(kāi)源項(xiàng)目之一。在Spark缺省的Standalone部署模式下,其架構(gòu)如圖1所示。
Spark平臺(tái)采用Master/Slave架構(gòu),包含一個(gè)Master和多個(gè)Worker。其中,Master主要負(fù)責(zé)管理平臺(tái)中用戶提交的應(yīng)用和資源分配,Worker主要負(fù)責(zé)為應(yīng)用啟動(dòng)任務(wù)執(zhí)行器(Executor);Driver中SparkContext是用戶在客戶端(Client)編寫(xiě)的對(duì)象。SparkContext中包含一個(gè)DAGScheduler對(duì)象、一個(gè)TaskScheduler對(duì)象和一個(gè)SchedulerBackend對(duì)象。在Spark平臺(tái)架構(gòu)中,Executor是任務(wù)真正執(zhí)行和緩存數(shù)據(jù)真正存儲(chǔ)的地方。
1.2?Spark運(yùn)行環(huán)境
Spark運(yùn)行環(huán)境如圖2所示。Spark引入新的抽象彈性分布式數(shù)據(jù)集(Resilient Distributed Datasets,RDDs)[14]作為分布式數(shù)據(jù)集的抽象表達(dá)。RDDs將輸入數(shù)據(jù)和計(jì)算過(guò)程中產(chǎn)生的中間數(shù)據(jù)盡量保存在內(nèi)存中,直到計(jì)算流程的最后階段才寫(xiě)入持久化存儲(chǔ),從而減少多次讀寫(xiě)磁盤(pán)的I/O消耗。并且,Spark大數(shù)據(jù)平臺(tái)以有向無(wú)環(huán)圖(Directed Acyclic Graph,DAG)描述更加復(fù)雜的數(shù)據(jù)處理邏輯,并提供了更加豐富的操作算子[15?16]。
1.3?Spark動(dòng)態(tài)資源分配
Spark提供了以任務(wù)執(zhí)行器為粒度的動(dòng)態(tài)資源分配方式[19]。以任務(wù)執(zhí)行器為粒度是指動(dòng)態(tài)資源分配方式可以根據(jù)不同階段的負(fù)載強(qiáng)度,以任務(wù)執(zhí)行器為粒度,動(dòng)態(tài)地追增或回收該應(yīng)用占用的CPU和內(nèi)存資源。在以任務(wù)執(zhí)行器為粒度的動(dòng)態(tài)資源分配方式下,Spark會(huì)周期性地檢測(cè)某個(gè)應(yīng)用是否存在任務(wù)處于等待調(diào)度狀態(tài),如果是,就為該應(yīng)用追增任務(wù)執(zhí)行器;否則,結(jié)束該輪追增過(guò)程。Spark每次為應(yīng)用追增任務(wù)執(zhí)行器的數(shù)量與檢測(cè)次數(shù)成2的冪次方函數(shù)關(guān)系。例如:Spark在第一次檢測(cè)到應(yīng)用需要被追增任務(wù)執(zhí)行器時(shí),只會(huì)追增1個(gè)任務(wù)執(zhí)行器,在第二、三次檢測(cè)到應(yīng)用需要被追增任務(wù)執(zhí)行器時(shí),就會(huì)追增2、4個(gè)任務(wù)執(zhí)行器,以此類推,直到檢測(cè)到該應(yīng)用不存在任務(wù)處于等待調(diào)度狀態(tài)?;厥杖蝿?wù)執(zhí)行器的流程相對(duì)簡(jiǎn)單,當(dāng)任務(wù)執(zhí)行器的“閑置(沒(méi)有任務(wù)執(zhí)行)”時(shí)間超過(guò)用戶設(shè)定的閾值時(shí),直接關(guān)閉任務(wù)執(zhí)行器。關(guān)閉任務(wù)執(zhí)行器既可以釋放任務(wù)執(zhí)行器的CPU資源,又可以釋放任務(wù)執(zhí)行器的內(nèi)存資源。如果應(yīng)用的任務(wù)執(zhí)行器數(shù)量已達(dá)到用戶設(shè)置的閾值,即使這些任務(wù)執(zhí)行器處于閑置狀態(tài)的持續(xù)時(shí)間已經(jīng)超過(guò)用戶設(shè)定的閾值,也不會(huì)觸發(fā)關(guān)閉任務(wù)執(zhí)行器機(jī)制。也就是說(shuō),每個(gè)應(yīng)用都有自己的最小任務(wù)執(zhí)行器數(shù)量。
2?基于馬爾可夫模型預(yù)測(cè)的Spark動(dòng)態(tài)資源分配
2.1?馬爾可夫模型
馬爾可夫模型(Markov Model)是一種統(tǒng)計(jì)模型,用于研究隨機(jī)過(guò)程。適用于馬爾可夫模型的隨機(jī)過(guò)程稱作馬爾可夫過(guò)程。
2.1.1?馬爾可夫過(guò)程
馬爾可夫(1956-1922)是俄國(guó)著名數(shù)學(xué)家,馬爾可夫過(guò)程因被他提出而命名。馬爾可夫過(guò)程簡(jiǎn)稱馬氏過(guò)程,它主要描述了實(shí)際可能會(huì)遇到的一種隨機(jī)過(guò)程,其特點(diǎn)是在當(dāng)前時(shí)刻狀態(tài)已知的條件下,隨機(jī)過(guò)程所處狀態(tài)僅與當(dāng)前時(shí)刻狀態(tài)有關(guān),而與過(guò)程在前的狀態(tài)無(wú)關(guān),該特性被稱為馬爾可夫性或無(wú)后效性?,F(xiàn)實(shí)生活中有許多過(guò)程都是馬爾可夫過(guò)程。馬爾可夫過(guò)程的數(shù)學(xué)定義如下:
定義1?設(shè)?{X(t),t∈T}為一隨機(jī)過(guò)程,如果對(duì)于時(shí)間t的任意n個(gè)值t?1<t?2<…<t?n,在X(t?i)=x?i,i=1,2,…,n-1的條件下,X(t?n)的分布函數(shù)恰好等于在X(t?n-1)=X?n-1條件下X(t?n)?的分布函數(shù),即:
則稱隨機(jī)過(guò)程?X(t)?為馬爾可夫過(guò)程。
Spark交互式數(shù)據(jù)查詢應(yīng)用非活躍期持續(xù)時(shí)間的變化是無(wú)規(guī)律的,非活躍期持續(xù)時(shí)間變長(zhǎng)、變短或不變都只與當(dāng)前狀態(tài)有關(guān),而與歷史狀態(tài)無(wú)關(guān)。因此,可以使用馬爾可夫模型預(yù)測(cè)Spark交互式數(shù)據(jù)查詢應(yīng)用非活躍期持續(xù)時(shí)間的變化。
2.1.2?馬爾可夫鏈
馬爾克夫鏈?zhǔn)邱R爾可夫過(guò)程中最簡(jiǎn)單的一類。定義如下:
定義2?設(shè)馬爾可夫過(guò)程?{X(t),t∈T}的狀態(tài)空間為I,且對(duì)離散空間I中的隨機(jī)序列{X?n,n=0,1,2…},若在任意時(shí)刻n以及任意的狀態(tài)i?0,i?1,…,i?n-1?,滿足:
則隨機(jī)序列?{X?n,n=0,1,2…}?為馬爾可夫鏈。
2.1.3?一步轉(zhuǎn)移概率
定義3?設(shè)隨機(jī)序列?{X?n,n=0,1,2…}為一條馬爾可夫鏈,狀態(tài)空間為I?,稱條件概率,式(3)為馬爾可夫鏈在時(shí)刻?n?的一步轉(zhuǎn)移概率。
由于從狀態(tài)i出發(fā),經(jīng)過(guò)一步轉(zhuǎn)移后,必須能夠到達(dá)狀態(tài)空間I的一個(gè)狀態(tài),故一步轉(zhuǎn)移概率p?ij(n)需滿足下列條件:①p?ij(n)≥0,i,j∈I;②∑?p?ij(n)=1,i∈I。
定義4?若任意?i,j∈I,馬爾可夫鏈{X?n,n=0,1,2…}的轉(zhuǎn)移概率p?ij(n)與n無(wú)關(guān),則稱馬爾可夫鏈?zhǔn)驱R次的,并令p?ij(n)為p?ij?。
2.1.4?狀態(tài)轉(zhuǎn)移矩陣
定義5?設(shè)?P為一步轉(zhuǎn)移概率p?ij?所組成的矩陣為一步轉(zhuǎn)移概率矩陣,則式(4)為一步轉(zhuǎn)移概率矩陣。
通過(guò)馬爾可夫模型,可以對(duì)某個(gè)隨機(jī)問(wèn)題歷史數(shù)據(jù)構(gòu)成的時(shí)間序列進(jìn)行分析,根據(jù)該時(shí)間序列中各狀態(tài)之間的一步轉(zhuǎn)移概率構(gòu)造出一步轉(zhuǎn)移概率矩陣,然后使用轉(zhuǎn)移概率矩陣與當(dāng)前狀態(tài)作為輸入,給出下一個(gè)狀態(tài)的預(yù)測(cè)狀態(tài)。
2.2?交互式數(shù)據(jù)應(yīng)用非活躍期持續(xù)時(shí)間預(yù)測(cè)
由于Spark交互式數(shù)據(jù)查詢應(yīng)用非活躍期持續(xù)時(shí)間是波動(dòng)的,該策略利用馬爾可夫鏈方法,建立了一個(gè)隨機(jī)狀態(tài)鏈,將交互式應(yīng)用非活躍期持續(xù)時(shí)間看作是馬爾可夫過(guò)程中的各個(gè)狀態(tài),對(duì)交互式應(yīng)用非活躍期持續(xù)時(shí)間進(jìn)行基于概率的預(yù)測(cè)。該策略中狀態(tài)空間的定義為定義6。
定義6?狀態(tài)空間?I?。該策略中馬爾可夫模型劃分各個(gè)狀態(tài),用以描述未來(lái)交互式應(yīng)用非活躍期持續(xù)時(shí)間的變化趨勢(shì),其定義如式(5)所示。
其中,?X表示預(yù)測(cè)模型的狀態(tài)空間,x?1表示交互式數(shù)據(jù)查詢應(yīng)用非活躍期持續(xù)時(shí)間變長(zhǎng),x?2表示交互式數(shù)據(jù)查詢應(yīng)用非活躍期持續(xù)時(shí)間不變,x?3?表示交互式數(shù)據(jù)查詢應(yīng)用非活躍期持續(xù)時(shí)間變短。
算法中一步狀態(tài)轉(zhuǎn)移概率可以通過(guò)對(duì)歷史數(shù)據(jù)分析和當(dāng)前狀態(tài)求得,假設(shè)當(dāng)前狀態(tài)為?x?i,轉(zhuǎn)移之后可能的狀態(tài)為x?j(x?i,x?j∈I),那么當(dāng)前狀態(tài)從x?i轉(zhuǎn)移到x?j狀態(tài)的概率p?ij?計(jì)算方法如式(6)所示。
其中,N?x?ix?j表示歷史數(shù)據(jù)中從x?i轉(zhuǎn)移到x?j狀態(tài)的總次數(shù),∑N?x?i表示歷史數(shù)據(jù)中從當(dāng)前狀態(tài)x?i轉(zhuǎn)移的所有可能的轉(zhuǎn)移次數(shù)。有了p?ij的值,就可以定義一步狀態(tài)轉(zhuǎn)移矩陣P:
P=p?11p?12p?13?p?21p?22p?23?p?31p?32p?33
根據(jù)一步狀態(tài)轉(zhuǎn)移矩陣?P?可求得當(dāng)前狀態(tài)轉(zhuǎn)移的下一個(gè)最有可能的狀態(tài)。
下面給出基于馬爾可夫模型的交互式數(shù)據(jù)查詢應(yīng)用非活躍期持續(xù)時(shí)間預(yù)測(cè)算法。
算法1?交互式數(shù)據(jù)查詢應(yīng)用非活躍期持續(xù)時(shí)間預(yù)測(cè)算法
Input:交互式數(shù)據(jù)查詢應(yīng)用非活躍期歷史持續(xù)時(shí)間數(shù)據(jù)T?n,當(dāng)前狀態(tài)x?i
Output:交互式數(shù)據(jù)查詢應(yīng)用下一次非活躍期持續(xù)時(shí)間的狀態(tài)變化x?j
//計(jì)算歷史數(shù)據(jù)中從當(dāng)前狀態(tài)x?i轉(zhuǎn)移的所有可能的轉(zhuǎn)移次數(shù)
∑N?x?i ←computeTotalTimes(T?n,x?i )
//計(jì)算歷史數(shù)據(jù)中從當(dāng)前狀態(tài)x?i轉(zhuǎn)移的所有到x?j狀態(tài)的轉(zhuǎn)移次數(shù)
for(j←1 to 3)
N?x?i x?j←computeTimes(T?n,x?j,x?i)
end for
//計(jì)算所有轉(zhuǎn)移到x?j的概率
for(j←1 to 3)
p?ij←N?x?i x?j∑N?x?i
end for
//構(gòu)建一步轉(zhuǎn)移矩陣
P←mat(p?ij)
//從一步轉(zhuǎn)移矩陣中找出從x?i轉(zhuǎn)移最大概率的x?j
for(j←1 to 3)
p?ij←max(P)
end for
//返回預(yù)測(cè)狀態(tài)
return x?j
通過(guò)上述算法,先構(gòu)建馬爾可夫模型的一步轉(zhuǎn)移矩陣,然后根據(jù)一步轉(zhuǎn)移矩陣中概率的最大值預(yù)測(cè)交互式數(shù)據(jù)查詢應(yīng)用非活躍期持續(xù)時(shí)間變化。
2.3?基于預(yù)測(cè)結(jié)果的Spark動(dòng)態(tài)資源分配策略
基于馬爾可夫預(yù)測(cè)結(jié)果的Spark動(dòng)態(tài)資源分配策略,具體基于算法1的預(yù)測(cè)結(jié)果,判斷應(yīng)用下次處于非活躍期時(shí)是否應(yīng)該關(guān)閉任務(wù)執(zhí)行器。
算法2?基于預(yù)測(cè)結(jié)果的Spark動(dòng)態(tài)資源分配算法
Input:算法1返回的預(yù)測(cè)狀態(tài)
Output:是否應(yīng)該移除任務(wù)執(zhí)行器
Input:算法1返回的預(yù)測(cè)狀態(tài)x?j
Output:是否應(yīng)該移除任務(wù)執(zhí)行器isRemoving
if(x?j==x?1)
isRemoving←true
end if
else
isRemoving←false
end else
算法2與算法1都是在作業(yè)提交時(shí)觸發(fā)的。由算法2可知,如果預(yù)測(cè)到應(yīng)用下次處于非活躍期時(shí)間變長(zhǎng),就認(rèn)為應(yīng)用下次處于非活躍期時(shí)應(yīng)該關(guān)閉任務(wù)執(zhí)行器;否則,認(rèn)為應(yīng)用下次處于非活躍期時(shí)不應(yīng)該關(guān)閉任務(wù)執(zhí)行器。當(dāng)Spark達(dá)到滿足關(guān)閉任務(wù)執(zhí)行器的時(shí)間閾值時(shí),如果為true,則關(guān)閉任務(wù)執(zhí)行器;否則,不要關(guān)閉任務(wù)執(zhí)行器。
2.4?系統(tǒng)實(shí)現(xiàn)
數(shù)據(jù)放置策略是基于Spark Standalone集群部署模式實(shí)現(xiàn)的。在Spark Standalone 集群部署模式下,改造后的Spark系統(tǒng)架構(gòu)如圖3所示。
在圖3中,Driver端新增了統(tǒng)計(jì)歷史非活躍期持續(xù)時(shí)間機(jī)制。DAGScheduler負(fù)責(zé)檢測(cè)用戶是否在交互式數(shù)據(jù)查詢應(yīng)用中提交了作業(yè)。一旦檢測(cè)到用戶提交了作業(yè),算法1就會(huì)觸發(fā),同時(shí),DAGScheduler會(huì)統(tǒng)計(jì)系統(tǒng)當(dāng)前時(shí)間作為作業(yè)開(kāi)始時(shí)間。當(dāng)作業(yè)執(zhí)行完畢后,Driver會(huì)把作業(yè)執(zhí)行完畢時(shí)間作為非活躍期開(kāi)始時(shí)間。T?begin和T?end都保存在BlockManagerMsater中。本輪作業(yè)開(kāi)始時(shí)間與上一輪作業(yè)結(jié)束時(shí)間即為上一輪作業(yè)結(jié)束后應(yīng)用非活躍期的持續(xù)時(shí)間。每當(dāng)作業(yè)執(zhí)行完畢,ExecutorAllocationManager(EAM)就會(huì)根據(jù)算法2決定應(yīng)用下輪處于非活躍期時(shí)是否應(yīng)該嘗試關(guān)閉任務(wù)執(zhí)行器。
3?性能評(píng)估
3.1?實(shí)驗(yàn)環(huán)境及負(fù)載選擇
實(shí)驗(yàn)測(cè)試環(huán)境由7臺(tái)物理節(jié)點(diǎn)構(gòu)成,每臺(tái)節(jié)點(diǎn)軟、硬件配置如表1所示。在測(cè)試環(huán)境中,1臺(tái)物理節(jié)點(diǎn)作為主節(jié)點(diǎn),其余6個(gè)節(jié)點(diǎn)作為從節(jié)點(diǎn)。
為了驗(yàn)證預(yù)測(cè)策略所帶來(lái)的性能提升及預(yù)測(cè)準(zhǔn)確度,實(shí)驗(yàn)在輸入數(shù)據(jù)、任務(wù)執(zhí)行器保留數(shù)量和任務(wù)執(zhí)行器內(nèi)存一定的情況下進(jìn)行,選取對(duì)象為Spark原始的動(dòng)態(tài)資源分配策略。其中,輸入數(shù)據(jù)為“TPC?H”,生成12GB表數(shù)據(jù),任務(wù)執(zhí)行器保留數(shù)量為3個(gè),內(nèi)存為8GB,過(guò)期時(shí)間統(tǒng)一設(shè)置為30s。
實(shí)驗(yàn)選取Spark?SQL作為測(cè)試應(yīng)用,TPC?H on Hive中的q1到q10作為查詢負(fù)載,且每個(gè)查詢負(fù)載的時(shí)間間隔如表2所示。表2中,q1的時(shí)間間隔為80s,指的是緩存12GB表數(shù)據(jù)后需經(jīng)過(guò)80s提交查詢q1;q2的時(shí)間間隔為60s指的是查詢q1執(zhí)行完成后需經(jīng)過(guò)60s提交查詢q2。以此類推,在每次查詢結(jié)束后統(tǒng)計(jì)各個(gè)查詢執(zhí)行時(shí)間。
3.2?實(shí)驗(yàn)結(jié)果及分析
實(shí)驗(yàn)結(jié)果如圖4所示。從圖4可以看出,使用預(yù)測(cè)算法的Spark動(dòng)態(tài)資源分配策略在查詢q5、q7、q8和q9時(shí),所用時(shí)間比原始Spark動(dòng)態(tài)資源分配策略分別縮短了47.55%、56.72%、80.42%和52.7%。這是因?yàn)樵趒4、q6、q7和q8查詢開(kāi)始時(shí),預(yù)測(cè)策略預(yù)測(cè)到該查詢結(jié)束后Spark SQL的非活躍期時(shí)間變短,因此在查詢?cè)摬樵兘Y(jié)束后均沒(méi)有關(guān)閉任務(wù)執(zhí)行器,從而提升了查詢q5、q7、q8和q9的執(zhí)行速度。綜上所述,使用預(yù)測(cè)算法的Spark動(dòng)態(tài)資源分配策略相比原始Spark動(dòng)態(tài)資源分配策略,查詢效率最大提升了80.42%,平均提升了59.34%。
4?結(jié)語(yǔ)
本文面向Spark海量數(shù)據(jù)處理平臺(tái)的動(dòng)態(tài)資源分配,設(shè)計(jì)并實(shí)現(xiàn)了基于馬爾可夫的預(yù)測(cè)策略。通過(guò)對(duì)Spark交互式數(shù)據(jù)查詢應(yīng)用歷史非活躍期的持續(xù)時(shí)間預(yù)測(cè)應(yīng)用下一次處于非活躍期的時(shí)間變化,從而避免任務(wù)執(zhí)行器頻繁關(guān)閉帶來(lái)的緩存數(shù)據(jù)重計(jì)算開(kāi)銷?;隈R爾可夫預(yù)測(cè)模型對(duì)Spark交互式查詢應(yīng)用的非活躍期進(jìn)行預(yù)測(cè),再根據(jù)預(yù)測(cè)結(jié)果決定是否移除任務(wù)執(zhí)行器,能夠有效提高Spark查詢的執(zhí)行效率。實(shí)驗(yàn)表明,本文預(yù)測(cè)方法可以使Spark SQL查詢效率平均提升59.34%。
參考文獻(xiàn):
[1]?陶雪嬌,胡曉峰,劉洋.大數(shù)據(jù)研究綜述[J].系統(tǒng)仿真學(xué)報(bào),2013(s1):142?146.
[2]?戴炳榮,宋俊典,錢俊玲.云計(jì)算環(huán)境下海量分布式數(shù)據(jù)處理協(xié)同機(jī)制的研究[J].計(jì)算機(jī)應(yīng)用與軟件,2013,30(1):107?110.
[3]?ZAHARIA M, CHOWDHURY M, FRANKLIN M J, et al. Spark: cluster computing with working sets[J]. Usenix Conference on Hot Topics in Cloud Computing,2010,15(1):10.
[4]?LIU Z, ZHANG Q, ZHANI M F, et al. DREAMS: dynamic resource allocation for MapReduce with data skew[C]. International Symposium on Integrated Network Management,2015:18?26.
[5]?LIU Z, ZHANG Q, AHMED R, et al. Dynamic resource allocation for MapReduce with partitioning skew[J]. IEEE Transactions on Computers,2016,65(11):3304?3317.
[6]?SHAO Y, LI C, DONG W, et al. Energy?aware dynamic resource allocation on Hadoop yarn cluster[C]. IEEE International Conference on High PERFORMANCE Computing and Communications; IEEE International Conference on Smart City; IEEE International Conference on Data Science and Systems,2016:364?371.
[7]?MADSEN K G S, ZHOU Y. Dynamic resource management in a map reduce?style platform for fast data processing[C]. IEEE International Conference on Data Engineering Workshops,2014:10?13.
[8]?FRACHTENBERG E, PETRINI F, FERNANDEZ J, et al. STORM: lightning?fast resource management[C].Supercomputing, ACM/IEEE 2002 Conference,2002:1?26.
[9]?ZAHARIA M, XIN R S, WENDELL P, et al. Apache Spark: a unified engine for big data processing[J]. Communications of the Acm,2016,59(11):56?65.
[10]?CHENG D, CHEN Y, ZHOU X, et al. Adaptive scheduling of parallel jobs in spark streaming[C]. INFOCOM 2017?IEEE Conference on Computer Communications,2017:1?9.
[11]?LIAO X, GAO Z, JI W, et al. An enforcement of real time scheduling in Spark streaming[C]. Green Computing Conference and Sustainable Computing Conference IEEE,2016:1?6.
[12]?WARNEKE D, KAO O. Exploiting dynamic resource allocation for efficient parallel data processing in the cloud[J]. IEEE Transactions on Parallel & Distributed Systems,2011,22(6):985?997.
[13]?AN B, LESSER V, IRWIN D, et al. Automated negotiation with decommitment for dynamic resource allocation in cloud computing[C]. International Conference on Autonomous Agents and Multiagent Systems,2010:981?988.
[14]?ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets: a fault?tolerant abstraction for in?memory cluster computing[J]. Usenix Conference on Networked Systems Design and Implementation,2012,70(2):2.
[15]?ISARD M, BUDIU M, YU Y, et al. Dryad: distributed data?parallel programs from sequential building blocks[C]. Proceedings of the 2007 EuroSys Conference,2007:59?72.
[16]?YU Y, ISARD M, FETTERLY D, et al. DryadLINQ: a system for general?purpose distributed data?parallel computing using a high?level language[C]. Usenix Conference on Operating Systems Design &Implementation,2008:1?14.