写在前面

我们知道,早期网站只展示静态内容,但是现在我们更希望实时更新、即时聊天、通知推送和动态仪表盘等功能,因此就有必要学习目前常用的3种实时技术了。

常用的有SSE、WebSocket和Long Polling这三种,下面将分别进行介绍:

(1)SSE(Server-Send Events):轻量级单向数据流;

(2)WebSocket:全双工双向通信;

(3)Long Polling(长轮询):传统过渡方案。

假设现在我们有如下三个业务场景,它们都需要实现数据实时更新:

  • 股票交易仪表盘;
  • 即时聊天平台;
  • 实时新闻推送。

面对这些需求,我们该如何选择合适的方案呢?接下来我们将从实战、架构、性能和扩展性角度来进行分析。

Long Polling(长轮询)

原理

长轮询的原理就是客户端持续地询问服务器,有点类似于吃饭排队时,站在店门口,每隔几分钟询问是否轮到你吃饭了么,可见效率非常低下。

优点

(1)实现简单,标准的REST;(2)兼容性最好。

缺点

(1)高延迟;(2)浪费大量资源,存在大量的空请求;(3)扩展性差。

适用场景

当系统无法使用WebSocket或者SSE,且需要支持对老旧浏览器或者代理时使用,一般在大型企业的遗留系统中使用。

实战

后端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@CrossOrigin
@RestController
@RequestMapping("/api/longpoll")
public class LongPollingController {
//使用队列存储待推送的消息
private static final BlockingQueue<String> messageQueue =
new LinkedBlockingQueue<>();

/**
* 长轮询端点
* @param timeout 超时时间(毫秒)
* @return 消息内容或者超时时间
*/
@GetMapping("/poll")
public ResponseEntity<String> longPoll(
@RequestParam(defaultValue = "30000")long timeout
){
try{
//等待消息,最多等待指定的超时时间
String message = messageQueue.poll(timeout, TimeUnit.MILLISECONDS);
if(!StringUtils.isEmpty(message)){
return ResponseEntity.ok(message);
}else{
//超时返回空响应
return ResponseEntity.ok("timeout");
}
}catch (InterruptedException e){
Thread.currentThread().interrupt();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("服务器内部发生错误");
}
}


/**
* 发送消息
* @param message 待发送的消息
* @return 发送结果
*/
@PostMapping("/send")
public ResponseEntity<String> sendMessage(@RequestBody String message){
try {
messageQueue.offer(message);
return ResponseEntity.ok("消息发送成功");
}catch (Exception e){
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("消息发送失败");
}
}
}

前端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>长轮询(LongPolling)消息推送</title>
</head>
<body>
<h1>长轮询(LongPolling)消息推送</h1>
<div id="messages">

</div>
<script>
class LongPollingClient{
constructor() {
this.isPolling = false;
}

/**
* 开始长轮询
*/
startPolling(){
if(this.isPolling){
return;
}
this.isPolling = true;
this.poll();
}

/**
* 停止长轮询
*/
stopPolling(){
this.isPolling = false;
}

/**
* 执行轮询请求
*/
async poll(){
while (this.isPolling){
try{
const response = await fetch('http://127.0.0.1:8080/api/longpoll/poll?timeout=30000');
const message = await response.text();
if(message && message !== 'timeout'){
console.log("收到消息:",message);
this.handleMessage(message);
}
}catch (error){
console.error("轮询错误:",error);
//发生错误时,等待一段时间后重试
await this.sleep(5000);
}
}
}

/**
* 处理接收到的消息
*/
handleMessage(message){
//将消息显示在页面上
document.getElementById('messages').innerHTML += '<div>' + message +'</div>';
}

/**
* 延迟函数
*/
sleep(ms){
return new Promise(resolve => setTimeout(resolve,ms));
}
}

//客户端调用
const client = new LongPollingClient();
client.startPolling();
</script>
</body>
</html>

运行结果

先运行后端项目,再打开前端页面,之后打开Postman,我们发起一个请求:

可以看到已经显示消息发送成功,接着我们查看一下前端页面,可以看到页面已经出现了对应信息:

SSE(Server-Send Events)

原理

SSE的原理如下,即当客户端建立连接后,“持续监听中…..”,之后服务器随时推送消息,不断的消息发送给前端,双方一直保持连接。请注意,SSE仅支持服务器到客户端的单向通信,比较适合实时数据流。

优点

(1)轻量(基于HTTP /1.1);(2)兼容大多数代理;(3)自动重连机制。

缺点

(1)单向通信;(2)部分环境支持有限;(3)控制粒度较粗。

适用场景

当需要简单高效的服务器到客户端更新,如股票行情、实时比分、状态仪表盘、监控系统等。

实战

