男人的av一区二区资源,亚洲日韩国产精品无码av,蜜桃久久久aaaa成人网一区,亚洲日韩中文字幕一区,在线观看国产亚洲视频免费

Flink SQL 優(yōu)化

設置空閑狀態(tài)保留時(shí)間

不設置空閑狀態(tài)保留時(shí)間會(huì )導致?tīng)顟B(tài)爆炸。

  • FlinkSQL 的 regular join inner 、 left 、 right ),左右表的數據都會(huì )一直保存在狀態(tài)里,不會(huì )清理!要么設置
    TTL ,要么使用 Flink SQL 的 interval join 。
  • 使用 Top N 語(yǔ)法進(jìn)行去重,重復數據的出現一般都位于特定區間內(例如一小時(shí)或一天內),過(guò)了這段時(shí)間之后,對應的
    狀態(tài)就不再需要了。

Flink SQL可以指定空閑狀態(tài)(即未更新的狀態(tài))被保留的最小時(shí)間 當狀態(tài)中某個(gè) key對應的 狀態(tài)未更新的時(shí)間達到閾值時(shí),
該條狀態(tài)被自動(dòng)清理。

API 設置:

tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));

配置參數設置:

Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", " 1 h" );

開(kāi)啟MiniBatch

MiniBatch是微批處理,原理是 緩存一定的數據后再觸發(fā)處理,以減少對 State 的訪(fǎng)問(wèn)從而提升吞吐并減少數據的輸出量。MiniBatch主要依靠在每個(gè)Task上注冊的Timer線(xiàn)程來(lái)觸發(fā)微批,需要消耗一定的線(xiàn)程調度性能。

開(kāi)啟方式

MiniBatch 默認關(guān)閉,開(kāi)啟方式如下:

Configuration configuration = tEnv.getConfig().getConfiguration();
configuration.setString(" table.exec.mini batch.enabled ", true);
configuration.setString(" table.exec.mini batch.allow latency ", 5 s);
configuration.setString(" table.exec.mini batch.size ", 20000);
  • table.exec.mini batch.enabled: 開(kāi)啟 miniBatch的參數。
  • table.exec.mini batch.allow-latency: 批量輸出的間隔時(shí)間。
  • table.exec.mini batch.size: 防止 OOM 設置每個(gè)批次最多緩存數據的條數 ,可以設為2 萬(wàn)條。

注意:

  • 目前上述樣例中的key value 配置項僅被 Blink planner支持。
  • 1.12 之前的版本有 bug ,開(kāi)啟 miniBatch ,不會(huì )清理過(guò)期狀態(tài),也就是說(shuō)如果設置狀態(tài)的 TTL ,無(wú)法清理過(guò)期狀態(tài)。
    1.12 版本才修復這個(gè)問(wèn)題 。

參考ISSUE:https://issues.apache.org/jira/browse/FLINK_17096

適用場(chǎng)景

微批處理通過(guò)增加延遲換取高吞吐,如果有超低延遲的要求,不建議開(kāi)啟微批處理。通常對于聚合的場(chǎng)景,微批處理可以顯
著(zhù)的提升系統性能,建議開(kāi)啟。

開(kāi)啟 LocalGlobal

原理介紹

LocalGlobal優(yōu)化將原先的 Aggregate 分成 Local+Global 兩階段聚合,即MapReduce 模型中的 Combine+Reduce
處理模式。第一階段在上游節點(diǎn)本地攢一批數據進(jìn)行聚合( localAgg ),并輸出這次微批的增量值 A ccumulator )。第
二階段再將收到的 Accumulator 合并( Merge ),得到最終的結果 GlobalAgg )。

LocalGlobal本質(zhì)上能夠靠 LocalAgg 的聚合篩除部分傾斜數據,從而降低 GlobalAgg的熱點(diǎn),提升性能。結合下圖理解
LocalGlobal 如何解決數據傾斜的問(wèn)題。

pic

  • 未開(kāi)啟 LocalGlobal 優(yōu)化,由于流中的數據傾斜, Key 為紅色的聚合算子實(shí)例需要處理更多的記錄,這就導致了熱點(diǎn)問(wèn)題。
  • 開(kāi)啟 LocalGlobal 優(yōu)化后,先進(jìn)行本地聚合,再進(jìn)行全局聚合??纱蟠鬁p少 GlobalAgg的熱點(diǎn),提高性能。

