摘要:本文整理自阿里云開源大數(shù)據(jù)平臺技術(shù)專家畢巖(尋徑)在 Apache Con ASIA 的分享。本篇內(nèi)容主要分為四個部分:
1.湖格式& Hudi & CDC
2.湖格式設(shè)計實現(xiàn) CDC 的思考
3.Hudi CDC 實現(xiàn)
4.湖格式 Streaming 的優(yōu)化
2021年中 Databricks 發(fā)布了一篇基于 Delta Lake 實現(xiàn) CDC 場景的介紹文檔,2022年初我們在阿里云EMR 內(nèi)部 Delta Lake 版本實現(xiàn)的 CDC 的能力,同期在 Apache Hudi 提案了 Hudi 基于 Spark 實現(xiàn) CDC 的設(shè)計文檔和實現(xiàn)代碼。
結(jié)合這些經(jīng)驗,今天以 Apache Hudi 為主,分享一下數(shù)據(jù)湖格式上實現(xiàn) CDC 的一些思考和注意點,以及一些流式 streaming 通用的優(yōu)化點,和 Hudi CDC 的后續(xù)規(guī)劃。
湖格式 & Hudi & CDC
該部分主要介紹下此次分享涉及到的一些概念,包括湖倉、數(shù)據(jù)湖格式、Apache Hudi,以及 Change Data Capture(CDC)的一些需要了解的東西。
湖倉 & 湖格式
相信大家對于數(shù)據(jù)倉庫,數(shù)據(jù)湖,進而到兩者結(jié)合的湖倉的演變有了一些了解,這里就不過多介紹了。湖倉(LakeHouse)有以下一些關(guān)鍵特性。

介紹幾個關(guān)鍵特性:
- ACID 事務(wù):同一張表經(jīng)常會同時有多個工作流來讀寫,事務(wù)保證了我們能夠讀、寫到正確的數(shù)據(jù);
- Schema Enforcement 和數(shù)據(jù)管理:可以加上 Schema Evolution。Enforcement 在 Databicks 相關(guān)文章解釋上等同于 Valication,在寫入數(shù)據(jù)時,嚴格檢測 schema 并要求和目標表定義的一致。Schema Evolution 允許修改表的字段信息(如增刪字段,修改字段類型和描述等)。另外,湖倉還應(yīng)提供健壯的湖表治理和審計的能力;
- 支持結(jié)構(gòu)/非結(jié)構(gòu)數(shù)據(jù),支持多類API:湖倉架構(gòu)能夠支持半/非結(jié)構(gòu)化數(shù)據(jù)(如JSON,圖像,語音等)的存儲,以及提供除了基本SQL之外豐富的API來處理數(shù)據(jù),應(yīng)用在如機器學習等場景;
- 批流一體:數(shù)據(jù)湖提出之初,很重要的就是替代 Lambda 架構(gòu),批流一體能夠有效的簡化流式和離線兩條數(shù)據(jù)鏈路的開發(fā)和運維成本;
- 存算分離:成本是各個公司都需要關(guān)注的。如果存儲和計算都能按需伸縮,會更便于精細化控制成本。
我們發(fā)現(xiàn)大部分的湖倉關(guān)鍵特性是需要由底層存儲之上的數(shù)據(jù)組織方式,即數(shù)據(jù)湖格式來提供的,我認為這也是 DeltaLake、Apache Hudi,Apache Iceberg 近兩年興起的主要背景和原因吧。
Apache Hudi

Apache Hudi 是一個構(gòu)建于自管理數(shù)據(jù)庫層之上的,使用增量數(shù)據(jù)流來構(gòu)建數(shù)據(jù)湖的一個功能豐富的平臺。相對于其他湖格式,Hudi具備更細粒度的數(shù)據(jù)布局(FileGroup),支持多種索引提升 Upsert 性能,以及在開源版本上較為豐富的自動化湖表管理能力。
CDC
Change Data Capture:定義了一種場景,即識別并捕獲數(shù)據(jù)庫表中數(shù)據(jù)的變更,并交付給下游進一步處理。CDC是對針對行級數(shù)據(jù)記錄的。其中數(shù)據(jù)的變更信息,即 CDC 的數(shù)據(jù)結(jié)構(gòu),包括變更是什么樣的操作(有三類:insert,update,delete),變更發(fā)生的時間點,以及變更前后的數(shù)據(jù)。顯然對于insert操作該記錄的變更信息中是沒有舊值的,對于 delete 操作該記錄的變更信息中是沒有新值(當前值)的。

