专栏名称: 逸言
文学与软件,诗意地想念。
目录
相关文章推荐
程序员的那些事  ·  DeepSeek 下棋靠忽悠赢了 ... ·  14 小时前  
程序员的那些事  ·  趣图:“微软穷疯了?上架的 ... ·  14 小时前  
码农翻身  ·  漫画 | ... ·  21 小时前  
程序员小灰  ·  这款AI编程工具,将会取代Cursor! ·  昨天  
程序员的那些事  ·  热搜第一!DeepSeek百万年薪招AI人才 ... ·  3 天前  
51好读  ›  专栏  ›  逸言

Flink分布式程序的异常处理

逸言  · 公众号  · 程序员  · 2021-07-04 22:38

正文


在我们的数据平台产品中,为了简化开发,对Flink做了一层封装,定义了Job和Flow的抽象。一个Job其实就是Flink的一个作业,每个Job可以定义多个Flow,一个Flow可以理解为是Flink的一个DataStream,利用Job传递的StreamExecutionEnvironment可以在Flow中添加包括Source与Sink的多个算子。


Job与Flow之间的关系可以利用自定义的@JobFlow注解进行配置,如此就可以在执行抽象的AbstractJob的run()方法时,利用反射获得该Job下的所有Flow,遍历执行每个Flow的run()方法。在Flow的run()方法中,才会真正根据StreamExecutionEnvironment执行多个算子。


Flink为了保证计算的稳定性,提供了不同的重启策略。例如,当我们将重启策略设置为失败率(failure-rate)时,如果执行的任务出错次数达到了失败率配置的要求,Flink的Worker节点的TaskManager就会重启。如果超过重启次数,Task Manager就会停止运行。


失败的原因可能有很多,例如资源不足、网络通信出现故障等Flink集群环境导致的故障,但是也可能是我们编写的作业在处理流式数据时,因为处理数据不当抛出了业务异常,使得Flink将其视为一次失败。


为了减少因为业务原因抛出异常导致Task Manager的不必要重启,需要规定我们编写的Flink程序的异常处理机制。由于封装了Flink的Job,从一开始,我就考虑一劳永逸地解决业务异常的问题,即在AbstractJob的run()方法中,捕获我们自定义的业务异常,在日志记录了错误信息后,把该异常“吃”掉,避免异常的抛出导致执行失败,造成TaskManager的重启,如:

public abstract class AbstractFlow implements Flow {      public void run() {        try {            runBare();        } catch (DomainException ex) {            //...        }    }      protected abstract void runBare();}


哪知道这一处理机制压根儿就无法捕获业务异常! 为什么呢? 这就要从Flink的分布式机制说起了。


在Flink集群上执行任务,需要Client将作业提交给Flink集群的Master节点。Master的Dispatcher接收到Job并启动JobManager,通过解析Job的逻辑视图,了解Job对资源的要求,然后向ResourceManager(Standalone模式,如果是YARN,则由YARN管理和调度资源)申请本次Job需要的资源。JobManager将Job的逻辑视图转换为物理视图,并将计算任务分发部署到Flink集群的TaskManager上。整个执行过程如下图所示:



我们封装的一个Flow,在物理视图中,其实就是一个作业,即前面所说的计算任务。一个作业可以包含多个算子。如果相邻算子之间不存在数据Shuffle、并行度相同,则会合并为算子链(Operator Chain)。每个算子或算子链组成一个JobVertex,在执行时作为一个任务(Task)。根据并行度的设置,每个任务包含并行度数目的子任务(SubTask),这些子任务就是作业调度的最小逻辑单元,对应于进程资源中的一个线程,在Flink中,就是一个Slot(如果不考虑Slot共享的话)。


假定Flink环境的并行度设置为1,作业的前面两个算子满足合并算子链的要求,且并行度设置为2;之后,通过keyBy()之类的算子完成了数据的Shuffle,然后再合并到同一个Sink中。那么它们的关系如下图所示:








请到「今天看啥」查看全文