李 超,董新華,陳建峽
湖北工業(yè)大學(xué) 計(jì)算機(jī)學(xué)院,武漢430068
隨著信息時(shí)代的快速發(fā)展,基于圖的迭代算法有著廣泛應(yīng)用[1]。例如,PageRank算法可以對(duì)網(wǎng)頁(yè)的重要性進(jìn)行排序,SimRank算法可以對(duì)社交網(wǎng)絡(luò)中用戶之間的相似度進(jìn)行分析。由于真實(shí)網(wǎng)絡(luò)的數(shù)據(jù)規(guī)模較大,通常采取分布式框架對(duì)圖數(shù)據(jù)進(jìn)行迭代處理,基于全局同步更新機(jī)制[2](BSP)的圖處理系統(tǒng)由于實(shí)現(xiàn)簡(jiǎn)單且易于擴(kuò)展,為大規(guī)模圖數(shù)據(jù)的分析提供了便利。
受全局同步更新機(jī)制的啟發(fā)[3],Spark 環(huán)境下的Graphx[4]圖處理系統(tǒng)基于子圖對(duì)數(shù)據(jù)進(jìn)行全局同步迭代更新。通過(guò)彈性分布式數(shù)據(jù)集以分區(qū)方式存放頂點(diǎn)和連接邊屬性,Graphx可以利用頂點(diǎn)的局部狀態(tài)更新頂點(diǎn)的全局狀態(tài),并且在迭代過(guò)程中將中間計(jì)算結(jié)果放入內(nèi)存,以此避免頻繁地I/O 訪問(wèn)。但是,由于Graphx 要求子圖之間的計(jì)算任務(wù)保持全局同步,因此降低了圖的收斂速度。
針對(duì)全局同步機(jī)制收斂速度較慢的問(wèn)題,研究人員提出了異步迭代更新方法[5],Zhang[6]闡述了異步迭代收斂需要滿足的條件。當(dāng)?shù)惴M足該條件時(shí),頂點(diǎn)的狀態(tài)更新能夠繞開(kāi)同步路障。雖然異步迭代提高了算法的收斂速度,但是以頂點(diǎn)為中心的異步迭代需要鄰居節(jié)點(diǎn)頻繁跨連接邊發(fā)送消息[7-8],當(dāng)鄰居節(jié)點(diǎn)狀態(tài)的變化對(duì)頂點(diǎn)狀態(tài)更新的作用不大時(shí),會(huì)降低網(wǎng)絡(luò)的通信效率[9]。
為了解決圖迭代收斂速度較慢以及通信效率較低的問(wèn)題,本文在Spark 環(huán)境下提出一種基于子圖的異步迭代更新方法,總體研究思路如下:首先,對(duì)圖的切分、全局同步和異步迭代更新等概念進(jìn)行簡(jiǎn)要介紹;其次,結(jié)合Spark 環(huán)境下圖數(shù)據(jù)存儲(chǔ)和更新的特點(diǎn),推導(dǎo)出基于子圖的異步迭代更新條件,分別從異步消息通信和迭代更新機(jī)制等方面給出具體的研究方案,在此基礎(chǔ)上給出研究方案在分布式環(huán)境下的具體實(shí)現(xiàn);最后,通過(guò)PageRank 算法分別從圖的收斂結(jié)果、收斂速度和通信代價(jià)等方面驗(yàn)證了方法有效性,并對(duì)實(shí)驗(yàn)結(jié)果進(jìn)行分析。實(shí)驗(yàn)結(jié)果表明,本文方法不僅能夠提高算法收斂速度,同時(shí)還能降低通信開(kāi)銷(xiāo)。
圖數(shù)據(jù)有較強(qiáng)的關(guān)聯(lián)性,因此圖的切分對(duì)數(shù)據(jù)的迭代更新有較大影響[10-11]。將圖1中頂點(diǎn)E 切分成4部分后,邊分區(qū)可以存放連接邊的完整屬性和頂點(diǎn)的局部狀態(tài),點(diǎn)分區(qū)可以存放頂點(diǎn)的全局狀態(tài),計(jì)算節(jié)點(diǎn)分別對(duì)邊分區(qū)中頂點(diǎn)的局部狀態(tài)和點(diǎn)分區(qū)中頂點(diǎn)的全局狀態(tài)進(jìn)行迭代處理。
圖1 有向圖的點(diǎn)切分方法
在圖迭代過(guò)程中,連接邊上的頂點(diǎn)需要頻繁交互信息。由于圖1中邊分區(qū)存放了連接邊兩端的頂點(diǎn),因此連接邊上的頂點(diǎn)可以直接在本地交互信息。對(duì)于連接邊數(shù)量多于頂點(diǎn)數(shù)量的真實(shí)網(wǎng)絡(luò)來(lái)說(shuō),這種切分方式可以顯著減少頂點(diǎn)通過(guò)連接邊跨分區(qū)發(fā)送消息的次數(shù)[11]。
圖的狀態(tài)迭代更新可以用公式(1)描述:
公式(1)表明目的節(jié)點(diǎn)j 在第k 輪的狀態(tài)值根據(jù)連接邊上的源節(jié)點(diǎn)在第k-1 輪的狀態(tài)值計(jì)算得到。
在全局同步機(jī)制下,頂點(diǎn)的全局狀態(tài)依賴于所有邊分區(qū)內(nèi)頂點(diǎn)的局部狀態(tài)。當(dāng)所有邊分區(qū)頂點(diǎn)的局部狀態(tài)全部計(jì)算完畢,頂點(diǎn)的全局狀態(tài)更新才能開(kāi)始。異構(gòu)環(huán)境下如果連接邊分布在多個(gè)不同的邊分區(qū),那么本地計(jì)算耗時(shí)最長(zhǎng)的邊分區(qū)將直接影響下一輪迭代開(kāi)始的時(shí)間。
為提高算法收斂速度,Zhang[6]認(rèn)為對(duì)公式(1)作適當(dāng)變形后,可以得到公式(2):
公式(2)表明目的節(jié)點(diǎn)j 在第k 輪的狀態(tài)可以通過(guò)nj個(gè)源節(jié)點(diǎn)在第k-1 輪的狀態(tài)計(jì)算得到。如果只考慮源節(jié)點(diǎn)在第k 輪與第k-1 輪的狀態(tài)變化值,令,那么公式(2)可變?yōu)楣剑?):
公式(2)和公式(3)表明異步更新能夠減少圖的收斂時(shí)間,但是分布式環(huán)境下圖的迭代對(duì)異步更新條件、消息通信和更新機(jī)制有著不同的要求,需要根據(jù)實(shí)際情況加以具體分析。本文首先給出Spark環(huán)境下基于子圖的異步迭代形式,在此基礎(chǔ)上給出消息通信模型和異步處理機(jī)制。
式中的⊕算子定義為局部算子,將公式(5)帶入公式(4),可以得到公式(6):
式中的⊕算子定義為全局算子,公式(6)表明頂點(diǎn)的全局狀態(tài)改變值可以根據(jù)不同邊分區(qū)的局部改變值進(jìn)行更新。由于不同分區(qū)的本地計(jì)算任務(wù)互不影響,因此全局算子滿足交換律。在函數(shù)fm作用下,如果局部算子也滿足交換律和分配律,那么頂點(diǎn)收到任一局部狀態(tài)變化值,就能立即更新頂點(diǎn)的全局狀態(tài)。
分布式圖處理系統(tǒng)主要采取消息傳遞[12]和共享內(nèi)存的通信方式?;诠蚕韮?nèi)存通信的分布式圖處理系統(tǒng)一般采取分布式鎖保證數(shù)據(jù)的一致性。由于分布式環(huán)境下的計(jì)算節(jié)點(diǎn)有獨(dú)立的內(nèi)存地址,共享內(nèi)存的通信方式實(shí)現(xiàn)起來(lái)較為困難?;谙鬟f的通信方式在計(jì)算節(jié)點(diǎn)之間發(fā)送消息,目前主要有基于Netty 的通信協(xié)議[13]、消息傳遞接口協(xié)議[14]和遠(yuǎn)程過(guò)程調(diào)用協(xié)議。
根據(jù)Spark 環(huán)境下頂點(diǎn)狀態(tài)更新的特點(diǎn),本文采取遠(yuǎn)程過(guò)程調(diào)用協(xié)議實(shí)現(xiàn)分區(qū)數(shù)據(jù)集之間的異步消息通信,將每個(gè)分區(qū)數(shù)據(jù)集看作基于Actor 模型[15]的通信實(shí)體。另外,遠(yuǎn)程過(guò)程調(diào)用不僅能夠?qū)崿F(xiàn)異步消息通信,底層通信協(xié)議還支持?jǐn)?shù)據(jù)塊傳輸。由于頂點(diǎn)的局部狀態(tài)值分布在不同的邊分區(qū),而頂點(diǎn)唯一的全局狀態(tài)值存放在點(diǎn)分區(qū),邊分區(qū)m 與點(diǎn)分區(qū)n 之間存在交集。當(dāng)邊分區(qū)在本地獲取本地目的節(jié)點(diǎn)的局部匯聚值之后,可以將本地局部匯聚的結(jié)果以數(shù)據(jù)塊Blockm->n(local(ΔSID))的形式發(fā)送給頂點(diǎn)所在的點(diǎn)分區(qū),數(shù)據(jù)塊中頂點(diǎn)ID 滿足式(7):
與頂點(diǎn)為中心的消息發(fā)送方式相比,以數(shù)據(jù)塊為單位將計(jì)算結(jié)果集中發(fā)給點(diǎn)分區(qū),不僅可以批量處理數(shù)據(jù)塊,同時(shí)還能提高通信效率。
當(dāng)點(diǎn)分區(qū)收到數(shù)據(jù)塊之后,為避免頂點(diǎn)狀態(tài)在異步更新過(guò)程中產(chǎn)生數(shù)據(jù)讀寫(xiě)沖突,本文創(chuàng)建數(shù)據(jù)塊緩存隊(duì)列以及線程池,在頂點(diǎn)狀態(tài)表上設(shè)置讀寫(xiě)鎖,保證頂點(diǎn)狀態(tài)在異步更新過(guò)程中的一致性。
圖2中,線程池分別執(zhí)行數(shù)據(jù)塊的接收(receive)、更新(update)和發(fā)送(send)任務(wù)。receive線程將收到的數(shù)據(jù)塊放入緩存隊(duì)列,update線程從緩存隊(duì)列中取出數(shù)據(jù)塊進(jìn)行處理,將數(shù)據(jù)塊中頂點(diǎn)的局部狀態(tài)變化delta 寫(xiě)進(jìn)狀態(tài)表(state table),同時(shí)更新頂點(diǎn)的全局狀態(tài)。如果更新后的頂點(diǎn)作為源節(jié)點(diǎn)指向其他目的節(jié)點(diǎn),則將最新的狀態(tài)變化值發(fā)送給該點(diǎn)作為源節(jié)點(diǎn)的邊分區(qū),以更新其指向的目的節(jié)點(diǎn)。另外,update線程通過(guò)設(shè)定發(fā)送閾值,將滿足條件的狀態(tài)變化值放入待發(fā)送數(shù)據(jù)塊,當(dāng)數(shù)據(jù)塊達(dá)到一定規(guī)模后喚醒send 線程,由send 線程將數(shù)據(jù)塊發(fā)給邊分區(qū),以更新邊分區(qū)中源節(jié)點(diǎn)的狀態(tài)。當(dāng)點(diǎn)分區(qū)內(nèi)頂點(diǎn)全局狀態(tài)變化值小于閾值時(shí),終止迭代過(guò)程。
圖2 點(diǎn)分區(qū)和邊分區(qū)之間異步迭代更新機(jī)制
本文以PageRank[16]算法驗(yàn)證基于子圖的異步迭代更新方法,PageRank的迭代計(jì)算公式為:
Spark 環(huán)境下邊分區(qū)以三元組((srcId,srcAttr),(dstId,dstAttr),attr)的格式存儲(chǔ)本地連接邊的狀態(tài)值,其中srcId、dstId 是連接邊上源節(jié)點(diǎn)的ID 和目的節(jié)點(diǎn)的ID,srcAttr 是源節(jié)點(diǎn)在分區(qū)內(nèi)當(dāng)前的狀態(tài)值,dstAttr 是目的節(jié)點(diǎn)的狀態(tài)值,attr是連接邊完整的屬性值,取決于源節(jié)點(diǎn)的出度數(shù)。
根據(jù)圖數(shù)據(jù)的特點(diǎn),在連接邊上定義map 函數(shù)和reduce函數(shù),頂點(diǎn)的局部狀態(tài)可以直接利用數(shù)據(jù)的本地性計(jì)算獲取。與傳統(tǒng)的MapReduce 分布式計(jì)算模型[12]不同,由于采用彈性分布式數(shù)據(jù)集存放圖數(shù)據(jù),Spark環(huán)境下通過(guò)map 函數(shù)和reduce 函數(shù)實(shí)現(xiàn)的消息映射和匯聚結(jié)果不需要頻繁寫(xiě)入外存,每條連接邊上的Map函數(shù)定義如下:
在邊分區(qū)內(nèi)執(zhí)行map 函數(shù)時(shí),源節(jié)點(diǎn)src 以并行方式向本地目的節(jié)點(diǎn)集合dstIds發(fā)送消息,消息度量值是源節(jié)點(diǎn)當(dāng)前的狀態(tài)值與該條連接邊屬性的乘積。當(dāng)目的節(jié)點(diǎn)存在多條入度連接邊,目的節(jié)點(diǎn)dst 在本地局部聚合的結(jié)果是該點(diǎn)所有入度連接邊上消息聚合的總和,通過(guò)reduce函數(shù)得到:
Spark 環(huán)境下的局部計(jì)算可以通過(guò)公式(9)和公式(10)實(shí)現(xiàn),⊕算子是乘積算子和加法算子,滿足交換律、結(jié)合律。根據(jù)公式(6),目的節(jié)點(diǎn)dst 的全局狀態(tài)值是上一次更新后的全局狀態(tài)值與最新的狀態(tài)變化值直接求和:
式中,⊕是加法算子,滿足交換律和結(jié)合律,因此頂點(diǎn)在第k 輪的全局狀態(tài)與不同邊分區(qū)中頂點(diǎn)的局部狀態(tài)值到達(dá)的次序無(wú)關(guān),而通過(guò)式(9)、(10)計(jì)算得到,因此全局狀態(tài)值的更新滿足異步迭代條件。本文接下來(lái)結(jié)合公式(9)、(10)和公式(11)給出PageRank算法異步更新的具體實(shí)現(xiàn)。
根據(jù)公式(9)、(10),Spark 環(huán)境下邊分區(qū)內(nèi)部的活躍節(jié)點(diǎn)沿其出度連接邊給目的節(jié)點(diǎn)發(fā)消息,而目的節(jié)點(diǎn)對(duì)其每條入度連接邊上對(duì)收到的消息進(jìn)行局部聚合,如算法1。
算法1在邊分區(qū)內(nèi)對(duì)目的節(jié)點(diǎn)的局部狀態(tài)聚合
輸入:點(diǎn)分區(qū)的活躍源節(jié)點(diǎn)集合activeSet,源節(jié)點(diǎn)狀態(tài)值變化的數(shù)據(jù)塊newSrcAttrblock
輸出:邊分區(qū)內(nèi)部目的節(jié)點(diǎn)的局部狀態(tài)信息local_aggregates,并將localBlockForVertexPartition發(fā)給點(diǎn)分區(qū)vertexPartition
/*遍歷本地的源節(jié)點(diǎn)集合*/
1.for each srcId in localSrcIds
/*如果活躍節(jié)點(diǎn)集合包含源節(jié)點(diǎn)sccId*/
2.if activeSet.contains(srcId)
/*更新邊分區(qū)三元組上活躍源節(jié)點(diǎn)srcId的狀態(tài)值*/
3. newSrcAttr=update(srcId,newSrcAttrblock)
4. newedgeTriplet=updateEdgeTriple(tsrcId,news rcAttr)
/*在源節(jié)點(diǎn)srcId的出度連接邊上給目的節(jié)點(diǎn)發(fā)送消息*/
5. mapFunc(newedgeTriplet=>dstMsg([dstId,msg)])
/*在目的節(jié)點(diǎn)dstId入度連接邊上作本地局部聚合*/
6. for each dstId,msg in dstMsg([dstId,msg)]
7. reduceFunc(local_aggregates(dstId),msg)
/*根據(jù)目的節(jié)點(diǎn)所在的點(diǎn)分區(qū)對(duì)局部聚合結(jié)果進(jìn)行切分*/
8.dstBlockToVertexPartition=split(dstVidsInVertexPartition,local_aggregates)
/*將分區(qū)后的數(shù)據(jù)塊依次發(fā)送至所在的點(diǎn)分區(qū)*/
9.for each vertexRef in vertexPartitionRefs
10.vertexPartitionRef.send(dstBlockToVertexPartition)
算法1中,邊分區(qū)中源節(jié)點(diǎn)是否對(duì)其出度連接邊上的目的節(jié)點(diǎn)發(fā)送消息,取決于源節(jié)點(diǎn)是否處于活躍狀態(tài)(active)。因此,邊分區(qū)首先接收來(lái)自點(diǎn)分區(qū)的活躍節(jié)點(diǎn)集(activeSet)以及包含源節(jié)點(diǎn)變化值的數(shù)據(jù)塊newSrcAttrblock,并且檢查本地源節(jié)點(diǎn)是否在活躍節(jié)點(diǎn)集中。對(duì)于活躍的源節(jié)點(diǎn),更新其狀態(tài)變化值得到最新的以該點(diǎn)為中心的三元組集合newedgeTriplet。此后,mapFunc 并行作用于更新后的以srcId 為源節(jié)點(diǎn)的三元組上,同時(shí)reduceFunc 對(duì)目的節(jié)點(diǎn)dstId 入度連接邊上的消息進(jìn)行聚合,得到目的節(jié)點(diǎn)在本地局部聚合的結(jié)果local_aggregates。由于邊分區(qū)中的目的節(jié)點(diǎn)分布在不同的點(diǎn)分區(qū)中,按照式(7)對(duì)本地聚合結(jié)果進(jìn)行切分,將切分后的結(jié)果dstBlockToVertexPartition 通過(guò)每個(gè)點(diǎn)分區(qū)地址引用vertexPartitionRef發(fā)送給對(duì)應(yīng)的點(diǎn)分區(qū)。
另一方面,點(diǎn)分區(qū)收到不同邊分區(qū)的數(shù)據(jù)塊,按照?qǐng)D2更新節(jié)點(diǎn)的狀態(tài)信息,如算法2。
算法2點(diǎn)分區(qū)對(duì)收到的數(shù)據(jù)塊后進(jìn)行處理,將更新后的源節(jié)點(diǎn)狀態(tài)變化值以數(shù)據(jù)塊的形式發(fā)送到所在的邊分區(qū)
輸入:點(diǎn)分區(qū)從邊分區(qū)收到的局部匯聚結(jié)果dstBlockTo-VertexPartition
輸出:更新后的源節(jié)點(diǎn)狀態(tài)變化值數(shù)據(jù)塊newSrcAttrblock
/*receive線程將邊分區(qū)發(fā)送的數(shù)據(jù)塊dstBlockToVertex-Partition放入阻塞隊(duì)列blockingQueue*/
1.blockingQueue.pu(tdstBlockToVertexPartition)
/*update線程池從阻塞隊(duì)列取出數(shù)據(jù)塊curBlock*/
2.curBlock=blockingQueue.take()
/*update 線程池遍歷數(shù)據(jù)塊中的節(jié)點(diǎn)vid 及其變化值delta*/
3.for each vid,delta in curBlock
/*將節(jié)點(diǎn)和狀態(tài)變化值寫(xiě)入狀態(tài)信息表stateTable*/
4.stateTable.write(vid,delta)
/*將滿足條件的源節(jié)點(diǎn)放入狀態(tài)更新數(shù)據(jù)集srcDelta-ToEdgePartition,活躍節(jié)點(diǎn)集activeSet中*/
5.if(delta>DELTA_THRESHHOLD)
6.srcDeltaToEdgePartition.append(vid,delta)
7.activeSet.add(vid)
/*根據(jù)源節(jié)點(diǎn)所在的邊分區(qū)srcVidToEdgePartition 對(duì)狀態(tài)更新數(shù)據(jù)集srcDeltaToEdgePartition進(jìn)行切分*/
8.newSrcAttrblock=split(srcVidToEdgePartition,srcDelta-ToEdgePartition)
/*send線程將切分后的數(shù)據(jù)塊newSrcAttrblock依次發(fā)送至所在的邊分區(qū)*/
9.for each edgePartitionRef in edgePartitionRefs
10.edgePartitionRef.send(newSrcAttrblock)
算法2中,receive線程首先將來(lái)自邊分區(qū)的數(shù)據(jù)塊放入緩存隊(duì)列,update線程從緩存隊(duì)列取出數(shù)據(jù)塊處理目的節(jié)點(diǎn)的變化值。為了避免對(duì)狀態(tài)表中同一頂點(diǎn)同時(shí)寫(xiě)入狀態(tài)變化值,update線程在寫(xiě)入數(shù)據(jù)之前需要首先獲取狀態(tài)表的寫(xiě)鎖,在寫(xiě)入數(shù)據(jù)之后檢查該頂點(diǎn)的狀態(tài)變化值是否超過(guò)頂點(diǎn)狀態(tài)變化的閾值,并將滿足條件的頂點(diǎn)放入待發(fā)送數(shù)據(jù)塊以及活躍頂點(diǎn)集activeSet中。另外,由于點(diǎn)分區(qū)中更新后的源節(jié)點(diǎn)分布在不同的邊分區(qū)中,需要對(duì)點(diǎn)分區(qū)內(nèi)更新后的結(jié)果進(jìn)行切分,并將切分后的結(jié)果newSrcAttrblock 通過(guò)每個(gè)邊分區(qū)地址引用edgePartitionRef 依次發(fā)送給對(duì)應(yīng)的邊分區(qū)。當(dāng)邊分區(qū)收到點(diǎn)分區(qū)的數(shù)據(jù)塊之后再次執(zhí)行算法1,并繼續(xù)執(zhí)行新一輪的局部聚合任務(wù),當(dāng)邊分區(qū)內(nèi)所有源節(jié)點(diǎn)為非活躍狀態(tài)時(shí),終止算法1和算法2。
本文選取真實(shí)網(wǎng)絡(luò)樣本數(shù)據(jù)集wiki-topcats[17],該數(shù)據(jù)集共包含1 791 489個(gè)頂點(diǎn),28 511 807條連接邊。為實(shí)現(xiàn)負(fù)載均衡,以哈希方式對(duì)圖數(shù)據(jù)作點(diǎn)切分,頂點(diǎn)和連接邊的狀態(tài)值分別存放在4 個(gè)點(diǎn)分區(qū)和4 個(gè)邊分區(qū)。通過(guò)兩組實(shí)驗(yàn)驗(yàn)證方法有效性:第一組實(shí)驗(yàn)統(tǒng)計(jì)PageRank 在全局同步和異步更新的收斂結(jié)果;第二組實(shí)驗(yàn)給出不同迭代方式下的收斂時(shí)間和通信開(kāi)銷(xiāo)。
在執(zhí)行迭代算法之前,首先對(duì)圖中頂點(diǎn)狀態(tài)進(jìn)行初始化。根據(jù)公式(11),在迭代過(guò)程中需要保證第k+1輪狀態(tài)值是第k 輪狀態(tài)值與下一輪狀態(tài)變化值求和的結(jié)果,因此設(shè)定點(diǎn)分區(qū)內(nèi)頂點(diǎn)的初始值為0,邊分區(qū)內(nèi)頂點(diǎn)初始值1-d,邊分區(qū)內(nèi)所有頂點(diǎn)的狀態(tài)為激活狀態(tài)。另外,PageRank 算法中頂點(diǎn)狀態(tài)值在迭代過(guò)程中呈單調(diào)增長(zhǎng)趨勢(shì),因此采用頂點(diǎn)全局狀態(tài)值的總和作為收斂程度的度量值,頂點(diǎn)的狀態(tài)初始值以及整個(gè)圖中活躍節(jié)點(diǎn)的個(gè)數(shù)將影響整個(gè)圖的最終收斂結(jié)果。當(dāng)d 值越小,頂點(diǎn)的初始狀態(tài)值越大,并且圖中活躍頂點(diǎn)個(gè)數(shù)越多時(shí),圖中頂點(diǎn)狀態(tài)值的收斂總和越大。根據(jù)PageRank 算法的迭代公式,通常情況下設(shè)定d 值為0.8,使得孤立頁(yè)面隨機(jī)跳轉(zhuǎn)到其他頁(yè)面的概率為0.2。在異步迭代方式下,不同點(diǎn)分區(qū)內(nèi)頂點(diǎn)的全局狀態(tài)相互獨(dú)立,只要點(diǎn)分區(qū)內(nèi)頂點(diǎn)全局狀態(tài)總和的增長(zhǎng)區(qū)間小于設(shè)定的閾值,即認(rèn)為該分區(qū)的頂點(diǎn)達(dá)到全局收斂。當(dāng)設(shè)定閾值越大,圖越容易達(dá)到收斂狀態(tài),當(dāng)所有分區(qū)的頂點(diǎn)全部收斂,結(jié)束整個(gè)迭代過(guò)程。
首先,按照全局同步方式對(duì)圖數(shù)據(jù)迭代。Spark 環(huán)境下的全局同步通過(guò)reduce 算子觸發(fā)邊分區(qū)內(nèi)部的局部消息聚合任務(wù),再將聚合后的結(jié)果發(fā)送給點(diǎn)分區(qū)作全局同步,圖中所有頂點(diǎn)的狀態(tài)總和與迭代次數(shù)之間的關(guān)系如圖3所示。
圖3 全局同步迭代下頂點(diǎn)狀態(tài)總和
在圖3中,頂點(diǎn)狀態(tài)值的總和隨著迭代次數(shù)增長(zhǎng)不斷增大。迭代前10 輪頂點(diǎn)狀態(tài)總和的增長(zhǎng)速度較快,隨后增長(zhǎng)速度減緩,迭代到22輪時(shí),頂點(diǎn)狀態(tài)值的總和接近收斂狀態(tài)。
其次,異步迭代不受全局同步的限制,因此異步更新沒(méi)有迭代次數(shù)的概念,不能通過(guò)迭代次數(shù)判斷圖的收斂狀態(tài)??紤]到頂點(diǎn)的全局狀態(tài)更新取決于邊分區(qū)數(shù)據(jù)塊到達(dá)的時(shí)間,并且不同分區(qū)內(nèi)頂點(diǎn)的全局狀態(tài)之間相互獨(dú)立,因此可以統(tǒng)計(jì)不同點(diǎn)分區(qū)內(nèi)頂點(diǎn)的全局狀態(tài)總和判斷圖數(shù)據(jù)的收斂狀態(tài)。圖4 給出不同點(diǎn)分區(qū)內(nèi)頂點(diǎn)狀態(tài)總和隨著數(shù)據(jù)塊處理的變化關(guān)系。
圖4 異步迭代下各個(gè)點(diǎn)分區(qū)的頂點(diǎn)狀態(tài)總和
在異步迭代初始階段,邊分區(qū)內(nèi)所有源節(jié)點(diǎn)的初始狀態(tài)都為激活狀態(tài),因此邊分區(qū)內(nèi)所有源節(jié)點(diǎn)都能向目的節(jié)點(diǎn)發(fā)送消息,使得初始階段各個(gè)點(diǎn)分區(qū)內(nèi)頂點(diǎn)的全局狀態(tài)總和增長(zhǎng)較快。另外,異步更新并不要求所有邊分區(qū)的數(shù)據(jù)塊同時(shí)到達(dá),只要點(diǎn)分區(qū)收到數(shù)據(jù)塊就能立即更新部分頂點(diǎn)的狀態(tài),因此各個(gè)點(diǎn)分區(qū)的頂點(diǎn)狀態(tài)總和在收斂過(guò)程中出現(xiàn)不同幅度的震蕩。當(dāng)4 個(gè)點(diǎn)分區(qū)處理完65~70個(gè)數(shù)據(jù)塊后,頂點(diǎn)狀態(tài)總和與全局同步迭代到22輪的狀態(tài)值接近,認(rèn)為異步迭代接近收斂。
為比較全局同步和異步迭代的收斂速度,以圖3和圖4 的收斂值統(tǒng)計(jì)全局同步和異步迭代的收斂時(shí)間。圖5給出全局同步下reduce和collect算子在每輪迭代過(guò)程中的平均運(yùn)行時(shí)間。
圖5 全局同步每輪迭代的運(yùn)行時(shí)間
從圖5 可以看到,全局同步迭代輪數(shù)較少,但是由于存在路障限制,全局同步迭代需要等到所有邊分區(qū)的結(jié)果全部到達(dá)才能對(duì)頂點(diǎn)的狀態(tài)進(jìn)行更新。每輪迭代過(guò)程中reduce 算子平均運(yùn)行時(shí)間為12~18 s,collect算子平均運(yùn)行時(shí)間在8~10 s。在已知圖數(shù)據(jù)收斂狀態(tài)情況下,僅考慮每輪迭代過(guò)程中reduce 算子運(yùn)行的時(shí)間,并且只在最后收斂階段使用collect算子將頂點(diǎn)狀態(tài)匯總至driver 計(jì)算收斂結(jié)果,全局同步的平均收斂時(shí)間為335.7 s。
由于初始階段點(diǎn)分區(qū)需要等待邊分區(qū)局部聚合的結(jié)果,導(dǎo)致點(diǎn)分區(qū)發(fā)送數(shù)據(jù)塊的時(shí)間變長(zhǎng),因此圖6 中前后兩個(gè)階段消耗的時(shí)間比其他階段長(zhǎng)。在對(duì)圖數(shù)據(jù)處理多次后,4個(gè)點(diǎn)分區(qū)的平均收斂時(shí)間分別為112.4 s、114.5 s、117.2 s、119.7 s,相同數(shù)據(jù)集下以頂點(diǎn)為中心異步更新的收斂時(shí)間為102.7 s。
圖6 給出4 個(gè)點(diǎn)分區(qū)處理數(shù)據(jù)塊的時(shí)間,起始點(diǎn)設(shè)為點(diǎn)分區(qū)收到數(shù)據(jù)塊的時(shí)間,結(jié)束點(diǎn)為點(diǎn)分區(qū)將處理后的頂點(diǎn)狀態(tài)變化值以數(shù)據(jù)塊的形式發(fā)送給邊分區(qū)的時(shí)間,圖中每個(gè)點(diǎn)分區(qū)對(duì)數(shù)據(jù)塊的接收、更新到發(fā)送時(shí)間集中在0.5~2.5 s。
圖6 異步更新下點(diǎn)分區(qū)對(duì)數(shù)據(jù)塊的處理時(shí)間
全局同步的通信開(kāi)銷(xiāo)主要由每輪迭代過(guò)程中的reduce算子產(chǎn)生,當(dāng)邊分區(qū)在本地的局部聚合全部結(jié)束之后,需要對(duì)所有邊分區(qū)中具有相同索引頂點(diǎn)的局部狀態(tài)作全局聚合,并將結(jié)果發(fā)送給點(diǎn)分區(qū)。統(tǒng)計(jì)發(fā)現(xiàn)全局同步迭代方式下每輪迭代的通信開(kāi)銷(xiāo)在125~132 MB。對(duì)圖數(shù)據(jù)進(jìn)行多次全局同步迭代后,通信量均值為2 850 MB。
相比全局同步要求4 個(gè)邊分區(qū)將數(shù)據(jù)塊匯總后同時(shí)發(fā)送到點(diǎn)分區(qū),以子圖為中心的異步迭代不需要等待其他分區(qū)的局部聚合結(jié)果,能夠直接將邊分區(qū)聚合后的數(shù)據(jù)塊發(fā)送給點(diǎn)分區(qū)。分別對(duì)邊分區(qū)和點(diǎn)分區(qū)發(fā)送的消息量進(jìn)行統(tǒng)計(jì),結(jié)果表明邊分區(qū)給點(diǎn)分區(qū)發(fā)送的數(shù)據(jù)塊大小在3~4 MB,點(diǎn)分區(qū)給邊分區(qū)發(fā)送的數(shù)據(jù)塊大小在2~3 MB。當(dāng)各個(gè)點(diǎn)分區(qū)接近收斂狀態(tài),產(chǎn)生的網(wǎng)絡(luò)通信量共1 950 MB。基于頂點(diǎn)為中心的異步迭代通過(guò)頂點(diǎn)更新次數(shù)統(tǒng)計(jì)網(wǎng)絡(luò)通信開(kāi)銷(xiāo),統(tǒng)計(jì)發(fā)現(xiàn)以頂點(diǎn)為中心的異步迭代方式下每個(gè)頂點(diǎn)平均更新9 次達(dá)到收斂狀態(tài)。對(duì)圖數(shù)據(jù)進(jìn)行多次異步迭代后,網(wǎng)絡(luò)通信量均值為2 520 MB。
根據(jù)以上分析,圖7給出了不同迭代方式下的圖迭代的收斂時(shí)間和通信開(kāi)銷(xiāo)。從圖7可以看到,與全局同步迭代方式相比,以子圖為中心的異步迭代不僅能有效降低收斂速度同時(shí)能提高通信效率。與頂點(diǎn)為中心的異步迭代方式相比,基于子圖為中心的異步更新方式在收斂時(shí)間上雖略有增長(zhǎng),但是能夠顯著降低通信開(kāi)銷(xiāo)。
圖7 不同迭代方式下的收斂時(shí)間和通信開(kāi)銷(xiāo)
圖3 表明全局同步迭代輪數(shù)較少,但是由于存在路障限制,全局同步迭代需要等到所有邊分區(qū)的結(jié)果全部到達(dá)才能對(duì)頂點(diǎn)的狀態(tài)進(jìn)行更新。圖4 中異步更新處理的數(shù)據(jù)塊個(gè)數(shù)雖然較多,但只要迭代算法滿足異步更新條件,點(diǎn)分區(qū)收到任一邊分區(qū)的局部聚合結(jié)果,就能夠立即從緩存隊(duì)列中取出數(shù)據(jù)塊進(jìn)行處理,因此基于子圖的異步更新方式能極大縮短圖的收斂時(shí)間。由于整個(gè)迭代過(guò)程時(shí)間較短,這也使得異步迭代產(chǎn)生的通信量遠(yuǎn)少于全局同步產(chǎn)生的通信量。
另外,以頂點(diǎn)為中心的異步迭代以頂點(diǎn)為單位更新頂點(diǎn)狀態(tài),以子圖為中心的異步迭代以數(shù)據(jù)塊為單位更新頂點(diǎn)狀態(tài),因此以頂點(diǎn)為單位進(jìn)行異步更新能夠更快地加速圖狀態(tài)收斂。相比較于以頂點(diǎn)的異步迭代在收斂時(shí)間上略有增長(zhǎng),基于子圖的異步迭代可以通過(guò)以下方式極大提高通信效率:
(1)大部分網(wǎng)絡(luò)拓?fù)浞膬缏煞植?,網(wǎng)絡(luò)中連接邊的數(shù)量遠(yuǎn)遠(yuǎn)超過(guò)頂點(diǎn)的個(gè)數(shù),因此以頂點(diǎn)為對(duì)象發(fā)送消息的次數(shù)遠(yuǎn)少于跨連接邊發(fā)送消息的次數(shù)[18]。
(2)基于子圖的劃分方式將大量頂點(diǎn)連接邊存放在同一分區(qū),少部分頂點(diǎn)的連接邊分布在不同的邊分區(qū),這種存儲(chǔ)方式不僅減小了消息發(fā)送的次數(shù),基于子圖為中心的異步迭代通過(guò)在邊分區(qū)內(nèi)通過(guò)聚合機(jī)制獲取頂點(diǎn)的局部狀態(tài)后,以批量方式集中將分區(qū)的局部聚合結(jié)果發(fā)送給點(diǎn)分區(qū)。
(3)單個(gè)頂點(diǎn)在同一邊分區(qū)中存在多條連接邊,更新后的頂點(diǎn)狀態(tài)發(fā)往同一邊分區(qū)后在很大程度上能夠?qū)旤c(diǎn)狀態(tài)信息重用,因此進(jìn)一步減少了同一頂點(diǎn)跨越計(jì)算節(jié)點(diǎn)發(fā)送消息的次數(shù)。
Spark 環(huán)境下的Graphx 圖處理系統(tǒng)要求子圖之間的計(jì)算任務(wù)保持全局同步,因此限制了圖迭代的收斂速度[19]。根據(jù)Spark 環(huán)境下圖切分和數(shù)據(jù)存儲(chǔ)的特點(diǎn),本文提出了一種基于子圖的異步迭代更新方法。實(shí)驗(yàn)結(jié)果表明,該方法能夠有效提高圖迭代的收斂速度,同時(shí)降低網(wǎng)絡(luò)通信開(kāi)銷(xiāo)。未來(lái),將對(duì)方法的擴(kuò)展性[20]作進(jìn)一步研究。