CDC 典型方法
CDC 不是數(shù)據(jù)湖格式特有的概念和場景,它存在已久。并且在傳統(tǒng)數(shù)據(jù)庫有些一些典型的方法:
- 時間戳/版本號:是在表上添加一個類似于 created_time 和 last_modified_time 這樣的字段,標識記錄的創(chuàng)建時間和最新修改時間,查詢時根據(jù) modified_time 做過濾,得到變化的數(shù)據(jù)。這個方法有幾個明顯的缺點:
- 不能感知到delete的變化。
- 不能直接獲取得到update的舊值,因此這類方案僅適用于沒有delete操作,且不關(guān)注舊值的業(yè)務(wù)場景。
- 由于沒有快照或者版本的概念,不能準確的捕捉每次變更。
CDC 場景示例

上圖定義了一個通用的數(shù)倉場景,user_city_tbl 和 user_name_tbl 維表做 Join 操作將打?qū)捄蟮臄?shù)據(jù)更新到 user_name_city_tbl中(類似于 ODS 到 DWD 的鏈路)。
user_name_city_tbl 按 city 聚合統(tǒng)計出城市的常駐人口后同步成 city_population_tbl(類似 DWD 到 DWS 的鏈路)。
在沒有實際 CDC 功能的場景下,傳統(tǒng)數(shù)倉的解決方案一般是基于 Kafka 里的流式數(shù)據(jù)生成 ODS、DWD、DWS 的各層數(shù)據(jù),便于查詢實時的數(shù)據(jù),同時小時/天粒度做離線的全表/分區(qū)的ETL修復(fù)數(shù)據(jù)用于歷史查詢使用。如上例離線方式,user_city_tbl 每次更新后,需要全表和 user_name_tbl 做 Join,然后 overwrite 到 user_name_city_tbl,后續(xù)也需要全表按 city 聚合 overwrite 到完整的 city_population 表中。很難兼得增量數(shù)據(jù)處理、實時性、低管理運維成本這幾個方面。
在具備 CDC 的場景下,只需要一個流式 workflow 即可。如 user_city 表變更后得到change data,僅將這部分數(shù)據(jù)和 user_name 表做 Join,就可以針對 user1 更新 city 信息,將新數(shù)據(jù) user5 對應(yīng)的記錄通過 upsert 語法(Merge Into)同步至 user_name_city_tbl 中。同時得到 user_name_city_tbl 的變更數(shù)據(jù)(user1的 city 從 bj 變更為 hz,新增 user5 記錄及對應(yīng)的 name 和 city 的值),實現(xiàn)對 bj city 的人口數(shù)減一,對 hz 和 wh 兩個 city 字段加一。僅使用一條流式 workflow,能夠?qū)崟r的將增量變更信息同步更新至下游。
舉例補充另外一個場景:銀行通過獲取當天凌晨和前一天凌晨之間的賬戶金額變更來檢測賬戶的健康程度。若有兩個賬戶,A全天賬戶沒任何操作,而賬戶B全天操作上百次,但最終金額也沒有變化。如果單純的比較前一天凌晨和當天的賬戶金額是否變化是沒辦法判斷賬戶的健康指標的。對于類似場景,CDC 的解決方案中需要具備追溯每一次的變更的能力。
綜合這些場景,梳理了 CDC 這個場景應(yīng)該具備以下能力:
- 變更信息中應(yīng)該包含所有的表字段,不能僅包括主鍵的;
- 對于 delete/update 操作,應(yīng)該包括操作前的舊值;
- 能追溯每一次的變更。
CDC 輸出格式

最后,我們來看下 CDC 的輸出格式。
關(guān)于 CDC 的輸出格式,我認為只要包含了全局的變更信息,包括操作類型,操作時間或?qū)?yīng)版本,以及前后值就足夠。這里列出了典型的 debezium 的格式,和上面場景示例中使用的格式,也是 deltalake 所采用的。debezium 是一個較為通用的格式,便于集成到已有系統(tǒng)中,而在部分場景 deltalake 自定義的格式對SQL查詢更友好。
湖格式設(shè)計實現(xiàn) CDC 的思考
CDC 設(shè)計

