張敬環(huán),張志東,劉 鑫
(山西工程技術學院,山西 陽泉 045097)
隨著經濟的發(fā)展和生活水平的提高,空氣質量受到人們前所未有的關注。為了實時監(jiān)測我校的空氣質量,通過部署的傳感器每隔五分鐘傳輸一次空氣中各個污染物的含量數(shù)據(jù)至精簡云儲存,其中主要包括溫度、濕度、PM2.5、PM10、NO2、O3、SO2、CO。通過在Python中引入Selenium庫虛擬登錄的方式從精簡云平臺爬取這些數(shù)據(jù),并將數(shù)據(jù)存入MySQL數(shù)據(jù)庫中,然后利用PySpark連接數(shù)據(jù)庫使用SparkSQL對總數(shù)據(jù)進行分析及拆分,最后用ECHarts將分析后的數(shù)據(jù)進行可視化,為我校師生提供全面、準確、及時的出行建議與預警。系統(tǒng)設計流程圖如圖1所示。
圖1 系統(tǒng)設計流程
首先,在我校選取測試點部署無線傳感器并接入精訊云平臺http://www.sennor.net。精訊云是一款通用性的物聯(lián)網云平臺,部署在騰訊云服務器上,可以實現(xiàn)全球任意地方都快速訪問與接入[1]。在Python中使用Selenium庫模擬人為使用瀏覽器的行為,訪問與傳感器關聯(lián)的精訊云賬戶,再通過模擬點擊的方式在云平臺中找到所需的數(shù)據(jù)進行爬取。并將爬取到的數(shù)據(jù)存入Mysql數(shù)據(jù)庫中的“jilu”表中。
表1 jilu表結構
空氣質量監(jiān)測的傳感器收集的是各個空氣污染物在空氣中的含量,并不能很直觀客觀地表現(xiàn)出空氣質量的好壞,根據(jù)國家制定的《環(huán)境空氣質量標準》(AQI分級計算參考標準GB 3095—2012),構造AQI計算函數(shù)計算出各污染物的AQI并與爬取到的原始數(shù)據(jù)一同存入數(shù)據(jù)庫中的總數(shù)據(jù)表中,即表1中的字段aqi。與污染物濃度相對應的空氣質量分指數(shù)計算公式見(1)式。
(1)
由于計算每個污染物對應的IAQI時所用到的計算公式是同一個,所以本系統(tǒng)為簡化計算復雜度故將計算公式封裝為一個計算函數(shù)。主要代碼如下:
def js_iaqi(iaqi_lo, iaqi_hi, bp_lo, bp_hi, cp):
iaqi = (iaqi_hi - iaqi_lo) * (cp - bp_lo) / (bp_hi - bp_lo) + iaqi_lo
#代碼化計算公式
return iaqi
由于計算單個污染物對應的AQI時所用到的濃度對應標準是不同的,所以對每個污染物的計算IAQI函數(shù)均為單獨制作并引入相同的計算公式,無法同計算公式一樣封裝至同一個函數(shù)中。主要代碼如下:
def js_pm10_iaqi_24h(pm10):
if 0 <= pm10 < 50:
pm10_iaqi = js_iaqi(0,50,0,50,pm10) #各濃度對應參數(shù)
elif 50 <= pm10 <100:
pm10_iaqi = js_iaqi(50,100,50,150,pm10)
elif 100 <= pm10 <150:
pm10_iaqi = js_iaqi(100,150,150,250,pm10)
elif 150 <= pm10 <200:
pm10_iaqi = js_iaqi(150,200,250,350,pm10)
elif 200 <= pm10 <300:
pm10_iaqi = js_iaqi(200,300,350,420,pm10)
elif 300 <= pm10 <400:
pm10_iaqi = js_iaqi(300,400,420,500,pm10)
elif 400 <= pm10 <500:
pm10_iaqi = js_iaqi(400,500,500,600,pm10)
elif pm10 < 0:
return -1
else:
pass
return pm10_iaqi #返回該污染物對應IAQI
由于空氣質量監(jiān)測系統(tǒng)的傳感器每五分鐘就會收集并傳輸一次數(shù)據(jù),數(shù)據(jù)增長速度非???,后期數(shù)據(jù)量會越來越大,如果每次都從數(shù)據(jù)庫中的總數(shù)據(jù)表來進行數(shù)據(jù)分析的話分析效率會非常低,因此將總的數(shù)量龐大的數(shù)據(jù)進行提前拆分,依據(jù)需求分割成不同的表。
1) 時AQI表(表2:hour_aqi)——該表為數(shù)據(jù)總表里提取出每小時AQI的最高值,并記錄該時間與AQI值存入時AQI表中;
2) 日AQI表(表3:day_aqi)——該表為數(shù)據(jù)總表中提取出每天AQI的最大值,作為當天的AQI的值,并記錄下來存入日AQI表中;
3) 月AQI表(表4:month_aqi)——該表為數(shù)據(jù)總表中提取出每月AQI的最大值,作為該月的AQI的值,并記錄下來存入月AQI表中。
表2 hour_aqi表結構
表3 day_aqi表結構
表4 month_aqi表結構
將計算得到的各個IAQI計算出最大值作為AQI并返回[2]。主要代碼如下:
def aqi(l):
pm10_iaqi_24h = js_pm10_iaqi_24h(float(l[6]))
pm2_iaqi_24h = js_pm2_iaqi_24h(float(l[5]))
no2_iaqi_24h = js_no2_iaqi_24h(float(l[7]))
so2_iaqi_24h = js_so2_iaqi_24h(float(l[9]))
o3_iaqi_8h = js_o3_iaqi_8h(float(l[8]))
co_iaqi_24h =js_co_iaqi_24h(float(l[10]))
AQI = max(so2_iaqi_24h, no2_iaqi_24h, pm10_iaqi_24h, co_iaqi_24h, o3_iaqi_8h, pm2_iaqi_24h)
return AQI
本系統(tǒng)的數(shù)據(jù)提取是通過在Python文件中引入os庫、sys庫、pyspark庫來實現(xiàn)的。通過pyspark連接MySQL數(shù)據(jù)庫中的某個表[3],建立一個臨時表,在這個臨時表中通過sparkSQL來運行編寫好的sql語句。主要代碼如下:
df=sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/air?user=root&password=123456789",dbtable="jilu").load()
df.registerTempTable("jilu") #連接jilu表創(chuàng)建臨時表
df_day=sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/air?user=root&password=123456789",dbtable="day_aqi").load()
df_day.registerTempTable("day_aqi") #連接day_aqi表創(chuàng)建臨時表
sql_day = "insert into day_aqi select * from (select riqi,shebei ,max(aqi) from jilu where riqi not in (select riqi from day_aqi) group by riqi,shebei) rq"
ls_day = sqlContext.sql(sql_day).collect() #執(zhí)行sql語句
空氣質量監(jiān)測系統(tǒng)可視化采用了flask框架,app.route(‘/’)設置默認路徑所執(zhí)行的函數(shù),本系統(tǒng)中通過render_template("main.html")將執(zhí)行函數(shù)后的頁面跳轉至本系統(tǒng)的主頁。利用Ajax技術設置了可視化頁面的異步更新,以保證數(shù)據(jù)的時效性和準確性。運用ECHarts技術繪制各種可視化圖表,方便進行各種數(shù)據(jù)的可視化顯示。可視化大屏如圖2所示。
圖2 空氣質量監(jiān)測可視化大屏
本系統(tǒng)綜合運用了網絡爬蟲、MySQL、Spark、Ajax、ECharts等技術[4],將山西工程技術學院的空氣質量狀況直觀地展示給我校師生。實現(xiàn)了我校實時AQI值顯示、24小時AQI值變化趨勢,每月AQI等級天數(shù)的統(tǒng)計、歷年AQI值變化趨勢等的可視化展示,從而獲得一個良好的出行建議。還可以通過觀測主要污染物的實時監(jiān)測圖,得出影響AQI的主要因素,進而采取有針對性的環(huán)保措施。