Springboot集成 SSE向前端推送消息
SSE介绍
sse(Server Sent Event),直译为服务器发送事件,顾名思义,也就是客户端可以获取到服务器发送的事件
我们常见的 http交互方式是客户端发起请求,服务端响应,然后一次请求完毕;但是在 sse 的场景下,客户端发起请求,连接一直保持,服务端有数据就可以返回数据给客户端,这个返回可以是多次间隔的方式。
特点分析
SSE 最大的特点,可以简单规划为两个
- 长连接
- 服务端可以向客户端推送信息
了解 websocket 的小伙伴,可能也知道它也是长连接,可以推送信息,但是它们有一个明显的区别
sse 是单通道,只能服务端向客户端发消息;而 webscoket 是双通道那么为什么有了 webscoket 还要搞出一个 sse 呢?既然存在,必然有着它的优越之处。
sse | websocket |
---|---|
http 协议 | 独立的 websocket 协议 |
轻量,使用简单 | 相对复杂 |
默认支持断线重连 | 需要自己实现断线重连 |
文本传输 | 二进制传输 |
支持自定义发送的消息类型 | - |
应用场景
从 sse 的特点出发,我们可以大致的判断出它的应用场景,需要轮询获取服务端最新数据的 case 下,多半是可以用它的比如显示当前网站在线的实时人数,法币汇率显示当前实时汇率,电商大促的实时成交额等等。
项目结构
使用到的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.25</version>
</dependency>
消息实体
package com.zytops.ssedemo.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageVo {
/**
* 客户端ID
*/
private String clientId;
/**
* 传输的内容
*/
private String data;
}
接口
package com.zytops.ssedemo.service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
public interface SseEmitterService {
/**
* 创建连接
* @param clientId
* @return
*/
SseEmitter createConnect(String clientId);
/**
* 根据客户端ID,获取SseEmitter对象
* @param clientId
* @return
*/
SseEmitter getSseEmitterByClientId(String clientId);
/**
* 给所有的客户端发送消息
* @param content
*/
void sendMessageToAllClient(String content);
/**
* 给指定的客户端发送消息
* @param clientId
* @param content
*/
void sendMessageToClient(String clientId,String content);
/**
* 关闭连接
* @param clientId
*/
void closeConnect(String clientId);
}
实现接口
package com.zytops.ssedemo.service.impl;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpStatus;
import com.zytops.ssedemo.entity.MessageVo;
import com.zytops.ssedemo.service.SseEmitterService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@Slf4j
@Service
public class SseEmitterSericeImpl implements SseEmitterService {
/**
* 容器,保存连接,用于输出返回 ;可使用其他方法实现
*/
private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
@Override
public SseEmitter createConnect(String clientId) {
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
if (StrUtil.isBlank(clientId)) {
clientId = IdUtil.simpleUUID();
}
// 长链接完成后回调接口(即关闭连接时调用)
sseEmitter.onCompletion(completionCallBack(clientId));
// 连接超时回调
sseEmitter.onTimeout(timeoutCallBack(clientId));
// 推送消息异常时调用
sseEmitter.onError(errorCallBack(clientId));
sseCache.put(clientId, sseEmitter);
log.info("创建新的sse连接,当前用户:{} 累计用户:{}", clientId, sseCache.size());
try {
sseEmitter.send(SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_CREATED)).data(clientId, MediaType.APPLICATION_JSON));
} catch (IOException e) {
log.error("创建长链接异常,客户端ID:{} 异常信息:{}", clientId, e.getMessage());
}
return sseEmitter;
}
/**
* 推送消息异常时,回调方法
*
* @param clientId
* @return
*/
private Consumer<Throwable> errorCallBack(String clientId) {
return throwable -> {
log.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);
// 推送消息失败后,每隔10s推送一次,推送5次
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000 * 5L);
SseEmitter sseEmitter = sseCache.get(clientId);
if (sseEmitter == null) {
log.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);
continue;
}
sseEmitter.send("失败后重新推送");
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
/**
* 长连接超时时调用
*
* @param clientId
* @return
*/
private Runnable timeoutCallBack(String clientId) {
return () -> {
log.info("连接超时:{}", clientId);
removeUser(clientId);
};
}
/**
* 长连接完成后回调接口(即关闭连接时调用)
*
* @param clientId
* @return
*/
private Runnable completionCallBack(String clientId) {
return () -> {
log.info("结束连接:{}", clientId);
removeUser(clientId);
};
}
private void removeUser(String clientId) {
sseCache.remove(clientId);
log.info("移除用户:{}", clientId);
}
@Override
public SseEmitter getSseEmitterByClientId(String clientId) {
return sseCache.get(clientId);
}
@Override
public void sendMessageToAllClient(String content) {
if (MapUtil.isEmpty(sseCache)) {
return;
}
// 判断发送的消息是否为空
for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) {
MessageVo messageVo = new MessageVo();
messageVo.setClientId(entry.getKey());
messageVo.setData(content);
sendMessageToClientByClientId(entry.getKey(), messageVo, entry.getValue());
}
}
private void sendMessageToClientByClientId(String clientId, MessageVo messageVo, SseEmitter sseEmitter) {
if (null == sseEmitter) {
log.error("推送消息失败:客户端{}未创建长链接,失败消息:{}",
clientId, messageVo.toString());
return;
}
SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_OK)).data(messageVo, MediaType.APPLICATION_JSON);
try {
sseEmitter.send(sendData);
} catch (IOException e) {
// 推送消息失败,记录错误日志,进行重推
log.error("推送消息失败:{},尝试进行重推", messageVo.toString());
boolean isSuccess = true;
// 推送消息失败后,每隔10s推送一次,推送5次
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000 * 5L);
sseEmitter = sseCache.get(clientId);
if (sseEmitter == null) {
log.error("{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
continue;
}
sseEmitter.send(sendData);
} catch (Exception ex) {
log.error("{}的第{}次消息重推失败", clientId, i + 1, ex);
continue;
}
log.info("{}的第{}次消息重推成功,{}", clientId, i + 1, messageVo.toString());
return;
}
}
}
@Override
public void sendMessageToClient(String clientId, String content) {
MessageVo messageVo = new MessageVo(clientId,content);
sendMessageToClientByClientId(clientId, messageVo, sseCache.get(clientId));
}
@Override
public void closeConnect(String clientId) {
SseEmitter sseEmitter = sseCache.get(clientId);
if (null != sseEmitter) {
sseEmitter.complete();
removeUser(clientId);
}
}
}
控制器类
package com.zytops.ssedemo.controller;
import com.zytops.ssedemo.entity.MessageVo;
import com.zytops.ssedemo.service.SseEmitterService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@RestController
@RequestMapping("/sse")
public class SseEmitterController {
@Resource
private SseEmitterService sseEmitterService;
@CrossOrigin
@GetMapping("/createConnect")
public SseEmitter createConnect(String clientId) {
return sseEmitterService.createConnect(clientId);
}
@CrossOrigin
@PostMapping("/broadcast")
public void sendMessageToAllClient(@RequestBody(required = false) String msg) {
sseEmitterService.sendMessageToAllClient(msg);
}
@CrossOrigin
@PostMapping("/sendMessage")
public void sendMessageToOneClient(@RequestBody(required = false) MessageVo messageVo) {
if (messageVo.getClientId().isEmpty()) {
return;
}
sseEmitterService.sendMessageToClient(messageVo.getClientId(), messageVo.getData());
}
@CrossOrigin
@GetMapping("/closeConnect")
public void closeConnect(@RequestParam(required = true) String clientId) {
sseEmitterService.closeConnect(clientId);
}
}
前端代码
const source = new EventSource('http://127.0.0.1:8081/sse/createConnect?clientId=ddz');
source.onopen = function () {
console.log('开始执行同步任务')
}
source.onmessage = function (e) {
try {
const data = JSON.parse(e.data)
console.log(data)
}catch (e) {
console.log(e.data)
}
}