吳云龍,李玲娟
(南京郵電大學(xué) 計(jì)算機(jī)學(xué)院,江蘇 南京 210023)
入侵檢測(cè)系統(tǒng)(intrusion detection systems,IDS)[1]在保障企業(yè)網(wǎng)絡(luò)的安全性方面扮演著至關(guān)重要的角色。為了有效監(jiān)測(cè)訪問行為的異常與否,數(shù)據(jù)挖掘技術(shù)被廣泛應(yīng)用到入侵檢測(cè)系統(tǒng)中。數(shù)據(jù)挖掘算法分為兩類:有監(jiān)督和無監(jiān)督[2]。無監(jiān)督的聚類算法有K-Means、模糊C均值聚類(fuzzy c-means,F(xiàn)CM)等。FCM[3]在傳統(tǒng)C-均值算法中添加隸屬度矩陣來判定某條數(shù)據(jù)所屬類別。這種算法在運(yùn)算過程中需要多次迭代,當(dāng)數(shù)據(jù)量特別大或者數(shù)據(jù)樣本屬性很多的時(shí)候,運(yùn)算速度會(huì)大大降低。為了提高FCM算法對(duì)大數(shù)據(jù)的聚類效率,設(shè)計(jì)了FCM算法基于具有實(shí)時(shí)計(jì)算特點(diǎn)的大數(shù)據(jù)計(jì)算平臺(tái)Spark[4]的并行化實(shí)現(xiàn)方法。該方法采用HDFS分布式存儲(chǔ)底層數(shù)據(jù),運(yùn)用RDD對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換,用持久化技術(shù)進(jìn)行中間結(jié)果的重用。為了檢驗(yàn)該并行化FCM算法的有效性,用該算法對(duì)KDD CUP 99數(shù)據(jù)集聚類來進(jìn)行入侵檢測(cè),首先對(duì)入侵檢測(cè)數(shù)據(jù)集KDD CUP 99[5]進(jìn)行了特征值提取等預(yù)處理,然后利用實(shí)現(xiàn)的FCM算法對(duì)訓(xùn)練數(shù)據(jù)集聚類得到聚類中心,再用測(cè)試數(shù)據(jù)集進(jìn)行聚類效果檢驗(yàn)。
聚類分析可以在大量數(shù)據(jù)中挖掘隱藏的規(guī)律[6]。從聚類的結(jié)果來看,聚類算法可以分為兩大類:軟聚類和硬聚類。硬聚類是把某一個(gè)數(shù)據(jù)點(diǎn)明確劃分到某一個(gè)類;軟聚類是將樣本個(gè)體通過隸屬度標(biāo)識(shí)出與各個(gè)類簇的隸屬關(guān)系[7],模糊C均值聚類算法FCM是經(jīng)典的軟聚類算法[8]。模糊的概念是指,用[0,1]之間的隸屬度來確定某個(gè)點(diǎn)屬于某個(gè)類的程度。每條數(shù)據(jù)對(duì)所有類中心點(diǎn)的隸屬度之和為1。FCM算法的步驟[9]如下:
步驟1:指定分類數(shù)和加權(quán)指數(shù)m,確定迭代次數(shù),隨機(jī)生成初始化隸屬度矩陣并滿足式1的約束條件。
(1)
其中,j表示第j條數(shù)據(jù);uij表示第j條數(shù)據(jù)隸屬于第i個(gè)中心點(diǎn)的隸屬度,取值范圍為[0,1]。
步驟2:按聚類中心的迭代公式(見式2),根據(jù)uij構(gòu)成的隸屬度矩陣U,計(jì)算聚類中心。
(2)
其中,m是一個(gè)加權(quán)指數(shù),可以控制聚類結(jié)果的模糊程度,取值范圍為m∈(1,∞)。
步驟3:按式3計(jì)算目標(biāo)函數(shù)J。
(3)
其中,dij=‖ci-xj‖,為第j條數(shù)據(jù)與第i個(gè)中心點(diǎn)的歐氏距離。
如果相鄰兩次迭代趨于收斂,小于指定閾值或者超過指定迭代次數(shù),則結(jié)束;否則執(zhí)行步驟4。
步驟4:根據(jù)新的聚類中心,按隸屬度的迭代公式(式4)重新計(jì)算隸屬度矩陣U,并返回到步驟2。
(4)
Spark是大規(guī)模數(shù)據(jù)的通用分布式并行計(jì)算平臺(tái)。與Hadoop MapReduce編程模型不同,Spark利用RDD(resilient distributed datasets)進(jìn)行計(jì)算,運(yùn)算過程中的輸出結(jié)果可以直接存于內(nèi)存中。Spark以其分布式內(nèi)存計(jì)算的特點(diǎn),可以提升需要多次迭代的算法的效率。
RDD是Spark的核心。邏輯上,RDD存放數(shù)據(jù);物理上,真實(shí)數(shù)據(jù)存放在各個(gè)分區(qū)中,每個(gè)分區(qū)可能位于不同的Worker節(jié)點(diǎn)之上。RDD的數(shù)據(jù)在默認(rèn)情況下都放在內(nèi)存中,因而Spark對(duì)內(nèi)存的要求也很高。當(dāng)內(nèi)存不能滿足運(yùn)行需求時(shí),它會(huì)把RDD數(shù)據(jù)序列化到磁盤。RDD具有容錯(cuò)性,由于把數(shù)據(jù)同時(shí)存放在多個(gè)分區(qū)中,當(dāng)某個(gè)節(jié)點(diǎn)出現(xiàn)分區(qū)數(shù)據(jù)丟失或者損壞等故障時(shí),會(huì)自動(dòng)重建丟失數(shù)據(jù)。
Spark提供了豐富的算子來操縱RDD,每個(gè)算子發(fā)出的指令最終會(huì)落實(shí)到對(duì)每個(gè)分區(qū)里數(shù)據(jù)的操作上。開發(fā)者通過Scala、Java或者Python調(diào)用RDD算子操作分布式數(shù)據(jù)集。RDD根據(jù)算子是否立即執(zhí)行,可以分為兩類:Transformation類和Action類。Transformation類操作會(huì)發(fā)生延遲計(jì)算,不會(huì)真正運(yùn)行,需要Action類操作的觸發(fā)才提交作業(yè)。
運(yùn)算過程中,會(huì)生成多個(gè)RDD,從一個(gè)RDD轉(zhuǎn)換到另一個(gè)RDD。Action類每次操作都會(huì)得到結(jié)果,如collect()、first()等。
Spark有4種運(yùn)行模式:本地模式、Standalone偽分布式模式、基于Mesos的集群模式和基于YARN的集群模式。在Standalone模式下,Spark的架構(gòu)如圖1所示。
圖1 Spark架構(gòu)(Standalone模式下)
ClusterManager為Master(主節(jié)點(diǎn)),控制整個(gè)集群,監(jiān)控Worker。Worker為從節(jié)點(diǎn),負(fù)責(zé)控制計(jì)算節(jié)點(diǎn),啟動(dòng)Executor或Driver。RDD的每個(gè)分區(qū)可能在不同的Worker機(jī)器上,每臺(tái)Worker節(jié)點(diǎn)可能有多個(gè)Executor,每個(gè)Executor可能有多個(gè)Task在運(yùn)行,Task和分區(qū)是一一對(duì)應(yīng)的,所以partition分區(qū)的數(shù)量也影響著Task的并發(fā)程度[10]。
Standalone模式下任務(wù)提交運(yùn)行的步驟是:Client提交應(yīng)用,Master接受Client提交的任務(wù)并命令Worker啟動(dòng)Driver;Driver向Master或者資源管理器申請(qǐng)資源,某種意義上,Driver就是SparkContext,執(zhí)行應(yīng)用程序中的main()函數(shù),創(chuàng)建SparkContext,SparkContext負(fù)責(zé)與ClusterManager通信,對(duì)Spark進(jìn)行初始化,并創(chuàng)建DAGScheduler作業(yè)調(diào)度模塊和TaskScheduler任務(wù)調(diào)度模塊等;從Hadoop的HDFS加載數(shù)據(jù),生成RDD,構(gòu)建RDD DAG,將DAG圖分解成Stage(當(dāng)碰到Action操作時(shí),就會(huì)催生Job;每個(gè)Job中含有1個(gè)或多個(gè)Stage,一個(gè)Stage包含一個(gè)或多個(gè)Task,Stage一般在獲取外部數(shù)據(jù)和Shuffle之前產(chǎn)生);DAGScheduler把一個(gè)Spark作業(yè)轉(zhuǎn)換成Stage的DAG(directed acyclic graph,有向無環(huán)圖),根據(jù)RDD和Stage之間的關(guān)系找出開銷最小的調(diào)度方法,然后把Stage以TaskSet的形式提交給TaskScheduler,在解析DAG圖時(shí),根據(jù)Shuffle來劃分Job內(nèi)部的Stage,并給出Stage之間的依賴關(guān)系;TaskScheduler借助ActorSystem將任務(wù)提交給集群管理器;Task在Executor上運(yùn)行,運(yùn)行完畢釋放所有資源。
(1)避免反復(fù)讀取同樣的數(shù)據(jù)。
文中采用RDD緩存(cache),把在計(jì)算過程被多次用到的數(shù)據(jù)cache到內(nèi)存。
(2)中間結(jié)果的持久化。
Spark的Transformations類操作會(huì)延遲執(zhí)行,它在Action類算子提交任務(wù)到Spark Context時(shí)觸發(fā),如果計(jì)算中其他的Action類操作用到中間結(jié)果,可以手動(dòng)調(diào)用persist將中間結(jié)果做持久化操作。persist可以設(shè)置不同參數(shù),不同的參數(shù)代表不同的存儲(chǔ)級(jí)別。如果把persist參數(shù)設(shè)置為MEMORY_ONLY,其功能同cache功能。
(3)減少通信開銷。
broadcast是以類似洪泛的方式廣播數(shù)據(jù)。它是只讀變量,保持?jǐn)?shù)據(jù)一致性,可以把迭代過程中的聚類中心廣播到各個(gè)分區(qū),在FCM算法的實(shí)現(xiàn)過程中,會(huì)不斷迭代產(chǎn)生新的中心點(diǎn),需要把新的中心點(diǎn)廣播到各個(gè)節(jié)點(diǎn),使得下一次只計(jì)算每條數(shù)據(jù)和新的中心點(diǎn)之間的距離。這樣可以高效地分發(fā)變量,減少通信開銷。
基于Spark的FCM算法并行化實(shí)現(xiàn)流程見圖2。
圖2 算法實(shí)現(xiàn)流程
Step1:把文本數(shù)據(jù)上傳到Hadoop的HDFS文件系統(tǒng),在應(yīng)用程序中通過sc.textFile()加載進(jìn)RDD,然后把數(shù)據(jù)轉(zhuǎn)換成稠密向量,這里把數(shù)據(jù)cache到內(nèi)存,計(jì)算過程中數(shù)據(jù)直接從內(nèi)存中獲取。對(duì)數(shù)據(jù)做L2范式,即求向量各元素的平方和然后求平方根,并且做持久化。
Step2:初始化聚類中心,采用takeSample API隨機(jī)抽取部分訓(xùn)練數(shù)據(jù)集的方法獲得初始聚類中心,并把聚類中心廣播給各個(gè)partition。開始計(jì)時(shí),便于最后統(tǒng)計(jì)運(yùn)行時(shí)間。
Step3:利用mapPartiotions對(duì)每個(gè)分區(qū)進(jìn)行計(jì)算。計(jì)算數(shù)據(jù)點(diǎn)到各中心的隸屬度和數(shù)據(jù)點(diǎn)到各簇的距離和,隸屬度滿足式1。距離的計(jì)算可以調(diào)用Spark MLlib提供的MLUtil.fastsquaredDistance(x1,x2),x1和x2數(shù)據(jù)結(jié)構(gòu)為數(shù)據(jù)點(diǎn)向量和其L2范式的組合;隸屬度的計(jì)算依據(jù)式2。
Step4:用reduceByKey和collectAsMap得到sums和fuzzycounts。sums表示在并行度為i情況下,數(shù)據(jù)點(diǎn)到各簇的距離和;fuzzycounts表示在并行度為i情況下,各數(shù)據(jù)點(diǎn)到類中心的隸屬度和。
Step5:基于sums和fuzzycounts,依據(jù)式4計(jì)算新的中心點(diǎn),并比較相鄰兩次中心點(diǎn)有沒有發(fā)生改變,如果未發(fā)生改變則更換標(biāo)志位,迭代結(jié)束,最后返回預(yù)測(cè)模型FuzzyCMeansModel,它包含了聚類中心和m值。否則迭代次數(shù)加1,繼續(xù)迭代。
Step6:用FuzzyCMeansModel的預(yù)測(cè)函數(shù)來預(yù)測(cè)數(shù)據(jù)點(diǎn)對(duì)某個(gè)簇的隸屬度([0,1])。把最終結(jié)果存入文本,用于計(jì)算聚類準(zhǔn)確率。
實(shí)驗(yàn)選取了KDD CUP 99的10%的數(shù)據(jù)集。這是一個(gè)模擬網(wǎng)絡(luò)攻擊的數(shù)據(jù)集,包含了41個(gè)網(wǎng)絡(luò)屬性,每條記錄結(jié)尾的label顯示了正常還是攻擊。按照大類來分,它包含了正常和異常兩種類型的數(shù)據(jù)。正常訪問的label為Norml,異常數(shù)據(jù)包括四大攻擊:拒絕服務(wù)攻擊DOS、遠(yuǎn)程登錄攻擊R2L、非法獲取Root用戶特權(quán)攻擊U2R、探測(cè)式攻擊Probe。
在數(shù)據(jù)挖掘中,從數(shù)據(jù)源得到的原始數(shù)據(jù)可能會(huì)存在冗余性、空值或者數(shù)據(jù)不完整等問題。為了提高挖掘的質(zhì)量和計(jì)算速度,往往通過數(shù)據(jù)清洗、數(shù)據(jù)降噪、數(shù)據(jù)降維、數(shù)據(jù)集成、數(shù)據(jù)轉(zhuǎn)換和數(shù)據(jù)簡(jiǎn)約等手段進(jìn)行預(yù)處理[11]。在實(shí)驗(yàn)中,為了提高聚類效果,對(duì)數(shù)據(jù)做了如下預(yù)處理。
(1)數(shù)據(jù)簡(jiǎn)約。
有些屬性對(duì)聚類效果不僅沒有幫助,還會(huì)大大降低數(shù)據(jù)挖掘的效率,可以采用屬性選擇和數(shù)據(jù)采樣技術(shù)加以簡(jiǎn)化。文中采用屬性選擇來去除對(duì)聚類效果意義不大的屬性,參考文獻(xiàn)[12],保留了7個(gè)特征:Duration、Protocol_type、Service、Flag、Logged_in、Count、Srv_count。用python腳本提取對(duì)應(yīng)的特征值并保存到另外一份文本中,特征值提取代碼如下:
#需要提取的特征屬性
index=[0,1,2,3,11,22,23]
result=[]
temp=[]
tmp=[]
for line in open("resourcedata"):
tmp=line.split(",")
for i in index:
temp.append(tmp[i])
result.append(temp)
temp=[]
#把result列表數(shù)據(jù)寫到文件中
(2)數(shù)據(jù)轉(zhuǎn)換。
數(shù)據(jù)轉(zhuǎn)換就是把數(shù)據(jù)從一種形式轉(zhuǎn)換到另外一種形式。把數(shù)據(jù)集中的字符屬性與數(shù)值屬性轉(zhuǎn)換,進(jìn)行歸一化,以便于FCM算法的計(jì)算。數(shù)據(jù)用python腳本處理的過程是:首先將保留的7個(gè)屬性中的字符型屬性定義為枚舉;然后讀取字符屬性,按照其屬性值轉(zhuǎn)換為枚舉中定義的值;最后再把結(jié)果保存到數(shù)據(jù)文件中。相應(yīng)的python偽代碼如下:
fromenum import Enum
classprotocol_type(Enum):
tcp='1'
udp='2'
icmp='3'
protocol_type[tcp].value
(3)空值處理。
空值作為數(shù)據(jù)對(duì)象的值,參與運(yùn)算仍然是空值。數(shù)據(jù)挖掘中,空值的存在會(huì)使運(yùn)算混亂,甚至可能無法完成。文中把空值置為“0”。
預(yù)處理前數(shù)據(jù)為41個(gè)屬性,其中還有空值;預(yù)處理后保留了7個(gè)重要屬性,空值也做了處理。
實(shí)驗(yàn)中搭建了單機(jī)環(huán)境和Standalone集群環(huán)境。單機(jī)環(huán)境硬件為HP g6-2025TX,CPU型號(hào)為Intel CORE i5-3210M,雙核處理器,內(nèi)存8 G,硬盤讀寫速度為415 MB/s。軟件環(huán)境為Ubuntu 14.04,JDK版本為JDK1.8,Hadoop版本為2.7.3,Spark版本為2.1,Scala版本為2.10.4。Standalone集群環(huán)境為Master節(jié)點(diǎn)一臺(tái),Worker節(jié)點(diǎn)兩臺(tái),節(jié)點(diǎn)軟件配置與本地模式相同。
對(duì)相同樣本在不同模式下運(yùn)行FCM算法得到的檢測(cè)率如圖3所示??紤]到數(shù)據(jù)集包含四大類攻擊和正常類型,聚類中心數(shù)設(shè)置為5。
圖3 在單機(jī)和Standalone集群環(huán)境下的檢測(cè)率對(duì)比
檢測(cè)率為:
(5)
由圖3可以看出,F(xiàn)CM算法不會(huì)因?yàn)樵诜植际郊涵h(huán)境下處理數(shù)據(jù)而影響聚類結(jié)果,這說明FCM算法按照文中設(shè)計(jì)的方法在Spark上并行化運(yùn)行,具有較好的魯棒性和準(zhǔn)確度。
預(yù)處理前后的數(shù)據(jù)上的檢測(cè)率對(duì)比結(jié)果如圖4所示??梢钥闯?,預(yù)處理后的數(shù)據(jù)上的檢測(cè)率更高。
圖4 Standalone集群環(huán)境下數(shù)據(jù)
預(yù)處理前后的數(shù)據(jù)的聚類時(shí)間對(duì)比結(jié)果如圖5所示??梢钥闯?,對(duì)預(yù)處理后的數(shù)據(jù)的處理速度更快。
圖5 Standalone集群環(huán)境下數(shù)據(jù)預(yù)
對(duì)預(yù)處理后數(shù)據(jù)分別在單機(jī)環(huán)境和Standalone集群環(huán)境下聚類的時(shí)耗對(duì)比結(jié)果如圖6所示。
圖6 單機(jī)和集群環(huán)境下預(yù)處理后數(shù)據(jù)的聚類時(shí)間對(duì)比
可以看出,當(dāng)數(shù)據(jù)量不是很大時(shí),單機(jī)環(huán)境和集群環(huán)境處理數(shù)據(jù)消耗的時(shí)間相近。集群環(huán)境下很多時(shí)間用在Shuffle過程中,甚至單機(jī)環(huán)境下處理數(shù)據(jù)的時(shí)效更高。這說明:小數(shù)據(jù)量并不適合在集群下處理。但是隨著數(shù)據(jù)量的增多,集群環(huán)境對(duì)數(shù)據(jù)的處理速度明顯快于單機(jī)環(huán)境。
文中主要研究FCM算法如何在Spark平臺(tái)并行化實(shí)現(xiàn),擴(kuò)充了Spark MLlib,并將并行化的FCM算法應(yīng)用于對(duì)入侵檢測(cè)數(shù)據(jù)集的聚類。合理調(diào)用Spark RDD算子實(shí)現(xiàn)模糊聚類,把可緩存到內(nèi)存的中間結(jié)果做Cache,降低中間讀寫磁盤操作的時(shí)間,且通過廣播聚類中心方式高效分發(fā)變量,減少節(jié)點(diǎn)間網(wǎng)絡(luò)通信消耗。另外,對(duì)比了KDD CUP 99數(shù)據(jù)集分別在單機(jī)和集群環(huán)境下的檢測(cè)率和運(yùn)行時(shí)間,并對(duì)比數(shù)據(jù)集預(yù)處理前后的檢測(cè)率和聚類效率。應(yīng)用結(jié)果表明,所設(shè)計(jì)的并行化方法提高了FCM算法的速度,對(duì)KDD CUP 99數(shù)據(jù)集的預(yù)處理降低了實(shí)驗(yàn)數(shù)據(jù)的維度,提高了準(zhǔn)確度和處理速度。