前面介紹了在數(shù)據(jù)庫中典型的 CDC 方法,但數(shù)據(jù)湖格式較數(shù)據(jù)庫、傳統(tǒng)基于 Hive 的數(shù)倉還是有很大不同的。湖格式的特點:
- 支持多版本/多快照。基于這個特性,至少可以使用表 Diff 的方法來獲取 CDC 數(shù)據(jù)。
- 每個版本準確的映射到一組有效數(shù)據(jù)文件,且單次操作會在文件元數(shù)據(jù)層面記錄數(shù)據(jù)文件的變更,即每次操作(如insert,merge into等)會將新增數(shù)據(jù)文件標記為有效的,歷史部分數(shù)據(jù)文件在當前版本被標記為無效。
- 沒有常駐服務(wù),操作依賴現(xiàn)有的計算引擎 Spark 或者 Flink。沒有后臺處理進程,那類似于數(shù)據(jù)庫觸發(fā)器的方法,是沒辦法由后臺執(zhí)行,如果需要只能在當前操作時間內(nèi)完成,增加部分寫開銷。
在設(shè)計湖格式支持 CDC 需要考慮到:
- CDC 各場景能力的覆蓋,包括上述提到的較為復(fù)雜的。至于提供的信息是否被需要由下游場景決定,但湖格式本身要能將需要的 CDC 相關(guān)信息都交付給下游。
- 讀寫性能。類似于觸發(fā)器或者事務(wù)日志的方式,需要記錄額外的信息,要考慮到對寫入性能的影響;同樣對于表 Diff 這樣的方式,要考慮到查詢讀取性能。
- 約定輸出格式。

傳統(tǒng)數(shù)據(jù)庫的 CDC 方法中,時間戳或者版本號的方法在場景支持上的缺陷;湖格式?jīng)]有自己常駐服務(wù),不存在事務(wù)日志,因此我們先來看下表 Diff 的實現(xiàn)方式。其原理就是基于湖格式本身的 time-travel 查詢歷史版本的能力,和后續(xù)的版本逐一做 Diff 得到 CDC 數(shù)據(jù)。優(yōu)點是不會增加寫入開銷,缺點是查詢性能差。如果我們想要進一步優(yōu)化查詢性能,可能的思路就是降低 Diff 或 Join 的粒度,從最差做全表的 Diff,到僅做當前 commit 涉及到的分區(qū)級 Diff,再進一步按桶等等。DeltaLake 可以通過當前 commit 內(nèi)新增文件和被標記無效的文件之間來做。Hudi 本身具備分區(qū)級更小的 FileGroup 文件組的概念,也可以減少 Diff 的數(shù)據(jù)量。
再看一下 DeltaLake 在 CDC 場景的實現(xiàn)方案 CDF,Change Data Feed。其核心在于數(shù)據(jù)寫入時直接持久化 CDC 數(shù)據(jù)。類似于數(shù)據(jù)庫 trigger 的方式,直接保存 CDC 需要的所有信息,查詢時直接加載這部分數(shù)據(jù)。優(yōu)點是查詢性能最好,缺點是增加了寫開銷。如果繼續(xù)在這個方案下做一些優(yōu)化:
- 根據(jù)不同的操作,避免每次commit都持久化。比如數(shù)據(jù)首次寫入表,那么所有的數(shù)據(jù)本身一定都是insert操作類型的。這種就不需要再額外雙寫一份CDC數(shù)據(jù)。
- 如果表有主鍵,那可以僅持久化主鍵,然后將前后兩個版本含主鍵的數(shù)據(jù)加載成兩個map結(jié)構(gòu),分別以點查的方式獲取舊值和當前值。若前值為空,該變更為insert操作,若新值為空,該操作為delete操作,否則即為update操作;這樣減少持久化的數(shù)據(jù)量,也就減少了寫時開銷;當然也可以同時持久化記錄的操作類型,來更準確的獲取舊值和當前值。
我們結(jié)合以下兩個場景來考慮這兩個方案:
- 上游數(shù)據(jù)完成一次 commit,下游何時消費是不確定的,很可能一次性消費幾個 commit 的 CDC,采用表 Diff 的方式,需要 Diff 每兩個相鄰版本,性能隨 commit 的數(shù)量成倍下降;
- 實際的 streaming 場景,每 batch 更新全表數(shù)據(jù)量占比很小,我接觸的場景占比小于0.0003。當然不同場景占比是不同的,但顯然較低的占比,使得寫入時增加的開銷是可以接受的。
基于以上考慮,Databricks 和阿里云EMR 在 DeltaLake 的實現(xiàn)都采用了 CDF 的方案,同時阿里云EMR 團隊貢獻到 Apache Hudi 社區(qū)的也是基于此實現(xiàn)。
CDC 實現(xiàn)