后端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@CrossOrigin
@RestController
@RequestMapping("/api/sse")
public class SSEController {
/**
* 创建SSE连接端点
* @return SseEmitter对象,用于发送服务器推送事件
*/
@GetMapping("/connect")
public SseEmitter connect(){
// 创建SseEmitter对象,设置超时时间为30分钟
SseEmitter emitter = new SseEmitter(30 * 60 * 1000L);
try{
//发送连接成功消息
emitter.send(SseEmitter.event()
.name("connect")
.data("SSE连接建立成功"));
}catch (IOException e){
emitter.completeWithError(e);
}
//模拟定时推送消息
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(()->{
try{
//每5秒推送一下当前时间
emitter.send(SseEmitter.event()
.name("message")
.data("当前数字为:" + LocalDateTime.now()));
}catch (IOException e){
emitter.complete();
executor.shutdown();
}
},0,5, TimeUnit.SECONDS);
return emitter;
}
}

前端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SSE消息推送</title>
</head>
<body>
<h1>SSE消息推送</h1>
<div id="messages">

</div>
<script>
// 创建EventSource对象连接SSE端点
const eventSource = new EventSource("http://127.0.0.1:8080/api/sse/connect");
//监听连接事件
eventSource.addEventListener('connect',function (event){
console.log('连接状态:',event.data);
});
//监听消息事件
eventSource.addEventListener('message',function (event){
console.log('收到消息:',event.data);
//将消息显示在页面上
document.getElementById('messages').innerHTML += '<div>' + event.data +'</div>';
});
//监听错误事件
eventSource.onerror = function (event) {
console.error('SSE连接错误:',event);
};
</script>
</body>
</html>

运行结果

先运行后端项目,再打开前端页面,可以看到页面每隔5秒钟显示当前时间:

WebSocket

原理

WebSocket的原理就是建立双向通道,实现实时对话,类似于对讲机的全双工通信模式。

优点

(1)双向通信;(2)低延迟;(3)可通过消息中间件扩展。

缺点

(1)代理兼容性差;(2)扩展复杂度高;(3)需要维持长连接。

适用场景

适用于聊天室、游戏、协作应用等需要实现双向交互的场景。

实战

后端代码

其中的MyWebSocketHandler类代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Component
public class MyWebSocketHandler extends TextWebSocketHandler {
//存储所有活跃的WebSocket会话
private static final Set<WebSocketSession> sessions = Collections.synchronizedSet(new HashSet<>());

/**
* 建立连接后调用
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sessions.add(session);
System.out.println("WebSocket连接建立:" + session.getId());
//向新连接的客户端发送欢迎消息
session.sendMessage(new TextMessage("欢迎使用WebSocket!"));
}

/**
* 收到客户端发来的消息时调用
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String payload = message.getPayload();
System.out.println("收到消息,消息内容为:" + payload);

//广播消息给所有连接的客户端
broadcastMessage("服务器回复的消息为:" + payload);
}

/**
* 连接关闭后调用
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
sessions.remove(session);
System.out.println("WebSocket连接关闭:" + session.getId());
}

/**
* 广播消息给所有连接的客户端
*/
private static void broadcastMessage(String message){
synchronized (sessions){
sessions.removeIf(session -> {
try {
if(session.isOpen()){
session.sendMessage(new TextMessage(message));
//保留该会话
return false;
}
}catch (Exception e){
System.err.println("消息发送失败:" + e.getMessage());
}
//移除无效会话
return true;
});
}
}
}

再来一个MyWebSocketHandler类,其中的代码如下所示:

1
2
3
4
5
6
7
8
9
10
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
//注册websocket处理器,并允许跨域访问
registry.addHandler(new MyWebSocketHandler(),"/websocket")
.setAllowedOrigins("*");
}
}

前端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket消息推送</title>
</head>
<body>
<h1>WebSocket消息推送</h1>
<input type="text" id="messageInput" placeholder="请输入需要发送的消息...">
<button type="submit" onclick="messageSend()" >提交</button>
<div id="messages">

</div>
<script>
// 创建WebSocket连接
const socket = new WebSocket("ws://127.0.0.1:8080/websocket");
//连接打开时的处理逻辑
socket.onopen = function (event) {
console.log("WebSocket连接已经建立");
//发送消息给服务端
socket.send("测试一下WebSocket!")
};
//客户端接收到消息时的处理逻辑
socket.onmessage = function (event) {
console.log("收到消息:" + event.data);
//将消息显示在页面上
document.getElementById('messages').innerHTML += '<div>' + event.data +'</div>';
}
//连接关闭时调用
socket.onclose = function (event) {
console.log("WebSocket连接已关闭");
}
//错误处理逻辑
socket.onerror = function (error) {
console.error("WebSocket错误:" + error);
}
//界面发送消息的逻辑
function messageSend() {
const input = document.getElementById('messageInput');
if(socket.readyState === WebSocket.OPEN){
socket.send(input.value);
input.value = '';
}
}
</script>
</body>
</html>

运行结果

先运行后端项目,再打开前端页面,可以看到初始界面如下所示:

之后我们尝试在输入框内输入“测试一下WebSocket”:

然后点击提交按钮,可以看到界面信息如下:

小结

通过前面的分析,我们就可以对上述三个场景需要的方案进行选项了:

  • 股票交易仪表盘:SSE
  • 即时聊天平台:WebSocket
  • 实时新闻推送(历史遗留系统):Long Polling

当然技术选型需要因地制宜,结合实际情况和场景来选择合适的方案。