前言
近几年大热、以ChatGPT为主的自然语言AI让Server-Send Events
服务器端推送(简称SSE)重新出现在大众视野。
当时刚接触的时候误以为“打字机”只是前端的效果,还想着AI生成的速度怎么这么快。
后来无意中看到别人介绍才知道用到了Event-stream
这个东西。由服务器实时响应并返回数据,从用户体验来说确实会比“等待AI完成文字生成然后一次性返回”强上不少。
稍微阅读了一些相关的文档和文章,发现其实Event-stream
也不算很新潮的东西,确实是自己接触少了。其实是个蛮老的技术的,就是以前浏览器支持情况有点堪忧。不过现在现代浏览器基本上都能很好的支持:Can I Use EventSource。
SSE本质上还是一个普通的HTTP请求(走的HTTP协议,Content-Type是text/event-stream
,原理实际上是长轮询),单从功能上看有点像弱化版的WebSocket
:SSE在建立连接后,只支持“由服务端向客户端发送消息”,而Websocket是全双工通信协议,可以双向互相发送信息。
不过SSE相对来说更简单一点,很适合通知推送、AI问答回复这种需要服务端向客户端单方面推送消息,且实时性比较强的场景。
简单实现
在Java技术栈中,其实已经有对响应式编程支持很好的Webflex
,不过不是很熟悉,以后再探究一下。
先从简单且常用的SpringMVC
入手,其中提供了org.springframework.web.servlet.mvc.method.annotation.SseEmitter
这个类的封装,用于SSE的发送。
这个类主要方法有send
、complete
以及类似onError
这种以on
开头的回调监听方法。send
用于发送信息,而complete
表示这次请求已处理完成。
同时,SseEmitter
还提供了一个event
方法,可以用于构造一个符合规范的事件流对象(包括事件ID、事件名称event、消息数据data、注释行comment、重连时间retry)。
在前端HTML5下,则提供了EventSource
作为对SSE的支持。
以下简单模拟了ChatGPT的逐字回复,后端使用普通的SpringBoot version 2.6.13
,前端是一个普通的HTML5静态页面
。
后端的功能就是将一个字符串拆成单个字符,然后分段发送,期间sleep模拟生成耗时,接口API:http://localhost:8080/test1
package com.example.flexdemo.demos.web;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.concurrent.CompletableFuture;
/**
* @author nyable
*/
@CrossOrigin
@RestController
public class TestController {
@GetMapping(path = "/test1", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter test1() {
SseEmitter emitter = new SseEmitter();
// 注册事件监听
eventListener(emitter);
// 异步返回
String text = "故人西辞黄鹤楼,烟花三月下扬州。\n" +
"孤帆远影碧空尽,唯见长江天际流。";
CompletableFuture.runAsync(() -> textGenerator(emitter, text));
return emitter;
}
/**
* 监听事件
*
* @param emitter emitter
*/
private void eventListener(SseEmitter emitter) {
emitter.onCompletion(() -> System.out.println("emitter completed"));
emitter.onError(throwable -> System.out.println("emitter error"));
emitter.onTimeout(() -> System.out.println("emitter timeout"));
}
/**
* 模拟文字一个一个输出
*
* @param emitter emitter
*/
private void textGenerator(SseEmitter emitter, String text) {
int length = text.length();
try {
for (int i = 0; i < length; i++) {
String word = String.valueOf(text.charAt(i));
emitter.send(word);
Thread.sleep(233);
if (i == length - 1) {
emitter.complete();
}
}
} catch (Exception e) {
emitter.completeWithError(e);
e.printStackTrace();
}
}
}
前端提供了一个按钮,点击后会请求后端API建立长连接,然后在onmessage
回调方法中接收后端推送来的默认事件。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>SSE</title>
</head>
<body>
<button type="button" onclick="output()">输出</button>
<div id="sse" style="white-space: pre-line"></div>
<script>
function output() {
const source = new EventSource("http://localhost:8080/test1");
console.log("source=>", source);
source.onmessage = function (event) {
console.log("event=>", event);
const text = event.data;
console.log("data=>", text);
const el = document.getElementById("sse");
el.innerText += text;
};
}
</script>
</body>
</html>
点击页面上的按钮后,可以看到屏幕上在逐渐输出文字。控制台的网络栏中出现了一次路径
/test1
的HTTP请求,请求头Accept:text/event-stream
。响应中,每次都会接收到data:故
、data:人
此类的格式数据。不过如果每次只返回一个字符,换行\n
会被当成一个空的data:
解析。等待一首诗文字输出完毕后,会发现又发起了一次请求,返回的内容相同。这是因为
SSE
会自动重试,当后端调用emitter.complete();
方法时,在前端实际上会回调一次EventSource
的onerror
事件,来表示这次请求已经完成。如果没有处理,客户端会自动进行重试,所以就会重复请求。所以要对客户端代码进行一些改进,监听
onerror
事件,而EventSource对象
具有readyState
属性,可以判断请求状态。EventSource
上有几个常量:
EventSource.CONNECTING
,值0
,表示“正在连接中,可能是连接还没建立或已断线”。EventSource.OPEN
,值为1
,表示“已建立连接,此时可以接收数据”。EventSource.CLOSED
,值为2
,表示连接已关闭。
回调onmessage
方法时的状态是EventSource.OPEN
,而后端调用emitter.complete();
方法时的状态是EventSource.CONNECTING
,此时在onerror
中主动关闭连接即可。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>SSE</title>
</head>
<body>
<button type="button" onclick="output()">输出</button>
<div id="sse" style="white-space: pre-line"></div>
<script>
function output() {
const source = new EventSource("http://localhost:8080/test1");
console.log("source=>", source);
source.onmessage = (event) => {
console.log("event=>", event);
const text = event.data;
console.log("data=>", text);
const el = document.getElementById("sse");
el.innerText += text;
};
source.onerror = (event) => {
console.log("onerror", source.readyState);
if (source.readyState == EventSource.CONNECTING) {
// 如果只想输出一次,就调用关闭方法关闭连接
source.close();
console.log('closed');
}
};
}
</script>
</body>
</html>
复杂数据
SSE的数据格式,按规范来说,用来传输UTF-8编码的文本数据,格式为字段名:内容
。然后有四种字段可用,分别是:
- id 事件ID。以
id:
开头,如id:2b130ec8-be37-4088-8998-3920a6bf52ba
- event 事件名称,以
event:
开头,如event:ping
,可以在前端通过source.addEventListener('事件名称',(e)=>{})
监听指定的事件名称进行单独处理,此时该事件不会在onmessage
中被处理。 - data 数据内容,以
data:
开头,如data:好
。 - retry重试时间,以
retry:
开头,如retry:5000
然后还有特殊消息,以:
开头,作为注释,如: test
。
package com.example.flexdemo.demos.web;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
/**
* @author nyable
*/
@CrossOrigin
@RestController
public class TestController {
@GetMapping(path = "/test1", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter test1() {
SseEmitter emitter = new SseEmitter();
// 注册事件监听
eventListener(emitter);
// 异步返回
String text = "故人西辞黄鹤楼,烟花三月下扬州。\n" +
"孤帆远影碧空尽,唯见长江天际流。";
CompletableFuture.runAsync(() -> textGenerator(emitter, text));
return emitter;
}
/**
* 监听事件
*
* @param emitter emitter
*/
private void eventListener(SseEmitter emitter) {
emitter.onCompletion(() -> System.out.println("emitter completed"));
emitter.onError(throwable -> System.out.println("emitter error"));
emitter.onTimeout(() -> System.out.println("emitter timeout"));
}
/**
* 模拟文字一个一个输出
*
* @param emitter emitter
*/
private void textGenerator(SseEmitter emitter, String text) {
int length = text.length();
String id = UUID.randomUUID().toString();
try {
// 发送一个非默认的自定义事件 "ping"
emitter.send(SseEmitter.event().name("ping").data("ping"));
for (int i = 0; i < length; i++) {
String word = String.valueOf(text.charAt(i));
// emitter.send(word);
// 虽然SseEmitter.event()是建造者模式,但是不要调用build方法,不然返回到前端的是一个Set转的JSON数组字符串,里面有很多没用的信息
emitter.send(SseEmitter.event()
.id(id)
.data(word)
.comment("114514")
);
Thread.sleep(233);
if (i == length - 1) {
emitter.complete();
}
}
} catch (Exception e) {
emitter.completeWithError(e);
e.printStackTrace();
}
}
}
然后在前端对自定义事件进行监听
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>SSE</title>
</head>
<body>
<button type="button" onclick="output()">输出</button>
<div id="sse" style="white-space: pre-line"></div>
<script>
function output() {
const source = new EventSource("http://localhost:8080/test1");
console.log("source=>", source);
source.onmessage = (event) => {
console.log("event=>", event);
const text = event.data;
console.log("data=>", text);
const el = document.getElementById("sse");
el.innerText += text;
};
source.onerror = (event) => {
console.log("onerror", source.readyState);
if (source.readyState == EventSource.CONNECTING) {
// 如果只想输出一次,就调用关闭方法关闭连接
source.close();
console.log("closed");
}
};
// 自定义事件ping的回调
source.addEventListener("ping", (event) => {
console.log("pong");
});
}
</script>
</body>
</html>
既然data数据是文本,那么肯定也能传输JSON
之类的数据格式。
HashMap<String, Object> map = new HashMap<>();
map.put("name", "田所浩二");
map.put("age", 114514);
emitter.send(SseEmitter.event().data(map, MediaType.APPLICATION_JSON).id(id));
不过MediaType加不加其实都一样,传到前端的依然是纯文本,还是要手动反序列化成对象。
source.onmessage = (event) => {
const text = event.data;
const data = JSON.parse(text);
console.log(data);
};
本来想去看一下ChatGPT
返回的数据格式是啥样的,但是现在好像改版成WebSocket
发送了。/conversation
这个接口的content-type
变成application/json
,然后返回内容里包括一个wss
的url。
随便看一下通义千问
和智谱清言
的返回内容。
通义千问
传的是JSON格式,每次content
的内容都包含之前的文字,似乎是通过发送一个data: [DONE]
来表示回复结束。
data: {"canFeedback":true,"canRegenerate":true,"canShare":true,"canShow":true,"contentFrom":"text","contentType":"text","contents":[{"content":"回复的内容","contentType":"text","id":"","role":"assistant","status":"finished"}],"msgId":"","msgStatus":"finished","params":{},"parentMsgId":"","sessionId":"","sessionOpen":true,"sessionShare":true,"sessionWarnNew":false,"stopReason":"stop","traceId":""}
data: [DONE]
智谱清言
传的是JSON格式,指定了事件名称,同样也是在text
中包含之前的文字。
event:message
data: {"id": "", "conversation_id": "", "assistant_id": "", "parts": [{"id": "", "logic_id": "", "role": "assistant", "content": [{"type": "text", "text": "回复的内容", "status": "init"}], "model": "chatglm-all-tools", "recipient": "all", "created_at": "2024-03-29 09:51:06", "meta_data": {}, "status": "init"}], "created_at": "2024-03-29 09:51:06", "meta_data": {}, "status": "init", "last_error": {}}
仿写
后端
package com.example.flexdemo.demos.web;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
/**
* @author nyable
*/
@CrossOrigin
@RestController
public class TestController {
@GetMapping(path = "/test1", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter test1() {
SseEmitter emitter = new SseEmitter();
// 注册事件监听
eventListener(emitter);
// 异步返回
String text = "故人西辞黄鹤楼,烟花三月下扬州。\n" +
"孤帆远影碧空尽,唯见长江天际流。";
CompletableFuture.runAsync(() -> textGenerator(emitter, text));
return emitter;
}
/**
* 监听事件
*
* @param emitter emitter
*/
private void eventListener(SseEmitter emitter) {
emitter.onCompletion(() -> System.out.println("emitter completed"));
emitter.onError(throwable -> System.out.println("emitter error"));
emitter.onTimeout(() -> System.out.println("emitter timeout"));
}
/**
* 模拟文字一个一个输出
*
* @param emitter emitter
*/
private void textGenerator(SseEmitter emitter, String text) {
int length = text.length();
String id = UUID.randomUUID().toString();
try {
// 发送一个非默认的自定义事件 "ping"
emitter.send(SseEmitter.event().name("ping").data("ping"));
StringBuilder sb = new StringBuilder();
for (int i = 0; i < length; i++) {
StringBuilder content = sb.append(text.charAt(i));
// 虽然SseEmitter.event()是建造者模式,但是不要调用build方法,不然返回到前端的是一个Set转的JSON数组字符串,里面有很多没用的信息
HashMap<String, Object> map = new HashMap<>();
map.put("id", id);
map.put("content", content);
map.put("createTime", System.currentTimeMillis());
emitter.send(SseEmitter.event().data(map));
Thread.sleep(233);
if (i == length - 1) {
emitter.complete();
}
}
} catch (Exception e) {
emitter.completeWithError(e);
e.printStackTrace();
}
}
}
前端
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>SSE</title>
</head>
<body>
<button type="button" onclick="output()">输出</button>
<div id="sse" style="white-space: pre-line"></div>
<script>
function output() {
const source = new EventSource("http://localhost:8080/test1");
console.log("source=>", source);
source.onmessage = (event) => {
const text = event.data;
const data = JSON.parse(text);
console.log(data);
const el = document.getElementById("sse");
el.innerText = data.content;
};
source.onerror = (event) => {
console.log("onerror", source.readyState);
if (source.readyState == EventSource.CONNECTING) {
// 如果只想输出一次,就调用关闭方法关闭连接
source.close();
console.log("closed");
}
};
// 自定义事件ping的回调
source.addEventListener("ping", (event) => {
console.log("pong");
});
}
</script>
</body>
</html>
一些要注意的地方
超时时间
SseEmitter
其实是可以设置超时时间的,默认取底层配置的值(一般是30秒)。超时后会抛出一个java.lang.IllegalStateException: ResponseBodyEmitter has already completed
,并且调用onTimeout
(如果有的话)。然而很多时候我们并不知道到底要多久响应才能完成,如果按默认时间,文本太长就很容易超时。所以可以通过构造函数设置一个超时时间 new SseEmitter(0L);
,0为永不超时。
关闭连接
在我们刷新网页(断开连接)、后端主动调用complete();
方法、前端调用close()
方法后,SseEmitter
的父类ResponseBodyEmitter
中的字段complete
会被设置为true
。如果此时再调用send()
等方法,同样会抛出java.lang.IllegalStateException: ResponseBodyEmitter has already completed
异常,已经完成或关闭的事件流无法再打开复用。
缓存连接
如果使用场景是需要长时间保持连接,而并非像ChatGPT
这样一次问答后就关闭,可以用一个Map
来缓存SseEmitter
,以客户端唯一标识或者用户ID之类的标识作为key进行缓存,使用供相同连接的场景复用。然后在onError
、onCompletion
中移除掉对应的key。不过目前好像没啥特别需要的场景,大概服务器检测在线客户端需要?
复杂请求参数
HTML5中的EventSource
只能发出GET请求
且支持的参数只有url
和withCredentials
,如果想要传递简单参数倒是还好,可以直接丢到url的路径上,但是如果想要传复杂参数就比较麻烦了。因为没法设置Request Body
和Request headers
,并且url
也有长度限制。可以用Azure/fetch-event-source 这个 SSE增强库,用法和API比原生的EventSource
也更加简单。
import { fetchEventSource } from '@microsoft/fetch-event-source'
fetchEventSource('http://localhost:8080/test1', {
method: 'post',
headers: {
'Content-Type': 'application/json'
},
// 这里要JSON.stringify一下,不能直接丢JSON对象。后端正常用@RequestBody接收就行。
body: JSON.stringify({
text: '测试数据',
foo: 'bar',
id: 114514
}),
onmessage (event) {
console.log(event.data)
},
onclose (event) {
console.log('close=>', event)
},
onerror (event) {
console.log('error=>', event)
},
onopen (event) {
console.log('open=>', event)
}
})
还想到一种曲线救国的方式,就是先正常发一个POST请求到后端,这个请求的Request Body
里有复杂参数,然后生成一个唯一请求ID,与参数对象对应缓存起来最好弄个过期时间,然后返回给前端这个ID,后续用EventSource
的时候把ID丢路径上,然后后端再去取对应缓存的参数,取完清空缓存...感觉有点多此一举,不如直接用Azure/fetch-event-source ,不过可能有些场景用不了这个库又想用EventSource
的话大概需要这么绕一下。
相关链接
stackoverflow - WebSockets vs. Server-Sent events/EventSource