顏曉蓮,章 剛,邱曉紅
(1.江西理工大學(xué) 軟件工程學(xué)院(南昌),江西 南昌 330013;2.江西北大科技園,江西 南昌 330013)
分布式消息系統(tǒng)作為分布式系統(tǒng)重要的模塊間消息傳遞組件,利用可靠、高效的與平臺無關(guān)的消息傳遞與分發(fā),可實現(xiàn)分布式系統(tǒng)內(nèi)部解耦以及分布式系統(tǒng)各模塊的有效集成,因而受到業(yè)界高度關(guān)注[1]。
ApacheKafka[2-5]是當(dāng)前較為主流、基于發(fā)布-訂閱機制的高吞吐量分布式消息系統(tǒng),前期由LinkedIn開發(fā)后由Apache基金管理并開源。其優(yōu)勢包括:(1)支持上層應(yīng)用端多語言開發(fā),如C#、JAVA、PHP、Python、Ruby等;(2)支持與平臺無關(guān)的消息傳遞與分發(fā);(3)支持準(zhǔn)實時性的大規(guī)模消息處理;(4)支持on-line水平擴展。相對其他消息系統(tǒng),Kafka憑借眾多技術(shù)優(yōu)勢,已在各行業(yè)企業(yè)級應(yīng)用中普及。
在Kafka中,多個Broker(服務(wù)器)組成Kafka集群,并被ZooKeeper集中管理。Producer為消息生產(chǎn)者,Consumer為消息消費者,Kafka將每個新產(chǎn)生的消息進行劃分并歸類到某個主題Topic中(Topic可理解為邏輯存儲單元)。每個Topic被劃為多個分區(qū)Partition(Partition可理解為實際存儲單元),這些分區(qū)Partition按某種規(guī)則均勻地部署到多個Broker上。根據(jù)Kafka系統(tǒng)定義和推薦,Producer生產(chǎn)的消息,依據(jù)Hash算法被分發(fā)至其所屬Topic相應(yīng)的Partition上。
Consumer作為消費者訂閱其關(guān)注的主題Topic(可訂閱多個),Kafka按Range策略(即均勻分配)將Consumer分配至其關(guān)注Topic下眾多Partition之一上,由該Partition作為服務(wù)接入端,并依次消費其關(guān)注的主題Topic下所有Partition的感興趣消息。當(dāng)訂閱流量或分發(fā)消息數(shù)量增加時,Kafka可通過配置文件管理增加Partition數(shù)量,實現(xiàn)on-line水平擴展從而提升系統(tǒng)性能與吞吐量。
伴隨大數(shù)據(jù)時代來臨,各行各業(yè)對大數(shù)據(jù)技術(shù)的需求越發(fā)強烈。Kafka作為消息中間件,不僅在分布式系統(tǒng)中扮演重要角色,同時也已成為大數(shù)據(jù)流處理框架Apache Samza的核心組件之一。但隨著Kafka應(yīng)用的多樣化,其自身的一些不足逐漸顯現(xiàn)。
其中不足之一便是Partition過載問題(Partition overload problem,POP)。Partition在Kafka中扮演承上啟下角色,上連消息生產(chǎn)者Producer,下連消息消費者Consumer,Partition服務(wù)性能決定著Broker及Kafka整體性能,對POP問題研究將為后期優(yōu)化Broker、Consumer乃至Kafka整個系統(tǒng)性能打下基礎(chǔ)。
在綜合衡量已有研發(fā)成果及文中所關(guān)注的重心的基礎(chǔ)上,認為POP問題指消息分發(fā)、消息存儲或消息消費、消息訂閱等操作造成主題Topic下Partition過度服務(wù),并影響到支撐Partition的實際物理載體Broker的性能。
通常而言,在大型商業(yè)應(yīng)用影響下,某時刻會造成某個(或多個)主題Topic源源不斷地涌入新消息,并依據(jù)Hash算法向其Partition分發(fā),此時Partition不僅要處理消息存儲還要處理Consumer的服務(wù)請求,當(dāng)新消息數(shù)量達到某閾值時,必將導(dǎo)致Partition過載,而這將影響到Partition的物理載體Broker的性能。
雖然,Kafka可通過配置文件增加Partition數(shù)量,緩解Partition過載現(xiàn)象出現(xiàn),但依然存在如下問題:(1)這種由人為主觀判定及人為修改的方式,不僅準(zhǔn)確度無法保證而且極為僵化;(2)Partition文件配置管理與基于Hash算法的消息分發(fā)相互獨立、相互分離,無法根據(jù)Partition實際情況建立協(xié)同工作機制。這些問題的存在,已使得Kafka無法滿足當(dāng)前多樣化應(yīng)用需求。
當(dāng)前有關(guān)Kafka中Partition過載問題討論極為少見。研究成果較為常見的包括:(1)ZooKeeper集中管理機制[6-7],主要討論業(yè)務(wù)復(fù)雜化后,Broker、Consumer、Consumer Group等注冊管理,Topic與Broker映射關(guān)系以及Partition分配等管理機制,有助于提升系統(tǒng)整體效率;(2)Broker[8-9]負載均衡,主要討論虛擬化背景下Broker如何實現(xiàn)接入負載均衡,有助于提升Broker資源利用率;(3)Consumer[10-11]負載均衡,主要討論大規(guī)模數(shù)據(jù)處理環(huán)境下,傳統(tǒng)Kafka易造成的高開銷、高誤差率等問題,有助于降低系統(tǒng)耗能、提升服務(wù)質(zhì)量。這些雖都對Kafka系統(tǒng)實現(xiàn)優(yōu)化,但都無法解釋Partition過載問題。
針對此,提出一種改進型Partition負載優(yōu)化算法(IPOOA算法),該算法實現(xiàn)消息分發(fā)預(yù)測以及消息分發(fā)與文件配置管理協(xié)同,從而可有效緩解Partition過載問題出現(xiàn)。
算法思想:新消息產(chǎn)生后,IPOOA算法先根據(jù)實際業(yè)務(wù)提取業(yè)務(wù)關(guān)鍵字Key,依據(jù)Hash分發(fā)規(guī)則計算分發(fā)至Partition,接著算法評估該Partition的即時服務(wù)耗量,如果即時服務(wù)耗量在閾值范圍內(nèi),則新消息被分發(fā)至該Partition,否則算法依次計算與該Partition相似度較高的候選Partition,并評估候選Partition的即時服務(wù)耗量,如果滿足閾值范圍,則新消息被分發(fā)至候選Partition,否則重復(fù)計算候選Partition,直至迭代次數(shù)超過半數(shù)Partition總量。如果依然沒有完成消息分發(fā)任務(wù),則通知Kafka自動修改配置文件新增Partition并存儲新消息,從而能夠有效緩解Partition過載。
按照Kafka的定義,消息分發(fā)機制共包括Hash分發(fā)、隨機分發(fā)以及輪詢分發(fā)等(如圖1所示),實際中企業(yè)級應(yīng)用使用范圍較廣的是Hash分發(fā)機制。
(a)Hash分發(fā)機制
(b)隨機分發(fā)機制
(c)輪詢分發(fā)機制
該機制大致過程如下:
Step1:指定消息的Key(通常選取實際業(yè)務(wù)所含關(guān)鍵字符);
Step2:基于Key實現(xiàn)Hash(Key);
Step3:根據(jù)mod(Hash(key))結(jié)果將消息分發(fā)至指定Partition;
Step4:返回Step1。
Hash分發(fā)機制相對其余兩種方式,其能夠較好地保證消息均勻有序分發(fā),因而被行業(yè)廣泛普及使用。但Hash消息分發(fā)無法根據(jù)Partition實際負載情況進行有序分發(fā),從而易加重Partition負載。
1.2.1 即時服務(wù)耗量(instant service consumption,ISC)
ISC反映當(dāng)前t時刻Partition中消息消費產(chǎn)生的服務(wù)消耗量,對任意消息k而言,在時刻t產(chǎn)生的服務(wù)消耗量Cmt由t時刻消息k訂閱數(shù)CmtNum及t時刻消息k訪問連接數(shù)CmtCon線性加權(quán)組成,如式(1):
Cmtt(k)=λ1CmtNumt(k,N1)+λ2CmtCont(k,N2)
(1)
其中,λ1∈(0,1)和λ2∈(0,1)為權(quán)重系數(shù),N1和N2分別為訂閱總數(shù)和連接總數(shù)。
t時刻Partition的ISC可表示為:
(2)
1.2.2 Partition相似度(partition similarity,PS)
PS反映某一時刻兩個Partition所存消息的相似程度,對任意Partition而言,在時刻t所存儲消息隊列表示為Partitiont={Meg1,Meg2,…,MegNUM},NUM為消息總數(shù)。則時刻t任意兩個Partition的PS可根據(jù)加權(quán)閔可夫斯基距離(Minkowski distance)計算,如式(3):
其中,p≥1為指數(shù)參數(shù),θ∈(0,1)為權(quán)重系數(shù)。
1.2.3 算法過程
Step1:初始化Partition配置文件,載入Kafka系統(tǒng)中,并設(shè)置各類參數(shù)λ1,λ2,p,θ以及Θ(Cmt閾值),設(shè)定迭代次數(shù),轉(zhuǎn)入Step2;
Step2:等待新消息導(dǎo)入,并根據(jù)Hash分發(fā)算法計算其分發(fā)至Partition,轉(zhuǎn)入Step3;
Step3:根據(jù)式(1)、式(2)計算該Partition的ISC值,轉(zhuǎn)入Step4;
Step4:判定該Partition的ISC值是否滿足閾值Θ,如果滿足則新消息存儲并轉(zhuǎn)入Step2.;否則轉(zhuǎn)入Step5;
Step5:根據(jù)式(3)依次計算該Partition與候選Partition的PS值,并挑選出最優(yōu)PS值,轉(zhuǎn)入Step3;
Step6:如果迭代次數(shù)超過半數(shù)Partition總量,則通知Kafka自動修改配置文件新增Partition,并將新消息存儲在新增Partition上,根據(jù)實際情況轉(zhuǎn)入Step2或轉(zhuǎn)入Step7;
Step7:退出算法。
軟硬件環(huán)境:選取12個Broker(服務(wù)器)作為Kafka集群,CPU型號為Xeon E5-2620V3,內(nèi)存8G,SATA硬盤300G,操作系統(tǒng)為SUSE Linux Enterprise Server 15。
核心參數(shù)設(shè)置:在綜合考慮文獻對參數(shù)取值的建議和基于多次重復(fù)實驗的結(jié)果,參數(shù)設(shè)定如下:p=1 ORp=2,λ1,λ2∈(0.35,0.65),θ1,θ2,…∈(0.1,0.9),Θ∈[0.5,0.65],其中實驗中所有權(quán)重系數(shù)之和都為1。
場景模擬:12個Broker服務(wù)器分成3個功能區(qū),其中3個服務(wù)器作為Producer消息生產(chǎn)者不斷模擬分發(fā)消息,3個服務(wù)器作為Consumer消息消費者不斷模擬消費消息,Producer與Consumer隨機分布在不同區(qū)域,另外6個服務(wù)器作Kafka集群服務(wù)器集中管理,處理Producer消息分發(fā)以及Consumer消息消費[12-15]。
對比算法:為展示實驗的客觀性,分別選取傳統(tǒng)Kafka算法[2-5],基于Broker負載均衡的BL算法[8]和基于Consumer負載均衡的CL算法[10]與融合文中IPOOA算法的Kafka相比較。
測試指標(biāo):為體現(xiàn)實驗的全面性,將從多個維度驗證算法的性能:(1)Kafka集群CPU使用率(Kafka CPU rate,KCR);(2)Kafka服務(wù)延時率(Kafka service delay rate,KSDR);(3)Kafka系統(tǒng)收斂延時比(Kafka system convergence delay rate,KSCDR)。
實驗方案:
實驗1:在并發(fā)規(guī)模為2 000環(huán)境下,KCR、KSDR及KSCDR對比如表1所示。
表1 在并發(fā)規(guī)模為2 000環(huán)境下,4種算法的KCRKSDRKSCDR對比 %
實驗2:在并發(fā)規(guī)模為3 500環(huán)境下,KCR、KSDR及KSCDR對比如表2所示。
表2 在并發(fā)規(guī)模為3 500環(huán)境下,4種算法的KCRKSDRKSCDR對比 %
實驗3:在并發(fā)規(guī)模為5 000環(huán)境下,KCR、KSDR及KSCDR對比如表3所示。
表3 在并發(fā)規(guī)模為5 000環(huán)境下,4種算法的KCRKSDRKSCDR對比 %
實驗總結(jié):在并發(fā)規(guī)模逐漸增加下,融合文中算法的Kafka系統(tǒng)(IPOOA_Kafka)在各項指標(biāo)層面相對較優(yōu),主要原因在于IPOOA_Kafka能夠?qū)崿F(xiàn)預(yù)測消息分發(fā)以及消息分發(fā)與文件配置協(xié)同工作,從而能緩解Partition過載問題出現(xiàn),提升系統(tǒng)整體性能。
針對Kafka中Partition文件配置管理所存在的被動、僵化及孤立等不足,使得Partition過載問題無法有效解決,提出一種改進型Partition過載優(yōu)化算法。該算法通過即時服務(wù)耗量,Partition相似度和配置文件自動修改相結(jié)合,實現(xiàn)消息分發(fā)預(yù)測以及消息分發(fā)與文件配置管理協(xié)同,從而可有效緩解Partition過載問題出現(xiàn)。實驗從Kafka集群CPU使用率、Kafka服務(wù)延時率、Kafka系統(tǒng)收斂延時比等幾個方面驗證了算法的有效性及合理性。未來將重點圍繞消息分發(fā)、消息訂閱及文件配置管理等多層面協(xié)同展開研究。