男人的av一区二区资源,亚洲日韩国产精品无码av,蜜桃久久久aaaa成人网一区,亚洲日韩中文字幕一区,在线观看国产亚洲视频免费

Flink狀態(tài)后端和CheckPoint 調優(yōu)

RocksDB 介紹

RocksDB 是嵌入式的 Key-Value 數據庫,在 Flink 中被用作 RocksDBStateBackend 的底層存儲。如下圖所示,RocksDB 持久化的 SST文件在本地文件系統上通過(guò)多個(gè)層級進(jìn)行組織,不同層級之間會(huì )通過(guò)異步Compaction 合并重復、過(guò)期和已刪除的數據。在 RocksDB 的寫(xiě)入過(guò)程中,數據經(jīng)過(guò)序列化后寫(xiě)入到WriteBuffer,WriteBuffer 寫(xiě)滿(mǎn)后轉換為 Immutable Memtable 結構,再通過(guò) RocksDB 的flush 線(xiàn)程從內存 flush 到磁盤(pán)上;讀取過(guò)程中,會(huì )先嘗試從 WriteBuffer 和 Immutable Memtable 中讀取數據,如果沒(méi)有找到,則會(huì )查詢(xún) Block Cache,如果內存中都沒(méi)有的話(huà),則會(huì )按層級查找底層的 SST 文件,并將返回的結果所在的 Data Block 加載到 BlockCache,返回給上層應用。

pic

RocksDBKeyedStateBackend增量快照介紹

這里介紹一下大家在大狀態(tài)場(chǎng)景下經(jīng)常需要調優(yōu)的 RocksDBKeyedStateBackend 增量快照。RocksDB 具有 append-only 特性,Flink 利用這一特性將兩次 checkpoint 之間 SST 文件列表的差異作為狀態(tài)增量上傳到分布式文件系統上,并通過(guò) JobMaster 中的SharedStateRegistry 進(jìn)行狀態(tài)的注冊和過(guò)期。

pic

如上圖所示,Task 進(jìn)行了 3 次快照(假設作業(yè)設置保留最近 2 次 Checkpoint):

  • CP-1:RocksDB 產(chǎn)生 sst-1 和 sst-2 兩個(gè)文件,Task 將文件上傳至 DFS,JM 記錄 sst 文件對應的引用計數
  • CP-2:RocksDB 中的 sst-1 和 sst-2 通過(guò) compaction 生成了 sst-1,2,并且新生成了 sst-3 文件,Task 將兩個(gè)新增的文件上傳至 DFS,JM 記錄 sst 文件對應的引用計數
  • CP-3:RocksDB 中新生成 sst-4 文件,Task 將增量的 sst-4 文件上傳至 DFS,且在 CP-3 完成后,由于只保留最近 2 次 CP,JobMaster 將 CP-1 過(guò)期,同時(shí)將 CP-1 中的 sst 文件對應的引用計數減 1,并刪除引用計數歸 0 的 sst 文件(sst-1 和 sst-2)

增量快照涉及到 Task 多線(xiàn)程上傳/下載增量文件,JobMaster 引用計數統計,以及大量與分布式文件系統的交互等過(guò)程,相對其他的StateBackend 要更為復雜,在 100+GB 甚至 TB 級別狀態(tài)下,作業(yè)比較容易出現性能和穩定性瓶頸的問(wèn)題。

RocksDb大狀態(tài)優(yōu)化

截至當前,Flink 作業(yè)的狀態(tài)后端仍然只有 Memory、FileSystem 和 RocksDB 三種可選,且 RocksDB 是
狀態(tài)數據量較大(GB 到 TB 級別)時(shí)的選擇。RocksDB 的性能發(fā)揮非常仰賴(lài)調優(yōu),如果全部采用默認配置,讀寫(xiě)性能有可能會(huì )很差。

但是,RocksDB 的配置也是極為復雜的,可調整的參數多達百個(gè),沒(méi)有放之四海而皆準的優(yōu)化方案。如果僅考慮 Flink 狀態(tài)存儲這一方面,我們仍然可以總結出一些相對普適的優(yōu)化思路。本文先介紹一些基礎知識,再列舉方法。

開(kāi)啟State訪(fǎng)問(wèn)性能監控

flink1.13中引入了State訪(fǎng)問(wèn)的性能監控,即latency tracking state、此功能不局限于State Backend的類(lèi)型,自定義實(shí)現的StateBackend也可以復用此功能。

pic

state訪(fǎng)問(wèn)的性能監控會(huì )產(chǎn)生一定的性能影響,所以默認每100次做一次抽樣sample,對不同的state Backend性能損失影響不同。

