SpringBoot中事件流Event-stream(Server-Send Events)的用法

前言

近几年大热、以ChatGPT为主的自然语言AI让Server-Send Events服务器端推送(简称SSE)重新出现在大众视野。
当时刚接触的时候误以为“打字机”只是前端的效果,还想着AI生成的速度怎么这么快。
后来无意中看到别人介绍才知道用到了Event-stream这个东西。由服务器实时响应并返回数据,从用户体验来说确实会比“等待AI完成文字生成然后一次性返回”强上不少。
稍微阅读了一些相关的文档和文章,发现其实Event-stream也不算很新潮的东西,确实是自己接触少了。其实是个蛮老的技术的,就是以前浏览器支持情况有点堪忧。不过现在现代浏览器基本上都能很好的支持:Can I Use EventSource
SSE本质上还是一个普通的HTTP请求(走的HTTP协议,Content-Type是text/event-stream,原理实际上是长轮询),单从功能上看有点像弱化版的WebSocket:SSE在建立连接后,只支持“由服务端向客户端发送消息”,而Websocket是全双工通信协议,可以双向互相发送信息。
不过SSE相对来说更简单一点,很适合通知推送、AI问答回复这种需要服务端向客户端单方面推送消息,且实时性比较强的场景。

简单实现

在Java技术栈中,其实已经有对响应式编程支持很好的Webflex,不过不是很熟悉,以后再探究一下。
先从简单且常用的SpringMVC入手,其中提供了org.springframework.web.servlet.mvc.method.annotation.SseEmitter这个类的封装,用于SSE的发送。
这个类主要方法有sendcomplete以及类似onError这种以on开头的回调监听方法。send用于发送信息,而complete表示这次请求已处理完成。
同时,SseEmitter还提供了一个event方法,可以用于构造一个符合规范的事件流对象(包括事件ID、事件名称event、消息数据data、注释行comment、重连时间retry)。
在前端HTML5下,则提供了EventSource作为对SSE的支持。
以下简单模拟了ChatGPT的逐字回复,后端使用普通的SpringBoot version 2.6.13,前端是一个普通的HTML5静态页面
后端的功能就是将一个字符串拆成单个字符,然后分段发送,期间sleep模拟生成耗时,接口API:http://localhost:8080/test1

package com.example.flexdemo.demos.web;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.concurrent.CompletableFuture;

/**
 * @author nyable
 */
@CrossOrigin
@RestController
public class TestController {

    @GetMapping(path = "/test1", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter test1() {
        SseEmitter emitter = new SseEmitter();
        // 注册事件监听
        eventListener(emitter);
        // 异步返回
        String text = "故人西辞黄鹤楼,烟花三月下扬州。\n" +
                "孤帆远影碧空尽,唯见长江天际流。";
        CompletableFuture.runAsync(() -> textGenerator(emitter, text));
        return emitter;
    }

    /**
     * 监听事件
     *
     * @param emitter emitter
     */
    private void eventListener(SseEmitter emitter) {
        emitter.onCompletion(() -> System.out.println("emitter completed"));
        emitter.onError(throwable -> System.out.println("emitter error"));
        emitter.onTimeout(() -> System.out.println("emitter timeout"));
    }

    /**
     * 模拟文字一个一个输出
     *
     * @param emitter emitter
     */
    private void textGenerator(SseEmitter emitter, String text) {
        int length = text.length();
        try {
            for (int i = 0; i < length; i++) {
                String word = String.valueOf(text.charAt(i));
                emitter.send(word);
                Thread.sleep(233);
                if (i == length - 1) {
                    emitter.complete();
                }
            }
        } catch (Exception e) {
            emitter.completeWithError(e);
            e.printStackTrace();
        }
    }
}

前端提供了一个按钮,点击后会请求后端API建立长连接,然后在onmessage回调方法中接收后端推送来的默认事件。

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>SSE</title>
  </head>
  <body>
    <button type="button" onclick="output()">输出</button>
    <div id="sse" style="white-space: pre-line"></div>
    <script>
      function output() {
        const source = new EventSource("http://localhost:8080/test1");
        console.log("source=>", source);
        source.onmessage = function (event) {
          console.log("event=>", event);
          const text = event.data;
          console.log("data=>", text);
          const el = document.getElementById("sse");
          el.innerText += text;
        };
      }
    </script>
  </body>
</html>

输出文字

点击页面上的按钮后,可以看到屏幕上在逐渐输出文字。控制台的网络栏中出现了一次路径/test1的HTTP请求,请求头Accept:text/event-stream。响应中,每次都会接收到data:故data:人此类的格式数据。不过如果每次只返回一个字符,换行\n会被当成一个空的data:解析。
等待一首诗文字输出完毕后,会发现又发起了一次请求,返回的内容相同。这是因为SSE会自动重试,当后端调用emitter.complete();方法时,在前端实际上会回调一次EventSourceonerror事件,来表示这次请求已经完成。如果没有处理,客户端会自动进行重试,所以就会重复请求。
所以要对客户端代码进行一些改进,监听onerror事件,而EventSource对象具有readyState属性,可以判断请求状态。EventSource上有几个常量:

