亚洲免费av电影一区二区三区,日韩爱爱视频,51精品视频一区二区三区,91视频爱爱,日韩欧美在线播放视频,中文字幕少妇AV,亚洲电影中文字幕,久久久久亚洲av成人网址,久久综合视频网站,国产在线不卡免费播放

        ?

        一種基于Actor模型的彈性可伸縮的流處理框架

        2017-05-13 03:44:04詹杭龍劉瀾濤康亮環(huán)曹東剛
        計算機研究與發(fā)展 2017年5期
        關(guān)鍵詞:路由表處理單元消息

        詹杭龍 劉瀾濤 康亮環(huán) 曹東剛 謝 冰

        (高可信軟件技術(shù)教育部重點實驗室(北京大學) 北京 100871)(北京大學(天津濱海)新一代信息技術(shù)研究院 天津 300450)(zhanhl@pku.edu.cn)

        一種基于Actor模型的彈性可伸縮的流處理框架

        詹杭龍 劉瀾濤 康亮環(huán) 曹東剛 謝 冰

        (高可信軟件技術(shù)教育部重點實驗室(北京大學) 北京 100871)(北京大學(天津濱海)新一代信息技術(shù)研究院 天津 300450)(zhanhl@pku.edu.cn)

        流處理是一種重要的大數(shù)據(jù)應(yīng)用模式,在金融、廣告、物聯(lián)網(wǎng)、社交網(wǎng)絡(luò)等眾多領(lǐng)域得到了廣泛應(yīng)用.在流處理場景中,流數(shù)據(jù)的產(chǎn)生速度往往變化劇烈且不容易預測.這時,如果數(shù)據(jù)流量峰值超過處理系統(tǒng)的承載能力,可能使得系統(tǒng)運行緩慢甚至崩潰,導致處理作業(yè)失效;如果為了應(yīng)對數(shù)據(jù)流量峰值而過度配置資源,則可能在系統(tǒng)輕載時產(chǎn)生不必要的浪費.為了解決流處理中負載和資源的匹配問題,流處理系統(tǒng)應(yīng)該具有彈性可伸縮的能力,一方面以高效的方式組織運算資源;另一方面能根據(jù)數(shù)據(jù)流量的實時變化自動地調(diào)整資源使用量.然而,現(xiàn)有的流處理框架對于彈性可伸縮的支持尚很初步.介紹了一種基于Actor模型的彈性可伸縮的流處理框架eSault.eSault首先基于Actor模型將批量的處理單元進行分層管理,通過2層路由機制實現(xiàn)了對伸縮性的支持;在此基礎(chǔ)上,設(shè)計一個基于數(shù)據(jù)處理延遲的過載判斷算法和基于數(shù)據(jù)處理速度的輕載判斷算法來指導系統(tǒng)對資源的有效使用,進而實現(xiàn)彈性可伸縮的流處理.實驗結(jié)果表明:eSault具有較好的性能,而且能夠很好地實現(xiàn)彈性可伸縮.

        流處理;Actor模型;云計算;彈性可伸縮;2層路由機制

        大數(shù)據(jù)時代數(shù)據(jù)規(guī)模不斷增加,數(shù)據(jù)產(chǎn)生的速度越來越快.在很多領(lǐng)域,數(shù)據(jù)的價值隨著時間的推移迅速流失[1],應(yīng)用對數(shù)據(jù)時效性的要求越來越強,這就要求數(shù)據(jù)處理系統(tǒng)能夠?qū)Υ罅俊靶迈r”數(shù)據(jù)進行實時分析.例如社交網(wǎng)絡(luò)公司可能需要在幾分鐘內(nèi)分析話題走向、廣告商可能需要實時分析哪些用戶點擊了廣告、服務(wù)運營商可能需要在幾秒內(nèi)通過分析日志文件發(fā)現(xiàn)系統(tǒng)異常等.因此,流處理作為一種契合上述應(yīng)用場景的處理模式得到了廣泛應(yīng)用.流處理是指對一連串在時間上連續(xù)的消息數(shù)據(jù)進行實時分析、運算的處理過程[2].在流處理中,一個受到普遍關(guān)注的問題是,消息數(shù)據(jù)往往由外部產(chǎn)生,其流量經(jīng)常處于變化之中,甚至會突然爆發(fā)式增長.例如,亞洲移動電話網(wǎng)絡(luò)的呼叫記錄在峰值時可以達到每秒幾十萬條記錄,而在低谷時只有每秒幾千條記錄;重大新聞引起新聞網(wǎng)站的訪問量突然增大[3].在這種場景下,如果高峰值的消息流量超過了流處理系統(tǒng)的承受能力,可能導致系統(tǒng)運行緩慢甚至崩潰;而如果為了應(yīng)對消息流量峰值而過度配置資源,則可能在系統(tǒng)輕載時產(chǎn)生不必要的浪費.這些現(xiàn)象的本質(zhì)其實是流處理中的運算資源無法與負載變化實現(xiàn)動態(tài)匹配.為了解決這個問題,一些流處理系統(tǒng)通過在負載過高時,隨機丟棄一些消息以應(yīng)對流量峰值;另一些流處理系統(tǒng)通過重排消息或定義消息優(yōu)先級,從而在系統(tǒng)負載較高時優(yōu)先處理一些消息[4];此外,大部分流處理系統(tǒng)通過使用消息隊列對消息數(shù)據(jù)進行緩存[5],從而平滑輸入流量,但是這種方式違背了流處理實時性和低延遲性的需求,并沒有真正解決此問題.

        近年來,云計算的發(fā)展為解決流處理中運算資源與負載變化的動態(tài)匹配問題提供了新的思路.云計算是一種基于互聯(lián)網(wǎng)的計算方式,其運算資源是按需聚合與彈性綁定的[6].在云環(huán)境中,一個作業(yè)在運行過程時具有獲取更多資源的能力.對于流處理作業(yè),如果能夠在消息流量較大時向云環(huán)境申請更多的資源,在負載變低時合理釋放部分運算資源,這樣便能較好地實現(xiàn)運算資源與負載變化的動態(tài)匹配.這樣的流處理系統(tǒng)被稱為是彈性可伸縮的.彈性是云計算的基本屬性之一[7].云環(huán)境能夠?qū)⒌讓拥姆植际郊航M織起來,通過虛擬機部署設(shè)施(如OpenStack)以及資源調(diào)度工具(如Yarn,Mesos等)為上層的處理作業(yè)提供彈性的運算資源.然而,僅有云環(huán)境的彈性支持是不夠的,上層的處理作業(yè)還需要根據(jù)運行時負載的大小實時地調(diào)整對運算資源的使用規(guī)模,這樣才能夠?qū)崿F(xiàn)彈性可伸縮.

        為了實現(xiàn)彈性可伸縮的流處理系統(tǒng),有2個必備條件:1)流處理系統(tǒng)是可伸縮的.伸縮性是指系統(tǒng)可以利用變化的資源以調(diào)整負載承受力的能力[8].2)系統(tǒng)有一套自適應(yīng)算法,能夠根據(jù)運行時負載的變化來決策如何對運算資源進行伸縮.然而,現(xiàn)有的流處理系統(tǒng)尚未完全支持這2方面的條件.典型的流處理框架系統(tǒng)如Apache S4[9],Storm[10],Mill-Wheel[11],Spark Streaming[12]等尚未完全支持彈性可伸縮.一些學術(shù)工作對彈性的流處理技術(shù)進行了研究,取得了一定進展.如Esc[13]通過中心式的負載監(jiān)控器監(jiān)控機器負載情況,根據(jù)MAPE loop自動分析負載情況并觸發(fā)彈性伸縮.Esc不支持有狀態(tài)處理單元的伸縮,并且在處理單元內(nèi)部消息通過單點轉(zhuǎn)發(fā)給所有處理元素,效率較低.StreamCloud[3]將流處理單元劃分為子處理單元,并根據(jù)機器負載情況動態(tài)遷移子處理單元,從而實現(xiàn)彈性伸縮.但StreamCloud只提供有限的查詢操作,并不支持通用的流數(shù)據(jù)分析.SEEP[14]實現(xiàn)了狀態(tài)管理系統(tǒng),通過中心式的負載監(jiān)控實現(xiàn)了有狀態(tài)處理單元的動態(tài)擴展和狀態(tài)容錯.然而,SEEP缺乏自適應(yīng)的調(diào)度機制來實現(xiàn)彈性可伸縮.

        本文介紹了一種基于Actor 模型的支持彈性可伸縮的流處理框架eSault.eSault除了實現(xiàn)通用流處理框架的基本功能外,重點是支持了應(yīng)用的彈性可伸縮運行.eSault首先基于Actor模型將批量的處理單元進行分層管理,通過2層路由機制實現(xiàn)了對伸縮性的支持;在此基礎(chǔ)上,設(shè)計一個基于數(shù)據(jù)處理延遲的過載判斷算法和基于數(shù)據(jù)處理速度的輕載判斷算法來指導系統(tǒng)對資源的有效使用,進而實現(xiàn)彈性可伸縮的流處理.

        與現(xiàn)有彈性流處理系統(tǒng)的研究工作相比,本文的主要特點在于:

        1) 同時支持彈性擴展和彈性收縮.現(xiàn)有研究多只關(guān)注彈性擴展的實現(xiàn),而eSault基于處理元素動態(tài)創(chuàng)建退出機制和2層路由機制的彈性實現(xiàn)方式,在統(tǒng)一的解決方案下同時支持了彈性擴展和彈性收縮.

        2) 基于消息處理延遲和速度的負載判斷算法.現(xiàn)有研究工作主要基于機器資源使用情況進行負載判斷,這種方式的限制在于:①資源使用情況的獲得需要底層資源管理系統(tǒng)的支持,這增加了框架層與資源層解耦的難度;②流處理應(yīng)用需要綜合使用網(wǎng)絡(luò)、CPU、內(nèi)存甚至磁盤等資源,較難設(shè)計一種能夠準確反映應(yīng)用負載情況的綜合指標.而eSault設(shè)計的基于消息處理延遲的過載判斷算法和基于消息處理速度的輕載判斷算法,完全在應(yīng)用層實現(xiàn)負載判斷,更加直接地監(jiān)控應(yīng)用的性能.

        3) 完全基于Actor模型的設(shè)計與實現(xiàn).eSault探索了基于Actor模型設(shè)計與實現(xiàn)彈性流處理框架的可行性,并得到了較好結(jié)果.

        1 Actor模型與彈性伸縮

        Actor模型是一種并發(fā)編程模型,由Hewitt等人[15]在1973年提出.它把“Actor”作為并發(fā)編程的基本元素,Actor可以根據(jù)收到的消息進行本地決策,用于創(chuàng)建更多Actor,發(fā)送更多消息和決定如何響應(yīng)下一個消息.Actor模型如今已成為許多計算理論和并發(fā)系統(tǒng)的理論基礎(chǔ).Actor模型具有許多特性,例如無共享狀態(tài)、簡單的高層抽象、異步非阻塞的事件驅(qū)動編程模型等,這些特性使其非常適合用來對并發(fā)程序建模.此外,目前大部分Actor模型的實現(xiàn)中都將Actor實現(xiàn)得非常輕量,可以快速地批量創(chuàng)建和銷毀,“幾十萬甚至上百萬進程同時并行運行十分常見,而且經(jīng)常僅僅占用很少的內(nèi)存”[16].這為實現(xiàn)支持彈性的流處理框架帶來了2點好處:

        1) 簡化流處理框架的編程模型.在流處理應(yīng)用中,數(shù)據(jù)流的key往往數(shù)量巨大.如果使用輕量級Actor實現(xiàn)處理元素,我們可以為每個key標識的數(shù)據(jù)子流啟動一個處理元素,使用戶在編寫處理元素的處理邏輯時可以直接對數(shù)據(jù)子流進行處理,而不需進一步進行數(shù)據(jù)分流,從而簡化流處理框架的編程模型.

        2) 簡化彈性的實現(xiàn)機制.實現(xiàn)系統(tǒng)彈性的基礎(chǔ)是伸縮性,可以通過批量創(chuàng)建和銷毀輕量級Actor,實現(xiàn)處理元素的批量遷移,從而動態(tài)調(diào)整處理元素在集群中的分布.

        因此,基于Actor模型對彈性流處理系統(tǒng)建模并予以實現(xiàn),一方面可以簡化流處理系統(tǒng)的設(shè)計;另一方面可以充分利用Actor模型的特點簡化框架的編程模型并高效地實現(xiàn)彈性伸縮.在現(xiàn)有的基于Actor模型設(shè)計和實現(xiàn)的流處理框架中,S4尚未支持彈性;而Esc雖然支持了彈性,但一方面其論文中并未表明其彈性收縮支持,另一方面其作為原型系統(tǒng)實現(xiàn)較為初步,性能優(yōu)化空間較大.

        2 eSault系統(tǒng)設(shè)計

        2.1 編程模型與系統(tǒng)架構(gòu)

        eSault的編程模型如圖1(a)所示,將流處理應(yīng)用的處理單元根據(jù)功能的不同分為了Spout和Bolt 2種類型.一個流處理應(yīng)用事實上是Spout與Bolt拼接成的DAG圖,Spout是圖的源節(jié)點,其他節(jié)點為Bolt,圖中的邊表示處理單元之間的數(shù)據(jù)路由.Spout產(chǎn)生Tuple格式的流數(shù)據(jù),傳遞給Bolt處理,經(jīng)過多級Bolt處理后生成最終結(jié)果從輸出端流出.其中,Tuple是框架中數(shù)據(jù)流的傳輸形式,它事實上是一個鍵值對(key/value),框架中的數(shù)據(jù)流都是由連續(xù)不斷的Tuple組成的.Spout是流處理應(yīng)用的數(shù)據(jù)流來源,它源源不斷地生成Tuple形式的數(shù)據(jù)流交由后續(xù)的Bolt處理.Bolt是流處理應(yīng)用負責數(shù)據(jù)處理的單元,它接收由上游傳來的Tuple數(shù)據(jù),調(diào)用用戶定義的處理方法對數(shù)據(jù)進行處理后,將新產(chǎn)生的Tuple數(shù)據(jù)發(fā)送給下游Bolt進行處理.流處理應(yīng)用的主要處理邏輯都在各階段的Bolt中實現(xiàn).

        eSault是基于Actor模型設(shè)計的,其各功能模塊及其子模塊在設(shè)計時都嚴格保證了無共享狀態(tài),且只通過發(fā)送消息交換數(shù)據(jù),每個模塊都可以抽象成1個Actor.eSault的系統(tǒng)架構(gòu)如圖1(b)所示.應(yīng)用驅(qū)動運行在用戶端,為用戶提供編程模型中應(yīng)用程序的接口,使用戶得以構(gòu)建、提交和控制流處理應(yīng)用.框架驅(qū)動運行在集群中,框架的所有其他模塊都由框架驅(qū)動啟動并控制.Spout/Bolt處理單元:Spout和Bolt在集群中的運行實例,其包含分布在集群中的大量處理元素(processing element, PE).框架驅(qū)動模塊通過控制Spout與Bolt,使得流數(shù)據(jù)可以依據(jù)應(yīng)用程序所定義的邏輯一步步進行處理.Ack(acknowledgement)服務(wù)器保證了所有在規(guī)定時間內(nèi)處理完成的消息會被確認,而其他處理超時的消息將由Spout重發(fā),從而保證了至少1次(at least once)的消息語義.資源接口封裝了資源管理器的管理接口,框架驅(qū)動通過調(diào)用資源接口申請和釋放資源,而不需考慮具體的下層資源管理器類型,從而保證框架與資源管理器解耦.

        Fig. 1 The architecture and programming model of eSault圖1 eSault的系統(tǒng)架構(gòu)與編程模型

        2.2 系統(tǒng)架構(gòu)與編程模型

        流處理單元是eSault的數(shù)據(jù)處理模塊,其處理邏輯由用戶定義.在編程模型層面一個流處理單元是一個整體,但在實際運行過程中,框架會在集群中啟動大量處理元素,使它們并行地執(zhí)行用戶定義的處理邏輯.用戶通過應(yīng)用驅(qū)動將實現(xiàn)的流處理單元代碼提交給流處理框架,然后通過應(yīng)用驅(qū)動提供的方法創(chuàng)建流處理單元實例并對流處理單元進行動態(tài)拼接,從而實現(xiàn)流處理應(yīng)用.

        2.2.1 子處理單元與處理元素

        在流處理系統(tǒng)運行過程中,處理單元會在集群中啟動并管理數(shù)量巨大的處理元素.如果處理單元對這些處理元素進行集中管理,會使處理單元的邏輯變得復雜,運行時負載也較高,很容易導致處理單元運行異常.所以eSault將處理單元劃分為多個子處理單元,實現(xiàn)分層管理.子處理單元是處理單元的組成部分,與處理單元不同的是,其只運行在1臺機器上,并且在該機器上啟動和管理大量的處理元素.處理單元通過啟動和管理多個子處理單元,間接地管理分布在集群中的大量處理元素.圖2所示Spout與Bolt中對處理單元進行分層管理的結(jié)構(gòu)圖.其中,子處理單元管理器是處理單元的功能模塊,其負責啟動和管理所有子處理單元;PE管理器是子處理單元的功能模塊,其負責在子處理單元所在的機器上啟動和管理大量的處理元素.添加子處理單元后,所有處理元素均由子處理單元管理,處理單元只需管理數(shù)量有限的子處理單元即可.這樣,處理單元將主要的處理元素管理邏輯下放至子處理單元,從而分散負載并簡化了管理邏輯,使系統(tǒng)變得更加穩(wěn)定,也有利于提高路由效率.

        Fig. 2 Hierarchical management of processing unit圖2 Spout與Bolt中處理單元的分層管理

        2.2.2 2層路由機制

        一個典型的流處理應(yīng)用通常由許多處理單元組成,而每個處理單元在同一時間會啟動大量的處理元素.在如此大規(guī)模的處理元素之間路由消息,保證消息嚴格按照key進行分發(fā),并且使這個過程高效、動態(tài)、可靠是非常困難的.為了保證消息轉(zhuǎn)發(fā)效率,同時又使路由表可以在運行過程中動態(tài)進行更改,eSault提出了2層路由轉(zhuǎn)發(fā)機制.

        如圖3所示,eSault的2層路由轉(zhuǎn)發(fā)機制的主要思想就是結(jié)合集中路由和分布路由,在子處理單元之間進行分布路由,在子處理單元內(nèi)部進行集中路由.源處理單元的所有處理元素均將產(chǎn)生的數(shù)據(jù)發(fā)送給所在子處理單元的輸出路由器;輸出路由器將數(shù)據(jù)按照key值路由給相應(yīng)的目標子處理單元的輸入路由器;目標子處理單元的輸入路由器收到數(shù)據(jù)后,將數(shù)據(jù)轉(zhuǎn)發(fā)給相應(yīng)的處理元素.輸入路由器和輸出路由器是eSault的2層轉(zhuǎn)發(fā)機制的核心構(gòu)件,兩者內(nèi)部各保存有1張路由表用來進行數(shù)據(jù)路由.這2張路由表的設(shè)計對于eSault的消息轉(zhuǎn)發(fā)效率影響巨大,下面分別介紹根據(jù)輸入路由器和輸出路由器各自的功能特點設(shè)計和路由表的數(shù)據(jù)結(jié)構(gòu).

        Fig. 3 Two-ayer routing forwarding mechanism圖3 eSault的2層路由機制

        輸入路由表使用散列(Hash)表實現(xiàn),表的鍵是輸入數(shù)據(jù)流的key,表的值是處理該key所標記的數(shù)據(jù)流的處理元素的索引PEindex.當有輸入數(shù)據(jù)時,輸入路由器在路由表中查找數(shù)據(jù)的key所對應(yīng)的路由表項,從而得到該數(shù)據(jù)對應(yīng)的處理元素,并將該數(shù)據(jù)轉(zhuǎn)發(fā)給該處理元素.在大規(guī)模的數(shù)據(jù)量下進行快速地增刪改查,Hash表是一個非常理想的選擇,因為理想情況下Hash表的增刪改查的平均時間復雜度都為O(1),與表項數(shù)目無關(guān),所以使用Hash表可以高效地實現(xiàn)輸入路由表.

        輸出路由表的作用是將所有key盡可能平衡地分給所有目標處理單元的子處理單元,并保證路由效率盡可能的高.eSault的輸出路由表使用線索2叉樹表示的類似區(qū)間樹的數(shù)據(jù)結(jié)構(gòu)實現(xiàn),2叉樹中的節(jié)點由key和PEindex組成,其中key表示[key, 后繼節(jié)點的key)的區(qū)間范圍,若沒有后繼節(jié)點,則表示[key, 最大整數(shù)INT_MAX]的區(qū)間范圍;PEindex則表示該區(qū)間對應(yīng)的子處理單元的索引.該數(shù)據(jù)結(jié)構(gòu)的主要特點是可以將在查詢某個整數(shù)所在的子區(qū)間、分裂任意區(qū)間和任意相鄰區(qū)間的時間復雜度控制在logn以內(nèi),其中n為樹中存儲的子區(qū)間個數(shù).

        圖4展示了區(qū)間分裂的過程,初始情況圖4(a)中總的區(qū)間范圍為[0,INT_MAX];圖4(b)中通過插入INT_MAX2+1節(jié)點,實現(xiàn)了對區(qū)間[0,INT_MAX]的分裂操作;圖4(c)中插入了INT_MAX4+1,實現(xiàn)了對區(qū)間[0,INT_MAX2]的分裂操作;圖4(d)中進一步插入了INT_MAX×34+1,實現(xiàn)了對區(qū)間[INT_MAX2+1,INT_MAX]的分裂操作.當有輸出數(shù)據(jù)時,輸出路由器首先對數(shù)據(jù)中的key在[0,INT_MAX]的區(qū)間內(nèi)進行重新散列(rehash),然后在2叉樹中查找小于等于散列值的最大節(jié)點,之后取出該節(jié)點對應(yīng)的子處理單元,即為輸出數(shù)據(jù)的目標子處理單元.該操作在logn時間內(nèi)完成,n為子區(qū)間的個數(shù)也即子處理單元的個數(shù),因為子處理單元的數(shù)量一般與集群規(guī)模在同一數(shù)量級,最多達到數(shù)百數(shù)千的級別,所以該時間開銷是可以接受的.使用線索2叉樹實現(xiàn)輸出路由表的最大作用在于配合輸入路由表可以非常方便的實現(xiàn)彈性伸縮,這在2.3節(jié)中會進一步介紹.

        Fig. 4 Interval splitting procedure of fan-out routing table圖4 eSault輸出路由表的分裂過程

        2.3 彈性可伸縮機制

        實現(xiàn)伸縮性的關(guān)鍵是能夠在有新可用資源時,在新資源上處理任務(wù),從而利用新資源提高系統(tǒng)并行度;在資源減少時,將被減少資源中的任務(wù)重新調(diào)度到可用資源上處理,從而使系統(tǒng)正常運行[17].對于流處理應(yīng)用來說,也就是在有新資源時,能夠?qū)?shù)據(jù)流分流至新資源上進行處理;在資源減少時,能夠?qū)⒈粶p少資源處理的數(shù)據(jù)流導流至可用資源進行處理.

        eSault伸縮機制的主要設(shè)計思想是在處理單元的層面,以子處理單元為單位實現(xiàn)伸縮.當有新資源時,負載較高的處理單元會在新資源上創(chuàng)建子處理單元,并將部分數(shù)據(jù)流分流至新的子處理單元,以提高處理能力;當資源減少時,處理單元會將受影響的子處理單元進行遷移,或直接將其輸入數(shù)據(jù)流合流至未受影響的子處理單元.

        圖5(a)完整地描述處理單元的動態(tài)擴展過程:

        ① 處理單元在新資源上創(chuàng)建新的子處理單元;

        ② 處理單元修改子處理單元路由表,將被分流子處理單元對應(yīng)的區(qū)間進行分裂;

        ③ 處理單元將新的子處理單元路由表發(fā)送給所有源處理單元;

        ④ 源處理單元收到路由表后用其更新所有子處理單元的輸出路由表;

        ⑤ 輸出路由表的變化使一部分數(shù)據(jù)被導流至新建的子處理單元;

        ⑥ 子處理單元動態(tài)創(chuàng)建新的處理元素處理數(shù)據(jù)流;

        ⑦ 被分流的子處理單元中的處理元素因為超時退出.

        至此,處理單元的動態(tài)擴展過程完成.處理單元的動態(tài)收縮過程與動態(tài)擴展過程相似,唯一的區(qū)別是需要將路由表中受影響子處理單元對應(yīng)的區(qū)間與其相鄰區(qū)間進行合并,從而實現(xiàn)將其輸入數(shù)據(jù)合流入其相鄰區(qū)間對應(yīng)的子處理單元,故在此不再贅述.

        在伸縮性的基礎(chǔ)上,如果系統(tǒng)能夠自適應(yīng)地根據(jù)負載情況申請和釋放資源,并自動地觸發(fā)伸縮,則實現(xiàn)了彈性伸縮.eSault通過消息延遲監(jiān)控器監(jiān)控消息的處理延遲和處理速度,并根據(jù)基于消息處理延遲的過載判斷算法和基于消息處理速度的輕載判斷算法,實現(xiàn)了自動根據(jù)負載情況申請和釋放資源并觸發(fā)伸縮機制,從而最終實現(xiàn)了彈性可伸縮.

        Fig. 5 Workload monitoring and scaling of eSault圖5 eSault的伸縮機制與延遲監(jiān)控器

        2.3.1 消息處理延遲監(jiān)控

        在對系統(tǒng)負載進行動態(tài)監(jiān)控時,資源使用情況是最直接的衡量指標,一些研究彈性的流處理系統(tǒng),例如SEEP,Esc等,也都使用這一指標.但是,使用其作為負載衡量指標也存在一些缺陷:一方面資源使用情況的獲得需要底層資源管理系統(tǒng)的支持,這使框架難以與資源管理系統(tǒng)解耦;另一方面,流處理應(yīng)用需要綜合使用網(wǎng)絡(luò)、CPU、內(nèi)存甚至磁盤等資源,很難設(shè)計一種能夠準確反映應(yīng)用負載情況的綜合指標.因此,eSault選擇在應(yīng)用層通過性能監(jiān)測負載,因為無論底層任何資源成為瓶頸,最終表現(xiàn)都是應(yīng)用無法達到性能要求,而且這也有助于實現(xiàn)框架與資源層解耦.eSault認為消息處理延遲是非常理想的負載衡量指標,因為其他指標都只能部分地反映負載情況.例如,輸入消息數(shù)量很大時,如果每條消息實際處理時間很短,則處理負載并不一定會高,也就很難確定一個消息數(shù)量作為負載限額;同理,使用輸入數(shù)據(jù)吞吐量和消息隊列長度也會出現(xiàn)類似的情況.然而,消息處理延遲綜合反映了網(wǎng)絡(luò)傳輸時間、排隊時間和處理時間,而且因為流處理應(yīng)用的核心價值就在于在線實時處理從而降低處理延遲,所以消息處理延遲是流處理應(yīng)用的非常理想負載衡量指標.

        eSault的負載監(jiān)控是由每個處理單元獨立進行的,即每個處理單元監(jiān)控自身各子處理單元的負載,并根據(jù)負載情況作出彈性伸縮的決策.eSault在Bolt中添加了延遲監(jiān)控器模塊.如圖5(b)所示,延遲監(jiān)控器周期性地給Bolt的所有子處理單元發(fā)送探針(probe),探針流經(jīng)輸入路由器、最近一次處理數(shù)據(jù)的處理元素和輸出路由器后,返回延遲監(jiān)控器,之后延遲監(jiān)控器即可通過探針發(fā)出時間和返回時間,判斷對應(yīng)子處理單元的消息處理延遲.

        2.3.2 基于消息處理延遲的過載判斷算法

        算法的思路如下:延遲監(jiān)控器每隔PROBE_PERIOD向所有子處理單元發(fā)送探針,并監(jiān)控探針的處理時間是否超過MAX_LATENCY,如果有一個子處理單元在OVERLOAD_REACTION_TIME個采樣周期內(nèi)的總超時次數(shù)所占比例超過OVER_LOAD_FACTOR,則認為該子處理單元過載,需要申請新的資源,并觸發(fā)伸縮機制將該子處理單元分裂至新資源上.

        算法中的4個關(guān)鍵變量:MAX_LATENCY,PROBE_PERIOD,OVERLOAD_REACTION_TIME和OVERLOAD_REACTION_FACTOR需要應(yīng)用配置指定,以改變算法的額外開銷、靈敏度等屬性.

        1)MAX_LATENCY.算法允許的最大探針處理延遲.改變該變量,可以改變算法可容忍的最大消息處理延遲,應(yīng)根據(jù)處理單元的任務(wù)類型和應(yīng)用對消息處理處理延遲的要求合理配置該值.

        2)PROBE_PERIOD.發(fā)射探針的采樣周期.改變該變量,可以調(diào)整算法的額外開銷和靈敏度.增大該值,會使采樣周期變長,采樣次數(shù)變少,從而使發(fā)送和處理探針帶來的額外開銷減小,但也會使算法對過載的反應(yīng)時間變長,算法靈敏度下降;反之,會使算法的額外開銷增大,反應(yīng)時間變短,靈敏度提高.

        3)OVERLOAD_REACTION_TIME.過載判斷的反應(yīng)時間.改變該變量可以調(diào)整算法的反應(yīng)時間,從而調(diào)整算法靈敏度.增大該值,算法需要更長的時間才能確定過載,因而靈敏度下降;反之,算法靈敏度上升.

        4)OVERLOAD_REACTION_FACTOR.允許的OVERLOAD_REACTION_TIME內(nèi)超時記錄占總記錄數(shù)的比例.改變該變量,可以調(diào)整算法判斷條件的嚴格程度,從而調(diào)整算法靈敏度.增大該值,算法允許的超時次數(shù)增大,過載判斷條件更為嚴格,算法靈敏度下降;反之,算法靈敏度上升.

        2.3.3 基于消息處理速度的輕載判斷算法

        消息處理延遲在判斷過載時非常有效,但在判斷輕載時卻無法顯著反映負載情況.消息處理延遲主要由網(wǎng)絡(luò)傳輸時間、排隊時間和處理時間組成,動態(tài)擴展的主要目的是降低排隊時間和處理時間.在高負載情況下排隊時間和處理時間成為消息處理延遲的主要部分,所以其可以顯著反應(yīng)排隊時間和處理時間,從而反映系統(tǒng)負載;而在輕載情況下網(wǎng)絡(luò)傳輸時間成為消息處理延遲的主要部分,其不再顯著反映排隊時間和處理時間,從而無法顯著反映系統(tǒng)負載.

        為了解決這個問題,eSault設(shè)計了基于消息處理速度的輕載判斷算法,通過消息處理速度是否顯著低于峰值,判斷子處理單元是否處于輕載狀態(tài).算法的主要思想是:延遲監(jiān)控器依然每隔PROBE_PERIOD向所有子處理單元發(fā)送探針;子處理單元的輸入路由器會統(tǒng)計2次探針之間,子處理單元處理的消息總數(shù),并在收到探針時將該結(jié)果存入探針中;延遲監(jiān)控器根據(jù)探針中的消息總數(shù)是否低于LOW_WATERMARK×消息處理峰值判斷子處理單元是否輕載;如果有1個子處理單元在UNDERLOAD_REACTION_TIME個采樣周期內(nèi)的輕載次數(shù)所占比例超過UNDERLOAD_REACTION_FACTOR且在OVERLOAD_REACTION_TIME個采樣周期內(nèi)超時次數(shù)為0,則認為該子處理單元輕載,需要觸發(fā)伸縮機制將該子處理單元與相鄰單元合并,并釋放資源.

        在該算法中,當消息處理速度顯著提高時,算法會將所有峰值信息重置,因為其認為這意味著系統(tǒng)的工作負載發(fā)生了顯著變化,算法應(yīng)當使用最新的峰值來做決策.在基于消息處理速度的輕載判斷算法中,當探針超時的時候,說明系統(tǒng)進入了新一輪高負載運行狀態(tài),算法通過使歷史峰值信息無效化來適應(yīng)新的工作負載.算法中的4個關(guān)鍵變量:LOW_WATERMARK,PROBE_PERIOD,UNDERLOAD_REACTION_TIME和UNDERLOAD_REACTION_FACTOR同樣需要應(yīng)用配置指定,4個變量的作用與基于消息處理延遲的過載判斷算法基本一一對應(yīng).

        至此,eSault通過在處理單元中增加延遲監(jiān)控器監(jiān)控各子處理單元的消息處理延遲和處理速度,并通過基于消息處理延遲的過載判斷算法和基于消息處理速度的輕載判斷算法自動分析子處理單元負載,從而實現(xiàn)了根據(jù)負載自適應(yīng)地分配和釋放資源,并自動地觸發(fā)伸縮,最終實現(xiàn)了彈性可伸縮.

        3 系統(tǒng)實現(xiàn)與實驗分析

        3.1 系統(tǒng)實現(xiàn)

        本文使用編程庫Akka實現(xiàn)了eSault的原型系統(tǒng)*https://github.com/pkusei/Sault.Akka是一個運行在JVM上的基于Actor模型的開源工具包和運行時.Akka具有輕量級Actor,Actor位置透明、消息分發(fā)高效等特點,非常適合構(gòu)建高效的分布式并發(fā)應(yīng)用.在實現(xiàn)eSault的過程中,所有的功能模塊均使用Akka的Actor進行刻畫,這使得eSault的結(jié)構(gòu)非常簡單直觀.

        3.2 彈性可伸縮效果驗證

        本實驗的主要目的是驗證eSault彈性可伸縮的能力,即證明eSault上運行的流處理應(yīng)用可以隨輸入流量的變化自動調(diào)整資源使用量,并保證處理延遲的穩(wěn)定.實驗的主要思路是:階段性地調(diào)整Emitter生成單詞的速度,并在此過程中監(jiān)測單詞生成速度、單詞平均處理延遲和Counter的子處理單元數(shù)目的變化情況.其中輸入流量對應(yīng)單詞生成速度,并行度對應(yīng)Counter的子處理單元數(shù),延遲對應(yīng)單詞平均處理延遲.

        如圖6(a)所示實驗過程中,輸入流量共經(jīng)歷了2次上升和下降的變化周期,每個周期為時約250 s.在每個周期內(nèi),輸入流量在前30 s內(nèi),每10 s提升約50 000 tuples/s的單詞生成速度,并在達到峰值后穩(wěn)定約70 s;此后每20 s下降約50 000 tuple/s的單詞生成速度,并在達到谷值后穩(wěn)定約70 s.

        Fig. 6 Verification of elastic effect圖6 彈性效果驗證

        通過分析輸入流量、處理延遲和并行度三者的變化關(guān)系,可以發(fā)現(xiàn)如下符合實驗預期的現(xiàn)象:

        1) 實驗過程中的大部分時間,消息處理延遲基本穩(wěn)定在較低水平.如圖6(b)所示,在整個實驗過程中,應(yīng)用的消息處理延遲基本穩(wěn)定在100 ms以下,即使在輸入流量達到峰值后,消息處理延遲在大部分時間也穩(wěn)定在100 ms以下.

        2) 并行度隨輸入流量的變化趨勢明顯.圖6(c)中可以觀察到Counter的初始并行度為2,隨著輸入流量提高,其并行度快速增加以適應(yīng)負載增加,最終達到8;隨著輸入流量降低,其并行度逐漸減少以釋放多余資源,最終達到1.上述實驗結(jié)果基本滿足實驗預期,可以證明eSault基本支持了流處理應(yīng)用的彈性可伸縮.

        3.3 彈性可伸縮的必要性驗證

        本實驗的主要目的是驗證上述彈性效果實驗中,系統(tǒng)能夠根據(jù)數(shù)據(jù)流負載自動作出資源調(diào)整,從而保證系統(tǒng)不會因突然的數(shù)據(jù)流量高峰崩潰,也不會在數(shù)據(jù)流量較低時浪費資源.實驗的主要思路是:使用與彈性效果實驗中相同的Emitter,測試不同并行度的Counter,觀察單詞平均處理延遲的變化情況,并與彈性效果實驗中的延遲變化情況進行比較,從而驗證彈性伸縮的必要性.

        圖7展示了在預設(shè)為不同并行度以及彈性可伸縮執(zhí)行的情況下,處理延遲隨著輸入流量而變化的情況.由于并行度為1,2的情況較為特殊,單獨在圖7(a)中展示;其余并行度以及彈性執(zhí)行的情況在圖7(b)中展示;圖7(c)詳細展示了輸入流量達到峰值且彈性伸縮延遲穩(wěn)定后(60~110 s)不同并行度情況下的延遲比較.

        Fig. 7 Necessity validation of elasticity圖7 彈性必要性驗證

        通過觀察實驗結(jié)果可以發(fā)現(xiàn),當并行度固定為1和2時,系統(tǒng)在輸入流量上升后產(chǎn)生了嚴重的消息堆積,最終導致底層通信機制因來不及處理心跳信息而出錯,使系統(tǒng)無法正常運行.而彈性伸縮情況下,雖然Counter的初始并行度也為2,但其通過自動提高并行度度過了流量高峰,并保持了延遲的基本穩(wěn)定.這說明較低的并行度無法在規(guī)定延遲內(nèi)處理數(shù)據(jù)流量高峰,而彈性伸縮機制可以通過增加資源應(yīng)對流量高峰,此現(xiàn)象符合實驗預期.

        如圖7(b)所示,當并行度固定為4和6時,系統(tǒng)可以承受數(shù)據(jù)流量高峰,且沒有彈性伸縮的延遲波動期.但如圖7(c)所示,在彈性伸縮的延遲穩(wěn)定后,并行度為4和6的情況下,處理延遲會高于彈性伸縮的情況.并行度為8的情況下,延遲均值、方差和最大值均顯著低于并行度為4和6的情況,而僅略高于并行度為10的情況.這說明8是該流量峰值下合適的并行度,而eSault的彈性機制確定的并行度恰為8,這說明eSault的彈性伸縮機制準確地找到了最合適的并行度.不過因為彈性伸縮機制本身有一定開銷,所以最終彈性情況下的延遲略高于并行度為8的情況.

        上述實驗結(jié)果基本滿足實驗預期,可以證明eSault的彈性支持可以準確地根據(jù)輸入流量找到最佳并行度,既能應(yīng)對流量高峰,又能在流量低谷節(jié)約資源.

        4 結(jié)束語

        本文基于Actor模型設(shè)計與實現(xiàn)的彈性流處理框架eSault,除了實現(xiàn)其他通用流處理框架的基本功能外,還重點支持了應(yīng)用的彈性伸縮.實驗證明了eSault可以準確根據(jù)輸入流量決定資源使用量,既能在流量高峰時保持延遲穩(wěn)定,又能在流量低谷時節(jié)約資源,達到了預期效果.

        未來的工作包括2個方面:

        1) 更加智能的自適應(yīng)算法,使得參數(shù)配置可以根據(jù)數(shù)據(jù)流歷史情況挖掘流量變化規(guī)律自動調(diào)整,對于流量波動較少但幅度較大的數(shù)據(jù)流,采用激進的參數(shù)配置;對于流量波動幅度較小但頻繁的數(shù)據(jù)流,采用穩(wěn)健的參數(shù)配置.

        2) 有狀態(tài)處理單元的彈性伸縮,進一步研究如何將狀態(tài)管理原語以盡可能透明地方式納入eSault的編程模型,并在框架內(nèi)支持彈性伸縮過程中的狀態(tài)遷移,從而實現(xiàn)對有狀態(tài)處理單元彈性伸縮的原生支持.

        [1]Cheng Xueqi, Jin Xiaolong, Wang Yuanzhuo, et al. Survey on big data system and analytic technology[J]. Journal of Software, 2014, 25(9): 1889-1908 (in Chinese)(程學旗, 靳小龍, 王元卓, 等. 大數(shù)據(jù)系統(tǒng)和分析技術(shù)綜述[J]. 軟件學報, 2014, 25(9): 1889-1908)

        [2]Cui Xingcan, Yu Xiaohui, Liu Yang, et al. Distributed stream processing: A survey[J]. Journal of Computer Research and Development, 2015, 52(2): 318-332 (in Chinese)(崔星燦, 禹曉輝, 劉洋, 等. 分布式流處理技術(shù)綜述[J]. 計算機研究與發(fā)展, 2015, 52(2): 318-332)

        [3]Gulisano V, Jimenez-Peris R, Patino-Martinez M, et al. Streamcloud: An elastic and scalable data streaming system[J]. IEEE Trans on Parallel and Distributed Systems, 2012, 23(12): 2351-2365

        [4]Hummer W, Satzger B, Dustdar S. Elastic stream processing in the cloud[J]. Wiley Interdisciplinary Reviews: Data Mining and Knowledge Discovery, 2013, 3(5): 333-345

        [5]Qi Kaiyuan, Han Yanbo, Zhao Zhuofeng, et al. MapReduce intermediate result cache for concurrent data stream processing[J]. Journal of Computer Research and Development, 2013, 50(1): 111-121 (in Chinese)(亓開元, 韓燕波, 趙卓峰, 等. 支持高并發(fā)數(shù)據(jù)流處理的MapReduce中間結(jié)果緩存[J]. 計算機研究與發(fā)展, 2013, 50(1): 111-121)

        [6]Lu Xicheng, Wang Huaimin, Wang Ji. Internet-based virtual computing environment (iVCE): Concepts and architecture[J]. Scientia Sinica: Informationis, 2006, 49(6): 681-701

        [7]Buyya R, Broberg J, Goscinski A. Cloud Computing: Principles and Paradigms[M]. New York: John Wiley & Sons, 2011: 457-490

        [8]Herbst N R, Kounev S, Reussner R. Elasticity in cloud computing: What it is, and what it is not[C] //Proc of the 10th Int Conf on Autonomic Computing. Berkeley, CA: USENIX Association, 2013: 23-27

        [9]Neumeyer L, Robbins B, Nair A, et al. S4: Distributed stream computing platform[C] //Proc of the 13th Int Conf on Data Mining Workshops. Piscataway, NJ: IEEE, 2010: 170-177

        [10]Toshniwal A, Taneja S, Shukla A, et al. Storm@ twitter[C] //Proc of the 2014 ACM SIGMOD Int Conf on Management of Data. New York: ACM, 2014: 147-156

        [12]Zaharia M, Das T, Li H, et al. Discretized streams: An efficient and fault-tolerant model for stream processing on large clusters[C] //Proc of the 4th USENIX Conf on Hot Topics in Cloud Computing. Berkeley, CA: USENIX Association, 2012: 10

        [13]Satzger B, Hummer W, Leitner P, et al. Esc: Towards an elastic stream computing platform for the cloud[C] //Proc of 2011 Int Conf on Cloud Computing. Piscataway, NJ: IEEE, 2011: 348-355

        [14]Fernandez R, Migliavacca M, Kalyvianaki E, et al. Integrating scale out and fault tolerance in stream processing using operator state management[C] //Proc of the 2013 ACM SIGMOD Int Conf on Management of Data. New York: ACM, 2013: 725-736

        [15]Hewitt C, Bishop P, Steiger R. A universal modular actor formalism for artificial intelligence[C] //Proc of the 3rd Int Joint Conf on Artificial Intelligence. San Francisco, CA: Morgan Kaufmann, 1973: 235-245

        [16]Cesarini F, Thompson S. Erlang Programming[M]. Sebastopol, CA: O’Reilly Media, Inc, 2009

        [17]Zhan Hanglong, Kang Lianghuan, Cao Donggang. DETS: A dynamic and elastic task scheduler supporting multiple parallel schemes[C] //Proc of the 8th Int Symp on Service Oriented System Engineering. Piscataway, NJ: IEEE, 2014: 278-283

        An Elastic Scalable Stream Processing Framework Based on Actor Model

        Zhan Hanglong, Liu Lantao, Kang Lianghuan, Cao Donggang, and Xie Bing

        (Key Laboratory of High Confidence Software Technologies (Peking University), Ministry of Education, Beijing 100871)(Peking University Information Technology Institute(Tianjin Binhai), Tianjin 300450)

        In the era of big data, stream processing has been widely applied in financial industry, advertising, Internet of things, social networks and many other fields. In streaming scenarios, the generation speed of stream data tends to be fluctuant and difficult to predict. If the streaming peak is larger than system capacity, the system may run slowly or even crash, which leads to job failure. If excessive resources are provided in case of streaming peak, there can be unnecessary waste under light load. In order to solve the matching problem between stream processing load and resources, stream processing system should be elastically scalable, which means that provided resources can be adjusted automatically according to the real-time change of stream flow. Although some researches have made great progress in stream processing, it is still an open problem that how to design an elastic scalable system. This paper introduces eSault, an elastically scalable stream processing framework based on Actor model. eSault firstly manages the processing units stratified hierarchically based on Actor model, and realizes scalability with two-layer routing mechanism. On this basis, eSault proposes an overload judgment algorithm based on data processing delay and light load judgment algorithm based on the data processing speed to efficiently allocate the resources, and achieve elastically scalable stream processing. Experiments show that eSault has good performance, and can achieve flexible scalability well.

        stream processing; Actor model; cloud computing; elastic scalable; two-layer routing mechanism

        Zhan Hanglong, born in 1989. Received his PhD degree from the School of Electronics Engineering and Computer Science, Peking University in 2016. His main research interests include big data, system software, parallel and distributed computing, etc.

        Liu Lantao, born in 1990. Received his MSc degree from the School of Electronics Engineering and Computer Science, Peking University in 2015. His main research interests include big data, system software, parallel and distributed computing, etc.

        Kang Lianghuan, born in 1986. Received his PhD degree from the School of Electronics Engineering and Computer Science, Peking University in 2015. His main research interests include distributed systems, concurrent programming structures and languages, etc.

        Cao Donggang, born in 1975. Received his PhD degree from the School of Electronics Engineering and Computer Science, Peking University in 2004. Currently associate professor at Peking University. His main research interests include system software, parallel and distributed computing, etc.

        Xie Bing, born in 1970. Received his PhD degree from the School of Computer, National University of Defense Technology in 1998. Currently professor and PhD supervisor at Peking University. His main research interests include software engineering, formal methods and software reuse, etc (xiebing@pku.edu.cn).

        2015-12-09;

        2016-08-08

        國家“八六三”高技術(shù)研究發(fā)展計劃基金項目(2015AA01A202);國家“九七三”重點基礎(chǔ)研究計劃基金項目(2011CB302604);國家自然科學基金項目(61272154,61421091);百度云服務(wù)開放平臺示范項目(2015年) This work was supported by the National High Technology Research and Development Program of China (863 Program)(2015AA01A202), the National Basic Research Program of China (973 Program)(2011CB302604), the National Natural Science Foundation of China (G61272154, G61421091), and the Baidu Cloud Service Open Platform Demonstration Project (2015).

        曹東剛(caodg@pku.edu.cn)

        TP391

        猜你喜歡
        路由表處理單元消息
        不同生物鏈組合對黃河下游地區(qū)引黃水庫富營養(yǎng)化及藻類控制
        城市污水處理廠設(shè)備能耗及影響因素分析研究
        科技資訊(2021年10期)2021-07-28 04:04:53
        長填齡滲濾液MBR+NF組合工藝各處理單元的DOM化學多樣性
        一種高可用負載均衡網(wǎng)絡(luò)數(shù)據(jù)采集處理的方法及系統(tǒng)
        基于OSPF特殊區(qū)域和LSA的教學設(shè)計與實踐
        一張圖看5G消息
        組播狀態(tài)異常導致故障
        消息
        消息
        消息
        亚洲图区欧美| 亚洲国产精品成人精品无码区在线 | 黑丝美女喷水在线观看| 中文字幕精品一区二区的区别| 国产放荡对白视频在线观看| 日本少妇人妻xxxxx18| 亚洲人成无码网站十八禁| 亚洲不卡av一区二区三区四区| 国产黄大片在线观看| 久久精品国产亚洲av成人| 精品国产亚洲av麻豆尤物| 亚洲av熟女中文字幕| 国产99在线 | 亚洲| 毛片网站视频| 一个人的视频免费播放在线观看| 蜜桃精品人妻一区二区三区| 国产精品毛片久久久久久久| 正在播放淫亚洲| 中文字幕亚洲一区视频| 国产在线精品一区二区三区| 国产亚洲精品第一综合麻豆| AV在线毛片| 亚洲综合第一页中文字幕| 麻豆精品国产精华精华液好用吗| 精品国产AⅤ一区二区三区4区| 亚洲精品一区二区三区日韩| 久久不见久久见www日本网| 韩国19禁无遮挡啪啪无码网站| 国产亚洲日本人在线观看| 天堂网日韩av在线播放一区 | 久久精品无码一区二区三区不卡| 一区二区高清免费日本| 一区二区三区av波多野结衣| 特黄aa级毛片免费视频播放| 色婷婷精久久品蜜臀av蜜桃| 97一期涩涩97片久久久久久久 | 2021久久最新国产精品| 日韩人妻免费一区二区三区| 人妻少妇被粗大爽.9797pw| 免费夜色污私人影院在线观看 | 国产又色又爽无遮挡免费软件|