[摘 要] 時(shí)下,大數(shù)據(jù)分析已成熱門(mén)行業(yè),隨著物聯(lián)網(wǎng)等新興技術(shù)的崛起,海量的數(shù)據(jù)正等待著系統(tǒng)進(jìn)行消化,如何從海量數(shù)據(jù)中挖掘出有價(jià)值的信息是眾多企業(yè)所面臨的新的難題和挑戰(zhàn)。在此應(yīng)用背景下,Apache開(kāi)源組織推出了Hadoop框架,用以實(shí)現(xiàn)海量數(shù)據(jù)的分布式存儲(chǔ)和高效率可擴(kuò)充的分布式計(jì)算。
[關(guān)鍵詞] 大數(shù)據(jù)Hadoop;分布式存儲(chǔ);分布式計(jì)算;MapReduce HDFS
[中圖分類號(hào)] G258.6 [文獻(xiàn)標(biāo)識(shí)碼] A
1 Hadoop的產(chǎn)生
大數(shù)據(jù)的概念最早是由麥肯錫這家公司提出的,他們指出:“數(shù)據(jù),已經(jīng)滲透到當(dāng)今每一個(gè)行業(yè)和業(yè)務(wù)職能領(lǐng)域,成為重要的生產(chǎn)因素,人們對(duì)于海量數(shù)據(jù)的挖掘和運(yùn)用,預(yù)示著新一波生產(chǎn)率增長(zhǎng)和消費(fèi)者盈余浪潮的到來(lái)。”
進(jìn)入2012年,大數(shù)據(jù)的概念開(kāi)始被越來(lái)越多的提及,甚至已經(jīng)上了紐約時(shí)報(bào)和華爾街日?qǐng)?bào)的專欄封面。隨著系統(tǒng)信息化的建設(shè),海量數(shù)據(jù)正呈現(xiàn)井噴式增長(zhǎng),如何消化和分析這些數(shù)據(jù)從中提煉出有價(jià)值的信息是企業(yè)所要面臨的新的難題和挑戰(zhàn)。
首先面臨的挑戰(zhàn)來(lái)自于系統(tǒng)硬件,雖然硬盤(pán)的存儲(chǔ)容量不斷增加,但是磁盤(pán)的尋址效率卻沒(méi)有隨之提高,因此當(dāng)數(shù)據(jù)處于一個(gè)較大規(guī)模的時(shí)候,數(shù)據(jù)的定位和讀取會(huì)變得非常緩慢。這是硬件本身的局限性,很難從軟件方面得到突破點(diǎn),然而同樣從硬件的角度考慮,雖然磁盤(pán)的尋址效率發(fā)展緩慢,但是傳輸效率卻相對(duì)迅速一些,因此基于大數(shù)據(jù)的處理,我們可以嘗試使用另外一種訪問(wèn)方式——基于流式讀取[1]。
兩種方式有什么區(qū)別,或許有些人還不太清楚,基于磁盤(pán)尋址最典型的應(yīng)用便是關(guān)系數(shù)據(jù)庫(kù),首先定位到數(shù)據(jù)的存儲(chǔ)地址,然后從這個(gè)地址開(kāi)始做局部的數(shù)據(jù)處理;而基于流式訪問(wèn)首先獲取數(shù)據(jù)的輸入流,通過(guò)該流來(lái)讀取所有數(shù)據(jù),做全局的數(shù)據(jù)分析,沒(méi)有數(shù)據(jù)尋址的過(guò)程。
基于流的訪問(wèn)方式雖然可以不用考慮磁盤(pán)的尋址時(shí)間,但是缺陷也十分的明顯,比如會(huì)消耗很多的計(jì)算機(jī)資源(cpu,內(nèi)存等),針對(duì)這些缺陷,我們能想到的最直接的處理方式便是將數(shù)據(jù)源進(jìn)行切分,分散到多臺(tái)機(jī)器上進(jìn)行并行的讀取,這樣不但加快了數(shù)據(jù)的讀取效率,也緩解了單臺(tái)機(jī)器性能的不足。但是,經(jīng)過(guò)這種方式處理之后,又會(huì)帶來(lái)新的難題和挑戰(zhàn):
首先,數(shù)據(jù)分散部署之后,有可能會(huì)帶來(lái)數(shù)據(jù)遺失的風(fēng)險(xiǎn)。
其次,對(duì)數(shù)據(jù)片段的解析需要有一個(gè)聚合的操作,來(lái)匯總最后的結(jié)果。
如何解決這些難題,正是hadoop框架的功能職責(zé)。
Hadoop框架提供了一種簡(jiǎn)單的編程模型,用來(lái)對(duì)大數(shù)據(jù)集進(jìn)行分布式處理;它的處理能力是可擴(kuò)充的,由一臺(tái)機(jī)器擴(kuò)充到成千上萬(wàn)臺(tái)的集群,集群中的每臺(tái)機(jī)器都會(huì)參與存儲(chǔ)和計(jì)算[2]。從功能角度來(lái)看hadoop主要具備兩方面的特性,存儲(chǔ)和計(jì)算。存儲(chǔ)邏輯用到的是HDFS子框架,計(jì)算邏輯用到的是MapReduce子框架,每個(gè)子框架分別解決了上述難點(diǎn)。
2 HDFS子框架
2.1 體系結(jié)構(gòu)
由圖片可以看到HDFS主要包含這樣幾個(gè)功能組件
Namenode,存儲(chǔ)文檔的元數(shù)據(jù)信息,還有整個(gè)文件系統(tǒng)的目錄結(jié)構(gòu)。
DataNode,存儲(chǔ)文檔塊信息,并且文檔塊之間是有冗余備份的。
這里面提到了文檔塊的概念,同本地文件系統(tǒng)一樣,HDFS也是按塊存儲(chǔ)的,只不過(guò)塊的大小設(shè)置的相對(duì)大一些,默認(rèn)為64M。如果一個(gè)文件不足64M,那么它只存儲(chǔ)在一個(gè)塊中,而且并不會(huì)占用64M的磁盤(pán)空間,這一點(diǎn)需要注意,HDFS不適用于小文件存儲(chǔ)的原因并不是因?yàn)樾∥募拇疟P(pán)空間,而是因?yàn)樾∥募加昧颂嗟膲K信息,每個(gè)文檔塊的元數(shù)據(jù)是會(huì)存儲(chǔ)在namenode的內(nèi)存里的,因此當(dāng)文檔塊較多的時(shí)候會(huì)十分消耗namenode的內(nèi)存。
從功能結(jié)構(gòu)來(lái)看,namenode提供了數(shù)據(jù)定位的功能,datanode提供數(shù)據(jù)傳輸,也就是客戶端在訪問(wèn)文件系統(tǒng)的時(shí)候是直接從datanode里面讀取數(shù)據(jù)的,而不是namenode。
2.2 IO操作
2.2.1 hdfs讀取文件流程
首先,連接到分布式文件系統(tǒng),從namenode里獲取要訪問(wèn)的文件由哪些塊組成,每一個(gè)塊的存儲(chǔ)地址是多少。
然后,定位到指定的datanode去讀取文件。
注意:每個(gè)塊的存儲(chǔ)地址是在hadoop啟動(dòng)之后才加載到namenode的內(nèi)存里的,而不是持久化存儲(chǔ)到namenode本地。namenode和datanode具備心跳通信的功能,它會(huì)定時(shí)從datanode那里收到一些反饋,包括block的存儲(chǔ)地址信息等等。
2.2.2 hdfs寫(xiě)文件流程
首先,同樣是連接到分布式文件系統(tǒng),向namenode發(fā)送創(chuàng)建文件的命令。
namenode保存文檔的元數(shù)據(jù)信息之后會(huì)調(diào)度具體的datanode來(lái)執(zhí)行數(shù)據(jù)流的寫(xiě)入操作,寫(xiě)入成功后,需要執(zhí)行冗余備份,將Block復(fù)制多份,每一分存儲(chǔ)到不同的機(jī)器節(jié)點(diǎn)中,防止單點(diǎn)故障的出現(xiàn)。
使用HDFS來(lái)存儲(chǔ)數(shù)據(jù),每個(gè)block至少要備份一份,默認(rèn)是3份,如果沒(méi)有指定備份,或者備份的過(guò)程中出現(xiàn)了異常,則文件的寫(xiě)入操作不會(huì)成功。
2.3 hdfs不適用的場(chǎng)景
2.3.1 低延遲的數(shù)據(jù)訪問(wèn)
HDFS主要針對(duì)大文件來(lái)設(shè)計(jì)的,多用于線下的數(shù)據(jù)分析,對(duì)于線上應(yīng)用并且及時(shí)性要求較高的系統(tǒng),可嘗試使用Hbase。
23.2 大量小文件
消耗namenode內(nèi)存,可以使用SequenceFile或MapFile來(lái)作為小文件的容器
2.3.3 多線程寫(xiě)入,隨機(jī)寫(xiě)入
HDFS系統(tǒng)中,每個(gè)文件只能并發(fā)開(kāi)啟一個(gè)Writer,并且寫(xiě)入操作只能在文件的末尾進(jìn)行。
3 MapReduce子框架
MapReduce的大體流程是這樣的,如圖所示:
由圖片可以看到mapreduce執(zhí)行下來(lái)主要包含這樣幾個(gè)步驟:
(1)首先對(duì)輸入數(shù)據(jù)源進(jìn)行切片。
(2)master調(diào)度worker執(zhí)行map任務(wù)。
(3)worker讀取輸入源片段。
(4)worker執(zhí)行map任務(wù),將任務(wù)輸出保存在本地。
(5)master調(diào)度worker執(zhí)行reduce任務(wù),reduce worker讀取map任務(wù)的輸出文件。
(6)執(zhí)行reduce任務(wù),將任務(wù)輸出保存到HDFS。
若對(duì)流程細(xì)節(jié)進(jìn)行深究,可以得到這樣一張流程圖:
角色描述:
JobClient,執(zhí)行任務(wù)的客戶端。
JobTracker,任務(wù)調(diào)度器。
TaskTracker,任務(wù)跟蹤器。
Task,具體的任務(wù)(Map OR Reduce)。
從生命周期的角度來(lái)看,mapreduce流程大概經(jīng)歷這樣幾個(gè)階段:初始化、分配、執(zhí)行、反饋、成功與失敗的后續(xù)處理。
每個(gè)階段所做的事情大致如下
3.1 任務(wù)初始化
3.1.1 JobClient對(duì)數(shù)據(jù)源進(jìn)行切片
切片信息由InputSplit對(duì)象封裝,接口定義如下:
public interface InputSplit extends Writable {
long getLength() throws IOException; String[] getLocations() throws IOException;
}
可以看到split并不包含具體的數(shù)據(jù)信息,而只是包含數(shù)據(jù)的引用,map任務(wù)會(huì)根據(jù)引用地址去加載數(shù)據(jù)。
InputSplit是由InputFormat來(lái)負(fù)責(zé)創(chuàng)建。
public interface InputFormat
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader
JobClient通過(guò)getSplits方法來(lái)計(jì)算切片信息,切片默認(rèn)大小和HDFS的塊大小相同(64M),這樣有利于map任務(wù)的本地化執(zhí)行,無(wú)需通過(guò)網(wǎng)絡(luò)傳遞數(shù)據(jù)
切片成功后,JobClient會(huì)將切片信息傳送至JobTracker
3.1.2 通過(guò)jobTracker生成jobId。
JobTracker.getNewJobId()
3.1.3 檢查輸出目錄和輸入數(shù)據(jù)源是否存在。
輸出目錄已存在,系統(tǒng)拋出異常。
輸入源目錄不存在,系統(tǒng)拋出異常。
3.1.4 拷貝任務(wù)資源到j(luò)obTracker機(jī)器上(封裝任務(wù)的jar包、集群配置文件、輸入源切片信息)。
3.2 任務(wù)分配
JobTracker遍歷每一個(gè)InputSplit,根據(jù)其記錄的引用地址選擇距離最近的TaskTracker去執(zhí)行,理想情況下切片信息就在TaskTracker的本地,這樣節(jié)省了網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)臅r(shí)間。
JobTracker和TaskTracker之間是有心跳通信的邏輯的,通過(guò)彼此間不停的通信,JobTracker可以判斷出哪些TaskTracker正在執(zhí)行任務(wù),哪些TaskTracker處于空閑狀態(tài),以此來(lái)合理分配任務(wù)。
3.3 任務(wù)執(zhí)行
TaskTracker接到任務(wù)后開(kāi)始執(zhí)行如下操作:
3.3.1 將任務(wù)jar包從HDFS拷貝到本地并進(jìn)行解壓
3.3.2 創(chuàng)建一個(gè)新的JVM來(lái)執(zhí)行具體的任務(wù),這樣做的好處是即使所執(zhí)行的任務(wù)出現(xiàn)了異常,也不會(huì)影響TaskTracker的運(yùn)行使用。
如果所執(zhí)行的任務(wù)是map任務(wù),則處理流程大致如下:
首先加載InputSplit記錄的數(shù)據(jù)源切片,通過(guò)InputFormat的getRecordReader()方法,獲取到Reader后,執(zhí)行如下操作:
Kkey=reader.createKey();
V value=reader.createValue();
while(reader.next(key,value)){//遍歷split中的每一條記錄,執(zhí)行map功能函數(shù)。
mapper.map(key,value,output,reporter);
}
3.4 執(zhí)行反饋
mapreduce的執(zhí)行是一個(gè)漫長(zhǎng)的過(guò)程,執(zhí)行期間會(huì)將任務(wù)的進(jìn)度反饋給用戶。
任務(wù)結(jié)束后,控制臺(tái)會(huì)打印Counter信息,方便用戶以全局的視角來(lái)審查任務(wù)。
若執(zhí)行成功:
清理MapReduce本地存儲(chǔ)(mapred.local.dir屬性指定的目錄)。
清理map任務(wù)的輸出文件。
若執(zhí)行失?。?/p>
(1)如果task出現(xiàn)問(wèn)題(map或者reduce)
錯(cuò)誤可能原因:用戶代碼出現(xiàn)異常;任務(wù)超過(guò)mapred.task.timeout指定的時(shí)間依然沒(méi)有返回
錯(cuò)誤處理:
首先將錯(cuò)誤信息寫(xiě)入日志
然后jobtracker會(huì)調(diào)度其他tasktracker來(lái)重新執(zhí)行次任務(wù),如果失敗次數(shù)超過(guò)4次(通過(guò)mapred.map.max.attempts和mapred.reduce.max.attempts屬性來(lái)設(shè)置,默認(rèn)為4),則job以失敗告終。
如果系統(tǒng)不想以這種方式結(jié)束退出,而是想通過(guò)Task成功數(shù)的百分比來(lái)決定job是否通過(guò),則可以指定如下兩個(gè)屬性:
mapred.max.map.failures.percentmap任務(wù)最大失敗率
mapred.max.reduce.failures.percent reduce任務(wù)最大失敗率
如果失敗比率超過(guò)指定的值,則job以失敗告終。
(2)如果是tasktracker出現(xiàn)問(wèn)題
判斷問(wèn)題的依據(jù):和jobtracker不再心跳通信jobtracker將該tasktracker從資源池中移除,以后不在調(diào)度它。
(3)jobtracker出現(xiàn)問(wèn)題
jobtracker作為系統(tǒng)的單點(diǎn)如果出現(xiàn)問(wèn)題也是最為嚴(yán)重的問(wèn)題,系統(tǒng)將處于癱瘓。
參考文獻(xiàn):
[1]TomWhite.Hadoop權(quán)威指南(第二版)[M].著清華大學(xué)出版社2011,7.
[2]chuckLam.Hadoop實(shí)戰(zhàn)[M].人民郵電出版社,2011,10.
作者簡(jiǎn)介:?jiǎn)问咳A(1971.4-),女,本科,講師,研究方向:計(jì)算機(jī)應(yīng)用。
曹社香(1971.2-),女,本科,講師,研究方向:計(jì)算機(jī)應(yīng)用。