  1. EventSource.CONNECTING,值0,表示“正在连接中,可能是连接还没建立或已断线”。
  2. EventSource.OPEN,值为1,表示“已建立连接,此时可以接收数据”。
  3. EventSource.CLOSED,值为2,表示连接已关闭。

回调onmessage方法时的状态是EventSource.OPEN,而后端调用emitter.complete();方法时的状态是EventSource.CONNECTING,此时在onerror中主动关闭连接即可。

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>SSE</title>
  </head>
  <body>
    <button type="button" onclick="output()">输出</button>
    <div id="sse" style="white-space: pre-line"></div>
    <script>
      function output() {
        const source = new EventSource("http://localhost:8080/test1");
        console.log("source=>", source);
        source.onmessage = (event) => {
          console.log("event=>", event);
          const text = event.data;
          console.log("data=>", text);
          const el = document.getElementById("sse");
          el.innerText += text;
        };
        source.onerror = (event) => {
          console.log("onerror", source.readyState);
          if (source.readyState == EventSource.CONNECTING) {
            // 如果只想输出一次,就调用关闭方法关闭连接
            source.close();
            console.log('closed');
          }
        };
      }
    </script>
  </body>
</html>

复杂数据

SSE的数据格式,按规范来说,用来传输UTF-8编码的文本数据,格式为字段名:内容。然后有四种字段可用,分别是:

  1. id 事件ID。以id:开头,如id:2b130ec8-be37-4088-8998-3920a6bf52ba
  2. event 事件名称,以event:开头,如event:ping,可以在前端通过source.addEventListener('事件名称',(e)=>{})监听指定的事件名称进行单独处理,此时该事件不会在onmessage中被处理。
  3. data 数据内容,以data:开头,如data:好
  4. retry重试时间,以retry:开头,如retry:5000
    然后还有特殊消息,以:开头,作为注释,如: test
package com.example.flexdemo.demos.web;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/**
 * @author nyable
 */
@CrossOrigin
@RestController
public class TestController {

    @GetMapping(path = "/test1", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter test1() {
        SseEmitter emitter = new SseEmitter();
        // 注册事件监听
        eventListener(emitter);
        // 异步返回
        String text = "故人西辞黄鹤楼,烟花三月下扬州。\n" +
                "孤帆远影碧空尽,唯见长江天际流。";
        CompletableFuture.runAsync(() -> textGenerator(emitter, text));
        return emitter;
    }

    /**
     * 监听事件
     *
     * @param emitter emitter
     */
    private void eventListener(SseEmitter emitter) {
        emitter.onCompletion(() -> System.out.println("emitter completed"));
        emitter.onError(throwable -> System.out.println("emitter error"));
        emitter.onTimeout(() -> System.out.println("emitter timeout"));
    }

    /**
     * 模拟文字一个一个输出
     *
     * @param emitter emitter
     */
    private void textGenerator(SseEmitter emitter, String text) {

        int length = text.length();
        String id = UUID.randomUUID().toString();
        try {
            // 发送一个非默认的自定义事件 "ping"
            emitter.send(SseEmitter.event().name("ping").data("ping"));
            for (int i = 0; i < length; i++) {
                String word = String.valueOf(text.charAt(i));
                // emitter.send(word);
                // 虽然SseEmitter.event()是建造者模式,但是不要调用build方法,不然返回到前端的是一个Set转的JSON数组字符串,里面有很多没用的信息
                emitter.send(SseEmitter.event()
                        .id(id)
                        .data(word)
                        .comment("114514")
                );
                Thread.sleep(233);
                if (i == length - 1) {
                    emitter.complete();
                }
            }
        } catch (Exception e) {
            emitter.completeWithError(e);
            e.printStackTrace();
        }
    }
}

然后在前端对自定义事件进行监听

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>SSE</title>
  </head>
  <body>
    <button type="button" onclick="output()">输出</button>
    <div id="sse" style="white-space: pre-line"></div>
    <script>
      function output() {
        const source = new EventSource("http://localhost:8080/test1");
        console.log("source=>", source);

        source.onmessage = (event) => {
          console.log("event=>", event);
          const text = event.data;
          console.log("data=>", text);
          const el = document.getElementById("sse");
          el.innerText += text;
        };

        source.onerror = (event) => {
          console.log("onerror", source.readyState);
          if (source.readyState == EventSource.CONNECTING) {
            // 如果只想输出一次,就调用关闭方法关闭连接
            source.close();
            console.log("closed");
          }
        };
        // 自定义事件ping的回调
        source.addEventListener("ping", (event) => {
          console.log("pong");
        });
      }
    </script>
  </body>
</html>

既然data数据是文本,那么肯定也能传输JSON之类的数据格式。

