王建榮,華連生,唐懷甌,王 云,王 靜
(安徽省氣象信息中心,安徽 合肥 230031)
數(shù)值預報產品是14大類氣象資料之一,是天氣預報、分析和氣候預測的重要資料來源,在科研和業(yè)務中發(fā)揮了重要作用。
中國氣象局CIMISS[1-2](全國綜合氣象信息共享平臺)數(shù)據(jù)庫中存儲了多種數(shù)值預報產品信息,包含起報時間、預報時效、層次、預報要素代碼、區(qū)域代碼、單要素GRIB文件路徑等字段,而具體的GRIB文件存儲在GPFS文件系統(tǒng)中。為確保Oracle數(shù)據(jù)庫的穩(wěn)定運行,數(shù)值預報產品記錄保存3~6個月,并定時清除表空間。在科研和業(yè)務工作中,往往需要長時間序列的數(shù)值預報產品數(shù)據(jù),并且要求實時檢索性能,因此考慮利用分布式架構來解決海量氣象數(shù)據(jù)存儲檢索所面臨的問題。
在分布式存儲和計算技術中,Hadoop框架具有高吞吐量、高并發(fā)、高容錯性、高可靠性、低成本等優(yōu)勢。目前基于Hadoop生態(tài)系統(tǒng)的氣象數(shù)據(jù)存儲方案成為國內外的研究熱點。李永生等[3]選用Hadoop與HBase相結合的方式設計數(shù)值預報產品服務平臺;陳東輝等[4]詳細介紹了基于HBase的氣象地面分鐘數(shù)據(jù)分布式存儲系統(tǒng)。文中選取HBase數(shù)據(jù)庫實現(xiàn)氣象數(shù)據(jù)文件的分布式存儲管理;使用Quartz定時采集數(shù)值預報產品文件;利用Kafka消息隊列將文件采集、產品解碼、存儲入庫功能解耦;進行前端GRIB解碼入庫性能優(yōu)化和后端數(shù)據(jù)檢索性能優(yōu)化。實驗測試驗證了數(shù)值預報產品分布式處理與存儲系統(tǒng)設計的可行性,為海量氣象數(shù)據(jù)的處理、存儲和檢索服務提供一種解決方法。
系統(tǒng)功能模塊如圖1所示。
圖1 系統(tǒng)功能模塊
(1)文件采集模塊。
通過Quartz scheduler定時從數(shù)值預報產品目錄復制GRIB文件到解碼程序入口目錄。
(2)產品解碼模塊。
調用GRIB API[5-6]實現(xiàn)GRIB1、GRIB2文件的解碼,并且生成解碼日志文件和要素GRIB文件(GRIB2格式)。
(3)數(shù)據(jù)存儲模塊。
調用HDFS[7-8]API將產品文件、要素GRIB文件和解碼日志文件上傳至HDFS分布式文件系統(tǒng)。另一方面,使用MapReduce并行程序將解碼日志文件存入HBase。
(4)數(shù)據(jù)檢索模塊。
利用Solr實現(xiàn)HBase的輔助索引,提高數(shù)值預報產品數(shù)據(jù)的檢索效率。
系統(tǒng)一次完整的執(zhí)行流程如圖2所示。
執(zhí)行步驟如下:
(1)Quartz周期性調度完成數(shù)值預報產品文件采集和消息入隊;
(2)解碼程序讀消息,并根據(jù)包含的文件名解碼產品;
(3)將產品文件、要素GRIB文件全部上傳至HDFS;
(4)生成解碼日志文件如消息隊列;
(5)入庫程序讀消息,將日志文件入HBase;
(6)HBase協(xié)處理器同步記錄至Solr索引庫。
圖2 總體流程
Quartz是OpenSymphony開源組織在任務調度領域的一個開源項目,基于Java實現(xiàn)。
主要代碼如下:
Scheduler scheduler=StdSchedulerFactory.getDefaultScheduler();
scheduler.start();
JobDetail job=JobBuilder.newJob(GribProcessJob.class).withIdentity("job","group").build();
……
scheduler.scheduleJob(job, trigger);
GribProcessJob類實現(xiàn)Job接口,重載execute函數(shù),完成GRIB文件采集轉儲和發(fā)送消息到產品文件隊列的過程。
Apache Kafka是用Scala語言實現(xiàn)的分布式消息隊列系統(tǒng),使用Zookeeper進行集群的管理。Kafka有以下特性:可擴展性、數(shù)據(jù)分區(qū)、低延遲、持久存儲、處理大量不同消費者的能力。
Kafka由Producer、Broker(消息服務器)和Consumer三部分組成,Producer和Consumer均屬于客戶端。應用程序通過Producer API發(fā)送消息到Broker集群Leader(主節(jié)點),再通過Consumer API從Broker服務器消費消息。Kafka消息的兩個重要概念為Topic(主題)和Partition(分區(qū))。
分布式處理與存儲系統(tǒng)創(chuàng)建了兩個消息隊列:產品文件隊列和日志文件隊列。為產品文件隊列創(chuàng)建名為“gribfilelist”的topic,每個topic包含3個partition;為日志文件隊列創(chuàng)建名為“l(fā)ogfilelist”的topic,每個topic也包含3個partition。key相同的消息都被發(fā)送到同一個分區(qū)(partition),如所有的ecmf文件名被發(fā)送到相同的分區(qū),而jma文件名被發(fā)送到另一個分區(qū)。
客戶端解碼程序完成GRIB文件解碼后將解碼日志文件發(fā)送至日志文件隊列。
客戶端入庫程序循環(huán)請求消息隊列,檢查并獲取最新的消息后按順序完成:數(shù)值預報產品文件、要素GRIB文件和解碼日志文件寫入HDFS;解碼日志MapReduce方式存入HBase數(shù)據(jù)庫。
Kafka Producer的異步發(fā)送模式允許進行批量發(fā)送:客戶端先將消息緩存在內存中,然后一次請求批量發(fā)送出去。
配置策略,比如可以指定緩存的消息達到某個量的時候就發(fā)出去,或者緩存了固定的時間后就發(fā)送出去,可提高消息發(fā)送效率、減少服務端的I/O次數(shù)。
GRIB碼即二進制格點加工數(shù)據(jù),是WMO(世界氣象組織)推薦使用的表格驅動代碼之一,主要用來表示天氣分析和預報的產品資料?,F(xiàn)行的GRIB碼有兩個版本(Edition),即GRIB1和GRIB2。GRIB2對數(shù)據(jù)的描述基于模板和碼表,而模板引用碼表。
GRIB API是由ECMWF(歐洲中期天氣預報中心)設計研發(fā)的,為用戶提供了C/C++、Fortran等語言的編程接口。用戶程序使用key/value(鍵/值)方法存取GRIB數(shù)據(jù)。GRIB文件中所有信息(Message)都通過key來檢索。每個key都有固定的類型,如實型、整數(shù)型、字符串等。
系統(tǒng)采用GRIB API的C/C++接口實現(xiàn)數(shù)值預報產品解碼。以ECMF產品為例,Quartz每5 min執(zhí)行文件采集任務,從ECMF產品目錄復制文件至解碼程序臨時目錄temp下,例如產品文件名為:
W_NAFP_C_ECMF_20160511055659_P_C1D05110000051100011.bin
解碼程序調用GRIB API對其進行解碼后輸出解碼日志文件和要素GRIB文件:
W_NAFP_C_ECMF_20160511055659_P_C1D05110000051100011.bin.log
該文件由多條解碼記錄組成,單條記錄的格式如下:
1|20160511|0|0|98|0|100|850|WIV|ANEA|250|250|NAFP_ECMF_0_FTM-98-ANEA-WIV-250X250-100-850-999998-999998-999998-2016051100-0.GRB
各字段用“|”分割,字段定義與表1相對應,而NAFP_ECMF_0_FTM-98-ANEA-WIV-250X250-100-850-999998-999998-999998-2016051100-0.GRB即是要素GRIB文件。文件名含義:加工中心代碼為歐洲中期天氣預報中心、預報分辨率為0.25o×0.25o、850 hPa等壓面層格點經緯度范圍(60o,-10o,60o,150o)的緯向風資料,其存儲于HDFS分布式文件系統(tǒng)ECMWF相關存儲路徑下。
HBase(Hadoop database)[9-10]運行在HDFS分布式文件系統(tǒng)上,使用Zookeeper管理集群,提供高可靠性、高性能、列存儲、可伸縮、實時讀寫特性,主要用來存儲非結構化和半結構化的松散數(shù)據(jù)。
系統(tǒng)將數(shù)值預報產品通過GRIB API解碼后存儲在HBase中,不同的數(shù)值預報產品分開存儲在不同的實體數(shù)據(jù)表中,目前存儲了3大類數(shù)值預報產品,分別為ECMWF(歐洲中期數(shù)值預報中心)發(fā)布的細網格(0.25o×0.25o水平分辨率)和粗網格(2.5o×2.5o水平分辨率)的數(shù)值預報產品,JMA(日本氣象廳)發(fā)布的0.5o×0.5o水平分辨率和1.25o×1.25o水平分辨率的數(shù)值預報產品,NCEP/FNL再分析資料。數(shù)據(jù)表以行鍵、列族、數(shù)據(jù)的方式存儲數(shù)值產品的實體數(shù)據(jù)。數(shù)據(jù)表存儲內容見表1。
表1 數(shù)據(jù)表存儲內容說明
data:gribpath是解碼所得要素GRIB文件在HDFS中的存儲路徑。
選取表1中data:date、data:validtime和data:centre三列做數(shù)據(jù)模型展示,見表2。
表2 數(shù)據(jù)模型示例表
Rowkey(行鍵):HBase中的Rowkey唯一標識一行記錄。根據(jù)HBase的優(yōu)化原則[7],Rowkey的長度易固定且不超過200 Bytes,設計如下:AAAAATTT:yyyyMMdd:nnnmmmm:IIIIXJJJJ
其中,AAAAA為5字母長度的英文縮寫,不足5位則在其后補“9”,代表數(shù)值預報產品的預報要素名稱;TTT為預報時效;nnn表示高度層類型,mmmm表示層次;IIII表示4位I方向增量,不足4位則前導置“0”;JJJJ表示4位J方向增量,不足4位則前導置0。
以ECMF數(shù)據(jù)表的行鍵為例:
TEMP9006:20160511:1000010:0250X0250
其含義是:對于溫度要素(temp),在2016年5月11日00:00起報,預報時效為未來6 h的預報場,預報層次為10 hPa,I方向增量為0.25o,J方向增量為0.25o。
時間戳(timestamp):每條數(shù)據(jù)更新的歷史記錄,同一行鍵數(shù)據(jù)再次入庫會記錄不同的時間戳。
列族(column family):每種數(shù)值預報產品的表結構基本相同,每張表只設一個列族data,包含的列(column qualifier)有data:date、data:validtime、data:centre、data:gribpath等。HBase存儲的都是Byte數(shù)組。
Apache Solr是一種開源的、基于 Lucene的全文檢索引擎,支持XML、JSON和python等常用的輸出格式。而SolrCloud[11-12]是基于Solr和Zookeeper的分布式搜索方案,使用Zookeeper作為集群的配置信息中心。
HBase在存儲時,默認按照Rowkey進行排序(字典序)并通過Rowkey及其range來檢索數(shù)據(jù),在HBase查詢時,有以下幾種方式:
(1)通過get方式,指定Rowkey獲取唯一一條記錄;
(2)通過scan方式,設置startRow和stopRow參數(shù)進行范圍匹配;
(3)全表掃描,即直接掃描整張表中所有行記錄。
HBase對Rowkey的一級索引支持較好,按Rowkey查詢的響應時間達到毫秒級。HBase內置Filter(過濾器)特性以支持多條件查詢的二級索引。但HBase的Filter是直接掃描記錄的,如果數(shù)據(jù)范圍很大,會導致查詢速度很慢。因此基于Solr來實現(xiàn)二級索引,滿足Rowkey之外的多要素數(shù)據(jù)檢索需求。
設計Solr索引的關鍵問題是合理地配置索引字段。Zookeeper統(tǒng)一管理XML格式的Solr索引字段描述文件:managed-schema,SolrCloud各實例共享同一個managed-schema。
主要配置如下:
……
設置HBase表的id字段為Solr索引的unique-Key,存儲HBase記錄的Rowkey值。
上文所介紹的Solr索引設計是入庫性能優(yōu)化的前提。
入庫程序采用了MapReduce編程模型[13-14]。MapReduce作業(yè)讀取解碼日志文件插入到HBase數(shù)據(jù)庫中。解碼程序省略了reduce步驟,因mapper輸出中間數(shù)據(jù)到reducer需要通過網絡,受限于Hadoop集群帶寬。
HBase的協(xié)處理器[15](Coprocessor)分為兩類,Observer和EndPoint,其中Observer的代碼部署在服務端,相當于對API調用的代理。系統(tǒng)選用RegionObserver接口。
HBase協(xié)處理器需要獲取HBase的插入和更新操作:攔截put操作,獲取其內容,同步寫入Solr。HBase協(xié)處理器定義以及同步數(shù)據(jù)到Solr的主要代碼如下:
public class SolrIndexCoprocessorObserver extends BaseRegionObserver {
@Override
public void postPut(ObserverContext
String rowKey = Bytes.toString(put.getRow());
try {
Cell cellEdition=put.get(Bytes.toBytes("data"), Bytes.toBytes("edition")).get(0);
String strEdition=new String(CellUtil.cloneValue(cellEdition));
……
SolrInputDocument doc=new SolrInputDocument();
doc.addField("id", rowKey);
doc.addField("edition", strEdition);
……
//寫入緩沖
SolrWriter.addDocToCache(doc);
}
(1)軟件及版本:Quartz-2.2.3;hadoop-2.6.0;zookeeper-3.4.6;solr 5.5.4;hbase-1.2.2;GRIB API 1.13.1。
(2)硬件配置。
測試環(huán)境由6臺X86架構的服務器組成,操作系統(tǒng)均為64位Ubuntu 14.04。其中5臺服務器構建Hadoop、Zookeeper、HBase、Solr集群,1臺部署數(shù)值預報產品解碼入庫程序。
處理器:Intel Core i5-3470 3.20 GHz;
磁盤:1TB,7200 rpm,SATA III接口;
內存:16 GB;
網絡環(huán)境為千兆局域網。
選取ECMWF高分辨率數(shù)值預報產品及其解碼產生的要素GRIB文件為測試對象,其常見的文件大小分布為:約2 MB、約10 MB、約105 MB和約160 MB,而解碼得到的要素GRIB文件數(shù)也隨之不同。
(1)HDFS寫入性能。
數(shù)值預報產品有846個文件,共96 GB,平均大小116 M??蛻舳顺绦蛘{用HDFS API的文件復制操作將數(shù)值預報產品文件寫入HDFS文件系統(tǒng)需要的時間為1 190.986 s,平均寫文件速度為82.54 MB/s;要素GRIB文件上傳至HDFS集群的速度近似。
(2)HBase入庫性能。
采用統(tǒng)計學方法:總體有96 360個解碼日志文件,共57 816 000條記錄,耗時4 576.9 s,平均寫入速度12 632 條/s;隨機抽取1 000,2 000,…,10 000條記錄入庫,見圖3。測試結果表明,隨著入庫記錄數(shù)的增加,數(shù)據(jù)入庫性能總體平穩(wěn),最快寫入速度為13 677 條/s。
圖3 入庫時間和入庫記錄數(shù)的關系
(3)索引完整性驗證。
測試用例如下:
用例編號UC1:按起報時間、預報層次、預報時效、單預報要素檢索預報要素場;
用例編號UC2:按起報時間范圍、預報層次、預報時效、單預報要素檢索預報要素場;
用例編號UC3:按起報時間、預報層次、預報時效、多預報要素檢索預報要素場。
基于HBase Filter[16]的條件過濾查詢和輔助索引查詢返回的記錄數(shù)對比如表3所示。
表3 HBase Filter與SolrCloud查詢記錄數(shù)對比
表3中每個測試用例均做了3組對比,基于SolrCloud索引的查詢記錄數(shù)均和HBase Filter查詢的記錄數(shù)一致,說明索引完整可用。
(4)HBase檢索性能。
表3中各測試用例最大查詢記錄數(shù)所需時間對比如表4所示。
表4 HBase Filter與Solr查詢效率對比
由表4可知,基于SolrCloud的查詢效率遠遠高于HBase Filter查詢,按時間點的查詢基本都在毫秒級返回結果;對于UC2中,按時間范圍檢索方面,HBase Filter效率較低,不適合時間序列的查詢,在實際的氣象業(yè)務應用中,需要結合Solr對HBase進行索引優(yōu)化,來滿足檢索時效的要求。
針對關系型數(shù)據(jù)庫對數(shù)值預報產品數(shù)據(jù)的存儲及檢索效率低等問題,設計了分布式處理與存儲系統(tǒng)。利用Quartz任務調度采集數(shù)值預報產品文件,Kafka消息隊列解耦數(shù)值產品解碼與入庫程序,研究HBase分布式數(shù)據(jù)庫結合SolrCloud索引服務的數(shù)據(jù)存儲與檢索優(yōu)化方案,設計了適合氣象業(yè)務應用的數(shù)值預報產品數(shù)據(jù)存儲模型,并建立Solr索引。關鍵技術是前端MapReduce并行程序入庫、HBase協(xié)處理器同步記錄至SolrCloud。實驗測試表明,該方案提高了存儲效率和檢索速度,能夠滿足業(yè)務中的時效性要求。
[1] 熊安元,趙 芳,王 穎,等.全國綜合氣象信息共享系統(tǒng)的設計與實現(xiàn)[J].應用氣象學報,2015,26(4):500-512.
[2] 楊潤芝,馬 強,李德泉,等.內存轉發(fā)模型在CIMISS數(shù)據(jù)收發(fā)系統(tǒng)中的應用[J].應用氣象學報,2012,23(3):377-384.
[3] 李永生,曾 沁,徐美紅,等.基于Hadoop的數(shù)值預報產品服務平臺設計與實現(xiàn)[J].應用氣象學報,2015,26(1):122-128.
[4] 陳東輝,曾 樂,梁中軍,等.基于HBase的氣象地面分鐘數(shù)據(jù)分布式存儲系統(tǒng)[J].計算機應用,2014,34(9):2617-2621.
[5] 張 藶,周崢嶸,劉媛媛.ECMWF GRIB API及其應用[C]//中國氣象學會氣象通信與信息技術委員會暨國家氣象信息中心科技年會.北京:國家氣象信息中心,2011.
[6] 李 葳.NECP FNL資料解碼及數(shù)據(jù)格式轉換[J].氣象與減災研究,2011,34(1):63-68.
[7] WHITE T.Hadoop:the definitive guide,3E[M].[s.l.]:O’Reilly Media,2012.
[8] DUTTA H,KAMIL A,POOLERY M,et al.Distributed storage of large-scale multidimensional electroencephalogram data using Hadoop and HBase[M]//Grid and cloud database management.Berlin:Springer,2011.
[9] GEORGE L.HBase:the definitive guide[M].Sebastopol:O’Reilly Media,2011.
[10] STONEBRAKER M. SQL databases v. NoSQL databases[J].Communications of the ACM,2010,53(4):10-11.
[11] 郝 強,高占春.基于SolrCloud的網絡百科檢索服務的實現(xiàn)[J].軟件,2015,36(12):103-107.
[12] 付劍生,徐林龍,林文斌.分布式全網職位搜索引擎的研究與實現(xiàn)[J].計算機技術與發(fā)展,2015,25(5):6-9.
[13] 楊潤芝,沈文海,肖衛(wèi)青,等.基于MapReduce計算模型的氣象資料處理調優(yōu)試驗[J].應用氣象學報,2014,25(5):618-628.
[14] 李永生,曾 沁,楊玉紅,等.基于大數(shù)據(jù)技術的氣象算法并行化研究[J].計算機技術與發(fā)展,2016,26(9):47-49.
[15] 鄒敏昊.基于Lucene的HBase全文檢索功能的設計與實現(xiàn)[D].南京:南京大學,2013.
[16] 張 葉,許國艷,花 青.基于HBase的矢量空間數(shù)據(jù)存儲與訪問優(yōu)化[J].計算機應用,2015,35(11):3102-3105.