Flink作業(yè)反壓處理
簡(jiǎn)介
反壓(backpressure)是實(shí)時(shí)計算應用開(kāi)發(fā)中,特別是流式計算中,十分常見(jiàn)的問(wèn)題。反壓意味著(zhù)數據管道中某個(gè)節點(diǎn)成為
瓶頸,處理速率跟不上上游發(fā)送數據的速率,而需要對上游進(jìn)行限速。由于實(shí)時(shí)計算應用通常使用消息隊列來(lái)進(jìn)行生產(chǎn)端和
消費端的解耦,消費端數據源是 pull-based 的,所以反壓通常是從某個(gè)節點(diǎn)傳導至數據源并降低數據源(比如 Kafka
consumer)的攝入速率。
簡(jiǎn)單來(lái)說(shuō),Flink 拓撲中每個(gè)節點(diǎn)(Task)間的數據都以阻塞隊列的方式傳輸,下游來(lái)不及消費導致隊列被占滿(mǎn)后,上游的
生產(chǎn)也會(huì )被阻塞,最終導致數據源的攝入被阻塞。
TCP-based 反壓的弊端

- 單個(gè)Task導致的反壓,會(huì )阻斷整個(gè)TM-TM之間的socket,連checkpoint barries也無(wú)法發(fā)出。
- 反壓傳播路徑長(cháng),導致生效時(shí)延較大。
Credit-based 反壓
反壓過(guò)程簡(jiǎn)介
如圖所示在 Flink 層面實(shí)現反壓機制,就是每一次 ResultSubPartition 向 InputChannel 發(fā)送消息的時(shí)候都會(huì )發(fā)送一個(gè)
backlog size 告訴下游準備發(fā)送多少消息,下游就會(huì )去計算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer
就會(huì )返還給上游一個(gè) Credit 告知他可以發(fā)送消息(圖上兩個(gè) ResultSubPartition 和 InputChannel 之間是虛線(xiàn)是因為最
終還是要通過(guò) Netty 和 Socket 去通信),下面我們看一個(gè)具體示例。

假設我們上下游的速度不匹配,上游發(fā)送速率為 2,下游接收速率為 1,可以看到圖上在 ResultSubPartition 中累積了兩
條消息,10 和 11, backlog 就為 2,這時(shí)就會(huì )將發(fā)送的數據 <8,9> 和 backlog = 2 一同發(fā)送給下游。下游收到了之后
就會(huì )去計算是否有 2 個(gè) Buffer 去接收,可以看到 InputChannel 中已經(jīng)不足了這時(shí)就會(huì )從 Local BufferPool 和 Network
BufferPool 申請,好在這個(gè)時(shí)候 Buffer 還是可以申請到的。
過(guò)了一段時(shí)間后由于上游的發(fā)送速率要大于下游的接受速率,下游的 TaskManager 的 Buffer 已經(jīng)到達了申請上限,這時(shí)候
下游就會(huì )向上游返回 Credit = 0,ResultSubPartition 接收到之后就不會(huì )向 Netty 去傳輸數據,上游 TaskManager 的
Buffer 也很快耗盡,達到反壓的效果,這樣在 ResultSubPartition 層就能感知到反壓,不用通過(guò) Socket 和 Netty 一層
層地向上反饋,降低了反壓生效的延遲。同時(shí)也不會(huì )將 Socket 去阻塞,解決了由于一個(gè) Task 反壓導致 TaskManager 和
TaskManager 之間的 Socket 阻塞的問(wèn)題。
反壓的理解
Flink拓撲中的每個(gè)節點(diǎn)(Task)間的數據都已阻塞隊列的方式傳輸,下游來(lái)不及消費導致隊列被占滿(mǎn)后,上游生產(chǎn)也會(huì )被阻
塞,最終導致數據源的攝入被阻塞。
反壓通常產(chǎn)生于這樣的場(chǎng)景:短時(shí)間的負載高峰期導致系統接受數據的速率遠高于他處理數據的速率。許多日常問(wèn)題都會(huì )導
致反壓,例如:垃圾回收可能會(huì )導致流入的數據快速堆積,或遇到大促銷(xiāo)、秒殺活動(dòng)導致流量暴增。
反壓的影響
反壓并不會(huì )直接影響作業(yè)的可用性,它表明作業(yè)處于亞健康的狀態(tài),有潛在的性能瓶頸并可能導致更大的數據處理延遲。通
常來(lái)說(shuō),對于一些對延遲要求不太高或者數據量比較小的應用來(lái)說(shuō),反壓的影響可能并不明顯,然而對于規模比較大的
Flink 作業(yè)來(lái)說(shuō)反壓可能會(huì )導致嚴重的問(wèn)題。
反壓如果不能正確處理,可能會(huì )影響到checkpoint時(shí)長(cháng)和state大小,甚至可能會(huì )導致資源耗盡甚至系統崩潰。
- 影響checkpoint時(shí)長(cháng):barries不會(huì )越過(guò)普通數據,數據處理會(huì )被阻塞也可能會(huì )導致checkpoint barries流經(jīng)整個(gè)數據管道
的時(shí)長(cháng)變長(cháng),導致checkpoint的總時(shí)長(cháng)(End to Duration)變長(cháng)。 - 影響state大?。篵arries對齊時(shí),接受到較快的輸入管道的barries后,他后面數據會(huì )被緩存起來(lái)單不處理,直到較慢的輸
入管道的barries也到達,這些被緩存的數據會(huì )被放到state里面,導致checkpoint變大。
這兩個(gè)影響對于生產(chǎn)環(huán)境的作業(yè)十分危險的,因為checkpoint時(shí)保證數據一致性的關(guān)鍵,checkpoint時(shí)間變長(cháng)有可能會(huì )導致
checkpoint超時(shí)失敗。而state大小同樣可能拖慢checkpoint甚至OOM(使用Heap-based StateBackend)或者物理機內存
使用超過(guò)容器資源(使用RocksDBStateBackend)的穩定性。
反壓定位
Flink Web UI 自帶的反壓監控
Flink Web UI 的反壓監控提供了 Subtask 級別的反壓監控。監控的原理是通過(guò)Thread.getStackTrace() 采集在
TaskManager 上正在運行的所有線(xiàn)程,收集在緩沖區請求中阻塞的線(xiàn)程數(意味著(zhù)下游阻塞),并計算緩沖區阻塞線(xiàn)程數與
總線(xiàn)程數的比值 rate。其中,rate < 0.1 為 OK,0.1 <= rate <= 0.5 為 LOW,rate > 0.5 為 HIGH。

