消息隊(duì)列 Kafka 簡(jiǎn)介
Apache Kafka是一個(gè)分布式流平臺(tái),作為互聯(lián)網(wǎng)領(lǐng)域不可或缺的消息組件,在全球獲得了廣泛的應(yīng)用。在使用過程中,Kafka一般被作為消息流轉(zhuǎn)的核心樞紐,上下游系統(tǒng)通過Kafka實(shí)現(xiàn)異步,削峰填谷。在大數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理領(lǐng)域Kafka也是不可替代的組件。
Kafka使用非常廣泛,在有些領(lǐng)域使用已經(jīng)非常成熟,如日志收集,大數(shù)據(jù)處理,數(shù)據(jù)庫(kù)等領(lǐng)域。Kafka跟上下游也有標(biāo)準(zhǔn)化的對(duì)接模塊,如日志收集有Flume,F(xiàn)ilebeat,Logstash,大數(shù)據(jù)處理有spark,flink等組件。同時(shí)在一些小眾的領(lǐng)域則沒有現(xiàn)成的工具可以直接對(duì)接,如對(duì)接某個(gè)小眾的數(shù)據(jù)庫(kù),或者用戶自己定制化的系統(tǒng)。這時(shí)一般的對(duì)接方法是自行開發(fā)Kafka生產(chǎn)消費(fèi)程序?qū)印?/p>
在不同系統(tǒng)對(duì)接時(shí)通常會(huì)遇到以下問題:
- 公司的不同團(tuán)隊(duì)對(duì)同一個(gè)系統(tǒng)有對(duì)接需求,各自開發(fā)重復(fù)造輪子,且實(shí)現(xiàn)方式不一,升級(jí)運(yùn)維成本高。
- 各子系統(tǒng)由不同的團(tuán)隊(duì)開發(fā),因此,各系統(tǒng)中的數(shù)據(jù)在內(nèi)容和格式上,存在天然的不一致性,需要進(jìn)行格式處理,以消除各系統(tǒng)數(shù)據(jù)之間格式的不同。
基于Kafka使用的廣泛度和上下游系統(tǒng)的多樣性考慮,Kafka推出了內(nèi)置的上下游系統(tǒng)對(duì)接框架Kafka Connect。

Kafka Connect 介紹
Kafka Connect是一個(gè)用于將數(shù)據(jù)流輸入和輸出Kafka的框架。下面介紹connector的一些主要概念:
- Connectors:通過管理task來協(xié)調(diào)數(shù)據(jù)流的高級(jí)抽象
- Tasks:如何將數(shù)據(jù)復(fù)制到Kafka或從Kafka復(fù)制數(shù)據(jù)的實(shí)現(xiàn)
- Workers:執(zhí)行Connector和Task的運(yùn)行進(jìn)程
- Converters:用于在Connect和外部系統(tǒng)發(fā)送或接收數(shù)據(jù)之間轉(zhuǎn)換數(shù)據(jù)的代碼
- Transforms:更改由連接器生成或發(fā)送到連接器的每個(gè)消息的簡(jiǎn)單邏
Connectors
Kafka Connect中的connector定義了數(shù)據(jù)應(yīng)該從哪里復(fù)制到哪里。connector實(shí)例是一種邏輯作業(yè),負(fù)責(zé)管理Kafka與另一個(gè)系統(tǒng)之間的數(shù)據(jù)復(fù)制。
connector有一些開源的實(shí)現(xiàn)。同時(shí)用戶也可以從頭編寫一個(gè)新的connector插件,編寫流程一般如下:

Tasks
Task是Connect數(shù)據(jù)模型中的主要處理數(shù)據(jù)的角色。每個(gè)connector實(shí)例協(xié)調(diào)一組實(shí)際復(fù)制數(shù)據(jù)的task。通過允許connector將單個(gè)作業(yè)分解為多個(gè)task,Kafka Connect提供了內(nèi)置的對(duì)并行性和可伸縮數(shù)據(jù)復(fù)制的支持,只需很少的配置。這些任務(wù)沒有存儲(chǔ)任何狀態(tài)。任務(wù)狀態(tài)存儲(chǔ)在Kafka中的特殊主題config.storage.topic和status.storage.topic中。因此,可以在任何時(shí)候啟動(dòng)、停止或重新啟動(dòng)任務(wù),以提供彈性的、可伸縮的數(shù)據(jù)管道。

