劉 瀟,季英凱
(江蘇省疾病預(yù)防控制中心 公共衛(wèi)生信息所,江蘇 南京 210009)
隨著疾控信息化工作的不斷深入,疾控的傳染病、公共衛(wèi)生突發(fā)事件、慢性病、免疫規(guī)劃、精神衛(wèi)生等業(yè)務(wù)條線的信息系統(tǒng)在不斷地建立與完善,由于業(yè)務(wù)系統(tǒng)在建設(shè)之初缺乏總體規(guī)劃,各自為政,各類數(shù)據(jù)難以支撐有效的業(yè)務(wù)協(xié)同服務(wù),造成信息孤島[1]。在當(dāng)前各級(jí)疾控業(yè)務(wù)協(xié)作日漸緊密、內(nèi)部一體化集成日漸成熟的大背景下[2],為了有效地對(duì)業(yè)務(wù)數(shù)據(jù)進(jìn)行匯聚與利用,中國疾病預(yù)防控制中心制定的《疾病預(yù)防控制信息系統(tǒng)建設(shè)指導(dǎo)方案(2018年版)》要求,以國家和省統(tǒng)籌區(qū)域兩級(jí)建設(shè)為重點(diǎn),依托全員人口信息庫等基礎(chǔ)設(shè)施,構(gòu)建實(shí)時(shí)共享的動(dòng)態(tài)電子疾病檔案(electronic diseases records,EDR)[3-4],以個(gè)人健康為核心,貫穿整個(gè)生命周期,以出生和死亡2個(gè)重要的生命節(jié)點(diǎn)為開始和結(jié)束,全程記錄疾病發(fā)生、發(fā)展及轉(zhuǎn)歸的監(jiān)測信息,形成以個(gè)人基礎(chǔ)信息(人口學(xué)信息、出生登記、死亡登記)為基礎(chǔ),包含體檢篩查史、疾病診斷史、檢驗(yàn)檢測史、治療隨訪史、流行病學(xué)史和預(yù)防接種史等內(nèi)容的主題數(shù)據(jù)目錄[5],以支撐疾控各類業(yè)務(wù)的交互協(xié)同,為政府決策分析提供有效的支持。如何利用現(xiàn)有的業(yè)務(wù)系統(tǒng)實(shí)現(xiàn)各類業(yè)務(wù)數(shù)據(jù)的匯集,以形成實(shí)時(shí)共享的電子疾病檔案成了疾控信息化建設(shè)面臨的一個(gè)新的問題。
當(dāng)前,疾控的各類業(yè)務(wù)系統(tǒng)于不同的時(shí)間由不同的開發(fā)公司建設(shè),所采用的技術(shù)架構(gòu)、業(yè)務(wù)流程以及業(yè)務(wù)數(shù)據(jù)的格式各不相同,各業(yè)務(wù)數(shù)據(jù)與電子疾病檔案的數(shù)據(jù)標(biāo)準(zhǔn)均存在一定程度的差異?;谶@些問題,為了實(shí)現(xiàn)電子疾病檔案所需數(shù)據(jù)的抽取并進(jìn)行有效的匯集,本文基于流式計(jì)算框架Flink建立一個(gè)電子疾病檔案數(shù)據(jù)的實(shí)時(shí)處理模型,使用消息中間件Kafka實(shí)現(xiàn)各類業(yè)務(wù)主題數(shù)據(jù)的發(fā)布與訂閱,使用Flink實(shí)現(xiàn)各類業(yè)務(wù)數(shù)據(jù)按電子疾病檔案主題目錄數(shù)據(jù)的實(shí)時(shí)處理,使用HBase作為數(shù)據(jù)庫實(shí)現(xiàn)電子疾病檔案數(shù)據(jù)的匯集與有效關(guān)聯(lián),通過實(shí)驗(yàn)證明該模型的可行性與高效性。
Apache Flink是一個(gè)低延遲、高吞吐的分布式計(jì)算框架[6],可對(duì)無界數(shù)據(jù)流與有界數(shù)據(jù)流進(jìn)行計(jì)算[7-8],相對(duì)于以MapReduce為代表的批處理計(jì)算框架、以Spark Streaming為代表的微批處理方案、以Storm為代表的流處理計(jì)算框架,Flink提供了DataStream API與DataSet API分別對(duì)流處理與批處理予以支持,其將批數(shù)據(jù)看成是有界的流數(shù)據(jù),本質(zhì)上還是進(jìn)行數(shù)據(jù)流的處理[9]。這種批流一體的特性使得Flink在執(zhí)行計(jì)算時(shí)具有極高的靈活性與極低的延遲性。
Flink可在單獨(dú)集群中運(yùn)行,也可以在Yarn、Mesos等資源調(diào)度與管理框架上運(yùn)行[10]。一個(gè)Flink集群總是包含一個(gè)JobManager以及一個(gè)或多個(gè)TaskManager[11]。JobManager負(fù)責(zé)處理Job提交、Job監(jiān)控以及資源管理;TaskManager運(yùn)行worker進(jìn)程,負(fù)責(zé)實(shí)際任務(wù)Tasks的執(zhí)行。這些任務(wù)共同組成了一個(gè)Job。Flink應(yīng)用程序中的任務(wù)由用戶定義的算子轉(zhuǎn)換而來的流式Dataflows所組成,以構(gòu)成有向無環(huán)圖(DAG)以對(duì)數(shù)據(jù)流行處理,如圖1所示。Flink的Dataflows由Source、Transformation、Sink3大部分組成。Source可從數(shù)據(jù)源不斷獲取數(shù)據(jù),Transformation通過Flink提供或用戶自定義的各類算子靈活組合將獲取的數(shù)據(jù)流進(jìn)行各種業(yè)務(wù)邏輯的處理,最終由Sink輸出到外部。Source與Sink階段都提供了包括Kafka、 ElasticSearch、MySQL、InfluxDB等多種數(shù)據(jù)庫引擎的專用算子用以獲取或輸出數(shù)據(jù),使用十分方便。
圖1 Flink Dataflow
Flink的開發(fā)同時(shí)支持Java和Scala語言,提供了4個(gè)不同抽象層級(jí)的API,提供給用戶以開發(fā)應(yīng)用程序。第一層的ProcessFunction API是其他三層的調(diào)用與封裝的基礎(chǔ)。Flink在這個(gè)API上實(shí)現(xiàn)最基礎(chǔ)的流式處理能力。在該層用戶可以進(jìn)行有狀態(tài)編程,實(shí)現(xiàn)復(fù)雜的時(shí)間語義處理。第二層是Core APIs。該層包含應(yīng)用于無界或有界數(shù)據(jù)流的DataStream API和應(yīng)用于有界數(shù)據(jù)集的DataSet API,提供各種形式的用戶自定義轉(zhuǎn)換(transformations)、聯(lián)接(joins)、聚合(aggregations)、窗口(windows)和狀態(tài)(state)等操作。第三層是Table API。該層通過定義數(shù)據(jù)的schema提供類似于關(guān)系模型中的操作,比如 select、join、group-by 和 aggregate 等。第四層是SQL API。這層在語義和程序表達(dá)式上都類似于Table API,但其程序都是通過SQL表達(dá)式的形式來實(shí)現(xiàn)。
Kafka是一個(gè)分布式的消息發(fā)布-訂閱模式[12]的中間件系統(tǒng)。Kafka在主題中保存消息的信息,生產(chǎn)者向主題寫入數(shù)據(jù),消費(fèi)者從主題讀取數(shù)據(jù),從而實(shí)現(xiàn)數(shù)據(jù)的傳遞。這種隱式調(diào)用的風(fēng)格使生產(chǎn)者與消費(fèi)者相互解耦,使其可以相互獨(dú)立的變化。
高性能、高吞吐、低延時(shí)是Kafka的顯著的特性,雖然Kafka的消息保存在磁盤上,但由于采用了順序?qū)懭?、MMFiles(Memory Mapped Files)、Zero Copy、批量壓縮等技術(shù)優(yōu)化了讀寫性能[13],使其可以突破傳統(tǒng)的數(shù)據(jù)庫、消息隊(duì)列等數(shù)據(jù)引擎所受限的磁盤IO瓶頸。因此即使是部署在普通的單機(jī)服務(wù)器上,Kafka也能輕松支持每秒百萬級(jí)的寫入請(qǐng)求[14],讀寫速度超過大部分的消息中間件,這種特性使得Kafka在海量數(shù)據(jù)場景中應(yīng)用廣泛。
本文設(shè)計(jì)的電子疾病檔案數(shù)據(jù)處理模型采用Kafka+Flink+HBase建立實(shí)時(shí)數(shù)據(jù)流的處理框架。在Kafka中建立主題Topic,疾控的各個(gè)業(yè)務(wù)系統(tǒng)作為生產(chǎn)者按規(guī)定的格式向主題中生產(chǎn)數(shù)據(jù),通過這種模式將數(shù)據(jù)生產(chǎn)的任務(wù)交給各個(gè)數(shù)據(jù)系統(tǒng)的開發(fā)維護(hù)單位,避免直接操作多個(gè)業(yè)務(wù)系統(tǒng)帶來不可預(yù)知的風(fēng)險(xiǎn)。Flink程序作為消費(fèi)者去Kafka的主題中獲取數(shù)據(jù)并發(fā)送至下游進(jìn)行數(shù)據(jù)類目的識(shí)別、分流以及格式轉(zhuǎn)換等業(yè)務(wù)邏輯的處理操作,最終寫入Hbase進(jìn)行數(shù)據(jù)的持久化,并通過HBase表結(jié)構(gòu)的設(shè)計(jì)實(shí)現(xiàn)以個(gè)人信息為基礎(chǔ),各類電子疾病檔案主題目錄數(shù)據(jù)的關(guān)聯(lián)。模型架構(gòu)設(shè)計(jì)如圖2所示。
圖2 模型架構(gòu)
2.2.1 消息中間件設(shè)計(jì)
由于疾控業(yè)務(wù)系統(tǒng)多,電子疾病檔案所包含的數(shù)據(jù)類目繁雜且數(shù)據(jù)格式各異,而兒童預(yù)防接種等業(yè)務(wù)系統(tǒng)在某些時(shí)間段內(nèi)產(chǎn)生的實(shí)時(shí)數(shù)據(jù)量很大,使用消息隊(duì)列Kafka作為中間件來接收各類業(yè)務(wù)系統(tǒng)產(chǎn)生的數(shù)據(jù)是一個(gè)很適合的選擇。一方面,利用Kafka出色的性能提高業(yè)務(wù)數(shù)據(jù)寫入的響應(yīng)時(shí)間,保證穩(wěn)定性;另一方面,使得各個(gè)業(yè)務(wù)系統(tǒng)與Flink數(shù)據(jù)處理程序解耦。數(shù)據(jù)生產(chǎn)層的業(yè)務(wù)系統(tǒng)既可以將業(yè)務(wù)系統(tǒng)完成審批后的實(shí)時(shí)數(shù)據(jù)流接入消息隊(duì)列,也可以批量對(duì)歷史數(shù)據(jù)進(jìn)行處理,業(yè)務(wù)數(shù)據(jù)生產(chǎn)者與消費(fèi)程序各自獨(dú)立運(yùn)行。
數(shù)據(jù)生產(chǎn)層的各個(gè)業(yè)務(wù)系統(tǒng)按約定的格式將電子疾病檔案系統(tǒng)需要的數(shù)據(jù)寫入Kafka的主題中,為了使下游的消費(fèi)者Flink程序能正確解析與處理,數(shù)據(jù)結(jié)構(gòu)可以分為3個(gè)部分:第一部分代表該數(shù)據(jù)所屬的電子疾病檔案主題目錄;第二部分代表該條數(shù)據(jù)的操作類型(新增、修改或刪除);第三部分為業(yè)務(wù)數(shù)據(jù)所包含的具體內(nèi)容,各個(gè)數(shù)據(jù)項(xiàng)統(tǒng)一使用制表符分隔(見表1)。
表1 生產(chǎn)者數(shù)據(jù)結(jié)構(gòu)
2.2.2 Flink程序設(shè)計(jì)
Flink程序使用Scala語言設(shè)計(jì)并實(shí)現(xiàn),程序的Source階段使用Flink框架自帶的FlinkKafkaCon-sumer不斷地從Kafka的主題內(nèi)消費(fèi)數(shù)據(jù)。在Transformation階段,對(duì)獲取的數(shù)據(jù)按電子疾病檔案的主題目錄進(jìn)行分流,在各個(gè)主題目錄對(duì)應(yīng)的數(shù)據(jù)流中對(duì)其所屬的數(shù)據(jù)進(jìn)行各自不同的操作類型識(shí)別、數(shù)據(jù)格式檢查、數(shù)據(jù)格式轉(zhuǎn)換等操作。由于從Kafak獲取的數(shù)據(jù)包含了電子疾病檔案的各類數(shù)據(jù)目錄,程序根據(jù)數(shù)據(jù)的前6位字符判斷該條數(shù)據(jù)所屬的主題目錄,利用Flink提供的旁路輸出Side Output功能對(duì)數(shù)據(jù)流進(jìn)行切分,每類主題目錄的數(shù)據(jù)被發(fā)送至其對(duì)應(yīng)的子數(shù)據(jù)流中調(diào)用自定義的各類主題目錄的數(shù)據(jù)處理函數(shù)進(jìn)行處理,最終在各個(gè)子數(shù)據(jù)流的Sink階段,將處理好的電子疾病檔案數(shù)據(jù)寫入HBase進(jìn)行持久化。由于Flink沒有提供已封裝好的HBase的Sink算子,自定義算子實(shí)現(xiàn)Flink提供的RichSinkFunction接口來完成相關(guān)的對(duì)HBase的操作,Flink程序的拓?fù)浣Y(jié)構(gòu),如圖3所示。
圖3 程序拓?fù)浣Y(jié)構(gòu)
以死亡報(bào)告主題目錄數(shù)據(jù)為例,Flink程序關(guān)鍵代碼如下:
val environment = StreamExecutionEnvironment.getExecutionEnvironment//創(chuàng)建可執(zhí)行環(huán)境
val outsideStream = environment.addSource(new FlinkKafkaConsumer[String](topic,new SimpleStringSchema(),props))//使用FlinkKafkaConsumer 從Kafka消費(fèi)數(shù)據(jù)
.process(new ProcessFunction[String,String] {//在基本處理函數(shù) ProcessFunction中使用Side Output函數(shù)對(duì)數(shù)據(jù)進(jìn)行分流
override def processElement(source:String,context:ProcessFunction[String,String]#Context,collector:Collector[String]):Unit = {
Source.substring(0,6)match {//截取數(shù)據(jù)前六位獲取所屬主題目錄
case "320601" => context.output(death01,source.substring(7)) //判斷所屬主題目錄為死亡報(bào)告數(shù)據(jù)后獲取剩余數(shù)據(jù)部分并標(biāo)記其對(duì)應(yīng)的數(shù)據(jù)流標(biāo)識(shí)death01進(jìn)行分流
...//其他主題目錄判斷與獲取
case _ => context.output(or,source.substring(7))
}
}
}).setParallelism(1)//分流算子并行度設(shè)置
val deathStream = outsideStream.getSideOutput(death01) //分流后主題目錄數(shù)據(jù)流的獲取
....
deathStream.map(deathTransformation(_)).setParallelism(2) //在自定義的deathTransformation函數(shù)中完成其對(duì)應(yīng)主題目錄數(shù)據(jù)的操作類型識(shí)別、格式檢查、轉(zhuǎn)換操作
.addSink(new deathHbaseSink()) //自定義deathHbaseSink函數(shù)對(duì)其對(duì)應(yīng)主題目錄數(shù)據(jù)進(jìn)行持久化操作
2.2.3 HBase設(shè)計(jì)
在HBase電子疾病檔案所屬的命名空間Name-space中為各類主題目錄建立各自所屬的表,考慮到之后的業(yè)務(wù)需要通過身份證號(hào)以個(gè)人信息為單位對(duì)疾病檔案進(jìn)行綜合展示與分析,對(duì)唯一標(biāo)識(shí)表中的一行數(shù)據(jù)的RowKey[15]使用身份證號(hào)加業(yè)務(wù)主鍵號(hào)的方式來生成,這樣既保證了表中RowKey的唯一性,又可以使未來的業(yè)務(wù)端調(diào)用Hbase的API時(shí),通過其提供的前置過濾器將身份證提取出來進(jìn)行匹配與查詢,而無需再建立二級(jí)索引或借助其他的工具。在Flink程序進(jìn)行數(shù)據(jù)持久化時(shí),使用Protobuf工具提供的功能對(duì)電子疾病檔案數(shù)據(jù)進(jìn)行序列化壓縮后,再關(guān)聯(lián)其對(duì)應(yīng)的RowKey寫入Hbase,使每個(gè)RowKey在表中只存儲(chǔ)一個(gè),提升空間的利用率。
在疾控內(nèi)部局域網(wǎng)部署應(yīng)用模型進(jìn)行測試,使用的各類軟件信息,如表2所示。集群硬件環(huán)境包含10臺(tái)Cpu8核,內(nèi)存32 G,操作系統(tǒng)為64位Linux Centos 7.7的虛擬機(jī)。其中,7臺(tái)為Hadoop與Hbase集群的部署提供支持。Hadoop集群中包含了2個(gè)互為主從的管理節(jié)點(diǎn)NameNode,5個(gè)計(jì)算節(jié)點(diǎn)DataNode,2個(gè)資源調(diào)度管理節(jié)點(diǎn)ResourceManager;HBase集群中包含了兩個(gè)互為主從的HMaster節(jié)點(diǎn),3個(gè)RegionServer節(jié)點(diǎn);Flink集群部署于DataNode所在的5個(gè)計(jì)算節(jié)點(diǎn),使用Yarn Session的模式對(duì)Flink作業(yè)進(jìn)行調(diào)度與管理。Kafka與ZooKeeper等中間件部署在另外三臺(tái)虛擬機(jī)上。
表2 軟件版本信息
實(shí)驗(yàn)從疾控內(nèi)網(wǎng)數(shù)據(jù)庫批量取出死亡報(bào)告信息與兒童預(yù)防接種信息各10 000條寫入Kafka的電子疾病檔案主題內(nèi),對(duì)Flink程序中的分流算子以及死亡報(bào)告信息與兒童預(yù)防接種信息的數(shù)據(jù)處理算子分別設(shè)置不同的并行度的情況下,記錄數(shù)據(jù)全部寫入HBase的運(yùn)行時(shí)間情況,運(yùn)行時(shí)間皆為5次實(shí)驗(yàn)后的平均值,結(jié)果如表3所示。
表3 實(shí)驗(yàn)結(jié)果
從實(shí)驗(yàn)結(jié)果可以看出,在數(shù)據(jù)量較大的情況下,該模型的數(shù)據(jù)處理的實(shí)時(shí)性良好,且合理的提高分流算子或數(shù)據(jù)處理算子的并發(fā)度能有效地提高模型的數(shù)據(jù)處理能力,實(shí)驗(yàn)證明該模型具有良好的數(shù)據(jù)實(shí)時(shí)處理能力及彈性擴(kuò)展能力。
為了建設(shè)電子疾病檔案系統(tǒng),實(shí)現(xiàn)疾控各個(gè)業(yè)務(wù)條線數(shù)據(jù)的有效匯集,進(jìn)而為業(yè)務(wù)協(xié)同以及輔助決策提供數(shù)據(jù)支持,本文根據(jù)疾控業(yè)務(wù)信息化建設(shè)的現(xiàn)狀設(shè)計(jì)了一個(gè)數(shù)據(jù)實(shí)時(shí)處理模型,并使用消息中間件Kafka,流式計(jì)算框架Flink及寬列數(shù)據(jù)庫HBase給出了模型的具體實(shí)現(xiàn)。實(shí)踐表明,模型具有良好的數(shù)據(jù)處理能力,滿足預(yù)計(jì)的設(shè)計(jì)目標(biāo),其分層設(shè)計(jì)的架構(gòu)風(fēng)格使得每一層都可以靈活根據(jù)需求獨(dú)立的變化而不影響其他層。在業(yè)務(wù)生產(chǎn)數(shù)據(jù)量比較大的場景,可以單獨(dú)為某類業(yè)務(wù)數(shù)據(jù)另設(shè)一個(gè)Kafka以提高數(shù)據(jù)吞吐量。在Flink程序的Source階段,先進(jìn)行數(shù)據(jù)的合流后再進(jìn)行接下來的處理;而對(duì)于某些實(shí)時(shí)處理的數(shù)據(jù)量較大、數(shù)據(jù)處理任務(wù)較復(fù)雜的業(yè)務(wù),也可以提高該業(yè)務(wù)數(shù)據(jù)處理算子的并行度以提高數(shù)據(jù)處理速度。目前,該模型已成功應(yīng)用在江蘇省疾控中心的死亡報(bào)告、預(yù)防接種等部分業(yè)務(wù)條線數(shù)據(jù)的處理與匯集中,下一步的工作是根據(jù)電子疾病檔案的數(shù)據(jù)規(guī)范要求進(jìn)一步匯集數(shù)據(jù),并結(jié)合統(tǒng)計(jì)分析工具與相關(guān)算法進(jìn)行數(shù)據(jù)的利用與分析。