顧振德 劉子辰 龍 隆 牟林宏
1(重慶郵電大學(xué)通信與信息工程學(xué)院 重慶 400065) 2(移動計算與新型終端北京市重點(diǎn)實(shí)驗(yàn)室(中國科學(xué)院計算技術(shù)研究所) 北京 100190)
傳統(tǒng)的IoT系統(tǒng)由三部分構(gòu)成:傳感器、網(wǎng)絡(luò)傳輸、數(shù)據(jù)交互平臺[1]。傳感器技術(shù)已經(jīng)取得了長足發(fā)展,市場上存在各種類型設(shè)備,網(wǎng)絡(luò)傳輸?shù)牟渴鸷蛻?yīng)用都日趨成熟[2-4]。隨著IoT的發(fā)展,終端的應(yīng)用規(guī)模都在百萬級別,終端向服務(wù)器并發(fā)地發(fā)送數(shù)據(jù)請求在某一時刻急劇增加,如何在短時間內(nèi)提高服務(wù)器并發(fā)處理能力是數(shù)據(jù)通信服務(wù)系統(tǒng)開發(fā)面臨的一個急需解決的問題[5]。傳統(tǒng)的數(shù)據(jù)通信服務(wù)系統(tǒng)采用Java原生NIO技術(shù)方式實(shí)現(xiàn),如果直接基于NIO類庫和API編程,會降低開發(fā)效率,同時出現(xiàn)epoll bug,導(dǎo)致Selector空輪詢,最終使CPU的占用率達(dá)到100%。羅文韜[6]采用異步處理和基于事件驅(qū)動的機(jī)制來提高服務(wù)器高并發(fā)處理的效率。汪佳文等[7]提出一種動態(tài)負(fù)載均衡算法,并結(jié)合優(yōu)化的Pick-K算法方案實(shí)現(xiàn)高并發(fā)傳輸。
本文提出一種基于Netty的通信服務(wù)系統(tǒng)的設(shè)計方案。該方案借助Netty的異步非堵塞、事件驅(qū)動等性能構(gòu)建高性能網(wǎng)絡(luò)通信程序[8],并通過結(jié)合自定義功能模塊設(shè)計和自定義線程池進(jìn)一步提高服務(wù)系統(tǒng)的并發(fā)處理能力。
Netty是業(yè)界NIO框架中最流行的框架,它的健壯性、可擴(kuò)展性、可定制性都是首屈一指的[9],可進(jìn)行如TCP、UDP套接字服務(wù)器的開發(fā)。本系統(tǒng)因終端與服務(wù)器之間的數(shù)據(jù)通信要求實(shí)時性,因此終端與服務(wù)系統(tǒng)建立TCP長連接實(shí)現(xiàn)?;贜etty實(shí)現(xiàn)終端通信服務(wù)系統(tǒng),可以不用過多關(guān)注連接的建立、數(shù)據(jù)的編解碼等底層通信的實(shí)現(xiàn),進(jìn)而能夠更好地關(guān)注業(yè)務(wù)模塊的實(shí)現(xiàn),極大地簡化了網(wǎng)絡(luò)編程。
長鏈接需要維護(hù)每個鏈路自己消息接收和發(fā)送的緩沖區(qū),而JDK原生NIO類庫[10]無法動態(tài)擴(kuò)容,從而給服務(wù)器帶來沉重的內(nèi)存負(fù)擔(dān)。Netty提供的ByteBuf支持容量的動態(tài)調(diào)整,選擇AdaptiveRecByteBufAllocatorv在創(chuàng)建服務(wù)端時候指定RecvByteBufAllocator,緩沖區(qū)的大小設(shè)置為消息的平均大小,避免額外的內(nèi)存浪費(fèi)。TCP層面的接收和發(fā)送緩沖區(qū)的大小設(shè)置,對于長連接設(shè)置為32 K。每個長連接就是一個會話,每個會話都有心跳等數(shù)據(jù)結(jié)構(gòu),給通信服務(wù)器帶來沉重GC(Garbage Collection)壓力,同時消耗大量的內(nèi)存[11]。本系統(tǒng)在設(shè)計通信服務(wù)端的時候采用ByteBuffer內(nèi)存池技術(shù)來解決上述問題。
終端向服務(wù)器發(fā)送的數(shù)據(jù)是一連串的字節(jié)數(shù)據(jù),為了讓服務(wù)器識別這些字節(jié)數(shù)組,制定特殊的協(xié)議格式,服務(wù)器響應(yīng)終端也要通過該協(xié)議。圖1為協(xié)議的設(shè)計。
圖1 協(xié)議圖
8個字節(jié)的終端序列號,2個字節(jié)的消息體長度和1個字節(jié)的協(xié)議版本,最后是消息體[12]。消息體設(shè)計代碼如下:
public class ProtocolFrame{
private long seriaNumber;
private byte version;
private short contentLength;
private byte[] content
…
}
serialNumber為終端的序列號,每一個終端將有唯一的序列號;version表示協(xié)議的版本號;contentLength為消息體的長度;content表示消息體,消息體包含終端運(yùn)行的各種參數(shù)等。
服務(wù)器設(shè)計劃分為6模塊,分別是異常處理、日志記錄、數(shù)據(jù)接收、業(yè)務(wù)處理、數(shù)據(jù)發(fā)送、session管理等模塊。異常處理模塊主要負(fù)責(zé)捕獲IoT通信服務(wù)系統(tǒng)自身的異常,以及客戶端的異常,從而提高系統(tǒng)的穩(wěn)定性。處理異常日志記錄模塊主要滿足性能測試和維護(hù)工作需要,設(shè)置輸出內(nèi)容,輸出到控制臺和文件等。數(shù)據(jù)接收模塊首先對客戶端的連接IP進(jìn)行過濾,驗(yàn)證客戶端的合法性。其次根據(jù)自定義通信協(xié)議解碼接收的數(shù)據(jù),對解碼后的數(shù)據(jù)進(jìn)行解密,驗(yàn)證數(shù)據(jù)的有效性和封裝數(shù)據(jù)。同時添加空閑超時處理邏輯,設(shè)計失效時間為180 s,若服務(wù)器180 s沒有接受到數(shù)據(jù)包,則及時關(guān)閉超時的客戶端連接。業(yè)務(wù)處理模塊實(shí)現(xiàn)具體的業(yè)務(wù)邏輯,將來自客戶端的數(shù)據(jù)分類存儲到MYSQL數(shù)據(jù)庫中,實(shí)現(xiàn)數(shù)據(jù)的持久化。數(shù)據(jù)發(fā)送模塊主要負(fù)責(zé)將來自業(yè)務(wù)處理模塊的數(shù)據(jù)進(jìn)行處理并下發(fā)到終端,對數(shù)據(jù)加密,根據(jù)協(xié)議對發(fā)送數(shù)據(jù)編碼,之后發(fā)送出去。session管理模塊在終端連接服務(wù)器時將session信息進(jìn)行保存,從而管理和操作連接,定時清除非活躍的連接,釋放內(nèi)存,減輕服務(wù)器連接壓力。終端通信服務(wù)系統(tǒng)的功能模塊如圖2所示。
圖2 通信系統(tǒng)功能模塊圖
利用Netty框架構(gòu)建服務(wù)器的大致流程:
1) 配置服務(wù)器的NIO線程組。
//該線程組用于處理服務(wù)器接收終端的連接;
master=isLinux()?new EpollEventLoopGroup(DEFAULT_THREAD_NUM, threadFactory)
:new NioEventLoopGroup
(DEFAULT_THREAD_NUM, threadFactory);
//該線程組用于處理來自中端的網(wǎng)絡(luò)讀寫;
worker=isLinux()?new EpollEventLoopGroup
(DEFAULT_THREAD_NUM,threadFactory)
:new NioEventLoopGroup
(DEFAULT_THREAD_NUM, threadFactory);
2) 創(chuàng)建 ServerBootstrap對象,傳遞master、work兩個線程池。
bootstrap=new ServerBootstrap();
3) 調(diào)用ServerBootstrap類的childHandler方法傳入接口實(shí)現(xiàn)類處理具體的業(yè)務(wù),綁定I O事件的處理類,監(jiān)聽端口IP地址,處理網(wǎng)絡(luò)IO事件。
bootstrap.group(master, worker);
bootstrap.channel(serverChannelcls);
bootstrap.localAddress(ipaddress, port);
bootstrap.childHandler(handler);
網(wǎng)絡(luò)IO事件處理類的部分關(guān)鍵代碼塊:
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//IP過濾,對黑名單中IP拒絕連接
channel.pipeline().addLast(filter);
//數(shù)據(jù)解碼
channel.pipeline().addLast(getProtocolFrameDecoder());
//數(shù)據(jù)編碼
channel.pipeline().addLast(protocalFrameDecrypt);
//數(shù)據(jù)解密
channel.pipeline().addLast(protocalFrameEncoder);
//數(shù)據(jù)加密
channel.pipeline().addLast(protocalFrameEncrypt);
//添加空閑超時的工具,關(guān)閉超時連接,刪除
channel.pipeline().addLast(messageProcessofEventLoopGroup, new IdleStateHandler(180, 0, 0));
//具體業(yè)務(wù)處理
channel.pipeline().addLast(messageProcessofEventLoopGroup, messageProcessor);
//異常處理
channel.pipeline().addLast(messageProcessofEventLoopGroup, exceptionHandler);
}
4) 使用創(chuàng)建的ServerBootstrap對象綁定,開始監(jiān)聽,用于異步操作的回調(diào)通知。
ChannelFuture f=bootstrap.bind().sync();
經(jīng)過以上操作,服務(wù)系統(tǒng)成功啟動,等待終端的連接請求,服務(wù)器接收到終端的數(shù)據(jù)后,將從過濾IP模塊開始傳遞數(shù)據(jù)并處理。
業(yè)務(wù)處理需要訪問數(shù)據(jù)庫,如果直接使用Netty的worker線程進(jìn)行業(yè)務(wù)處理,可能會因不確定的執(zhí)行時間導(dǎo)致線程被阻塞,最終導(dǎo)致服務(wù)器宕機(jī)。因此采用自定義業(yè)務(wù)線程池來處理比較耗時的業(yè)務(wù)邏輯,進(jìn)而提高通信系統(tǒng)性能。ThreadPoolExecutor類從JDK1.5開始被提供自定義線程池[13]。newFixedThreadPool可創(chuàng)建固定大小的線程池和可控控制線程最大并發(fā)數(shù),當(dāng)線程池中的線程數(shù)達(dá)到其設(shè)定的核心線程大小時,新創(chuàng)建的線程會在無界隊列中等待[14]。當(dāng)線程池中的某個線程執(zhí)行失敗時新創(chuàng)建的線程會替代執(zhí)行剩下任務(wù)。當(dāng)線程池中創(chuàng)建的線程調(diào)用shutdown函數(shù)時會退出線程池[14]。線程池最大尺寸不要超過系統(tǒng)資源限制,算法公式如下:
(1)
式中:Ns為線程的設(shè)置數(shù);Ni為CPU核心數(shù);Nj為預(yù)期CPU核心利用率;T/C為任務(wù)等待的時間與執(zhí)行時間的比值[15]。
IoT終端與服務(wù)器通信的數(shù)據(jù)交互主要包括數(shù)據(jù)的解析、訪問緩存數(shù)據(jù)庫、入庫MYSQL數(shù)據(jù)庫、發(fā)送數(shù)據(jù)的預(yù)處理等操作。其中數(shù)據(jù)的編解碼、加解密等執(zhí)行時間短操作交由NIO線程執(zhí)行,而耗時比較長的業(yè)務(wù)處理交由自定義線程池執(zhí)行完成,具體執(zhí)行流程如圖3所示。
圖3 Netty的NIO線程池與自定義線程池交互流程圖
采用開源壓力測試工具Jmeter模擬IoT終端進(jìn)行通信系統(tǒng)性能測試,從服務(wù)系統(tǒng)平均響應(yīng)時間、系統(tǒng)IO吞吐量2個方面進(jìn)行數(shù)據(jù)分析。因條件有限,性能測試在網(wǎng)絡(luò)帶寬100 MB/S的局域網(wǎng)中執(zhí)行,服務(wù)器CPU四核Intel(R) Core(TM) i5-3210M CPU @2.50 GHz內(nèi)存8 GB, 操作系統(tǒng)centos-release-7-5.1804.e17.centos.2.x86_64。
模擬IoT終端每2秒發(fā)送一個180字節(jié)的實(shí)時數(shù)據(jù),由圖4可知,當(dāng)并發(fā)數(shù)小于1 600時,采用Netty框架結(jié)合自定義線程池方案并沒有較大的優(yōu)勢,因?yàn)檎{(diào)用本地方法會有一定的系統(tǒng)開銷。當(dāng)并發(fā)數(shù)量達(dá)到2 400時,采用Netty框架結(jié)合自定義線程池方案優(yōu)勢較為明顯。基于Java NIO的實(shí)現(xiàn)方案因系統(tǒng)消耗資源較大導(dǎo)致響應(yīng)時間過長,同時基于Netty的實(shí)現(xiàn)方案因業(yè)務(wù)操作性能開銷比較大,阻塞了I/O線程,導(dǎo)致響應(yīng)時間過長。采用Netty框架結(jié)合自定義線程池方案系統(tǒng)平均響應(yīng)時間仍在100 ms以下,且未達(dá)到瓶頸。較Java NIO和Netty實(shí)現(xiàn)方案平均響應(yīng)時間縮短了97%和95%。
圖4 平均響應(yīng)時間比較
在吞吐量方面,由圖5可知,當(dāng)并發(fā)達(dá)到1 200時,基于Java NIO的實(shí)現(xiàn)方案平均吞吐量達(dá)到最高值,隨著并發(fā)數(shù)請求數(shù)增大,服務(wù)器出現(xiàn)異常吞吐量下降。當(dāng)并發(fā)數(shù)達(dá)到2 000和2 400時,采用Netty框架結(jié)合自定義線程池方案比基于Netty框架平均吞吐量增加了16.7%和33.2%。實(shí)驗(yàn)證明,當(dāng)4 000并發(fā)連接系統(tǒng)時,系統(tǒng)仍然穩(wěn)定運(yùn)行,如圖6所示。這說明采用Netty框架結(jié)合自定義線程池方案符合IoT終端通信服務(wù)系統(tǒng)處理高并發(fā)的設(shè)計要求。
圖5 平均響應(yīng)吞吐量比較
圖6 連接統(tǒng)計圖
本文介紹了IoT移動終端與服務(wù)器數(shù)據(jù)交互系統(tǒng),基于Netty框架和Java的ThreadPoolExecutor結(jié)合的設(shè)計方案,重點(diǎn)研究設(shè)計通信系統(tǒng)編解碼、加解密等模塊,提出一種自定義線程池處理耗時業(yè)務(wù)的設(shè)計方案。經(jīng)驗(yàn)證采用Netty框架結(jié)合自定義線程池可提高系統(tǒng)的高并發(fā)處理能力。該設(shè)計已成功應(yīng)用于某企業(yè)通信系統(tǒng)當(dāng)中,并且與30 000多臺IoT終端進(jìn)行數(shù)據(jù)交互,未出現(xiàn)數(shù)據(jù)交互不穩(wěn)定情況,且系統(tǒng)運(yùn)行效果良好。實(shí)踐證明,該設(shè)計是一種可參考的通信設(shè)計方案。