Task再平衡
當(dāng)connector首次提交到集群時(shí),workers會(huì)重新平衡集群中的所有connector及其tasks,以便每個(gè)worker的工作量大致相同。當(dāng)connector增加或減少它們所需的task數(shù)量,或者更改connector的配置時(shí),也會(huì)使用相同的重新平衡過程。當(dāng)一個(gè)worker失敗時(shí),task在活動(dòng)的worker之間重新平衡。當(dāng)一個(gè)task失敗時(shí),不會(huì)觸發(fā)再平衡,因?yàn)閠ask失敗被認(rèn)為是一個(gè)例外情況。因此,失敗的task不會(huì)被框架自動(dòng)重新啟動(dòng),應(yīng)該通過REST API重新啟動(dòng)。

Converters
在向Kafka寫入或從Kafka讀取數(shù)據(jù)時(shí),Converter是使Kafka Connect支持特定數(shù)據(jù)格式所必需的。task使用轉(zhuǎn)換器將數(shù)據(jù)格式從字節(jié)更改為連接內(nèi)部數(shù)據(jù)格式,反之亦然。
默認(rèn)提供以下converters:
- AvroConverter:與Schema Registry一起使用;
- JsonConverter:適合結(jié)構(gòu)數(shù)據(jù);
- StringConverter:簡(jiǎn)單的字符串格式;
- ByteArrayConverter:提供不進(jìn)行轉(zhuǎn)換的“傳遞”選項(xiàng);
轉(zhuǎn)換器與連接器本身解耦,以便在連接器之間自然地重用轉(zhuǎn)換器。

Transforms
Connector可以配置轉(zhuǎn)換,以便對(duì)單個(gè)消息進(jìn)行簡(jiǎn)單且輕量的修改。這對(duì)于小數(shù)據(jù)的調(diào)整和事件路由十分方便,且可以在connector配置中將多個(gè)轉(zhuǎn)換鏈接在一起。
開源問題
Kafka connect線下單獨(dú)部署時(shí),設(shè)計(jì)的很不錯(cuò)了,但作為一個(gè)云服務(wù)提供時(shí),還是存在了不少的問題,主要體現(xiàn)在以下幾點(diǎn):
- 與云服務(wù)的集成度不好:云廠商有不少閉源產(chǎn)品,對(duì)于開源產(chǎn)品的云托管版也會(huì)有訪問控制等問題。
- 占用Kafka集群資源:每個(gè)connector任務(wù)都需要三個(gè)內(nèi)置元信息topic,占用云產(chǎn)品資源,對(duì)于元信息topic的誤操作也會(huì)導(dǎo)致任務(wù)異常。
- 運(yùn)維管控接口和監(jiān)控簡(jiǎn)單:管控接口沒法控制運(yùn)行資源粒度,監(jiān)控缺少connector任務(wù)維度的指標(biāo)。
- 與云原生架構(gòu)結(jié)合不好:架構(gòu)初始設(shè)計(jì)并非云原生,任務(wù)之間隔離度不夠,負(fù)載均衡算法簡(jiǎn)單,沒有動(dòng)態(tài)自平衡能力。
基于Kafka connect部署在云上的種種問題,消息隊(duì)列Kafka團(tuán)隊(duì)在兼容原生kafka connect框架的前提下,以云原生的方式重新實(shí)現(xiàn)了Kafka connect模塊。
阿里云消息隊(duì)列 Kafka Connect 解決方案
阿里云消息隊(duì)列Kafka Connect框架介紹
架構(gòu)設(shè)計(jì)將控制面和運(yùn)行面分開,通過數(shù)據(jù)庫(kù)和Etcd進(jìn)行任務(wù)分發(fā)和模塊通信。底層運(yùn)行環(huán)境采用K8S集群,更好的控制了資源的粒度和隔離程度,整體架構(gòu)圖如下:

該架構(gòu)在很好的解決了Apache Kafka Connect模塊在云上遇到的問題:
- 與云服務(wù)的對(duì)接:運(yùn)行環(huán)境部署時(shí)默認(rèn)網(wǎng)絡(luò)打通,運(yùn)行面打通了訪問控制模塊;
- 占用Kafka集群資源:元信息采用數(shù)據(jù)庫(kù)和Etcd存儲(chǔ),不占用Kafka topic資源;
- 運(yùn)維管控接口增強(qiáng):增強(qiáng)了資源層面的管控Api,可以精細(xì)化的控制每個(gè)任務(wù)的運(yùn)行資源;
- 監(jiān)控指標(biāo)增強(qiáng):任務(wù)維度全鏈路運(yùn)行時(shí)metrics收集,監(jiān)控?cái)?shù)據(jù)從流入到流出的不同階段的運(yùn)行情況,出現(xiàn)問題是及時(shí)定位問題;
- 云原生架構(gòu)設(shè)計(jì):控制面統(tǒng)籌全局資源,實(shí)時(shí)監(jiān)測(cè)集群負(fù)載,并能夠自動(dòng)完成負(fù)載均衡,失敗重啟,異常漂移等運(yùn)維操作;
阿里云Kafka Connect介紹
阿里云消息隊(duì)列Kafka已經(jīng)支持的Connector類型如下:

