亚洲免费av电影一区二区三区,日韩爱爱视频,51精品视频一区二区三区,91视频爱爱,日韩欧美在线播放视频,中文字幕少妇AV,亚洲电影中文字幕,久久久久亚洲av成人网址,久久综合视频网站,国产在线不卡免费播放

        ?

        基于Spark Streaming的實時數(shù)據(jù)分析系統(tǒng)及其應用

        2017-07-31 17:47:29韓德志陳旭光雷雨馨戴永濤
        計算機應用 2017年5期
        關鍵詞:數(shù)據(jù)分析系統(tǒng)數(shù)據(jù)流日志

        韓德志,陳旭光,雷雨馨,戴永濤,張 肖

        (1.上海海事大學 信息工程學院,上海 201306; 2.鄭州大學 信息工程學院,鄭州 450001)

        基于Spark Streaming的實時數(shù)據(jù)分析系統(tǒng)及其應用

        韓德志1*,陳旭光1,雷雨馨2,戴永濤1,張 肖1

        (1.上海海事大學 信息工程學院,上海 201306; 2.鄭州大學 信息工程學院,鄭州 450001)

        (*通信作者電子郵箱dezhihan88@sina.com)

        為了實現(xiàn)對實時網(wǎng)絡數(shù)據(jù)流的快速分析,設計一種分布式實時數(shù)據(jù)流分析系統(tǒng)(DRDAS),能有效解決并發(fā)訪問數(shù)據(jù)流的收集、存儲和實時分析問題,為大數(shù)據(jù)環(huán)境的網(wǎng)絡安全檢測提供了一種有效的數(shù)據(jù)分析平臺;根據(jù)Spark Streaming運行的原理設計一種動態(tài)采樣的K-Means并行算法,與DRDAS結合能實時有效地檢測大數(shù)據(jù)環(huán)境下的各種分布式拒絕服務(DDoS)攻擊。實驗結果顯示:DRDAS具有好的可擴展性、容錯性和實時處理能力,與動態(tài)采樣的K-Means并行算法結合能實時地檢測各種DDoS攻擊,縮短了攻擊的檢測時間。

        Spark Streaming框架; 分布式流處理;網(wǎng)絡數(shù)據(jù)分析;分布式拒絕服務攻擊

        0 引言

        隨著互聯(lián)網(wǎng)的高速發(fā)展,普通用戶帶寬普遍升級,尤其是在一些大城市,家庭用戶的帶寬都已經(jīng)達到20 MB/s,甚至更高。此外,隨著3G網(wǎng)絡的普及,以及4G網(wǎng)絡的逐步推廣,移動互聯(lián)網(wǎng)也進入了一個蓬勃發(fā)展的時期。個人網(wǎng)絡帶寬的快速增長和不斷增加的網(wǎng)絡用戶,對企業(yè)網(wǎng)絡安全帶來了巨大的挑戰(zhàn)。因為,這些高帶寬的網(wǎng)絡用戶一旦被黑客控制并參與到分布式拒絕服務(Distributed Denial of Service, DDoS)攻擊中時,其影響將無法估量。

        全球著名內(nèi)容分發(fā)網(wǎng)絡(Content Delivery Network, CDN)服務商Incapsula,在2014年發(fā)布的DDoS攻擊趨勢報告[1]中指出,2014年DDoS攻擊行為增加了240%,且流量已經(jīng)超過了100 GB。此外,該公司的最近一份分析報告[2]指出,目前或有數(shù)十萬甚至上百萬臺的小型或家庭辦公環(huán)境 (Small Office Home Office, SOHO)專用路由器已成為僵尸路由器,被黑客用來執(zhí)行大規(guī)模DDoS攻擊。該公司2014年的一項關于DDoS造成損失的調查發(fā)現(xiàn):49%的DDoS攻擊會持續(xù)6~24 h,每小時的平均經(jīng)濟損失為40 000美元[3]。

        此外,近兩年來,網(wǎng)絡安全事件更是頻繁發(fā)生:2014年12月20日— 21日,一家將服務器部署在阿里云上的游戲公司遭遇了DDoS攻擊,453.8 GB/s的峰值流量使之成為全球最大的DDoS攻擊受害者;而知名的代碼托管網(wǎng)站GitHub,于2015年3月26日開始遭遇其網(wǎng)站最大規(guī)模的DDoS攻擊,導致部分地區(qū)服務中斷,且攻擊持續(xù)了80多小時;2015年5月11日,網(wǎng)易遭受了最新型的DDoS攻擊——鏈路洪泛攻擊(Link Flooding Attack, LFA)[3],以至服務中斷9 h,損失超過1 500萬元。

        對超大流量的DDoS攻擊,研究如何高效、及時地檢測并報警有重要的理論意義和重大的經(jīng)濟價值。本文將新型的分布式流式計算框架——Spark Streaming[4],應用于實時的大流量網(wǎng)絡數(shù)據(jù)實時分析,為提高大數(shù)據(jù)環(huán)境的DDoS檢測速度提供保證。

        1 相關工作

        1.1 大數(shù)據(jù)流特點

        在大數(shù)據(jù)時代,流式數(shù)據(jù)已經(jīng)在很多環(huán)境中加以應用,并取得很好的效果。本文在文獻[5]的基礎上,總結出了大數(shù)據(jù)流具有以下6個特點:

        1)并發(fā)性。體現(xiàn)在兩個方面:服務并發(fā)和數(shù)據(jù)并發(fā)。在當前的大部分系統(tǒng)中,流處理都是在一些高并發(fā)的應用中,所提供的都是高并發(fā)服務,而并發(fā)服務必然會并發(fā)產(chǎn)生大量數(shù)據(jù),這就要求系統(tǒng)具有很好的并發(fā)收集、處理和分析的能力。

        2)實時性。數(shù)據(jù)實時產(chǎn)生,也需要實時計算,保證數(shù)據(jù)的時效性,讓數(shù)據(jù)更快、更好地體現(xiàn)其價值,因為很多場景,比如實時推薦系統(tǒng)、實時監(jiān)控系統(tǒng),數(shù)據(jù)是具有實效性的,若不能在短時間內(nèi)加以處理和分析,就會失去數(shù)據(jù)的價值。

        3)易失性。原始數(shù)據(jù)流一般被實時處理和分析后,就會被丟棄,只有少量日志被持久保存,或者只保存處理后的結果,想要再次分析原始日志可能性不大。

        4)突發(fā)性。因為數(shù)據(jù)是實時產(chǎn)生的,只受當前服務的影響,跟前一時刻的數(shù)據(jù)量大小無關,也就是說前后兩個時間的數(shù)據(jù)量可能會相差很大。

        5)無序性。在大數(shù)據(jù)環(huán)境下,不同數(shù)據(jù)源之間的產(chǎn)生和傳輸并不能保證按照其產(chǎn)生的相對順序,因此在分析時需要更加完善的處理邏輯,而不是只根據(jù)其到達的先后順序。

        6)無限性。只要服務存在,數(shù)據(jù)就一直會持續(xù)、動態(tài)地增加,所以數(shù)據(jù)的大小理論上是無限的,不能用具體的數(shù)字來量化。當然,實際使用中不可能用磁盤無限地存儲,應定期地對歷史日志進行清理或壓縮備份。

        1.2 大數(shù)據(jù)流相關研究

        由于單臺計算機的存儲、計算能力的限制,已經(jīng)不能滿足大數(shù)據(jù)時代海量數(shù)據(jù)的處理和分析的要求。同樣地,在網(wǎng)絡安全檢測中,單臺服務器已經(jīng)不能及時處理和分析短時間內(nèi)產(chǎn)生的大量網(wǎng)絡數(shù)據(jù),這給安全檢測系統(tǒng)帶來了巨大的挑戰(zhàn)。目前,已經(jīng)有一些學者開始研究用分布式計算框架處理海量的網(wǎng)絡數(shù)據(jù)。

        文獻[6]使用Hadoop對海量數(shù)據(jù)流的日志進行分析,該方法能提供良好的容錯性,且能大幅縮短計算的時間,但是需要先將所有的日志收集起來,再導入到Hadoop集群中,這樣浪費了大量的時間,不能對大數(shù)據(jù)流進行實時分析。

        文獻[7]中將Hadoop應用到海量網(wǎng)絡數(shù)據(jù)流攻擊的日志分析中,雖然能處理和分析大量的網(wǎng)絡監(jiān)控數(shù)據(jù)的日志信息,但是由于其采用離線分析方式,該方法只適用于研究網(wǎng)絡攻擊的一些行為和特征,不能真正用于網(wǎng)絡檢測系統(tǒng)中。

        文獻[8]在文獻[7]的方法上作了一些改進,將原來的分析文本日志改成了直接分析抓取到的二進制數(shù)據(jù),并實現(xiàn)了P3(Parallel Packet Processor)系統(tǒng),在一定程度上減輕了數(shù)據(jù)抓取服務器的負擔,但還是采用離線分析處理,缺乏實用性。

        文獻[9]用Hadoop進行DDoS攻擊檢測,實現(xiàn)HTTP GET flooding攻擊檢測算法,由于使用的是Hadoop系統(tǒng),仍然不能解決響應時間慢的問題。

        文獻[6-9]雖將分布式計算方法用于網(wǎng)絡檢測和數(shù)據(jù)流分析中,但是都不能很好地解決時效性的問題。針對以上情況,本文提出了一種基于Kafka[10]和Spark Streaming[4]的實時流處理和分析方法,對海量的網(wǎng)絡數(shù)據(jù)進行實時分析,適合于大流量的實時網(wǎng)絡分析和異常檢測。

        海量的數(shù)據(jù)是大數(shù)據(jù)出現(xiàn)的前提,而數(shù)據(jù)收集則是大數(shù)據(jù)的基石。日志數(shù)據(jù)收集在流數(shù)據(jù)收集中占有重要比重,許多公司的業(yè)務平臺每天都會分散地產(chǎn)生大量日志數(shù)據(jù),收集并匯總這些業(yè)務日志數(shù)據(jù),供離線和在線的分析系統(tǒng)使用。日志收集系統(tǒng)所需考慮的基本特征包括:高可靠性、高可用性和高可擴展性?!胺稚⑹占?,集中處理”是當前日志處理系統(tǒng)的一個主流思想。日志收集也是流式日志處理系統(tǒng)的前提和基礎,日志只有被實時收集、匯總后,才能進行后續(xù)的相關處理操作。下面針對當前流行的開源日志數(shù)據(jù)流收集系統(tǒng),進行介紹和對比。

        2 數(shù)據(jù)流實時數(shù)據(jù)分析系統(tǒng)總體設計

        2.1 數(shù)據(jù)流實時數(shù)據(jù)分析系統(tǒng)整體架構

        實時網(wǎng)絡數(shù)據(jù)分析系統(tǒng)(Distributed Real-time Data Analysis System, DRDAS)的整體架構如圖1所示,其組成部分包括數(shù)據(jù)收集系統(tǒng)、數(shù)據(jù)分析系統(tǒng)和數(shù)據(jù)存儲系統(tǒng)。在圖1中,Kafka Broker是數(shù)據(jù)收集系統(tǒng),負責大數(shù)據(jù)流的實時搜集和傳遞;Spark Streaming是數(shù)據(jù)分析系統(tǒng),負責對Kafka Broker傳遞的數(shù)據(jù)流進行實時分析,并將分析結果傳遞應用系統(tǒng)進行各種處理,同時將正常的數(shù)據(jù)傳遞存儲系統(tǒng);Database表示結構化的數(shù)據(jù)存儲系統(tǒng),Hbase表示非結構化的數(shù)據(jù)存儲系統(tǒng)。

        圖1 系統(tǒng)整體架構Fig. 1 Overall architecture of DRDAS

        2.2 數(shù)據(jù)流實時數(shù)據(jù)收集系統(tǒng)

        大數(shù)據(jù)流具有如本文1.1節(jié)所描述的6大特性,在大數(shù)據(jù)流并發(fā)產(chǎn)生的情況下,一方面要保證數(shù)據(jù)的實時性,另一方面盡量減少數(shù)數(shù)據(jù)丟失,這些都給數(shù)據(jù)的收集帶來了巨大挑戰(zhàn)。

        在一些廣告、搜索、推薦、安全系統(tǒng)中,為了滿足海量日志或事件的實時傳輸,及時分析、處理,并實時反饋的需求,很多人都在為解決這一難題而研究。LinkedIn公司開發(fā)了一套專用的分布式消息訂閱和發(fā)布系統(tǒng)——Kafka[10]。早在2012年LinkedIn公司就已經(jīng)將Kafka應用在實際生產(chǎn)系統(tǒng),并每天收集和處理上百億的消息,峰值時能處理每秒高達 172 000條消息[8]。LinkedIn將Kafka于2011年開源,并成為Apache的開源項目之一。

        由于Kafka是一套分布式系統(tǒng),其吞吐量可以隨著集群的擴展而線性增加。

        圖2為Kafka的整體架構。 圖中給出了Kafka的三大構成部分:生產(chǎn)者(Producer),即日志的來源;代理(Broker),消息的中間管理者;消費者(Consumer),消息的使用者。Producer負責將消息收集并推送(Push)到Broker,而Broker則負責接收Producer發(fā)送來的消息,并將消息本地持久化,Consumer則是消息的真正使用者,從Broker拉取(Pull)消息并進行處理。

        整個Kafka架構的核心就是Broker,Broker的吞吐量直接關系到整個系統(tǒng)的可用性。而為了加強其處理能力,設計者從兩方面作了考慮:1)巧妙的消息管理策略;2)線性化的擴展方式。

        圖2 Kafka架構Fig. 2 Kafka architecture

        圖3展示了Kafka的消息管理策略,通過Topic名稱將消息分開管理,一個Topic由多個Partition組成,而每個Partition又是由多個Segment文件構成,每個Segment存儲指定數(shù)量的消息,存滿后會存放到一個新的Segment文件中。消息在同一個Partition中是有序的,而且能保證消息讀取時是有序(按照Partition中的存儲順序)。

        圖3 Kafka消息管理策略Fig. 3 Kafka message management strategy

        圖4為Kafka對同一個Topic的消息讀寫管理:消息流入時,根據(jù)該消息所在Topic的分區(qū)數(shù)和復制數(shù),將不同的消息分別存儲到不同的Partition上,同時,也將同一條消息復制多份,保存在不同的主機上,達到數(shù)據(jù)冗余備份和提高讀取速度的目的;Consumer讀取消息時,對于屬于同一個Group的Consumer,在同一個Partition上,不能并發(fā)讀取,但是在不同的Partition上可以并發(fā)操作,也就是說同一個Topic的Partition數(shù)量越多,可允許的并發(fā)數(shù)也就越多。

        圖4 Kafka對Partitions的讀寫操作Fig. 4 Kafka’ read and write operations in the Partitions

        2.3 數(shù)據(jù)流實時數(shù)據(jù)分析系統(tǒng)

        Spark[11]于2009年誕生于加州伯克利大學的APMLab實驗室,2010年正式開源,并于2013年成為Apache基金項目,在2014年成為Apache基金的頂級項目。為了解決Hadoop[12]計算時將中間結果存入磁盤導致計算速度緩慢的問題,Spark應運而生。

        Spark的生態(tài)系統(tǒng)如圖5所示,該生態(tài)系統(tǒng)包括了批處理、流處理、機器學習、圖計算、數(shù)據(jù)分析等,相比Hadoop生態(tài)圈,顯得更加全面,是一個更適合大數(shù)據(jù)應用場景的分布式計算框架。

        彈性分布式數(shù)據(jù)集(Resilient Distribute Datasets,RDD)[13]是Spark的核心,也是Spark實現(xiàn)故障恢復、數(shù)據(jù)依賴的關鍵。RDD模型以簡單的邏輯——Lineage很好地解決了數(shù)據(jù)之間的依賴,且保證很好的容錯性,并能將中間結果存入內(nèi)存,盡量減少數(shù)據(jù)的磁盤讀寫,使得計算速度得到了很大的提升,尤其是在迭代式計算中,計算速度提升了一個數(shù)量級。

        圖5 Spark生態(tài)圈Fig. 5 Spark ecosystem

        不同于Hadoop中的MapReduce,Spark將相應的MapReduce很好地封裝在RDD中。在RDD上可執(zhí)行兩類操作:轉換(Transformation)和動作(Action)。圖6展示了數(shù)據(jù)從輸入到得到最終結果的操作過程。數(shù)據(jù)在RDD中并不是以原始形態(tài)存在,而是以數(shù)據(jù)所在的具體位置的形式包含在RDD中,并在RDD中經(jīng)過不同的轉換得到新的RDD,直到執(zhí)行動作時才執(zhí)行真正的計算,得到最終想要的結果。

        圖6 RDD上的相關操作Fig. 6 Related operations in RDD

        Spark Streaming[4]是Spark生態(tài)系統(tǒng)中用于實時計算的一個框架,其核心也是基于RDD的,它可以無縫地和Spark銜接,將歷史數(shù)據(jù)和實時數(shù)據(jù)完美地融合。Spark Streaming與Yahoo 推出的S4[14]以及Twitter推出的Storm[15]并列為三大主要數(shù)據(jù)流處理系統(tǒng)。大數(shù)據(jù)流處理系統(tǒng)的典型實例見文獻[16],Spark Streaming、S4和Storm等分布式流處理系統(tǒng)的對比見文獻[17]。

        圖7展示了Spark Streaming流處理的實質:將一小段時間內(nèi)的數(shù)據(jù)合并,再作微型的批處理,而不是分別對每一條數(shù)據(jù)進行實時處理。這也是它與其他流處理系統(tǒng)的最大區(qū)別,因此它并不是真正意義上的實時處理,而是延時較低的微批處理,正好適合本文中的應用場景。此外,Spark Streaming還能通過滑動窗口將最近一段時間的數(shù)據(jù)進行合并,或直接與Spark結合將歷史數(shù)據(jù)和實時數(shù)據(jù)一起分析,為實時數(shù)據(jù)和歷史分析提供了一個統(tǒng)一的處理平臺。

        圖7 Spark Streaming的處理過程Fig. 7 Processing procedures of Spark Streaming

        2.4 日志數(shù)據(jù)流收集系統(tǒng)對比

        日志數(shù)據(jù)流收集系統(tǒng)具備3個基本組件,分別是Agent(接收原始數(shù)據(jù),并將數(shù)據(jù)發(fā)給Collector)、Collector(接收多個Agent發(fā)送過來的數(shù)據(jù),匯總后將數(shù)據(jù)發(fā)往Store)和Store(中央存儲系統(tǒng),將匯總后的數(shù)據(jù)進行持久化存儲)。表1綜合對比了Scribe、Flume[18]、Chukwa、LogStash四種日志數(shù)據(jù)流收集系統(tǒng)。

        表1 4種日志收集系統(tǒng)對比Tab. 1 Comparison of four log collection systems

        3 數(shù)據(jù)流實時數(shù)據(jù)分析系統(tǒng)性能分析

        3.1 數(shù)據(jù)流分析系統(tǒng)整體數(shù)據(jù)流圖

        圖8為數(shù)據(jù)流分析系統(tǒng)整體數(shù)據(jù)流圖,整個系統(tǒng)包括:數(shù)據(jù)收集、數(shù)據(jù)分析、數(shù)據(jù)存儲、模型(或算法)訓練、入侵檢測。系統(tǒng)的數(shù)據(jù)流向為:1) 數(shù)據(jù)的來源為不同的服務器,通過各種抓包軟件,如:TcpDump、NetFlow、Sniff等,對特定的網(wǎng)卡或端口進行數(shù)據(jù)包抓取,并通過Flume將不同服務器上的網(wǎng)絡數(shù)據(jù)匯總,將數(shù)據(jù)抓、分析和檢測分離,減輕應用服務器的負擔。Flume最初是由Cloudera的工程師設計用于合并日志數(shù)據(jù)的系統(tǒng),后將其開源出來,并逐漸發(fā)展成為一款開源、高可靠、高擴展、易管理、支持客戶擴展的分布式數(shù)據(jù)采集系統(tǒng),主要是用于日志數(shù)據(jù)的收集和聚合。2)數(shù)據(jù)匯聚之后,將所抓取的網(wǎng)絡數(shù)據(jù)作為Kafka Producer的消息源,并傳送到Kafka Broker,讓Broker對所有網(wǎng)絡數(shù)據(jù)進行有序的管理。3)Spark Streaming則實時從Kafka Broker中拉取數(shù)據(jù),再將數(shù)據(jù)分散到不同的Spark Executor進行分析和統(tǒng)計。4) Spark將抓取的網(wǎng)絡數(shù)據(jù)處理后,一方面可以將結果傳給其他的應用,作進一步分析;另一方面可以將結果持久化,存儲在數(shù)據(jù)庫中,供后續(xù)分析使用。5)對于得到的實時數(shù)據(jù),可以使之與之前得到的歷史數(shù)據(jù)進行合并,進而進行模型(或算法)訓練或者直接通過模型進行DDoS檢測,并得到檢測結果。

        圖8 數(shù)據(jù)流分析系統(tǒng)整體數(shù)據(jù)流圖Fig. 8 Data flow of data analysis system

        3.2 實時性分析

        要做到實時數(shù)據(jù)分析,就要求系統(tǒng)具有實時的數(shù)據(jù)收集、處理功能。本系統(tǒng)中,使用了Flume和Kafka來保證數(shù)據(jù)能實時地被收集并傳到Spark Streaming進行實時分析。

        Flume能監(jiān)控某一指定的目錄下的文件或整合自定義的數(shù)據(jù)收集器,能將新增的數(shù)據(jù)實時地傳輸?shù)綌?shù)據(jù)中心服務器,并作為Kafka消息隊列的消息來源。

        Kafka則能將接收到的實時數(shù)據(jù)轉存為有序的消息隊列,以有序的方式分別存儲在不同服務器的Partition上,讀取消息時,通過并發(fā)方式從不同Partition讀取數(shù)據(jù)并使用ZeroCopy技術保證消息快速有序的讀取,后面會有相關的實驗進行測試和說明。

        Spark Streaming能將接收的數(shù)據(jù)匯總成一個個小的數(shù)據(jù)集,并復制多份存儲,然后進行微批量的實時處理,其這一特性很好地避免了并發(fā)數(shù)據(jù)處理中頻繁的任務分配和調度問題,能達到次秒級延時的實時處理。

        3.3 可擴展性分析

        為了適應數(shù)據(jù)量的不斷增長,需要系統(tǒng)具有很好的可擴展性。本系統(tǒng)的擴展性體現(xiàn)在兩個方面:

        1)數(shù)據(jù)收集可擴展。隨著數(shù)據(jù)總量增長,已有資源不一定能滿足數(shù)據(jù)收集的要求,這就需要數(shù)據(jù)收集系統(tǒng)具有很好的擴展性。而本系統(tǒng)中的Flume和Kafka都能通過擴展集群,讓處理能力得到近乎線性的增長,很好地滿足可擴展性要求。

        2)數(shù)據(jù)分析可擴展。Spark Streaming是一個分布式的實時處理系統(tǒng),其數(shù)據(jù)處理能力也是隨著集群數(shù)量的擴展而遞增,實驗部分會有相關的數(shù)據(jù)證明。目前,已經(jīng)有多個超過上千臺Spark服務器組成的集群應用在生產(chǎn)環(huán)境當中,證明了其在數(shù)據(jù)分析方面的可擴展性。

        3.4 容錯性分析

        為了保證系統(tǒng)可靠和穩(wěn)定的運行,容錯性也是絕大多數(shù)系統(tǒng)需要考慮的一個關鍵性因素。在本系統(tǒng)中的容錯性體現(xiàn)在:

        1)數(shù)據(jù)收集的可靠性。Flume能將數(shù)據(jù)緩存在本地磁盤,并記錄已傳輸數(shù)據(jù)的偏移量,若由于網(wǎng)絡或其他原因導致的數(shù)據(jù)傳輸失敗,會在故障恢復時,繼續(xù)之前的數(shù)據(jù)傳輸;而Kafka則是通過復制因子來控制數(shù)據(jù)保存的份數(shù),通過將數(shù)據(jù)復制到不同的機器上,解決了數(shù)據(jù)丟失或系統(tǒng)單點故障問題。

        2)數(shù)據(jù)處理的可靠性。Spark Streaming有一種很好的容錯機制—— 提前日志寫(Write Ahead Logs, WAL)機制,即:先將數(shù)據(jù)寫入日志,再計算。通過將實時收集的數(shù)據(jù)寫入到日志中,并復制多份,能解決計算過程中單點故障造成的數(shù)據(jù)丟失,便于故障恢復和轉移。

        4 數(shù)據(jù)分析系統(tǒng)相關的實驗分析

        實驗環(huán)境說明:本實驗使用vSphere虛擬機搭建Linux集群,每個Linux虛擬機的配置為2×2核CPU,內(nèi)存4 GB,內(nèi)網(wǎng)網(wǎng)速1 000 Mb/s,Kafka版本0.8.21,F(xiàn)lume版本1.6.0,Spark版本1.4.1。

        為了測試Kafka接收和讀取消息速度,本實驗使用了Kafka自帶的壓測工具,并通過其腳本kafka-consumer-perf-test.sh和kafka-producer-perf-test.sh進行測試,配置項都使用默認值。圖9和圖10是在不同數(shù)量Kafka Server集群下,Kafka消息接收和讀取速度的測試。其中圖9是Kafka接收和讀取消息每秒的字節(jié)數(shù)測試結果,圖10是Kafka每秒鐘接收和讀取的消息數(shù)量測試結果。雖然,測試結果的好壞由很多因素決定:磁盤讀寫速度、CPU、內(nèi)存、網(wǎng)絡、配置參數(shù)等,但是從圖9和圖10可以明顯看出,Kafka消息處理的能力隨著集群數(shù)量的增加呈近似線性增長關系,證實了Kafka具有線性擴展能力。

        除了數(shù)據(jù)接收和讀取能力測試外,本文還使用spark-perf[19]和visualvm[20]等對Spark Streaming數(shù)據(jù)處理能力進行壓力測試和監(jiān)控。圖11是對應的測試數(shù)據(jù)結果,從圖中可以看出Spark Streaming處理數(shù)據(jù)的能力也跟集群數(shù)量成正比關系。當然,數(shù)據(jù)處理速度的具體數(shù)值會受到處理復雜度的影響,但是總體趨勢還是類似的,即通過擴展Spark集群數(shù)量,可以提高集群數(shù)據(jù)的處理速度和數(shù)據(jù)量。

        圖9 Kafka服務器數(shù)量與消息吞吐量的關系Fig. 9 Relationship between Kafka server number and message throughput

        圖10 Kafka服務器數(shù)量與消息吞吐率的關系Fig. 10 Relationship between Kafka server number and message throughput rate

        圖9~11對由Kafka、Flume和Spark Streaming組成的數(shù)據(jù)分析系統(tǒng),在數(shù)據(jù)吞吐量和數(shù)據(jù)處理速度方面進行測試,測試結果可知:整個數(shù)據(jù)分析系統(tǒng)的數(shù)據(jù)吞吐量隨Kafka集群數(shù)成線性增長,數(shù)據(jù)實時分析處理能力隨Spark Streaming集群成線性增長。

        圖11 Spark Streaming處理速度測試Fig. 11 Processing speed test for Spark Steaming

        表2對3種不同的數(shù)據(jù)分析場景和特點進行了對比:單機環(huán)境下不能處理大量的數(shù)據(jù),沒有容錯性,不能進行擴展,其數(shù)據(jù)處理能力是有限制的;Hadoop雖然可以線性擴展,能處理大量的數(shù)據(jù),但其設計初衷就是為了離線處理海量數(shù)據(jù)。因此,Hadoop不適合于實時處理場合;Spark Streaming在各方面有明顯的優(yōu)勢。

        表2 單機、Hadoop、Spark Streaming對比Tab. 2 Comparison of single-machine, Hadoop, Spark Streaming

        5 實時數(shù)據(jù)分析系統(tǒng)的應用

        K-Means[21]是一種典型的局域原型的目標函數(shù)聚類算法,屬于無監(jiān)督學習范疇,已成功應用于網(wǎng)絡環(huán)境的DDoS檢測系統(tǒng)中。為了將經(jīng)典的K-Means算法應用于大數(shù)據(jù)環(huán)境下的實時數(shù)據(jù)分析系統(tǒng)中,本文根據(jù)Spark運行的原理設計動態(tài)采樣的改進K-Means并行化算法[22],并將該算法成功應用于基于Spark Streaming實時數(shù)據(jù)分析系統(tǒng)中。

        5.1 動態(tài)采樣K-Means算法

        在普通的聚類算法[22]中,隨機均勻選擇k個點作為聚類中心,k的取值以及初始中心的選擇將直接影響最終聚類的效果。如果k選取不當,K-Means算法在計算過程中易收斂于局部最優(yōu)解,從而無法得到正確結果。在DDoS檢測系統(tǒng)中,K-Means聚類需要處理大量混雜攻擊流的數(shù)據(jù),這對初始中心的選擇造成了極大的困難。為了解決這一問題,采用一種動態(tài)采樣的K-means聚類改進算法,以滿足DDoS檢測系統(tǒng)的需求,算法流程如圖12。

        圖12 動態(tài)采樣改進的K-Means算法流程Fig. 12 Flow of improved K-Means algorithm by dynamic sampling

        改進K-Means算法的主要思想是預先只選擇一個點作為聚類中心,構建一個規(guī)模函數(shù)V(X),該函數(shù)表示數(shù)據(jù)點到其所屬的聚類中心的距離平方和,通過不斷迭代最小化規(guī)模函數(shù)值使聚類結果收斂。改進K-Means算法的主要原理:首先從數(shù)據(jù)集合X中選擇一個點作為始聚類中心并加入動態(tài)采樣集合C,根據(jù)規(guī)模函數(shù)V(X)計算得到初始規(guī)模N,然后進行N次的循環(huán);在每次循環(huán)內(nèi)再選取m個點,每次循環(huán)計算采樣概率P(X),采樣概率的意義是:聚類中心是相對分散的點,離本聚類中心越遠,其成為另外一個聚類中心的可能性越高,即是盡可能地選擇遠離當前聚類中心的點作為采樣數(shù)據(jù)。每經(jīng)過一次迭代,重新計算規(guī)模函數(shù)V(X)的值,更新下一次采樣的概率。然后將本次采樣的簇中心點集合C′與原采樣集合C求并集作為新的采樣集合C。當N次循環(huán)結束后,產(chǎn)生一個新的采樣集合C,該集合中一共有N*m個數(shù)據(jù)。此時得到的數(shù)據(jù)集C的規(guī)模遠小于初始數(shù)據(jù)集X,且數(shù)據(jù)是經(jīng)過篩選的相對集中。最后再對C運行普通的K-Means算法,因為C是經(jīng)過預處理得到的集合,因此整個聚類算法將會異常迅速。本算法在時間復雜度上也有改進,采用了N次迭代替代定義收斂閾值的方法,降低了運算過程中執(zhí)行迭代的次數(shù),對于通過機器學習方法檢測大數(shù)據(jù)環(huán)境下的DDoS攻擊具有重要意義。

        具體的算法定義如下:

        定義1 規(guī)模函數(shù)V(X)定義如式(1)所示:

        (1)

        其中D2(X,C)表示X中的點到所處聚類中心的距離平方和。

        定義2 動態(tài)采樣概率函數(shù)P(X)定義如式(2)所示:

        (2)

        定義3 初始限定規(guī)模函數(shù)值V,初始采樣個數(shù)m(m

        具體算法實現(xiàn)如下:

        輸入 數(shù)據(jù)集,K;

        輸出K個聚類中心。

        步驟1 從集合X中隨機取1個點加入集合C;

        步驟2 根據(jù)式(1)計算C的初始限定規(guī)模函數(shù)值,記為V;

        步驟3 循環(huán)lgV=N次開始,按式(2)計算動態(tài)采樣概率記為P,從集合X中按概率P取出m個點加入集合C′,求C∪C′,記為C,循環(huán)結束;

        步驟4 采用普通K-Means算法求出集合C的聚類中心。

        5.2 基于Spark動態(tài)采樣改進K-Means算法并行化

        普通的單機DDoS檢測算法無法直接在Spark平臺上運行,根據(jù)Spark運行的原理設計動態(tài)采樣的改進K-Means算法的并行化。具體的流程如下:

        1)算法開始,Master節(jié)點上程序從數(shù)據(jù)輸入源獲取初始數(shù)據(jù)集。這里的輸入源是預先定義的接口,可以通過多種途徑獲取數(shù)據(jù),如InputStream、Hadoop分布式文件系統(tǒng) (Hadoop Distributed File System, HDFS)、本地文件等,這一設計方便對本算法的測試。獲取數(shù)據(jù)后,系統(tǒng)將其轉換為RDD1,并調用cache方法將RDD1加載至內(nèi)存,該RDD作為待處理的數(shù)據(jù)。

        2)開始進行數(shù)據(jù)的分割,為并行化作準備。系統(tǒng)以塊為單位(64 MB)將RDD1劃分成多個分塊。接下來Master節(jié)點調用map方法,把眾多的數(shù)據(jù)塊分配到多個Worker節(jié)點上。Worker節(jié)點接收數(shù)據(jù)塊,并執(zhí)行Master的map指令,對數(shù)據(jù)塊進行處理。經(jīng)過這一步驟,原數(shù)據(jù)集中的String文本被轉換為DenseVector向量對象,這些對象是程序能夠直接使用的數(shù)據(jù),并且分布在各個Worker節(jié)點上等待計算。map方法結束后,RDD1生成了新的RDD2。

        3)隨機選取初始的聚類中心。程序調用takeSample方法,從RDD2中選取一個作為聚類中心向量,并創(chuàng)建RDD3對象。

        4)開始進入循環(huán)過程,程序根據(jù)5.1節(jié)中具體算法實現(xiàn)中的步驟3進行迭代計算。在每次循環(huán)體中根據(jù)定義1和定義2重新計算當前采樣概率函數(shù)P,接著調用takeSample方法根據(jù)概率P選取新的RDD向量作為中心點。經(jīng)過一次循環(huán),一共采樣了1+m個向量,生成RDD4。緊接著系統(tǒng)調用union方法,把RDD3和RDD4合并成RDD5。

        5)經(jīng)過lgV次后,循環(huán)結束,此時RDD5中的向量個數(shù)≤1+m*lgV。這一數(shù)據(jù)量遠遠小于初始的數(shù)據(jù)量。

        6)系統(tǒng)將RDD5作為結果輸出。

        上述整個采樣階段RDD轉換流程如圖13所示。圖中圓角矩形框代表RDD;RDD內(nèi)直角矩形框表示該RDD中的數(shù)據(jù)分片,這些分片散布在不同的Worker節(jié)點上;箭頭指向的方向表示RDD轉換的過程。

        圖13 采樣階段RDD轉換流程Fig. 13 RDD conversion process in sampling phase

        5.3 基于實時數(shù)據(jù)分析系統(tǒng)的DDoS攻擊檢測

        為了測試動態(tài)采樣的改進K-Means并行化算法在基于Spark的數(shù)據(jù)分析系統(tǒng)上的檢測速度和準確率,設計了如下實驗:采用KDD99[23]數(shù)據(jù)集的訓練全集(500萬條數(shù)據(jù))作為實驗樣本,分別從中抽取5組作為實驗數(shù)據(jù)。這五組的數(shù)據(jù)量分別為:1萬、50萬、100萬、200萬、500萬。

        實驗設計了3個實驗: 第1個實驗,采用單機算法串行處理數(shù)據(jù)樣本;第2個實驗采用在Spark集群上實現(xiàn)的普通K-Means算法,并行處理數(shù)據(jù)樣本;第3個實驗采用在Spark的數(shù)據(jù)分析系統(tǒng)實現(xiàn)的動態(tài)采樣的改進K-Means算法。

        分別對3個實驗的時間消耗、每輪迭代平均時間、正確率進行統(tǒng)計分析,最終的結果如圖14~16所示。

        圖14 三種算法在5組實驗的耗時對比Fig. 14 Comparison of time-consumption of three algorithms in 5 experiments

        圖15 三種算法在5組實驗的每輪迭代平均時間對比Fig. 15 Comparison of average time for each iteration of three algorithms in 5 experiments

        圖16 兩種算法的正確率對比Fig. 16 Comparison of accuracy rates of two algorithms

        通過圖14和圖15對比發(fā)現(xiàn),在數(shù)據(jù)量較小的情況下,三種算法的耗時差距不大,單機運行的算法正確率也相對較高,Spark的數(shù)據(jù)分析系統(tǒng)并行計算的優(yōu)勢不明顯;當數(shù)據(jù)量大于100萬時,單機運行的時間急劇增長,準確率迅速下降,此時Spark的數(shù)據(jù)分析系統(tǒng)并行計算的優(yōu)勢相當顯著。對比在Spark的數(shù)據(jù)分析系統(tǒng)實現(xiàn)的K-Means和動態(tài)采樣的改進的K-Means算法,改進的算法在正確率上保持相對穩(wěn)定,而算法的效率則比普通方法更高。從圖16可以進一步看清動態(tài)采樣改進的K-Means算法和普通K-Means算法準確率的差別,本實驗能較好地體現(xiàn)基于Spark的實時數(shù)據(jù)分析系統(tǒng)和動態(tài)采樣的改進K-Means算法相結合的優(yōu)勢。

        6 結語

        本文在已有研究基礎上,進一步地將網(wǎng)絡數(shù)據(jù)分析與分布式計算相結合,克服已有數(shù)據(jù)分析方案的時延大、缺乏時效性、不能滿足大數(shù)據(jù)實時處理和分析的缺陷。通過將網(wǎng)絡數(shù)據(jù)抓取、收集并導入Kafka,產(chǎn)生實時的消息隊列,再使用Spark Streaming進行網(wǎng)絡數(shù)據(jù)實時分析:一方面可以實時地對網(wǎng)絡數(shù)據(jù)進行分析,為大數(shù)據(jù)環(huán)境提供快速高效的異常檢測;另一方面,滿足了大數(shù)據(jù)環(huán)境下數(shù)據(jù)處理速度和數(shù)據(jù)處理量的要求。

        通過驗證分布式實時計算框架Spark Streaming在大規(guī)模網(wǎng)絡環(huán)境中對海量網(wǎng)絡數(shù)據(jù)實時分析的可行性;另一方面,確定了大規(guī)模實時網(wǎng)絡檢測的整體架構,為進一步研究大規(guī)模網(wǎng)絡數(shù)據(jù)實時分析和檢測理論和實現(xiàn)技術奠定基礎;最后,為了驗證實時數(shù)據(jù)分析系統(tǒng)在大數(shù)據(jù)環(huán)境下的DDoS攻擊檢測能力,本文根據(jù)Spark運行的原理設計了動態(tài)采樣改進K-Means并行化算法,并將該算法應用于基于Spark Streaming實時數(shù)據(jù)分析系統(tǒng)中,實驗結果顯示基于Spark Streaming的實時分析系統(tǒng)用于大數(shù)據(jù)環(huán)境下的DDoS檢測是可行的。下一步,將結合已有的隱半馬爾可夫模型異常檢測算法,進一步設計能實時有效檢測大數(shù)據(jù)環(huán)境的各種DDoS攻擊行為的算法。

        References)

        [1] Incapsula. Report: 2014 DDoS Trends-Botnet activity is up by 240%[EB/OL].[2014-06-20].https://www.incapsula.com/blog/ddos-threat-landscape-report-2014.html.

        [2] Incapsula. Lax security opens the door for mass-scale abuse of SOHO routers[EB/OL].[2015-09-20].https://www.incapsula.com/blog/ddos-botnet-soho-router.html.

        [3] Incapsula. DDoS impact survey reveals the actual cost of DDoS attacks[EB/OL]. [2015-08-20].https://www.incapsula.com/blog/ddos-impact-cost-of-ddos-attack.html.

        [4] ZAHARIA M, DAS T, LI H, et al. Discretized streams: an efficient and fault-tolerant model for stream processing on large clusters[C]// Proceedings of the 4th USENIX Conference on Hot Topics in Cloud Ccomputing. Berkeley, CA: USENIX Association, 2012:10.

        [5] 孫大為,張廣艷,鄭緯民.大數(shù)據(jù)流式計算:關鍵技術及系統(tǒng)實例[J].軟件學報,2014,25(4):839-862.(SUN D W, ZHANG G Y, ZHENG W M. Big data stream computing: technologies and instances[J]. Journal of Software, 2014,25(4):839-862.)

        [6] LEE Y, KANG W, SON H. An Internet traffic analysis method with MapReduce[C]// Proceedings of the 2010 IEEE/IFIP Network Operations & Management Symposium Workshops. Piscataway, NJ: IEEE, 2010:357-361.

        [7] KHATTAK R, BANO S, HUSSAIN S, et al. DOFUR: DDoS forensics using MapReduce[C]// Proceedings of the 2011 Frontiers of Information Technology. Washington, DC: IEEE Computer Society, 2011:117-120.

        [8] GOODHOPE K, KOSHY J, KREPS J. Building LinkedIn’s real-time activity data pipeline[C]// Proceedings of the 2012 IEEE Computer Society Technical Committee on Data Engineering. Washington, DC: IEEE Computer Society, 2012: 33-45.

        [9] KREPS J, CORP L, NARKHEDE N, et al. Kafka: a distributed messaging system for log processing[C]// Proceedings of the 2011 ACM SIGMOD Workshop on Networking Meets Databases. New York: ACM, 2011:231-240.

        [10] Apache Software Foundation. Apache Kafka[EB/OL]. [2015-09-16]. http://kafka.apache.org/.

        [11] Apache Software Foundation. Apache Spark: Lightning-fast cluster computing[EB/OL]. [2015-09-16]. http://spark.apache.org/.

        [12] Apache Software Foundation. Welcome to Apache Hadoop[EB/OL]. [2015-09-16]. http://hadoop.apache.org/.

        [13] ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing[C]// Proceedings of the 9th USENIX Conference on Networked Systems Design and Implementation. Piscataway, NJ: IEEE, 2011: 141-146.

        [14] Apache Software Foundation. S4: distributed stream compute platform[EB/OL].[2015-09-16]. http://incubator.apache.org/s4/.

        [15] Apache Software Foundation. Storm: distributed and fault-tolerance real-time computation[EB/OL]. [2015-09-16]. http://storm.apache.org/.

        [16] Apache Software Foundation. Apache Kafka[EB/OL]. [2015-09-16]. http://kafka.apache.org/.

        [17] 崔星燦,禹曉輝,劉洋,等. 分布式流處理技術綜述[J]. 計算機研究與發(fā)展, 2015, 52(2): 318-332.(CUI X C, YU X H, LIU Y, et al. Distributed stream processing: a survey[J]. Journal of Computer Research and Development, 2015, 52(2): 318-332.)

        [18] Apache Software Foundation. Apache Flume[EB/OL].[2016-01-20]. http://flume.apache.org/.

        [19] Databricks. Spark-perf[EB/OL]. [2015-09-16]. https://github.com/databricks/ spark-perf.

        [20] 虞立軍, 王建光, 倪力. 使用VisualVM 進行性能分析及調優(yōu)[EB/OL]. [2013-02-18]. http://www.ibm.com/developerworks/ cn/java/j-lo-visualvm/.(YU L J, WANG J G, NI L. performance analysis and tuning by using VisualVM[EB/OL]. [2013-02-18]. http://www.ibm.com/developerworks/ cn/java/j-lo-visualvm/.)

        [21] 周麗娟, 王慧, 王文伯,等. 面向海量數(shù)據(jù)的并行K-Means算法[J]. 華中科技大學學報(自然科學版), 2012,40(S1):150-152.(ZHOU L J, WANG H, WANG W B, et al. ParallelK-Means algorithm for mass data[J]. Journal of Huazhong University of Science and Technology(Natural Science Edition), 2012,40(S1):150-152.)

        [22] 劉罕. 基于Spark框架的DDoS攻擊檢測系統(tǒng)研究[D].上海: 上海海事大學, 2016:5.(LIU H. Research of the DDoS Attack Detection System base on the Spark Framework[M]. Shanghai: Shanghai Mritime University, 2016:5.)

        [23] HETTICH S, BAY S D. KDD cup 1999 data [EB/OL]. [1999-10-20]. http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html.

        This work is partially supported by the National Natural Science Foundation of China (61672338,61373028).

        HAN Dezhi, born in 1966, Ph.D., professor. His research interests include cloud computing, cloud storage and security, big data application.

        CHEN Xuguang, born in 1993, M.S. candidate. His research interests include cloud computing, big data real-time analysis.

        LEI Yuxin, born in 1996. Her research interests include data mining, network security.

        DAI Yongtao, born in 1991, M.S. candidate. His research interests include cloud computing, distributed computing, data mining, network security.

        ZHANG Xiao, born in 1994, M.S. candidate. Her research interests include cloud computing, big data real-time analysis.

        Real-time data analysis system based on Spark Streaming and its application

        HAN Dezhi1*, CHEN Xuguang1, LEI Yuxin2, DAI Yongtao1, ZHANG Xiao1

        (1.CollegeofInformationEngineering,ShanghaiMaritimeUniversity,Shanghai201306,China;2.SchoolofInformationEngineering,ZhengzhouUniversity,ZhengzhouHenan450001,China)

        In order to realize the rapid analysis of massive real-time data, a Distributed Real-time Data Analysis System (DRDAS) was designed, which resolved the collection, storage and real-time analysis for mass concurrent data. And according to the operation principle of Spark Streaming, a dynamic samplingK-means parallel algorithm was proposed, which could quickly and efficiently detect all kinds of DDoS (Distributed Denial of Service) attacks. The experimental results show that the DRDAS has good scalability, fault tolerance and real-time processing ability, and along with newK-means parallel algorithm, the DRDAS can real-time detect various DDoS attacks, and shorten the detecting time of attacks.

        Spark Streaming framework; distributed stream processing; network data analysis; Distributed Denial of Service (DDoS) attack

        2016-07-15;

        2016-11-26。 基金項目:國家自然科學基金資助項目(61373028,61672338)。

        韓德志(1966—),男,河南信陽人,教授,博士,CCF會員,主要研究方向:云計算、云存儲及其安全、大數(shù)據(jù)應用; 陳旭光(1993—),男,河南信陽人,碩士研究生,主要研究方向:云計算、大數(shù)據(jù)實時分析; 雷雨馨(1996—),女,河南鄭州人,主要研究方向:數(shù)據(jù)挖掘、網(wǎng)絡安全; 戴永濤(1991—),男,湖南邵陽人,碩士研究生,主要研究方向:云計算、分布式計算、數(shù)據(jù)挖掘、網(wǎng)絡安全; 張肖(1994—),女,安徽蚌埠人,碩士研究生,主要研究方向:云計算、大數(shù)據(jù)實時分析。

        1001-9081(2017)05-1263-07

        10.11772/j.issn.1001-9081.2017.05.1263

        TP316.2

        A

        猜你喜歡
        數(shù)據(jù)分析系統(tǒng)數(shù)據(jù)流日志
        一名老黨員的工作日志
        華人時刊(2021年13期)2021-11-27 09:19:02
        扶貧日志
        心聲歌刊(2020年4期)2020-09-07 06:37:14
        汽車維修數(shù)據(jù)流基礎(下)
        利用GSM-R接口數(shù)據(jù)分析系統(tǒng)偏移的方法研究
        焊接設備實時監(jiān)測與數(shù)據(jù)分析系統(tǒng)在核電建造行業(yè)的應用
        基于信息融合的社群金融信息數(shù)據(jù)分析系統(tǒng)的研究與實現(xiàn)
        時代金融(2018年15期)2018-08-28 13:55:02
        一種提高TCP與UDP數(shù)據(jù)流公平性的擁塞控制機制
        智能數(shù)據(jù)分析系統(tǒng)研究及應用
        游學日志
        基于數(shù)據(jù)流聚類的多目標跟蹤算法
        国产自拍视频免费在线| 92精品国产自产在线观看48页| 日韩精品有码中文字幕在线| 国产精品嫩草影院AV| 国产亚洲精久久久久久无码77777| 精品国产AⅤ一区二区三区V免费| 级毛片免费看无码| 成人免费播放片高清在线观看| 日本视频一区二区三区一| 欧洲熟妇色 欧美| 国产a级午夜毛片| 丁香婷婷激情俺也去俺来也| 亚洲精品中文字幕乱码影院| 极品成人影院| 亚洲AV无码国产永久播放蜜芽| 亚洲一区久久久狠婷婷| 情爱偷拍视频一区二区| 乌克兰少妇xxxx做受野外| 久久国产精品视频影院| 在线免费观看毛视频亚洲精品 | 日韩欧美aⅴ综合网站发布| 亚洲精品一区二区三区大桥未久| 久久熟女五十路| 国产精品一区二区三区av在线| 一二区成人影院电影网| 久久精品国产亚洲av电影| 国产青青草视频在线播放| 日本伊人精品一区二区三区| 国产探花在线精品一区二区| 中文字幕在线日韩| 国产女主播大秀在线观看| 日韩夜夜高潮夜夜爽无码| 黄色成人网站免费无码av| 国产一区二区三区杨幂| 精品亚洲一区二区三区四区五| 精品成人av一区二区三区| 91久久福利国产成人精品| 久久精品国产亚洲av网站| av永久天堂一区二区三区| 亚洲国产美女精品久久| 日韩字幕无线乱码免费|