亚洲免费av电影一区二区三区,日韩爱爱视频,51精品视频一区二区三区,91视频爱爱,日韩欧美在线播放视频,中文字幕少妇AV,亚洲电影中文字幕,久久久久亚洲av成人网址,久久综合视频网站,国产在线不卡免费播放

        ?

        Spark任務(wù)間消息傳遞方法研究

        2022-11-16 02:24:24夏立斌劉曉宇姜曉巍孫功星
        關(guān)鍵詞:內(nèi)存編程分布式

        夏立斌,劉曉宇,孫 瑋,姜曉巍,孫功星

        1.中國科學(xué)院 高能物理研究所,北京 100049

        2.中國科學(xué)院大學(xué),北京 100049

        為了對自然現(xiàn)象進(jìn)行更深層次的理解分析,諸多科學(xué)研究正面臨著高性能計(jì)算、大數(shù)據(jù)分析與機(jī)器學(xué)習(xí)帶來的挑戰(zhàn),如粒子物理實(shí)驗(yàn)、氣候預(yù)測、生物數(shù)據(jù)分析、天文圖像處理等。這些科學(xué)計(jì)算問題不僅數(shù)據(jù)量大,并且其計(jì)算任務(wù)通常是以數(shù)值線性代數(shù)(numerical linear algebra)為核心的多維參數(shù)擬合、線性方程組求解等問題,具有很高的靈活性,不易被并行實(shí)現(xiàn)。因此,完善的科學(xué)計(jì)算系統(tǒng)需兼顧大數(shù)據(jù)處理和高性能計(jì)算的特點(diǎn)和優(yōu)勢,以保證計(jì)算任務(wù)的執(zhí)行效率。

        近年來內(nèi)存計(jì)算(in-memory computing)技術(shù)是互聯(lián)網(wǎng)行業(yè)的研究熱點(diǎn),隨著內(nèi)存容量和性能的提升,可以將計(jì)算時需要的數(shù)據(jù)盡量放在內(nèi)存中實(shí)現(xiàn)本地化計(jì)算,避免與網(wǎng)絡(luò)、磁盤等低速設(shè)備進(jìn)行頻繁的I/O 操作而損耗性能,其擁有“一次加載,多次使用”的特點(diǎn),可以大大提升迭代計(jì)算的執(zhí)行效率。分布式處理框架Spark 借助該技術(shù),利用存放在內(nèi)存中的彈性分布式數(shù)據(jù)集(resilient distributed datasets,RDDs)和DataSet、DataFrame 構(gòu)建了一套MapReduce-like 的編程模型,并提供諸多通用的數(shù)據(jù)操作算子,方便用戶簡單地實(shí)現(xiàn)復(fù)雜數(shù)據(jù)處理任務(wù);同時允許將可重用的數(shù)據(jù)緩存到內(nèi)存中,通過構(gòu)建任務(wù)依賴關(guān)系構(gòu)成的DAG(directed acyclic graph)對任務(wù)進(jìn)行分發(fā),以提升迭代和交互任務(wù)的性能[1]。

        由于Spark 編程模型的限制,其針對的應(yīng)用特點(diǎn)多為易并行(embarrassingly parallel)且相互獨(dú)立的任務(wù),可以方便地進(jìn)行大規(guī)模橫向擴(kuò)展實(shí)現(xiàn)并行計(jì)算(圖1(a))。而科學(xué)計(jì)算和機(jī)器學(xué)習(xí)任務(wù)間往往彼此相關(guān),表現(xiàn)出具有通信性的特征,如基礎(chǔ)線性代數(shù)子程序(basic linear algebra subprograms,BLAS)中的矩陣向量運(yùn)算。Spark 由于任務(wù)間無法相互通信,在其機(jī)器學(xué)習(xí)庫Spark MLlib中實(shí)現(xiàn)的BLAS只能通過Shuffle方式進(jìn)行任務(wù)間的數(shù)據(jù)交換,會產(chǎn)生大量中間數(shù)據(jù),對內(nèi)存、網(wǎng)絡(luò)等帶來極大的負(fù)擔(dān),導(dǎo)致性能下降;在科學(xué)計(jì)算領(lǐng)域,通常使用MPI(message passing interface)進(jìn)行任務(wù)間的消息傳遞(圖1(b)),其高效的點(diǎn)對點(diǎn)和集合通信機(jī)制,可以使得算法進(jìn)行靈活高效的實(shí)現(xiàn),并且對高帶寬、低延遲的RDMA(remote direct memory access)網(wǎng)絡(luò),如InfiniBand、RoCE(RDMA over converged ethernet)等有良好的支持。

        針對科學(xué)計(jì)算的特點(diǎn),本文研究了Spark 內(nèi)存計(jì)算和MPI消息傳遞模型的無縫融合解決方案,通過MPI的多種通信編程機(jī)制對Spark編程模型的表達(dá)能力進(jìn)行擴(kuò)充,以實(shí)現(xiàn)大數(shù)據(jù)分析和數(shù)值計(jì)算的高效率執(zhí)行。本文的主要貢獻(xiàn)為:(1)利用MPI增加了Spark任務(wù)間的通信機(jī)制,使得Spark 能夠高效處理科學(xué)計(jì)算算法及迭代密集型應(yīng)用。(2)通過更改Spark任務(wù)DAG的劃分方式,添加MPI Stage在Spark框架中對MPI任務(wù)進(jìn)行統(tǒng)一調(diào)度執(zhí)行。(3)實(shí)現(xiàn)MPI在分布式內(nèi)存數(shù)據(jù)集之中執(zhí)行,對任務(wù)進(jìn)行函數(shù)化、算子化,減輕開發(fā)難度,減少數(shù)據(jù)移動開銷,同時提升了任務(wù)的容錯性。

        1 相關(guān)研究情況介紹

        近年來已有一些對高性能計(jì)算和大數(shù)據(jù)技術(shù)融合的研究。早期的研究主要利用高性能計(jì)算中優(yōu)異的網(wǎng)絡(luò)通信性能對大數(shù)據(jù)處理框架進(jìn)行優(yōu)化,如通過擴(kuò)展MPI 鍵值對的支持以實(shí)現(xiàn)Hadoop/Spark 中的通信原語[2],利 用RDMA 提 升Spark Shuffle 的 性 能[3]。隨 后Anderson、Gittens 等人通過Linux 共享內(nèi)存和TCP/IP sockets 的方式對Spark 和MPI 進(jìn)行連接,并在內(nèi)存中對兩種框架的數(shù)據(jù)格式副本進(jìn)行轉(zhuǎn)換[4-5]。 Malitsky利用PMI Server 和Spark Driver 分別對Spark 應(yīng)用程序中的Spark任務(wù)與MPI任務(wù)進(jìn)行管理[6]。除了對Spark與MPI結(jié)合的研究嘗試以外,SparkNet[7]、TensorFlowOnSpark[8]、RayDP[9]等項(xiàng)目都在探索基于不同工作負(fù)載的計(jì)算框架之間的結(jié)合,以實(shí)現(xiàn)端到端的計(jì)算流程。

        上述工作的設(shè)計(jì)思想可以總結(jié)為兩點(diǎn):利用高性能計(jì)算對框架本身的通信模塊進(jìn)行修改;對不同的計(jì)算框架進(jìn)行連接。前者無法解決Spark編程模型帶來的局限性;后者需要用戶編寫兩套不同的代碼,在執(zhí)行過程中存在數(shù)據(jù)格式轉(zhuǎn)換的開銷,且Spark 與MPI 計(jì)算任務(wù)在相互獨(dú)立的上下文中管理,本質(zhì)上依然需要對兩套集群分別進(jìn)行維護(hù)和資源分配。文獻(xiàn)[4-6]中的解決方案,均停留在Spark 與MPI 連接調(diào)用的層面上,未能從Spark的編程和計(jì)算模型的角度出發(fā)進(jìn)行思考。在對統(tǒng)一資源管理方法的研究中,GERBIL[10]和MPICH-yarn[11]基于Yarn對MPI任務(wù)的申請分配方式做出了探索,但僅涉及到資源管理器的層面,使得MPI可以根據(jù)獲取到的資源信息分配進(jìn)程,仍未解決Spark與MPI間的無縫融合。

        在HPC 應(yīng)用中廣泛使用的MPI 庫相比于Spark/Hadoop 有著極高的性能優(yōu)勢,但無法在生產(chǎn)率和容錯(fault-tolerant)等方面得到保證。在最新的MPI-4.0[12]標(biāo)準(zhǔn)中,MPI 本身仍然不提供錯誤處理機(jī)制,一旦MPI 作業(yè)中的某個進(jìn)程出現(xiàn)錯誤,需要對作業(yè)進(jìn)行重新運(yùn)行?,F(xiàn)有對容錯的研究,主要通過Checkpoint/Restart(CPR)、Global-Restart、MPI Stage 等方式對作業(yè)的運(yùn)行狀態(tài)進(jìn)行記錄,并盡可能地減少作業(yè)啟動時間的開銷[13]。在大數(shù)據(jù)領(lǐng)域,Spark 計(jì)算框架和Alluxio 分布式內(nèi)存系統(tǒng)[14]的核心是以數(shù)據(jù)為中心,通過DAG對計(jì)算/數(shù)據(jù)的執(zhí)行流程進(jìn)行記錄,從而可以很方便地使作業(yè)從錯誤中恢復(fù)。

        2 Spark消息通信設(shè)計(jì)與實(shí)現(xiàn)

        2.1 編程和計(jì)算模型

        2.1.1 編程模型

        Spark的編程模型是基于分布式數(shù)據(jù)集的函數(shù)式編程思想,即將一系列算子作用在分布式數(shù)據(jù)集(RDD)之上,進(jìn)行MapReduce-like 操作,然后根據(jù)依賴關(guān)系生成DAG 執(zhí)行。對于任務(wù)之間可以完全并行的操作,能夠利用多線程技術(shù)提升性能,其提供的諸多算子和編程庫使得用戶能便捷地開發(fā)出并行應(yīng)用。MPI 則是基于計(jì)算資源(如處理器、緩存)和緩沖區(qū)的編程模型,核心為進(jìn)程間通信,通過使用OpenMP 可以實(shí)現(xiàn)混合編程(hybrid programming),由于用戶可以直接結(jié)合物理資源和算法特性進(jìn)行細(xì)粒度的優(yōu)化,因此被廣泛地應(yīng)用到高性能計(jì)算領(lǐng)域中。具體的MPI與Spark編程模型和框架軟件生態(tài)的對比如表1所示。

        表1 Spark與MPI對比Table 1 Comparison of Spark and MPI

        為彌補(bǔ)Spark 編程模型表達(dá)能力的局限性,同時為MPI 添加了基于數(shù)據(jù)的DAG 執(zhí)行能力,本文將兩種編程模型進(jìn)行了融合,使得MPI可以充分利用大數(shù)據(jù)生態(tài)內(nèi)的內(nèi)存計(jì)算能力和分布式文件系統(tǒng)等組件,將大數(shù)據(jù)處理和高性能計(jì)算結(jié)合在一起。如代碼1所示,編程模型的核心為多個分區(qū)構(gòu)成的分布式數(shù)據(jù)集。通過借鑒Spark編程模型中的map操作,使用mpimap將MPI函數(shù)映射到數(shù)據(jù)集的不同分區(qū)上進(jìn)行計(jì)算。此時的數(shù)據(jù)分區(qū)單位,同時也是MPI進(jìn)程單位,數(shù)據(jù)分區(qū)集合RDD也代表著同一個Communicator下的MPI進(jìn)程集合。基于以上編程概念,使得Spark 的map 任務(wù)間可以通過MPI通信原語進(jìn)行交互,彌補(bǔ)了Spark 編程模型中map 任務(wù)間不能交互的不足。并且每個mpimap操作產(chǎn)生的計(jì)算中間數(shù)據(jù)都會存放在內(nèi)存中,可供后續(xù)算子復(fù)用。因此,編程時可以將復(fù)雜的MPI邏輯解耦,并與Spark計(jì)算中便捷的數(shù)據(jù)處理方法結(jié)合,在統(tǒng)一的應(yīng)用中通過數(shù)據(jù)流的形式進(jìn)行串聯(lián)。在實(shí)際使用的過程中,用戶可以根據(jù)性能需求靈活地實(shí)現(xiàn)算法,而不必局限于Spark 原生算子及MapReduce編程模型的限制。

        基于上述編程模型,底層的計(jì)算框架可根據(jù)數(shù)據(jù)分區(qū)信息,采用數(shù)據(jù)本地性及網(wǎng)絡(luò)拓?fù)浣Y(jié)構(gòu)的算法進(jìn)行任務(wù)(進(jìn)程)分配。通過使用以數(shù)據(jù)為中心的模型,取代了傳統(tǒng)mpirun 命令行啟動MPI 進(jìn)程的方式,加強(qiáng)數(shù)據(jù)和計(jì)算之間的聯(lián)系,降低了編程的復(fù)雜度,同時也方便用戶進(jìn)行代碼復(fù)用。

        2.1.2 消息通信DAG計(jì)算模型

        為了將編程模型中提出的概念設(shè)計(jì)與實(shí)際任務(wù)的物理分配建立起有效的聯(lián)系,Spark利用不同RDD之間的依賴(Dependency)關(guān)系,生成出計(jì)算DAG 以供任務(wù)調(diào)度。如圖2 所示,Spark 默認(rèn)提供的依賴關(guān)系包括NarrowDependency 和ShuffleDependency 兩種。為了減少重復(fù)計(jì)算,同時提升任務(wù)的并行度,Spark 通過ShuffleDependency劃分出Stage,同一個Stage中的任務(wù)作用在單獨(dú)的Partition 之上,不需要與其他任務(wù)的Partition數(shù)據(jù)進(jìn)行交換,在不同的Stage 間通過Shuffle 進(jìn)行數(shù)據(jù)交換。

        在對Spark 中加入消息通信機(jī)制后,低效的Shuffle數(shù)據(jù)交換方式已被MPI 取代。為了實(shí)現(xiàn)二者的無縫融合,必須對原有的DAG 構(gòu)建策略進(jìn)行修改。本文添加了與MPI 相關(guān)的RDD、Dependency 和Stage,并對DAG 的劃分策略進(jìn)行了修改。如圖3 所示,對RDD 進(jìn)行mpimap 函數(shù)計(jì)算會產(chǎn)生新的MPIMapPartitionRDD,以及相對應(yīng)的MPIDependency 依賴,并在DAG 生成的過程中劃分出MPI Stage。鑒于MPI 通信模型的要求,該Stage 中的任務(wù)在Barrier mode 下執(zhí)行,即必須獲取到所有對應(yīng)的進(jìn)程資源,否則計(jì)算不會被執(zhí)行。不同于Spark 自身實(shí)現(xiàn)的ShuffleMapStage,MPI Stage 在任務(wù)間利用消息傳遞進(jìn)行通信,從而擴(kuò)充Spark的局限性,以提升計(jì)算效率。

        2.1.3 MPI計(jì)算容錯

        根據(jù)以上規(guī)則生成出的DAG,每個作用在RDD 分區(qū)上的MPI 函數(shù)都會在一個Stage 中執(zhí)行,并在函數(shù)執(zhí)行成功之后產(chǎn)生一個新的RDD。由于對MPI程序進(jìn)行了算子化操作,在一個作業(yè)中會有多個MPI函數(shù)被分配到相互獨(dú)立的Stage中,根據(jù)數(shù)據(jù)的依賴關(guān)系并行執(zhí)行。上述的編程模型和計(jì)算模型,不僅僅對Spark 自身的表達(dá)能力進(jìn)行了擴(kuò)充,同時也使得MPI可以實(shí)現(xiàn)以數(shù)據(jù)為中心的內(nèi)存計(jì)算。MPI 與Spark 的無縫融合,為使用HDFS,Alluxio等分布式文件系統(tǒng)也提供了便利。在對于容錯技術(shù)的研究上,圍繞MPI進(jìn)行的相關(guān)工作都集中在Checkpoint和減少作業(yè)恢復(fù)的啟動時間上,對于某一個進(jìn)程產(chǎn)生的錯誤是不可避免的。Spark、Alluxio 則是從計(jì)算/數(shù)據(jù)的DAG出發(fā),根據(jù)中間計(jì)算執(zhí)行流和中間數(shù)據(jù)緩存,保證作業(yè)從錯誤中快速恢復(fù)。因此,在對MPI函數(shù)算子化后,作業(yè)的執(zhí)行過程變成了諸多作用在分布式內(nèi)存數(shù)據(jù)集上的變換,相當(dāng)于對一個大的MPI作業(yè)進(jìn)行了分割,并可以對中間計(jì)算結(jié)果進(jìn)行保存。當(dāng)部分任務(wù)出現(xiàn)錯誤時,可以基于DAG 和RDD 進(jìn)行恢復(fù),而無需重啟整個作業(yè)。以上容錯的處理方式,尤其是對運(yùn)行在大規(guī)模數(shù)據(jù)集上的作業(yè),能夠減輕大量狀態(tài)恢復(fù)所需的計(jì)算和I/O開銷。

        2.2 Blaze架構(gòu)設(shè)計(jì)

        根據(jù)2.1 節(jié)闡述的編程和計(jì)算模型,基于Spark 和OpenMPI 構(gòu)建了一套計(jì)算系統(tǒng)Blaze,其總體設(shè)計(jì)和工作流程如下所示。

        2.2.1 OpenMPI組件設(shè)計(jì)

        基于MPI 標(biāo)準(zhǔn)的實(shí)現(xiàn)有多種,其中OpenMPI 的采用模塊-組件的架構(gòu)設(shè)計(jì)(modular component architecture,MCA),如圖4 所示,能夠便捷地進(jìn)行模塊插件開發(fā),實(shí)現(xiàn)自定義功能[15]。為了實(shí)現(xiàn)MPI與Spark的融合,OpenMPI端的主要任務(wù)為接收Spark調(diào)度器中產(chǎn)生的進(jìn)程分配信息,從而啟動MPI Executor(Process)執(zhí)行具體任務(wù)。傳統(tǒng)的MPI 程序主要使用mpirun 命令行執(zhí)行,即通過OpenMPI 的Run-Time 層(PRTE)產(chǎn)生資源分配信息,并實(shí)現(xiàn)進(jìn)程啟動。因此,Blaze需要對上述執(zhí)行過程進(jìn)行修改以完成對Spark的結(jié)合。

        Blaze 在設(shè)計(jì)上并未改變OpenMPI 執(zhí)行的邏輯,而是充分利用MCA 架構(gòu)的靈活性,編寫了自定義的功能模塊進(jìn)行替換。按照OpenMPI 模塊調(diào)用的順序,主要包括進(jìn)程生命周期管理模塊(PLM)、資源分配模塊(RAS)、進(jìn)程映射模塊(RMAPS)、啟動模塊(ODLS)及存儲上述信息的數(shù)據(jù)庫。使用JNI的方式將PRTE集成在Master中提供服務(wù),并通過對上述與進(jìn)程啟動相關(guān)模塊的改寫,使得MPI資源分配與調(diào)度所需的信息完全由Spark提供。

        具體的實(shí)現(xiàn)方法為:(1)PRTE根據(jù)PLM管理的作業(yè)生命周期維護(hù)了一個有限狀態(tài)機(jī),在Blaze 中由Master進(jìn)行管理。(2)Master 中記錄著系統(tǒng)整體的資源使用情況,以及提交的應(yīng)用信息,RAS 被Master 中的作業(yè)資源申請功能所替代。(3)RMAPS 進(jìn)程映射規(guī)則由Blaze 任務(wù)調(diào)度器根據(jù)2.1.2小節(jié)的DAG 計(jì)算模型產(chǎn)生,目前使用數(shù)據(jù)本地性算法進(jìn)行調(diào)度。(4)ODLS 在OpenMPI 中負(fù)責(zé)本地進(jìn)程的啟動,但在Blaze 中將此過程改為Worker根據(jù)調(diào)度信息產(chǎn)生Executor。

        至此,相對于傳統(tǒng)MPI 進(jìn)程啟動的關(guān)鍵組件已在Blaze 中實(shí)現(xiàn)。MPI 進(jìn)程與Spark Executor 的概念建立起來了連接,MPI_Init 可直接利用Executor 中包含的Namespace 及Rank 信息完成進(jìn)程間的感知,進(jìn)而實(shí)現(xiàn)Spark任務(wù)間的消息通信。

        2.2.2 Blaze處理流程

        Blaze處理作業(yè)的總體過程如圖5所示,架構(gòu)設(shè)計(jì)和執(zhí)行邏輯在主體上與Spark保持一致。不同之處主要在于,本文2.1.2 小節(jié)設(shè)計(jì)的DAG 計(jì)算模型通過DAG Scheduler 進(jìn)行了實(shí)現(xiàn),并由Task Scheduler 根據(jù)數(shù)據(jù)本地性原則完成MPI 任務(wù)的分配;同時在Master 中添加了2.2.1 小節(jié)介紹的MPI RTE,通過與Spark 提供的信息進(jìn)行交互以完成任務(wù)調(diào)度;最終的計(jì)算任務(wù)在MPI Executor中執(zhí)行。

        Blaze 完整的作業(yè)處理流程為,Master 接受客戶端提交的作業(yè),并將其調(diào)度到Worker 節(jié)點(diǎn)中啟動App Driver,此處保存著應(yīng)用程序運(yùn)行時的上下文BlazeContext,代表與Spark 和MPI 之間的連接。用戶提交的應(yīng)用程序代碼在Driver 中執(zhí)行,調(diào)度器根據(jù)編程模型的規(guī)則生成DAG 計(jì)算圖,并按照依賴關(guān)系將Stage 轉(zhuǎn)換成TaskSet 按順序發(fā)送到Executor 執(zhí)行。對于一個MPIStage,在任務(wù)調(diào)度的過程中,需要根據(jù)RDD Partition的存儲位置等信息做出決策,與Master中的MPI RTE通信,生成MPI Namespace 和Rank。每一個MPIStage 擁有一個Namespace,其生命周期由TasksetManager管理,使得被傳送到Executor 的Task 閉包能夠完成MPI 的初始化。因此由Worker產(chǎn)生的MPI Executor為單線程模式,即每個Executor分配一個CPU core。同時為了對科學(xué)計(jì)算中的C/C++ MPI程序進(jìn)行支持,可將Spark執(zhí)行中傳送的Closure替換為待執(zhí)行的二進(jìn)制文件或動態(tài)鏈接庫在MPI Executor中執(zhí)行。

        3 實(shí)驗(yàn)測試及結(jié)果分析

        在科學(xué)計(jì)算及機(jī)器學(xué)習(xí)領(lǐng)域,分布式稠密矩陣(dense matrix)和稀疏矩陣(sparse matrix)的運(yùn)算,以及迭代算法是各種計(jì)算作業(yè)的基礎(chǔ)。例如在高能物理領(lǐng)域,分波分析(partial wave analysis,PWA)需要在高統(tǒng)計(jì)量樣本數(shù)據(jù)上進(jìn)行數(shù)值擬合的計(jì)算,其核心過程為使用最大似然法估計(jì)待定參數(shù),需要反復(fù)迭代以求得最優(yōu)參數(shù)[16];格點(diǎn)QCD(lattice quantum chromodynamics,LQCD)從第一性原理出發(fā)將連續(xù)的量子場離散化為時間-空間格點(diǎn),計(jì)算熱點(diǎn)為大規(guī)模線性方程組的求解[17]。因此,本文對實(shí)際物理分析中的計(jì)算特征進(jìn)行提取,選取分布式稠密矩陣乘法,線性方程組求解作為性能測試的用例,并與Spark的原生實(shí)現(xiàn)進(jìn)行對比。

        本文的實(shí)驗(yàn)環(huán)境是一個8 節(jié)點(diǎn)組成的集群,包括1個BlazeMaster和8個Worker節(jié)點(diǎn)。各個節(jié)點(diǎn)的軟硬件情況如表2所示。

        表2 測試集群軟硬件環(huán)境Table 2 Software and hardware environment of cluster

        3.1 矩陣乘法測試

        對于稠密矩陣的乘法測試,本文選取BLAS的第三層次矩陣-矩陣乘(matrix-matrix multiplication)進(jìn)行實(shí)驗(yàn)。Spark在其機(jī)器學(xué)習(xí)庫Spark MLlib中基于MapReduce實(shí)現(xiàn)了分布式分塊矩陣(BlockMatrix)的乘法運(yùn)算,而本文則利用Blaze 中添加的消息傳遞機(jī)制,基于MPI和Spark 的RDD-based BlockMatrix 實(shí) 現(xiàn) 了Cannon 算法,與Spark MLlib中默認(rèn)的稠密矩陣乘法進(jìn)行對比。

        Cannon 算法將待乘矩陣A和B按處理器數(shù)量p分成p×p的矩陣Aij和Bij,(0<i,j <p-1),分布在虛擬的方陣格點(diǎn)之上。在完成每輪結(jié)果Cij的計(jì)算后,對A矩陣沿著i方向循環(huán)左移,B矩陣沿著j循環(huán)上移,共執(zhí)行p次循環(huán)移位后輸出結(jié)果C。常規(guī)的n×n矩陣乘法的復(fù)雜度為O(n3),Cannon算法將計(jì)算復(fù)雜度降低到了O(n3/p)。但由于其在計(jì)算的過程中存在大量的分塊矩陣數(shù)據(jù)傳遞過程,無法在Spark 中高效實(shí)現(xiàn),而在本文設(shè)計(jì)的Blaze中則可以借助MPI對Cannon算法進(jìn)行實(shí)現(xiàn),其中本地矩陣向量運(yùn)算使用OpenBLAS處理。

        實(shí)驗(yàn)結(jié)果對比的基準(zhǔn)為矩陣計(jì)算部分的性能,即矩陣已被加載到Spark 中的DenseMatrix 分布式內(nèi)存數(shù)據(jù)后的計(jì)算過程。 圖6(a)是一組在20 160×20 160 大小上的矩陣乘法測試,并根據(jù)Cannon 算法要求選擇了n2數(shù)量的CPU 核心。結(jié)果顯示,基于消息傳遞方法的矩陣乘法相比于Spark MLlib 中的實(shí)現(xiàn)有57%~68%的性能提升,與C++實(shí)現(xiàn)的矩陣乘法性能十分接近,額外的開銷在于序列化反序列化過程及編程語言層面。同時隨著CPU 核心數(shù)目的增長,消息傳遞方法的加速曲線逐漸變緩,是由于進(jìn)程數(shù)目增加導(dǎo)致的通信開銷不斷增長。圖6(b)則展示了隨著矩陣維數(shù)的增長在相同CPU核心數(shù)目(64 核)情況下的性能對比,消息傳遞相比于Spark MLlib 有50%~69%的提升。并且當(dāng)矩陣維度增大到64 000時,Spark會由于OOM無法得到計(jì)算結(jié)果。

        由上述測試可以看出,矩陣維度及CPU 核心數(shù)目等參數(shù)的變化都會對程序的執(zhí)行效率造成影響,但在相同的軟硬件配置下,同樣基于BlockMatrix 和MLlib BLAS 實(shí)現(xiàn)的消息傳遞方式的矩陣乘法在性能測試中的表現(xiàn)均優(yōu)于Spark MLlib。同時本文提出的Spark 消息傳遞方法,相比于文獻(xiàn)[5]中提出的Alchemist有10%~27%的性能提升,原因在于Alchemist 并未將MPI 融合在Spark中,而是充當(dāng)MPI與Spark交互的連接器,因此存在額外的數(shù)據(jù)傳遞及格式轉(zhuǎn)換開銷。并且用戶需使用Alchemist 提供的API 編寫程序,在程序運(yùn)行時調(diào)用MPI 實(shí)現(xiàn)的編程庫進(jìn)行計(jì)算,相比于在Spark 中直接實(shí)現(xiàn)的任務(wù)間消息傳遞,在易用性、靈活性及性能表現(xiàn)上均存在不足。

        3.2 迭代計(jì)算測試

        共軛梯度下降法(conjugate gradient method)是一種在Krylov子空間中迭代的算法,被廣泛應(yīng)用在線性系統(tǒng)的求解中,如Ax=b。其實(shí)現(xiàn)如算法1 所示,計(jì)算的熱點(diǎn)為矩陣-向量乘和向量-向量乘,當(dāng)結(jié)果小于規(guī)定的殘差值或迭代到達(dá)最大輪次后輸出結(jié)果。

        由于Spark Mllib 中未包含CG 算法,本文對Spark-CG進(jìn)行了實(shí)現(xiàn),其核心思想為,對迭代中需復(fù)用的分布式矩陣A進(jìn)行Cache,而每輪迭代更新的參數(shù)通過Broadcast 發(fā)送到Worker 中使用,在每個任務(wù)內(nèi)完成分布式矩陣向量乘法及向量內(nèi)積運(yùn)算?;谙鬟f方式的實(shí)現(xiàn)則通過MPI_Allreduce 完成參數(shù)更新,并令所有的計(jì)算過程在同一個Stage中完成。

        圖7 分別展示了在不同CPU 核心數(shù)量(32 000×32 000 矩陣)及不同維度矩陣(32 核)下通過CG 求解線性方程組的平均迭代耗時,結(jié)果顯示消息傳遞方式的實(shí)現(xiàn)相比于Spark 分別有51%~94%和58%~92%的性能提升,相比于Alchemist 有7%~17%的性能提升。在對Spark的測試過程中,隨著CPU核心數(shù)目的提升,并沒有帶來執(zhí)行時間上的顯著縮短,原因在于隨著節(jié)點(diǎn)增加帶來的調(diào)度和網(wǎng)絡(luò)開銷以及不同任務(wù)的執(zhí)行時間差異影響了整體性能,實(shí)際上每一個任務(wù)內(nèi)的計(jì)算時間仍然在下降,與此同時,消息傳遞方式則表現(xiàn)出了與預(yù)期相符的性能表現(xiàn)。故采用消息傳遞的方式可以大量減少冗余任務(wù)的分發(fā)調(diào)度過程,同時能夠更高效地實(shí)現(xiàn)分布式環(huán)境下的矩陣向量運(yùn)算。因此對于迭代密集型應(yīng)用,均可選擇使用消息傳遞的方式替換Spark 中的Broadcast及Reduce等操作,從而獲得極大的性能提升。

        4 結(jié)束語

        本文針對Spark不能滿足科學(xué)計(jì)算場景下對編程模型靈活性的需求,以及計(jì)算性能低下的問題,深入研究了Spark 及OpenMPI 框架的設(shè)計(jì)思想,提出了一種在Spark 任務(wù)間進(jìn)行消息傳遞的方法。通過修改Spark 與OpenMPI的資源管理和任務(wù)調(diào)度的運(yùn)行邏輯,實(shí)現(xiàn)了大數(shù)據(jù)和高性能計(jì)算框架的融合,有效提升了Spark 執(zhí)行數(shù)值計(jì)算任務(wù)的性能。因此,本文提出的Spark 任務(wù)間消息傳遞方法,可適用于大規(guī)模數(shù)據(jù)量環(huán)境下的科學(xué)計(jì)算與數(shù)據(jù)處理任務(wù),如迭代密集型計(jì)算或具有任務(wù)通信特征的算法與應(yīng)用等。

        為了便于移植歷史程序,或出于性能考慮需使用基于C/C++編寫的MPI程序時,會存在大量的數(shù)據(jù)轉(zhuǎn)換和序列化/反序列化開銷。在Executor 內(nèi)執(zhí)行的MPI 計(jì)算任務(wù)也存在類似的問題,同時會面臨JVM 的內(nèi)存壓力。因此,除計(jì)算引擎和調(diào)度系統(tǒng)之外,高效統(tǒng)一的數(shù)據(jù)格式以及內(nèi)存管理系統(tǒng)也是需要關(guān)注的重點(diǎn)。RDD雖然提供了良好的分布式內(nèi)存計(jì)算抽象,但無法滿足復(fù)雜數(shù)據(jù)類型的高效使用與管理。本文下一步的工作會專注于分布式內(nèi)存管理相關(guān)技術(shù)的優(yōu)化研究,以最終實(shí)現(xiàn)可以高效率執(zhí)行多種作業(yè)任務(wù)的統(tǒng)一計(jì)算系統(tǒng)。

        猜你喜歡
        內(nèi)存編程分布式
        我家有只編程貓
        我家有只編程貓
        我家有只編程貓
        我家有只編程貓
        “春夏秋冬”的內(nèi)存
        分布式光伏熱錢洶涌
        能源(2017年10期)2017-12-20 05:54:07
        分布式光伏:爆發(fā)還是徘徊
        能源(2017年5期)2017-07-06 09:25:54
        基于DDS的分布式三維協(xié)同仿真研究
        西門子 分布式I/O Simatic ET 200AL
        基于內(nèi)存的地理信息訪問技術(shù)
        欧美疯狂做受xxxxx高潮| 极品人妻少妇av免费久久| 色欲网天天无码av| 亚洲av无码乱观看明星换脸va| 亚洲国产日韩在线人成蜜芽| 中文字幕亚洲精品高清| 国产偷国产偷亚洲高清视频| 亚洲七久久之综合七久久| 欧美自拍丝袜亚洲| 久久久亚洲精品蜜臀av| 国产乱理伦在线观看美腿丝袜| 无人视频在线观看免费播放影院| 日本免费人成视频播放| 日本看片一区二区三区| 国产亚洲精品97在线视频一| 激情航班h版在线观看| 伊人网在线视频观看| 日韩人妻有码中文字幕| 精品国产亚洲级一区二区| 免费人妻无码不卡中文字幕18禁 | 午夜国产精品视频在线观看| 日韩精品一区二区三区中文| 欧美日本亚洲国产一区二区| 国产精东一区二区三区| 亚洲中文字幕精品乱码2021| 国产精品综合一区二区三区| 国产乱色国产精品免费视频| 白白色发布视频在线播放| 亚洲国产精品成人久久| 色猫咪免费人成网站在线观看| 国产AV无码专区亚洲AWWW| 国产激情视频在线观看大全| 欧美俄罗斯40老熟妇| 人伦片无码中文字幕| 99亚洲女人私处高清视频| 亚洲成av人在线观看网址| 亚洲暴爽av天天爽日日碰| 抖射在线免费观看视频网站| 青青草成人在线免费视频| 99久久人人爽亚洲精品美女| 久久99亚洲综合精品首页|