陳瑤 李洋磊
摘 ?要:本文分析了ActiveMQ數(shù)據(jù)傳輸?shù)牡讓釉?,以解決數(shù)據(jù)突發(fā)洪峰時(shí)期的隊(duì)列數(shù)據(jù)積壓問題。利用增加并發(fā)消費(fèi)者、調(diào)整消息預(yù)取值、批量消息確認(rèn)等參數(shù),實(shí)現(xiàn)了傳輸性能的多倍提升。最后還根據(jù)業(yè)務(wù)運(yùn)行出現(xiàn)過的問題,優(yōu)化了服務(wù)端的配置,加強(qiáng)了薄弱環(huán)節(jié)的監(jiān)控,提升了系統(tǒng)的穩(wěn)定性。
關(guān)鍵詞:ActiveMQ;民航數(shù)據(jù)傳輸;數(shù)據(jù)傳輸框架
中圖分類號(hào):TP368.5 ? ? ?文獻(xiàn)標(biāo)識(shí)碼:A 文章編號(hào):2096-4706(2019)16-0128-04
Abstract:This paper has analyzed the underlying principle of ActiveMQ data transmission,to solve the data backlog problem during the peak period of data. By adding concurrent consumers,adjusting message prefetch values and batch message validation parameters,the transmission performance is improved many times. Finally,according to the problems in the operation of the business,the configuration of the server is optimized,the monitoring of weak links is strengthened,and the stability of the system is improved.
Keywords:ActiveMQ;civil aviation data transfer;data transfer framework
0 ?引 ?言
為保證數(shù)據(jù)及時(shí)、可靠傳輸,選用了中間件ActiveMQ,設(shè)計(jì)了一套通用的數(shù)據(jù)傳輸框架。該框架現(xiàn)階段主要應(yīng)用于民航氣象數(shù)據(jù)的傳輸,為南方航空、華南國際商務(wù)航空等用戶提供高效、可靠的數(shù)據(jù)傳輸服務(wù)。該系統(tǒng)投入使用后,數(shù)據(jù)傳輸?shù)募皶r(shí)性增強(qiáng)、可靠性提高,得到了用戶單位的認(rèn)可。
在幾年的運(yùn)行過程中,也發(fā)現(xiàn)了許多可以改進(jìn)的地方。該框架現(xiàn)階段傳輸?shù)拿窈綒庀髷?shù)據(jù),具有在整點(diǎn)及半點(diǎn)數(shù)據(jù)量較大的特點(diǎn),框架承載的數(shù)據(jù)量越來越多,用戶數(shù)也在增加。在數(shù)據(jù)量突發(fā)洪峰的情況下,如果某個(gè)用戶的數(shù)據(jù)獲取能力較差,ActiveMQ的數(shù)據(jù)消費(fèi)能力被阻塞,容易產(chǎn)生大量的數(shù)據(jù)等待確認(rèn)、大量數(shù)據(jù)重發(fā)的情況,嚴(yán)重影響數(shù)據(jù)的吞吐量,導(dǎo)致數(shù)據(jù)積壓,也給ActiveMQ服務(wù)器的運(yùn)行帶來潛在的風(fēng)險(xiǎn)。
為解決該問題,對(duì)系統(tǒng)運(yùn)行中的數(shù)據(jù)進(jìn)行分析,提出適用于不同場(chǎng)景的消息隊(duì)列優(yōu)化過程。
1 ?ActiveMQ消息的傳送機(jī)制
如圖1所示,ActiveMQ的消息由生產(chǎn)者(即數(shù)據(jù)發(fā)送 端)發(fā)出后,會(huì)被ActiveMQ的Broker保存,消費(fèi)者(即數(shù)據(jù)接收端)已經(jīng)在Broker上注冊(cè),Broker會(huì)確保消息被發(fā)送給這些消費(fèi)者,確保消息已經(jīng)送達(dá)后,該消息才會(huì)被刪除。
ActiveMQ的消息傳送機(jī)制如圖1所示,通過消費(fèi)者正常接收到消息后,返回一個(gè)確認(rèn)接收狀態(tài)的消息——ACK消息給Broker,如果層次較為復(fù)雜,則會(huì)一層一層的返回ACK消息。
ActiveMQ中的ACK消息有以下幾種類型,定義在字段ACK_TYPE中,如表1所示。
從ACK_TYPE的值可以看出,在ActiveMQ中,消息確認(rèn)的頻率是可以由開發(fā)者選擇的??梢韵M(fèi)一條消息返回一條確認(rèn)消息,也可以選擇另外一種模式——延時(shí)確認(rèn)。在消費(fèi)者成功消費(fèi)消息后,不立即返回ACK,而是等到這些ACK消息的條數(shù)積攢到某個(gè)閾值時(shí),返回一個(gè)ACK消息把他們?nèi)看_認(rèn)。
從這個(gè)定義也可看出,延時(shí)確認(rèn)具有更好的性能。特別是在網(wǎng)絡(luò)擁堵的時(shí)期,N條消息只會(huì)有1條ACK消息,相比N條消息N條ACK返回,大大減輕了網(wǎng)絡(luò)負(fù)荷。但這樣的確認(rèn)機(jī)制也存在一定的弊端,如果消費(fèi)端出現(xiàn)異常,無法正常返回ACK,會(huì)導(dǎo)致N條消息重發(fā),反而會(huì)造成網(wǎng)絡(luò)負(fù)擔(dān)。
并且大量消息如果不得到及時(shí)的確認(rèn),Broker需保存這些消息,并將他們放置于隊(duì)列中排隊(duì)等待確認(rèn),這將消耗Broker服務(wù)器的內(nèi)存、硬盤等資源,如果該服務(wù)器的性能低下,將給Broker的運(yùn)行帶來潛在的風(fēng)險(xiǎn)。
所以需要分析運(yùn)行的實(shí)際情況,根據(jù)已有的資源進(jìn)行靈活的配置、調(diào)優(yōu)。
2 ?問題定位及分析
利用現(xiàn)有的框架和數(shù)據(jù)傳輸模式,模擬測(cè)試數(shù)據(jù)突發(fā)洪峰時(shí)的數(shù)據(jù)吞吐量,從分析消息包處理耗時(shí)入手,進(jìn)行各個(gè)參數(shù)的調(diào)優(yōu)。首先在生產(chǎn)端生成大量的消息,以在測(cè)試數(shù)據(jù)突發(fā)洪峰時(shí)期,每個(gè)消費(fèi)者的消息處理速度、消息積壓數(shù)[2]。如表2所示。
從表2的數(shù)據(jù)可以看出,消費(fèi)端的消費(fèi)能力存在的差距,消費(fèi)能力差的客戶端在突發(fā)數(shù)據(jù)洪峰時(shí)容易發(fā)生數(shù)據(jù)積壓。兩個(gè)消費(fèi)者的網(wǎng)絡(luò)狀態(tài)類似,可以排除因網(wǎng)絡(luò)原因?qū)е碌南⒎e壓。通過進(jìn)一步分析消費(fèi)能力弱的消費(fèi)端,研究其消息處理流程,發(fā)現(xiàn)其接收到消息后還需進(jìn)行串行處理,處理過程更加復(fù)雜,導(dǎo)致消息處理更加耗時(shí),消息返回ACK的時(shí)間也更長,導(dǎo)致了Broker需等待這個(gè)更慢的消費(fèi)者。
針對(duì)消費(fèi)端處理速度存在瓶頸的問題,設(shè)想通過提高消費(fèi)端的處理速度入手。消費(fèi)端的消息處理流程為業(yè)務(wù)需要,無法精簡處理流程來加快消費(fèi)速度。那還有其他什么手段可以增加消費(fèi)端對(duì)消息的消費(fèi)速度?既然現(xiàn)存的串行等待的時(shí)間無法縮短,那是否可以通過并行多個(gè)消費(fèi)者程序來提高效率?
3 ?增加并發(fā)消費(fèi)者方案測(cè)試
擬通過增加并發(fā)消費(fèi)者的方式,看是否能提高消息處理的速度。要使用并發(fā)消費(fèi)者[3],可修改框架中Spring的JMS配置,增加多個(gè)Listener實(shí)例。配置項(xiàng)為Simple Message Listener Container[4],可以配置固定的實(shí)例個(gè)數(shù),也可以配置一個(gè)實(shí)例數(shù)的區(qū)間,這樣消費(fèi)者可以根據(jù)消息的壓力情況動(dòng)態(tài)調(diào)整并發(fā)數(shù)。
配置文件:
<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connec tionFactory"/>
<property name="destinationName" value="${jms.queue.name}"/>
<property name="messageListener" ref="message Receiver"/>
<property name="concurrency" value="10-20"/>
或者:
<property name="concurrentConsumers" value= "20"/>
</bean>
測(cè)試結(jié)果如表3所示。
4 ?消費(fèi)者優(yōu)化
通過測(cè)試發(fā)現(xiàn),增加并行消費(fèi)者后,消息的消費(fèi)速度出現(xiàn)明顯的提升。但消費(fèi)者數(shù)目大于10以后,消息處理速度不再提升,在多個(gè)消費(fèi)者中,有些消費(fèi)者很忙碌,需要處理大量的消息,有些消費(fèi)者很空閑。為什么會(huì)出現(xiàn)這樣的情況?
在目標(biāo)URI的定義中,有一個(gè)prefetchSize[5]參數(shù)值可配置,如下代碼所示:
String queueURI = "queueForGuest?customer.prefetchSize=100";
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue(queue URI);
prefetchSize參數(shù)定義了一次有多少條消息推送給消費(fèi)者,若在代碼中沒有指定prefetchSize參數(shù)值,系統(tǒng)將給其一個(gè)默認(rèn)值,如表4所示。
從這個(gè)默認(rèn)值中可以看出,使用默認(rèn)值無法與現(xiàn)實(shí)需求吻合。如果消費(fèi)者處理消息的能力很差,一次推送1000個(gè)消息給消費(fèi)者,無疑會(huì)造成消費(fèi)者端的擁堵。如果消費(fèi)者端性能好、處理速度快,可配置較高的prefetchSize值[7]。
而本系統(tǒng)中消費(fèi)端航空公司2的消費(fèi)者就是一個(gè)慢消費(fèi)者,消費(fèi)消息的速度慢,如果使用默認(rèn)prefetchSize值1000,一次將1000個(gè)消息推送給該消費(fèi)端,剩下的推送給該消費(fèi)端并行的消費(fèi)者,就出現(xiàn)了上文中的情況,部分消費(fèi)者要處理的消息很多,消費(fèi)能力差,消費(fèi)速度慢,這些消費(fèi)者特別忙碌,甚至出現(xiàn)擁堵現(xiàn)象。而其他并行的消費(fèi)者沒有消息需要處理。
所以嘗試將該消費(fèi)端的prefetchSize值進(jìn)行調(diào)整,提升消費(fèi)端整體的性能。
為了測(cè)試系統(tǒng)中現(xiàn)有消費(fèi)者的適合的prefetchSize值大小,將prefetchSize值分別配置為100、1000,并進(jìn)行了對(duì)比分析測(cè)試,測(cè)試結(jié)果如表5所示。
將queuePrefetch參數(shù)修改為100后,消費(fèi)端Consumer2并行且忙碌的消費(fèi)者數(shù)量增加,消費(fèi)10000條消息減少。
除了批量獲取多個(gè)消息可以使性能提高,批量確認(rèn)多個(gè)消息也將使性能大大提升,ACK的模式有很多種,如ActiveMQ消息的傳送機(jī)制表中的說明,其中效率最高的機(jī)制為optimizeAcknowledge模式,當(dāng)有prefetchSize的65%個(gè)消息被正確消費(fèi)后,消費(fèi)端將返回一條ACK消息,并批量確認(rèn)這些消息。
這樣的模式雖然效率高,但若消費(fèi)端出現(xiàn)異常,未正常返回這些消息的ACK,Broker將重發(fā)這些消息,這樣的模式適用于高吞吐量、對(duì)重復(fù)消息有容錯(cuò)能力的系統(tǒng)。觀察系統(tǒng)運(yùn)行時(shí)這樣的異常情況較少,且在消費(fèi)端均做了重復(fù)消息處理,同時(shí)本系統(tǒng)現(xiàn)應(yīng)用于傳輸氣象報(bào)文,對(duì)實(shí)時(shí)性要求很高,提高吞吐量對(duì)系統(tǒng)的運(yùn)行意義重大。故這種高效模式適用于本系統(tǒng)。
將原來的逐條消息ACK改為optimizeAcknowledge模式后,消費(fèi)端、Broker端的資源消耗降低,處理速度提高,測(cè)試結(jié)果如表6所示。
5 ?Broker優(yōu)化
從表2的測(cè)試結(jié)果可以看出,不同的消費(fèi)端的消費(fèi)速度差異較大,系統(tǒng)運(yùn)行中同時(shí)存在快消費(fèi)者和慢消費(fèi)者,在現(xiàn)場(chǎng)運(yùn)行的過程中也多次發(fā)現(xiàn)這樣的問題,某個(gè)消費(fèi)者的消費(fèi)能力較慢,不能及時(shí)消費(fèi)消息或者返回ACK,導(dǎo)致Broker必須在內(nèi)存中保存這些消息,增加了內(nèi)存的消耗,消息積壓過多時(shí),需要將內(nèi)存的消息寫入到磁盤中,增加了Broker端的磁盤I/O消耗。如果情況進(jìn)一步嚴(yán)重,Broker將阻塞生產(chǎn)者,迫使其降低生產(chǎn)消息的速率甚至不生產(chǎn)消息。
一個(gè)慢的消費(fèi)者不僅給Broker端的運(yùn)行帶來了巨大的潛在風(fēng)險(xiǎn),還有可能導(dǎo)致快的消費(fèi)者也無法正常獲取消息。這是在運(yùn)行環(huán)境中必須高度重視的一個(gè)問題。
保證系統(tǒng)運(yùn)行的穩(wěn)定至關(guān)重要,但與此同時(shí),即使用戶是慢消費(fèi)者,保證他們及時(shí)獲取到數(shù)據(jù)也很重要,如何滿足這個(gè)矛盾的需求,主要從以下三個(gè)方面進(jìn)行了優(yōu)化:
(1)關(guān)閉producerFlowControl,即使有慢消費(fèi)者,先保證消息生產(chǎn)及快消費(fèi)者消費(fèi)的速度,保證消息傳輸不會(huì)因?yàn)槁M(fèi)者而終止。
(2)捕獲Broker資源消耗異常,及時(shí)進(jìn)行干預(yù)、優(yōu)化。
在默認(rèn)情況下,producerFlowControl是開啟的,在這種模式下,如果消費(fèi)者消費(fèi)能力差,Broker將降低消息的生產(chǎn),以保證消費(fèi)端不會(huì)由于消息擁堵而資源耗盡,該模式為調(diào)節(jié)Broker來配合慢消費(fèi)端。
如果選用該模式,消息的生產(chǎn)者也可以進(jìn)行一些異常的處理,可以進(jìn)行異常告警,并且生產(chǎn)者可以在等待設(shè)定的時(shí)間后進(jìn)行重試,避免由于失敗而使發(fā)送消息的請(qǐng)求立即被阻塞,生產(chǎn)者變成假死的狀態(tài)。
(3)監(jiān)控Broker資源使用情況,監(jiān)控消費(fèi)者消費(fèi)情況,及時(shí)發(fā)現(xiàn)慢消費(fèi)者,對(duì)異常及時(shí)進(jìn)行干預(yù)、優(yōu)化。監(jiān)控每個(gè)消費(fèi)者消費(fèi)消息的情況,主要監(jiān)控參數(shù)為消費(fèi)者是否掉線、阻塞的消息數(shù)、等待確認(rèn)的消息數(shù)、進(jìn)隊(duì)列消息數(shù)、出隊(duì)列消息數(shù)等。
6 ?結(jié) ?論
通過各個(gè)參數(shù)的調(diào)優(yōu),傳輸系統(tǒng)數(shù)據(jù)積壓的問題得到了解決,消息傳輸?shù)男阅艿玫搅颂岣撸鬏數(shù)乃俣忍嵘闆r如表7所示。
表7 ?優(yōu)化后傳輸時(shí)間變化及傳輸效率提升情況
通過方案調(diào)整及參數(shù)優(yōu)化,系統(tǒng)的性能及穩(wěn)定性都得到了較大的提高,達(dá)到了預(yù)期目標(biāo)?;谥虚g件ActiveMQ的調(diào)優(yōu)方法還有很多,例如消息傳送優(yōu)先級(jí)、虛擬通道、分布式網(wǎng)絡(luò)、roker集群等,在進(jìn)一步的研究工作中可從這些方面進(jìn)一步提高性能及系統(tǒng)穩(wěn)定性。
參考文獻(xiàn):
[1] APACHE software foundation. ActiveMQ [EB/OL].http://activemq.apache.org/index.html,2018-09-10.
[2] 王鵬,從波,李國杰,等.基于ActiveMQ消息總線的性能測(cè)試方法 [J].測(cè)試技術(shù)學(xué)報(bào),2019,33(2):147-152.
[3] 周聰.基于改進(jìn)的Active MQ的通信模型的設(shè)計(jì)和實(shí)現(xiàn) [D].長春:吉林大學(xué),2017.
[4] Spring AMQP. Spring [EB/OL].https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.html,2019-01-01.
[5] Bruce Snyder,Dejan Bosanac,Rob Davies. Introduction to Apache ActiveMQ Green Paper from Active MQ in action [M].London:Manning,2017:20-23.
[6] Bruce Snyder,Dejan Bosanac,Rob Davies. Active MQ inaction [M].London:Manning,2005:4-5.
[7] 龐佳麗.分布式系統(tǒng)中基于中間件的異步通信可靠性研究 [D].杭州:浙江工業(yè)大學(xué),2017.
作者簡介:陳瑤(1987.04-),女,漢族,湖南湘潭人,工
程師,碩士,研究方向:數(shù)據(jù)傳輸框架;李洋磊(1983.01-),男,漢族,河南洛陽人,工程師,碩士,研究方向:民航氣象信息系統(tǒng)設(shè)備維護(hù)。