岑凱倫,于紅巖,楊騰霄
(1.上海海事大學(xué)信息工程學(xué)院,上海201306;2.上海海事大學(xué)交通運(yùn)輸學(xué)院,上?!?01306;3.上海紐盾科技有限公司研發(fā)部,上?!?00092)
大數(shù)據(jù)下基于Spark的電商實(shí)時(shí)推薦系統(tǒng)的設(shè)計(jì)與實(shí)現(xiàn)
岑凱倫1,于紅巖2,楊騰霄3
(1.上海海事大學(xué)信息工程學(xué)院,上海201306;2.上海海事大學(xué)交通運(yùn)輸學(xué)院,上海201306;3.上海紐盾科技有限公司研發(fā)部,上海200092)
隨著互聯(lián)網(wǎng)規(guī)模的迅速增長(zhǎng),導(dǎo)致用戶在面對(duì)海量的互聯(lián)網(wǎng)信息時(shí),無(wú)法從中獲取自己真正感興趣的信息,產(chǎn)生“信息超載”問(wèn)題。個(gè)性化推薦在此問(wèn)題上彌補(bǔ)了搜索引擎的不足,即代替用戶評(píng)估其所有未看過(guò)的產(chǎn)品,并通過(guò)分析用戶的興趣愛(ài)好和歷史行為,主動(dòng)推薦符合用戶喜好的項(xiàng)目。目前個(gè)性化推薦系統(tǒng)已在電子商務(wù)、電影、音樂(lè)網(wǎng)站等領(lǐng)域取得了顯著的成功。
根據(jù)IDC發(fā)布的數(shù)字宇宙報(bào)告顯示,至2020年數(shù)字宇宙將超出預(yù)期,達(dá)到40ZB,相當(dāng)于地球上人均產(chǎn)生5247GB的數(shù)據(jù)[1]。面對(duì)未來(lái)如此巨大規(guī)模的數(shù)據(jù)量,傳統(tǒng)單機(jī)環(huán)境下的推薦系統(tǒng)存在著兩大問(wèn)題:一是單機(jī)節(jié)點(diǎn)的推薦模型訓(xùn)練由于單機(jī)硬件條件的限制,無(wú)法存儲(chǔ)所有需要運(yùn)算的數(shù)據(jù)量;二是由于訓(xùn)練數(shù)據(jù)集規(guī)模的增大,單機(jī)節(jié)點(diǎn)進(jìn)行訓(xùn)練的時(shí)長(zhǎng)不斷增長(zhǎng)。傳統(tǒng)單機(jī)環(huán)境下的推薦系統(tǒng)無(wú)法滿足大數(shù)據(jù)時(shí)間推薦的需求,Hadoop[2]平臺(tái)能夠處理高達(dá)上TB級(jí)別的海量數(shù)據(jù)。目前有大量的學(xué)者對(duì)單機(jī)的機(jī)器學(xué)習(xí)算法使用Hadoop平臺(tái)編寫進(jìn)行擴(kuò)展以實(shí)現(xiàn)對(duì)大規(guī)模數(shù)據(jù)集的處理。江小平[3]等基于MapReduce編程模型對(duì)樸素貝葉斯文本分類算法進(jìn)行并行化擴(kuò)展。劉義[4]等基于Map-Reduce編程模型在Hadoop平臺(tái)上實(shí)現(xiàn)了基于R-樹的k-近鄰連接算法。對(duì)于推薦內(nèi)容的計(jì)算,大量的學(xué)者將推薦系統(tǒng)和Hadoop平臺(tái)進(jìn)行集成,Yu[5]等采集用戶之間傳遞的信息以及發(fā)表的游記文本作為訓(xùn)練數(shù)據(jù),利用Hadoop平臺(tái)構(gòu)建旅游推薦系統(tǒng)。 Walunj[6]等利用基于MapReduce實(shí)現(xiàn)的Mathout算法庫(kù)構(gòu)建電子商務(wù)推薦系統(tǒng),該算法庫(kù)集成了協(xié)同過(guò)濾算法,具有更好的操作性。
Hadoop平臺(tái)解決了海量數(shù)據(jù)計(jì)算推薦模型的問(wèn)題,但是Hadoop平臺(tái)在并行計(jì)算時(shí)必須將中間結(jié)果存儲(chǔ)在磁盤中,并且需要從磁盤中再次讀取,導(dǎo)致Hadoop平臺(tái)構(gòu)建的推薦系統(tǒng)存在如下不足:一是離線推薦模型在面對(duì)海量數(shù)據(jù)時(shí)會(huì)出現(xiàn)訓(xùn)練時(shí)間較長(zhǎng)的問(wèn)題;二是無(wú)法對(duì)用戶的實(shí)時(shí)日志行為做出實(shí)時(shí)處理。由于基于Hadoop平臺(tái)構(gòu)建的推薦系統(tǒng)存在的不足,無(wú)法滿足實(shí)時(shí)推薦的需求,使得用戶對(duì)于電商網(wǎng)站的推薦反饋速度提出了更高的要求。Spark是新興的大數(shù)據(jù)處理引擎,其很好地解決了Hadoop平臺(tái)在計(jì)算時(shí)需要將運(yùn)算的中間結(jié)果存入磁盤所導(dǎo)致的計(jì)算速度緩慢問(wèn)題。從2009年Spark誕生至今,作為開源項(xiàng)目已經(jīng)在流處理、圖計(jì)算、機(jī)器學(xué)習(xí)、結(jié)構(gòu)化數(shù)據(jù)查詢等各個(gè)方面,取得了很多重要的成果[7]。Spark平臺(tái)為迭代式數(shù)據(jù)處理提供更好的支持,每次迭代的數(shù)據(jù)可以保存在內(nèi)存中,而不是寫入文件。Spark平臺(tái)提供了集群的分布式內(nèi)存抽象,即RDD[8],一個(gè)不可變的帶分區(qū)集合,以實(shí)現(xiàn)數(shù)據(jù)操作方式的多樣性。目前針對(duì)Spark平臺(tái)的相關(guān)研究論文較少,Lu[9]等利用遠(yuǎn)程內(nèi)存提升Spark平臺(tái)在處理大數(shù)據(jù)時(shí)的速度。Qi[10]等利用Spark平臺(tái)將用于配對(duì)測(cè)試檢測(cè)的基因算法進(jìn)行兩階段并行處理,提升了配對(duì)測(cè)試的體積大小和計(jì)算的效率。Yang[11]等基于Spark平臺(tái)提出了分批處理的梯度下降算法,并對(duì)深度置信網(wǎng)絡(luò)進(jìn)行訓(xùn)練,提升了收斂速度。國(guó)內(nèi)對(duì)于Spark平臺(tái)的研究目前主要集中在一些互聯(lián)網(wǎng)行業(yè),如阿里巴巴、百度、騰訊、網(wǎng)易、搜狐等。騰訊公司數(shù)據(jù)倉(cāng)庫(kù)已經(jīng)大量使用 Spark平臺(tái)替代原來(lái)的Hadoop平臺(tái)的MapReduce,并使系統(tǒng)性能大大提高。曹波[12]等將傳統(tǒng)關(guān)聯(lián)分析中的FP-Growth算法在Spark平臺(tái)實(shí)現(xiàn)了并行處理,解決了識(shí)別大數(shù)據(jù)的伴隨車輛組問(wèn)題。王虹旭[13]等設(shè)計(jì)了在Spark平臺(tái)上的并行數(shù)據(jù)分析系統(tǒng),來(lái)解決海量數(shù)據(jù)分析問(wèn)題。嚴(yán)玉良[14]等提出了一種基于Spark的大規(guī)模單圖頻繁子集挖掘算法,通過(guò)次優(yōu)樹構(gòu)建并行計(jì)算的候選子圖,在給定最小支持度時(shí)挖掘出所有的頻繁子圖。王詔遠(yuǎn)[15]等基于Spark平臺(tái)提出一種并行蟻群優(yōu)化算法,通過(guò)將螞蟻轉(zhuǎn)換為彈性分布式數(shù)據(jù)集,由此給出一系列轉(zhuǎn)換算子,實(shí)現(xiàn)螞蟻構(gòu)造過(guò)程的并行化。
目前基于Hadoop平臺(tái)的推薦系統(tǒng)解決了推薦模型并行訓(xùn)練的問(wèn)題,但離線訓(xùn)練速度慢。通過(guò)對(duì)Spark平臺(tái)的研究,Spark平臺(tái)擁有比Hadoop平臺(tái)更強(qiáng)大的計(jì)算能力,能更快速地處理并行數(shù)據(jù),但目前的研究?jī)H是針對(duì)大數(shù)據(jù)下電商網(wǎng)站離線推薦系統(tǒng)的設(shè)計(jì),并未提出基于Spark平臺(tái)的實(shí)時(shí)推薦流程和算法。本文設(shè)計(jì)和實(shí)現(xiàn)了應(yīng)對(duì)大數(shù)據(jù)的基于Spark平臺(tái)的電商實(shí)時(shí)推薦系統(tǒng),設(shè)計(jì)了實(shí)時(shí)推薦系統(tǒng)流程,提出了分布式日志實(shí)時(shí)采集、分布式日志實(shí)時(shí)傳輸、實(shí)時(shí)日志過(guò)濾和基于Spark平臺(tái)的實(shí)時(shí)推薦模型的關(guān)鍵技術(shù)。實(shí)驗(yàn)結(jié)果表明,本系統(tǒng)具有高可靠性和穩(wěn)定性,能夠滿足大數(shù)據(jù)下實(shí)時(shí)推薦的需求。
1.1系統(tǒng)架構(gòu)設(shè)計(jì)
(1)設(shè)計(jì)思想
電商網(wǎng)站存在著大量的用戶隱式行為 (例如用戶瀏覽商品、用戶下單、用戶取消訂單、用戶將商品加入購(gòu)物車和用戶將商品從購(gòu)物車刪除),此外,由于電商系統(tǒng)規(guī)模的擴(kuò)大和各個(gè)業(yè)務(wù)系統(tǒng)的拆分,使得系統(tǒng)日志文件散落在各個(gè)服務(wù)器上。傳統(tǒng)基于Hadoop平臺(tái)的推薦系統(tǒng)無(wú)法有效地匯總用戶隱式行為日志,并對(duì)隱式行為日志進(jìn)行有效分析,無(wú)法滿足系統(tǒng)實(shí)時(shí)推薦的需求。本文的設(shè)計(jì)思想是根據(jù)電商網(wǎng)站的顯式用戶行為相對(duì)稀缺這一特點(diǎn),采用用戶隱式行為來(lái)構(gòu)建用戶評(píng)分,并在隱式數(shù)據(jù)源的基礎(chǔ)上將傳統(tǒng)基于Hadoop平臺(tái)構(gòu)建的推薦系統(tǒng)移植到Spark平臺(tái),同時(shí)在傳統(tǒng)離線推薦的基礎(chǔ)上結(jié)合用戶實(shí)時(shí)點(diǎn)擊流,實(shí)時(shí)分析用戶行為,并融合離線推薦模型,以反饋?zhàn)钸m合當(dāng)前用戶的實(shí)時(shí)推薦列表。本文設(shè)計(jì)的基于Spark平臺(tái)的電商實(shí)時(shí)推薦系統(tǒng)架構(gòu),如圖1所示。
在圖1中,基于Spark平臺(tái)的電商實(shí)時(shí)推薦系統(tǒng)架構(gòu)分為3層:離線處理層、服務(wù)層和實(shí)時(shí)處理層。在服務(wù)層,首先系統(tǒng)將訪問(wèn)各個(gè)業(yè)務(wù)系統(tǒng)的請(qǐng)求交由多臺(tái)應(yīng)用網(wǎng)關(guān)進(jìn)行下發(fā),在應(yīng)用網(wǎng)關(guān)集群前通過(guò)HTTP服務(wù)器進(jìn)行負(fù)載均衡。然后通過(guò)構(gòu)建分布式日志框架,在應(yīng)用網(wǎng)關(guān)服務(wù)器上安裝分布式日志采集Agent,采集訪問(wèn)各個(gè)業(yè)務(wù)系統(tǒng)的日志信息。由于電商網(wǎng)站的日志產(chǎn)出量巨大,需要可靠的消息傳送中間件作為模型訓(xùn)練與數(shù)據(jù)源采集之間的紐帶,系統(tǒng)構(gòu)建了基于Kafka集群的消息分發(fā)中間件,實(shí)現(xiàn)日志數(shù)據(jù)的統(tǒng)一下發(fā)。由于日志數(shù)據(jù)中包含著各個(gè)業(yè)務(wù)系統(tǒng)的日志以及用戶點(diǎn)擊流的日志,在進(jìn)入離線或?qū)崟r(shí)推薦階段前,需經(jīng)過(guò)統(tǒng)一的數(shù)據(jù)清洗。與以往將日志數(shù)據(jù)存儲(chǔ)于某一固定介質(zhì)、統(tǒng)一做離線批處理完成清洗不同,本系統(tǒng)采用Spark平臺(tái)的Spark Streaming技術(shù)實(shí)現(xiàn)日志的實(shí)時(shí)處理。Spark Streaming技術(shù)可以按照時(shí)間分片,對(duì)固定時(shí)間間隔內(nèi)收到的數(shù)據(jù)進(jìn)行統(tǒng)一批處理,能達(dá)到實(shí)時(shí)處理的效果,并具有很高的吞吐量。
圖1 基于Spark平臺(tái)的電商實(shí)時(shí)推薦系統(tǒng)架構(gòu)
在離線處理層,作為實(shí)時(shí)推薦的數(shù)據(jù)源收集完畢后,對(duì)數(shù)據(jù)源中的用戶行為進(jìn)行權(quán)重的分級(jí),得到用戶對(duì)于某商品的基本評(píng)分,并輸入推薦模型訓(xùn)練。傳統(tǒng)的方案是使用Hadoop平臺(tái)的離線推薦模型訓(xùn)練,但Hadoop平臺(tái)存在三個(gè)問(wèn)題:一是抽象層次低,需要編寫很冗余的代碼完成操作;二是Hadoop平臺(tái)只提供Map和Reduce兩個(gè)操作,表達(dá)能力欠缺;三是處理中間結(jié)果存儲(chǔ)在HDFS文件系統(tǒng)中,使得計(jì)算迭代式任務(wù)速度緩慢。本設(shè)計(jì)采用的Spark平臺(tái)利用RDD進(jìn)行抽象,實(shí)現(xiàn)的數(shù)據(jù)邏輯相比Hadoop平臺(tái)更簡(jiǎn)短,同時(shí)提供多種轉(zhuǎn)換和操作,具有很強(qiáng)的表達(dá)力。同時(shí),相對(duì)于Hadoop平臺(tái),Spark平臺(tái)的中間計(jì)算結(jié)果可以緩存在內(nèi)存中,對(duì)于需要很多迭代計(jì)算的推薦任務(wù),提高了計(jì)算效率。此外,基于Spark計(jì)算框架和Spark Mlib機(jī)器學(xué)習(xí)庫(kù)提供了ALS推薦模型,可以構(gòu)建新的離線推薦系統(tǒng),并且將電商網(wǎng)站所有用戶推薦列表寫入Redis緩存系統(tǒng)中,緩解電商網(wǎng)站系統(tǒng)壓力。
在電商網(wǎng)站中,如果只進(jìn)行離線的模型訓(xùn)練,用戶當(dāng)天的訪問(wèn)行為并不能實(shí)時(shí)地反映在推薦列表上,無(wú)法更好地滿足用戶需求以及提高電商網(wǎng)站商品的轉(zhuǎn)化率。因此,在實(shí)時(shí)處理層,系統(tǒng)需要對(duì)實(shí)時(shí)的用戶行為進(jìn)行處理,將其與離線推薦的結(jié)果進(jìn)行混合,從而提高實(shí)時(shí)推薦的效果。Hadoop平臺(tái)由于存儲(chǔ)的特性,只適用于批處理的場(chǎng)景,而采用了Spark Streaming(Spark流技術(shù))的Spark平臺(tái),針對(duì)用戶的每次訪問(wèn),可以實(shí)時(shí)過(guò)濾日志信息,抽出所需要的信息,獲得與該商品相似的前N位商品列表,并與離線模型進(jìn)行混合處理,進(jìn)行重排序,使得電商網(wǎng)站可以感知到用戶最新的行為,提升電商網(wǎng)站的轉(zhuǎn)化率。
與以往基于Hadoop平臺(tái)的離線推薦系統(tǒng)相比,本文構(gòu)建的基于Spark平臺(tái)的電商實(shí)時(shí)推薦系統(tǒng)具有比以往更快的反饋速度和訓(xùn)練速度。
(2)實(shí)時(shí)推薦系統(tǒng)流程
基于以上設(shè)計(jì)思想,系統(tǒng)從Spark Streaming端獲取所需要的數(shù)據(jù),并復(fù)用了日志數(shù)據(jù)源端提供的數(shù)據(jù),經(jīng)過(guò)數(shù)據(jù)聚合、數(shù)據(jù)傳輸和數(shù)據(jù)過(guò)濾后,進(jìn)行離線和實(shí)時(shí)推薦,返回融合了離線推薦和實(shí)時(shí)推薦結(jié)果的推薦列表。系統(tǒng)設(shè)計(jì)的實(shí)時(shí)推薦流程如下。
步驟1:計(jì)算隱式評(píng)分。電商網(wǎng)站通過(guò)HTTP服務(wù)器Nginx,根據(jù)配置好的響應(yīng)規(guī)則,將用戶的請(qǐng)求分發(fā)到多臺(tái)應(yīng)用網(wǎng)關(guān)中,由應(yīng)用網(wǎng)關(guān)完成向各個(gè)業(yè)務(wù)系統(tǒng)的請(qǐng)求調(diào)用,如購(gòu)物車、交易以及商品系統(tǒng)。在應(yīng)用網(wǎng)關(guān)中植入分布式日志采集工具Agent,收集發(fā)向各個(gè)業(yè)務(wù)系統(tǒng)的日志信息,并匯集后發(fā)向Kafka消息集群。Kafka集群會(huì)接入Spark Streaming實(shí)時(shí)處理框架進(jìn)行日志過(guò)濾,抽取出用戶交易行為、用戶瀏覽行為和用戶對(duì)購(gòu)物車操作行為,并寫入Hive表。使用Shark讀取Hive表。其中Shark是基于Spark平臺(tái)上且兼容Hive語(yǔ)法的SQL執(zhí)行引擎,其底層調(diào)用Spark并行實(shí)現(xiàn)。在調(diào)用Shark時(shí),系統(tǒng)賦予每一種用戶行為的不同權(quán)重,利用Shark計(jì)算用戶對(duì)商品的評(píng)分。
步驟2:離線推薦模型訓(xùn)練。計(jì)算完隱式評(píng)分,即可以得到(用戶ID-商品ID-評(píng)分)三元組,作為離線推薦模型的數(shù)據(jù)源,由于單一用戶在網(wǎng)站上的購(gòu)買數(shù)據(jù)占商品總量很低,因此使用交替最小二乘(ALS)算法,計(jì)算出隱式因子,填補(bǔ)用戶未購(gòu)買的商品的預(yù)測(cè)評(píng)分,然后訓(xùn)練出離線推薦模型。
步驟3:生成離線推薦列表。將電商網(wǎng)站上的用戶依次輸入模型,得到所有電商網(wǎng)站注冊(cè)用戶的離線推薦列表,設(shè)置推薦列表長(zhǎng)度,為了減低數(shù)據(jù)庫(kù)訪問(wèn)的壓力,系統(tǒng)將所有的推薦列表放入Redis緩存系統(tǒng)中,同時(shí)提供獲取推薦列表的接口,供PC端、移動(dòng)網(wǎng)頁(yè)端和移動(dòng)App端調(diào)用。其中Redis是一款基于內(nèi)存存儲(chǔ)的,可持久化的鍵值對(duì)數(shù)據(jù)庫(kù)。
步驟4:生成實(shí)時(shí)推薦列表。首先利用 Spark Streming技術(shù),將Kafka集群傳來(lái)的日志信息過(guò)濾出日志點(diǎn)擊流,從中抽取出用戶產(chǎn)生行為操作的商品ID和用戶ID。然后根據(jù)步驟2訓(xùn)練好的離線推薦模型,進(jìn)行商品相似度排序,可得到相似度排名前5的商品。最后根據(jù)得到的用戶ID和商品ID的推薦列表,構(gòu)建商品ID和用戶ID的列表,即商品被推薦到用戶的鍵值對(duì),定位到相關(guān)用戶ID,并將用戶推薦列表的前5個(gè)替換為步驟5得到的TOP 5商品,以此減少Redis的更新次數(shù),來(lái)優(yōu)化系統(tǒng)的響應(yīng)速度。
1.2系統(tǒng)架構(gòu)設(shè)計(jì)
本文設(shè)計(jì)和實(shí)現(xiàn)的基于Spark平臺(tái)的電商實(shí)時(shí)推薦系統(tǒng),主要會(huì)經(jīng)歷如下階段:日志數(shù)據(jù)的采集;日志數(shù)據(jù)的聚合;日志數(shù)據(jù)的傳輸;日志數(shù)據(jù)的過(guò)濾;用戶隱式行為的實(shí)時(shí)推薦。
(1)分布式日志的實(shí)時(shí)采集
電商實(shí)時(shí)推薦系統(tǒng)需要大量隱式的用戶行為作為基礎(chǔ)數(shù)據(jù),而且每種用戶行為的源日志信息分布在不同的業(yè)務(wù)系統(tǒng)中,需要構(gòu)建分布式日志匯總系統(tǒng)將日志進(jìn)行收集,以備后續(xù)流程使用。本系統(tǒng)基于開源的分布式日志收集工具Logstash,實(shí)現(xiàn)對(duì)各業(yè)務(wù)子系統(tǒng)的日志進(jìn)行收集。分布式日志采集模塊如圖2所示。
圖2 分布式日志采集模塊
在圖2中,系統(tǒng)植入在應(yīng)用網(wǎng)關(guān)處的日志監(jiān)控可以實(shí)時(shí)監(jiān)測(cè)日志文件的變化,并根據(jù)偏移量,讀取來(lái)自交易系統(tǒng)、商品系統(tǒng)和購(gòu)物車系統(tǒng)的最新日志信息,然后將日志輸出到Redis中緩存起來(lái)。日志聚合索引目錄是日志的存儲(chǔ)者,負(fù)責(zé)從Redis緩存中收集日志,并格式化處理,輸出給所需要的用戶。分布式日志采集模塊的自定義輸出為Kafka消息集群。
(2)基于Kafka集群的數(shù)據(jù)傳輸
通過(guò)構(gòu)建分布式日志實(shí)時(shí)采集模塊,完成了用戶行為日志的采集。但是在進(jìn)入日志過(guò)濾階段之前,由于日志流并發(fā)產(chǎn)生且數(shù)量很大,如何保證數(shù)據(jù)的實(shí)時(shí)性以及盡量減少數(shù)據(jù)丟失,這些都給隱式用戶行為日志數(shù)據(jù)的收集帶來(lái)了巨大的挑戰(zhàn)。LinkedIn公司開發(fā)了一套專用的分布式消息訂閱和發(fā)布系統(tǒng)——Kafka,于2010年開源,并且成為Apache的開源項(xiàng)目之一。本文設(shè)計(jì)和實(shí)現(xiàn)的電商實(shí)時(shí)推薦系統(tǒng)中,構(gòu)建Kafka集群,來(lái)承載上千萬(wàn)的用戶行為日志信息,為后續(xù)的日志過(guò)濾階段提供了安全可靠的消息傳輸。由于Kafka集群是一套分布式系統(tǒng),其吞吐量可以隨著集群的擴(kuò)展而線性增加。圖3為基于Kafka集群的數(shù)據(jù)分發(fā)架構(gòu)。
圖3 基于Kafka集群的數(shù)據(jù)傳輸
在圖3中,Kafka集群由三個(gè)部分構(gòu)成:生產(chǎn)者(Producer),代表日志的來(lái)源;代理(Broker),代表消息的中間存儲(chǔ)層;消費(fèi)者(Consumer),代表消息的使用者。其中,Producer負(fù)責(zé)將消息收集并推送(Push)到Broker,而Broker則負(fù)責(zé)接收Producer發(fā)送來(lái)的消息,并將消息本地持久化,Consumer則是消息的真正使用者,從Broker拉取(Pull)消息并進(jìn)行處理。系統(tǒng)中植入在應(yīng)用網(wǎng)關(guān)的 Logstash日志監(jiān)控會(huì)將處理完的日志發(fā)送至LogStash日志聚合索引,由LogStash日志聚合索引作為生產(chǎn)者將日志數(shù)據(jù)發(fā)送至Kafka集群,Spark節(jié)點(diǎn)作為消費(fèi)者,啟動(dòng)Spark Spreaming處理實(shí)時(shí)傳來(lái)的日志流,并根據(jù)實(shí)時(shí)推薦的需求做不同的過(guò)濾處理。
(3)基于Spark Streaming的日志過(guò)濾
數(shù)據(jù)傳輸后,系統(tǒng)統(tǒng)一使用Spark Streaming過(guò)濾數(shù)據(jù),并根據(jù)流程將日志做不同的處理,實(shí)現(xiàn)離線和實(shí)時(shí)推薦的復(fù)用的日志過(guò)濾模塊。Spark Streaming接收到的是實(shí)時(shí)收集到的日志信息,含有很多的噪聲數(shù)據(jù),需要從中抽取出所需要的信息。在實(shí)時(shí)推薦流程中,需要獲取點(diǎn)擊流的日志數(shù)據(jù),從中抽取出用戶ID和商品ID。用戶點(diǎn)擊商品所調(diào)用的接口方法用于獲取商品詳情,根據(jù)預(yù)先定義的日志信息的主題,從Kafka代理層中拉取日志信息。其中在日志信息中記錄了用戶這次請(qǐng)求調(diào)用的接口。LogStash展示的是商品詳情查看源日志的格式化日志,具體如表1所示。
表1 LogStash的格式化日志
由于表1只是LogStash提供的前端展現(xiàn),在系統(tǒng)流程中,需要調(diào)用Spark Sreaming對(duì)所有接受到的日志調(diào)用filter函數(shù),將消息中包含獲取商品詳情方法的日志過(guò)濾出來(lái),過(guò)濾后得到所有的商品詳情的請(qǐng)求日志,在消息中解析變量字段對(duì)應(yīng)的內(nèi)容,從中獲得itemId,即商品ID,然后獲取用戶行為字段,并從字段對(duì)應(yīng)的內(nèi)容中獲取_cip(用戶IP),_uid(用戶id)等關(guān)鍵信息,為后續(xù)的實(shí)時(shí)推薦提供了數(shù)據(jù)源。
(4)基于Spark平臺(tái)的實(shí)時(shí)推薦算法
本文設(shè)計(jì)的大數(shù)據(jù)電商實(shí)時(shí)推薦系統(tǒng)主要分為離線處理和實(shí)時(shí)處理兩個(gè)不同的流程,基于Spark平臺(tái)對(duì)已有離線推薦系統(tǒng)進(jìn)行優(yōu)化,并且在實(shí)時(shí)性上進(jìn)一步加強(qiáng),將離線推薦的結(jié)果和實(shí)時(shí)推薦的結(jié)果進(jìn)行融合,實(shí)現(xiàn)電商網(wǎng)站的實(shí)時(shí)推薦。系統(tǒng)首先進(jìn)行離線模型的訓(xùn)練,離線推薦主要基于對(duì)用戶隱式行為的挖掘,如支付、未支付、增刪購(gòu)物車和瀏覽詳情等操作,因而需要獲得用戶的隱式行為,得到用戶對(duì)商品的隱式評(píng)分。隱式用戶行為表如表2所示。
表2 隱式用戶行為表
基于Spark平臺(tái)的實(shí)時(shí)推薦算法如下:
(1)讀取用戶行為表。系統(tǒng)運(yùn)用Shark從Hive中獲取3個(gè)用戶行為表,即用戶交易表、用戶購(gòu)物車數(shù)據(jù)表和用戶瀏覽商品記錄表。
(2)構(gòu)建訓(xùn)練數(shù)據(jù)源。讀入用戶行為表,根據(jù)用戶點(diǎn)擊行為的權(quán)重,得到(用戶ID,商品ID),評(píng)分)鍵值對(duì)。讀入交易表,對(duì)支付行為以及非支付行為進(jìn)行分別處理,根據(jù)對(duì)應(yīng)的權(quán)重,得到(用戶ID,商品ID),評(píng)分)鍵值對(duì)。讀入購(gòu)物車數(shù)據(jù)表,因?yàn)橘?gòu)物車有多種不同的行為,本系統(tǒng)只需要增加物品至購(gòu)物車,以及從購(gòu)物車刪除商品,購(gòu)物車表進(jìn)行過(guò)濾,篩選出需要的記錄,得到(用戶ID,商品ID),評(píng)分)。讀入用戶瀏覽商品記錄表,根據(jù)對(duì)應(yīng)的權(quán)重,得到(用戶ID,商品ID),評(píng)分)鍵值對(duì)。
(3)離線推薦模型訓(xùn)練。處理完3個(gè)用戶行為表,調(diào)用union函數(shù),將用戶行為表中得到的(用戶ID,商品ID),評(píng)分)鍵值對(duì)進(jìn)行融合,去掉重復(fù)鍵值對(duì),并構(gòu)建Spark Mlib機(jī)器學(xué)習(xí)庫(kù)中基于ALS的協(xié)同過(guò)濾算法的數(shù)據(jù)源,即(用戶ID,商品ID,評(píng)分)三元組。設(shè)置ALS迭代的次數(shù)以及相關(guān)參數(shù),ALS算法會(huì)對(duì)用戶-商品評(píng)分矩陣進(jìn)行分解,利用隱語(yǔ)義因子進(jìn)行表達(dá),同時(shí)用于預(yù)測(cè)缺失的元素。
(4)實(shí)時(shí)推薦模型。離線模型訓(xùn)練完畢后,首先電商網(wǎng)站將網(wǎng)站所有的用戶輸入模型,將推薦列表寫入Redis緩存系統(tǒng),優(yōu)化網(wǎng)站性能。然后啟動(dòng)實(shí)時(shí)推薦任務(wù),根據(jù)從點(diǎn)擊流中取得的商品ID,利用離線推薦模型,取得與之最相似的前5個(gè)商品。最后在Redis緩存系統(tǒng)中找到對(duì)應(yīng)用戶ID的推薦列表,剔除原有列表的最后5個(gè),將第2步中得出的5個(gè)商品放入Redis中推薦列表的隊(duì)首。
通過(guò)上述離線推薦與實(shí)時(shí)推薦的融合,完成基于Spark平臺(tái)的實(shí)時(shí)推薦模型,達(dá)到實(shí)時(shí)響應(yīng)用戶請(qǐng)求,實(shí)現(xiàn)實(shí)時(shí)推薦反饋的目的。
本文的實(shí)驗(yàn)環(huán)境如下:基于Spark平臺(tái)的電商實(shí)時(shí)推薦系統(tǒng)搭建了3臺(tái)云服務(wù)器,托管在阿里云上,承擔(dān)每日的用戶訪問(wèn);每臺(tái)服務(wù)器配置8核CPU,16GB內(nèi)存和300GB硬盤。軟件配置如下:采用Spark 1.5.2版本用于大數(shù)據(jù)處理;Java 1.8版本用于編寫Spark程序;Logstash 2.1.1版本用于分布式日志采集;Kafka 0.8.2.2版本用于分布式日志數(shù)據(jù)傳輸;此外Hadoop 2.6版本用于分布式文件系統(tǒng)并與Spark平臺(tái)進(jìn)行測(cè)試對(duì)比。本文對(duì)分布式日志采集、分布式日志傳輸、實(shí)時(shí)日志過(guò)濾和實(shí)時(shí)推薦等系統(tǒng)關(guān)鍵技術(shù)進(jìn)行實(shí)驗(yàn)。
2.1分布式日志采集
大數(shù)據(jù)下電商網(wǎng)站每天為大量的用戶提供服務(wù),圖4顯示了電商網(wǎng)站的每天采集的日志總量,達(dá)到1600 萬(wàn)/天的日志吞吐量。海量的用戶行為日志數(shù)據(jù)為實(shí)時(shí)推薦提供了足量的訓(xùn)練數(shù)據(jù)。本文所構(gòu)建的分布式日志采集模塊解決了大數(shù)據(jù)電商網(wǎng)站跨系統(tǒng)收集用戶訪問(wèn)日志的問(wèn)題。
圖4 電商網(wǎng)站每天日志總量
2.2分布式日志數(shù)據(jù)傳輸
分布式日志采集系統(tǒng)每天會(huì)采集到1600萬(wàn)的日志信息,其中絕大部分會(huì)交給Kafka集群進(jìn)行傳遞,作為實(shí)時(shí)推薦的數(shù)據(jù)源,因此需要對(duì)Kafka集群進(jìn)行吞吐量的測(cè)試,以保證數(shù)據(jù)可靠、實(shí)時(shí)傳輸。多個(gè)Producer可同時(shí)向同一個(gè)主題發(fā)送數(shù)據(jù),在Broker負(fù)載飽和前,Producer數(shù)量越多,集群每秒收到的消息量越大,并且呈線性增漲,不同個(gè)數(shù)Producer時(shí)的總吞吐率如圖5所示。
圖5 Kafka集群的生產(chǎn)者性能實(shí)驗(yàn)
由圖5可以看出,單個(gè)Producer每秒可成功發(fā)送約128萬(wàn)條負(fù)載為100字節(jié)的消息,并且隨著Producer個(gè)數(shù)的提升,每秒總共發(fā)送的消息量線性提升。系統(tǒng)中有4臺(tái)Producer,每天產(chǎn)生的日志總量是1600萬(wàn)。系統(tǒng)構(gòu)建的Kafka集群,經(jīng)實(shí)驗(yàn)證明,足以接收穩(wěn)定傳輸分布式采集到的數(shù)據(jù)。在穩(wěn)定接收的前提下,對(duì)Kafka集群又進(jìn)行消費(fèi)測(cè)試,在集群中已有大量消息的情況下,使用1-3個(gè)Consumer時(shí)的Kafka集群總吞吐量如圖6所示。
圖6 Kafka集群的消費(fèi)者性能實(shí)驗(yàn)
由圖6可知,單個(gè)Consumer每秒可消費(fèi)306萬(wàn)條消息,該數(shù)量遠(yuǎn)大于單個(gè)Producer每秒可消費(fèi)的消息數(shù)量,這保證了在默認(rèn)配置下,消息可被及時(shí)處理。并且隨著Consumer數(shù)量的增加,系統(tǒng)集群的總吞吐量呈線性增加,能夠滿足用戶訪問(wèn)量增大,日志傳輸量增大的需求。
2.3基于Spark Streaming的日志過(guò)濾
Kafka集群可以穩(wěn)定負(fù)載本文構(gòu)建的實(shí)時(shí)推薦系統(tǒng)的日志傳輸量,因此需要對(duì)Spark Streaming實(shí)時(shí)處理日志,對(duì)提取出所需要數(shù)據(jù)的力進(jìn)行測(cè)試。Spark-
Streaming處理日志速率如圖7所示。
圖7 SparkStreaming處理日志速率
在圖7中,Spark Streaming平均每秒處理202條記錄,且運(yùn)行狀況良好。同時(shí)根據(jù)系統(tǒng)運(yùn)行15小時(shí)的日志顯示,Spark Streaming一共完成18557次實(shí)時(shí)批處理,提取了13533355條記錄,能夠滿足實(shí)時(shí)日志的處理需求。
利用Spark Streaming對(duì)實(shí)時(shí)日志流進(jìn)行實(shí)時(shí)過(guò)濾,從日志中抽取出對(duì)應(yīng)的商品ID和用戶ID,供實(shí)時(shí)推薦流程使用,抽取出的日志信息如表3所示。
表3 實(shí)時(shí)抽取的日志信息
2.4基于Spark平臺(tái)的實(shí)時(shí)推薦
由于Spark平臺(tái)在處理任務(wù)上相對(duì)于Hadoop平臺(tái)的優(yōu)越性,本文采用Spark以及其生態(tài)系統(tǒng)中的ALS模型作為實(shí)時(shí)推薦平臺(tái)的計(jì)算平臺(tái)與訓(xùn)練模型。為了測(cè)試Hadoop平臺(tái)與Spark平臺(tái)在處理計(jì)算任務(wù)時(shí)的性能差異,本系統(tǒng)選用了電商平臺(tái)采集的數(shù)據(jù)集對(duì)Spark平臺(tái)與Hadoop平臺(tái)的MapReduce在執(zhí)行作業(yè)性能上做了對(duì)比實(shí)驗(yàn)。Spark與Hadoop執(zhí)行作業(yè)時(shí)間對(duì)比如圖8所示。
從圖8中可以看出,Spark平臺(tái)在進(jìn)行不同作業(yè)類型的計(jì)算時(shí),性能都相對(duì)于Hadoop平臺(tái)的MapReduce平均提升4倍以上。但對(duì)于WordCount、UserBased及ItemBased此類迭代次數(shù)不多的任務(wù)時(shí),相對(duì)于Hadoop平臺(tái)的MapReduce計(jì)算速率提升幅度較小,平均提升3倍以上。
在進(jìn)行本系統(tǒng)所使用的ALS模型訓(xùn)練時(shí),因?yàn)槠湫枰啻蔚\(yùn)算,性能提升非常顯著。ALS模型在Hadoop平臺(tái)及Spark平臺(tái)上訓(xùn)練性能對(duì)比,如圖9所示。
圖8 Spark平臺(tái)與Hadoop平臺(tái)執(zhí)行作業(yè)時(shí)間對(duì)比
圖9 ALS模型在Hadoop及Spark平臺(tái)上訓(xùn)練性能對(duì)比
從圖9中可以發(fā)現(xiàn),在多次迭代后,Spark平臺(tái)的效率相比Hadoop平臺(tái)要提高10倍以上,這是由于Hadoop平臺(tái)的Mapreduce每次迭代后,都要重新讀取HDFS,使得作業(yè)完成的時(shí)間和迭代次數(shù)成線性增長(zhǎng),而Spark平臺(tái)由于其將中間結(jié)果緩存在內(nèi)存中,即使進(jìn)行多次迭代,時(shí)間也不會(huì)出現(xiàn)明顯增加。
圖8和圖9的對(duì)比實(shí)驗(yàn)顯示了Spark平臺(tái)作為推薦平臺(tái)的基礎(chǔ)架構(gòu)相對(duì)于傳統(tǒng)推薦系統(tǒng)的優(yōu)越性。實(shí)驗(yàn)最后對(duì)離線推薦的結(jié)果進(jìn)行了測(cè)試。圖10顯示的是在移動(dòng)App端基于測(cè)試用戶的用戶行為的離線推薦結(jié)果。當(dāng)測(cè)試用戶點(diǎn)擊了巧克力的類目,通過(guò)實(shí)時(shí)獲取用戶訪問(wèn)的信息,實(shí)時(shí)推薦模塊會(huì)啟動(dòng),抽取出與該商品最相似的商品,并與離線推薦列表進(jìn)行融合,產(chǎn)生實(shí)時(shí)推薦列表。測(cè)試用戶的實(shí)時(shí)推薦結(jié)果如圖11所示。
圖10 離線推薦結(jié)果
圖11 實(shí)時(shí)推薦結(jié)果
圖10和圖11的實(shí)驗(yàn)結(jié)果驗(yàn)證了本文設(shè)計(jì)的基于Spark的電商實(shí)時(shí)推薦系統(tǒng)能夠有效承載網(wǎng)站的日志信息,并根據(jù)用戶的實(shí)時(shí)用戶行為做出實(shí)時(shí)推薦反饋,優(yōu)化了用戶體驗(yàn),提升了網(wǎng)站的銷售額。根據(jù)日志采集系統(tǒng),電商網(wǎng)站推薦模塊的交易轉(zhuǎn)化率提升了5%,有效優(yōu)化了用戶體驗(yàn)。
基于Hadoop實(shí)現(xiàn)的推薦系統(tǒng)存在著離線訓(xùn)練速度慢,并且無(wú)法對(duì)用戶實(shí)時(shí)行為做出推薦反饋,不能滿足大數(shù)據(jù)時(shí)代用戶對(duì)實(shí)時(shí)推薦系統(tǒng)的需求。以往研究表明,Spark平臺(tái)在并行處理大數(shù)據(jù)上擁有比Hadoop平臺(tái)更強(qiáng)的運(yùn)算性能,但目前未有一套完整的實(shí)現(xiàn)流程解決Spark平臺(tái)下針對(duì)用戶隱式行為日志做出實(shí)時(shí)推薦的問(wèn)題。本文設(shè)計(jì)和實(shí)現(xiàn)了大數(shù)據(jù)下基于Spark平臺(tái)的電商實(shí)時(shí)推薦系統(tǒng);提出了一套新的實(shí)時(shí)推薦流程;針對(duì)跨系統(tǒng)用戶隱式行為日志的收集及傳輸?shù)男枨?,設(shè)計(jì)并實(shí)現(xiàn)了分布式日志采集模塊和分布式日志傳輸模塊;并且通過(guò)基于Spark Streaming的日志實(shí)時(shí)過(guò)濾模塊完成日志數(shù)據(jù)的過(guò)濾。在統(tǒng)一數(shù)據(jù)源的基礎(chǔ)上,本文創(chuàng)新地提出了大數(shù)據(jù)下電商網(wǎng)站的實(shí)時(shí)推薦算法,將離線推薦推薦的推薦結(jié)果和實(shí)時(shí)流計(jì)算出的推薦結(jié)果進(jìn)行融合,生成實(shí)時(shí)推薦列表。最后用實(shí)驗(yàn)驗(yàn)證了系統(tǒng)的可靠性、穩(wěn)定性以及相對(duì)于Hadoop平臺(tái)的高效性。下一步的工作將針對(duì)大數(shù)據(jù)下電商網(wǎng)站越來(lái)越多種類的用戶行為,設(shè)計(jì)多樣的數(shù)據(jù)處理方式,以提升系統(tǒng)的通用性。
[1]IDC.The Digital Universe of Opportunities:Rich Data and the Incdreasing Value of the Internet of Things[EB/OL].[2014-04]. http://www.emc.com/leadership/digital-universe/2014iview/executive-summary.htm
[2]FERRERIA C R L,Traina J C,MACHADO T A J,et al.Clustering Very Large Multi-Dimensional Datasets with Mapreduce[C]. 17th ACM SIGKDD International Conference on Knowledge Discovery and Data Mining,2011 ACM.San Diego:ACM Press,2011:690-698.
[3]江小平,李成華,向文等.云計(jì)算環(huán)境下樸素貝葉斯文本分類算法的實(shí)現(xiàn)[J].計(jì)算機(jī)應(yīng)用,2011,31(9):2551-2555.
[4]劉義,景寧,陳犖,熊偉.MapReduce框架下基于R-樹的k-近鄰連接算法.軟件學(xué)報(bào),2013,24(8):1836-1851.
[5]YU Y,HUANG C,LEE Y.An Intelligent Touring System Based on Mobile Social Network and Cloud Computing for Travel Recommendation[C].28th International Conference on Advanced Information Networking and Applications Workshops(AINA),2014 IEEE. Victoria,Canada:IEEE Press,2014:19-24.
[6]WALUNJ S G,SADAFALE K.An Online Recommendation System for E-commerce Based on Apache Mahout Framework[C].2013 Annual Conference on Computers and People Research,2013 ACM.Cincinnati:ACM Press,2013:153-158.
[7]ZAHARIA M,CHOWDHURY M,F(xiàn)RANKLIN M J,et al.Spark:Cluster Computing with Working Sets[C].Proceedings of the 2nd USENIX Conference on Hot Topics in Cloud Computing,2010:10-10.
[8]ZAHARIA M,CHOWDHURY M,DAS T,et al.Resilient Distributed Datasets:A Fault-Tolerant Abstraction for in-Memory Cluster Computing[C].Proceedings of the 9th USENIX Conference on Networked Systems Design and Implementation.USENIX Association,2012:2-2.
[9]X.LU,M.W.U.RAHMAN,N.ISLAM,D.SHANKAR.Accelerating Spark with RDMA for Big Data Processing:Early Experiences[C]. Proceedings of the 22nd Annual Symposium on High-Performance Interconnects.2010:9-16.
[10]QI RZ,WANG ZJ,LI SY.A Parallel Genetic Algorithm Based on Spark for Pairwise Test Suite Generation[J].Journal of ComputerScience and Technology,2016,31(2):417-27.
[11]YANG J,HE SQ.The Optimization of Parallel DBN Based on Spark[C].Proceedings of the 19th Asia Pacific Symposium on Intelligent and Evolutionary Systems,2016:157-169.
[12]曹波,韓燕波,王桂玲.基于車牌識(shí)別大數(shù)據(jù)的伴隨車輛組發(fā)現(xiàn)方法[J].計(jì)算機(jī)應(yīng)用,2015,35(11):3203-3207.
[13]王虹旭,吳斌,劉旸.基于Spark的并行圖數(shù)據(jù)分析系統(tǒng)[J].計(jì)算機(jī)科學(xué)與探索,2015,9(9):1066-1074.
[14]嚴(yán)玉良,董一鴻,何賢芒等.FSMBUS:一種基于Spark的大規(guī)模頻繁子圖挖掘算法[J].計(jì)算機(jī)研究與發(fā)展,2015,52(8):1768-1783.
[15]王詔遠(yuǎn),王宏杰,刑煥來(lái)等.基于Spark的蟻群優(yōu)化算法[J].計(jì)算機(jī)應(yīng)用,2015,35(10):2777-2780,2797.
Big-Data;Spark Platform;Hadoop Platform;Real-Time Recommendation;Implicit User Behavior
Design and Implement of E-Commerce Real-Time Recommender System with Spark Based on Big Data
CEN Kai-lun1,YU Hong-yan2,YANG Teng-xiao3
(1.College of Information Engineering,Shanghai Maritime University,Shanghai 201306;2.College of Transport and Communications,Shanghai Maritime University,Shanghai 201306;3.Research and Department,Shanghai Newdon Technology Company Limited,Shanghai 200092)
國(guó)家自然科學(xué)基金(No.61562056)、教育部人文社科青年基金資助項(xiàng)目(No.13YJC630210)、2014年上海市科技型技術(shù)創(chuàng)新基金項(xiàng)目(No.1401H164800)、上海市楊浦區(qū)國(guó)家創(chuàng)新型試點(diǎn)城區(qū)建設(shè)與管理專項(xiàng)資金項(xiàng)目(No.2015YPCX03-002)
1007-1423(2016)24-0061-09DOI:10.3969/j.issn.1007-1423.2016.24.015
岑凱倫(1991-),男,上海人,碩士研究生,研究方向?yàn)樵朴?jì)算、大數(shù)據(jù)處理
于紅巖(1979-),女,山東文登人,講師,博士,研究方向?yàn)殡娮由虅?wù)、云計(jì)算安全
楊騰霄(1977-),男,山西長(zhǎng)治人,工程師,碩士,研究方向?yàn)樵朴?jì)算安全
2016-05-12
2016-07-25
大數(shù)據(jù)下基于Hadoop平臺(tái)構(gòu)建的電商推薦系統(tǒng)存在著計(jì)算緩慢、無(wú)法根據(jù)用戶實(shí)時(shí)行為作出推薦的問(wèn)題。針對(duì)以上問(wèn)題,設(shè)計(jì)和實(shí)現(xiàn)基于Spark平臺(tái)的電商實(shí)時(shí)推薦系統(tǒng)。與Hadoop平臺(tái)構(gòu)建的推薦系統(tǒng)相比,系統(tǒng)首先基于Spark平臺(tái)構(gòu)建了分布式日志采集模塊和分布式日志數(shù)據(jù)傳輸模塊,用于采集和傳輸用戶隱式行為日志,解決電子商務(wù)跨系統(tǒng)數(shù)據(jù)源收集問(wèn)題;其次在統(tǒng)一數(shù)據(jù)源的基礎(chǔ)上,采用基于Spark的矩陣分解推薦模型進(jìn)行離線訓(xùn)練,提升離線推薦訓(xùn)練的效率;進(jìn)而在離線推薦的基礎(chǔ)上,提出一種使用Spark Streaming實(shí)時(shí)流技術(shù)對(duì)電商日志數(shù)據(jù)做實(shí)時(shí)過(guò)濾,獲取用戶當(dāng)前所需商品,并將離線推薦結(jié)果與實(shí)時(shí)推薦結(jié)果通過(guò)統(tǒng)一介質(zhì)融合的方案,實(shí)現(xiàn)對(duì)用戶隱式行為進(jìn)行實(shí)時(shí)推薦反饋的功能。最后經(jīng)實(shí)驗(yàn)證明,基于Spark平臺(tái)的電商實(shí)時(shí)推薦系統(tǒng)相對(duì)于Hadoop平臺(tái)的電商推薦系統(tǒng)具有更高的可靠性和穩(wěn)定性,能夠承載大規(guī)模數(shù)據(jù)量,離線推薦訓(xùn)練速度相對(duì)于Hadoop平臺(tái)提高10倍,并且對(duì)用戶的實(shí)時(shí)行為也能夠作出實(shí)時(shí)推薦反饋,提升5%的交易轉(zhuǎn)化率,增強(qiáng)電商網(wǎng)站的用戶體驗(yàn)。
大數(shù)據(jù);Spark平臺(tái);Hadoop平臺(tái);實(shí)時(shí)推薦;用戶隱式行為
Concerns the problem that the e-commerce recommendation system which based on Hadoop platform has low computing speed and can't make recommendation based on real-time user behavior.In order to solve the problem,designs real-time e-commerce recommendation system which is based on Spark platform.What is different from the previous system is that distributed log collection module and distributed log data transmission module are designed to collect and transfer log data of implicit user behavior,which solves the problem of collecting the log data come from different system.On the basis of a unified data source,the matrix decomposition model based on Spark is used to do off-line training and Spark streaming is used to do real-time log filtering to get the most similar goods to the good which included in the log.The result of real-time recommendation and off-line recommendation is merged in the system as feedback to the realtime user behavior.The experimental results show that the system which can carry massive amounts of data has the higher reliability and stability than the system which is based on Hadoop,the training speed of the off-line recommendation is 10 times as fast as that of the Hadoop platform,can make real-time recommended feedback to real-time user behavior which increase the user experience and the percent conversion of trade can be increased 5%.