呂勤
摘要:目前市場(chǎng)上的大數(shù)據(jù)流式處理系統(tǒng)普遍存在計(jì)算結(jié)果不能共享、實(shí)時(shí)處理性能不高、計(jì)算時(shí)間窗口固定和不能動(dòng)態(tài)擴(kuò)容等問(wèn)題。針對(duì)這些問(wèn)題,本文設(shè)計(jì)了一種基于新型時(shí)間切片原理,具備動(dòng)態(tài)資源調(diào)度、系統(tǒng)容錯(cuò)、動(dòng)態(tài)窗口計(jì)算能力的高性能大數(shù)據(jù)流式處理系統(tǒng)。
關(guān)鍵詞:大數(shù)據(jù)流式處理;時(shí)間窗口;實(shí)時(shí)計(jì)算
中圖分類(lèi)號(hào):TP319 文獻(xiàn)標(biāo)識(shí)碼:A 文章編號(hào):1007-9416(2020)03-0140-03
1 大數(shù)據(jù)流式處理面臨的挑戰(zhàn)
目前業(yè)內(nèi)主流的大數(shù)據(jù)流式處理系統(tǒng)面臨諸多挑戰(zhàn),最突出的包括計(jì)算資源和存儲(chǔ)資源的動(dòng)態(tài)調(diào)度分配、系統(tǒng)容錯(cuò)和動(dòng)態(tài)時(shí)間窗口調(diào)整等。在資源調(diào)度方面,許多流式處理系統(tǒng)普遍采用多節(jié)點(diǎn)進(jìn)行分布式數(shù)據(jù)計(jì)算和數(shù)據(jù)結(jié)果的存儲(chǔ),技術(shù)難點(diǎn)在于數(shù)據(jù)流對(duì)多個(gè)計(jì)算節(jié)點(diǎn)的均勻分配以及數(shù)據(jù)結(jié)果的均勻存儲(chǔ)和各個(gè)節(jié)點(diǎn)資源的均衡使用。此外,在大數(shù)據(jù)流式處理過(guò)程中,數(shù)據(jù)分發(fā)節(jié)點(diǎn)、計(jì)算節(jié)點(diǎn)和存儲(chǔ)節(jié)點(diǎn)間存在大量的數(shù)據(jù)交互,網(wǎng)絡(luò)資源消耗極高,往往成為系統(tǒng)性能瓶頸。在系統(tǒng)容錯(cuò)方面需要在單個(gè)節(jié)點(diǎn)出現(xiàn)故障時(shí),能保證整體系統(tǒng)的正常運(yùn)行。當(dāng)整個(gè)系統(tǒng)資源不足以滿(mǎn)足數(shù)據(jù)的處理時(shí),還要能在保證當(dāng)前系統(tǒng)的穩(wěn)定運(yùn)行情況下動(dòng)態(tài)添加資源,以實(shí)現(xiàn)系統(tǒng)處理能力的動(dòng)態(tài)擴(kuò)展[1]。在計(jì)算窗口方面,因?yàn)榇嬖谟?jì)算任務(wù)執(zhí)行過(guò)程中只保存數(shù)據(jù)處理的結(jié)果數(shù)據(jù)、流入的原始數(shù)據(jù)被丟棄等原因,多數(shù)流式處理系統(tǒng)的計(jì)算窗口都是靜態(tài)的,無(wú)法支持在計(jì)算任務(wù)啟動(dòng)后臨時(shí)調(diào)整計(jì)算窗口。
2 大數(shù)據(jù)流式處理系統(tǒng)架構(gòu)
為了滿(mǎn)足業(yè)務(wù)發(fā)展對(duì)高性能實(shí)時(shí)指標(biāo)計(jì)算能力的需要,同時(shí)解決目前行業(yè)內(nèi)同類(lèi)工具普遍存在的問(wèn)題,采用新型的時(shí)間切片原理,在利用Kafka、Netty和Redis等開(kāi)源組件的技術(shù)特性的基礎(chǔ)上,設(shè)計(jì)了一套高性能的大數(shù)據(jù)流式處理系統(tǒng)。該系統(tǒng)主要功能包括接受外部系統(tǒng)流水?dāng)?shù)據(jù)、實(shí)時(shí)指標(biāo)計(jì)算和向外部系統(tǒng)提供指標(biāo)查詢(xún)等功能。系統(tǒng)分為四層架構(gòu),分別為通訊層、服務(wù)層(Server層)、計(jì)算層(Agent層)和計(jì)算結(jié)果緩存層,每層均為多節(jié)點(diǎn)集群,緩存層由Redis集群組成,計(jì)算層與緩存層的節(jié)點(diǎn)按照1∶1配置,部署在同一物理節(jié)點(diǎn)上[2]。系統(tǒng)架構(gòu)參見(jiàn)圖1。
(1)通信層負(fù)責(zé)與外部系統(tǒng)的交互。通訊層采用kafka作為數(shù)據(jù)流入起點(diǎn)。Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),十分適合作為流式處理系統(tǒng)的數(shù)據(jù)“生產(chǎn)者”和“消費(fèi)者”。通訊層通過(guò)訂閱kafka中的topic,接受外部系統(tǒng)的流水?dāng)?shù)據(jù)并發(fā)送到后端服務(wù)層。同時(shí)通訊層還負(fù)責(zé)接受外部系統(tǒng)的指標(biāo)查詢(xún)服務(wù)請(qǐng)求,并將從服務(wù)層獲取的查詢(xún)結(jié)果反饋給外部系統(tǒng)。
(2)服務(wù)層主要功能包括計(jì)算指令分發(fā)、計(jì)算任務(wù)監(jiān)控和指標(biāo)結(jié)果二次加工等。在接收到通訊層發(fā)來(lái)的流水?dāng)?shù)據(jù)后,服務(wù)層先通過(guò)對(duì)流入數(shù)據(jù)關(guān)鍵值的判斷,確定需要執(zhí)行的計(jì)算任務(wù)(計(jì)算任務(wù)是最小的計(jì)算單位),再通過(guò)對(duì)計(jì)算任務(wù)參數(shù)的組合計(jì)算,確定具體的計(jì)算節(jié)點(diǎn)。服務(wù)層將流水?dāng)?shù)據(jù)和需要執(zhí)行的計(jì)算任務(wù)列表進(jìn)行封裝,通過(guò)Netty分發(fā)到指定的計(jì)算節(jié)點(diǎn)。
服務(wù)層通過(guò)收集計(jì)算節(jié)點(diǎn)的執(zhí)行狀態(tài)實(shí)現(xiàn)對(duì)計(jì)算任務(wù)監(jiān)控。當(dāng)所有的計(jì)算任務(wù)都執(zhí)行成功后,服務(wù)層認(rèn)為當(dāng)前流入數(shù)據(jù)已被系統(tǒng)正常處理。如果有計(jì)算任務(wù)執(zhí)行失敗,數(shù)據(jù)分發(fā)節(jié)點(diǎn)會(huì)分析失敗原因并決定是否轉(zhuǎn)派其他計(jì)算節(jié)點(diǎn)再次執(zhí)行。
服務(wù)層在響應(yīng)指標(biāo)查詢(xún)服務(wù)請(qǐng)求,獲取緩存節(jié)點(diǎn)上的中間結(jié)果后,根據(jù)預(yù)置的處理公式進(jìn)行二次或多次加工,最終完成服務(wù)。
(3)計(jì)算層主要功能包括計(jì)算任務(wù)執(zhí)行、異常處理、數(shù)據(jù)序列化以及時(shí)間切片等。計(jì)算節(jié)點(diǎn)識(shí)別服務(wù)層分發(fā)的數(shù)據(jù),并使用對(duì)應(yīng)的算子和原有緩存節(jié)點(diǎn)中的計(jì)算結(jié)果合并計(jì)算,合并計(jì)算的結(jié)果再次保存到緩存節(jié)點(diǎn)中,并向服務(wù)層返回任務(wù)執(zhí)行成功狀態(tài)。若計(jì)算任務(wù)出現(xiàn)異常,則向服務(wù)層返回計(jì)算任務(wù)執(zhí)行失敗狀態(tài),結(jié)果數(shù)據(jù)不保存。
(4)緩存層主要功能為存儲(chǔ)計(jì)算結(jié)果,由Redis集群構(gòu)成。Redis集群內(nèi)置的自動(dòng)分區(qū)、復(fù)制、LRU逐出、事務(wù)等特性為系統(tǒng)緩存層的高可用和高性能提供了保證。
3 系統(tǒng)技術(shù)特色與實(shí)現(xiàn)原理
該系統(tǒng)具備動(dòng)態(tài)資源調(diào)度、系統(tǒng)容錯(cuò)和動(dòng)態(tài)調(diào)整指標(biāo)計(jì)算時(shí)間窗口等能力,處理性能高效,運(yùn)行穩(wěn)定。該系統(tǒng)實(shí)現(xiàn)以上能力,主要實(shí)現(xiàn)了以下幾項(xiàng)關(guān)鍵的技術(shù)。
3.1 動(dòng)態(tài)資源調(diào)度
資源調(diào)度的核心技術(shù)是本地?cái)?shù)據(jù)本地計(jì)算的MPP架構(gòu)和均勻存儲(chǔ)均勻計(jì)算。
(1)本地?cái)?shù)據(jù)本地計(jì)算的MPP架構(gòu)設(shè)計(jì):通過(guò)將計(jì)算節(jié)點(diǎn)和存儲(chǔ)節(jié)點(diǎn)1∶1的部署在同一物理節(jié)點(diǎn)的部署方式,實(shí)現(xiàn)數(shù)據(jù)的讀取、計(jì)算和結(jié)果回寫(xiě)均能在同一物理節(jié)點(diǎn)上執(zhí)行,形成MPP架構(gòu),大幅減少網(wǎng)絡(luò)開(kāi)銷(xiāo)[3]。
(2)均勻存儲(chǔ)均勻計(jì)算:系統(tǒng)采用Redis的hash算法以及Server、Agent與Redis的協(xié)同實(shí)現(xiàn)了均勻存儲(chǔ)均勻計(jì)算。具體來(lái)講,均勻存儲(chǔ)的實(shí)現(xiàn)是利用Redis Cluster4.0的新技術(shù)特性,把所有數(shù)據(jù)劃分為16384個(gè)不同的虛擬槽,根據(jù)機(jī)器的性能可以把不同的槽位分配給不同的Redis節(jié)點(diǎn)。通過(guò)對(duì)流水?dāng)?shù)據(jù)的關(guān)鍵值和計(jì)算任務(wù)的ID進(jìn)行hash計(jì)算找到數(shù)據(jù)對(duì)應(yīng)的槽位。Server端存儲(chǔ)一張槽位與機(jī)器的對(duì)應(yīng)關(guān)系表,在根據(jù)槽位找到對(duì)應(yīng)的具體機(jī)器的同時(shí),也確定了對(duì)應(yīng)的Redis存儲(chǔ)節(jié)點(diǎn)。在這種機(jī)制下某個(gè)Redis節(jié)點(diǎn)只會(huì)存儲(chǔ)對(duì)應(yīng)部分的數(shù)據(jù),并且保證數(shù)據(jù)均勻分散在不同的Redis節(jié)點(diǎn)上。均勻計(jì)算也遵循了同樣的原理。服務(wù)層通過(guò)同樣的流水?dāng)?shù)據(jù)的關(guān)鍵值和計(jì)算任務(wù)的ID進(jìn)行hash計(jì)算找到計(jì)算任務(wù)對(duì)應(yīng)的槽位和對(duì)應(yīng)的Agent節(jié)點(diǎn)。計(jì)算任務(wù)和數(shù)據(jù)存儲(chǔ)的分布遵循同樣的hash算法,也保證了計(jì)算節(jié)點(diǎn)上的計(jì)算任務(wù)和存儲(chǔ)節(jié)點(diǎn)上的數(shù)據(jù)的一致性。
3.2 系統(tǒng)容錯(cuò)
在傳統(tǒng)的Keepalived+Nginx的容錯(cuò)技術(shù)以外,該系統(tǒng)還通過(guò)健康狀態(tài)監(jiān)控機(jī)制實(shí)現(xiàn)系統(tǒng)容錯(cuò)。Server節(jié)點(diǎn)和Agent節(jié)點(diǎn)都有各自的健康狀態(tài)監(jiān)聽(tīng)模塊來(lái)記錄本節(jié)點(diǎn)的健康狀態(tài)。此外Agent節(jié)點(diǎn)定期向Server層發(fā)送心跳匯報(bào)節(jié)點(diǎn)健康狀態(tài)信息,Server層根據(jù)Agent節(jié)點(diǎn)的健康狀態(tài)動(dòng)態(tài)調(diào)整數(shù)據(jù)的分發(fā)和計(jì)算任務(wù)的分配,保證分發(fā)時(shí)避開(kāi)不健康的計(jì)算節(jié)點(diǎn)。
在計(jì)算節(jié)點(diǎn)出現(xiàn)故障時(shí),通過(guò)兩段處理的方式保證計(jì)算任務(wù)的正常進(jìn)行。第一段處理,服務(wù)層在分發(fā)計(jì)算內(nèi)容前,先判斷計(jì)算節(jié)點(diǎn)的健康狀態(tài),如果該計(jì)算節(jié)點(diǎn)狀態(tài)為非健康,則隨機(jī)選擇一個(gè)健康的計(jì)算節(jié)點(diǎn)進(jìn)行分發(fā)。由于新選擇的計(jì)算節(jié)點(diǎn)對(duì)應(yīng)的緩存節(jié)點(diǎn)中未存儲(chǔ)對(duì)應(yīng)的數(shù)據(jù),該計(jì)算節(jié)點(diǎn)通過(guò)Redis Cluster的保障機(jī)制從其他緩存節(jié)點(diǎn)找到對(duì)應(yīng)的中間數(shù)據(jù),保證計(jì)算服務(wù)的持續(xù)進(jìn)行。第二段處理,計(jì)算節(jié)點(diǎn)在出現(xiàn)故障時(shí)會(huì)嘗試自動(dòng)重啟。如果重啟失敗,則繼續(xù)維持第一段處理,如果重啟成功,則Agent會(huì)向Server重新上報(bào)健康狀態(tài),恢復(fù)正常工作。
3.3 動(dòng)態(tài)窗口
所謂動(dòng)態(tài)時(shí)間窗口,即在不需要重新發(fā)布計(jì)算任務(wù)的前提下,可以實(shí)時(shí)調(diào)整指標(biāo)查詢(xún)窗口的大小,且能立即獲得對(duì)應(yīng)的結(jié)果。目前大多數(shù)基于窗口的流式計(jì)算解決方案都不能很好地支持動(dòng)態(tài)窗口。其根本原因在于這些解決方案的計(jì)算任務(wù)都是基于源數(shù)據(jù)的,由于源數(shù)據(jù)的數(shù)據(jù)量大,導(dǎo)致無(wú)法在長(zhǎng)時(shí)間保存源數(shù)據(jù)的同時(shí)又支持快速計(jì)算指標(biāo)結(jié)果。在本處理系統(tǒng)中,動(dòng)態(tài)窗口的實(shí)現(xiàn)基于系統(tǒng)獨(dú)有的時(shí)間切片的設(shè)計(jì)原理。
時(shí)間切片設(shè)計(jì)原理:時(shí)間切片是將時(shí)序數(shù)據(jù)按照一定的時(shí)間間隔聚合后的新的時(shí)序數(shù)據(jù)。一個(gè)時(shí)間切片是數(shù)據(jù)信息保存的最小單位,也是保存數(shù)據(jù)的主要形式。在本系統(tǒng)中,時(shí)間切片中不存儲(chǔ)源數(shù)據(jù),而是存儲(chǔ)該切片時(shí)間段內(nèi)所有流水?dāng)?shù)據(jù)按照指標(biāo)配置進(jìn)行聚合計(jì)算后得到的結(jié)果數(shù)據(jù),這種設(shè)計(jì)大幅壓縮了數(shù)據(jù)存儲(chǔ)量,可以存儲(chǔ)較長(zhǎng)時(shí)間的數(shù)據(jù)。本系統(tǒng)通過(guò)Redis技術(shù)實(shí)現(xiàn)流水?dāng)?shù)據(jù)的時(shí)間切片處理。當(dāng)流水?dāng)?shù)據(jù)進(jìn)入到某個(gè)具體的數(shù)據(jù)計(jì)算任務(wù)后,首先提取流水?dāng)?shù)據(jù)的發(fā)生時(shí)間,然后根據(jù)流水?dāng)?shù)據(jù)的關(guān)鍵值和計(jì)算任務(wù)ID拼裝成Redis的key,根據(jù)key取出對(duì)應(yīng)的value,并讀取value中最新切片的時(shí)間。如果流水?dāng)?shù)據(jù)的發(fā)生時(shí)間比該切片時(shí)間早,則不需要?jiǎng)?chuàng)建新的切片,找到當(dāng)前的流水?dāng)?shù)據(jù)對(duì)應(yīng)的時(shí)間切片,并根據(jù)任務(wù)里配置的聚合函數(shù)計(jì)算結(jié)果并更新該時(shí)間切片的value,更新回Redis中。如果流水?dāng)?shù)據(jù)的發(fā)生時(shí)間晚于目前的處理時(shí)間,則需要?jiǎng)?chuàng)建新的切片,并根據(jù)任務(wù)里配置的聚合函數(shù)計(jì)算出當(dāng)前流水?dāng)?shù)據(jù)的結(jié)果后存入Redis中。時(shí)間切片在Redis中的存儲(chǔ)參見(jiàn)圖2。
在指標(biāo)查詢(xún)?nèi)蝿?wù)中,時(shí)間切片的選取采用落入即包含的策略。切片選取策略參見(jiàn)圖3。以5分鐘切片為例,從起始切片開(kāi)始,每隔5分鐘開(kāi)啟一個(gè)新的切片,T為當(dāng)前時(shí)刻,H為指標(biāo)計(jì)算所需的持續(xù)時(shí)間,從T-H到T就是指標(biāo)計(jì)算的時(shí)間長(zhǎng)度。如果T-H正好處于時(shí)間切片首尾,就將X個(gè)時(shí)間切片納入計(jì)算,如果T-H在某個(gè)時(shí)間切片的中間位置,則采用時(shí)間覆蓋原則,將該時(shí)間切片全部納入計(jì)算。在圖3中,虛線(xiàn)段是指標(biāo)計(jì)算的理論時(shí)間長(zhǎng)度,實(shí)線(xiàn)段是指標(biāo)計(jì)算的實(shí)際時(shí)間長(zhǎng)度。采用這種策略,雖然在精度上有細(xì)微損失,但大幅提高了計(jì)算效率。當(dāng)需要?jiǎng)討B(tài)調(diào)整指標(biāo)查詢(xún)的時(shí)間窗口時(shí),無(wú)需對(duì)源數(shù)據(jù)重新計(jì)算,只需要通過(guò)必要的二次計(jì)算即可實(shí)時(shí)獲取新的結(jié)果,實(shí)現(xiàn)動(dòng)態(tài)窗口設(shè)置的實(shí)時(shí)生效。
4 結(jié)論
本文提出的基于窗口計(jì)算的大數(shù)據(jù)流式處理系統(tǒng)已經(jīng)實(shí)際投入應(yīng)用。整個(gè)系統(tǒng)采用三臺(tái)服務(wù)器搭建Kafka集群、兩臺(tái)服務(wù)器搭建服務(wù)層、四臺(tái)服務(wù)器搭建數(shù)據(jù)計(jì)算集群和存儲(chǔ)集群。在壓力測(cè)試中,在并發(fā)數(shù)為100且單條數(shù)據(jù)大小不超過(guò)5M的情況下,集群的指標(biāo)查詢(xún)性能達(dá)到7萬(wàn)TPS,成功率100%。以實(shí)際生產(chǎn)數(shù)據(jù)測(cè)試,在60秒內(nèi)發(fā)起2.3億次指標(biāo)查詢(xún)請(qǐng)求,系統(tǒng)的平均響應(yīng)時(shí)間為16ms,TPS為24600左右,且此時(shí)的各節(jié)點(diǎn)硬件資源使用均未達(dá)到極限。通過(guò)橫向動(dòng)態(tài)擴(kuò)展和網(wǎng)絡(luò)優(yōu)化,該流式處理系統(tǒng)的計(jì)算能力還能線(xiàn)性提高[4]。
參考文獻(xiàn)
[1] 孫大為,張廣艷,鄭緯民.大數(shù)據(jù)流式計(jì)算:關(guān)鍵技術(shù)及系統(tǒng)實(shí)例[J].軟件學(xué)報(bào),2014,25(04):839-862.
[2] 王奇.基于發(fā)布訂閱的分布式復(fù)雜事件處理系統(tǒng)的研究與實(shí)現(xiàn)[D].北京:北京郵電大學(xué),2018.
[3] 蘇錦.基于Netty的高性能RPC服務(wù)器的研究與實(shí)現(xiàn)[D].南京:南京郵電大學(xué),2018.
[4] 崔曉旻.基于Netty的高可服務(wù)消息中間件的研究與實(shí)現(xiàn)[D].成都:電子科技大學(xué),2014.
Abstract:At present, there are many problems in big data streaming processing systems in the market, such as the calculation results can not be shared, the real-time processing performance is not high, the calculation time window is fixed, and the calculation capacity cannot be expanded dynamically. To solve these problems, this paper designs a high-performance streaming data processing system based on the new time slicing principle, which has the ability of dynamic resource scheduling, system fault tolerance and dynamic window computing.
Key words:big data stream processing; time window; real time computing
數(shù)字技術(shù)與應(yīng)用2020年3期