简介
Flink本身为了保证其高可用的特性,以及保证作用的Exactly Once的快速恢复,进而提供了一套强大的Checkpoint机制。Checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保 证应用流图状态的一致性。Flink的Checkpoint机制原理来自“Chandy-Lamport algorithm”算法 (分布式快照算法)。
Checkpoint的执行流程
每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。
- CheckpointCoordinator周期性的向该流应用的所有source算子发送barrier;
- 当某个source算子收到一个barrier时,便暂停数据处理过程,然后将自己的当前状 态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告 自己快照制作情况,同时向自身所有下游算子广播该barrier,恢复数据处理;
- 下游算子收到barrier之后,会暂停自己的数据处理过程,然后将自身的相关状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自身 快照情况,同时向自身所有下游算子广播该barrier,恢复数据处理;
- 每个算子按照步骤3不断制作快照并向下游广播,直到最后barrier传递到sink算子,快照制作完成。
- 当CheckpointCoordinator收到所有算子的报告之后,认为该周期的快照制作成功; 否则,如果在规定的时间内没有收到所有算子的报告,则认为本周期快照制作失败 ;
Checkpoint常用设置
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
复制代码
- 使用StreamExecutionEnvironment.enableCheckpointing方法来设置开启checkpoint;具体可以使用enableCheckpointing(long interval),或者enableCheckpointing(long interval, CheckpointingMode mode);interval用于指定checkpoint的触发间隔(单位milliseconds),而CheckpointingMode默认是CheckpointingMode.EXACTLY ONCE,也可以指定为CheckpointingMode.AT LEAST_ONCE
- 也可以通过StreamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode来设置CheckpointingMode,一般对于超低延迟的应用(大概几毫秒)可以使用CheckpointingMode.AT LEAST ONCE,其他大部分应用使用CheckpointingMode.EXACTLY_ONCE就可以
- checkpointTimeout用于指定checkpoint执行的超时时间(单位milliseconds),超时没完成就会被abort掉
- minPauseBetweenCheckpoints用于指定checkpoint coordinator上一个checkpoint完成之后最小等多久可以出发另一个checkpoint,当指定这个参数时,maxConcurrentCheckpoints的值为1
- maxConcurrentCheckpoints用于指定运行中的checkpoint最多可以有多少个,用于包装topology不会花太多的时间在checkpoints上面;如果有设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数就不起作用了(大于1的值不起作用)
- enableExternalizedCheckpoints用于开启checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state;ExternalizedCheckpointCleanup用于指定当job canceled的时候externalized checkpoint该如何清理,DELETE ON CANCELLATION的话,在job canceled的时候会自动删除externalized state,但是如果是FAILED的状态则会保留;RETAIN ON CANCELLATION则在job canceled的时候会保留externalized checkpoint state
- failOnCheckpointingErrors用于指定在checkpoint发生异常的时候,是否应该fail该task,默认为true,如果设置为false,则task会拒绝checkpoint然后继续运行
flink-conf.yaml相关配置
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false
复制代码
- state.backend用于指定checkpoint state存储的backend,默认为none
- state.backend.async用于指定backend是否使用异步snapshot(默认为true),有些不支持async或者只支持async的state backend可能会忽略这个参数
- state.backend.fs.memory-threshold,默认为1024,用于指定存储于files的state大小阈值,如果小于该值则会存储在root checkpoint metadata file
- state.backend.incremental,默认为false,用于指定是否采用增量checkpoint,有些不支持增量checkpoint的backend会忽略该配置
- state.backend.local-recovery,默认为false
- state.checkpoints.dir,默认为none,用于指定checkpoint的data files和meta data存储的目录,该目录必须对所有参与的TaskManagers及JobManagers可见
- state.checkpoints.num-retained,默认为1,用于指定保留的已完成的checkpoints个数
- state.savepoints.dir,默认为none,用于指定savepoints的默认目录
- taskmanager.state.local.root-dirs,默认为none
增量式的检查点- Checkpoint设置的奇技淫巧