確定了以 Change Data Feed 作為設(shè)計方案后,就需要考慮是具體實現(xiàn)上的注意事項了。
首先是寫入。
- 針對不同湖格式各類寫操作,明確涉及到的文件元數(shù)據(jù)變化。CDF 方案下可優(yōu)化的第一點是根據(jù)不同的操作,判斷是否需要持久化 CDC 數(shù)據(jù)。也就是一部分操作的變更信息直接讀取 CDC 文件,而另一部分操作的表更信息就從普通的數(shù)據(jù)文件中提取轉(zhuǎn)換。但是不同的湖格式對不同的操作有自己的定義,因此要具體湖格式具體分析。比如 DeltaLake 的 Insert Into/Insert Overwrite、Update,Delete,Merge 等操作就是我們正常認知的。而 hudi 由于引入了主鍵 recordKey 和比較建 preCombineField 的概念,即使是簡單的 Insert Into 的 SQL 語法可能對應(yīng)實際邏輯是 Update 操作。
- 針對 CDC 場景涉及到的寫操作,明確需要拓展 CDF 能力的場景。湖格式內(nèi)置很多湖表管理操作,如 DeltaLake 的 vacuum 和 Hudi 的 Clean 用于清理歷史數(shù)據(jù), DeltaLake 的 optimize 和 Hudi 的 clustering 用于合并小文件和做 zorder,Hudi 的 compaction 針對 mor 表合并增量數(shù)據(jù)文件等,這些操作都不會涉及到實際表的數(shù)據(jù)文件,僅僅是清理文件或者對數(shù)據(jù)重分布而已,是不需要關(guān)注和處理的,在查詢時遇到這些操作,可直接忽略。而其他 DML 操作是直接修改表的數(shù)據(jù),需要感知并處理的。這里舉例說明下:
- 對于 Insert Into 操作,DeltaLake 不會讀取任何其他已有文件,僅新增數(shù)據(jù)文件。因此 DeltaLake 實現(xiàn) CDF 方案執(zhí)行 Insert Into 不需額外再持久化,而僅需查詢時加載到這批文件,將數(shù)據(jù)轉(zhuǎn)換成約定的輸出格式。但在 Hudi 內(nèi)由于有數(shù)據(jù) combine 的邏輯或者將數(shù)據(jù)寫入現(xiàn)存的小文件的優(yōu)化,會讀取已有文件再重寫,因此就需要持久化 CDC 數(shù)據(jù)。
- 對于 drop partition 操作,DeltaLake(社區(qū)版本不支持 drop partition,阿里云EMR 版本支持)和 Hudi 都直接將該分區(qū)的所有數(shù)據(jù)文件都標記為刪除。這樣也不需要持久化任何信息,查詢時找到這些文件,加載并將每條記錄標記為delete的操作類型,添加上其他如時間戳信息即可。
- 對于 update 操作,最底層的操作一定是加載滿足 where 條件記錄的數(shù)據(jù)文件,更新滿足 where 條件的那部分數(shù)據(jù),然后連同未修改的數(shù)據(jù)直接一起寫入到新文件。這樣的情況下,就需要拓展 CDF 能力。
最后在具體實現(xiàn)上,我們還要注意以下幾點:
- 持久化的 cdc 數(shù)據(jù)需要保存到文件系統(tǒng)中,可能會改變原本的表的文件布局,這樣的改變可能會對其他操作語義,如湖管理功能造成影響,必要時需要聯(lián)動調(diào)整;
- 額外的 cdc 寫入操作依然要保證 ACID 的語義。

