金石聲 李玨
[摘要]本省自動氣象觀測系統(tǒng)根據觀測要素的不同分為單雨量系統(tǒng)、兩要素系統(tǒng)和六要素系統(tǒng)。三類觀測系統(tǒng)分別接收各自站點采集的報文進行解碼入庫,同時通過FTP將數據報文發(fā)送到綜合處理系統(tǒng)再次進行解碼入庫。這樣的處理流程不僅增加網絡負擔,還進行了重復的解碼工作,資源利用率低。為此本文采用基于Spring Batch的框架,定時對三個不同觀測系統(tǒng)的后臺數據庫的實時數據向綜合處理系統(tǒng)的數據庫進行遷移。該流程采用數據分區(qū)技術,多線程并行處理,在減少了對網絡帶寬的占用量的同時,降低了各個系統(tǒng)CPU使用量,提高了工作效率。
[關鍵詞]Spring Batch 數據遷移 數據分區(qū) 多線程
引言
本省的氣象自動站綜合系統(tǒng)需要對已經進行過解碼入庫的單雨量、兩要素和六要素觀測報文進行集中,再次進行解碼后插入綜合數據庫,并以此對外提供服務。這樣的處理流程不僅在資源利用、網絡帶寬的占用都產生了很大的浪費。
通過分別讀取單雨量、兩要素和六要素的數據庫,對已經入庫的數據進行準實時的遷移,在降低網絡帶寬占用的同時,也能降低綜合處理系統(tǒng)的資源占用。但是基于大量的數據讀寫,且重復性高工作,傳統(tǒng)的編程方式處理不僅繁瑣,且維護性低。已經Spring Batch的出現(xiàn)無疑是解決這種問題的一種有效工具。由于目前已經有比較成熟的開源框架支持批處理的需求,所以本方案擬選用開源框架Spring Batch。這樣可以借助開源框架比較成熟的代碼,減少研究的難度,加強框架擴展性,減少研發(fā)周期,加快實際應用進度,并且保證程序的穩(wěn)定性。
一、Spring Batch簡介
Spring batch是Spring的一個子項目,由Spring Source與Accenture(埃森哲)合作開發(fā)的批處理框架。Springhatch對編寫批處理程序本身的特性進行了抽象。將批處理程序分為Job和Job Step兩個部分,將處理環(huán)節(jié)定義為數據讀、數據處理和數據寫三個步驟。提供Job Repository來存儲Job執(zhí)行期間的元數據,可以在處理大量的數據時,提供日志記錄/跟蹤,事務管理,處理統(tǒng)計,資源管理等特性。此外,還提供了分區(qū)技術采用多線程方式并行處理作業(yè)。
二、系統(tǒng)總體架構設計
考慮到系統(tǒng)可能出現(xiàn)的單店故障,為減少處理這類故障的開發(fā)難度。開發(fā)三個獨立的程序分別對單雨量、兩要素和六要素三個數據庫中的數據進行遷移。本文以遷移兩要素數據為例進行介紹。
Spring bath的核心思想是將讀取到的數據轉化為Java對象,然后對對象進行操作。首先需要根據表中的字段建立相應的實體類,然后Spring Batch把從源數據庫中讀取的每條數據映射為對應的Java對象,由于本文只是對數據進行遷移,不需要對對象進行處理,所以將Java對象的值通過寫步驟寫入目標數據庫。實際應用中只關注兩張表中存儲的數據:小時數據表(tabHourData)和分鐘數據表(tabMinuteData),由于遷移這兩張表沒有具體的先后順序,將這兩部分工作并行處理。
Spring batch提供了XML方式進行業(yè)務流程配置,通過spht元素來提供并行作業(yè)流的定義,通過task-execution屬性來定義執(zhí)行的線程池,從而提高Job的執(zhí)行效率。其中要定義兩個不同的作業(yè)步(transferHottrTab_step和transferMinuteTab_step),每個作業(yè)步下定義了兩個具體的子Job分別來完成對兩張數據表的數據遷移。子Job中又分別定義了讀、寫兩個過程來完成數據的遷移。其中為了保證執(zhí)行效率定義了commit-interval,指定了從數據庫讀入1000條數據后進行一次寫操作,這樣既減少了10的訪問,也提高了寫入效率。關鍵配置如下(以transferHottrTab_Job為例):
數據遷移中讀取的數據量較大,為了高時效的完成讀取作業(yè),讀取數據的任務進行分區(qū),每個分區(qū)交給不同的線程處理。該模式的優(yōu)點在于分區(qū)中每一個元素的處理都能像一個普通Spring Batch任務的單步一樣運行。具體關系圖如3-1,將需要讀取的目標數據分為了3個分區(qū),每一個分區(qū)都有一個執(zhí)行上下文Execution Context,StepExecutionSplitter根據不同的上下文生成作業(yè)步執(zhí)行器,然后交給PartitionHandler來處理。應為Spring Batch默認實現(xiàn)了StepExecutionSplitter以及PartitionHandler。開發(fā)過程中原則上只需要實現(xiàn)自己分區(qū)邏輯partitioner即可。
小時表和分鐘表都含有對數據觀測時間的字段,因此對該字段進行分區(qū),可以實現(xiàn)分區(qū)策略的共享。具體配置如下:
class=”com.xxzx.partition.DBpartition”>
作業(yè)中用于對數據庫進行分區(qū)的DBpartition了實現(xiàn)了Spring Btach的Partitioner接口,定義具體的分區(qū)策略,將數據查詢的時間進行切片,然后寫入Step的執(zhí)行上下文,關鍵代碼如下:
while(start<=max){
ExecutionContext context=new ExecutionContext();
if(end>=max){
end=max;
}
context.putInt(_STARTTIME,start);
context.putInt(_ENDTIME,end);
start+=targetSize;
end+=targetSize;
resultMap.put(“partition”+(number++),context);
通過把每次任務中需要查詢的時間段根據targetSize的值進行切片,意味著數據片段分配到不同的作業(yè)步中。并將_ENDTIME和_STARTYIME寫入Step的執(zhí)行上下文(ExecutionContext)。然后在讀取數據的階段通過讀取Step執(zhí)行上下文獲取每個片段的統(tǒng)計時段。
3.2定時調度
由于Spring Batch本身并不是一個定時的調度框架。本文采用Spring本身提供的一個輕量級的調度框架SpringScheduler來實現(xiàn)定時調度任務。關鍵配置如下:
其中采用cron表達式實現(xiàn)每5分鐘調度一次作業(yè),并且在schedulerLauncher中完成對啟動具體作業(yè)的配置。這樣便將Spring Scheduler和Spring Batch結合起來完成數據遷移任務。
四、結論
本文基于Spring Batch框架采用數據分區(qū)技術、多線程并行處理的方法開發(fā)了一個數據庫遷移系統(tǒng)完成批量數據的遷移工作,并結合Spring Scheduler實現(xiàn)了批處理任務的定時調度。在實際工作中優(yōu)化了本省自動站數據處理系統(tǒng)中帶寬占用率高、系統(tǒng)資源浪費的現(xiàn)狀。實際工作中對本省自動站數量兩要素自動站進行了測試,完成3260條數據的遷移的時間不超過30秒。完全滿足實際業(yè)務中的需求。