對于RocksDB State Backend,性能損失大概在1%左右

對于heap State Backend,性能損失最多可達10%(內存本身速度比較快,一點(diǎn)損失影響就很大)

性能監控默認不開(kāi)啟,開(kāi)啟需要配置下面參數(第一個(gè)參數為開(kāi)啟,其他參數可根據實(shí)際情況調整):

state.backend.latency-track.keyed-state-enabled:true:?jiǎn)⒂迷L(fǎng)問(wèn)狀態(tài)的性能監控

state.backend.latency-track.sample-interval:100: 采樣間隔

state.backend.latency-track.histroy-size:128:保留的采樣數據個(gè)數,越大越精確

state.backend.latency-track.state-name-as-variable:true:將狀態(tài)名作為變量

pic

0代表是任務(wù)編號,filter.visit-state是定義的狀態(tài)的變量名。

pic

有很多這種統計值可以查看,中位值,75分位值等。

開(kāi)啟增量CheckPoint和本地恢復

開(kāi)啟增量CheckPoint

RocksDB是目前唯一可用于支持有狀態(tài)流處理應用程序增量檢查點(diǎn)的狀態(tài)后端,可以修改參數開(kāi)啟增量CheckPoint:

state.backend.incremental:true:默認false,可以改為true。

或代碼中指定

new EmbededRocksDBStateBackend(true)

開(kāi)啟本地恢復

當flink任務(wù)失敗時(shí),可以基于本地的狀態(tài)信息進(jìn)行恢復任務(wù)??赡懿恍枰獜膆dfs拉取數據。本地恢復目前僅涵蓋鍵值類(lèi)型的狀態(tài)后端(RocksDB)。MemoryStateBackend不支持本地恢復并忽略此選項

state.backend.local-recovery:true

設置多目錄

如果你有多塊磁盤(pán),可以考慮指定本地多目錄

state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb

注意: 不要配置單塊磁盤(pán)的多個(gè)目錄,務(wù)必將目錄配置到多塊不同的磁盤(pán)上,讓多塊磁盤(pán)來(lái)分擔io壓力。

調整RockSDB的預定義選項

Flink針對不同的設置為RocksDB提供了一些預定義選項,如果調整預定義選項達不到預期,再去調整block、writebuffer等參數。

當前支持的預定義選項有支持的選項有:DEFAULT、SPINING_DISK_OPTIMIZED、SPINNING_DISK_OPTIMIZED_HIGH_MEM、
FLASH_SSD_OPTIMIZED (有條件使用ssd的可以使用這個(gè)選項)。

我們一般使用第三個(gè)SPINNING_DISK_OPTIMIZED_HIGH_MEM,設置為機械硬盤(pán)+內存模式。

該模式下flink會(huì )幫我們設置一些它認為比較ok的參數(選項集合),具體如下:

SPINNING_DISK_OPTIMIZED_HIGH_MEM(new HashMap<ConfigOption<?>, Object>() {
    private static final long serialVersionUID = 1L;
    {
      put(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, 4);
      put(RocksDBConfigurableOptions.MAX_OPEN_FILES, -1);
      put(RocksDBConfigurableOptions.COMPACTION_STYLE, CompactionStyle.LEVEL);
      put(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, true);
      put(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE, MemorySize.parse("256mb"));
      put(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, MemorySize.parse("1gb"));
      put(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, MemorySize.parse("64mb"));
      put(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE, 3);
      put(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, 4);
      put(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, MemorySize.parse("256mb"));
      put(RocksDBConfigurableOptions.BLOCK_SIZE, MemorySize.parse("128kb"));
      put(RocksDBConfigurableOptions.USE_BLOOM_FILTER, true);
    }
  })

可以在提交任務(wù)時(shí)指定(磁盤(pán)+內存):

state.backend.rocksdb.predefined-options:SPINNING_DISK_OPTIMIZED_HIGH+MEN

也可以在代碼中指定:

EmbededRocksDBStateBackend embededRocksDBStateBackend = new EmbededRocksDBStateBackend();
EmbededRocksDBStateBackend,setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
env.setStateBackend(embeddedRocksDBStateBackend);

增大block緩存

整個(gè)RocksDB共享一個(gè)block cache,讀數據時(shí)內存的cache大小,該參數越大讀數據時(shí)緩存命中率越高,默認大小為8MB,建議設置到64~256MB 。

state.backend.rocksdb.block.cache-size: 64m: 默認為8m。

增加write_buffer和level閾值大小

RocksDB中,每個(gè)State使用一個(gè)Column Family,每個(gè)Column Family使用獨占的write buffer, 默認64MB,建議調大。