基于 CDF 方案的實現(xiàn)了寫操作后,查詢變更數(shù)據(jù)時就會遇到這三類文件:
- 持久化的cdc數(shù)據(jù)文件。在正常情況下,CDC 數(shù)據(jù)文件記錄了完全的 cdc 數(shù)據(jù)信息,包括變化數(shù)據(jù)的操作類型,舊值和新值,可以直接加載讀取返回。
- 全為新數(shù)據(jù)的文件。如 Insert Into 操作引起的,查詢時對每條來自這樣文件的數(shù)據(jù)添加上值為insert的操作類型字段,和其他信息。
- 全為被刪除的文件。如 drop partition 操作引起的,查詢時對每條來自這樣文件的數(shù)據(jù)添加上值為delete的操作類型字段,和其他信息。
- 上圖右側(cè)為抽象的一個數(shù)據(jù)結(jié)構(gòu) CDCFileSplit,主要的字段是 cdcFileType 和 filePath:
- filePath 為 CDC 查詢時涉及到的數(shù)據(jù)文件,可能為 CDC 數(shù)據(jù)文件或者普通的數(shù)據(jù)文件。
- cdcFileType 標識 filePath 是哪類文件,也因此決定了如何從該文件中抽取解析 cdc 數(shù)據(jù)的具體邏輯。
- 在討論 CDF 方案可優(yōu)化點時提到,對于有主鍵概念的表可以僅持久化主鍵(和操作類型)來減少寫時開銷。那么就需要另外兩個包含了該主鍵對應(yīng)的舊值和當前值的數(shù)據(jù)文件 preImageFilePath 和 postImageFilePath,來平衡對寫時開銷較為在意的場景。
Hudi CDC 實現(xiàn)
結(jié)合 Apache Hudi 我們來看下具體實現(xiàn)。
Hudi CDC Write 實現(xiàn)

由于引入了主鍵和比較鍵的概念,Hudi 抽象了自己的寫操作類型,如上圖左側(cè)所示。其中有普通 Insert/Upsert/Insert Overwrite 等常規(guī)寫操作,也有 Cluster/Cmpact 等這樣的湖管理操作。同上面提到寫時優(yōu)化的一些思考,我們僅需要關(guān)注標紅的普通寫操作,而可以直接忽略湖表管理操作。
同樣由于有了自己的寫操作語義,Hudi 抽象了兩類寫處理方式:其中HoodieWriteHandle(上圖中有筆誤)的子類 HoodieMergeHandler 是數(shù)據(jù)執(zhí)行 upsert 的核心邏輯,將新數(shù)據(jù)和同一個主鍵的老數(shù)據(jù)(如果存在)合并,最終寫入數(shù)據(jù)文件。而 HoodieWriteHandle 的其他子類處理的是非合并場景下的寫入操作,如 bulk_insert 等。因此 HoodieMergeHandle 也就是我們之前提到的在必要的場景下要拓展 CDF 能力的地方。

DeltaLake 和 Hudi 的 CDC 查詢流程基本一致,但由于數(shù)據(jù)布局的不同,在實現(xiàn)細節(jié)上也有不同。以 Hudi 為例來看一下湖格式上完整的 cdc 查詢流程:
- 根據(jù)請求指定的 start,end 區(qū)間,獲取關(guān)聯(lián)到的 commit 信息,然后根據(jù) commit 中的寫操作過濾掉湖表管理這些不影響數(shù)據(jù)的 commit;
- 根據(jù)每個 commit 的寫操作,或是讀取 cdc 數(shù)據(jù)文件,或是加載當前版本的數(shù)據(jù)文件,或是加載前一個版本的數(shù)據(jù)文件,得到一個類似于前面提到的 CDCFileSplit 的對象列表(對應(yīng)到 Hudi 代碼中的 HoodieCDCFileSplit)。
- 按 CDCFileSplit 中的 cdcFileType 定義的加載策略,從文件中提取、解析成 cdc 數(shù)據(jù),直接 union 返回。
Hudi CDC 使用示例