以下兩種場(chǎng)景可能導致反壓:
- 該節點(diǎn)發(fā)送速率跟不上它的產(chǎn)生數據速率。該場(chǎng)景一般是單輸入多輸出的算子,例如FlatMap。定位手段是因為這是從
Source Task 到 Sink Task 的第一個(gè)出現反壓的節點(diǎn),所以該節點(diǎn)是反壓的根源節點(diǎn)。 - 下游的節點(diǎn)處理數據的速率較慢,通過(guò)反壓限制了該節點(diǎn)的發(fā)送速率。定位手段是從該節點(diǎn)開(kāi)始繼續排查下游節點(diǎn)。
注意事項:
- 因為Flink Web UI 反壓面板是監控發(fā)送端的,所以反壓的根源節點(diǎn)并不一定會(huì )在反壓面板體現出高反壓。如果某個(gè)節點(diǎn)是
性能瓶頸并不會(huì )導致它本身出現高反壓,而是導致它的上游出現高反壓??傮w來(lái)看,如果找到第一個(gè)出現反壓的節點(diǎn),則反
壓根源是這個(gè)節點(diǎn)或者是它的下游節點(diǎn)。 - 通過(guò)反壓面板無(wú)法區分上述兩種狀態(tài),需要結合 Metrics 等監控手段來(lái)定位。如果作業(yè)的節點(diǎn)數很多或者并行度很大,即
需要采集所有 Task 的棧信息,反壓面板的壓力也會(huì )很大甚至不可用 。
Flink Task Metrics 監控反壓
Network和 task I/Ometrics 是輕量級反壓監視器,用于正在持續運行的作業(yè),其中一下幾個(gè) metrics 是最有用的反壓指標。
| Metrics | 描述 |
|---|---|
| outPoolUsage | 發(fā)送端Buffer的使用率 |
| inPoolUsage | 接受端Buffer的使用率 |
| floatinguffersUsage(1.9以上) | 接受端floating Buffer的使用率 |
| exclusiveBuffersUsage(1.9以上) | 接受端exclusive Buffer的使用率 |
采用 Metrics 分析反壓的思路:如果一個(gè) Subtask 的發(fā)送端 Buffer 占用率很高,則表明它被下游反壓限速了;如果一個(gè)
Subtask 的接受端 Buffer 占用很高,則表明它將反壓傳導至上游。
| / | outPoolUsage 低 | outPoolUsage 高 |
|---|---|---|
| inPoolUsage 低 | 正常 | 被下游反壓,處于臨時(shí)情況,沒(méi)傳遞到上游;可能時(shí)反壓的根源,一條輸入多條輸出的場(chǎng)景 |
| inPoolUsage 高 | 如果時(shí)上游所有outPoolUsage 都是低,有可能最終可能導致反壓(還沒(méi)傳遞到上游;如果時(shí)上游所有的outPoolUsage 都是高,則為反壓根源) | 被下游反壓。 |
inPoolUsage和outPoolUsage反壓分析表
- outPoolUsage 和 inPoolUsage 同為低表明當前 Subtask 是正常的,同為高分別表明當前 Subtask 被下游反壓。
- 如果一個(gè) Subtask 的 outPoolUsage 是高,通常是被下游 Task 所影響,所以可以排查它本身是反壓根源的可能性。
- 如果一個(gè) Subtask 的 outPoolUsage 是低,但其 inPoolUsage 是高,則表明它有可能是反壓的根源。因為通常反壓會(huì )傳
導至其上游,導致上游某些 Subtask 的 outPoolUsage 為高。
反壓有時(shí)是短暫的且影響不大,比如來(lái)自某個(gè) channel 的短暫網(wǎng)絡(luò )延遲或者 TaskManager 的正常 GC,這種情況下可以不用處理。
outPoolUsage 與 floatingBuffersUsage 、 exclusiveBuffersUsage 的關(guān)系:
- floatingBuffersUsage 為高則表明反壓正在傳導至上游。
- exclusiveBuffersUsage 則表明了反壓可能存在傾斜。如果floatingBuffersUsage 高、exclusiveBuffersUsage 低,則存
在傾斜。因為少數 channel 占用了大部分的 floating Buffer(channel 有自己的 exclusive buffer,當 exclusive
buffer 消耗完,就會(huì )使用floating Buffer)
反壓的原因及處理
注意:反壓可能時(shí)暫時(shí)的,可能由于負載高峰,CheckPoint或者作業(yè)重啟引起的數據積壓而導致的反壓。如果反壓是暫時(shí)的,
應該忽略它。另外,請記住,斷斷續續的反壓會(huì )影響我們的分析和解決問(wèn)題。
定位到反壓節點(diǎn)后,分析造成反壓的原因的辦法主要是觀(guān)察Task Thread。按照下面順序一步步排查。
使用火焰圖分析
火焰圖是跟蹤堆棧線(xiàn)程然后重復多次采樣而生成的。每個(gè)方法的調用都會(huì )有一個(gè)長(cháng)方型表示,長(cháng)方型的長(cháng)度和它在采樣中出
現的次數成正比。是Flink 1.13 新特性。
開(kāi)啟方法:
rest.flamegraph.enabled : true
橫向就是耗時(shí)時(shí)長(cháng),橫向越長(cháng)表示耗時(shí)越長(cháng)??v向表示調用棧。一般只需要看最上面函數。
分析GC情況
TaskManager的內存以及GC問(wèn)題也會(huì )導致反壓,包括TaskManager JVM 各區內存不合理導致頻繁Full GC甚至失聯(lián)。通常建議
使用默認的G1垃圾回收器。
打印 GC 日志的第一步,就是開(kāi)啟 GC 打印的參數了,也是最基本的參數。
-XX:+PrintGCDetails -XX:+PrintGCDateStamps
-D參數配置方式:
-Denv.java.opt="-XX:+PrintGCDetails -XX:+PrintGCDateStamps"
外部交互組件
如果我們發(fā)現我們的source端數據讀取性能比較低或者Sink端寫(xiě)入性能較差,需要檢查第三方組件是否遇到瓶頸,以及做維表
join時(shí)的性能問(wèn)題,也許要和外部組件交互。
關(guān)于第三方的性能問(wèn)題,需要結合具體的組件來(lái)分析,最常用的思路:
1、異步IO + 熱緩存來(lái)優(yōu)化讀寫(xiě)性能,減少對外部組件的訪(fǎng)問(wèn)。
2、先攢批在進(jìn)行讀寫(xiě)操作。
評論