■ 河南 許紅軍
編者按:Kafka是高吞吐量的分布式發(fā)布/訂閱消息系統(tǒng),能夠在生產(chǎn)者與消費者之間建立通信的橋梁,其實質(zhì)上是解決了在不同系統(tǒng)中如何傳遞消息的問題。Kafka集群包一些Producer、Broker、Consumer Group,以及一個Zookeeper集群。使用SASL安全認證策略會讓Kafka群集變得更加穩(wěn)固,讓用戶可以更加安全的對其進行訪問。
老版本的kafka實際上沒有考慮到安全認證方面的要求,只能依靠防火墻、代理服務(wù)等技術(shù),來實現(xiàn)對其進行安全訪問。
即Kafka Client應(yīng)用可以通過連接Zookeeper地址,獲取存儲在Zookeeper中的Kafka元數(shù)據(jù)信息。在得到Kafka Broker地址后,就可以連接到Kafka集群,并能夠操作集群上的所有Topic主題。
從V.09.0.0版本開始,Kafka增加了很多新特性,其中就包含了安全認證技術(shù)。
新版本的Kafka集群可以支持Kerberos、SASL/PLAIN、SASL/SCRAM和SASL/OAUTHBEARER等安全認證協(xié)議。Kerberos協(xié)議是一種網(wǎng)絡(luò)認證協(xié)議,擁有認證中心服務(wù),使用對稱DES密碼體制,在客戶機與服務(wù)器間起到安全橋梁作用。
該協(xié)議使用和配置起來比較復雜,對于用戶的訪問來說,必須經(jīng)由認證中心服務(wù)器的認證,通過后才可以訪問目標資源,這在很大程度上消除潛在安全隱患。
SASL/PLAIN協(xié)議是一種基于賬號和密碼的認證方式,Kafka使用的JAAS(即Java認證和授權(quán)服務(wù))就需要使用到該協(xié)議。
SASL/SCRAM協(xié)議同樣是kafka安全機制SASL家族中的一員,通過執(zhí)行PLAIN和DIGEST-MD5等用戶名/密碼認證機制來解決安全問題。
注意,Kafka中的默認SCRAM實現(xiàn)將SCRAM憑證存儲在Zookeeper中。
SASL/OAUTHBEARER協(xié)議需要OAuth2服務(wù)器來提供客戶端或代理的令牌才可以進行驗證操作。該協(xié)議允許第三方應(yīng)用通過編排資源所有者和HTTP服務(wù)之間的審批交互,來獲得對HTTP服務(wù)的有限訪問權(quán),或者允許第三方應(yīng)用以自身的名義獲得訪問權(quán)。對于Kafka中的默認Oauthholder來說,實現(xiàn)創(chuàng)建并驗證不安全的JSON Web令牌,只能在非生產(chǎn)的Kafka環(huán)境中安裝使用。
Kafka的用戶會使用到Java身份認證和授權(quán)服務(wù),即不管處于何種運行環(huán)境,都必須能夠安全地確定什么用戶在執(zhí)行Java代碼,并確保操作者擁有合適的訪問權(quán)限。
在Java KDK1.4+組件中已經(jīng)集成了JAAS服務(wù)。JAAS中的LoginModule模塊主要描述了認證技術(shù)所要實現(xiàn)的接口,該模塊會嵌入到應(yīng)用程序中,來提供特定類型的身份驗證。該模塊利用javax.security.auth.spi.LoginModule 接口的 login、commit、abort、logout等方法,來執(zhí)行用戶的登錄和登出等操作。
其中的LoginModule接口會使用CallbackHandler方法與用戶進行通信,要求輸入用戶名和密碼。身份驗證過程分兩個階段實現(xiàn),首先LoginModule接口會調(diào)用login方法執(zhí)行實際的身份驗證,將身份驗證狀態(tài)作為私有狀態(tài)信息保存。當完成身份驗證操作后,會返回成功或者失敗信息。之后如果LoginContext的整個身份驗證操作成功,則調(diào)用LoginModule 接口的commit方法,檢查其私有狀態(tài)信息的保存狀態(tài),以查看身份驗證是否成功。
如果整個LoginContext身份驗證成功,并且LoginModule自己的身份驗證也獲得成功,那么就會調(diào)用commit方法,將相關(guān)的身份和密鑰等信息和處于LoginModule模塊中的Subject相連接。如果LoginContext的整個身份驗證失敗,就會調(diào)用每個LoginModule的abort方法。
這樣,LoginModule就會移除/銷毀原先保存的任何狀態(tài)。JAAS實際認證機制是通過配置文件來實現(xiàn)的,其中包含很多配置塊,不同的配置塊對應(yīng)了一個或多個特定的LoginModule對象。
對于Kafka群集來說,需要使用到Zookeeper群集。因為kafka是一個分布式的系統(tǒng),所以它依賴Zookeeper來完成諸如協(xié)調(diào)眾多任務(wù),狀態(tài)管理的和配置存儲等操作。
ZooKeeper是一種分布式協(xié)調(diào)技術(shù),針對分布式應(yīng)用實現(xiàn)高可用、高性能的開源協(xié)調(diào)服務(wù),ZooKeeper主要提供了分布式鎖服務(wù)服務(wù),還提供了數(shù)據(jù)的維護和管理機制,包括集群管理、分布式消息隊列等。
本例中部署了zksvr1、zksvr2和zksvr3三臺服務(wù)器來組成ZooKeeper群集,IP分別為“172.16.1.100”、“172.16.1.101”以 及“172.16.1.102”。
在所有群集主機上都執(zhí)行以下命令來安裝ZooKeeper:
在每臺主機上打開“/usr/local/zookeeper/conf”目錄,將其中的“zoo_sample.cfg”更名為“zoo.cfg”。打開該配置文件,其中的“tickTime”欄表示基本時間度量單位,以毫秒為單位,它用來控制心跳和超 時。在“initLimit”欄中配置Zookeeper集群中Follower服務(wù)器初始化連接到Leader時,最長能忍受多少個心跳時間間隔數(shù)。在“dataDir”欄中配置存儲快照的目錄,在“clientPort”欄中設(shè)置Zookeeper服務(wù)進程監(jiān)聽的TCP端口,默認為TCP 2181。
在“server.x”欄中具體的Zookeeper主機的信息,包括其IP,與集群中的Leader服務(wù)器通信的端口,以及用來執(zhí)行選舉時服務(wù)器相互通信的端口。其中的“x”表示本主機的主機編號,例如“server.1=172.16.1.100:2888:3888”。因為使用的是Zookeeper群集,所以需要在每臺主機的保存快照的目錄中創(chuàng)建名為“myid”的文件,其中包含本主機的編號,例如第一臺主機就輸入“1”等。在該文件尾部輸入以下命令,讓Zookeeper支持SASL認證功能:
在上述“conf”目錄下創(chuàng)建名為“zk_server_jaas.conf”的文件,在其中輸入:
其中的“username”和“password”為ZK集群之間通信使用到的賬號密碼,“user_kafka”是Kafka用戶,密碼為“pass@hello”。
因為SASL認證需要使用到Kafka Common包,所以需要在上述“conf”路徑下創(chuàng)建名為“sasl_lib”的目錄,將“kafkaclients-0.11.0.0.jar”“l(fā)z4-1.3.0.jar”“slf4japi-1.7.25.jar”“slf4japi-1.7.25.jar”“snappyjava-1.1.2.6.jar”等kafka相關(guān)依賴復制到該目錄中。
進入Zookeeper安裝路徑下的“bin”目錄,打開其中的“zkEnv.sh”文件,在其尾部添加:
其中的“/usr/local/zookeeper/conf”表示Zookeeper的配置目錄。在ZK群集所有的主機上執(zhí)行“cd/usr/local/zookeeper/bin”“./zkServer.sh start”命令,來啟動ZK群集。執(zhí)行“jps”命令,可以顯示ZK進程的ID。
Kafka群集可以用來管理消息隊列,在本例中使用了kasrv1、kasrv2和kasrv3主機來組成Kafka群集,其IP分別為“172.16.1.200”“172.16.1.201”以 及“172.16.1.202”。
在這些主機上分別執(zhí)行以下命令完成Kafka安裝:
進入“/usr/local/kafka/config”目錄,新建名為“kafka_server_jaas.conf”的文件,用來配置Kafka的SASL認證信息,輸入:
其中的賬戶和密碼信息與上述ZK的配置相同。在上述“config”目錄中打開“server.properties”文件,可對Kafka配置進行調(diào)整。
其中的“broker.id”欄表示每一個broker在集群中的唯一表示值,例如Kazsvr1值為1?!發(fā)isteners”欄表示kafka的監(jiān)聽地址與端口,在“l(fā)og.dirs”欄中設(shè)置Kafka保存數(shù)據(jù)的目錄。在“num.partitions”欄中設(shè)置新創(chuàng)建的topic擁有的分區(qū)數(shù)量。
在“l(fā)og.retention.hours”欄中設(shè)置消息保存的時間,在“l(fā)og.segment.bytes”欄中設(shè)置Partition中每個Segment數(shù)據(jù)文件的大小。在“zookeeper.connect”欄中設(shè)置Zookeeper主機所在的地址和端口,在“auto.create.topics.enable”欄中設(shè)置是否自動創(chuàng)建Topic,在“delete.topic.enable”欄中設(shè)置是否允許物理刪除Topic的功能。在文件尾部輸入以下命令設(shè)置所需的SASL認證參數(shù):
在Kafka安裝路徑的“bin”目錄中打開“kafkaserver-start.sh”文件,這是其啟動腳本,在尾部添加:
要想順利啟動Kafka群集,必須保證ZK群集已經(jīng)正常啟動。在每臺Kafka主機上執(zhí)行以下命令來啟動Kafka群集:
這里以kafkatools工具為例,來說明如何安全連接Kafka集群。
在其安裝目錄“f:kafkatool”中創(chuàng)建名為“kafka_client_jaas.conf”的文件,輸入以下命令用于kafkatools連接認證:
然后執(zhí)行“kafkatool.exe -J-Djava.security.auth.login.config=f:kafkatoolkafka_client_jaas.conf”命令,即可順利連接到Kafka集群。
如果客戶端使用Java來連接SASL認證的Kafka集群,需在Kafka配置中添加:
也可使用“kafka_client_jaas.conf”進行連接,例如執(zhí)行“java --spring.profiles.active=dev-Djava.security.auth.login.config=/data/tmp/kafka_client_jaas.conf-jar App.jar”命令進行連接。為保證Kafka自帶的生產(chǎn)者和消費者正常工作,可對上述“kafka_server_jaas.conf”進行修改,輸入以下命令添加“producer”和“consumer”:
對“kafka_client_jaas.conf”文件進行修改,使其內(nèi)容變成:
之后打開“server.properties”文件,添加:
在Kafka安裝路徑的“bin”目錄中打開“kafkaconsole-producer.sh”,添加:
之后執(zhí)行“bin/kafkaacls.sh --authorizerproperties zookeeper.connect=172.16.1.100:2181,172.16.1.101:2181,172.16.1.102:2183 --add--allow-principal User:producer --operation Write --topic newtop1”命令,為“producer”進行授權(quán)。執(zhí)行“bin/kafka-consoleproducer.sh --brokerlist 172.16.1.200:9092,172.16.1.201:9092,172.16.1.202:9092 --producer.config ./config/producer.properties--topic newtopic1”命令,即可正常生產(chǎn)消息,其中“newtopic1”為具體的Topic名稱。
Kafka群集默認提供了ACL,使用Zookeeper來存儲數(shù)據(jù)。在上述Kafka主機上打開配置文件“server.properties”,如果輸入“uthorizer.class.name=kafka.security.auth.SimpleAclAuthorizer”行,表示如果沒有匹配權(quán)限,那么除超級用戶,其他人無法訪問Kafka群集。如果輸入“allow.everyone.if.no.acl.found=true”行,表 示未設(shè)置訪問權(quán)限,任何用戶都可以進行訪問。如果輸入“super.users=User:xxx”行,表示添加超級用戶,“xxx”表示超級用戶名稱。
Kafka授權(quán)管理腳本名稱為“kafka-acs.sh”,可根據(jù)需要靈活使用。例如執(zhí)行“bin/kafka-acls.sh--authorizer-properties zookeeper.connect=172.16.1.100:2181 --list--topic tp1”命令,顯示當前Topic權(quán)限,“tp1”為具體的Topic名稱。執(zhí)行“kafkaacls.sh --authorizerproperties zookeeper.connect=172.16.1.100:2181-add --allowprincipal User:newuser1--operation Read --topic tp1”命令,可為指定用戶添加讀取指定Topic的權(quán)限。
執(zhí)行“bin/kafkaacls.sh --authorizerproperties zookeeper.connect=172.16.1.100:2181-add --allow-principal User:newuser1 --operation Write --topic tp1”命令,可為指定用戶添加寫入指定Topic的權(quán)限。執(zhí)行“kafkaacls.sh --authorizerproperties zookeeper.connect=172.16.1.100:2181--remove --topic tp1”命令,可刪除和指定Topic相關(guān)的權(quán)限。執(zhí)行“kafkaacls.sh --authorizer-properties zookeeper.connect=172.16.1.100:2181--add --allow-principal User:newuser1”命 令,添加消費者/生產(chǎn)者的ACL。Kafka的認證范圍包含Client與Broker,Broker與Broker,以 及Broker與Zookeeper之間的安全控制。
Zookeeper和Broker間的認證分為兩個環(huán)節(jié),一是創(chuàng)建JAAS配置文件,并在“zkEnv.sh”中配置此文件位置,二是在每個broker配置文件中配置“zookeeper.set”屬性,將每個broker中的ACL設(shè)為True。Zookeeper中存儲的kafka元數(shù)據(jù)可通過多種方式讀取,包括一些客戶端工具或命令行,但只能通過broker修改元數(shù)據(jù)。如果修改錯誤,很可能導致集群中斷,所以在連接Zookeeper時,最好增加一些網(wǎng)絡(luò)隔離手段來避免上述情況的發(fā)生。