亚洲免费av电影一区二区三区,日韩爱爱视频,51精品视频一区二区三区,91视频爱爱,日韩欧美在线播放视频,中文字幕少妇AV,亚洲电影中文字幕,久久久久亚洲av成人网址,久久综合视频网站,国产在线不卡免费播放

        ?

        基于響應(yīng)式的服務(wù)推送框架設(shè)計(jì)

        2021-08-23 04:00:10胡喜明
        關(guān)鍵詞:服務(wù)

        胡喜明,胡 淼

        (杭州電子科技大學(xué) 通信工程學(xué)院,浙江 杭州 310018)

        0 引 言

        隨著物聯(lián)網(wǎng)技術(shù)的發(fā)展,人們能夠通過移動(dòng)終端方便獲取所需要的信息與服務(wù)[1]。為了保證網(wǎng)絡(luò)設(shè)備與物理設(shè)備間可靠的傳輸,用戶將傳統(tǒng)的消息拉取模式轉(zhuǎn)變?yōu)閼?yīng)用消息推送。與消息拉取模式相比,由服務(wù)器將所產(chǎn)生的消息推送用戶,符合當(dāng)前互聯(lián)網(wǎng)流量管理以及資源分配的需求,同時(shí)在傳輸?shù)膶?shí)時(shí)性、高效性以及流量資源節(jié)約方面都有較大的提升[2]。物聯(lián)網(wǎng)設(shè)備需要考慮到硬件相關(guān)參數(shù),例如CPU性能、最大帶寬容量、電池使用時(shí)間、網(wǎng)絡(luò)最大實(shí)時(shí)流量等方面的問題,在服務(wù)推送方面需要一種功能完善、更加輕量、帶寬占比小且性能高的服務(wù)推送技術(shù)[3]。

        當(dāng)前主流的服務(wù)推送方案有GCM服務(wù)、MQTT以及HTTP消息輪循[4]。其中HTTP輪循方式最為簡單,但是在實(shí)用性、可靠性以及可擴(kuò)展性方面有所欠缺。GCM服務(wù)是google研發(fā)的一款云推送框架,擁有完善的推送方案,但是受限于網(wǎng)絡(luò)原因,無法在國內(nèi)使用。MQTT協(xié)議具有低延遲、低帶寬、推送速度快等優(yōu)勢,適合大部分物聯(lián)網(wǎng)開發(fā)場景,然而單純的MQTT協(xié)議本身過于簡單,在消息推送、安全性管理、主題化推送方式、數(shù)據(jù)緩存性等方面開發(fā)難度大,因此需要對MQTT協(xié)議進(jìn)行二次開發(fā)[5]。

        本文提出一種基于Reactor-Netty+MQTT的高性能服務(wù)推送框架。項(xiàng)目中應(yīng)用Reactor-Netty技術(shù)替代傳統(tǒng)Netty技術(shù)對MQTT協(xié)議進(jìn)行封裝,借助該技術(shù)的響應(yīng)式非阻塞編程、事件驅(qū)動(dòng)等特性構(gòu)建高性能通信服務(wù)。通過Redis進(jìn)行數(shù)據(jù)緩存和Kafka實(shí)現(xiàn)消息代理轉(zhuǎn)發(fā)進(jìn)一步提高系統(tǒng)服務(wù)處理能力以及線程安全。

        1 相關(guān)技術(shù)

        1.1 MQTT協(xié)議

        1.1.1 MQTT背景及特點(diǎn)

        MQTT是由IBM公司在1999年所提出的一款基于TCP協(xié)議的發(fā)布訂閱協(xié)議[6,7]。MQTT是為了在有限的帶寬以及內(nèi)存條件下實(shí)現(xiàn)消息可靠傳輸。該協(xié)議提供發(fā)布/訂閱形式,實(shí)現(xiàn)單點(diǎn)以及一對多的發(fā)布消息推送、應(yīng)用TCP協(xié)議實(shí)現(xiàn)有序可靠雙端連接、報(bào)頭以及心跳報(bào)文僅占用3個(gè)字節(jié),將帶寬傳輸最小化,有效降低網(wǎng)絡(luò)通信流量、支持“最多一次”、”至少一次”,“僅一次“3種服務(wù)質(zhì)量等級、支持遺囑機(jī)制,實(shí)現(xiàn)客戶端異常斷開后,自動(dòng)根據(jù)所設(shè)置的遺囑機(jī)制,發(fā)布相關(guān)主題信息通知其它訂閱的客戶端用戶。基于以上特點(diǎn)使得當(dāng)前物聯(lián)網(wǎng)的開發(fā)中絕大多數(shù)都將MQTT作為消息傳遞協(xié)議的首選[8]。

        MQTT協(xié)議主要擁有3個(gè)角色:消息發(fā)布者、消息代理、消息訂閱者[9]。消息通過推送的形式從發(fā)布者經(jīng)過消息代理進(jìn)行發(fā)布,此時(shí)消息已經(jīng)確定了自身所對應(yīng)的主題,消息訂閱者可以根據(jù)主題進(jìn)行相關(guān)數(shù)據(jù)的訂閱[10,11]。MQTT工作模型如圖1所示。

        圖1 MQTT工作模型

        1.1.2 MQTT協(xié)議消息傳輸格式

        MQTT協(xié)議主要由固定報(bào)頭、可變報(bào)頭、有效載荷3部分構(gòu)成。固定報(bào)頭是所有MQTT報(bào)文段中必須存在的,總長度為2字節(jié)。在第一個(gè)字節(jié)中,7-4位標(biāo)識MQTT控制報(bào)文類型,目前報(bào)文類型主要分為連接、訂閱、發(fā)布3種。服務(wù)質(zhì)量占兩個(gè)標(biāo)識位,分為Qos0、Qos1、Oos23種。4-0位最為控制報(bào)文標(biāo)志位,可看作一種屬性參數(shù)??勺儓?bào)頭與有效載荷根據(jù)協(xié)議的不同進(jìn)行省略[12,13]。

        1.2 Reactor-Netty框架

        為了應(yīng)對高并發(fā)場景下流量過大情況,微軟設(shè)計(jì)了一種異步的編程思想-響應(yīng)式編程(reactive programming)。響應(yīng)式編程是一種專注于數(shù)據(jù)流以及變化傳遞的異步編程模式,這也意味著可以應(yīng)用編程語言進(jìn)行靜態(tài)和動(dòng)態(tài)數(shù)據(jù)流的表示。在jdk9中java引入了Reactor的概念,Reactor-Netty作為響應(yīng)式編程家族的一員,其底層基于Netty框架,對Netty進(jìn)行響應(yīng)式編程封裝,將其轉(zhuǎn)換為異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架。Reactor-Netty內(nèi)部仍然保留了Netty的主從多線程模型,擁有Netty框架的全部優(yōu)勢。

        Reactor通過Reactor Streams中的背壓,進(jìn)行數(shù)據(jù)流量控制,發(fā)布者和訂閱者可以進(jìn)行數(shù)據(jù)流量協(xié)商,其中背壓分為4種回壓策略:①onBackpressureBuffer:對下游的請求數(shù)據(jù)采用緩存的形式,保證系統(tǒng)不會壓力過大。②OnBackpressureDrop:元素就緒時(shí),根據(jù)下游是否有未滿足的request來判斷是否發(fā)出當(dāng)前元素。③OnBackpressureLatest:一直發(fā)送當(dāng)前最新的數(shù)據(jù)。④OnBackpressureError:當(dāng)前數(shù)據(jù)已經(jīng)滿了,再次添加請求直接報(bào)錯(cuò)。圖2為OnBackpressureBuffer模式的背壓原理圖,當(dāng)訂閱者的消費(fèi)能力遠(yuǎn)小于發(fā)布者,訂閱者可以通知發(fā)布者進(jìn)行服務(wù)的取消和終止功能,保證傳輸數(shù)據(jù)流量合理。

        圖2 OnBackpressureBuffer模式背壓原理

        2 服務(wù)推送框架設(shè)計(jì)

        整體服務(wù)器分為3部分,消息推送broker模塊,針對MQTT協(xié)議所定義的多種消息類型進(jìn)行分類處理以及通訊傳輸協(xié)議的搭建。服務(wù)認(rèn)證模塊,為保證服務(wù)的安全性提供了接口化驗(yàn)證,通過將RSA算法與Redis緩存結(jié)合,實(shí)現(xiàn)安全性加密認(rèn)證。數(shù)據(jù)緩存模塊,為防止傳輸中服務(wù)突然宕機(jī)所導(dǎo)致的數(shù)據(jù)丟失,采用Redis對服務(wù)中傳輸?shù)臄?shù)據(jù)進(jìn)行緩存。消息分發(fā)模塊,針對服務(wù)中耗時(shí)操作以及大型互聯(lián)網(wǎng)數(shù)據(jù)進(jìn)行流量統(tǒng)計(jì)的需求,對接企業(yè)級消息隊(duì)列kafka實(shí)現(xiàn)消息服務(wù)端與數(shù)據(jù)分發(fā)接收端的解耦,便于對海量數(shù)據(jù)的處理。系統(tǒng)架構(gòu)如圖3所示。

        圖3 服務(wù)推送系統(tǒng)架構(gòu)

        3 服務(wù)推送框架模塊實(shí)現(xiàn)

        3.1 服務(wù)認(rèn)證模塊

        服務(wù)認(rèn)證是對系統(tǒng)進(jìn)行安全維護(hù)的手段,客戶端在接入服務(wù)器時(shí)需要進(jìn)行權(quán)限認(rèn)證工作,通過對接入用戶的身份驗(yàn)證,保證服務(wù)傳輸信息不被惡意獲取。驗(yàn)證流程如圖4所示。

        圖4 服務(wù)認(rèn)證流程

        為保證服務(wù)的安全性,需要對密碼進(jìn)行加密處理,防止由于密碼泄露所導(dǎo)致的非法客戶端接入竊取信息。這里采用RSA非對稱加密算法實(shí)現(xiàn)公私鑰加密校驗(yàn)。應(yīng)用項(xiàng)目中提供的RSAUTIL類生成公鑰與私鑰,其內(nèi)部經(jīng)過大量的邏輯運(yùn)算實(shí)現(xiàn)加密。系統(tǒng)需要保證每次生成的公鑰與私鑰唯一,這里將服務(wù)名稱與當(dāng)前時(shí)間進(jìn)行拼接作為加密的鹽值。這樣每次獲取生成的公鑰私鑰必然唯一。公鑰私鑰生成后以服務(wù)名稱作為key值,明文密碼作為value存儲到Redis中。

        客戶端在發(fā)起連接時(shí),需要將通過公鑰加密后的密碼和用戶名稱一并傳輸,服務(wù)器接收到信息后,通過私鑰對傳輸密碼進(jìn)行解密與Redis中存儲的密碼進(jìn)行對比驗(yàn)證。

        3.2 MQTT核心Broker模塊

        Broker模塊是推送服務(wù)的核心功能模塊,關(guān)于MQTT協(xié)議的邏輯處理部分均在該模塊進(jìn)行。其中主要包括QoS服務(wù)質(zhì)量等級選擇功能、遺囑消息功能,保留消息功能、客戶端自動(dòng)重連功能、心跳機(jī)制功能、MQTT/WS連接功能以及主題過濾功能。

        QoS服務(wù)質(zhì)量等級選擇功能主要是為根據(jù)訂閱端業(yè)務(wù)為其提供消息選擇類型的處理,內(nèi)部根據(jù)選擇等級的不同實(shí)現(xiàn)相應(yīng)邏輯處理。其中主要分為Qos1、Qos2以及Qos3這3種。

        (1)Qos0:在該服務(wù)質(zhì)量下,消息至多發(fā)送一次,其消息的發(fā)送完全依賴于OSI七層協(xié)議中的傳輸層進(jìn)行維護(hù),可能會發(fā)生消息丟失或者重復(fù)的情況。該服務(wù)質(zhì)量可用于如下情況:定時(shí)推送周圍環(huán)境相關(guān)數(shù)據(jù),丟失一兩組數(shù)據(jù)不會影響服務(wù)的應(yīng)用且在不久后會再次推送。該情況主要用于普通APP定時(shí)推送功能,若設(shè)備當(dāng)前推送數(shù)據(jù)時(shí)設(shè)備并未接入網(wǎng)絡(luò),此時(shí)即使再次聯(lián)網(wǎng)也會丟失數(shù)據(jù)。

        (2)Qos1:在該服務(wù)質(zhì)量下,消息至少發(fā)送一次,消息發(fā)送到客戶端后,客戶端返回確認(rèn)信息標(biāo)志當(dāng)前服務(wù)已送達(dá)。該服務(wù)由于有一次確認(rèn)機(jī)制,因此在網(wǎng)絡(luò)環(huán)境比較好的情況下可以實(shí)現(xiàn)數(shù)據(jù)正確流暢的傳輸,但是若網(wǎng)絡(luò)環(huán)境出現(xiàn)波動(dòng)使得服務(wù)端無法收到確認(rèn)信息,可能會造成數(shù)據(jù)多次重發(fā)的問題。

        (3)Qos2:該服務(wù)質(zhì)量在三者中處于最高級別,該消息僅會發(fā)送一次。主要用在當(dāng)消息丟失或者重發(fā)時(shí),對服務(wù)端造成業(yè)務(wù)上的影響的情況。為了保證傳輸一次的要求,采用了兩階段確認(rèn)的方式,與Qos1相比開銷較大。

        遺囑消息功能主要是在發(fā)生服務(wù)斷連問題的時(shí)候,訂閱端無法及時(shí)了解消息發(fā)布方狀態(tài)導(dǎo)致的持續(xù)等待問題。遺囑消息保證了消息發(fā)布方在網(wǎng)絡(luò)波動(dòng)所導(dǎo)致的服務(wù)下線時(shí)能夠及時(shí)通知訂閱方,這里遺囑的內(nèi)容以事件監(jiān)聽器的形式實(shí)現(xiàn),一旦出現(xiàn)服務(wù)斷連的情況即可通過監(jiān)聽模式進(jìn)行響應(yīng);

        保留消息功能主要是對重要信息進(jìn)行標(biāo)記,任何標(biāo)記為保留消息的內(nèi)容,新接入的訂閱方都可以在連接后收到,并且不需要等待消息發(fā)送方的推送,內(nèi)部將消息存儲在Redis緩存中,保證數(shù)據(jù)不丟失;

        訂閱方斷線重連功能主要是為了將服務(wù)自動(dòng)化管理,保證訂閱方在由于網(wǎng)絡(luò)波動(dòng)導(dǎo)致失去連接,服務(wù)器會自動(dòng)實(shí)現(xiàn)斷線重連,降低維護(hù)成本,由Reactor-Netty內(nèi)部采用的心跳檢測機(jī)制,通過設(shè)定心跳間隔實(shí)現(xiàn)斷線檢測功能,并且基于事件驅(qū)動(dòng),在產(chǎn)生了心跳斷連的情況時(shí)觸發(fā)重連機(jī)制,通過輪循的方式發(fā)送重連請求保證服務(wù)的可靠性;

        心跳檢測功能主要是通過底層Netty的心跳檢測模塊,MQTT協(xié)議中有PINGREQ心跳請求,通過實(shí)現(xiàn)底層Rea-ctor-Netty的Handler類,對傳入的PINGREQ進(jìn)行管理,客戶端定時(shí)發(fā)送心跳檢測請求,通過Reactor-Netty內(nèi)置的MONO類進(jìn)行調(diào)用,該類基于事件觸發(fā),可根據(jù)實(shí)際情況進(jìn)行結(jié)束處理。并且MONO類可以與java8中并發(fā)異步響應(yīng)類進(jìn)行轉(zhuǎn)化,服務(wù)端將心跳響應(yīng)包裝為MONO進(jìn)行返回操作;

        MQTT/WS連接功能在推送服務(wù)中,不僅要與采用MQTT協(xié)議的硬件設(shè)備進(jìn)行關(guān)聯(lián),同時(shí)還會與互聯(lián)網(wǎng)設(shè)備產(chǎn)生連接,因此在服務(wù)器接收連接請求時(shí)會判斷當(dāng)前訂閱端類別,根據(jù)MQTT以及WS進(jìn)行分類處理。底層采用工廠設(shè)計(jì)模式對服務(wù)進(jìn)行劃分,根據(jù)傳入的標(biāo)識進(jìn)行判斷并選取不同的處理方式。對于MQTT協(xié)議,首先需要傳入配置類,通過內(nèi)置buildServer方法將配置類中信息綁定到當(dāng)前服務(wù)中,為保證安全服務(wù)端可以設(shè)定SSL驗(yàn)證保證傳輸安全。在配置類中通過MONO類的鏈?zhǔn)秸{(diào)用,將協(xié)議的處理類,以及客戶端重傳機(jī)制進(jìn)行綁定。WS連接與MQTT不同之處在于底層采用http傳輸,同時(shí)WS也不需要重連機(jī)制。服務(wù)器內(nèi)部封裝通過Reactor-Netty底層中與網(wǎng)絡(luò)傳輸相關(guān)的Handler處理類進(jìn)行半包解析、長連接以及序列化;

        主題過濾功能主要是對通配符進(jìn)行匹配,其中”#“符號表示只要該符號前的信息匹配,后續(xù)字段可以忽略不計(jì)?!??“表示當(dāng)前的占位符,可替代任意符號。服務(wù)器通過對通配符進(jìn)行匹配,實(shí)現(xiàn)主題個(gè)性化定制,該功能主要針對大批量服務(wù)接入時(shí)需要根據(jù)主題進(jìn)行客戶端分類。

        borker模塊中Reactor-Netty對MQTT協(xié)議進(jìn)行封裝,結(jié)合Reactor框架響應(yīng)式以及背壓的特點(diǎn),提升MQTT整體性能。

        以Reactor-Netty框架初始化服務(wù)功能為例:

        (1)服務(wù)啟動(dòng)后首先調(diào)用服務(wù)端工廠方法TransportServerFactory進(jìn)行服務(wù)初始化操作,MONO類進(jìn)行服務(wù)的綁定以及初始化。

        //初始化類,綁定相關(guān)屬性信息

        Mono.from(protocolFactory.getProtocol(

        ProtocolType.valueOf(config.getProtocol()))

        //獲取傳輸信息

        .get()>.getTransport()

        //綁定Reactor內(nèi)部類

        .start(config,>unicastProcessor)

        //netty初始化類

        .map(this::>wrapper)

        //錯(cuò)誤信息打印

        .doOnError(config.getThrowableConsumer());}

        (2)調(diào)用底層Reactor-Netty方法進(jìn)行初始化。創(chuàng)建Netty連接并設(shè)置定時(shí)屬性。

        NettyInbound inbound = connection.getInbound();

        Connection c = connection.getConnection();

        // 定時(shí)關(guān)閉

        Disposable disposable = Mono.fromRunnable(c::dispose)

        .delaySubscription(Duration.ofSeconds(10))

        .subscribe();

        // 設(shè)置connection

        c.channel()

        .attr(AttributeKeys.connectionAttributeKey)

        .set(connection);

        // 設(shè)置定時(shí)關(guān)閉

        c.channel()

        .attr(AttributeKeys.closeConnection)

        .set(disposable);

        (3)設(shè)置心跳檢測以及遺囑消息處理模塊

        //心跳檢測

        connection.getConnection()>.onReadIdle(config.getHeart(), () -> connection.getConnection()>.dispose())

        //設(shè)置遺囑消息

        connection.getConnection()>.onDispose() ->

        {

        Optional.ofNullable(connection.getConnection()>.channel()>.attr(AttributeKeys.WILL_MESSAGE))>.map(Attribute::>get)

        (4)根據(jù)QOS等級設(shè)置不同的消息處理類。

        switch (qoS) {

        //QOS1等級

        case AT_LEAST_ONCE:

        co.sendMessage(false, >qoS,>willMessage.isRetain(), willMessage.getTopicName(), willMessage.getCopyByteBuf())>.subscribe();

        break;

        //QOS2等級以及QOS3等級

        case EXACTLY_ONCE:

        case AT_MOST_ONCE:

        co.sendMessageRetry(false,>qoS,>willMessage.>isRetain(), >willMessage.getTopicName(),>willMessage.getCopyByteBuf())>.

        subscribe();

        break;

        //未傳輸時(shí)默認(rèn)傳輸?shù)燃?/p>

        default:

        co.sendMessage(false,>qoS,>willMessage.is Retain(), willMessage.getTopicName(),>willMessage.getCopyByteBuf())>.subscribe();

        break;}

        })));

        (5)sendMessage內(nèi)部綁定了handler處理類,通過對傳輸類型的判斷設(shè)置不同的handler。

        messageTypeCollection.computeIfAbsent(messageType,

        type->{

        //根據(jù)傳入的類型進(jìn)行判斷所需要的handler

        switch(type){

        //數(shù)據(jù)傳輸響應(yīng)

        case PUBACK:

        case PUBREC:

        case PUBREL:

        case PUBLISH:

        case PUBCOMP:

        return new PubHandler();

        //連接請求

        case CONNECT:

        case DISCONNECT:

        return new ConnectHandler();

        //心跳檢測

        case PINGREQ:

        return new HeartHandler();

        //服務(wù)訂閱

        case SUBSCRIBE:

        case UNSUBSCRIBE:

        return new SubHandler();

        }

        經(jīng)過以上操作,服務(wù)器初始化成功,等待相關(guān)請求。服務(wù)器在接收到請求后,根據(jù)數(shù)據(jù)請求類型進(jìn)行相應(yīng)處理。

        3.3 服務(wù)器數(shù)據(jù)緩存模塊

        隨著服務(wù)連接數(shù)的增多,交互數(shù)據(jù)量也逐漸增大,如果將數(shù)據(jù)都存儲在內(nèi)存中,將降低整體服務(wù)的性能。項(xiàng)目中將數(shù)據(jù)存儲在Redis中,作為當(dāng)前服務(wù)數(shù)據(jù)的緩存。Redis內(nèi)部對存入信息按照key-value形式進(jìn)行存儲,保證數(shù)據(jù)的唯一性。Redis是一款Nosql數(shù)據(jù)庫,在存儲方面采用的單線程處理,因此訪問時(shí)不需要進(jìn)行并發(fā)維護(hù),并且Redis在并發(fā)數(shù)據(jù)訪問以及讀取方面的性能均好于數(shù)據(jù)庫。為防止Redis系統(tǒng)崩潰導(dǎo)致的數(shù)據(jù)丟失,內(nèi)部開啟數(shù)據(jù)持久化形式,將數(shù)據(jù)定時(shí)保存到磁盤文件中,同時(shí)采用sentinel監(jiān)控下的Redis集群模型,sentinel作為服務(wù)監(jiān)控組件主要功能為服務(wù)監(jiān)控、故障提示、故障自動(dòng)轉(zhuǎn)移,通過對當(dāng)前Redis集群的心跳監(jiān)控,保證當(dāng)Redis主節(jié)點(diǎn)崩潰時(shí),系統(tǒng)自動(dòng)在其它從服務(wù)器中進(jìn)行選舉,產(chǎn)生新的主節(jié)點(diǎn),實(shí)現(xiàn)服務(wù)的可用性。Redis集群模式采用哈希槽算法對每個(gè)存入的key進(jìn)行CRC16校驗(yàn)后對16 384進(jìn)行取模來決定放入那個(gè)服務(wù)器中。相對于傳統(tǒng)集群形式,該結(jié)構(gòu)更容易擴(kuò)展,如果擴(kuò)充一個(gè)節(jié)點(diǎn)D,只需要將A、B、C節(jié)點(diǎn)中的部分槽放置在D上;如果想移除節(jié)點(diǎn)A,只需要將A的數(shù)據(jù)轉(zhuǎn)移到B和C節(jié)點(diǎn)上。由于將哈希槽從一個(gè)節(jié)點(diǎn)移動(dòng)到另一個(gè)節(jié)點(diǎn)不需要停止服務(wù),只需要通過命令直接再分配,因而上述擴(kuò)展不會造成集群不可用,保證了服務(wù)的高可用性。

        如表1所示,項(xiàng)目中Redis主要對以下7個(gè)內(nèi)容進(jìn)行存儲。

        表1 Redis存儲信息

        publish以及pubrel:主要是為了滿足Qos質(zhì)量等級,質(zhì)量等級的不同導(dǎo)致數(shù)據(jù)需要重傳,因此需要對傳輸當(dāng)前數(shù)據(jù)進(jìn)行緩,等待重新發(fā)送。

        session:主要是對當(dāng)前客戶端與服務(wù)器所建立的會話進(jìn)行存儲。MQTT的設(shè)計(jì)主要是為了對信號不穩(wěn)定的網(wǎng)絡(luò)提供服務(wù),因此有時(shí)會出現(xiàn)網(wǎng)絡(luò)波動(dòng)導(dǎo)致的斷開連接情況,此時(shí)客戶端就可以在Session中將遺囑信息也進(jìn)行存儲,服務(wù)器會定期對客戶端狀態(tài)進(jìn)行檢查,如果出現(xiàn)異常可以向Topic中發(fā)送遺囑信息通知訂閱方,避免服務(wù)宕機(jī)而導(dǎo)致的訂閱方長時(shí)間等待問題。

        client:訂閱者根據(jù)主題的不同實(shí)現(xiàn)訂閱服務(wù),Redis中需要針對每個(gè)訂閱者進(jìn)行唯一的標(biāo)識ClientId,該標(biāo)識應(yīng)用Redis內(nèi)部的incr方法實(shí)現(xiàn)標(biāo)識號的遞增操作并與服務(wù)名進(jìn)行拼接。在高并發(fā)場景下,Redis內(nèi)部實(shí)現(xiàn)id的同步生成,保證線程安全。將ClientId與所訂閱的主題進(jìn)行綁定,方便后續(xù)服務(wù)推送時(shí)查詢。

        subWild與NotsubWild:發(fā)布方采用個(gè)性化的主題模式進(jìn)行服務(wù)發(fā)布,其中可以通過#與?進(jìn)行多字段匹配,與當(dāng)前單獨(dú)字段匹配,因此在存儲的時(shí)候需要將有無通配符的主題進(jìn)行分別存儲。

        retain:該緩存主要為了broker模塊中保留信息功能,可以讓新訂閱的客戶端獲取發(fā)布方當(dāng)前最新的信息,不需要進(jìn)行接收等待。

        3.4 消息代理分發(fā)模塊

        消息代理分發(fā)模式是在硬件設(shè)備之間通信的基礎(chǔ)上,將數(shù)據(jù)傳入大型互聯(lián)網(wǎng)設(shè)備中進(jìn)行處理。這里采用kafka作為消息代理分發(fā)的中間件,kafka用于海量數(shù)據(jù)的場景,通過分布式架構(gòu)所提供的消息處理機(jī)制對數(shù)據(jù)進(jìn)行流式存儲,消費(fèi)者通過訂閱不同的主題可以實(shí)現(xiàn)毫秒級延遲的信息接收,保證數(shù)據(jù)順序性消費(fèi)。大型企業(yè)項(xiàng)目中可通過kafka將該框架的推送數(shù)據(jù)轉(zhuǎn)發(fā)到hdfs歸檔或者elasticsearch做全文檢測,為大型互聯(lián)網(wǎng)設(shè)備提供高性能的數(shù)據(jù)獲取途徑。

        4 測試結(jié)果

        4.1 安全性測試

        客戶端輸入錯(cuò)誤的密碼并向服務(wù)器發(fā)送連接請求,在已知服務(wù)器的IP地址以及運(yùn)行端口號的情況下進(jìn)行訪問測試。測試結(jié)果如圖5所示,經(jīng)過rsa私鑰解碼顯示解析后的密碼與Redis中所存儲的密碼不符,返回錯(cuò)誤提示。因此該推送框架可以保證服務(wù)的安全性。

        圖5 安全認(rèn)證失敗

        4.2 性能測試

        為檢測推送服務(wù)的性能,需要對其在不同并發(fā)量情況下進(jìn)行性能測試,測試采用Jmeter進(jìn)行服務(wù)推送性能檢測,根據(jù)并發(fā)數(shù)量以及響應(yīng)時(shí)間對框架進(jìn)行分析。測試設(shè)備采用兩臺阿里云服務(wù)器,配置見表2。

        Jmeter端每間隔1 s發(fā)送一個(gè)350字節(jié)的數(shù)據(jù),在不同的并發(fā)數(shù)情況下進(jìn)行測試。如圖6所示,在低并發(fā)情況下,由于系統(tǒng)中自身處理速度比較快,無法清楚看出MQTT與本框架的性能,但隨著并發(fā)數(shù)的增長,單純采用MQTT的響應(yīng)時(shí)間上升較快,當(dāng)并發(fā)數(shù)達(dá)到了2000時(shí)可以看到MQTT的響應(yīng)時(shí)間已經(jīng)遠(yuǎn)遠(yuǎn)超過Reactor-Netty+MQTT,同時(shí)從與Netty+MQTT的響應(yīng)時(shí)間對比可以看出,該框架的響應(yīng)時(shí)間也相對較低。這說明采用Reactor-Netty+MQTT的組合形式能夠極大降低高并發(fā)場景下的響應(yīng)時(shí)間。

        圖6 平均響應(yīng)時(shí)間對比

        從吞吐量方面考慮,圖7可知當(dāng)并發(fā)數(shù)達(dá)到1500左右MQTT達(dá)到了性能的瓶頸期,隨著并發(fā)數(shù)的增高,服務(wù)吞吐量不再上升。而在并發(fā)數(shù)量提升到3000左右時(shí),Netty+MQTT方案的吞吐量提升速度不如Reactor-Netty+MQTT方案快。實(shí)驗(yàn)結(jié)果表明在4000并發(fā)數(shù)的時(shí)候,系統(tǒng)仍然有較好的吞吐量,并且其上升趨勢遠(yuǎn)好于原生MQTT以及Netty+MQTT方案。這說明Reactor-Netty+MQTT的方案有良好的并發(fā)性能,適用于高并發(fā)訪問。

        圖7 平均響應(yīng)速度對比

        5 結(jié)束語

        本文介紹了一種高性能消息推送框架,基于Reactor-Netty與MQTT相結(jié)合的方案,通過Reactor-Netty響應(yīng)式編程思想對MQTT協(xié)議進(jìn)行封裝,實(shí)現(xiàn)異步高性能響應(yīng)。應(yīng)用Redis進(jìn)行消息緩存,保證服務(wù)消息的實(shí)時(shí)性,設(shè)計(jì)kafka內(nèi)容分發(fā)代理模式,可將數(shù)據(jù)推送服務(wù)與大型互聯(lián)網(wǎng)架構(gòu)相結(jié)合,實(shí)現(xiàn)服務(wù)數(shù)據(jù)高質(zhì)量處理。經(jīng)過實(shí)驗(yàn)驗(yàn)證發(fā)現(xiàn),Reactor-Netty實(shí)現(xiàn)的消息推送服務(wù)可以提高整體的并發(fā)處理能力,相對于傳統(tǒng)MQTT框架以及Netty框架性能有極大的提高。

        猜你喜歡
        服務(wù)
        自助取卡服務(wù)
        服務(wù)在身邊 健康每一天
        服務(wù)在身邊 健康每一天
        服務(wù)在身邊 健康每一天
        服務(wù)在身邊 健康每一天
        服務(wù)在身邊 健康每一天
        服務(wù)在身邊 健康每一天
        服務(wù)在身邊 健康每一天
        高等教育為誰服務(wù):演變與啟示
        招行30年:從“滿意服務(wù)”到“感動(dòng)服務(wù)”
        商周刊(2017年9期)2017-08-22 02:57:56
        日本人妻97中文字幕| 一本久到久久亚洲综合| 欧美亚洲另类 丝袜综合网| 亚洲天堂av一区二区三区不卡| 极品少妇xxxx精品少妇偷拍| 国产丝袜视频一区二区三区| 国产精品一区二区av片| 蜜桃一区二区三区在线视频| 精品人伦一区二区三区蜜桃91| 国产亚洲真人做受在线观看| 综合网自拍| 日本成人三级视频网站| 亚洲人成网站色在线入口口| 色综合色狠狠天天综合色| 有码精品一二区在线| 国产毛片一区二区日韩| 91精品国产综合久久久密臀九色 | 在教室伦流澡到高潮hgl动漫| 欧美日韩精品一区二区在线观看 | 精品日韩av专区一区二区| 久久日日躁夜夜躁狠狠躁| 国产精品毛片久久久久久久| 中文字幕人妻中文| 91在线精品老司机免费播放| 亚洲免费视频一区二区三区| 国产偷国产偷亚洲综合av| 国产精品乱码一区二区三区| 热久久这里只有| 亚洲色图偷拍自拍亚洲色图| 国产一区二区三区小说| 人妻少妇精品视中文字幕国语| 亚洲精品综合在线影院| 亚洲一区二区三区偷拍女| 999国内精品永久免费观看 | АⅤ天堂中文在线网| 护士人妻hd中文字幕| 亚洲精品成人无码中文毛片| 456亚洲老头视频| av免费在线播放一区二区| 中文字幕人妻在线中字| 中文字幕av日韩精品一区二区 |