Hudi 端開啟 CDC 的方式也是很簡單的。建表 SQL 時或者 Spark Dataframe 寫入時開啟 hoodie.table.cdc.enabled 參數(shù)即可。這樣數(shù)據(jù)寫入時會自動持久化必要的 CDC 數(shù)據(jù)。查詢時指定 cdc 的查詢類型,及 start 和 end 的區(qū)間。
補充說明:
- 按當前實現(xiàn)查詢時需要將 query.type 的配置調(diào)整為:
hoodie.datasource.query.type = incremental hoodie.datasource.query.incremental.format = cdc
- 同時支持 CDC 持久化選擇持久化的類型,參數(shù)為:
hoodie.table.cdc.supplemental.logging.name
默認值為 data_before_after,會持久化所有 CDC 數(shù)據(jù),查詢時不再需要加載其他數(shù)據(jù)文件,利用提高 CDC 查詢效率。其他可選值為 op_key_only 和 data_before,僅持久化操作類型和主鍵,或者額外多持久化舊值,這樣寫入時減少了一點 overhead,但查詢時需要加載其他數(shù)據(jù)文件來獲取確實的舊值當前值。
Hudi CDC 后續(xù)規(guī)劃
目前 Hudi 已經(jīng)完整支持了 Flink 和 Spark 兩個引擎的 CDC 讀寫功能。
后續(xù)將會繼續(xù)拓展 Spark SQL 語法,便于查詢 CDC 數(shù)據(jù);同時支持類似 DeltaLake 的扁平化的 CDC 輸出格式,給 Hudi 用戶另外一種選擇集成到自己的數(shù)倉場景中。

目前 Hudi 已經(jīng)完整支持了 Flink 和 Spark 兩個引擎的 CDC 讀寫功能。
后續(xù)將會繼續(xù)拓展 Spark SQL 語法,便于查詢 CDC 數(shù)據(jù);同時支持類似 DeltaLake 的扁平化的 CDC 輸出格式,給 Hudi 用戶另外一種選擇集成到自己的數(shù)倉場景中。
湖格式Streaming 的優(yōu)化

CDC 大多還是用于 streaming 場景,構(gòu)建增量實時數(shù)倉,遇到的問題也是 streaming 通用的。這里列舉兩類
- Apache Hudi 或者阿里云EMR 版本的 DeltaLake 等湖格式都具備表的自管理能力,如定期清理歷史數(shù)據(jù)文件的 vacuum(DeltaLake)和 clean(Hudi)操作,和合并小文件做 Zorder 優(yōu)化的 optimize(DeltaLake)和 clustering(Hudi)操作。但這些如果放到 streaming 任務(wù)中某次 batch提交后操作,會占用當前 batch 的執(zhí)行時間,影響寫入性能,甚至有些場景直接導(dǎo)致任務(wù)失敗,影響正常的寫入流程。
- Streaming 任務(wù)實現(xiàn)復(fù)雜:基于 Spark Streaming 開發(fā)流式任務(wù)目前需要通過 dataframe 的 API 來實現(xiàn),沒有像離線場景的 SQL 語法那樣簡單。
針對這兩類問題,介紹以下阿里云EMR 團隊的解決方案。

應(yīng)對第一類問題,阿里云EMR 在 Data Lake Formation 數(shù)據(jù)湖構(gòu)建產(chǎn)品上支持了自動化湖表管理。
在實際生產(chǎn)使用中,我們可以在流式 workflow 中關(guān)閉 DeltaLake 或者 Hudi 的定期執(zhí)行表管理的功能,推送 commit 信息到 DLF 服務(wù)端。DLF 會結(jié)合我們定義的策略,結(jié)合表的實時指標,來判斷和采取相應(yīng)的湖表管理操作。
不同于定期執(zhí)行的無腦式執(zhí)行,DLF 可以精細化的感知湖格式表的狀態(tài),比如實時分析歷史過期數(shù)據(jù)的占比,根據(jù)對應(yīng)策略中的閾值判斷是否提交 clean 或 vacuum 任務(wù)來清理,再比如自動感知以時間分區(qū)的表的分區(qū)狀態(tài),自動對剛完成寫入的分區(qū)執(zhí)行小文件合并類的任務(wù)。另外,DLF 執(zhí)行的湖管理任務(wù)是單獨啟動,不在當前的流式任務(wù)中,不影響正常的寫入和性能。

應(yīng)對第二類 Streaming 任務(wù)實現(xiàn)復(fù)雜問題,阿里云EMR 拓展了 Spark 的 SQL 語法,實現(xiàn)了 StreamingSQL。
如上述示例,左側(cè)分別創(chuàng)建了 Hudi 目標表,和一張 Kafka 的源表。右側(cè)是拓展的 StreamingSQL 語法,CREATE SCAN 創(chuàng)建一個關(guān)聯(lián)到 Kafka 源表的流式視圖;CREATE STREAM 語法創(chuàng)建流,消費 Kafka 數(shù)據(jù)并按用戶給定的 SQL(示例中位 Merge Into 語法)完成寫入操作。這樣,我們可以極大的簡化任務(wù)的開發(fā)和運維成本。
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。