专栏名称: 芋道源码
纯 Java 源码分享公众号,目前有「Dubbo」「SpringCloud」「Java 并发」「RocketMQ」「Sharding-JDBC」「MyCAT」「Elastic-Job」「SkyWalking」「Spring」等等
目录
相关文章推荐
Java知音  ·  HashMap 一边循环一边删除,上线翻车了~~ ·  2 天前  
芋道源码  ·  微服务设计、拆分原则 ·  2 天前  
Java编程精选  ·  突发!GitLab将停止对中国区用户提供Gi ... ·  1 周前  
芋道源码  ·  实战!Arthas 定位 ... ·  4 天前  
芋道源码  ·  丝滑的打包部署,一套带走 ·  4 天前  
51好读  ›  专栏  ›  芋道源码

Spring 实现 3 种异步流式接口,干掉接口超时烦恼

芋道源码  · 公众号  · Java  · 2025-01-01 11:11

主要观点总结

本文介绍了Spring框架中用于处理异步流式接口的工具,包括ResponseBodyEmitter、SseEmitter和StreamingResponseBody的使用及其应用场景。

关键观点总结

关键观点1: 介绍Spring框架中的三种异步流式接口工具

本文详细介绍了Spring框架中的ResponseBodyEmitter、SseEmitter和StreamingResponseBody三种工具,这些工具用于处理比较耗时的接口,提高系统的性能和响应能力。

关键观点2: ResponseBodyEmitter的使用场景

ResponseBodyEmitter适用于要动态生成内容并逐步发送给客户端的场景,例如文件上传进度、实时日志等。通过创建ResponseBodyEmitter发送器对象,模拟耗时操作逐步调用send方法发送消息。

关键观点3: SseEmitter的使用场景

SseEmitter是ResponseBodyEmitter的一个子类,用于实现服务器向客户端推送实时数据,如实时消息推送、状态更新等场景。SSE在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是text/event-stream类型的数据流信息。

关键观点4: StreamingResponseBody的使用场景

StreamingResponseBody主要用于处理大数据量或持续数据流的传输,支持将数据直接写入OutputStream。适用于需要下载超大文件时,避免将文件数据一次性加载到内存中,而是持续不断的把文件流发送给客户端。


正文

👉 这是一个或许对你有用的社群

🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入芋道快速开发平台知识星球。下面是星球提供的部分资料: 

👉这是一个或许对你有用的开源项目

国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。

功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号、CRM 等等功能:

  • Boot 仓库:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 仓库:https://gitee.com/zhijiantianya/yudao-cloud
  • 视频教程:https://doc.iocoder.cn
【国内首批】支持 JDK 21 + SpringBoot 3.2.2、JDK 8 + Spring Boot 2.7.18 双版本 

来源:程序员小富


如何处理比较耗时的接口?

这题我熟,直接上异步接口,使用 CallableWebAsyncTaskDeferredResultCompletableFuture等均可实现。

但这些方法有局限性,处理结果仅返回单个值。在某些场景下,如果需要接口异步处理的同时,还持续不断地向客户端响应处理结果,这些方法就不够看了。

Spring 框架提供了多种工具支持异步流式接口,如 ResponseBodyEmitterSseEmitterStreamingResponseBody。这些工具的用法简单,接口中直接返回相应的对象或泛型响应实体 ResponseEntity,如此这些接口就是异步的,且执行耗时操作亦不会阻塞 Servlet 的请求线程,不影响系统的响应能力。

下面将逐一介绍每个工具的使用及其应用场景。

ResponseBodyEmitter

ResponseBodyEmitter适用于要动态生成内容并逐步发送给客户端的场景,例如:文件上传进度、实时日志等,可以在任务执行过程中逐步向客户端发送更新。

举个例子,经常用GPT你会发现当你提问后,得到的答案并不是一次性响应呈现的,而是逐步动态显示。这样做的好处是,让你感觉它在认真思考,交互体验比直接返回完整答案更为生动和自然。

使用ResponseBodyEmitter来实现下这个效果,创建 ResponseBodyEmitter 发送器对象,模拟耗时操作逐步调用 send 方法发送消息。

注意:ResponseBodyEmitter 的超时时间,如果设置为 0-1,则表示连接不会超时;如果不设置,到达默认的超时时间后连接会自动断开。其他两种工具也是同样的用法,后边不在赘述了

@GetMapping("/bodyEmitter")
public ResponseBodyEmitter handle() {
    // 创建一个ResponseBodyEmitter,-1代表不超时
    ResponseBodyEmitter emitter = new ResponseBodyEmitter(-1L);
    // 异步执行耗时操作
    CompletableFuture.runAsync(() -> {
        try {
            // 模拟耗时操作
            for (int i = 0; i 10000; i++) {
                System.out.println("bodyEmitter " + i);
                // 发送数据
                emitter.send("bodyEmitter " + i + " @ " + new Date() + "\n");
                Thread.sleep(2000);
            }
            // 完成
            emitter.complete();
        } catch (Exception e) {
            // 发生异常时结束接口
            emitter.completeWithError(e);
        }
    });
    return emitter;
}