開(kāi)啟方式

  • LocalGlobal 優(yōu)化需要先開(kāi)啟 MiniBatch ,依賴(lài)于 MiniBatch 的參數。
  • table.optimizer.agg phase strategy : 聚合策略。默認 AUTO ,支持參數 AUTO 、TWO_PHASE( 使用 LocalGlobal 兩階
    段聚合 、 ONE_PHASE( 僅使用 Global 一階段聚合)。
// 初始化運行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tEnv = StreamTableEnvironment.create(env);
Configuration configuration = tEnv.getConfig().getConfiguration();
// 開(kāi)啟 miniBatch
configuration.setString("table.exec.mini-batch.enabled ", true);
// 批量輸出的間隔時(shí)間
configuration.setString("table.exec.mini-batch.allow latency ", "5s");
// 防止 OOM 設置每個(gè)批次最多緩存數據的條數 ,可以設為 2 萬(wàn)條
configuration.setString("table.exec.mini-batch.size ", "20000");
// 開(kāi)啟 LocalGlobal
config uration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
    1. 需要先開(kāi)啟 MiniBatch。
    1. 開(kāi)啟 LocalGlobal 需要 UDAF 實(shí)現 Merge 方法 。

開(kāi)啟Split Distinct

LocalGlobal優(yōu)化針對普通聚合(例如 SUM 、 COUNT 、 MAX 、 MIN 和 AVG )有較好的效果。對于 DISTINCT 的聚合(如
COUNT DISTINCT 收效不明顯,因為 COUNT DISTINCT 在 Local 聚合時(shí),對于 DISTINCT KEY 的去重率不高,導致在
Global 節點(diǎn)仍然存在熱點(diǎn)。

原理介紹

為了解決COUNT DISTINCT 的熱點(diǎn)問(wèn)題,通常需要手 動(dòng)改寫(xiě)為兩層聚合(增加按 Distinct Key取模的打散層)。

從 Flink1.9.0 版本開(kāi)始,提供了 COUNT DISTINCT 自動(dòng)打散功能, 通過(guò)HASH_CODE(distinct_key) % BUCKET_NUM 打散,
不需要手動(dòng)重寫(xiě)。Split Distinct 和LocalGlobal 的原理對比參見(jiàn)下圖。

pic

Distinct舉例

SELECT 
  a,COUNT(DISTINCT b)
FROM 
  T
GROUP BY a

使用SQL語(yǔ)句手動(dòng)打散:

SELECT a,SUM(cnt)
FROM (
  SELECT a,COUNT(DISTINCT b ) as cnt
  FROM T
  GROUP BY a,MOD(HASH_CODE(b), 1024) 
)
GROUP BY a

特性開(kāi)啟

默認不開(kāi)啟,使用參數顯式開(kāi)啟。

  • table.optimizer.distinct agg.split.enabled: true: 默認 false 。
  • table.optimizer.distinct agg.split.bucket num: Split Distinct: 優(yōu)化在第一層聚合中,被打散的bucket 數目。默認 1024。
// 初始化 table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv 的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設置參數:要結合 minibatch 一起 使用
// 開(kāi)啟 Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一層 打 散 的 bucket 數目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");

注意事項

  • (1)目前不能在包含 UDAF 的 Flink SQL 中使用 Split Distinct 優(yōu)化方法。
  • (2)拆分出來(lái)的兩個(gè) GROUP 聚合還可參與 LocalGlobal 優(yōu)化。
  • (3)該功能在Flink 1.9.0 版本 及以上版本才支持。

多維DISTINCT 使用Filter

在某些場(chǎng)景下,可能需要從不同維度來(lái)統計count distinct )的結果 (比如統計 uv 、app 端的 uv 、 web 端的 uv 可能
會(huì )使用如下 CASE WHEN 語(yǔ)法 。

SELECT
  a,
  COUNT(DISTINCT b ) AS total_ b,
  COUNT(DISTINCT CASE WHEN c IN ('A', 'B') THEN b ELSE NULL END) AS AB b,
  COUNT(DISTINCT CASE WHEN c IN ('C', 'D') THEN b ELSE NULL END) AS CD_b
FROM T
GROUP BY a

在這種情況下,建議使用FILTER 語(yǔ)法 , 目前的 Flink SQL 優(yōu)化器可以識別同一唯一鍵上的不同 FILTER 參數。如,在上
面的示例中,三個(gè) COUNT DISTINCT 都作用在 b 列上。此時(shí),經(jīng)過(guò)優(yōu)化器識別后,Flink 可以只使用一個(gè)共享狀態(tài)實(shí)例,
而不是三個(gè)狀態(tài)實(shí)例,可減少狀態(tài)的大小和對狀態(tài)的訪(fǎng)問(wèn)。

將上邊的CASE WHEN 替換成 FILTER 后 ,如下所示:

SELECT
  a,
  COUNT(DISTINCT b ) AS b,
  COUNT(DISTINCT b ) FILT ER (WHERE c IN ('A', 'B')) AS AB_b,
  COUNT(DISTINCT b ) FILTER (WHERE c IN ('C', 'D')) AS CD b
FROM T
GROUP BY a



標 題:《Flink SQL 優(yōu)化
作 者:zeekling
提 示:轉載請注明文章轉載自個(gè)人博客:浪浪山旁那個(gè)村

    評論
    0 評論
avatar

取消
男人的av一区二区资源,亚洲日韩国产精品无码av,蜜桃久久久aaaa成人网一区,亚洲日韩中文字幕一区,在线观看国产亚洲视频免费