本文將從數倉誕生的背景、數倉架構、離線與實時數倉的對比著手,綜述數倉發展演進,然后分享基于Flink實現典型ETL場景的幾個方案。
1.實時數倉的相關概述
1.1實時數倉產生背景
我們先來回顧一下數據倉庫的概念。
image
數據倉庫的概念是于90年代由BillInmon提出,當時的背景是傳統的OLTP數據庫無法很好的支持長周期分析決策場景,所以數據倉庫概念的4個核心點,我們要結合著OLTP數據庫當時的狀態來對比理解。
面向主題的:數據倉庫的數據組織方式與OLTP面向事務處理不同。因為數據倉庫是面向分析決策的,所以數據經常按分析場景或者是分析對象等主題形式來組織。
集成的:對于數據倉庫來說,經常需要去集合多個分散的、異構的數據源,做一些數據清洗等ETL處理,整合成一塊數據倉庫,OLTP則不需要做類似的集成操作。
相對穩定的:OLTP數據庫一般都是面向業務的,它主要的作用是把當前的業務狀態精準的反映出來,所以OLTP數據庫需要支持大量的增、刪、改的操作。但是對于數據倉庫來說,只要是入倉存下來的數據,一般使用場景都是查詢,因此數據是相對穩定的。
反映歷史變化:數據倉庫是反映歷史變化的數據集合,可以理解成它會將歷史的一些數據的快照存下來。而對于OLTP數據庫來說,只要反映當時的最新的狀態就可以了。
以上這4個點是數據倉庫的一個核心的定義。我們也可以看出對于實時數據倉庫來說,傳統數據倉庫也就是離線數據倉庫中的一些定義會被弱化,比如說在反映歷史變化這一點。介紹完數據倉庫的基本概念,簡單說下數據倉庫建模這塊會用到一些經典的建模方法,主要有范式建模、維度建模和DataVault。在互聯網大數據場景下,用的最多的是維度建模方法。
然后先看一下離線數倉的經典架構。如下圖:
image
這個數倉架構主要是偏向互聯網大數據的場景方案,由上圖可以看出有三個核心環節。
第一個環節是數據源部分,一般互聯網公司的數據源主要有兩類:
第1類是通過在客戶端埋點上報,收集用戶的行為日志,以及一些后端日志的日志類型數據源。對于埋點行為日志來說,一般會經過一個這樣的流程,首先數據會上報到Nginx然后經過Flume收集,然后存儲到Kafka這樣的消息隊列,然后再由實時或者離線的一些拉取的任務,拉取到我們的離線數據倉庫HDFS。
第2類數據源是業務數據庫,而對于業務數據庫的話,一般會經過Canal收集它的binlog,然后也是收集到消息隊列中,最終再由Camus拉取到HDFS。
這兩部分數據源最終都會落地到HDFS中的ODS層,也叫貼源數據層,這層數據和原始數據源是保持一致的。
第二個環節是離線數據倉庫,是圖中藍色的框展示的部分。可以看到它是一個分層的結構,其中的模型設計是依據維度建模思路。
最底層是ODS層,這一層將數據保持無信息損失的存放在HDFS,基本保持原始的日志數據不變。
在ODS層之上,一般會進行統一的數據清洗、歸一,就得到了DWD明細數據層。這一層也包含統一的維度數據。
然后基于DWD明細數據層,我們會按照一些分析場景、分析實體等去組織我們的數據,組織成一些分主題的匯總數據層DWS。
在DWS之上,我們會面向應用場景去做一些更貼近應用的APP應用數據層,這些數據應該是高度匯總的,并且能夠直接導入到我們的應用服務去使用。
在中間的離線數據倉庫的生產環節,一般都是采用一些離線生產的架構引擎,比如說MapReduce、Hive、Spark等等,數據一般是存在HDFS上。
經過前兩個環節后,我們的一些應用層的數據會存儲到數據服務里,比如說HBase、Redis、Kylin這樣的一些KV的存儲。并且會針對存在這些數據存儲上的一些數據,封裝對應的服務接口,對外提供服務。在最外層我們會去產出一些面向業務的報表、面向分析的數據產品,以及會支持線上的一些業務產品等等。這一層的話,稱之為更貼近業務端的數據應用部分。
基于 Starknet 的去中心化永續合約交易所 RabbitX 宣布推出公共測試網:1月30日消息,去中心化永續合約和衍生品交易所 RabbitX 宣布推出公共測試網。據悉,RabbitX (原 Strips Finance)是 Starknet 去中心化永續合約和衍生品交易所,允許用戶在多個不同市場交易永續合約,即時確認且零手續費。[2023/1/30 11:37:00]
以上是一個基本的離線數倉經典架構的介紹。
大家都了解到現在隨著移動設備的普及,我們逐漸的由制造業時代過渡到了互聯網時代。在制造業的時代,傳統的數倉,主要是為了去支持以前的一些傳統行業的企業的業務決策者、管理者,去做一些業務決策。那個時代的業務決策周期是比較長的,同時當時的數據量較小,Oracle、DB2這一類數據庫就已經足夠存了。
但隨著分布式計算技術的發展、智能化技術發展、以及整體算力的提升、互聯網的發展等等因素,我們現在在互聯網上收集的數據量,已經呈指數級的增長。并且業務不再只依賴人做決策,做決策的主體很大部分已轉變為計算機算法,比如一些智能推薦場景等等。所以這個時候決策的周期,就由原來的天級要求提升到秒級,決策時間是非常短的。在場景上的話,也會面對更多的需要實時數據處理的場景,例如實時的個性化推薦、廣告的場景、甚至一些傳統企業已經開始實時監控加工的產品是否有質量問題,以及金融行業重度依賴的反作弊等等。因此在這樣的一個背景下,實時數倉就必須被提出來了。
1.2實時數倉架構
首先跟大家介紹一下實時數倉經典架構-Lambda架構:
image
這個架構是Storm的作者提出來的,其實Lambda架構的主要思路是在原來離線數倉架構的基礎上疊加上實時數倉的部分,然后將離線的存量數據與我們t+0的實時的數據做一個merge,就可以產生數據狀態實時更新的結果。
和上述1.1離線數據倉庫架構圖比較可以明顯的看到,實時數倉增加的部分是上圖黃色的這塊區域。我們一般會把實時數倉數據放在Kafka這樣的消息隊列上,也會有維度建模的一些分層,但是在匯總數據的部分,我們不會將APP層的一些數據放在實時數倉,而是更多的會移到數據服務側去做一些計算。
然后在實時計算的部分,我們經常會使用Flink、Spark-streaming和Storm這樣的計算引擎,時效性上,由原來的天級、小時級可以提升到秒級、分鐘級。
大家也可以看到這個架構圖中,中間數據倉庫環節有兩個部分,一個是離線的數據倉庫,一個是實時的數據倉庫。我們必須要運維兩套引擎,并且在代碼層面,我們也需要去實現實時和離線的業務代碼。然后在合并的時候,我們需要保證實施和離線的數據一致性,所以但凡我們的代碼做變更,我們也需要去做大量的這種實時離線數據的對比和校驗。其實這對于不管是資源還是運維成本來說都是比較高的。這是Lamda架構上比較明顯和突出的一個問題。因此就產生了Kappa結構。
image
Kappa架構的一個主要的思路就是在數倉部分移除了離線數倉,數倉的生產全部采用實時數倉。從上圖可以看到剛才中間的部分,離線數倉模塊已經沒有了。
關于Kappa架構,熟悉實時數倉生產的同學,可能會有一個疑問。因為我們經常會面臨業務變更,所以很多業務邏輯是需要去迭代的。之前產出的一些數據,如果口徑變更了,就需要重算,甚至重刷歷史數據。對于實時數倉來說,怎么去解決數據重算問題?
Kappa架構在這一塊的思路是:首先要準備好一個能夠存儲歷史數據的消息隊列,比如Kafka,并且這個消息對列是可以支持你從某個歷史的節點重新開始消費的。接著需要新起一個任務,從原來比較早的一個時間節點去消費Kafka上的數據,然后當這個新的任務運行的進度已經能夠和現在的正在跑的任務齊平的時候,你就可以把現在任務的下游切換到新的任務上面,舊的任務就可以停掉,并且原來產出的結果表也可以被刪掉。
隨著我們現在實時OLAP技術的一些提升,有一個新的實時架構被提了出來,這里暫且稱為實時OLAP變體。
基于 Hedera 的數字隱私和身份應用 Meeco 獲 HBAR 基金會 1 億美元可持續發展基金支持:3月25日消息,基于Hedera的數字隱私和身份應用Meeco獲 HBAR 基金會 1 億美元可持續發展基金支持。Meeco 推出了代幣可視化工具Trustury,該應用可以展示 Hedera 生態項目以及代幣的全方位信息,并對項目在與利益相關者的溝通的透明度以及審計方面提供技術支持。此外,Meeco 采用零知識證明技術實現針對不同身份人員對不同內容的訪問,保護項目的相關隱私。[2022/3/25 14:17:05]
image
這個思路是把大量的聚合、分析、計算由實時OLAP引擎來承擔。在實時數倉計算的部分,我們不需要做的特別重,尤其是聚合相關的一些邏輯,然后這樣就可以保障我們在數據應用層能靈活的面對各種業務分析的需求變更,整個架構更加靈活。
最后我們來整體對比一下,實時數倉的這幾種架構:
image
這是整體三個關于實時數倉架構的一個對比:
從計算引擎角度:Lamda架構它需要去維護批流兩套計算引擎,Kappa架構和實時OLAP變體只需要維護流計算引擎就好了。
開發成本:對Lamda架構來說,因為它需要維護實時離線兩套代碼,所以它的開發成本會高一些。Kappa架構和實時OLAP變體只用維護一套代碼就可以了。
分析靈活性:實時OLAP變體是相對最靈活的。
在實時OLAP引擎依賴上:實時OLAP變體是強依賴實時OLAP變體引擎的能力的,前兩者則不強依賴。
計算資源:Lamda架構需要批流兩套計算資源,Kappa架構只需要流計算資源,實時OLAP變體需要額外的OLAP資源。
邏輯變更重算:Lamda架構是通過批處理來重算的,Kappa架構需要按照前面介紹的方式去重新消費消息隊列重算,實時OLAP變體也需要重新消費消息隊列,并且這個數據還要重新導入到OLAP引擎里,去做計算。
1.3傳統數倉vs實時數倉
然后我們來看一下傳統數倉和實時數倉整體的差異。
image
首先從時效性來看:離線數倉是支持小時級和天級的,實時數倉到秒級分鐘級,所以實時數倉時效性是非常高的。
在數據存儲方式來看:離線數倉它需要存在HDFS和RDS上面,實時數倉一般是存在消息隊列,還有一些kv存儲,像維度數據的話會更多的存在kv存儲上。
在生產加工過程方面,離線數倉需要依賴離線計算引擎以及離線的調度。但對于實時數倉來說,主要是依賴實時計算引擎。
2.基于Flink實現典型的ETL場景
這里我們主要介紹兩大實時ETL場景:維表join和雙流join。
維表join
預加載維表
熱存儲關聯
廣播維表
Temporaltablefunctionjoin
雙流join
離線joinvs.實時join
Regularjoin
Intervaljoin
Windowjoin
2.1維表join
2.1.1預加載維表
方案1:
將維表全量預加載到內存里去做關聯,具體的實現方式就是我們定義一個類,去實現RichFlatMapFunction,然后在open函數中讀取維度數據庫,再將數據全量的加載到內存,然后在probe流上使用算子,運行時與內存維度數據做關聯。
這個方案的優點就是實現起來比較簡單,缺點也比較明顯,因為我們要把每個維度數據都加載到內存里面,所以它只支持少量的維度數據。同時如果我們要去更新維表的話,還需要重啟作業,所以它在維度數據的更新方面代價是有點高的,而且會造成一段時間的延遲。對于預加載維表來說,它適用的場景就是小維表,變更頻率訴求不是很高,且對于變更的及時性的要求也比較低的這種場景。
ZigZag 開始搭建基于 Mimblewimble 的 ZK rollup 支付系統:3月16日消息,ZigZag 開始搭建基于 Mimblewimble 的 ZK rollup 支付系統 ZigZag InvisibL3。該項目由 0xonurinanc 帶領,旨在突破零識技術的極限,包括將現有的隱私技術擴展到更廣泛的應用程序并構建 L3, 使現有的 ZK 支持系統更強大。
據悉,官方團隊表示,ZigZag InvisibL3 的最終目標是成為最好的隱私 DEX。[2022/3/16 13:59:25]
接下來我們看一個簡單的代碼的示例:
image
在這段代碼截取的是關鍵的一個片段。這里定義了一個DimFlatMapFunction來實現RichFlatMapFunction。其中有一個Map類型的dim,其實就是為了之后在讀取DB的維度數據以后,可以用于存放我們的維度數據,然后在open函數里面我們需要去連接我們的DB,進而獲取DB里的數據。然后在下面代碼可以看到我們的場景是從一個商品表里面去取出商品的ID、商品的名字。然后我們在獲取到DB里面的維度數據以后會把它存放到dim里面。
接下來在flatMap函數里面我們就會使用到dim,我們在獲取了probe流的數據以后,我們會去dim里面比較。是否含有同樣的商品ID的數據,如果有的話就把相關的商品名稱append到數據元組,然后做一個輸出。這就是一個基本的流程。
其實這是一個基本最初版的方案實現。但這個方案也有一個改進的方式,就是在open函數里面,可以新建一個線程,定時的去加載維表。這樣就不需要人工的去重啟job來讓維度數據做更新,可以實現一個周期性的維度數據的更新。
方案2:
通過Distributedcash的機制去分發本地的維度文件到taskmanager后再加載到內存做關聯。實現方式可以分為三步:
第1步是通過env.registerCached注冊文件。
第2步是實現RichFunction,在open函數里面通過RuntimeContext來獲取cache文件。
第3步是解析和使用這部分文件數據。
這種方式的一個優點是你不需要去準備或者依賴外部數據庫,缺點就是因為數據也是要加載到內存中,所以支持的維表數據量也是比較小。而且如果這個維度數據需要做更新,也需要重啟作業。因此在正規的生產過程中不太建議使用這個方案,因為其實從數倉角度,希望所有的數據都能夠通過schema化方式來管理。把數據放在文件里面去做這樣一個操作,不利于我們做整體數據的管理和規范化。所以這個方式的話,大家在做一些小的demo的時候,或者一些測試的時候可以去使用。
那么它適用的場景就是維度數據是文件形式的、數據量比較小、并且更新的頻率也比較低的一些場景,比如說我們讀一個靜態的碼表、配置文件等等。
2.1.2熱存儲關聯
image
維表join里第二類大的實現思路是熱存儲關聯。具體是我們把維度數據導入到像Redis、Tair、HBase這樣的一些熱存儲中,然后通過異步IO去查詢,并且疊加使用Cache機制,還可以加一些淘汰的機制,最后將維度數據緩存在內存里,來減輕整體對熱存儲的訪問壓力。
如上圖展示的這樣的一個流程。在Cache這塊的話,比較推薦谷歌的GuavaCache,它封裝了一些關于Cache的一些異步的交互,還有Cache淘汰的一些機制,用起來是比較方便的。
剛才的實驗方案里面有兩個重要點,一個就是我們需要用異步IO方式去訪問存儲,這里也跟大家一起再回顧一下同步IO與異步IO的區別:
對于同步IO來說,發出一個請求以后,必須等待請求返回以后才能繼續去發新的request。所以整體吞吐是比較小的。由于實時數據處理對于延遲特別關注,這種同步IO的方式,在很多場景是不太能夠接受的。
異步IO就是可以并行發出多個請求,整個吞吐是比較高的,延遲會相對低很多。如果使用異步IO的話,它對于外部存儲的吞吐量上升以后,會使得外部存儲有比較大的壓力,有時也會成為我們整個數據處理上延遲的瓶頸。所以引入Cache機制是希望通過Cache來去減少我們對外部存儲的訪問量。
基于 Cardano 的 DEX RavenDex 推出前端演示版:金色財經報道,根據 RavenDex 團隊分享的官方公告,其前端(UX / UI)的演示版本已發布以進行實驗。[2021/10/22 20:47:48]
剛才提到的CuavaCache,它的使用是非常簡單的,下圖是一個定義Cache樣例:
image
可以看到它的使用接口非常簡單,大家可以去嘗試使用。對于熱存儲關聯方案來說,它的優點就是維度數據因為不用全量加載在內存里,所以就不受限于內存大小,維度數據量可以更多。在美團點評的流量場景里面,我們的維度數據可以支持到10億量級。另一方面該方案的缺點也是比較明顯的,我們需要依賴熱存儲資源,而且維度的更新反饋到結果是有一定延遲的。因為我們首先需要把數據導入到熱存儲,然后同時在Cache過期的時間上也會有損失。
總體來說這個方法適用的場景是維度數據量比較大,又能夠接受維度更新有一定延遲的情況。
2.1.3廣播維表
第三個大的思路是廣播維表,主要是利用broadcastState將維度數據流廣播到下游task做join。
實現方式:
將維度數據發送到Kafka作為廣播原始流S1
定義狀態描述符MapStateDescriptor。調用S1.broadcast(),獲得broadCastStreamS2
調用非廣播流S3.connect(S2),得到BroadcastConnectedStreamS4
在KeyedBroadcastProcessFunction/BroadcastProcessFunction實現關聯處理邏輯,并作為參數調用S4.process()
這個方案,它的優點是維度的變更可以及時的更新到結果。然后缺點就是數據還是需要保存在內存中,因為它是存在state里的,所以支持維表數據量仍然不是很大。適用的場景就是我們需要時時的去感知維度的變更,且維度數據又可以轉化為實時流。
下面是一個小的demo:
image
我們這里面用到的廣播流pageStream,它其實是定義了一個頁面ID和頁面的名稱。對于非廣播流probeStream,它是一個json格式的string,里面包含了設備ID、頁面的ID、還有時間戳,我們可以理解成用戶在設備上做PV訪問的行為記錄。
整個實現來看,就是遵循上述4個步驟:
第1步驟是要定義廣播的狀態描述符。
第2步驟我們這里去生成broadCastStream。
第3步驟的話我們就需要去把兩個stream做connect。
第4步最主要的一個環節就是需要實現BroadcastProcessFunction。第1個參數是我們的probeStream,第2個參數是廣播流的數據,第3個參數就是我們的要輸出的數據,可以看到主要的數據處理邏輯是在processElement里面。
在數據處理過程中,我們首先通過context來獲取我們的broadcastStateDesc,然后解析probe流的數據,最終獲取到對應的一個pageid。接著就在我們剛才拿到了state里面去查詢是否有同樣的pageid,如果能夠找到對應的pageid話,就把對應的pagename添加到我們整個jsonstream去做輸出。
2.1.4Temporaltablefunctionjoin
介紹完了上面的方法以后,還有一種比較重要的方法是用Temporaltablefunctionjoin。首先說明一下什么是Temporaltable?它其實是一個概念:就是能夠返回持續變化表的某一時刻數據內容的視圖,持續變化表也就是changingtable,可以是一個實時的changelog的數據,也可以是放在外部存儲上的一個物化的維表。
它的實現是通過UDTF去做probe流和Temporaltable的join,稱之Temporaltablefunctionjoin。這種join的方式,它適用的場景是維度數據為changelog流的形式,而且我們有需要按時間版本去關聯的訴求。
基于 Avalanche 的去中心化交易所 Pangolin 已上線:Avalanche 共識協議發明者、康奈爾大學教授 Emin Gün Sirer 發推表示,基于 Avalanche 的去中心化交易所 Pangolin 已正式上線。該項目使用與 Uniswap 相同的自動做市商(AMM)模型,治理令牌為 PNG。
官方稱,Pangolin 的優勢在于快速廉價的交易、社區驅動的產品開發以及 100% 社區分配的代幣發行。[2020/12/10 14:48:36]
首先來看一個例子,這里使用的是官網關于匯率和貨幣交易的一個例子。對于我們的維度數據來說,也就是剛剛提到的changelogstream,它是RateHistory。它反映的是不同的貨幣相對于日元來說,不同時刻的匯率。
image
第1個字段是時間,第2個字段是currency貨幣。第3個字段是相對日元的匯率,然后在我們的probetable來看的話,它定義的是購買不同貨幣的訂單的情況。比如說在10:15購買了兩歐元,該表記錄的是貨幣交易的一個情況。在這個例子里面,我們要求的是購買貨幣的總的日元交易額,如何通Temporaltablefunctionjoin來去實現我們這個目標呢?
第1步首先我們要在changelog流上面去定義TemporalTableFunction,這里面有兩個關鍵的參數是必要的。第1個參數就是能夠幫我們去識別版本信息的一個timeattribute,第2個參數是需要去做關聯的組件,這里的話我們選擇的是currency。
接著的話我們在tableEnv里面去注冊TemporalTableFunction的名字。
然后我們來看一下我們注冊的TemporalTableFunction,它能夠起到什么樣的效果。
image
比如說如果我們使用rates函數,去獲取11:50的狀態。可以看到對于美元來說,它在11:50的狀態其實落在11:49~11:56這個區間的,所以選取的是99。然后對于歐元來說,11:50的時刻是落在11:15和12:10之間的,所以我們會選取119這樣的一條數據。它其實實現的是我們在一剛開始定義的TemporalTable的概念,能夠獲取到changelog某一時刻有效數據。定義好TemporalTableFunction以后,我們就要需要使用這個Function,具體實現業務邏輯。
image
大家注意這里需要去指定我們具體需要用到的joinkey。比如說因為兩個流都是在一直持續更新的,對于我們的ordertable里面11:00的這一條記錄來說,關聯到的就是歐元在10:45這一條狀態,然后它是116,所以最后的結果就是232。
剛才介紹的就是Temporaltablefunctionjoin的用法。
2.1.5維表join的對比
然后來整體回顧一下在維表join這塊,各個維度join的一些差異,便于我們更好的去理解各個方法適用的場景。
image
在實現復雜度上面的:除了熱存儲關聯稍微復雜一些,其它的實現方式基本上復雜度是比較低的。
在維表數據量上:熱存儲關聯和Temporaltablefunctionjoin兩種方式可以支持比較多的數據量。其它的方式因為都要把維表加載到內存,所以就受限內存的大小。
在維表更新頻率上面:因為預加載DB數據到內存和DistributedCache在重新更新維表數據的時候都需要重啟,所以它們不適合維表需要經常變更的場景。而對于廣播維表和Temporaltablefunctionjoin來說,可以實時的更新維表數據并反映到結果,所以它們可以支持維表頻繁更新的場景。
對維表更新實時性來說:在廣播維表和Temporaltablefunctionjoin,它們可以達到比較快的實時更新的效果。熱存儲關聯在大部分場景也是可以滿足業務需求的。
在維表形式上面:可以看到第1種方式主要是支持訪問DB存儲少量數據的形式,DistributedCache支持文件的形式,熱存儲關聯需要訪問HBase和Tair等等這種熱存儲。廣播維表和Temporaltablefunctionjoin都需要維度數據能轉化成實時流的形式。
在外部存儲上面:第1種方式和熱存儲關聯都是需要依賴外部存儲的。
在維表join這一塊,我們就先介紹這幾個基本方法。可能有的同學還有一些其他方案,之后可以反饋交流,這里主要提了一些比較常用的方案,但并不限于這些方案。
2.2雙流join
首先我們來回顧一下,批處理是怎么去處理兩個表join的?一般批處理引擎實現的時候,會采用兩個思路。
一個是基于排序的Sort-Mergejoin。另外一個是轉化為Hashtable加載到內存里做Hashjoin。這兩個思路對于雙流join的場景是否還同樣適用?在雙流join場景里面要處理的對象不再是這種批數據、有限的數據,而是是無窮數據集,對于無窮數據集來說,我們沒有辦法排序以后再做處理,同樣也沒有辦法把無窮數據集全部轉成Cache加載到內存去做處理。所以這兩種方式基本是不能夠適用的。同時在雙流join場景里面,我們的join對象是兩個流,數據也是不斷在進入的,所以我們join的結果也是需要持續更新的。
那么我們應該有什么樣的方案去解決雙流join的實現問題?Flink的一個基本的思路是將兩個流的數據持續性的存到state中,然后使用。因為需要不斷的去更新join的結果,之前的數據理論上如果沒有任何附加條件的話是不能丟棄的。但是從實現上來說state又不能永久的保存所有的數據,所以需要通過一些方式將join的這種全局范圍局部化,就是說把一個無限的數據流,盡可能給它拆分切分成一段一段的有線數據集去做join。
其實基本就是這樣一個大的思路,接下來去看一下具體的實現方式。
2.2.1離線joinvs.實時join
接下來我們以innerjoin為例看一下,一個簡單的實現的思路:
image
左流是黑色標出來的這一條,右流是藍色標出來的,這條兩流需要做innerjoin。首先左流和右流在元素進入以后,需要把相關的元素存儲到對應的state上面。除了存儲到state上面以外,左流的數據元素到來以后需要去和右邊的RightState去做比較看能不能匹配到。同樣右邊的流元素到了以后,也需要和左邊的LeftState去做比較看是否能夠match,能夠match的話就會作為innerjoin的結果輸出。這個圖是比較粗的展示出來一個innerjoin的大概細節。也是讓大家大概的體會雙流join的實現思路。
2.2.2Regularjoin
我們首先來看一下第1類雙流join的方式,Regularjoin。這種join方式需要去保留兩個流的狀態,持續性地保留并且不會去做清除。兩邊的數據對于對方的流都是所有可見的,所以數據就需要持續性的存在state里面,那么state又不能存的過大,因此這個場景的只適合有界數據流。它的語法可以看一下,比較像離線批處理的SQL:
image
在上圖頁面里面是現在Flink支持Regularjoin的一些寫法,可以看到和我們普通的SQL基本是一致的。
2.2.3Intervaljoin
在雙流join里面Flink支持的第2類join就是Intervaljoin也叫區間join。它是什么意思呢?就是加入了一個時間窗口的限定,要求在兩個流做join的時候,其中一個流必須落在另一個流的時間戳的一定時間范圍內,并且它們的joinkey相同才能夠完成join。加入了時間窗口的限定,就使得我們可以對超出時間范圍的數據做一個清理,這樣的話就不需要去保留全量的State。
Intervaljoin是同時支持processingtime和eventime去定義時間的。如果使用的是processingtime,Flink內部會使用系統時間去劃分窗口,并且去做相關的state清理。如果使用eventime就會利用Watermark的機制去劃分窗口,并且做State清理。
下面我們來看一些示例:
image
上圖這個示例用的數據是兩張表:一個是訂單表,另外一個是配送表。這里定義的時間限定是配送的時間必須在下單后的4個小時內。
Flink的作者之前有一個內容非常直觀的分享,這里就直接復用了他這部分的一個示例:
image
我們可以看到對于Intervaljoin來說:它定義一個時間的下限,就可以使得我們對于在時間下限之外的數據做清理。比如在剛才的SQL里面,其實我們就限定了join條件是ordertime必須要大于shiptime減去4個小時。對于Shipments流來說,如果接收到12:00點的Watermark,就意味著對于Orders流的數據小于8:00點之前的數據時間戳就可以去做丟棄,不再保留在state里面了。
image
同時對于shiptime來說,其實它也設定了一個時間的下限,就是它必須要大于ordertime。對于Orders流來說如果接收到了一個10:15點的Watermark,那么Shipments的state10:15之前的數據就可以拋棄掉。所以Intervaljoin使得我們可以對于一部分歷史的state去做清理。
2.2.4Windowjoin
最后來說一下雙流join的第3種Windowjoin:它的概念是將兩個流中有相同key和處在相同window里的元素去做join。它的執行的邏輯比較像Innerjoin,必須同時滿足joinkey相同,而且在同一個window里元素才能夠在最終結果中輸出。具體使用的方式是這樣的:
image
目前Windowjoin只支持Datastream的API,所以這里使用方式也是Datastream的一個形式。可以看到我們首先把兩流去做join,然后在where和equalTo里面去定義joinkey的條件,然后在window中需要去指定window劃分的方式WindowAssigner,最后要去定義JoinFunction或者是FlatJoinFunction,來實現我們匹配元素的具體處理邏輯。
因為window其實劃分為三類,所以我們的Windowjoin這里也會分為三類:
第1類TumblingWindowjoin:它是按照時間區間去做劃分的window。
image
可以看到這個圖里面是兩個流。在這個例子里我們定義的是一個兩毫秒的窗口,每一個圈是我們每個流上一個單個元素,上面的時間戳代表元素對應的時間,所以我們可以看到它是按照兩毫秒的間隔去做劃分的,window和window之間是不會重疊的。對于第1個窗口我們可以看到綠色的流有兩個元素符合,然后黃色流也有兩個元素符合,它們會以pair的方式組合,最后輸入到JoinFunction或者是FlatJoinFunction里面去做具體的處理。
第2類window是SlidingWindowJoin:這里用的是SlidingWindow。
image
slidingwindow是首先定義一個窗口大小,然后再定義一個滑動時間窗的大小。如果滑動時間窗的大小小于定義的窗口大小,窗口和窗口之間會存在重疊的情況。就像這個圖里顯示出來的,紅色的窗口和黃色窗口是有重疊的,其中綠色流的0元素同時處于紅色的窗口和黃色窗口,說明一個元素是可以同時處于兩個窗口的。然后在具體的SlidingWindowJoin的時候,可以看到對于紅色的窗口來說有兩個元素,綠色0和黃色的0,它們兩個元素是符合windowjoin條件的,于是它們會組成一個0,0的pair。然后對于黃色的窗口符合條件的是綠色的0與黃色0和1兩位數,它們會去組合成0,1、0,0和1,0兩個pair,最后會進入到我們定義的JoinFunction里面去做處理。
第3類是SessionWindowjoin:這里面用到的window是sessionwindow。
image
sessionwindow是定義一個時間間隔,如果一個流在這個時間間隔內沒有元素到達的話,那么它就會重新開一個新的窗口。在上圖里面我們可以看到窗口和窗口之間是不會重疊的。我們這里定義的Gap是1,對于第1個窗口來說,可以看到有綠色的0元素和黃色的1、2元素都是在同一個窗口內,所以它會組成在1,0和2,0這樣的一個pair。剩余的也是類似,符合條件的pair都會進入到最后JoinFunction里面去做處理。
整體我們可以回顧一下,這一節主要是介紹了維表join和雙流join兩大類場景的FlinkETL實現方法。在維表join上主要介紹了預加載維表、熱存儲關聯、廣播維表、Temporaltablefunctionjoin這4種方式。然后在雙流join上我們介紹了Regularjoin、Intervaljoin和Windowjoin。
“疫苗也沒辦法從激增的病例中保護美國”“沒有任何疫苗能夠終結美國的無知和不理性” …… 白宮疫情專家和美國媒體曾發出這樣的感慨,而最近幾天,美國制藥巨頭首個新冠口服藥的消息,讓美股沸騰.
1900/1/1 0:00:00從上世紀90年代開始的計算機浪潮,為人類生產生活帶來了顛覆性改變,隨后我們又經歷了一個從PC電腦向智能手機過渡的時代.
1900/1/1 0:00:00根據《保障農民工工資支付條例》,關于農民工工資的說法,正確的是。A施工總承包單位應按照規定存儲工資保證金B農民工工資可以以部分實務或者有價證券的方式發放給農民工本人C開設和使用農民工工資專用賬戶.
1900/1/1 0:00:00去中心化 CBN碳中和告訴你,由于使用分布式核算和存儲,不存在中心化的硬件或管理機構,任意節點的權利和義務都是均等的,系統中的數據塊由整個系統中具有維護功能的節點來共同維護.
1900/1/1 0:00:00今年以來,濰坊安丘魯安藥業公司在安丘市委、市政府的正確領導下,不斷抓好企業內部生產經營管理,大力拓展深化國際市場,不斷擴大撲熱息痛產品出口量,促進外貿高質量發展.
1900/1/1 0:00:00關注酒加丨Wine+ 快速獲取葡萄酒行業資訊03 2021年香檳產量回升 在2020年初因新冠疫情導致的銷量暴跌后,香檳銷售以驚人的速度復蘇,該地區的監管機構將今年的收成設定為高水平.
1900/1/1 0:00:00