任 剛,李 鑫,趙開新,劉小杰,張 陽,郜廣蘭,4
(1.河南工學(xué)院 計(jì)算機(jī)科學(xué)與技術(shù)學(xué)院,河南 新鄉(xiāng) 453003;2.中國礦業(yè)大學(xué) 計(jì)算機(jī)科學(xué)與技術(shù)學(xué)院,江蘇 徐州 221116;3.河南省生產(chǎn)制造物聯(lián)大數(shù)據(jù)工程技術(shù)研究中心,河南 新鄉(xiāng) 453003;4.新鄉(xiāng)市虛擬現(xiàn)實(shí)與系統(tǒng)重點(diǎn)實(shí)驗(yàn)室,河南 新鄉(xiāng) 453003;5.河南工學(xué)院 數(shù)據(jù)挖掘與智能計(jì)算研究所,河南 新鄉(xiāng) 453003)
路段行車時(shí)間(Road Link Travel Time, RLTT)是智能交通應(yīng)用系統(tǒng)的重要基礎(chǔ)數(shù)據(jù)之一,路段行車時(shí)間計(jì)算(Calculation of RLTT, CoRLTT)是指從各類交通數(shù)據(jù)中計(jì)算出路段行車時(shí)間信息的計(jì)算過程,是智能交通應(yīng)用關(guān)鍵支撐技術(shù)之一,也是智能交通領(lǐng)域的熱點(diǎn)研究內(nèi)容之一[1]。近年來,高清攝像頭(High Definition Camera, HDC)開始在城市交通中普遍應(yīng)用。該設(shè)備被廣泛部署于各交通路口,通過視頻識(shí)別技術(shù)記錄通過該路口車輛的車牌、車輛通過時(shí)間等信息,產(chǎn)生大規(guī)模交通卡口過車數(shù)據(jù)集。該類數(shù)據(jù)集為路段行車時(shí)間計(jì)算提供了良好的數(shù)據(jù)源。交通卡口過車數(shù)據(jù)的路段行車時(shí)間計(jì)算方法成為智能交通領(lǐng)域研究熱點(diǎn)之一。
Liu等[2]于2011年較早提出了基于交通卡口過車數(shù)據(jù)的路段行車時(shí)間基本計(jì)算方法。但該算法是串行算法,當(dāng)數(shù)據(jù)規(guī)模較大時(shí),其性能往往不佳。隨后出現(xiàn)的大數(shù)據(jù)并行計(jì)算模型MapReduce[3-5]為該問題解決提供了可能。
MapReduce是一種基于Hadoop磁盤文件系統(tǒng)(Hadoop Distributed File System, HDFS)的大數(shù)據(jù)并行計(jì)算模型,該模型以作業(yè)為基本計(jì)算單位,采用2種操作計(jì)算模式。該模型1個(gè)作業(yè)由map和reduce 2個(gè)操作構(gòu)成,每次作業(yè)完成,計(jì)算結(jié)果會(huì)存入HDFS。張帥等[6]利用MapReduce實(shí)現(xiàn)了交通卡口過車數(shù)據(jù)集上的路段行車時(shí)間并行計(jì)算,該方法利用2次作業(yè)迭代完成了計(jì)算過程,實(shí)驗(yàn)結(jié)果表明,基于MapReduce的路段行車時(shí)間方法大幅提高了計(jì)算效率。但是,由于該算法采用MapReduce模型2次作業(yè)迭代完成計(jì)算過程,在2次作業(yè)迭代之間,中間結(jié)果數(shù)據(jù)需要置于磁盤文件系統(tǒng)HDFS存儲(chǔ),消耗了一定時(shí)間,當(dāng)數(shù)據(jù)規(guī)模較大時(shí),性能具有改善空間。
Spark[7-8]是一種新興的大數(shù)據(jù)并行編程模型,與MapReduce模型相比,Spark改善了作業(yè)的執(zhí)行方式,1個(gè)作業(yè)可以包括多個(gè)數(shù)據(jù)轉(zhuǎn)換操作,避免了MapReduce模型1個(gè)作業(yè)僅有map和reduce 2個(gè)數(shù)據(jù)轉(zhuǎn)換操作的限制,更擅長處理多迭代型問題[9]。因此,利用Spark模型來實(shí)現(xiàn)一個(gè)能夠處理交通卡口過車數(shù)據(jù)的路段行車時(shí)間計(jì)算方法成為一個(gè)迫切需求。
為此,本文研究路段行車時(shí)間計(jì)算串行算法與Spark模型數(shù)據(jù)轉(zhuǎn)換的內(nèi)在關(guān)系,提出基于Spark模型的路段行車時(shí)間計(jì)算方法(Spark-based CoRLTT, Spark-CoRLTT )。最后,通過實(shí)驗(yàn)驗(yàn)證算法的有效性。
Spark是一種基于彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset, RDD)的大數(shù)據(jù)并行計(jì)算模型。RDD是一種存儲(chǔ)于內(nèi)存的只讀數(shù)據(jù)集合,通過map、groupByKey和reduceByKey等轉(zhuǎn)換(Transformation)操作完成RDD轉(zhuǎn)換,其計(jì)算架構(gòu)如圖1所示。該模型從HDFS讀入數(shù)據(jù),形成RDD,通過一系列轉(zhuǎn)換操作,結(jié)果重新寫回HDFS。在利用RDD進(jìn)行迭代操作時(shí),通常采用鍵值對(duì)RDD這種數(shù)據(jù)集形式,該數(shù)據(jù)集元素以(key,value)鍵值對(duì)形式出現(xiàn)。
圖1 Spark計(jì)算架構(gòu)
map輸入為(key1,value1),該操作根據(jù)具體業(yè)務(wù)邏輯將其轉(zhuǎn)換為新的鍵值對(duì)(key2,value2),map操作鍵值對(duì)轉(zhuǎn)換形式定義如下:
map::(key1,value1)→(key2,value2)
(1)
map生成(key2,value2)后,發(fā)送至groupByKey或reduceByKey進(jìn)一步處理。由于groupByKey和reduceByKey是寬依賴轉(zhuǎn)換,Spark系統(tǒng)會(huì)啟動(dòng)shuffle操作,將具有相同key2的value2鏈接起來,形成list(value2),shuffle操作形式定義如下:
shuffle::(key2,value2)→(key2,list(value2))
(2)
reduceByKey操作負(fù)責(zé)將合并具有相同鍵的值,并對(duì)該值進(jìn)行聚合操作,其形式如下:
reduceByKey::(key2,list(value2))→(key3,value3)
(3)
groupByKey操作和reduceByKey類似,接收來自map將輸入鍵值對(duì)按鍵值分組,相同的鍵值會(huì)被分到同一組,不同的是group不進(jìn)行聚合操作。
過車數(shù)據(jù)一般包含4列數(shù)據(jù),表1列出一組樣本數(shù)據(jù),一條記錄可形式化表示為:
表1 交通卡口過車樣本
r=(id,vehicle,hdc,timei)
(4)
其中,id為記錄關(guān)鍵字;vehicle為車牌;time為過車時(shí)間;hdc為高清攝像頭。1個(gè)具有n條記錄的過車數(shù)據(jù)集可表示為:
R={r0,r1, …,rn-1}
(5)
其中,ri=(idi,vehiclei,hdci,timei)
定義1:車輛出行軌跡
一條車輛出行軌跡由多個(gè)hdc和通過時(shí)間序列構(gòu)成,形式化表示如下:
{(hdc0,time0), …, (hdcn-1,timen-1)}
(6)
Spark計(jì)算架構(gòu)如圖1所示。
定義2:路段
一條路段是位于兩個(gè)hdc之間的路徑,可形式化定義為:
link=
(7)
定義3:路段行車時(shí)間
路段行車時(shí)間是在一段時(shí)間內(nèi)通過該路段所有車輛行車時(shí)間平均值。假設(shè)在計(jì)算周期內(nèi)有n輛通過路段linki,則路段行車時(shí)間可形式化定義為:
(8)
其中,timevehiclej為車輛vehiclej在路段linki上的行車時(shí)間?;谏鲜龆x,路段行車時(shí)間串行計(jì)算方法可分為如下4個(gè)階段。
階段1:按車牌分組
具有相同車牌的車輛被分為相同組,1個(gè)數(shù)據(jù)集R就被分為多個(gè)子數(shù)據(jù)集,形式表示如下:
R=∪Rvehiclei
(9)
其中,
Rvehiclei= {(rj∈rj∈R,vehiclej=vehiclei)}
(10)
階段2:按過車時(shí)間排序
集合Rvehiclei按過車時(shí)間排序,形成vehiclei的1次出行軌跡,形式如下:
Rvehiclei={(hdc0,time0),…, (hdcn-1,timen-1)}
(11)
階段3: 按路段切割車輛軌跡
將車輛軌跡Rvehiclei按照相鄰卡口進(jìn)行切割,形成路段行車時(shí)間集合TT,形式如下:
TT={tt0,tt1,…,ttn-1}
(12)
其中,
tti=(
(13)
階段4:按路段分組并計(jì)算RLTT
將路段行車時(shí)間集合TT按照路段分組,可形成子集TTlinki,每個(gè)子集包含該路段的所有行車時(shí)間,形式如下:
TT=∪TTlinki
(14)
其中,
TTlinki={(ttj|ttj∈TT,linkj=linki)}
(15)
計(jì)算TTlinki均值,該值即是linki的RLTT,形式表示如下:
(16)
其中,j為TTlinki中元素的個(gè)數(shù)。
根據(jù)前述串行計(jì)算方法,并結(jié)合Spark計(jì)算模型數(shù)據(jù)轉(zhuǎn)換操作特點(diǎn),研究組提出路段行車時(shí)間串行計(jì)算過程到Spark數(shù)據(jù)轉(zhuǎn)換映射關(guān)系。如圖2所示,階段1按車牌分組由一個(gè)map實(shí)現(xiàn),稱為First map;階段2按過車時(shí)間排序由groupByKey實(shí)現(xiàn);階段3按路段分割車輛軌跡由另一個(gè)map實(shí)現(xiàn),稱為Second map;階段4按路段分組并計(jì)算路段行車時(shí)間由reduceByKey實(shí)現(xiàn)。
圖2 Spark-RLTT算法映射關(guān)系
根據(jù)上述映射關(guān)系,研究組提出基于Spark-CoRLTT并行架構(gòu),如圖3所示。
圖3 Spark-RLTT并行架構(gòu)
First map讀取原始數(shù)據(jù)集R,每行標(biāo)記為(key1,value1),然后將其轉(zhuǎn)化(key2,value2)健值對(duì)。(key1,value1)鍵值對(duì)形式化表示為:
key=i
(17)
value1= (idi,vehiclei,hdci,timei)
(18)
其中,key1為行號(hào);value1為數(shù)據(jù)集R的1行。生成(key1,value1)后,取出value1中的vehiclei賦予key2,取出hdci和timei賦予value2,形式化表示如下:
key2=vehiclei
(19)
value2=
(20)
主要實(shí)現(xiàn)代碼如下:
算法1: First map of Spark-CoRLTTInput:(key1,value1)Output:(key2,value2)01key2=value1.getVehicle();02hdc=value1.getHDC();03time=value1.getTime();04value2=
生成(key2,value2)后,Spark模型自動(dòng)啟動(dòng)shuffle過程,相同key2的value2被連接為list(value2),這樣,groupByKey操作實(shí)際輸入為(key2, list(value2))。groupByKey將key2直接賦予key3,根據(jù)time排序,形成value3。具體表示如下:
key3=key2
(21)
value3=(
(22)
主要實(shí)現(xiàn)代碼實(shí)現(xiàn)如下所示:
算法2: groupByKey of Spark-CoRLTTInput: (key2, list(value2))Output: (key3, value3)01key3 = key2;02value3 = sortByTime(list(value2));03Emit(key3, value3);
該操作按路段切割車輛的出行軌跡,形成路段和行車時(shí)間的鍵值對(duì)。輸入是(key3,value3),首先取出value3中相鄰元素的hdc,形成key4,取出相鄰元素的time求差值,將該值賦予value4,形式表示如下:
key4=
(23)
value4=timej-timej+1
(24)
主要實(shí)現(xiàn)代碼如下:
算法3: Second map of Spark-CoRLTTInput: (key3, value3)Output: (key4, value4)01for (i=0; i++; i < value3.getSize - 2)02 firstElement=value3.getElement(i);03 secondElement=value3.getElement(i +1);04 hdc1=firstElement.getHDC();05 time1=firstElement.getTime();06 hdc2=secondElement.getHDC();07 time2=secondElement.getTime();08 key4 =
生成(key4,value4)后,Spark模型啟動(dòng)shuffle過程,相同的key4的value4被連接為新的鍵值對(duì)(key4, list(value4))。
該操作用于將相同路段的行車時(shí)間規(guī)約為一組數(shù)據(jù)集,并求平均數(shù),計(jì)算出路段行車時(shí)間。reduceByKey輸入為(key4, list(value4)),鍵值保持不變,遍歷value4中所有元素,求均值賦給value5,形式表示如下:
key5=
(25)
(26)
主要實(shí)現(xiàn)代碼如下:
實(shí)驗(yàn)用Spark集群包含8個(gè)計(jì)算節(jié)點(diǎn),每個(gè)計(jì)算節(jié)點(diǎn)由12核2.1 GHz處理器、16 GB內(nèi)存和1 T硬盤構(gòu)成。經(jīng)典串行算法記為Serial-CoRLTT,基于MapReduce的RLTT計(jì)算方法記為MR-CoRLTT,提出的基于Spark模型的RLTT計(jì)算方法記為Spark-CoRLTT。實(shí)驗(yàn)數(shù)據(jù)來自我國西南某省會(huì)城市157個(gè)交通卡口,該城市高峰時(shí)期每小時(shí)可產(chǎn)生600萬條過車記錄,規(guī)模約9 GB,每天可產(chǎn)生2 000萬條過車記錄,規(guī)模約30GB。
分析該實(shí)驗(yàn)數(shù)據(jù)規(guī)模對(duì)計(jì)算時(shí)間的影響。Serial-CoRLTT計(jì)算節(jié)點(diǎn)設(shè)置為1,MR-CoRLTT和Spark-CoRLTT計(jì)算節(jié)點(diǎn)設(shè)置為4。過車記錄從600萬條逐步增加到2 000萬條。實(shí)驗(yàn)結(jié)果如圖4所示,可以看出,隨著數(shù)據(jù)集規(guī)模增加,3種算法計(jì)算時(shí)間都隨之增長,Serial-CoRLTT計(jì)算時(shí)間總是最多,說明不管是基于MapReduce模型的并行計(jì)算方法還是基于Spark模型的并行計(jì)算方法,計(jì)算性能都明顯優(yōu)于傳統(tǒng)串行計(jì)算方法。
其次,提出的Spark-CoRLTT算法計(jì)算時(shí)間總是小于MR-CoRLTT算法計(jì)算時(shí)間,這說明提出的Spark-CoRLTT算法性能優(yōu)于現(xiàn)有MR-CoRLTT算法。其原因在于提出的Spark-CoRLTT算法由1次作業(yè)就能完成整個(gè)計(jì)算過程,而傳統(tǒng)的MR-CoRLTT算法由2次作業(yè)完成整個(gè)計(jì)算過程,2次作業(yè)之間大量的數(shù)據(jù)存儲(chǔ)時(shí)間消耗了計(jì)算時(shí)間,這說明了本文算法的有效性。還能看出,隨著數(shù)據(jù)規(guī)模的增大,提出的Spark-CoRLTT效率優(yōu)勢(shì)更加明顯,其原因在于隨著數(shù)據(jù)規(guī)模的增加,傳統(tǒng)MR-CoRLTT算法所消耗的磁盤存儲(chǔ)越多,造成計(jì)算時(shí)間越長,效率也越低。
該實(shí)驗(yàn)分析計(jì)算時(shí)間和計(jì)算節(jié)點(diǎn)數(shù)的關(guān)系。計(jì)算節(jié)點(diǎn)數(shù)從1增加到8。數(shù)據(jù)規(guī)模設(shè)置為2千萬條,保持不變。實(shí)驗(yàn)結(jié)果如圖5所示,可以看出:首先,隨著計(jì)算節(jié)點(diǎn)的增加,提出的Spark-CoRLTT和MR-CoRLTT計(jì)算時(shí)間都隨之減少,這說明2種算法都具有較好的計(jì)算節(jié)點(diǎn)擴(kuò)展性。其次,提出的Spark-CoRLTT算法計(jì)算時(shí)間總是小于MR-CoRLTT計(jì)算時(shí)間,這說明本文提出算法計(jì)算性能優(yōu)于原算法,其原因在于提出的算法1次作業(yè)即可完成整個(gè)計(jì)算過程,較之原算法少了1次數(shù)據(jù)存儲(chǔ)時(shí)間。最后,還應(yīng)該注意到,隨著節(jié)點(diǎn)數(shù)的增加,下降幅度逐步減少,這是因?yàn)槊總€(gè)并行算法都有計(jì)算下限。
圖5 計(jì)算節(jié)點(diǎn)擴(kuò)展性
本文提出了一種基于Spark模型的路段行車時(shí)間計(jì)算方法Spark-CoRLTT,實(shí)驗(yàn)表明,較之傳統(tǒng)基于MapReduce模型的計(jì)算方法,文中提出的Spark-CoRLTT計(jì)算效率明顯提升。