張前進(jìn)
(安徽國(guó)防科技職業(yè)學(xué)院信息工程系,安徽六安237011)
基于Storm的物聯(lián)網(wǎng)海量實(shí)時(shí)數(shù)據(jù)流處理研究
張前進(jìn)
(安徽國(guó)防科技職業(yè)學(xué)院信息工程系,安徽六安237011)
針對(duì)物聯(lián)網(wǎng)中數(shù)據(jù)實(shí)時(shí)與異構(gòu)的特點(diǎn),設(shè)計(jì)了基于云計(jì)算的海量實(shí)時(shí)數(shù)據(jù)感知與處理模型。結(jié)合海量實(shí)時(shí)數(shù)據(jù)并行處理機(jī)制與數(shù)據(jù)處理流程,利用開(kāi)源實(shí)時(shí)處理系統(tǒng)Storm,實(shí)現(xiàn)了海量實(shí)時(shí)數(shù)據(jù)流處理過(guò)程,并給出了核心實(shí)現(xiàn)方法。
物聯(lián)網(wǎng);實(shí)時(shí)數(shù)據(jù);海量數(shù)據(jù)
物聯(lián)網(wǎng)的概念是國(guó)際電信聯(lián)盟(International Telecommunication Union,ITU)在2005年的《ITU Internet reports 2005-the Internet of things》中提出的。ITU認(rèn)為“通過(guò)對(duì)事物內(nèi)嵌RFID微型芯片或者傳感器芯片,通過(guò)互聯(lián)網(wǎng)就能實(shí)現(xiàn)物與物、人與物、人與人之間的信息交互,從而形成一個(gè)無(wú)所不在的物聯(lián)網(wǎng)”[1]。隨著物聯(lián)網(wǎng)技術(shù)的快速發(fā)展,以及智慧城市、智能交通、智慧農(nóng)業(yè)等項(xiàng)目的建設(shè),傳感器的數(shù)量呈指數(shù)級(jí)增長(zhǎng),IDC報(bào)告中指出,到2020年全球傳感器數(shù)量將突破2 120億個(gè)[2]。如此龐大數(shù)量的傳感器每時(shí)每刻都在產(chǎn)生實(shí)時(shí)數(shù)據(jù),其數(shù)據(jù)規(guī)模是海量的。在海量的物聯(lián)網(wǎng)數(shù)據(jù)中,一方面,數(shù)據(jù)本身是異構(gòu)的,既有存儲(chǔ)在關(guān)系數(shù)據(jù)庫(kù)中的結(jié)構(gòu)化數(shù)據(jù),又有web形式的半結(jié)構(gòu)化數(shù)據(jù),還有音視頻等文檔形式的非機(jī)構(gòu)化數(shù)據(jù);另一方面,很多應(yīng)用場(chǎng)景的實(shí)時(shí)數(shù)據(jù)具有時(shí)效性,如智慧農(nóng)業(yè)中的天氣、環(huán)境等。針對(duì)物聯(lián)網(wǎng)實(shí)時(shí)數(shù)據(jù)規(guī)模與特點(diǎn),本文利用分布式實(shí)時(shí)處理Storm開(kāi)源架構(gòu),設(shè)計(jì)了基于物聯(lián)網(wǎng)的海量實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)。
物聯(lián)網(wǎng)中除了數(shù)據(jù)是異構(gòu)的,感知設(shè)備本身的數(shù)據(jù)采集協(xié)議、傳輸協(xié)議也是異構(gòu)的。為了屏蔽設(shè)備的異構(gòu),在物聯(lián)網(wǎng)數(shù)據(jù)實(shí)時(shí)感知與處理模型中引入云服務(wù)平臺(tái),如圖1所示。
圖1 物聯(lián)網(wǎng)實(shí)時(shí)數(shù)據(jù)感知與處理模型
感知層:感知層由傳感器、二維碼標(biāo)簽、RFID標(biāo)簽、M2M終端等感知設(shè)備組成,是物聯(lián)網(wǎng)體系中的重要組成部分[3],負(fù)責(zé)利用傳感設(shè)備節(jié)點(diǎn)采集與感知實(shí)時(shí)的環(huán)境數(shù)據(jù)。由于感知層是物聯(lián)網(wǎng)的基礎(chǔ),其可靠性決定了物聯(lián)網(wǎng)系統(tǒng)的穩(wěn)定性,因此,感知層的主要任務(wù)是構(gòu)建一個(gè)低成本、高可靠性的感知網(wǎng)絡(luò)。
傳輸層:傳輸層由接入網(wǎng)和物聯(lián)網(wǎng)關(guān)兩部分組成。接入網(wǎng)負(fù)責(zé)將采集到的傳感數(shù)據(jù)通過(guò)網(wǎng)絡(luò)進(jìn)行傳輸,由無(wú)線傳感網(wǎng)、局域網(wǎng)、衛(wèi)星網(wǎng)等組成。因此,接入網(wǎng)是一種異構(gòu)網(wǎng)絡(luò),不同的網(wǎng)絡(luò),其網(wǎng)絡(luò)協(xié)議、傳輸協(xié)議均不相同,要達(dá)到不同網(wǎng)絡(luò)間信息的互聯(lián)互通,需要將異構(gòu)網(wǎng)絡(luò)進(jìn)行融合。物聯(lián)網(wǎng)關(guān)能夠?qū)崿F(xiàn)不同感知網(wǎng)絡(luò)協(xié)議與接入網(wǎng)絡(luò)協(xié)議間的轉(zhuǎn)換以及將數(shù)據(jù)按照統(tǒng)一格式進(jìn)行封裝,從而解決異構(gòu)網(wǎng)絡(luò)間的互聯(lián)互通問(wèn)題。
處理層:處理層是物聯(lián)網(wǎng)海量實(shí)時(shí)數(shù)據(jù)流處理系統(tǒng)的核心部分。數(shù)據(jù)采集與處理模塊負(fù)責(zé)采集傳輸層上傳的數(shù)據(jù),并按照業(yè)務(wù)規(guī)則對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換處理,將無(wú)意義的離散數(shù)據(jù)與業(yè)務(wù)建立映射關(guān)系,變成有意義的業(yè)務(wù)數(shù)據(jù)。云資源服務(wù)平臺(tái)負(fù)責(zé)為應(yīng)用層提供與平臺(tái)、網(wǎng)絡(luò)無(wú)關(guān)的統(tǒng)一數(shù)據(jù)資源服務(wù)。處理層中引入云服務(wù)平臺(tái),為實(shí)時(shí)數(shù)據(jù)處理提供可擴(kuò)展的、具有彈性的云計(jì)算服務(wù)[3],有效提高整個(gè)系統(tǒng)的數(shù)據(jù)處理能力。
應(yīng)用層:應(yīng)用層由智能交通、智慧農(nóng)業(yè)、智慧城市等智能應(yīng)用與服務(wù)組成。應(yīng)用層直接面向用戶,負(fù)責(zé)為用戶提供不受時(shí)間、地點(diǎn)限制的智能服務(wù)。
2.1 海量實(shí)時(shí)數(shù)據(jù)的并行處理機(jī)制
相對(duì)串行計(jì)算而言,并行計(jì)算是指將一個(gè)按照順序執(zhí)行的計(jì)算任務(wù),分解成若干個(gè)可以同時(shí)執(zhí)行的子任務(wù)加以并行執(zhí)行,從而完成整個(gè)計(jì)算任務(wù),并行計(jì)算的主要目的是快速解決大型且復(fù)雜的計(jì)算問(wèn)題[5]。
基于物聯(lián)網(wǎng)感知的數(shù)據(jù)具有流式數(shù)據(jù)的特點(diǎn),主要表現(xiàn)為:數(shù)據(jù)是一組時(shí)間序列下針對(duì)感知對(duì)象相關(guān)狀態(tài)屬性的數(shù)據(jù),數(shù)據(jù)間具有無(wú)相互依賴(lài)的特征[6]。同時(shí),實(shí)時(shí)感知的物聯(lián)網(wǎng)數(shù)據(jù)還具有量大流速快的特點(diǎn)。因此,對(duì)實(shí)時(shí)感知的海量物聯(lián)網(wǎng)數(shù)據(jù)的處理,不僅要求系統(tǒng)具有高可靠性、高穩(wěn)定性,還要求系統(tǒng)能夠快速處理。為了滿足海量數(shù)據(jù)處理的實(shí)時(shí)性要求,采用數(shù)據(jù)并行處理技術(shù)完成數(shù)據(jù)的實(shí)時(shí)處理。
并行處理機(jī)制如圖2所示,首先根據(jù)感知數(shù)據(jù)源對(duì)象屬性特征進(jìn)行并行劃分,然后根據(jù)數(shù)據(jù)分發(fā)機(jī)制將上述數(shù)據(jù)分發(fā)到不同的并行處理節(jié)點(diǎn)上,并行節(jié)點(diǎn)按照預(yù)定義的計(jì)算規(guī)則對(duì)收到的數(shù)據(jù)進(jìn)行運(yùn)算處理,最后將各個(gè)并行處理節(jié)點(diǎn)處理的中間數(shù)據(jù)匯總到合并節(jié)點(diǎn)上,通過(guò)運(yùn)算形成為上層應(yīng)用服務(wù)使用的最終數(shù)據(jù)。
圖2 海量實(shí)時(shí)數(shù)據(jù)并行處理機(jī)制
2.2 海量實(shí)時(shí)數(shù)據(jù)處理流程
海量實(shí)時(shí)數(shù)據(jù)處理流程主要包含數(shù)據(jù)采集、數(shù)據(jù)預(yù)處理、實(shí)時(shí)計(jì)算分析與存儲(chǔ)、UI展示等過(guò)程。具體實(shí)現(xiàn)步驟如下:
1)數(shù)據(jù)采集。通過(guò)傳輸層采集由感知層產(chǎn)生的各類(lèi)實(shí)時(shí)數(shù)據(jù),之后將數(shù)據(jù)發(fā)送到數(shù)據(jù)控制服務(wù)器。
2)數(shù)據(jù)預(yù)處理。數(shù)據(jù)預(yù)處理實(shí)現(xiàn)對(duì)數(shù)據(jù)的分類(lèi)、清洗、格式轉(zhuǎn)換等操作。
3)實(shí)時(shí)計(jì)算分析與存儲(chǔ)。首先將預(yù)處理完成后的數(shù)據(jù)放入待處理消息隊(duì)列,按順序?qū)⑾⒎湃氩⑿杏?jì)算集群完成實(shí)時(shí)計(jì)算分析。然后分兩條線處理,一條線直接進(jìn)入第5步,另一條線進(jìn)入第4步。
4)計(jì)算結(jié)果存儲(chǔ)與深度分析。這樣可以更好地為用戶提供個(gè)性化的服務(wù)。
5)UI展示。
大數(shù)據(jù)處理可以分為批處理和實(shí)時(shí)流處理兩種模式[7]。對(duì)于大數(shù)據(jù)的批處理模式,目前較為流行的是基于MapReduce與HDFS的開(kāi)源框架Hadoop分布式存儲(chǔ)計(jì)算平臺(tái)[8]。Hadoop開(kāi)源框架適合對(duì)歷史數(shù)據(jù)的集中處理,例如大規(guī)模網(wǎng)站訪問(wèn)日志的分析、大型購(gòu)物網(wǎng)站的網(wǎng)頁(yè)索引等,但它無(wú)法滿足物聯(lián)網(wǎng)中大規(guī)模實(shí)時(shí)感知數(shù)據(jù)的處理。大數(shù)據(jù)實(shí)時(shí)處理方面,較流行的有Storm、Spark、Samza等基于Apache的開(kāi)源框架。本文基于Storm開(kāi)源框架設(shè)計(jì)分布式海量實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)。Storm框架最初由BackType開(kāi)發(fā),2011年被Tritter公司收購(gòu),同年由Tritter在GitHub上將其開(kāi)源。
3.1 Storm基本組件
Storm開(kāi)源框架主要分為Nimbus和Supervisor兩種組件,這兩種組件都是快速失敗的,沒(méi)有狀態(tài)。任務(wù)狀態(tài)和心跳信息都保存在Zookeeper上,Zookeeper是Storm重點(diǎn)依賴(lài)的外部資源。
Nimbus是控制節(jié)點(diǎn)的后臺(tái)程序,負(fù)責(zé)為工作節(jié)點(diǎn)分配工作和發(fā)送代碼,并且監(jiān)控工作節(jié)點(diǎn)的工作狀態(tài)[9],全局只有一個(gè)Nimbus。
Supervisor是工作節(jié)點(diǎn)的后臺(tái)程序,每一個(gè)工作節(jié)點(diǎn)上運(yùn)行一個(gè),負(fù)責(zé)接受控制節(jié)點(diǎn)Nimbus分配的任務(wù),會(huì)監(jiān)聽(tīng)分配任務(wù)給它的那個(gè)控制節(jié)點(diǎn),根據(jù)需要關(guān)閉或者啟動(dòng)工作進(jìn)程worker。
一個(gè)工作節(jié)點(diǎn)會(huì)運(yùn)行一個(gè)或多個(gè)工作進(jìn)程,每一個(gè)工作節(jié)點(diǎn)都會(huì)執(zhí)行一個(gè)Topology任務(wù)子集[10]。Topology是Storm框架中運(yùn)行的一個(gè)封裝計(jì)算任務(wù)邏輯的實(shí)時(shí)應(yīng)用程序,由Spout和Bolt構(gòu)成。工作節(jié)點(diǎn)與控制節(jié)點(diǎn)的通信與協(xié)調(diào)都是通過(guò)Zookeeper來(lái)實(shí)現(xiàn)。
3.2 基于Storm的分布式海量實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)
3.2.1 系統(tǒng)架構(gòu)
基于物聯(lián)網(wǎng)的海量實(shí)時(shí)數(shù)據(jù)流處理系統(tǒng)架構(gòu)如圖3所示,由數(shù)據(jù)源接入模塊、數(shù)據(jù)緩存模塊、Storm集群、Hadoop集群、關(guān)系數(shù)據(jù)庫(kù)、UI展示等部分組成。
圖3 系統(tǒng)架構(gòu)圖
1)數(shù)據(jù)源接入模塊
數(shù)據(jù)源接入模塊負(fù)責(zé)為數(shù)據(jù)處理集群快速接入數(shù)據(jù)源。本文采用的是基于開(kāi)源的Apache Flume日志系統(tǒng)實(shí)現(xiàn)各種實(shí)時(shí)感知數(shù)據(jù)源的快速接入,F(xiàn)lume是Cloudera提供的一個(gè)高可靠、高可用的分布式海量日志采集及聚合和傳輸系統(tǒng),還可以用于歷史數(shù)據(jù)的收集。同時(shí),該模塊還能夠?qū)?shù)據(jù)進(jìn)行簡(jiǎn)單處理。
2)數(shù)據(jù)緩存模塊
對(duì)于實(shí)時(shí)數(shù)據(jù)處理,如果數(shù)據(jù)流量較大,數(shù)據(jù)處理模塊的處理能力可能無(wú)法達(dá)到,甚至引起宕機(jī)。因此,系統(tǒng)引入Kafka系統(tǒng)數(shù)據(jù)緩存模塊,Kafka系統(tǒng)是一個(gè)高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),通過(guò)O(1)的磁盤(pán)數(shù)據(jù)結(jié)構(gòu)保證消息的持久性和穩(wěn)定性,其吞吐量可以支持每秒數(shù)百萬(wàn)的消息。
3)Storm與Hadoop集群
Storm集群負(fù)責(zé)實(shí)時(shí)數(shù)據(jù)流的處理,由一個(gè)主節(jié)點(diǎn)和若干從節(jié)點(diǎn)組成。使用Storm的一個(gè)Spout插件storm-kafka可以持續(xù)不斷地從Kafka系統(tǒng)中讀取數(shù)據(jù),然后通過(guò)Storm集群進(jìn)行實(shí)時(shí)運(yùn)算,同時(shí)將運(yùn)算結(jié)果通過(guò)關(guān)系數(shù)據(jù)庫(kù)實(shí)現(xiàn)持久性保存。Hadoop集群由一個(gè)主管理節(jié)點(diǎn)和若干個(gè)計(jì)算節(jié)點(diǎn)組成,負(fù)責(zé)對(duì)歷史數(shù)據(jù)的批處理,實(shí)現(xiàn)對(duì)物聯(lián)網(wǎng)感知數(shù)據(jù)的深度分析與挖掘。
3.2.2 系統(tǒng)實(shí)現(xiàn)
1)數(shù)據(jù)源接入與數(shù)據(jù)緩存組件集成
Flume本身自帶了諸多數(shù)據(jù)源的sink,對(duì)于已存在的sink只需針對(duì)環(huán)境修改配置文件即可。本文通過(guò)自定義Flume中的Kafka sink實(shí)現(xiàn)兩者的集成,自定義Flume的sink需要繼承AbstractSink并實(shí)現(xiàn)Configurable接口,該接口主要包含消息處理process()方法和sink配置configure(Context arg0)方法。
?process()方法實(shí)現(xiàn)示例:
public Status process() throws EventDeliveryException{
……
byte[] body=event.getBody();
final String msg=new String(body);
final KeyedMessage
producer.send(message);
……
}
?configure(Context arg0)方法實(shí)現(xiàn)示例:
public void configure(Context arg0) {
Properties prop=new Properties();
prop.put(″zookeeper.connect″,zookeeperValue);
prop.put(″metadata.broker.list″,brokerValue);
prop.put(″serializer.class″,StringEncoder.class.getName());
producer=new Producer
}
2)ISpout與IBolt接口實(shí)現(xiàn)
Storm中的計(jì)算主要分為兩種類(lèi)型,一個(gè)是數(shù)據(jù)源的處理Spout和中間數(shù)據(jù)的處理Bolt。Spout作為task運(yùn)行在worker內(nèi)容,主要負(fù)責(zé)數(shù)據(jù)的發(fā)送,其核心方法為nextTuple()。
?nextTuple()方法示例:
public void nextTuple() {
……
this.collector.emit(new Values(sendData));//發(fā)送數(shù)據(jù)
}
Bolt負(fù)責(zé)節(jié)點(diǎn)處理,既可以進(jìn)行簡(jiǎn)單的數(shù)據(jù)處理,也可以實(shí)現(xiàn)數(shù)據(jù)流的合并等復(fù)雜計(jì)算。其核心方法為excute(Tuple tuple,BasicOutputCollector collector)。
?excute(Tuple tuple,BasicOutputCollector collector)方法示例:
public void execute(Tuple tuple,BasicOutputCollector collector) {
String sentence=(String) tuple.getValue(0);
//數(shù)據(jù)處理邏輯
……
collector.emit(new Values(out));
}
物聯(lián)網(wǎng)感知數(shù)據(jù)的實(shí)時(shí)處理在現(xiàn)實(shí)中有較大的應(yīng)用價(jià)值,例如智能交通中實(shí)時(shí)交通情況大數(shù)據(jù)分析,更加有利于公眾服務(wù)。本文設(shè)計(jì)的基于Storm的海量數(shù)據(jù)的實(shí)時(shí)處理系統(tǒng)具有高可靠、高效的特點(diǎn),可以很好地應(yīng)對(duì)物聯(lián)網(wǎng)數(shù)據(jù)異構(gòu)的問(wèn)題,并且可以勝任大規(guī)模的實(shí)時(shí)處理任務(wù)。
[1]高哲,翁祖泉.基于物聯(lián)網(wǎng)海量數(shù)據(jù)處理的實(shí)時(shí)數(shù)據(jù)庫(kù)應(yīng)用研究[J].中國(guó)集成電路.2013(11):57-58.
[2]DIGNAN Larry.Internet of things:$8.9 trillion market in 2020,212 billion connected things[EB/OL].(2013-10-03)[2016-10-20].http://www.zdnet.com/article/internet-of-things-8-9-trillion-market-in-2020-212-billion-connected-things/.
[3]馬駿,郭淵博,馬建峰,等.物聯(lián)網(wǎng)感知層基于資源分層的多用戶訪問(wèn)控制方案[J].電子學(xué)報(bào),2014(1):28.
[4]羅劍明.制造物聯(lián)網(wǎng)的實(shí)時(shí)數(shù)據(jù)感知與處理模型的研究[D].廣州:廣東工業(yè)大學(xué),2015:12.
[5]王慧.基于Hadoop的并行挖掘算法的研究[D].北京:首都師范大學(xué),2013:3-4.
[6]趙卓峰,魏文飛,馬強(qiáng).基于無(wú)共享架構(gòu)的海量感知數(shù)據(jù)實(shí)時(shí)處理系統(tǒng)[J].微電子學(xué)與計(jì)算機(jī),2012,29(9):10.
[7]王銘坤,袁少光,朱永利,等.基于Storm的海量數(shù)據(jù)實(shí)時(shí)聚類(lèi)[J].計(jì)算機(jī)應(yīng)用,2014,34(11):3078.
[8]覃雄派,王會(huì)舉,杜小勇,等.大數(shù)據(jù)分析:RDBMS與MapReduce的競(jìng)爭(zhēng)與共生[J].軟件學(xué)報(bào),2012,23(1):35-36.
[9]李川,鄂海紅,宋美娜.基于Storm的實(shí)時(shí)計(jì)算框架的研究與應(yīng)用[J].軟件,2014,35(10):17.
[10]鄧立龍,徐海水.Storm實(shí)現(xiàn)的應(yīng)用模型研究[J].廣東工業(yè)大學(xué)學(xué)報(bào),2014,31(3):115.
責(zé)任編輯:楊子立
Real Time Massive Data Stream Processing in Internet of Things Based on Storm
ZHANG Qianjin
(Department of Information Engineering,Anhui Vocational College Of Defense Technology,Lu′an 237011)
According to the realtime and heterogeneous characteristics of data in the Internet of things,a realtime massive data sensing and processing model was designed on the basis of cloud computing.Based on the open source real time operating system Storm,a combination of real time massive data parallel processing mechanism and data processing flow realized real time massive data stream processing with core implementation method proposed.
Internet of things;real time data;massive data
10.3969/j.issn.1671?0436.2016.06.007
2016-11-21
安徽省高校自然科學(xué)研究重點(diǎn)項(xiàng)目(KJ2016A120)
張前進(jìn)(1982— ),男,碩士,講師。
TP391
A
1671- 0436(2016)06- 0030- 04