張樂
摘 要 針對(duì)現(xiàn)在移動(dòng)互聯(lián)網(wǎng)復(fù)雜、信號(hào)不穩(wěn)定的網(wǎng)絡(luò)特點(diǎn),設(shè)計(jì)了一套以 RabbitMQ消息中間件的即時(shí)通訊系統(tǒng),在確保服務(wù)質(zhì)量的情況下,減少了消息的冗余,減少了應(yīng)用間的耦合關(guān)系。使移動(dòng)設(shè)備更加省電省流量,適用于當(dāng)今移動(dòng)互聯(lián)網(wǎng)。
【關(guān)鍵詞】AMQP RabbitMQ 即時(shí)通訊
1 引言
近幾年來(lái),移動(dòng)互聯(lián)網(wǎng)憑借其攜帶方便、接入迅速、業(yè)務(wù)內(nèi)容豐富等特點(diǎn),取得了前所未有的高速發(fā)展。目前,移動(dòng)應(yīng)用服務(wù)呈現(xiàn)多樣化,但是移動(dòng)終端最基本的功能是滿足用戶的溝通需求,因此即時(shí)通訊類應(yīng)用以其隨時(shí)隨地溝通的特點(diǎn),滿足了用戶的需求。
目前常用于即時(shí)通訊的兩大協(xié)議是XMPP協(xié)議和SIMPLE協(xié)議。XMPP和SIMPLE 都是根據(jù)消息體里的消息頭尋址的,在消息體里面含有消息的發(fā)送者、接收者、消息路由用的會(huì)話標(biāo)示信息等眾多頭域,使得消息體的體積變大,帶寬利用率較低。XMPP 和 SIMPLE協(xié)議都是基于字符文本的通信協(xié)議,其優(yōu)點(diǎn)是可讀性強(qiáng),便于抓包分析,但字符文本協(xié)議通信效率較低,并且,為了保證通信安全,采用TLS 等加密傳輸計(jì)算量也較大,耗能較高。
移動(dòng)互聯(lián)網(wǎng)相對(duì)傳統(tǒng)互聯(lián)網(wǎng)最顯著的特點(diǎn)就是其移動(dòng)性,這種移動(dòng)性隨之帶來(lái)的是諸多不穩(wěn)定、不可靠和隨意性,使得移動(dòng)互聯(lián)網(wǎng)上的應(yīng)用與應(yīng)用服務(wù)器之間建立的網(wǎng)絡(luò)連接可靠性較差,很難保持長(zhǎng)時(shí)間的連接狀態(tài),因此傳統(tǒng)的通信模型,比如 SIMPLE和XMPP 中采用的消息傳遞,不適合應(yīng)用于移動(dòng)互聯(lián)網(wǎng),移動(dòng)互聯(lián)網(wǎng)應(yīng)用與應(yīng)用服務(wù)器之間可以通過松散耦合的關(guān)系來(lái)改善移動(dòng)性帶來(lái)的問題。
2 AMQP及RabbitMQ
2.1 AMQP
AMQP(AdvancedMessage Queuing Protocol),即高級(jí)消息隊(duì)列是一個(gè)基于消息異步處理的應(yīng)用層高級(jí)消息隊(duì)列協(xié)議,是消息中間件的開發(fā)標(biāo)準(zhǔn)。它的主要特征是面向消息、隊(duì)列、路由,且安全、可靠。AMQP是基于客戶端/代理模式,為客戶端應(yīng)用與消息中間件之間提供異步、安全、高效的交互?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端和中間件不同產(chǎn)品、不同開發(fā)語(yǔ)言等條件的限制。
2.2 RabbitMQ
RabbitMQ是流行的開源消息隊(duì)列系統(tǒng),由以高性能、健壯以及可伸縮性出名的erlang 語(yǔ)言開發(fā),是 AMQP 的標(biāo)準(zhǔn)實(shí)現(xiàn)。它可以支持各種消息交換的體系結(jié)構(gòu):
(1)存儲(chǔ)轉(zhuǎn)發(fā)(多個(gè)消息發(fā)送者,單個(gè)消息接收者);
(2)分布式事務(wù)(多個(gè)消息發(fā)送者,多個(gè)消息接收者);
(3)發(fā)布訂閱(多個(gè)消息發(fā)送者,多個(gè)消息接收者);
(4)基于內(nèi)容的路由(多個(gè)消息發(fā)送者,多個(gè)消息接收者);
(5)文件傳輸隊(duì)列(多個(gè)消息發(fā)送者,多個(gè)消息接收者);
(6)點(diǎn)對(duì)點(diǎn)連接(單個(gè)消息發(fā)送者,單個(gè)消息接收者)。
RabbitMQ的主要特性包括如下方面:
(1)定義、隊(duì)列、通道等概念,使對(duì)消息投遞的控制更加精準(zhǔn)靈活;
(2)支持高可用,以及消息和隊(duì)列的持久化;
(3)支持集群,允許節(jié)點(diǎn)動(dòng)態(tài)變化和節(jié)點(diǎn)故障;
(4)支持眾多語(yǔ)言的客戶端,包括Java、C/C++、php、.Netpython等;
(5)高性能、消息吞吐量大。
生產(chǎn)者發(fā)送消息到RabbitMQ 服務(wù)器,在服務(wù)器內(nèi)部,用戶創(chuàng)建交換機(jī)和隊(duì)列,通過綁定規(guī)則將兩者聯(lián)系在一起。交換機(jī)負(fù)責(zé)分發(fā)消息,根據(jù)類型綁定的不同分發(fā)策略有區(qū)別。消息最后來(lái)到隊(duì)列中,等待消費(fèi)者取走。
3 即時(shí)通訊系統(tǒng)的設(shè)計(jì)
本文提出一種以RabbitMQ為消息中間件,采用發(fā)布/訂閱模型作為消息傳輸機(jī)制,以protobuf作為數(shù)據(jù)傳輸格式。采用C/S 體系結(jié)構(gòu),將消息存儲(chǔ)與轉(zhuǎn)發(fā),確保消息不重不漏,充分的適應(yīng)移動(dòng)互聯(lián)網(wǎng)的一套即時(shí)通訊的方案。
本文的消息系統(tǒng)可以看做是一個(gè)發(fā)布/訂閱系統(tǒng),有消息生產(chǎn)者publisher,消息中轉(zhuǎn)者RabbitMQ Server,消息處理者M(jìn)essage Server,以及消息消費(fèi)者Receiver.這個(gè)系統(tǒng)中的publisher和receiver共同構(gòu)成了一個(gè)channel。
4 消息流程
4.1 發(fā)送流程
如圖1所示 。
(1)客戶端A發(fā)送一條消息內(nèi)容為send_client_uId【發(fā)送者id】 , channel name ,msg time【消息發(fā)送時(shí)間,精確到秒】,msg content【消息內(nèi)容】,保存在本地的SqliteSQL數(shù)據(jù)庫(kù),然后用protobuf序列化后發(fā)發(fā)個(gè)RabbitMQ。
(2)Message服務(wù)器的消息隊(duì)列通過RabbitMQ收到來(lái)自客戶端A的消息,反序列化。
(3)Message服務(wù)器收到消息后以channel name+ msg time為key到本地消息緩存中查詢消息是否已經(jīng)存在,如果存在則終止消息流程,通過RabbitMQ服務(wù)器發(fā)送"duplicate msg"這個(gè)msg ack 給客戶端A,否則繼續(xù)。
(4)Message服務(wù)器到Counter服務(wù)器(消息計(jì)數(shù)器,為每個(gè)text等類型的消息分配msg id)以channel name為key查詢其最新的msg id,把msg id自增一后作為這條消息的id。
(5)Message服務(wù)器把分配好id的消息插入本地msg cache和msg DB中。
(6)Message服務(wù)器給客戶端A返回ack, 內(nèi)容為msg id , msg time , channel name。
(7)客戶端A收到ack包后終止消息流程,并刪除本地?cái)?shù)據(jù)庫(kù)中的數(shù)據(jù)。如果在發(fā)送流程超時(shí)后仍未收到消息則轉(zhuǎn)到步驟1進(jìn)行重試,并計(jì)算重試次。
(8)如果重試次數(shù)超過兩次依然失敗則提示“系統(tǒng)繁忙” or “網(wǎng)絡(luò)環(huán)境不佳,請(qǐng)稍后再嘗試發(fā)送”等,終止消息發(fā)送流程。
4.2 接收流程
Message服務(wù)器到RabbitMQ中檢測(cè)Channel中的receive_client是否在線,如果在線則將消息發(fā)送出去。
4.3 心跳發(fā)送流程
(1)客戶端發(fā)送心跳包,內(nèi)容為{client_uId, network type, list{channel name:newest channel msg id} },即心跳包要上報(bào)客戶端所在的所有channel,以及本地歷史消息記錄中每個(gè)channel最新的消息的id;心跳包轉(zhuǎn)給專門處理心跳邏輯的心跳服務(wù)器。
(2)心跳服務(wù)器收到心跳包后到Counter服務(wù)器循環(huán)查詢每個(gè)channel的最新消息id,如果客戶端上報(bào)的id與這個(gè)id不等,就發(fā)送一條消息通知Message服務(wù)器,消息內(nèi)容為{publish_uId, channel name, client newest msg id of channel【channel內(nèi)的最新消息Id】}。
(3)Message服務(wù)器收到這條消息后,重新啟動(dòng)消息下發(fā)邏輯,到緩存中取出所有的大于{client newest msg id of channel}的id列表。
(4)Message服務(wù)器依據(jù)list中的id到消息存儲(chǔ)服務(wù)器中依次取出每條消息。
(5)Message服務(wù)器把這些消息作為"未讀消息"下發(fā)給客戶端。
(6)心跳服務(wù)器給客戶端下發(fā)heartbeat ack包,數(shù)據(jù)包括其所在的每個(gè)channel的最新消息的msg id。
(7)客戶端收到heartbeat ack包后,依據(jù)每個(gè)channel的最新的msg id與本地消息緩存中對(duì)應(yīng)的channel的最新消息id做對(duì)比,如果id不等,客戶端可以啟動(dòng)拉取消息流程。
5 結(jié)束語(yǔ)
本文簡(jiǎn)單介紹了基于AMQP協(xié)議的標(biāo)準(zhǔn)實(shí)現(xiàn)RabbitMQ,并提出了以開源跨平臺(tái)的RabbitMQ為消息中間件,松耦合的發(fā)布/訂閱模型為通訊方式,protocolBuffer為數(shù)據(jù)傳輸格式的一套完備即時(shí)通訊系統(tǒng)設(shè)計(jì),詳細(xì)敘述了消息下發(fā)的技術(shù)流程,同時(shí)保證了消息不重不漏。從根本上解決了由于消息冗余而占用大量的帶寬利用率,使得該方案更加適合移動(dòng)互聯(lián)網(wǎng)復(fù)雜的網(wǎng)絡(luò)環(huán)境。下一步的工作將著重研究在復(fù)雜的移動(dòng)網(wǎng)絡(luò)下,用于保持連接的心跳包的發(fā)送時(shí)間間隔的方案和消息傳遞時(shí)的安全問題。
參考文獻(xiàn)
[1]王默涵.面向移動(dòng)互聯(lián)網(wǎng)的Presence/IM機(jī)制的設(shè)計(jì)與實(shí)現(xiàn)[D].小型微型計(jì)算機(jī)系統(tǒng),2015(04).
[2]高曉婷.基于AMQP的信息發(fā)布與訂閱[D].2013(10).
[3]RabbitMQ與AMQP協(xié)議詳解.http://www.cnblogs.com/frankyou/p/5283539.html
[4]Rabbit MQ[EB/OL].http://www.rabbitmq.com/.z