  HashMap<String, Object> map = new HashMap<>();
                map.put("name", "田所浩二");
                map.put("age", 114514);
                emitter.send(SseEmitter.event().data(map, MediaType.APPLICATION_JSON).id(id));

不过MediaType加不加其实都一样,传到前端的依然是纯文本,还是要手动反序列化成对象。

        source.onmessage = (event) => {
          const text = event.data;
          const data = JSON.parse(text);
          console.log(data);
        };

本来想去看一下ChatGPT返回的数据格式是啥样的,但是现在好像改版成WebSocket发送了。/conversation这个接口的content-type变成application/json,然后返回内容里包括一个wss的url。
随便看一下通义千问智谱清言的返回内容。

通义千问

传的是JSON格式,每次content的内容都包含之前的文字,似乎是通过发送一个data: [DONE]来表示回复结束。

data: {"canFeedback":true,"canRegenerate":true,"canShare":true,"canShow":true,"contentFrom":"text","contentType":"text","contents":[{"content":"回复的内容","contentType":"text","id":"","role":"assistant","status":"finished"}],"msgId":"","msgStatus":"finished","params":{},"parentMsgId":"","sessionId":"","sessionOpen":true,"sessionShare":true,"sessionWarnNew":false,"stopReason":"stop","traceId":""}

data: [DONE]

智谱清言

传的是JSON格式,指定了事件名称,同样也是在text中包含之前的文字。

event:message
data: {"id": "", "conversation_id": "", "assistant_id": "", "parts": [{"id": "", "logic_id": "", "role": "assistant", "content": [{"type": "text", "text": "回复的内容", "status": "init"}], "model": "chatglm-all-tools", "recipient": "all", "created_at": "2024-03-29 09:51:06", "meta_data": {}, "status": "init"}], "created_at": "2024-03-29 09:51:06", "meta_data": {}, "status": "init", "last_error": {}}

仿写

后端

package com.example.flexdemo.demos.web;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/**
 * @author nyable
 */
@CrossOrigin
@RestController
public class TestController {

