丁派克 曹芳芳 王曉玲
北京航天自動控制研究所,北京100854
隨著信息化、智能化、國產(chǎn)化的發(fā)展,航天設(shè)備產(chǎn)生的試驗數(shù)據(jù)日益增加,由此導(dǎo)致數(shù)據(jù)存儲和分析的可靠性要求和性能要求也在不斷提高。大規(guī)模數(shù)據(jù)的出現(xiàn)不僅對數(shù)據(jù)存儲產(chǎn)生了很大的壓力,同時也使數(shù)據(jù)分析、計算的難度大幅增加?!爸信d事件”、“棱鏡門”事件后,國家加強了自主可控的要求,基于核心技術(shù)堅持國產(chǎn)化原則,需要在國產(chǎn)平臺上實現(xiàn)大數(shù)據(jù)組件軟件的國產(chǎn)化適配和智能數(shù)據(jù)分析技術(shù)。但受限于國產(chǎn)化硬件性能較低的制約,國產(chǎn)化平臺的單機性能過低,為了提升系統(tǒng)的數(shù)據(jù)處理能力,只能通過增大處理器的數(shù)量來實現(xiàn)[1]。將基于Hadoop框架的MapReduce分布式計算框架的航天大數(shù)據(jù)處理軟件移植到國產(chǎn)硬件上運行時,出現(xiàn)數(shù)據(jù)解析速度過慢、數(shù)據(jù)處理性能不足的問題,無法滿足當(dāng)前的型號需求。針對當(dāng)前數(shù)據(jù)處理性能不足的問題,設(shè)計一種分布式數(shù)據(jù)預(yù)處理框架,并把預(yù)處理框架前移到數(shù)據(jù)計算模塊之前;在原有的MapReduce 分布式計算框架開發(fā)的基礎(chǔ)上,采用基于Spark 的數(shù)據(jù)計算框架。Spark 是一個針對超大數(shù)據(jù)集合的低延遲的集群分布式計算系統(tǒng),通過采用彈性分布式數(shù)據(jù)集RDD,消除了MapReduce計算過程中的臨時計算數(shù)據(jù)結(jié)果的落地,有效減少了大量非必要的硬盤 IO開銷,且Spark 在數(shù)據(jù)實時處理、數(shù)據(jù)挖掘以及機器學(xué)習(xí)等尤其是需要大量迭代的計算方面具有先天的優(yōu)勢。
MapReduce是一種分布式編程模型,是Hadoop的核心組件之一[2]。原數(shù)據(jù)處理軟件的數(shù)據(jù)解析算法位于MapReduce模塊,通過客戶端將大文件數(shù)據(jù)導(dǎo)入HDFS(Hadoop Distributed File System,分布式文件系統(tǒng),Hadoop的核心組件之一,下文簡稱HDFS)文件系統(tǒng)后,提交解析任務(wù)給MapReduce模塊。按數(shù)據(jù)類型將任務(wù)分類、切塊后提交給Map,后調(diào)用解析算法,在Reduce階段根據(jù)業(yè)務(wù)需要進行上下文運算和排序處理后輸出到HDFS文件系統(tǒng)。原數(shù)據(jù)處理框架如圖1所示。
圖1 原數(shù)據(jù)處理框架
這種方式主要有2個問題:1)將數(shù)據(jù)預(yù)處理和解析同時進行,會導(dǎo)致啟停、調(diào)用MapReduce模塊頻繁,在Reduce模塊對文件進行上下文運算導(dǎo)致數(shù)據(jù)傾斜嚴重,內(nèi)存消耗過大,最終導(dǎo)致數(shù)據(jù)解析效率不高。2)數(shù)據(jù)吞吐量不足,可能會出現(xiàn)數(shù)據(jù)預(yù)處理出現(xiàn)錯誤、產(chǎn)生壞數(shù)據(jù)的情況,導(dǎo)致數(shù)據(jù)解析的可靠性不足。
針對上述問題設(shè)計了分布式數(shù)據(jù)預(yù)處理框架。主要包括以下2個部分:
1)將數(shù)據(jù)解析算法前移,對數(shù)據(jù)進行規(guī)格化處理后,再進入數(shù)據(jù)解析模塊進行處理。有效減少了MapReduce的啟停調(diào)用頻率;
2)采用Kafka消息隊列,增加數(shù)據(jù)的吞吐量,可以保證數(shù)據(jù)的高質(zhì)量、高可靠性預(yù)解析。Kafka是由LinkedIn開發(fā)的基于發(fā)布-訂閱(Pub-Sub)機制的分布式消息隊列,具有高吞吐、低延遲、易拓展的優(yōu)勢,并可以對消息隊列進行持久化存儲[3]。優(yōu)化后的分布式預(yù)處理框架如圖2所示。
圖2 分布式預(yù)處理框架
分布式預(yù)處理框架先按照一定協(xié)議規(guī)則對數(shù)據(jù)文件進行分類,然后通過切塊、打標簽等方法以多進程、多線程的方式高效地將原始數(shù)據(jù)通過Kafka消息隊列有序地解析成結(jié)構(gòu)化、可讀性高的有效數(shù)據(jù),以便于后續(xù)業(yè)務(wù)的查詢與分析。Kafka消息隊列對帶有標簽的數(shù)據(jù)塊按照一定規(guī)則進行排序,形成有序數(shù)據(jù)流,保證進入解析計算模塊的數(shù)據(jù)是規(guī)格化的數(shù)據(jù),可以大幅提升數(shù)據(jù)解析效率。
通過調(diào)整預(yù)處理結(jié)構(gòu)得到的規(guī)格化數(shù)據(jù),進入數(shù)據(jù)解析框架。原軟件采用的MapReduce框架通過輸入加載模塊(InputFormat)對HDFS中的數(shù)據(jù)進行加載后,進行邏輯分塊存入HDFS,數(shù)據(jù)讀取模塊(RecordReader)將各個分塊數(shù)據(jù)從HDFS讀取出來以鍵值對的形式輸出作為Map函數(shù)的輸入MapReduce框架是把中間結(jié)果寫入到HDFS中,帶來了大量的數(shù)據(jù)復(fù)制、磁盤IO和序列化開銷[4]。這些大量的額外開銷產(chǎn)生了內(nèi)存消耗過大的問題,導(dǎo)致軟件的數(shù)據(jù)解析計算性能不足。Spark將數(shù)據(jù)以“RDD轉(zhuǎn)換”的方式進行解析計算,結(jié)果保存在內(nèi)存當(dāng)中,不儲存中間結(jié)果,可以大幅減少內(nèi)存資源消耗,從而提升解析效率。
RDD(Resilient Distributed Datasets,彈性分布式數(shù)據(jù)集,下文簡稱RDD)是Spark中最基本的數(shù)據(jù)抽象單位,本質(zhì)上是一個只讀的分區(qū)記錄集合,不能直接修改,只能基于穩(wěn)定的物理存儲中的數(shù)據(jù)集來創(chuàng)建RDD,或者通過在其他RDD上執(zhí)行確定的轉(zhuǎn)換操作創(chuàng)建新的RDD。每個RDD可以分成多個分區(qū),每個分區(qū)就是一個數(shù)據(jù)集片段(HDFS上的塊),并且一個RDD的不同分區(qū)可以被保存到集群中不同的節(jié)點上,從而可以在集群中的不同節(jié)點上進行并行計算[5]。
RDD的調(diào)用方法是非即時的,在計算之前的RDD轉(zhuǎn)換操作,Spark僅僅是記錄下了RDD轉(zhuǎn)換操作的行動軌跡以及全部RDD之間的依賴關(guān)系,在轉(zhuǎn)換結(jié)果未確定之前,不進行真正的計算。RDD的特殊調(diào)用機制,使Spark在處理數(shù)據(jù)時不必耗費資源存儲中間結(jié)果,直接獲得所需要的最終數(shù)據(jù)進行輸出與存儲。
Spark會根據(jù)RDD之間的轉(zhuǎn)化操作,區(qū)分出2種依賴關(guān)系:寬依賴于窄依賴。一個父RDD對應(yīng)多個子RDD,圖3中RDD0、RDD1、RDD2、RDD3之間的關(guān)系,為寬依賴;一個或多個父RDD對應(yīng)一個子RDD,如RDD4、RDD5、RDD6、RDD7之間的關(guān)系,為窄依賴。某一個RDD在發(fā)生數(shù)據(jù)丟失時,Spark會返回上一級RDD對數(shù)據(jù)進行重算[6]。
圖3 RDD的寬依賴與窄依賴
Spark通過記錄RDD之間的依賴關(guān)系,可以提升其容錯性,從而提升Spark的執(zhí)行速度。
在Spark執(zhí)行計算全部任務(wù)時,會根據(jù)RDD之間的依賴關(guān)系生成流程圖,再通過流程圖的信息劃分任務(wù)階段。分解的具體依據(jù)是:
1)從流程圖的末端進行反向分析,遇到窄依賴就把當(dāng)前RDD的計算任務(wù)劃入當(dāng)前階段。由于窄依賴的轉(zhuǎn)化關(guān)系僅存在一個丟失數(shù)據(jù)的子RDD,在重算時對于父類RDD的利用率為100%,具有很高的計算效率,Spark會盡量將全部窄依賴都分入一個階段進行解析。
2)遇到寬依賴就斷開。對于寬依賴,一個父RDD對應(yīng)多個子RDD,某一個子RDD發(fā)生數(shù)據(jù)丟失時,Spark返回上一級父RDD,并對所有的子RDD進行再次運算,無論其他子RDD是否發(fā)生數(shù)據(jù)丟失。Spark在寬依賴關(guān)系的RDD產(chǎn)生數(shù)據(jù)丟失時,會對其他未丟失的子RDD的再次計算產(chǎn)生了大量多余的計算,與窄依賴放在同一階段進行計算會降低計算效率[7]。
Spark根據(jù)寬窄依賴將計算任務(wù)分成多個階段,其中窄依賴的轉(zhuǎn)化關(guān)系類似于管道運輸,RDD之間相互不受影響,將其劃分到同一任務(wù)階段進行分布式計算,可以大幅提升計算效率。
基于RDD調(diào)用機制以及Spark內(nèi)存計算機制的優(yōu)越性,設(shè)計如圖4所示基于Spark的數(shù)據(jù)處理框架。從HDFS取出數(shù)據(jù)文件,獲得初始數(shù)據(jù)集RDDOrigin后,對數(shù)據(jù)進行預(yù)處理。預(yù)處理階段先根據(jù)數(shù)據(jù)包頭分成5種數(shù)據(jù)類型并打上標簽,再把數(shù)據(jù)進行切塊形成若干個RDD。
以Net類型數(shù)據(jù)為例,RDDNet被切塊后形成若干個帶有標簽的小塊RDD,然后對小塊RDD進行解析,解析完畢后將小塊RDD進行合并、排序操作,最終得到RDDResult為輸出結(jié)果,存入到HDFS。Spark任務(wù)階段規(guī)劃如圖4所示。Spark從流程圖的末端開始分析,RDD的合并排序以及RDD的解析轉(zhuǎn)換均為窄依賴,可劃分到同一階段;而初始數(shù)據(jù)集RDDOrigin的按數(shù)據(jù)類型分塊與RDD1553、RDDAD、RDDNet、RDDTCP/IP、RDDIO的切塊這2個轉(zhuǎn)化操作均為寬依賴,被劃分到不同的任務(wù)階段中。由于Spark在劃分任務(wù)階段時是從末端開始分析,那么實際計算的任務(wù)階段順序應(yīng)該是反過來的。RDDOrigin按數(shù)據(jù)類型分塊的任務(wù)在階段1,RDD1553、RDDAD、RDDNet、RDDTCP/IP、RDDIO的切塊在階段2,解析、數(shù)據(jù)的合并、排序任務(wù)在階段3。全部計算任務(wù)完成后,輸出最終的RDDResult并存入HDFS。
圖4 基于Spark的數(shù)據(jù)處理框架
大數(shù)據(jù)軟件在中標麒麟5.0操作系統(tǒng)環(huán)境和龍芯3A3000刀片服務(wù)器集群的硬件平臺上適配運行;采用的大數(shù)據(jù)組件主要包括Hadoop 2.7.2,HDFS 2.7.3,Spark 2.0.2,Kafka 0.8;以某航天武器裝備控制系統(tǒng)產(chǎn)生的試驗數(shù)據(jù)為解析對象,數(shù)據(jù)類型主要為1553、AD、IO等二進制數(shù)據(jù)類型。將100M,300M,1G,3G大小的航天裝備試驗數(shù)據(jù)分別用舊方案與優(yōu)化后的方案進行解析,并記錄處理時間,最后把2個方案所消耗的時間進行對比。
通過對航天裝備試驗數(shù)據(jù)進行解析比對,得出如表1所示結(jié)果:
通過試驗數(shù)據(jù)對比可以看出:優(yōu)化后的方案在處理100M、300M、1G、3G大小的數(shù)據(jù)均較原始方案性能優(yōu)越,數(shù)據(jù)處理速度有了明顯提升。
通過對數(shù)據(jù)預(yù)處理框架和Spark內(nèi)存計算技術(shù)的研究,提出了一種基于Spark的國產(chǎn)化海量數(shù)據(jù)預(yù)處理與數(shù)據(jù)計算方法,對航天武器裝備控制系統(tǒng)產(chǎn)生的試驗數(shù)據(jù)進行快速解析和處理。經(jīng)過與原數(shù)據(jù)處理軟件框架的對比分析驗證,結(jié)果表明,本文的優(yōu)化方法可以有效提升國產(chǎn)大數(shù)據(jù)平臺的數(shù)據(jù)處理性能,利用RDD調(diào)用機制與Spark內(nèi)存計算能力,解決數(shù)據(jù)在MapReduce框架中解析性能不足的問題,并已應(yīng)用到部分航天武器裝備系統(tǒng)中,有效解決了國產(chǎn)平臺下海量數(shù)據(jù)的快速處理分析需求。
表1 數(shù)據(jù)解析結(jié)果比對