实现代码非常简单。通过模拟每2秒响应一次结果,请求接口时可以看到页面数据在动态生成。效果与 GPT 回答基本一致。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

SseEmitter

SseEmitterResponseBodyEmitter 的一个子类,它同样能够实现动态内容生成,不过主要将它用在服务器向客户端 推送实时数据,如实时消息推送、状态更新等场景。

SSE在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是text/event-stream类型的数据流信息,在有数据变更时从服务器流式传输到客户端。

整体的实现思路有点类似于在线视频播放,视频流会连续不断的推送到浏览器,你也可以理解成,客户端在完成一次用时很长(网络不畅)的下载。

客户端JS实现,通过一次 HTTP 请求建立连接后,等待接收消息。此时,服务端为每个连接创建一个 SseEmitter 对象,通过这个通道向客户端发送消息。

<body>
<div id="content" style="text-align: center;">
    <h1>SSE 接收服务端事件消息数据h1>
    <div id="message">等待连接...div>
div>
<script>
    let source = null;
    let userId = 7777

    function setMessageInnerHTML(message{
        const messageDiv = document.getElementById("message");
        const newParagraph = document.createElement("p");
        newParagraph.textContent = message;
        messageDiv.appendChild(newParagraph);
    }

    if (window.EventSource) {
        // 建立连接
        source = new EventSource('http://127.0.0.1:9033/subSseEmitter/'+userId);
        setMessageInnerHTML("连接用户=" + userId);
        /**
         * 连接一旦建立,就会触发open事件
         * 另一种写法:source.onopen = function (event) {}
         */

        source.addEventListener('open'function (e{
            setMessageInnerHTML("建立连接。。。");
        }, false);
        /**
         * 客户端收到服务器发来的数据
         * 另一种写法:source.onmessage = function (event) {}
         */

        source.addEventListener('message'function (e{
            setMessageInnerHTML(e.data);
        });
    } else {
        setMessageInnerHTML("你的浏览器不支持SSE");
    }
script>
body>

在服务端,我们将 SseEmitter 发送器对象进行持久化,以便在消息产生时直接取出对应的 SseEmitter 发送器,并调用 send 方法进行推送。

private static final Map EMITTER_MAP = new ConcurrentHashMap<>();

@GetMapping("/subSseEmitter/{userId}")
public SseEmitter sseEmitter(@PathVariable String userId) {
    log.info("sseEmitter: {}", userId);
    SseEmitter emitterTmp = new SseEmitter(-1L);
    EMITTER_MAP.put(userId, emitterTmp);
    CompletableFuture.runAsync(() -> {
        try {
            SseEmitter.SseEventBuilder event = SseEmitter.event()
                    .data("sseEmitter" + userId + " @ " + LocalTime.now())
                    .id(String.valueOf(userId))
                    .name("sseEmitter");
            emitterTmp.send(event);
        } catch (Exception ex) {
            emitterTmp.completeWithError(ex);
        }
    });
    return emitterTmp;
}

@GetMapping("/sendSseMsg/{userId}")
public void sseEmitter(@PathVariable String userId, String msg) throws IOException {
    SseEmitter sseEmitter = EMITTER_MAP.get(userId);
    if (sseEmitter == null) {
        return;
    }
    sseEmitter.send(msg);
}

接下来向 userId=7777 的用户发送消息,127.0.0.1:9033/sendSseMsg/7777?msg=欢迎关注-->程序员小富,该消息可以在页面上实时展示。

而且SSE有一点比较好,客户端与服务端一旦建立连接,即便服务端发生重启,也可以做到自动重连

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

StreamingResponseBody

StreamingResponseBody 与其他响应处理方式略有不同,主要用于处理大数据量或持续数据流的传输,支持将数据直接写入OutputStream

例如,当我们需要下载一个超大文件时,使用 StreamingResponseBody 可以避免将文件数据一次性加载到内存中,而是持续不断的把文件流发送给客户端,从而解决下载大文件时常见的内存溢出问题。

接口实现直接返回 StreamingResponseBody 对象,将数据写入输出流并刷新,调用一次flush就会向客户端写入一次数据。

@GetMapping("/streamingResponse")
public ResponseEntity handleRbe() {

    StreamingResponseBody stream = out -> {
        String message = "streamingResponse";
        for (int i = 0; i 1000; i++) {
            try {
                out.write(((message + i) + "\r\n").getBytes());
                out.write("\r\n".getBytes());
                //调用一次flush就会像前端写入一次数据
                out.flush();
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
    return ResponseEntity.ok().contentType(MediaType.TEXT_HTML).body(stream);
}

demo这里输出的是简单的文本流,如果是下载文件那么转换成文件流效果是一样的。

总结

这篇介绍三种实现异步流式接口的工具,算是 Spring 知识点的扫盲。使用起来比较简单,没有什么难点,但它们在实际业务中的应用场景还是很多的,通过这些工具,可以有效提高系统的性能和响应能力。

文中 Demo Github 地址:https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot101/通用功能/springboot-streaming


欢迎加入我的知识星球,全面提升技术能力。

👉 加入方式,长按”或“扫描”下方二维码噢

星球的内容包括:项目实战、面试招聘、源码解析、学习路线。

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)