    @GetMapping(path = "/test1", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter test1() {
        SseEmitter emitter = new SseEmitter();
        // 注册事件监听
        eventListener(emitter);
        // 异步返回
        String text = "故人西辞黄鹤楼,烟花三月下扬州。\n" +
                "孤帆远影碧空尽,唯见长江天际流。";
        CompletableFuture.runAsync(() -> textGenerator(emitter, text));
        return emitter;
    }

    /**
     * 监听事件
     *
     * @param emitter emitter
     */
    private void eventListener(SseEmitter emitter) {
        emitter.onCompletion(() -> System.out.println("emitter completed"));
        emitter.onError(throwable -> System.out.println("emitter error"));
        emitter.onTimeout(() -> System.out.println("emitter timeout"));
    }

    /**
     * 模拟文字一个一个输出
     *
     * @param emitter emitter
     */
    private void textGenerator(SseEmitter emitter, String text) {

        int length = text.length();
        String id = UUID.randomUUID().toString();
        try {
            // 发送一个非默认的自定义事件 "ping"
            emitter.send(SseEmitter.event().name("ping").data("ping"));
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < length; i++) {
                StringBuilder content = sb.append(text.charAt(i));
                // 虽然SseEmitter.event()是建造者模式,但是不要调用build方法,不然返回到前端的是一个Set转的JSON数组字符串,里面有很多没用的信息
                HashMap<String, Object> map = new HashMap<>();
                map.put("id", id);
                map.put("content", content);
                map.put("createTime", System.currentTimeMillis());
                emitter.send(SseEmitter.event().data(map));
                Thread.sleep(233);
                if (i == length - 1) {
                    emitter.complete();
                }
            }
        } catch (Exception e) {
            emitter.completeWithError(e);
            e.printStackTrace();
        }
    }
}

前端

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>SSE</title>
  </head>
  <body>
    <button type="button" onclick="output()">输出</button>
    <div id="sse" style="white-space: pre-line"></div>
    <script>
      function output() {
        const source = new EventSource("http://localhost:8080/test1");
        console.log("source=>", source);

        source.onmessage = (event) => {
          const text = event.data;
          const data = JSON.parse(text);
          console.log(data);
          const el = document.getElementById("sse");
          el.innerText = data.content;
        };

        source.onerror = (event) => {
          console.log("onerror", source.readyState);
          if (source.readyState == EventSource.CONNECTING) {
            // 如果只想输出一次,就调用关闭方法关闭连接
            source.close();
            console.log("closed");
          }
        };
        // 自定义事件ping的回调
        source.addEventListener("ping", (event) => {
          console.log("pong");
        });
      }
    </script>
  </body>
</html>

一些要注意的地方

超时时间

SseEmitter其实是可以设置超时时间的,默认取底层配置的值(一般是30秒)。超时后会抛出一个java.lang.IllegalStateException: ResponseBodyEmitter has already completed,并且调用onTimeout(如果有的话)。然而很多时候我们并不知道到底要多久响应才能完成,如果按默认时间,文本太长就很容易超时。所以可以通过构造函数设置一个超时时间 new SseEmitter(0L);,0为永不超时。

关闭连接

在我们刷新网页(断开连接)、后端主动调用complete();方法、前端调用close()方法后,SseEmitter的父类ResponseBodyEmitter中的字段complete会被设置为true。如果此时再调用send()等方法,同样会抛出java.lang.IllegalStateException: ResponseBodyEmitter has already completed异常,已经完成或关闭的事件流无法再打开复用。

缓存连接

如果使用场景是需要长时间保持连接,而并非像ChatGPT这样一次问答后就关闭,可以用一个Map来缓存SseEmitter,以客户端唯一标识或者用户ID之类的标识作为key进行缓存,使用供相同连接的场景复用。然后在onErroronCompletion中移除掉对应的key。不过目前好像没啥特别需要的场景,大概服务器检测在线客户端需要?

复杂请求参数

HTML5中的EventSource只能发出GET请求且支持的参数只有urlwithCredentials,如果想要传递简单参数倒是还好,可以直接丢到url的路径上,但是如果想要传复杂参数就比较麻烦了。因为没法设置Request BodyRequest headers,并且url也有长度限制。可以用Azure/fetch-event-source 这个 SSE增强库,用法和API比原生的EventSource也更加简单。

import { fetchEventSource } from '@microsoft/fetch-event-source'
fetchEventSource('http://localhost:8080/test1', {
  method: 'post',
  headers: {
    'Content-Type': 'application/json'
  },
  // 这里要JSON.stringify一下,不能直接丢JSON对象。后端正常用@RequestBody接收就行。
  body: JSON.stringify({
    text: '测试数据',
    foo: 'bar',
    id: 114514
  }),
  onmessage (event) {
    console.log(event.data)
  },
  onclose (event) {
    console.log('close=>', event)
  },
  onerror (event) {
    console.log('error=>', event)
  },
  onopen (event) {
    console.log('open=>', event)
  }
})

还想到一种曲线救国的方式,就是先正常发一个POST请求到后端,这个请求的Request Body里有复杂参数,然后生成一个唯一请求ID,与参数对象对应缓存起来最好弄个过期时间,然后返回给前端这个ID,后续用EventSource的时候把ID丢路径上,然后后端再去取对应缓存的参数,取完清空缓存...感觉有点多此一举,不如直接用Azure/fetch-event-source ,不过可能有些场景用不了这个库又想用EventSource的话大概需要这么绕一下。

相关链接

MDN EventSource

阮一峰 - Server-Sent Events 教程

stackoverflow - WebSockets vs. Server-Sent events/EventSource

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