侯敬儒 吳 晟 李英娜
(昆明理工大學(xué)信息工程與自動(dòng)化學(xué)院 昆明 650500)
相比離線計(jì)算,在線學(xué)習(xí)是以對(duì)訓(xùn)練數(shù)據(jù)通過(guò)完全增量[1]的形式順序處理一遍為基礎(chǔ)(就是說(shuō),一次只訓(xùn)練一個(gè)樣例)。當(dāng)處理完每一個(gè)訓(xùn)練樣本,模型會(huì)對(duì)測(cè)試樣例做預(yù)測(cè)并得到正確的輸出(如得到分類(lèi)的標(biāo)簽后者回歸的真實(shí)目標(biāo))。在線學(xué)習(xí)背后的想法就是模型隨著接收到新的消息不斷更新自己[2~3],而不是像離線訓(xùn)練一次次重新訓(xùn)練。
在完全在線環(huán)境下,我們不會(huì)(或者也許不能)對(duì)整個(gè)訓(xùn)練集做多次訓(xùn)練,因此當(dāng)輸入到達(dá)時(shí)我們需要立刻處理。在線方法還包括小批量離線方法[4],并不是每次處理一個(gè)輸入,而是每次一個(gè)小批量的訓(xùn)練數(shù)據(jù)。Spark Streaming[5]作為 Spark 框架中專(zhuān)門(mén)處理實(shí)時(shí)數(shù)據(jù)的模塊,就是采用這種微批處理思想來(lái)達(dá)到準(zhǔn)實(shí)時(shí)計(jì)算的目的[6]。
聚類(lèi)分析[7]是一種數(shù)據(jù)挖掘領(lǐng)域中常用的無(wú)監(jiān)督學(xué)習(xí)模型,旨在發(fā)現(xiàn)緊密相關(guān)的觀測(cè)值群組,使得與屬于不同簇的觀測(cè)值相比,屬于同一簇的觀測(cè)值相互之間盡可能相似[8],在識(shí)別數(shù)據(jù)的內(nèi)在結(jié)構(gòu)方面具有極其重要的作用[9]。但其復(fù)雜度一般較高,所以單機(jī)環(huán)境執(zhí)行存在低吞吐、高延遲等問(wèn)題,難以適應(yīng)當(dāng)下大數(shù)據(jù)環(huán)境,故而通過(guò)分布式并行計(jì)算來(lái)解決此類(lèi)問(wèn)題。文章采用了KMeans聚類(lèi)算法,實(shí)現(xiàn)KMeans算法的并行化,并研究設(shè)計(jì)實(shí)現(xiàn)了基于Spark Streaming的在線KMeans聚類(lèi)模型。
Spark Streaming是Spark計(jì)算框架中專(zhuān)門(mén)處理實(shí)時(shí)數(shù)據(jù)流的模塊,它能實(shí)現(xiàn)對(duì)實(shí)時(shí)流數(shù)據(jù)的高吞吐、高容錯(cuò)率[10]的流處理。
Spark Streaming的處理流程是當(dāng)Spark Stream?ing接收到來(lái)自數(shù)據(jù)源的實(shí)時(shí)輸入數(shù)據(jù)后,將數(shù)據(jù)按照指定大小劃分為若干個(gè)數(shù)據(jù)片段DStream(DStream是Spark Streaming的基本抽象,是一系列RDDs組成),并將每個(gè)片段轉(zhuǎn)化為RDDs,然后由Spark Enigne對(duì)DStream中的RDDs進(jìn)行批處理,將RDD經(jīng)過(guò)變換等操作后的中間結(jié)果緩存在內(nèi)存中,最終實(shí)現(xiàn)大規(guī)模的流處理。Spark Streaming的處理流程圖如圖1所示。
圖1 Spark Streaming的處理流程圖
目前,Spark Streaming支持最小的Batch Size是0.5s~2s之間,所以Spark Streaming能滿足除對(duì)實(shí)時(shí)性要求非常高之外的所有流式實(shí)時(shí)處理要求。
使用Spark Streaming編寫(xiě)的程序與編寫(xiě)Spark程序非常相似,在Spark程序中,主要通過(guò)操作RDD(Resilient Distributed Datasets,彈性分布式數(shù)據(jù)集)提供的接口,如map、reduce、filter等,實(shí)現(xiàn)數(shù)據(jù)的批處理。而在Spark Streaming中,則通過(guò)操作DStream(表示數(shù)據(jù)流的RDD序列)[11]提供的接口,這些接口和RDD提供的接口類(lèi)似。圖2和圖3展示了由Spark Streaming程序到Spark jobs的轉(zhuǎn)換圖。
在圖2中,Spark Streaming把程序中對(duì)DStream的操作轉(zhuǎn)換為DStream Graph,圖3中,對(duì)于每個(gè)時(shí)間片,DStream Graph都會(huì)產(chǎn)生一個(gè)RDD Graph;針對(duì)每個(gè)輸出操作(如 print、foreach等),Spark Streaming都會(huì)創(chuàng)建一個(gè)Spark action;對(duì)于每個(gè)Spark action,Spark Streaming都會(huì)產(chǎn)生一個(gè)相應(yīng)的Spark job,并交給 JobManager。JobManager中維護(hù)著一個(gè)Jobs隊(duì)列,Spark job存儲(chǔ)在這個(gè)隊(duì)列中,JobManager把 Spark job 提 交 給 Spark Scheduler,Spark Scheduler負(fù)責(zé)調(diào)度Task到相應(yīng)的Spark Ex?ecutor上執(zhí)行。
圖2 Spark Streaming程序轉(zhuǎn)換為DStream Graph
Spark Streaming的另一大優(yōu)勢(shì)在于其容錯(cuò)性,RDD會(huì)記住創(chuàng)建自己的操作,每一批輸入數(shù)據(jù)都會(huì)在內(nèi)存中備份,如果由于某個(gè)結(jié)點(diǎn)故障導(dǎo)致該結(jié)點(diǎn)上的數(shù)據(jù)丟失,這時(shí)可以通過(guò)備份的數(shù)據(jù)在其它結(jié)點(diǎn)上重算得到最終的結(jié)果。
正如Spark Streaming最初的目標(biāo)一樣,它通過(guò)豐富的API和基于內(nèi)存的高速計(jì)算引擎讓用戶可以結(jié)合流式處理,批處理和交互查詢等應(yīng)用。因此Spark Streaming適合一些需要?dú)v史數(shù)據(jù)和實(shí)時(shí)數(shù)據(jù)結(jié)合分析的應(yīng)用場(chǎng)合[12]。當(dāng)然,對(duì)于實(shí)時(shí)性要求不是特別高的應(yīng)用也能完全勝任。另外通過(guò)RDD的數(shù)據(jù)重用機(jī)制可以得到更高效的容錯(cuò)處理。
KMeans均值算法的思想一般是先初始化隨機(jī)給定K個(gè)clusters(簇)中心,將待分類(lèi)的樣本數(shù)據(jù)點(diǎn)依照最近鄰原則分配到各個(gè)cluster中。之后以一定法則重新計(jì)算各個(gè)cluster的質(zhì)心,以確定新的cluster。不停的循環(huán)計(jì)算,直到cluster的質(zhì)心移動(dòng)距離小于某一個(gè)實(shí)際給定的具體值。
KMeans均值聚類(lèi)算法“三步走”如下簡(jiǎn)介:1)尋找待聚類(lèi)的樣本數(shù)據(jù)點(diǎn)的聚類(lèi)中心。
2)計(jì)算每個(gè)樣本數(shù)據(jù)點(diǎn)到之前尋找到的聚類(lèi)中心的距離,將每個(gè)樣本數(shù)據(jù)點(diǎn)聚類(lèi)到離該樣本數(shù)據(jù)點(diǎn)最近的聚類(lèi)中去。
3)計(jì)算每個(gè)聚類(lèi)中所有樣本數(shù)據(jù)點(diǎn)坐標(biāo)的平均值,然后將這個(gè)計(jì)算的平均值作為新的聚類(lèi)中心。
循環(huán)執(zhí)行第2)、3)步,一直計(jì)算到新的聚類(lèi)中心移動(dòng)距離小于某個(gè)給定的具體值或者聚類(lèi)次數(shù)達(dá)到了設(shè)定值為止。
3.2.1 數(shù)據(jù)接入模塊
文章利用分布式“準(zhǔn)”實(shí)時(shí)計(jì)算框架-Spark Streaming進(jìn)行實(shí)時(shí)數(shù)據(jù)的KMeans在線聚類(lèi),以下是數(shù)據(jù)接入部分的實(shí)現(xiàn)。
通過(guò)StreamingContext實(shí)例調(diào)用socketText?Stream方法實(shí)現(xiàn)數(shù)據(jù)接入。該方法會(huì)從一個(gè)TCP源創(chuàng)建一個(gè)輸入數(shù)據(jù)流。其使用TCP套接字接收數(shù)據(jù),并且以UTF-8編碼方式、換行符( )為分隔符接收字節(jié)。socketTextStream需要傳遞三個(gè)參數(shù),分別是hostname(接收數(shù)據(jù)的主機(jī)名)、port(接收數(shù)據(jù)的端口號(hào))、storageLevel(用做存儲(chǔ)接收數(shù)據(jù)的存儲(chǔ)級(jí)別,默認(rèn)是MEMORY_AND_DISK_SER_2級(jí)別)。以下是socketTextStream方法的詳細(xì)定義。
def socketTextStream(
hostname:String,
port:Int,
storageLevel:StorageLevel=StorageLevel.MEM?ORY_AND_DISK_SER_2
):ReceiverInputDStream[String]=withNamed?Scope(“socket text stream”){
socketStream[String](hostname,port,SocketRe?ceiver.bytesToLines,storageLevel)
}
調(diào)用示例:val trainingData=ssc.socketTextStream(“192.168.1.100”,8341,StorageLevel.MEMO?RY_AND_DISK_SER_2).map(Vectors.parse)
3.2.2 主函數(shù)
作為程序入口,主函數(shù)的任務(wù)是設(shè)置Job執(zhí)行的相關(guān)參數(shù),步驟如下:
1)設(shè)置程序運(yùn)行名稱(chēng)、執(zhí)行集群的Master、相關(guān)jar包等實(shí)例化SparkConf對(duì)象
val conf=new SparkConf()
.setAppName(“StreamingKMeansModel”)
.setMaster(“yarn-cluster”)//spark://hjr:7077
2)通過(guò)SparkConf對(duì)象和batchDuration參數(shù)實(shí)例化StreamingContext對(duì)象
val ssc=new StreamingContext(conf,Seconds(args(2).toLong))
3)通過(guò) StreamingContext的對(duì)象的 socketText?Stream方法實(shí)現(xiàn)數(shù)據(jù)接入
val trainingData = ssc.socketTextStream(“192.168.1.100”,8341,StorageLevel.MEMO?RY_AND_DISK_SER_2)。map(Vectors.parse)
4)設(shè)置相關(guān)參數(shù),實(shí)例化StreamingKMeans模型
val model=new StreamingKMeans()
.setK(args(3).toInt)
.setDecayFactor(1.0)
.setRandomCenters(args(4).toInt,0.0)
5)使用實(shí)時(shí)數(shù)據(jù)訓(xùn)練模型
model.trainOn(trainingData)
6)啟動(dòng)流執(zhí)行,并等待執(zhí)行停止
ssc.start()
ssc.awaitTermination()
文章實(shí)驗(yàn)基于QJM(Quorum Journal Manager)下的HA(High Available)大數(shù)據(jù)平臺(tái),其中,以Ha?doop的HDFS為基礎(chǔ)存儲(chǔ)框架,主要以Spark為計(jì)算框架,使用Zookeeper統(tǒng)籌HA下的大數(shù)據(jù)平臺(tái),管理整個(gè)集群配置。平臺(tái)包括2個(gè)Master節(jié)點(diǎn)和3個(gè)Worker節(jié)點(diǎn),節(jié)點(diǎn)之間局域網(wǎng)連接;平臺(tái)的資源管理和任務(wù)調(diào)度采用Yarn模式。
集群網(wǎng)絡(luò)配置如表1所示。
表1 網(wǎng)絡(luò)配置
集群的各個(gè)服務(wù)組件配置情況如表2所示。
文章這組實(shí)驗(yàn)是比對(duì)Java單機(jī)環(huán)境和基于Spark的分布式集群環(huán)境的吞吐能力,吞吐量(吞吐能力測(cè)試指標(biāo))是指系統(tǒng)在單位時(shí)間內(nèi)處理數(shù)據(jù)的數(shù)量。實(shí)驗(yàn)不停的增加Application執(zhí)行的數(shù)據(jù)量,步長(zhǎng)為50萬(wàn)條記錄,統(tǒng)計(jì)數(shù)據(jù)處理完成所需時(shí)間,單位為秒(s)。考慮到結(jié)果的準(zhǔn)確度,此組實(shí)驗(yàn)數(shù)據(jù)均是經(jīng)過(guò)3次試驗(yàn)結(jié)果的平均值,具體如圖4所示。
表2 各個(gè)服務(wù)組件版本配置信息
圖4 Java單機(jī)和Spark集群運(yùn)行時(shí)間對(duì)比
由圖4實(shí)驗(yàn)結(jié)果分析:剛開(kāi)始數(shù)據(jù)量較小的時(shí)候,Spark集群計(jì)算需要消耗更多的時(shí)間,這種結(jié)果的原因是Spark Application在分布式集群運(yùn)行的時(shí)候需要通過(guò)網(wǎng)絡(luò)傳輸數(shù)據(jù)、分發(fā)任務(wù),故而消耗了一部分集群資源和時(shí)間。
隨著數(shù)據(jù)量的成倍增長(zhǎng),Spark分布式集群的處理時(shí)間顯著縮短。這是因?yàn)樵赟park內(nèi)部計(jì)算Jobs是由不同的executor(一個(gè)執(zhí)行Task的容器)在集群的worker計(jì)算節(jié)點(diǎn)中分布式執(zhí)行。由圖4分析,當(dāng)數(shù)據(jù)量稍大時(shí),Java單機(jī)環(huán)境下已經(jīng)需要很長(zhǎng)時(shí)間了,可見(jiàn)若數(shù)據(jù)量呈現(xiàn)指數(shù)級(jí)增長(zhǎng)時(shí),Java單機(jī)處理時(shí)間將不能夠忍受,但是基于Spark的分布式集群規(guī)模還可以繼續(xù)橫向擴(kuò)展,對(duì)于大數(shù)據(jù)量的處理需求依然游刃有余。
該組實(shí)驗(yàn)中,定義的數(shù)據(jù)延遲是從一條數(shù)據(jù)(Event)發(fā)出到其被完全處理的時(shí)間,文章通過(guò)Spark Streaming的Web UI界面查看處理延遲。數(shù)據(jù)處理延遲和處理模塊的并行度有關(guān),也就是該實(shí)驗(yàn)下集群的Worker數(shù)量有關(guān),Worker節(jié)點(diǎn)數(shù)越多,Spark Streaming接受的Events等待被處理的時(shí)間就越短,故而延遲就越小,以下是對(duì)比實(shí)驗(yàn)。
將Spark Streaming的數(shù)據(jù)源設(shè)置為2,Receiv?ers設(shè)置為1,程序分別在Worker節(jié)點(diǎn)數(shù)為1、3上執(zhí)行情況如圖5~6所示。
圖5 Worker數(shù)量為1
圖6 Worker數(shù)量為3
從上圖可以看出:當(dāng)在線KMeans聚類(lèi)模型的數(shù)據(jù)源數(shù)目為2,Worker數(shù)為1時(shí),會(huì)有部分?jǐn)?shù)據(jù)處理延遲在100ms~800ms之間,這是因?yàn)楫?dāng)數(shù)據(jù)源數(shù)目為2,兩個(gè)數(shù)據(jù)源不停生成新數(shù)據(jù),而在線KMeans聚類(lèi)模型不能夠及時(shí)處理數(shù)據(jù),從而數(shù)據(jù)會(huì)有些許積累,因而會(huì)有部分?jǐn)?shù)據(jù)處理延遲。當(dāng)在線KMeans聚類(lèi)模型的并行度增加到3(Worker數(shù)量)時(shí),絕大部分的處理延遲都在50ms以內(nèi),這就說(shuō)明兩個(gè)數(shù)據(jù)源產(chǎn)生的數(shù)據(jù)被Spark Streaming很快處理完了。
根據(jù)業(yè)務(wù)實(shí)際情況設(shè)置數(shù)據(jù)源數(shù)目和Spark Streaming的并行度以及任務(wù)需要的CPU Cores、Memory等是優(yōu)化Spark Streaming性能的重要途徑,要根據(jù)計(jì)算任務(wù)合理分配,最大限度發(fā)揮了Spark Sreaming的并行處理能力。
基于在線計(jì)算與機(jī)器學(xué)習(xí)的結(jié)合使用需求,引入了Spark Streaming準(zhǔn)實(shí)時(shí)計(jì)算框架,設(shè)計(jì)并編程實(shí)現(xiàn)了基于Spark Streaming的KMeans在線聚類(lèi)模型,最后通過(guò)集群吞吐、處理延遲分析了集群的性能,集群運(yùn)行狀況良好。
接下來(lái)的工作中,將對(duì)Spark2.0的Structured Streaming、流數(shù)據(jù)處理結(jié)果的可視化展開(kāi)研究,從而達(dá)到數(shù)據(jù)流的產(chǎn)品化。
[1]趙玲玲,劉杰,王偉.基于Spark的流程化機(jī)器學(xué)習(xí)分析方法[J].計(jì)算機(jī)系統(tǒng)應(yīng)用,2016,25(12):162-168.ZHAO Lingling,LIU Jie,WANG Wei.Method of Imple?ment Machine Learning Analysis with Workflow Based on Spark Platform[J].Computer Systems&Applications,2016,25(12):162-168.
[2]金瑜.在線學(xué)習(xí)算法及其應(yīng)用研究[D].西安:西安電子科技大學(xué),2012.JIN Yu.Study of Online Learning Algorithms with Applica?tions[D].Xi'an:Xidian University,2012.
[3]吳健,陳克寒,吉利川,等.一種基于用戶聚類(lèi)的sky line在線計(jì)算方法:CN,CN 103150336 A[P].2013.WU Jian,CHEN Kehan,JI Lichuan,et al.A kind of based on user clustering sky line online calculation method:CN,CN 103150336 A[P].2013.
[4]武海麗,李彩玲.基于Google云計(jì)算的在線學(xué)習(xí)系統(tǒng)設(shè)計(jì)研究[J].山西煤炭管理干部學(xué)院學(xué)報(bào),2016,29(4):214-216.WU Haili,LI Cailing.The online learning system design based on Google's cloud computing research[J].Journal of Shanxi Coal-Mining Administrators College,2016,29(4):214-216.
[5]賴(lài)小平.基于Logistic回歸的在線廣告并行運(yùn)算模型[J].計(jì)算機(jī)工程,2015,41(8):42-45.LAI Xiaoping.Online Advertising Parallel Operation Mod?el Based on Logistic Regression[J].Computer Engineer?ing,2015,41(8):42-45.
[6]尹清波,王慧強(qiáng),張汝波,等.半監(jiān)督在線增量自學(xué)習(xí)異常檢測(cè)方法研究[J].計(jì)算機(jī)研究與發(fā)展,2006(z2):419-424.YIN Qingbo,WANG Huiqiang,ZhANG Rubo,et al.Semi-Supervised Increment Anomaly Detection[J].Jour?nal of Computer Research and Development,2006(z2):419-424.
[7]張賢德.基于Spark平臺(tái)的實(shí)時(shí)流計(jì)算推薦系統(tǒng)的研究與實(shí)現(xiàn)[D].鎮(zhèn)江:江蘇大學(xué),2016.ZHANG Xiande.Research and Implementation of Real Time Stream Computing Recommendation System Based on Spark Platform[D].Zhenjiang:Jiangsu University,2016.
[8]岑凱倫,于紅巖,楊騰霄.大數(shù)據(jù)下基于Spark的電商實(shí)時(shí)推薦系統(tǒng)的設(shè)計(jì)與實(shí)現(xiàn)[J].現(xiàn)代計(jì)算機(jī),2016(24):61-69.CEN Kailun,YU Hongyan,YANG Tengxiao.Design and Implement of E-Commerce Real-Time Recommender Sys?tem with Spark Based on Big Data[J].Modern Computer,2016(24):61-69.
[9]胡俊,胡賢德,程家興.基于Spark的大數(shù)據(jù)混合計(jì)算模型[J].計(jì)算機(jī)系統(tǒng)應(yīng)用,2015,24(4):214-218.HU Jun,HU Xiande,CHEN Jiaxing.Big Data Hybrid Computing Mode Based on Spark[J].Computer Systems&Applications,2015,24(4):214-218.
[10]陳僑安,李峰,曹越,等.基于運(yùn)行數(shù)據(jù)分析的Spark任務(wù)參數(shù)優(yōu)化[J].計(jì)算機(jī)工程與科學(xué),2016,38(1):11-19.CHEN Qiaoan,LI Feng,CAO Yue,et al.Parameter opti?mization for Spark jobs based on runtime data analysis[J].Computer Engineering and Science,2016,38(1):11-19.
[11]薛瑞,朱曉民.基于Spark Streaming的實(shí)時(shí)日志處理平臺(tái)設(shè)計(jì)與實(shí)現(xiàn)[J].電信工程技術(shù)與標(biāo)準(zhǔn)化,2015(9):55-58.XUE Rui,ZHU Xiaomin.Real-time log processing sys?tem based on spark streaming[J].Telecom Engineering Technics and Standardization,2015(9):55-58.
[12]方峰,蔡志平,肇啟佳,等.使用Spark Streaming的自適應(yīng)實(shí)時(shí)DDoS檢測(cè)和防御技術(shù)[J].計(jì)算機(jī)科學(xué)與探索,2016,10(5):601-611.FANG Feng,CAI Zhiping,ZHAO Qijia,et al.Adaptive Technique for Real-Time DDoS Detection and Defense Using Spark Streaming[J].Journal of Frontiers of Com?puter Science&Technology,2016,10(5):601-611.