調整這個(gè)參數通常要適配L1層的大小閾值max-size-level-base,默認256m。

該值太小惠造成能存放的SST文件過(guò)少,層數變多造成查找困難,太大會(huì )造成文件過(guò)多,合并困難。建議設為
target_file_size_base(默認為64MB)的倍數。且不能太小,例如5-10倍,即320~640MB。

state.backend,rocksdb.writebuffer.size: 128m

state.backend.rocksdb.compaction.level.max-size-level-base: 320m

增大write buffer數量

每個(gè)Column Family對應的write buffer最大數量,實(shí)際上時(shí)內存中只讀內存表的最大數量,默認值為2,對于機械磁盤(pán)來(lái)講,如果內存足夠大,可以調整到5左右。

state.backend.rocksdb.writebuffer.count: 5

增大后臺線(xiàn)程數和write buffer閾值大小

增大線(xiàn)程數

用于后臺flush和合并sst文件的線(xiàn)程數,默認為1.建議調大,機械硬盤(pán)用戶(hù)可以改為4等更大的值:

state.backend,rocksdb.thread.num: 4

增大write buffer最小合并數

將數據從write buffer中flush到磁盤(pán)時(shí),需要合并的write buffer最小數量。默認為1,可以調整為3.

state.backend.rocksdb.writebuffer.number-to-merge: 3

開(kāi)啟分區索引功能

Flink 1.13 中對 RocksDB 增加了分區索引功能 ,復用了 RocksDB 的 partitioned Index & filter 功能,簡(jiǎn)單來(lái)說(shuō)就是對 RocksDB的 partitioned Index 做了多級索引。

也就是將內存中的最上層常駐,下層根據需要再 load 回來(lái),這樣就大大降低了數據 Swap 競爭。線(xiàn)上測試中,相對于內存比較小的場(chǎng)景中,性能提升 10 倍左右。如果在內存管控下 RocksDB 性能不如預期的話(huà),這也能成為一個(gè)性能優(yōu)化點(diǎn)。

state.backend.rocksdb.memory.partitioned-index-filters:true

Checkpoint 設置

一般需求,我們的 Checkpoint 時(shí)間間隔可以設置為分鐘級別 (1 ~ 5 分鐘)。對于狀態(tài)很大的任務(wù),每次 Checkpoint 訪(fǎng)問(wèn) HDFS 比較耗時(shí),可以設置為 5~10 分鐘一次 Checkpoint,并且調大兩次 Checkpoint 之間的暫停間隔,例如設置兩次 Checkpoint 之間至少暫停 4 或 8 分鐘。同時(shí),也需要考慮時(shí)效性的要求,需要在時(shí)效性和性能之間做一個(gè)平衡,如果時(shí)效性要求高,結合 end- to-end 時(shí)長(cháng), 設置秒級或毫秒級。如果Checkpoint 語(yǔ)義配置為EXACTLY_ONCE,那么在 Checkpoint 過(guò)程中還會(huì )存在 barrier 對齊的過(guò)程,可以
通過(guò) Flink Web UI 的 Checkpoint 選項卡來(lái)查看 Checkpoint 過(guò)程中各階段的耗時(shí)情況,從而確定到底是哪個(gè)階段導致 Checkpoint時(shí)間過(guò)長(cháng)然后針對性的解決問(wèn)題。

RocksDB 相關(guān)參數在前面已說(shuō)明,可以在 flink-conf.yaml 指定,也可以在 Job 的代碼中調用 API 單獨指定,這里不再列出。

// 使?  RocksDBStateBackend 做為狀態(tài)后端,  并開(kāi)啟增量 Checkpoint
RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("hdfs://hadoop1:8020/flink/checkpoints", true);
env.setStateBackend(rocksDBStateBackend);
// 開(kāi)啟 Checkpoint , 間隔為 3 分鐘
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(3));
// 配置 Checkpoint
CheckpointConfig checkpointConf = env.getCheckpointConfig();
checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 最小間隔 4 分鐘
checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(4)); // 超時(shí)時(shí)間  10 分鐘
checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
// 保存 checkpoint
checkpointConf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);



標 題:《Flink狀態(tài)后端和CheckPoint 調優(yōu)
作 者:zeekling
提 示:轉載請注明文章轉載自個(gè)人博客:浪浪山旁那個(gè)村

    評論
    0 評論
avatar

取消
男人的av一区二区资源,亚洲日韩国产精品无码av,蜜桃久久久aaaa成人网一区,亚洲日韩中文字幕一区,在线观看国产亚洲视频免费