王 勇,張 躍
(北京工業(yè)大學(xué)信息學(xué)部,北京 100124)
大數(shù)據(jù)時代帶給社會的不僅僅是數(shù)據(jù)變大、資源增多,更有思維模式改變以及隨之而來的數(shù)據(jù)處理技術(shù)不斷創(chuàng)新、數(shù)據(jù)利用能力飛速發(fā)展。健康大數(shù)據(jù)技術(shù)應(yīng)用和發(fā)展已作為國家重大戰(zhàn)略付諸實施[1]。相關(guān)研究有:Abderrazak 等[2]將hadoop 框架以及開源相關(guān)組件應(yīng)用于倉儲問題,提高醫(yī)療數(shù)據(jù)的倉儲性能,對健康數(shù)據(jù)平臺建設(shè)具有一定的借鑒意義,缺點是不太符合健康監(jiān)測數(shù)據(jù)特點;文獻[3]利用HBase 和Phoenix 構(gòu)建高性能的健康監(jiān)測大數(shù)據(jù)平臺,并對平臺讀寫性能進行優(yōu)化,然而其未對數(shù)據(jù)采集傳輸和發(fā)布共享進行研究;文獻[4-5]分別研究了適合健康監(jiān)測大數(shù)據(jù)的接入?yún)f(xié)議和發(fā)布協(xié)議,為健康監(jiān)測數(shù)據(jù)采集和發(fā)布共享提供了思路,但仍需要在具體實施中進一步驗證。
大規(guī)模健康監(jiān)測數(shù)據(jù)的采集存儲和共享利用仍然存在很多問題,本文詳細研究了Kafka[6]、HBase[7]等大數(shù)據(jù)相關(guān)技術(shù),實現(xiàn)一款面向用戶健康服務(wù)的、可擴展的健康監(jiān)測大數(shù)據(jù)處理平臺,有效解決健康監(jiān)測數(shù)據(jù)生態(tài)系統(tǒng)中大規(guī)模數(shù)據(jù)的采集傳輸、存儲以及發(fā)布共享問題,填補了研究空白。通過研究HBase 組織與存儲模式,設(shè)計出適合存儲健康監(jiān)測大數(shù)據(jù)的HBase 存儲模型。對Kafka 分布式消息中間件的發(fā)布訂閱模式進行研究,實現(xiàn)健康監(jiān)測大數(shù)據(jù)的采集傳輸與發(fā)布共享架構(gòu)。引入Kafka 作為架構(gòu)中樞,不僅能屏蔽數(shù)據(jù)源的異構(gòu)型,保證各個服務(wù)模塊之間高內(nèi)聚、低耦合,還能使數(shù)據(jù)通道變得簡單,減輕下游數(shù)據(jù)庫系統(tǒng)的壓力,提高系統(tǒng)擴展性。
健康監(jiān)測大數(shù)據(jù)理想化狀態(tài)是:由健康監(jiān)測設(shè)備產(chǎn)生的數(shù)據(jù),通過數(shù)據(jù)采集接口傳入數(shù)據(jù)中心進行集中存儲,利用健康監(jiān)測大數(shù)據(jù)平臺提供的數(shù)據(jù)發(fā)布接口獲取平臺數(shù)據(jù),實現(xiàn)數(shù)據(jù)共享,經(jīng)過分析與處理后的數(shù)據(jù)也可通過數(shù)據(jù)發(fā)布接口發(fā)送給用戶。
如圖1 所示,健康監(jiān)測大數(shù)據(jù)平臺系統(tǒng)架構(gòu)包括應(yīng)用平臺和支撐平臺,應(yīng)用平臺主要實現(xiàn)數(shù)據(jù)應(yīng)用,如數(shù)據(jù)分析和挖掘,數(shù)據(jù)發(fā)布是將最終的分析結(jié)果以及相關(guān)數(shù)據(jù)共享給用戶;在支撐平臺中,有分布式數(shù)據(jù)采集傳輸模塊與存儲模塊,分布式數(shù)據(jù)采集傳輸模塊對不同來源的健康監(jiān)測數(shù)據(jù)進行采集和傳輸,存儲模塊主要實現(xiàn)數(shù)據(jù)持久化,負責(zé)將數(shù)據(jù)高效存儲在大數(shù)據(jù)集群上,為數(shù)據(jù)應(yīng)用提供支持。
Fig.1 Health monitoring big data platform architecture model圖1 健康監(jiān)測大數(shù)據(jù)平臺架構(gòu)模型
Apache Kafka 是Hadoop 生態(tài)系統(tǒng)中的一個工具,用于處理事務(wù)日志和其它實時數(shù)據(jù)。Kafka 是一個流媒體平臺,能夠以發(fā)布/訂閱的形式傳遞流數(shù)據(jù)[8]。在發(fā)布—訂閱消息系統(tǒng)中,消息的生產(chǎn)者稱為發(fā)布者,消費者稱為訂閱者,消息被持久化到一個topic 中,消費者可以訂閱一個或多個topic 并消費該topic 中所有的數(shù)據(jù),其體系結(jié)構(gòu)如圖2 所示。
健康監(jiān)測大數(shù)據(jù)平臺需要實時將采集到的健康監(jiān)測數(shù)據(jù)信息存入數(shù)據(jù)中心進行持久化存儲,當(dāng)信息采集平臺將這些變化的數(shù)據(jù)信息寫入或更新到數(shù)據(jù)庫時,數(shù)據(jù)庫產(chǎn)生很大的壓力,對數(shù)據(jù)采集系統(tǒng)性能提出了很高要求。利用kafka 分布式、高吞吐、基于發(fā)布/訂閱的特性,可在廉價的PC Server 上搭建大規(guī)模的消息系統(tǒng)[9]。
Kafka Connect 是一種用于在Kafka 和其它系統(tǒng)之間可擴展、可靠的流式傳輸數(shù)據(jù)工具,使用它能快速將大量數(shù)據(jù)集移入和移出Kafka 連接器。Kafka Connect 可獲取整個數(shù)據(jù)庫,或從所有應(yīng)用程序服務(wù)器收集指標(biāo)數(shù)據(jù)到Kafka 主題,使數(shù)據(jù)用于流處理。導(dǎo)出作業(yè)可將數(shù)據(jù)從Kafka topic 傳輸?shù)蕉未鎯虿樵兿到y(tǒng),或傳遞到批處理系統(tǒng)進行離線分析。Kafka 數(shù)據(jù)采集與傳輸模型如圖3 所示。
Fig.2 Kafka architecture圖2 Kafka 體系結(jié)構(gòu)
Fig.3 Kafka data acquisition and transmission model圖3 Kafka 數(shù)據(jù)采集與傳輸模型
系統(tǒng)使用Hadoop 體系中的HBase 組件對數(shù)據(jù)進行持久化存儲。HBase 是一個使用key/value 鍵值對的基于列存儲的數(shù)據(jù)庫,支持海量數(shù)據(jù)的高效存儲,存儲的數(shù)據(jù)具有稀疏性[10]。
HBase 表的索引稱為RowKey 行關(guān)鍵字,RowKey 必須具備唯一性,一般為標(biāo)志性信息和時間戳組合。Rowkey 長度不宜過長,還應(yīng)盡量保證散列[11]。本文將健康檔案編號或身份證號加入rowkey,健康檔案編號或身份證號具有一定的隨機性,能夠保證rowkey 設(shè)計均勻分布在各個Region中。與此同時還要考慮集群查詢性能,查詢都是基于某個用戶的時間序列,本文設(shè)計rowkey 的id+時間戳timestamp作為rowkey,用戶的信息就會連續(xù)存儲在一起,查詢效率自然提高。
Hbase 的列族也是越少越好,因為Hbase 的列族在內(nèi)存結(jié)構(gòu)中是一個cf 對應(yīng)一個store 區(qū)域,數(shù)據(jù)量大的storefile 自然會多,在查詢多列族數(shù)據(jù)時需要跨文件訪問數(shù)據(jù)內(nèi)容,合并任務(wù)自然增多,會降低性能。
基于以上原則,根據(jù)中華人民共和國衛(wèi)生部批準(zhǔn)的《城鄉(xiāng)居民健康檔案基本數(shù)據(jù)集》[12]建立Hbase 健康監(jiān)測數(shù)據(jù)模型,如表1 所示。
健康監(jiān)測數(shù)據(jù)包括用戶的基礎(chǔ)數(shù)據(jù)、生理數(shù)據(jù)、運動數(shù)據(jù)、睡眠數(shù)據(jù)、環(huán)境數(shù)據(jù)等[13]。HBase 存儲模型將這些數(shù)據(jù)分成基礎(chǔ)數(shù)據(jù)(baseInfo)和健康數(shù)據(jù)(healthData)兩個列族進行存儲?;A(chǔ)數(shù)據(jù)包括身份證號、姓名、性別、年齡、出生日期,健康數(shù)據(jù)包括身高、體重、體溫、血糖、血氧、血壓、心率、計步、睡眠質(zhì)量等數(shù)據(jù)。
Table 1 HBase storage model of health monitoring data表1 健康監(jiān)測數(shù)據(jù)HBase 存儲模型
為有效實現(xiàn)健康監(jiān)測數(shù)據(jù)利用與共享,健康監(jiān)測大數(shù)據(jù)平臺可以提供兩種數(shù)據(jù)發(fā)布與共享服務(wù):①健康監(jiān)測數(shù)據(jù)查詢服務(wù);②健康狀態(tài)監(jiān)測服務(wù)[5]。
1.3.1 健康監(jiān)測數(shù)據(jù)查詢服務(wù)
健康監(jiān)測大數(shù)據(jù)平臺提供健康監(jiān)測數(shù)據(jù)查詢服務(wù),其它基于本平臺的應(yīng)用通過客戶端主動向健康監(jiān)測大數(shù)據(jù)平臺服務(wù)器發(fā)送查詢請求消息。健康監(jiān)測大數(shù)據(jù)平臺使用Kafka 作為健康監(jiān)測數(shù)據(jù)采集與發(fā)布的媒介,實現(xiàn)健康監(jiān)測數(shù)據(jù)查詢接口,其交互模型如圖4 所示。
Fig.4 Interaction model of health monitoring data query service圖4 健康監(jiān)測數(shù)據(jù)查詢服務(wù)交互模型
在Kafka 中,創(chuàng)建專門用于發(fā)送和接收查詢消息的主題Topic1,第三方數(shù)據(jù)應(yīng)用平臺通過<table,query-filter,topic>組成的元組向Topic1 發(fā)送查詢消息,其中table 為想要查詢的HBase 表,query-filter 為查詢過濾器,topic 為查詢結(jié)果返回的目標(biāo)主題。當(dāng)與Topic1 相對應(yīng)的消息到達時,查詢處理器處理這些消息,然后到指定的table 按照query-filter 過濾出想要的數(shù)據(jù),將數(shù)據(jù)封裝成消息返回到指定的topic,第三方數(shù)據(jù)應(yīng)用平臺獲取這些消息,得到想要的查詢結(jié)果。
HBase 查詢實現(xiàn)方式:①按指定RowKey 獲取唯一一條記錄的get 方法;②按指定條件獲取一批記錄的scan 方法。對于個人基本信息數(shù)據(jù)等全量數(shù)據(jù)表,使用get 方法,而對于基于時間序列采樣的健康監(jiān)測數(shù)據(jù)則采用scan 方法查詢較為方便。
一般基于時間序列采樣的健康監(jiān)測數(shù)據(jù),本文的Rowkey 設(shè)計為身份證號或健康檔案編號+時間戳形式,這樣可將查詢接口中的Key 和startTime、endTime 值拼接起來形成Rowkey 的startRow 和stopRow,便于在HBase 表中查詢相應(yīng)結(jié)果。而對于全量的數(shù)據(jù)信息表,如個人信息數(shù)據(jù)表,Rowkey 直接設(shè)計為身份證號,這樣查詢條件中的time可以為空,Key 可直接作為RowKey 進行查詢,查詢接口設(shè)計如表2 所示。
Table 2 Health monitoring data query interface表2 健康監(jiān)測數(shù)據(jù)查詢接口
1.3.2 健康狀態(tài)監(jiān)測服務(wù)
健康監(jiān)測大數(shù)據(jù)平臺還主動提供健康狀態(tài)監(jiān)測服務(wù),健康監(jiān)測大數(shù)據(jù)平臺可整合平臺采集存儲的數(shù)據(jù),將血壓、體溫、血糖等健康狀況異常情況及時發(fā)送到健康監(jiān)測類設(shè)備,以供用戶了解異常狀況,供決策時參考。使用Kafka 作為健康狀態(tài)監(jiān)測服務(wù)發(fā)布媒介,其交互模型如圖5 所示。
Fig.5 Interaction model of health monitoring service圖5 健康狀態(tài)監(jiān)測服務(wù)交互模型
當(dāng)健康狀況監(jiān)測模塊發(fā)現(xiàn)健康狀態(tài)異常時,健康狀況監(jiān)測模塊生成一個告警命令報文,并將監(jiān)測結(jié)果封裝成告警消息發(fā)送到Kafka 對應(yīng)的Topic。用戶事先訂閱該Topic,當(dāng)告警消息到達時可以實時獲取該消息。
Kafka Connect 是一種傳輸數(shù)據(jù)工具,主要用于Kafka分布式消息系統(tǒng)與其它系統(tǒng)進行數(shù)據(jù)傳輸,分為Source-Connector 與SinkConnector。其中SourceConnector 用于將整個數(shù)據(jù)庫或從應(yīng)用程序服務(wù)器收集的指標(biāo)導(dǎo)入到Kafka主題,而SinkConnector 與之相反,是從Kafka 主題導(dǎo)出數(shù)據(jù)到其它系統(tǒng)[14]。
開發(fā)Connector 主要是實現(xiàn)兩個接口Connector 和Task,若是開發(fā) Source,只要實現(xiàn) SourceConnector 和SourceTask 兩個接口。比如把文件的數(shù)據(jù)讀取到kafka 中,SourceTask 會讀取文件的每一行并把它們封裝為List<SourceRecord>發(fā)送出去。實現(xiàn)SourceConnector 開發(fā)的時序如圖6 所示。
Fig.6 Timing diagram of SourceConnector development圖6 SourceConnector 開發(fā)時序圖
Sink Connector 就是把Kafka 中的數(shù)據(jù)導(dǎo)入到第三方系統(tǒng)中,比如讀取到HDFS、hbase 等,本文設(shè)計并實現(xiàn)的SinkConnector 主要是HBase。SinkConnector 的開發(fā)與SourceConnector 類似,不同點在于SourceTask 使用poll 接口,而SinkTask 使用put 接口。SinkTask 的put()方法接收集合Collection<SinkRecord>存儲到HBase 中。
HBase 中的數(shù)據(jù)表通過劃分成一個個Region 實現(xiàn)數(shù)據(jù)分片,每一個Region 關(guān)聯(lián)一個RowKey 的范圍區(qū)間,數(shù)據(jù)按RowKey 的字典順序進行組織。正是基于這種設(shè)計使得HBase 能夠輕松應(yīng)對這類查詢:“指定一個RowKey 范圍區(qū)間,獲取該區(qū)間的所有記錄”。如查詢健康檔案號為116755244009,日期從20171001 到20191001 的健康監(jiān)測數(shù)據(jù)表,healthData 列族中的Blood_pressure 列示例代碼如下:
HBase 非鍵列查詢效率非常低,因為在查詢操作中要掃描整個表。為提高檢索效率,引入二級索引機制[15]。實驗結(jié)果表明,經(jīng)過優(yōu)化后的查詢性能能夠充分滿足數(shù)據(jù)發(fā)布服務(wù)需要。二級索引原理如圖7 所示。
圖7 中,二級索引的本質(zhì)就是建立各列值與行鍵之間的映射關(guān)系[16]。要對F:C1 列建立索引時,只需建立F:C1各列值到其對應(yīng)的RowKey 映射關(guān)系。查詢符合F:C1=C11,對應(yīng)的F:C2 列值步驟如下:①根據(jù)C1=C11 得到索引數(shù)據(jù)查找對應(yīng)的RK1;②得到RK1 后再根據(jù)RK1 在主表中查詢C2 的值。
Fig.7 Design idea of HBase secondary index圖7 HBase 二級索引設(shè)計思路
二級索引表建立和探測數(shù)據(jù)主表過程如表3 所示。
Table 3 Health monitoring data表3 健康監(jiān)測數(shù)據(jù)
從表3 數(shù)據(jù)查詢Id_number 列,構(gòu)建的二級索引表如表4 所示。
Table 4 Secondary index表4 二級索引
客戶端發(fā)出請求,首先查詢二級索引表,從表4 獲取相應(yīng)的Rowkey,然后根據(jù)主表中的Rowkey 查詢相應(yīng)的數(shù)據(jù)記錄,詳細流程如圖8 所示。
3.1.1 硬件環(huán)境
本文利用兩臺服務(wù)器劃分為4 個虛擬機節(jié)點搭建系統(tǒng)運行環(huán)境。每個虛擬節(jié)點配置為:CPU:2.40GHz;內(nèi)存:4.0G;硬盤:200GB。具體分布如表5 所示。
Fig.8 Query flow using secondary index table圖8 使用二級索引表查詢流程
Table 5 Distribution of cluster system operating environment表5 集群系統(tǒng)運行環(huán)境分布
3.1.2 軟件環(huán)境
系統(tǒng)軟件環(huán)境及版本如表6 所示。
Table 6 System software environment and version表6 系統(tǒng)軟件環(huán)境及版本
基于Kafka 和HBase 的健康監(jiān)測大數(shù)據(jù)平臺系統(tǒng)性能主要考慮健康監(jiān)測數(shù)據(jù)的采集傳輸能力和健康監(jiān)測數(shù)據(jù)的查詢能力,系統(tǒng)性能測試與優(yōu)化重點是Apache Kafka 分布式消息隊列的吞吐量與HBase 數(shù)據(jù)庫查詢效能。
3.2.1 Kafka 分布式消息隊列性能測試
將存儲在文件中的數(shù)據(jù)作為數(shù)據(jù)源,HBase 作為數(shù)據(jù)持久化存儲獲取數(shù)據(jù)。利用Kafka 提供的性能測試工具kafka-producer-perf-test.sh 和kafka-consumer-perf-test.sh腳本對Kafka 的生產(chǎn)者和消費者吞吐速率進行測試。為充分挖掘Kafka 系統(tǒng)性能,結(jié)合本平臺測試環(huán)境設(shè)置相關(guān)參數(shù)如表7 所示。
一般而言,增大批次有利于增加吞吐量(減少了網(wǎng)絡(luò)IO 次數(shù)),但過于增大批次帶來的好處無法抵消壓縮時間的增長,吞吐率就會降低。分區(qū)數(shù)決定了Kafka 的并行度,分區(qū)數(shù)一般是broker 的整數(shù)倍。
Table 7 Kafka related parameter settings表7 Kafka 相關(guān)參數(shù)設(shè)置
單線程吞吐量顯然是有限的,并沒有完全利用Kafka集群的高吞吐量,因此采用多線程進行并發(fā)讀寫對此進行優(yōu)化。對線程數(shù)與吞吐率的關(guān)系進行測試,結(jié)果如圖9 所示。
Fig.9 Relationship between thread number and throughput rate圖9 線程數(shù)與吞吐率關(guān)系
優(yōu)化以后,使用10 個線程寫,系統(tǒng)隨著線程數(shù)的增加吞吐率顯著提升到27MB/s 左右,消息數(shù)達17 萬條/s 以上,可見使用批處理或多線程對提升吞吐率效果明顯。
3.2.2 HBase 數(shù)據(jù)庫性能測試
采用HBase 統(tǒng)一的JavaAPI 接口對HBase 數(shù)據(jù)查詢性能進行測試,圖10 為采用二級索引前后的查詢響應(yīng)時間對比結(jié)果。查詢條件為非RowKey,查詢數(shù)據(jù)量從2~12萬條記錄不等。實驗結(jié)果顯示,二級索引的建立能夠使非索引數(shù)據(jù)的查詢響應(yīng)時間縮短近3 倍。
Fig.10 Comparison of query response time before and after optimization圖10 優(yōu)化前后查詢響應(yīng)時間對比
本文基于Kafka 分布式消息系統(tǒng),結(jié)合HBase 分布式存儲數(shù)據(jù)庫,以解決健康監(jiān)測數(shù)據(jù)生態(tài)系統(tǒng)中“信息孤島”問題為出發(fā)點,通過開發(fā)Kafka Connector 初步形成一個高可靠的健康監(jiān)測大數(shù)據(jù)平臺。首先研究了Kafka 和HBase在健康監(jiān)測數(shù)據(jù)平臺建設(shè)中的應(yīng)用,設(shè)計了健康監(jiān)測數(shù)據(jù)的采集傳輸、共享架構(gòu)以及存儲模型。然后調(diào)整集群設(shè)置和參數(shù)配置,對查詢效率進行優(yōu)化,以達到平臺最佳性能。實驗結(jié)果表明,總的吞吐量取決于代理節(jié)點的數(shù)量、數(shù)據(jù)的主題分區(qū)數(shù)量以及生產(chǎn)消費消息的節(jié)點數(shù)量。通常情況下增加分區(qū)可以提高Kafka 集群的吞吐量,然而分區(qū)過多會增加無效及延遲風(fēng)險,采用批處理或者多線程都有利于增加吞吐量,但是線程數(shù)一般應(yīng)不大于分區(qū)數(shù)。建立二級索引是應(yīng)對HBase 非RowKey 查詢的有效方式。本文針對健康監(jiān)測數(shù)據(jù)存儲特點建立二級索引,能有效提升查詢響應(yīng)速度。
本文研究了大數(shù)據(jù)關(guān)鍵技術(shù)在健康監(jiān)測數(shù)據(jù)平臺中的應(yīng)用。要實現(xiàn)生產(chǎn)環(huán)境大規(guī)模集群的有效配置,需要考慮核心節(jié)點數(shù)量。隨著數(shù)據(jù)量和組件數(shù)量的增加,節(jié)點之間的網(wǎng)絡(luò)帶寬或?qū)⒊蔀槠款i。由于健康監(jiān)測數(shù)據(jù)本身的復(fù)雜性以及HBase 的局限性,要提升復(fù)雜查詢效率還需進一步研究。