亚洲免费av电影一区二区三区,日韩爱爱视频,51精品视频一区二区三区,91视频爱爱,日韩欧美在线播放视频,中文字幕少妇AV,亚洲电影中文字幕,久久久久亚洲av成人网址,久久综合视频网站,国产在线不卡免费播放

        ?

        模型驅(qū)動的大數(shù)據(jù)流水線框架PiFlow

        2020-06-20 12:00:58朱小杰趙子豪
        計算機應用 2020年6期
        關(guān)鍵詞:批處理流水線數(shù)據(jù)處理

        朱小杰,趙子豪,2,杜 一,2*

        (1.中國科學院計算機網(wǎng)絡(luò)信息中心,北京 100190;2.中國科學院大學,北京 100049)

        (?通信作者電子郵箱duyi@cnic.cn)

        0 引言

        伴隨著互聯(lián)網(wǎng)和物聯(lián)網(wǎng)技術(shù)的不斷發(fā)展,數(shù)據(jù)采集終端的種類和數(shù)目迅猛增加,數(shù)據(jù)量越來越大、產(chǎn)生速度越來越快[1]。同時,隨著越來越多的大科學裝置的建設(shè)和重大科學實驗的開展,科學研究也進入到一個數(shù)據(jù)密集型科學階段[2]。麥肯錫在2011 年的一份報告中稱,數(shù)據(jù)已經(jīng)成為重要的生產(chǎn)要素[3]。然而,大數(shù)據(jù)時代的到來,使得大數(shù)據(jù)處理面臨數(shù)據(jù)復雜、計算復雜、系統(tǒng)復雜等問題[4],對數(shù)據(jù)處理軟件的實時性、靈活性、穩(wěn)定性都提出了較高的要求,如金融行業(yè)的日常運營產(chǎn)生大量數(shù)據(jù),這些數(shù)據(jù)產(chǎn)生于金融服務(wù)系統(tǒng),時效性較強,需要在金融服務(wù)系統(tǒng)與數(shù)據(jù)處理系統(tǒng)之間流動并進行實時計算[5]。國家自然科學基金大數(shù)據(jù)知識管理服務(wù)平臺,需要融合來自MongoDB、Oracle、文件傳輸協(xié)議(File Transfer Protocol,F(xiàn)TP)等不同數(shù)據(jù)源上億條科研項目、科研成果、科研人員等數(shù)據(jù)[6]。

        大數(shù)據(jù)處理過程包括數(shù)據(jù)采集、清洗、匯聚、分析等不同環(huán)節(jié),不同環(huán)節(jié)之間具有較強的邏輯關(guān)系和執(zhí)行順序,具有典型的流水線特征。構(gòu)建大數(shù)據(jù)流水線有助于提高效率,但大數(shù)據(jù)流水線處理面臨如下問題:1)數(shù)據(jù)處理過程中,處理模塊由于缺乏合理的抽象,導致處理模塊的復用性低;2)相關(guān)工具選型太多導致開發(fā)復雜性增加;3)數(shù)據(jù)處理模塊缺乏統(tǒng)一的模型管理,導致數(shù)據(jù)處理框架或平臺的可維護性低。

        針對大數(shù)據(jù)處理過程中遇到的上述問題,本文提出了一種支持大數(shù)據(jù)流水線機制的描述語言——PiFlowDL,實現(xiàn)了所見即所得的流水線設(shè)計,提出了基于分布式計算框架Apache Spark 的流水線執(zhí)行機制,并給出了基于有向無環(huán)圖(Directed Acyclic Graph,DAG)的流水線執(zhí)行調(diào)度策略。通過與主流工具的對比測試驗證了所提框架的可行性及性能。

        1 相關(guān)工作

        1.1 模型驅(qū)動的開發(fā)方法

        模型驅(qū)動的開發(fā)方法是由對象管理組織(Object Management Group,OMG)提出的用以提高軟件開發(fā)效率的一種開發(fā)方法[7],其核心思想是將軟件系統(tǒng)建立在各種模型的基礎(chǔ)上,通過模型的變換來驅(qū)動系統(tǒng)的開發(fā)[8]。在軟件工程領(lǐng)域得到充分利用的同時[9],該方法被應用到了很多場景中。Vanderdonckt 等[10]給出用戶界面描述語言的概念,將模型驅(qū)動的開發(fā)方法應用到用戶界面設(shè)計與開發(fā)中。杜一等[11]改進了該方法,使用模塊化的描述方式重新設(shè)計了界面描述語言,將用戶界面設(shè)計與開發(fā)流程中多個角色進行了適配,提高了開發(fā)效率。隨后,可視化描述語言被提出,對任務(wù)、用戶、領(lǐng)域?qū)ο蟮瘸橄蠼M成元素和表征、對話等具體組成元素,以及它們之間的映射關(guān)系進行描述[12],將模型驅(qū)動的開發(fā)方法應用于可視化系統(tǒng)開發(fā)中,極大地提高了系統(tǒng)開發(fā)的效率[13]。本文將模型驅(qū)動的開發(fā)方法引入大數(shù)據(jù)處理系統(tǒng)中,在大數(shù)據(jù)流水線的具體組成元素和界面模型中的抽象組成元素之間構(gòu)建映射關(guān)系?;谟成潢P(guān)系設(shè)計大數(shù)據(jù)流水線模型描述語言PiFlowDL,將大數(shù)據(jù)流水線設(shè)計工具與執(zhí)行引擎解耦合,從而提高了開發(fā)效率,實現(xiàn)了所見即所得的大數(shù)據(jù)流水線構(gòu)建過程。

        1.2 數(shù)據(jù)處理框架

        大數(shù)據(jù)技術(shù)的發(fā)展推進了以基于硬盤的MapReduce[14]和基于內(nèi)存的Apache Spark[15]為代表的分布式計算模型的誕生。為了方便分布式處理,MapReduce 的任務(wù)之間數(shù)據(jù)不直接共享,大部分需通過網(wǎng)絡(luò)傳輸,這導致MapReduce在處理流程復雜任務(wù)時效率較低。批處理系統(tǒng)Hadoop[16]是基于MapReduce 的一種實現(xiàn),其在傳統(tǒng)的批處理任務(wù)上具有較為廣泛的應用,但由于其自身延遲大的特點,并不適用于實時性要求較強的任務(wù)。有向無環(huán)圖框架在某種程度上解決了MapReduce 模型中任務(wù)間邏輯關(guān)系復雜的問題,目前在數(shù)據(jù)處理領(lǐng)域有大量對DAG 的應用和相關(guān)研究[17-19]。Apache Spark 是應用DAG 框架的典型代表,它將大數(shù)據(jù)處理任務(wù)在框架層以有向無環(huán)圖的模式進行組織,使得大數(shù)據(jù)開發(fā)人員可以更聚焦于數(shù)據(jù)處理邏輯本身。

        而相關(guān)應用的發(fā)展,又對大數(shù)據(jù)處理的實時性提出了挑戰(zhàn)。Storm[20]、Flink[21]、S4[22]等一系列流式處理工具的誕生在一定程度上填補了數(shù)據(jù)實時處理領(lǐng)域的空白。不同大數(shù)據(jù)處理框架的對比如表1 所示。Storm 是一個開源的分布式實時計算系統(tǒng),能夠可靠地處理無限的數(shù)據(jù)流,其拓撲結(jié)構(gòu)靈活的編程方式和分布式協(xié)調(diào)大大提高了多步驟的數(shù)據(jù)處理流程的效率。Spark Streaming 用微批處理的方法實現(xiàn)流處理,Venkataraman 等[23]提出的流式處理工具Drizzle 具有比Spark Streaming 更高的效率。Apache Flink 同時支持了流計算和批處理,每個Flink 數(shù)據(jù)流以一個或多個源開始,并以一個或多個接收器結(jié)束。近年,有學者在Spark 的基礎(chǔ)上進行改進,增強了Spark 應用的實時性[24],提供了數(shù)據(jù)起源的相關(guān)支持[25]。然而,Spark提供的處理框架以及Storm、Flink等大數(shù)據(jù)開發(fā)工具,依然需要大數(shù)據(jù)開發(fā)人員理解如DataFrame、彈性分布式數(shù)據(jù)集(Resilient Distributed Datasets,RDD)等較多的框架本身的概念及開發(fā)模式,并編寫較多的數(shù)據(jù)處理邏輯,具有較高的使用門檻。為屏蔽不同數(shù)據(jù)處理工具之間在開發(fā)模式、編程接口上的差異導致的數(shù)據(jù)處理框架之間的遷移成本過高的問題,谷歌提出了Apache Beam(https://beam.apache.org/)框架,該框架統(tǒng)一了數(shù)據(jù)批處理和流處理的編程范式,使得開發(fā)者不需要了解底層數(shù)據(jù)開發(fā)接口,而直接通過Beam 軟件開發(fā)工具包(Software Development Kit,SDK)接口進行數(shù)據(jù)處理的加工與設(shè)計。然而,這些數(shù)據(jù)處理框架均沒有提供對應的數(shù)據(jù)處理工具,數(shù)據(jù)處理應用的開發(fā)門檻仍然較高,數(shù)據(jù)開發(fā)人員需要在理解業(yè)務(wù)的同時具有較高的大數(shù)據(jù)編程水平。

        表1 大數(shù)據(jù)處理框架對比Tab.1 Comparison of big data processing frameworks

        1.3 交互式流水線工具

        隨著相關(guān)技術(shù)的整體進步,部分數(shù)據(jù)大規(guī)模、無邊界、亂序的特點被放大。文獻[26]指出了物聯(lián)網(wǎng)應用對數(shù)據(jù)處理和分析的需求。傳統(tǒng)方法在處理無邊界數(shù)據(jù)集時,通常將其切分為有邊界數(shù)據(jù)集進行微批次處理,Google提出了Dataflow模型[27],在抽象層面提供了新的模型,以應對流數(shù)據(jù)的處理場景。

        不同交互式流水線處理工具的對比如表2 所示。Google Cloud Dataflow[28]是對Dataflow 的具體實現(xiàn),它是一種構(gòu)建、管理和優(yōu)化復雜數(shù)據(jù)處理流水線的工具,可以在不手工配置和管理MapReduce 集群的前提下構(gòu)建復雜的pipeline,支持從批處理到流處理模式的無縫切換,使開發(fā)者可以將主要精力放在業(yè)務(wù)邏輯本身。StreamSets(https://streamsets.com/)是一種典型的大數(shù)據(jù)ETL(Extract-Transform-Load)工具,提供所見即所得的可視化數(shù)據(jù)流程配置界面,它將數(shù)據(jù)處理流程分為數(shù)據(jù)源(Origins)、執(zhí)行器(Executors)、處理器(Processors)、數(shù)據(jù)存儲(Destinations)四類;但由于其規(guī)范了Processor 的類型,如一個pipeline中只能定義一個數(shù)據(jù)源,靈活性較差。這些工具由于封閉性、靈活性不足以及缺乏統(tǒng)一的模型支持,普遍存在擴展性較差的問題。Apache NiFi(http://nifi.apache.org/)是一個成熟的開源大數(shù)據(jù)流水線項目,它基于工作流式的編程理念,提供了較好的流水線定義和執(zhí)行功能。Kafka[29]是一種具有較高吞吐能力的分布式消息系統(tǒng),國外有學者在Kafka和Apache NiFi 的基礎(chǔ)上提出了一種數(shù)據(jù)流接收的框架[30]。然而,Apache NiFi 很難實現(xiàn)與Hadoop、Spark 等主流大數(shù)據(jù)框架的無縫集成,這導致其在處理海量數(shù)據(jù)時往往具有較差的性能。此外,NiFi在工作過程中會保存各個步驟的中間結(jié)果,導致磁盤I/O成為NiFi的瓶頸,這種機制導致的缺點在數(shù)據(jù)冗余量大的時候體現(xiàn)得尤為明顯。

        基于以上相關(guān)工作,本文首先給出了基于模型驅(qū)動的大數(shù)據(jù)流水線描述語言PiFlowDL,該語言以模塊化、層次化的方式對大數(shù)據(jù)處理任務(wù)進行描述。同時,在PiFlowDL 基礎(chǔ)上,本文提出了一種模型驅(qū)動的大數(shù)據(jù)流水線框架PiFlow。該框架以所見即所得的方式配置流水線,同時為第三方處理模塊開發(fā)提供了易用接口,極大地提高了大數(shù)據(jù)處理環(huán)境的構(gòu)建與開發(fā)效率。與同類框架在基準數(shù)據(jù)上的對比測試進一步驗證了所提框架在性能上的優(yōu)勢。

        表2 交互式流水線處理工具對比Tab.2 Comparison of interactive pipeline processing tools

        2 流水線描述語言PiFlowDL

        大數(shù)據(jù)處理過程適合表示為流水線,不同的流水線之間在結(jié)構(gòu)上差異較大,但在數(shù)據(jù)處理組件的層面上往往具有較強的相似性。文獻[31]提出了大數(shù)據(jù)分析即服務(wù)的模型驅(qū)動方法。當前的流水線描述語言缺少可復用性,擴展性較差,描述能力弱。針對這些問題,新的流水線描述語言的設(shè)計應遵循如下原則:

        1)具有模塊化特征。模塊化的描述可以增加PiFlowDL的可讀性,同時能夠增加模塊在不同的數(shù)據(jù)處理流水線中的復用性。

        2)具有層次化描述能力。能夠在數(shù)據(jù)處理模塊、數(shù)據(jù)處理單元等不同層次上對數(shù)據(jù)處理流水線進行描述。不同抽象層次的描述能力,進一步增加了描述語言的可擴展性,可擴展性的能力將傳導到基于該描述語言的框架實現(xiàn)中。

        3)具有良好的擴展性。能夠支持新的數(shù)據(jù)處理組件的設(shè)計與引入。良好的可擴展性,可以使得框架能夠根據(jù)實際需求,快速進行處理單元的實現(xiàn)和復用,進一步提高大數(shù)據(jù)流水線系統(tǒng)的開發(fā)效率。

        基于上述設(shè)計原則,本文提出了大數(shù)據(jù)流水線描述語言PiFlowDL。PiFlowDL 基于有向無環(huán)圖模型,將一條流水線的主要組成部分抽象為:一系列具有數(shù)據(jù)處理能力的數(shù)據(jù)處理組件Stop、數(shù)據(jù)在數(shù)據(jù)處理組件之間的流動方向Path,以及其他基本信息BasicInfo。其中,數(shù)據(jù)處理組件作為拓撲圖中的節(jié)點,數(shù)據(jù)在數(shù)據(jù)處理組件之間的流動方向及規(guī)則作為拓撲圖中的有向邊。PiFlowDL 主要部分的可擴展標記語言(eXtensible Markup Language,XML)Schema結(jié)構(gòu)如圖1所示。圖1 中:每個矩形框表示一個節(jié)點;矩形框跟隨的加號表示節(jié)點可展開,減號表示節(jié)點不可展開;矩形框下方的數(shù)字表示節(jié)點允許出現(xiàn)的次數(shù)。另外,分別用“S”“C”“A”的矩形框表示XML Schema 描述時的“Sequence”“Choice”及“All”等三種模型。Flow 描述流水線信息,主要由3 個子模塊組成,包含基本信息模塊BasicInfo、數(shù)據(jù)處理組件模塊Stop 和數(shù)據(jù)流向模塊Path。其中,F(xiàn)low 與BasicInfo 為一對一關(guān)系,與Stop 模塊和Path 模塊均為一對多的關(guān)系。BasicInfo 模塊是對流水線基本信息的描述,包括流水線名稱Name、流水線唯一標識UUID、流水線檢查點CheckPoint 和流水線運行模式RunMode,其中CheckPoint 和RunMode 是可選項。圖2 是BasicInfo 模塊XML Schema結(jié)構(gòu)。

        圖1 Flow模塊的XML Schema的結(jié)構(gòu)Fig.1 Structure of XML Schema of Flow module

        圖2 BasicInfo模塊XML Schema的結(jié)構(gòu)Fig.2 Structure of XML Schema of BasicInfo module

        數(shù)據(jù)處理組件泛指在流水線中具有數(shù)據(jù)采集、處理等功能的全部組件,Stop模塊是對流水線數(shù)據(jù)處理組件的描述,其中包括數(shù)據(jù)處理組件名稱Name、數(shù)據(jù)處理組件唯一標識UUID(Universal Unique IDentifier)、數(shù)據(jù)處理組件包名Bundle、數(shù)據(jù)處理組件屬性Properties、數(shù)據(jù)處理組件數(shù)據(jù)入口Inports 和數(shù)據(jù)出口Outports。圖3 是Stop 模塊XML Schema 結(jié)構(gòu)。其中:Properties 包含0 到多個Property,表示數(shù)據(jù)處理組件的多個屬性;Inports包含1到多個Inport,表示數(shù)據(jù)處理組件的多個輸入端口;Outports 包含1 到多個Outport,表示數(shù)據(jù)處理組件的多個輸出端口。Path模塊是對流水線數(shù)據(jù)流動方向的描述,表示數(shù)據(jù)在各個Stop模塊中的流動順序。圖4是Path模塊XML Schema 結(jié)構(gòu),該模塊包括源數(shù)據(jù)處理組件From、源數(shù)據(jù)處理組件From 的數(shù)據(jù)輸出端口Outport、目標數(shù)據(jù)處理組件To,以及目標數(shù)據(jù)處理組件To的數(shù)據(jù)輸入端口Inport。

        圖3 Stop模塊XML Schema的結(jié)構(gòu)Fig.3 Structure of XML Schema of Stop module

        在一些復雜的場景下,單條流水線并不能滿足需求,有些復雜邏輯需要多條流水線共同配合來完成。如截止到2018年2月,國家自然科學基金大數(shù)據(jù)知識管理服務(wù)平臺[6]共設(shè)計12類93條流水線進行大數(shù)據(jù)的采集、清洗、匯聚,并且需要根據(jù)實際業(yè)務(wù)的變化不斷增加。流水線的運行涉及數(shù)據(jù)的采集、清洗、匯聚等過程,存在先后順序,支持流水線和流水線組間的調(diào)度,可大大減少人工干預,提高后期運維效率。PiFlowDL 提出了Project、FlowGroup 和Flow 三層概念,并支持調(diào)度。Flow 是流水線調(diào)度的基本單位,在同一個項目中具備相似功能的流水線組織進一個FlowGroup中,Project是在項目層面對FlowGroup和Flow的集合。

        圖4 Path模塊XML Schema的結(jié)構(gòu)Fig.4 Structure of XML Schema of Path module

        圖5 Project模塊XML Schema的結(jié)構(gòu)Fig.5 Structure of XML Schema of Project module

        Project 包含BasicInfo(Name、UUID 等)、多個Flow、多個FlowGroup 和Condition,如圖5 所示。同樣的,F(xiàn)lowGroup 包含BasicInfo(Name、UUID 等)、多個Flow 和Condition。Condition表示Flow 和FlowGroup 之間,以及Flow 和Flow 之間的調(diào)度策略,Current 表示當前節(jié)點,After 表示執(zhí)行當前節(jié)點Current 之前必須執(zhí)行完畢的節(jié)點。FlowGroup 和Condition 模塊的結(jié)構(gòu)如圖6所示。

        圖6 FlowGroup和Condition模塊XML Schema的結(jié)構(gòu)Fig.6 Structure of XML Schema of FlowGroup and Condition module

        項目中可能涉及Project、FlowGroup、Flow 的協(xié)同調(diào)度,如數(shù)據(jù)定時采集與清洗,PiFlowDL 的XML Schema 的結(jié)構(gòu)如圖7所示。其中:Expression 表示定時調(diào)度策略,Entity 為執(zhí)行實體,包括Project、FlowGroup和Flow中的一種,如圖8所示。

        圖7 PiFlowDL的XML Schema的結(jié)構(gòu)Fig.7 Structure of XML Schema of PiFlowDL

        圖8 調(diào)度實體模塊XML Schema的結(jié)構(gòu)Fig.8 Structure of XML Schema of scheduling Entity module

        3 流水線框架PiFlow

        基于流水線描述語言PiFlowDL 的PiFlow 系統(tǒng)架構(gòu)如圖9所示。該系統(tǒng)由5 個模塊組成,分別是可視化引擎、RESTful API、執(zhí)行引擎、監(jiān)控和日志。

        圖9 大數(shù)據(jù)流水線系統(tǒng)PiFlow的系統(tǒng)架構(gòu)Fig.9 System architecture of big data pipeline system PiFlow

        可視化引擎以可視化的方式提供數(shù)據(jù)處理組件配置、數(shù)據(jù)流向配置和屬性配置等功能。PiFlowDL生成器將用戶通過可視化方法配置的流水線轉(zhuǎn)換為流水線描述語言。Web Service通過RESTful API將流水線描述語言發(fā)送給執(zhí)行引擎。執(zhí)行引擎包括PiFlow 解析器、執(zhí)行模塊和調(diào)度模塊。PiFlowDL解析器將以流水線描述語言表示的流水線轉(zhuǎn)化為對應的有向無環(huán)圖模型。執(zhí)行模塊實現(xiàn)流水線的執(zhí)行策略,包括流處理和批處理。在調(diào)度子模塊中,執(zhí)行引擎對基于Project、FlowGroup、Flow 三層概念組織的流水線進行調(diào)度,被調(diào)度流水線在Yarn 上執(zhí)行。監(jiān)控模塊對流水線運行情況進行監(jiān)控,保存監(jiān)控數(shù)據(jù),并向上提供接口。日志模塊負責采集流水線運行過程中的日志,通過獲取Yarn 日志并解析,向上提供接口。

        3.1 執(zhí)行引擎

        執(zhí)行引擎是大數(shù)據(jù)流水線系統(tǒng)PiFlow的核心模塊。用戶配置的流水線在執(zhí)行引擎中轉(zhuǎn)換為可執(zhí)行流水線,等待被調(diào)度并執(zhí)行。執(zhí)行引擎中包括PiFlowDL 解析器、執(zhí)行子模塊、調(diào)度子模塊等組件。PiFlowDL解析器將流水線描述語言所描述的流水線解析成DAG Graph。執(zhí)行子模塊采用圖遍歷的執(zhí)行策略對流水線進行批處理或流處理。調(diào)度子模塊對流水線進行調(diào)度。

        3.1.1 PiFlowDL解析器

        PiFlowDL解析器將可視化引擎生成的流水線模型描述語言轉(zhuǎn)換成流水線的有向無環(huán)圖DAG 供執(zhí)行子模塊執(zhí)行。模型描述語言采用XML 形式化表示,經(jīng)PiFLowDL 解析器解析生成FlowBean、StopBean、PathBean 類對象,并最終生成AnalyzedGraph 類對象。AnalyzedGraph 類對流水線數(shù)據(jù)處理組件,組件間關(guān)系進行結(jié)構(gòu)化定義,并提供流水線執(zhí)行接口。PiFlowDL 解析流程如圖10所示,該圖僅以執(zhí)行某條Flow為例進行說明。

        圖10 PiFlowDL解析Fig.10 PiFlowDL parsing

        3.1.2 流水線執(zhí)行模塊

        流水線執(zhí)行模塊基于一定策略執(zhí)行流水線任務(wù)。有向無環(huán)圖的遍歷有自頂向下和自底向上兩種邏輯。流水線執(zhí)行模塊可采用自頂向下的圖遍歷方式,自頂向下的流水線執(zhí)行機制可對入度為0 的數(shù)據(jù)處理組件并行執(zhí)行,以提高執(zhí)行效率。但PiFlow 支持數(shù)據(jù)的合流(Merge)、分流(Fork)、連接(Join)等操作,數(shù)據(jù)處理組件需滿足上游組件全部執(zhí)行完畢的條件才能被執(zhí)行,以獲得上游組件處理完畢的數(shù)據(jù)進行相關(guān)操作。自頂向下的執(zhí)行策略會增加邏輯的復雜性。

        本文采用自底向上的串行執(zhí)行邏輯,通過遞歸調(diào)用的方式完成流水線的執(zhí)行,可以保證每條流水線執(zhí)行且執(zhí)行一次,簡化執(zhí)行邏輯。對基于有向無環(huán)圖遍歷執(zhí)行策略的描述如下文偽代碼所示,該策略的核心思想是:在DAG 中找到所有出度為0 的數(shù)據(jù)處理組件,逐個遍歷,判斷當前stop 是否有入邊,若有入邊,則逐個遞歸調(diào)用入邊所有stop,之后執(zhí)行當前stop;若沒有入邊,則直接執(zhí)行當前stop。這種策略可以保證流水線中的每個數(shù)據(jù)處理模塊都得到執(zhí)行。同時,因為某個數(shù)據(jù)處理組件的下級stop之間數(shù)據(jù)互不影響,所以對出度為0的stop的遍歷順序不影響執(zhí)行結(jié)果。

        3.1.3 批量處理與流式處理

        本文實現(xiàn)了對PiFlow流水線進行批處理和流處理的兩種方法。圖11是流水線批處理示意圖。該示例流水線包含7個Stop:A、B、C、D、E、F、G,6 個Path:A→B,B→D,C→D,D→E,E→F,E→G。具體執(zhí)行邏輯如圖11所示。

        PiFlow 采用微批處理策略實現(xiàn)流處理。圖12(a)給出了流處理流水線配置方式。其中:流水線頂部為Streaming 類型數(shù)據(jù)處理組件;虛框中表示由普通數(shù)據(jù)處理組件組成的批處理流水線BatchProcess。Streaming 類型處理組件設(shè)置批處理時間窗口BatchTime,每隔BatchTime 時間產(chǎn)生一批數(shù)據(jù)傳遞給下游BatchProcess進行批處理,具體見圖12(b)。

        圖11 PiFlow批處理Fig.11 Batch processing in PiFlow

        圖12 PiFlow流處理配置方式及處理流程Fig.12 PiFlow flow processing configuration mode and processing flow

        3.1.4 組件擴展

        PiFlow 通過數(shù)據(jù)處理組件擴展子模塊支持用戶自定義開發(fā)數(shù)據(jù)處理組件Stop。圖13 是組件擴展模塊的統(tǒng)一建模語言(Unified Modeling Language,UML)類圖,通過將上下游數(shù)據(jù)進行抽象并封裝成JobInputStream 和JobOutputStream,以統(tǒng)一的input/output格式(本文采用Spark數(shù)據(jù)格式DataFrame)增強了數(shù)據(jù)處理組件的擴展性和可復用性。自定義的Stop需繼承抽象類ConfigurableStop,并實現(xiàn)相應接口。其中:inportList、outportList 為自定義數(shù)據(jù)處理組件的數(shù)據(jù)入口列表和出口列表;initialize 為流水線初始化函數(shù);perform 為實現(xiàn)組件功能的具體函數(shù),通過JobInputStream.read(inport)獲取上游數(shù)據(jù),JobOutputStream.write(outport,newDataFrame)將數(shù)據(jù)通過端口outport傳遞給下游組件。同時,支持用戶自定義屬性,通過setProperties 函數(shù)對屬性進行初始化,getPropertyDescriptor 給出自定義屬性的描述信息。getIcon 和getGroup 提供組件的圖標和組信息。自定義流數(shù)據(jù)處理組件需繼承ConfigurableStreamingStop,并實現(xiàn)getDStream等接口即可。

        3.1.5 調(diào)度模塊

        調(diào)度模塊負責流水線的調(diào)度,圖14 展示了某一Project 內(nèi)流水線配置,包含F(xiàn)1、F5、F6 三條流水線和一個流水線組G1,G1內(nèi)包含F(xiàn)2、F3和F4三條流水線,實線箭頭表示After條件。

        流水線設(shè)計等待Waiting、開始Started、完成Completed、錯誤Error 四種狀態(tài),不同狀態(tài)之間具有邏輯關(guān)系不可隨意轉(zhuǎn)換,如:等待狀態(tài)只能轉(zhuǎn)換為開始狀態(tài),不能直接轉(zhuǎn)換為完成狀態(tài)。在大數(shù)據(jù)流水線中,通常由等待狀態(tài)轉(zhuǎn)換為開始狀態(tài)、開始狀態(tài)轉(zhuǎn)換為完成狀態(tài)或錯誤狀態(tài)。本文采用基于流水線狀態(tài)的調(diào)度機制,其具體調(diào)度策略如圖15 所示。所有流水線根據(jù)狀態(tài)放置到四個資源池中,分別為等待調(diào)度資源池Waiting Pool、正在運行資源池Started Pool、已完成資源池Completed Pool、錯誤資源池Error Pool,其中Waiting Pool 為初始狀態(tài)。同時設(shè)置兩個Monitor:Condition Monitor 和Task Monitor。Condition Monitor 負責拉取Waiting Pool、Completed Pool 和Error Pool 中流水線,判讀滿足After 條件的流水線,啟動該流水線并放置到Started Pool 中。Task Monitor 拉取Started Pool 中流水線,若任務(wù)完成則放置到Completed Pool中,若任務(wù)失敗則放置到Error Pool 中。直到所有流水線全部進入Completed Pool或者Waiting Pool中只剩余Error Pool中流水線的后續(xù)流水線,整個調(diào)度任務(wù)結(jié)束。

        3.2 其他模塊

        PiFlow 流水線還包括可視化引擎、流水線監(jiān)控模塊、流水線日志模塊以及RESTful API。

        可視化引擎提供所見即所得方式配置流水線的功能,具體架構(gòu)見圖9,其中包括數(shù)據(jù)處理組件配置和流水線配置。數(shù)據(jù)處理組件子模塊Stop,負責加載、配置后端Stop 信息。流水線子模塊Flow,負責配置流水線信息,監(jiān)控流水線,獲取日志。為增強復用性,流水線相關(guān)信息可保存為模板,模板子模塊Template 負責流水線模板的保存、下載、上傳、加載。流水線監(jiān)控模塊通過Listener 機制對流水線及每個數(shù)據(jù)處理組件進行監(jiān)控。監(jiān)控包括流水線開始時間、結(jié)束時間和狀態(tài),以及每個數(shù)據(jù)處理組件開始時間、結(jié)束時間和狀態(tài)。流水線狀態(tài)定義為三種:STARTED、COMPLETED、FAILED,數(shù)據(jù)處理組件狀態(tài)定義為四種:INIT、STARTED、COMPLETED、FAILED。具體監(jiān)控機制如圖16所示。

        針對每個數(shù)據(jù)處理組件,Listener 在數(shù)據(jù)處理組件的初始化、開始、結(jié)束以及異常情況提供接口進行監(jiān)控。同時考慮輕量級部署,監(jiān)控數(shù)據(jù)存儲到內(nèi)嵌式數(shù)據(jù)庫H2DB 中。PiFlow 日志模塊通過調(diào)用Yarn API 獲取日志,通過日志解析器解析日志,并向上提供接口。RESTful 采用Client/Server 架構(gòu),將客戶端和服務(wù)器解耦合。統(tǒng)一的接口要求客戶端和服務(wù)器之間通信的方法必須統(tǒng)一化,提高交互的可見性。鑒于此,PiFlow 采用RESTful API 方式暴露接口,供用戶和可視化引擎使用。接口具體包括啟動流水線startFlow、停止流水線stopFlow、獲取流水線狀態(tài)getFlowInfo、獲取流水線日志getFlowLog,以及獲取數(shù)據(jù)處理組件相關(guān)信息的接口如getAllStops 等。同時,包括啟動、停止Project 和FlowGroup 等接口。

        圖13 組件擴展UML類圖Fig.13 UML class diagram of component extension

        圖14 Project流水線樣例Fig.14 Project pipeline sample

        3.3 系統(tǒng)原型

        基于PiFlowDL 的系統(tǒng)原型如圖17~18 所示。圖17 展示了流水線配置界面,左側(cè)欄分組展示數(shù)據(jù)處理組件,通過拖拽方式可將Stop 放入到畫布中央,右側(cè)欄展示Stop 的基本信息和需要設(shè)置的屬性信息。圖18 展示流水線運行監(jiān)控頁面,監(jiān)控流水線及數(shù)據(jù)處理組件的執(zhí)行時間及狀態(tài)。同時,考慮遷移問題PiFlow 支持模板功能,將流水線Flow 保存成模板,新環(huán)境中導入模板即可完成遷移工作。

        圖15 調(diào)度示意圖Fig.15 Dispatching schematic diagram

        圖16 PiFlow監(jiān)控機制Fig.16 Monitoring mechanism of PiFlow

        4 應用實例

        4.1 DBLP數(shù)據(jù)采集與處理

        DBLP(DataBase systems and Logic Programming)是計算機領(lǐng)域內(nèi)對研究的成果以作者為核心的一個計算機類英文文獻的集成數(shù)據(jù)庫系統(tǒng)。按年代列出了作者的科研成果,包括國際期刊和會議等公開發(fā)表的論文。以DBLP 數(shù)據(jù)為例,使用PiFlow 進行采集、清洗、入庫的流水線,如圖17 所示。首先下載壓縮文件dblp.xml.gz,然后解壓生成dblp.xml,最后針對dblp.xml 中 Article、Inproceedings、Proceedings、Book、Incollection、WWW(World Wide Web)、Mastersthesis、Phdthesis不同標簽進行解析并寫入Hive 數(shù)據(jù)倉庫。圖18 展示了數(shù)據(jù)處理過程流水線監(jiān)控頁面。

        為驗證PiFlow 的性能,針對該場景與Apache NiFi 進行了對比測試,共設(shè)計了4 條流水線,測試流水線如表3 所示。F1功能為DBLP 數(shù)據(jù)采集入庫,F(xiàn)2~F4 為讀取Oracle 數(shù)據(jù)寫入Hive。測試使用5 臺物理機搭建的集群環(huán)境,每臺物理機為32核CPU,內(nèi)存為128 GB。性能對比測試結(jié)果見表4,針對每條流水線的運行時間分別進行了三次測試,結(jié)果取平均值(見平均耗時)。其中PiFlow 性能提升比例公式為:PiFlow 性能提升比例=NiFi平均耗時/PiFlow 平均耗時-1。Apache NiFi所需資源如表5,PiFlow 所需資源如表6。Apache NiFi 采集Oracle數(shù)據(jù)庫的策略為將數(shù)據(jù)進行分頁,每頁10 000條數(shù)據(jù),以頁為單位進行并發(fā)讀寫。PiFlow 采用分區(qū)方式進行讀寫,所采用的線程數(shù)與Apache NiFi 相同。由于Apache NiFi 基于FlowFile 文件形式計算,而PiFlow 基于內(nèi)存計算,針對設(shè)計的4 條流水線PiFlow 相較Apache NiFi 平均性能提升了5 倍,且數(shù)據(jù)量越大優(yōu)勢越明顯。

        圖17 DBLP數(shù)據(jù)采集流水線配置頁面Fig.17 DBLP data collection pipeline configuration page

        圖18 DBLP數(shù)據(jù)采集流水線監(jiān)控頁面Fig.18 DBLP data collection pipeline monitoring page

        表3 流水線測試樣例Tab.3 Pipeline test samples

        4.2 微生物數(shù)據(jù)采集與處理

        通過將微生物組學數(shù)據(jù),微生物組、酶、合成元件、代謝產(chǎn)物數(shù)據(jù),微生物資源文獻,專利報告等微生物大數(shù)據(jù)資源進行采集、清洗、匯聚形成微生物大數(shù)據(jù)平臺,并通過關(guān)聯(lián)數(shù)據(jù)對促進微生物技術(shù)領(lǐng)域知識發(fā)現(xiàn)(如新酶的開發(fā)、新的物種和功能的挖掘)具有重大意義。利用PiFlow提供的組件擴展功能,本實例設(shè)計了10 類可復用的不同的數(shù)據(jù)處理組件。通過組合10類擴展的微生物領(lǐng)域的數(shù)據(jù)處理組件以及原生的4類數(shù)據(jù)處理組件,實現(xiàn)了10 類微生物數(shù)據(jù)的解析、處理、存儲。相關(guān)數(shù)據(jù)處理組件如表7所示。

        表4 性能對比測試結(jié)果Tab.4 Test results of performance comparison

        表5 NiFi性能對比測試資源Tab.5 Resources of NiFi performance comparison test

        下面以GenBank 數(shù)據(jù)為例進行說明。GenBank 數(shù)據(jù)來源為FTP服務(wù)器(ftp://ftp.ncbi.nlm.nih.gov/genbank),數(shù)據(jù)類型為*.seq.gz 壓縮文件。該實例的需求為將GenBank 數(shù)據(jù)下載、解析、存儲到ElasticSearch 中。流水線流程為:1)下載FTP數(shù)據(jù)LoadFromFtpToHDFS;2)解壓文件UnzipFilesOnHDFS;3)解析數(shù)據(jù)GenBankData;4)存儲到ElasticSearch 中PutEs。圖19展示了PiFlow的流水線配置。

        表6 PiFlow性能對比測試資源Tab.6 Resources of PiFlow performance comparison test

        表7 微生物相關(guān)數(shù)據(jù)處理組件Tab.7 Processing components of microorganism related data

        圖19 GenBank數(shù)據(jù)采集流水線Fig.19 GenBank data collection pipeline

        5 結(jié)語

        本文首先設(shè)計了一種大數(shù)據(jù)流水線模型描述語言PiFlowDL,在此基礎(chǔ)上,提出了基于PiFlowDL 的大數(shù)據(jù)流水線系統(tǒng)。該系統(tǒng)以所見即所得的方式配置流水線,支持實時監(jiān)控流水線運行狀態(tài),查看流水線運行日志,同時提供模板功能。該系統(tǒng)提供了豐富的數(shù)據(jù)處理組件,同時集成了科學大數(shù)據(jù)領(lǐng)域的相關(guān)算法。但PiFlow 還存在一些不足:首先,PiFlow 雖然提供了對Streaming 源的支持,但在流計算時不支持多個Streaming 源;其次,流水線間調(diào)度,不支持流計算流水線。我們將在上述方面進行下一步工作。

        猜你喜歡
        批處理流水線數(shù)據(jù)處理
        Gen Z Migrant Workers Are Leaving the Assembly Line
        認知診斷缺失數(shù)據(jù)處理方法的比較:零替換、多重插補與極大似然估計法*
        心理學報(2022年4期)2022-04-12 07:38:02
        ILWT-EEMD數(shù)據(jù)處理的ELM滾動軸承故障診斷
        流水線
        報廢汽車拆解半自動流水線研究
        基于希爾伯特- 黃變換的去噪法在外測數(shù)據(jù)處理中的應用
        基于PSD-BPA的暫態(tài)穩(wěn)定控制批處理計算方法的實現(xiàn)
        SIMATIC IPC3000 SMART在汽車流水線領(lǐng)域的應用
        自動化博覽(2014年6期)2014-02-28 22:32:05
        基于POS AV610與PPP的車輛導航數(shù)據(jù)處理
        批處理天地.文件分類超輕松
        国产免费看网站v片不遮挡| 亚洲一区二区三区中国| 99999久久久久久亚洲| 国产又粗又黄又爽的大片| 精品久久欧美熟妇www| 国产精品无码一区二区三区免费| 欧洲亚洲视频免费| 无码中文字幕久久久久久| 亚洲av乱码一区二区三区观影| 国产a级精精彩大片免费看| 无码日日模日日碰夜夜爽| av天堂手机一区在线| 自拍偷区亚洲综合激情| 亚洲精品成人无限看| 丰满人妻一区二区三区视频53| 人妻丰满熟妇AV无码区HD| 国产成人免费一区二区三区| 手机在线看片在线日韩av| 一区二区三区精品少妇| 94久久国产乱子伦精品免费| 亚洲中文无码成人影院在线播放 | 黑人巨大跨种族video| 人妻无码一区二区在线影院 | 久久亚洲精品成人AV无码网址| 国产av一区二区凹凸精品| 白白色日韩免费在线观看| 亚洲av网站在线观看一页| 国产又色又爽又刺激在线播放| 日日摸夜夜添狠狠添欧美| 亚洲国产成人精品福利在线观看| 亚洲av福利天堂在线观看| 久久精品中文字幕有码| 日本三级片在线观看| 一本色道久久88综合日韩精品| 国精产品一品二品国在线| 亚洲AV永久无码精品表情包| 91亚洲国产成人精品一区.| 一本一道av无码中文字幕﹣百度| 中文人妻无码一区二区三区| 日本免费精品免费视频| 白嫩丰满少妇av一区二区|