涵蓋了數(shù)據(jù)庫(kù),數(shù)據(jù)倉(cāng)庫(kù),數(shù)據(jù)檢索和報(bào)表,告警系統(tǒng),備份需求這些主流的使用場(chǎng)景。
根據(jù)不同場(chǎng)景的實(shí)際需求,阿里云消息隊(duì)列Kafka Connect主要兩種實(shí)現(xiàn)方式:
1. 通過擴(kuò)展Kafka Connect框架,完成外部系統(tǒng)與Kafka的直接對(duì)接。
2. 對(duì)于需要數(shù)據(jù)處理的任務(wù)類型,通過Kafka->函數(shù)計(jì)算(下簡(jiǎn)稱fc)->外部系統(tǒng)的,在fc上可以靈活的定制化處理邏輯。
具體connect的實(shí)現(xiàn)方式如下:
數(shù)據(jù)庫(kù)
數(shù)據(jù)庫(kù)之間備份一般不會(huì)走kafka,msyql->kafka一般都是為了將數(shù)據(jù)分發(fā)給下游訂閱,在mysql數(shù)據(jù)有變更時(shí)作出告警或這其他響應(yīng),鏈路mysql->kafka->訂閱程序->告警/變更其他系統(tǒng)。
數(shù)據(jù)倉(cāng)庫(kù)
數(shù)據(jù)倉(cāng)庫(kù)阿里云上常用的是maxCompute,任務(wù)特點(diǎn)是吞吐量大,也有數(shù)據(jù)清洗需求,一般流程為kafka->maxCompute,然后maxCompute內(nèi)部任務(wù)進(jìn)行數(shù)據(jù)轉(zhuǎn)換。也可以在入maxCompute之前進(jìn)行數(shù)據(jù)清洗,鏈路一般為kafka->flink->maxCompute。對(duì)于數(shù)據(jù)轉(zhuǎn)換簡(jiǎn)單或者數(shù)據(jù)量小的任務(wù),可以使用函數(shù)計(jì)算替換flink,鏈路為kafka->fc->maxCompute。
數(shù)據(jù)檢索和報(bào)表
通用的數(shù)據(jù)檢索和報(bào)表一般通過es,數(shù)據(jù)傳入es前需要做清洗處理,適合的路徑kafka->flink->es/kafka->fc->es。
告警系統(tǒng)
告警系統(tǒng)中使用kafka一般流程 前置模塊->kafka->訂閱程序->告警模塊,這種最好的方式是 前置模塊->kafka->fc->告警。
備份需求
有些數(shù)據(jù)可能需要定期歸檔,做長(zhǎng)期保存,oss是一個(gè)不錯(cuò)的介質(zhì),這種場(chǎng)景一般只需要保存原屬數(shù)據(jù),所以好的方式可能是kafka->oss。如果數(shù)據(jù)需要處理,可以通過Kafka->fc->oss鏈路。
阿里云消息隊(duì)列 Kafka 生態(tài)規(guī)劃
消息隊(duì)列Kafka當(dāng)前支持的connect都采用自研新架構(gòu)獨(dú)立開發(fā),對(duì)于主流的使用場(chǎng)景已經(jīng)有了不錯(cuò)的覆蓋,但同時(shí)也可以看到,Kafka生態(tài)發(fā)展非常迅猛,Kafka的使用場(chǎng)景也越來越多,開源Kafka connect也在不斷的發(fā)展,下一步消息隊(duì)列Kafka會(huì)對(duì)接開源Kafka connect,讓開源Kakfa connect可以無需修改,無縫的運(yùn)行在自研的架構(gòu)上。
總結(jié)
Kafka在互聯(lián)網(wǎng)架構(gòu)中已經(jīng)占據(jù)了重要的位置,同時(shí)也在積極往上下游拓展,除了Kafka connect,還有Kafka Streams,Ksql,Kafka Rest Proxy等模塊也在不斷完善和成熟,相信在后續(xù)的發(fā)展中,Kafka在軟件架構(gòu)中會(huì)扮演越來越多的重要角色。
作者:塵輝
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。