专栏名称: OSC开源社区
OSChina 开源中国 官方微信账号
目录
相关文章推荐
程序员小灰  ·  上海微软大裁员,赔偿达N+8,12年老员工感 ... ·  22 小时前  
程序员小灰  ·  OpenAI正式发布o3:通往AGI的路上, ... ·  4 天前  
程序员的那些事  ·  “应该禁止所有新项目使用 ... ·  3 天前  
51好读  ›  专栏  ›  OSC开源社区

Java消息队列任务的平滑关闭

OSC开源社区  · 公众号  · 程序员  · 2016-11-19 08:21

正文

#长按上图识别二维码,参与OSC源创会年终盛典#


摘要


对于消息队列的监听,我们一般使用Java写一个独立的程序,在Linux服务器上运行。程序启动后,通过消息队列客户端接收消息,放入一个线程池进行异步处理,并发的快速处理。当我们修改程序后,需要重新启动任务的时候,如何保证消息的不丢失呢?


1.问题背景


对于消息队列的监听,我们一般使用Java写一个独立的程序,在Linux服务器上运行。程序启动后,通过消息队列客户端接收消息,放入一个线程池进行异步处理,并发的快速处理。


那么问题来了,当我们修改程序后,需要重新启动任务的时候,如何保证消息的不丢失呢?


正常来说,订阅者程序关闭后,消息会在发送者队列中堆积,等待订阅者下次订阅消费,所以未接收的消息是不会丢失的。唯一可能丢失的消息,就是在关闭的一瞬间,已经从队列中取出但还没有处理完毕的消息。


因此我们需要一套平滑关闭的机制,保证在重启的时候,消息可以正常处理完成。


2.问题分析


平滑关闭的思路如下:


1.在关闭程序时,首先关闭消息订阅,这个时候消息都在发送者队列中


2.关闭本地消息处理线程池(等待本地线程池中的消息处理完毕)


3.程序退出


关闭消息订阅:一般消息队列的客户端都提供关闭连接的方法,具体可以自行查看api


关闭线程池:Java的ThreadPoolExecutor线程池提供shutdown()和shutdownNow()两个方法,区别是前者会等待线程池中的消息都处理完毕,后者直接停止线程的执行并返回list集合。因为我们需要使用shutdown()方法进行关闭,并通过isTerminated(),方法判断线程池是否已经关闭。


那么问题又来了,我们如何通知到程序,需要执行关闭操作呢?


在Linux中,我们可以用kill -9 pid关闭进程,除了-9之外,我们可以通过 kill -l查看kill 命令的其它信号量,比如使用 12) SIGUSR2 信号量

我们可以在Java程序启动时,注册对应的信号量,对信号量进行监听,在收到对应的kill操作时,执行相关的业务操作。


伪代码如下



下面通过一个demo模拟相关逻辑操作


首先模拟一个生产者,每秒生产5个消息


然后模拟一个订阅者,收到消息后交给线程池进行处理,线程池固定4个线程,每个消息处理时间1秒,这样线程池每秒会积压1个消息。(详细代码请点击“阅读原文”)







当我们在服务上运行时,通过控制台可以看到相关的输出信息,demo中输出了线程池的积压消息个数


另打开一个终端,通过ps命令查看进程号,或者通过nohup启动Java进程拿到进程id


当我们执行kill -12 pid的时候 可以看到关闭业务逻辑


3.问题总结


在部门的实际业务中,消息队列的消息量还是挺大的,某些业务高峰时每秒有几百的消息量,因此对消息的处理要保证速度,避免消息积压,也可以通过负载解决单个订阅节点的压力。


在某些业务场景中,对消息的完整性要求不那么高,那么就不用考虑重启时的一点损耗。反之,就需要好好思考和设计了。




推荐阅读
从零开始学做微信小程序,看这些就够了!

为何我暂停了维护 Python 社区的志愿者工作

15 款顶级开源人工智能工具推荐

那些适合日常使用的开源工具和应用(办公篇)

DB-Engines 11月全球数据库排行出炉,Oracle 霸主地位受威胁

点击“阅读原文”查看更多精彩内容