服务器推送(SSE)
Server-Sent Events (SSE) 允许服务器向客户端推送实时更新,适用于单向数据流场景。
SSE特点
单向通信
仅服务器向客户端推送数据
自动重连
连接断开后自动尝试重连
简单易用
基于HTTP,实现相对简单
文本数据
传输纯文本数据,通常为JSON
SSE数据格式
Server-Sent Events遵循特定的数据格式规范,服务器发送的每个事件都包含特定的字段:
| 字段 | 说明 | 示例 |
|---|---|---|
| data | 事件的实际数据 | data: Hello world\n\n |
| event | 事件类型名称 | event: message\ndata: Hello\n\n |
| id | 事件ID,用于重连时恢复 | id: 123\ndata: Hello\n\n |
| retry | 重连间隔时间(毫秒) | retry: 5000\ndata: Hello\n\n |
协议对比
| 协议 | 方向 | 实时性 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| SSE | 单向(服务器→客户端) | 高 | 低 | 实时通知、股票价格、日志流 |
| WebSocket | 双向 | 极高 | 高 | 聊天应用、在线游戏、协作工具 |
| Ajax轮询 | 双向 | 低 | 低 | 简单数据更新、兼容性要求高 |
| 长轮询 | 双向 | 中等 | 中等 | 实时应用、但需要兼容低版本浏览器 |
实现示例
// 客户端实现
if (window.EventSource) {
const eventSource = new EventSource('/api/events');
eventSource.onmessage = event => {
const data = JSON.parse(event.data);
updateUI(data);
};
eventSource.addEventListener('notification', event => {
const data = JSON.parse(event.data);
showNotification(data.message);
});
eventSource.addEventListener('error', event => {
console.error('SSE连接错误:', event);
});
} else {
console.log('浏览器不支持Server-Sent Events');
}
// 服务器端示例 (Node.js/Express)
app.get('/api/events', (req, res) => {
// 设置SSE响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
// 发送数据
const sendEvent = data => {
res.write(`data: ${JSON.stringify(data)}\n\n`);
};
// 模拟定期发送数据
const interval = setInterval(() => {
sendEvent({timestamp: Date.now(), message: '实时数据'});
}, 1000);
// 连接关闭时清理
req.on('close', () => {
clearInterval(interval);
});
});
// 客户端高级用法示例
const eventSource = new EventSource('/api/events', {
// 携带认证信息
withCredentials: true
});
// 设置最后事件ID,用于断线重连时恢复
eventSource.withCredentials = true;
eventSource.lastEventId = 'last-known-event-id';
// 处理不同事件类型
eventSource.addEventListener('userUpdate', event => {
const user = JSON.parse(event.data);
updateUserInterface(user);
});
eventSource.addEventListener('systemNotification', event => {
const notification = JSON.parse(event.data);
showSystemNotification(notification);
});
// 自定义重连逻辑
eventSource.addEventListener('error', event => {
if (eventSource.readyState === EventSource.CLOSED) {
setTimeout(() => {
// 重新连接
eventSource.close();
const newEventSource = new EventSource('/api/events');
}, 5000);
}
});
实用示例
以下是一些在实际项目中常见的SSE应用场景和实现方式:
1. 实时股票价格更新
// 前端代码
const stockEvents = new EventSource('/api/stocks/AAPL');
stockEvents.addEventListener('priceUpdate', event => {
const data = JSON.parse(event.data);
document.getElementById('current-price').textContent = data.price;
document.getElementById('change-percent').textContent = data.changePercent;
});
// 服务器端代码 (Node.js/Express)
app.get('/api/stocks/:symbol', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const symbol = req.params.symbol;
let lastPrice = Math.random() * 100 + 100; // 初始价格
const sendPriceUpdate = () => {
lastPrice += (Math.random() - 0.5) * 2; // 随机价格变动
const change = ((lastPrice - (lastPrice / (1 + (Math.random() - 0.5) * 0.01))) / (lastPrice / (1 + (Math.random() - 0.5) * 0.01))) * 100;
res.write(`event: priceUpdate\ndata: ${JSON.stringify({symbol, price: lastPrice.toFixed(2), changePercent: change.toFixed(2)})}\n\n`);
};
const interval = setInterval(sendPriceUpdate, 2000); // 每2秒更新一次
req.on('close', () => clearInterval(interval));
});
2. 实时博客更新
// 前端代码
const blogEvents = new EventSource('/api/blog/updates');
blogEvents.addEventListener('newPost', event => {
const post = JSON.parse(event.data);
const postElement = document.createElement('div');
postElement.classList.add('blog-post');
postElement.innerHTML = `
${post.title}
${post.excerpt}
发布于: ${new Date(post.publishTime).toLocaleString()}
`;
document.getElementById('blog-posts').prepend(postElement);
});
blogEvents.addEventListener('postUpdate', event => {
const update = JSON.parse(event.data);
const existingPost = document.getElementById(`post-${update.id}`);
if (existingPost) {
existingPost.querySelector('.post-content').textContent = update.content;
}
});
// 服务器端代码
app.get('/api/blog/updates', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 将连接添加到活动连接列表
const clientId = Date.now();
const newConnection = {
id: clientId,
response: res
};
activeConnections.push(newConnection);
// 连接关闭时清理
req.on('close', () => {
activeConnections = activeConnections.filter(conn => conn.id !== clientId);
res.end();
});
});
// 模拟新博客发布
function publishNewPost(post) {
const data = `event: newPost\ndata: ${JSON.stringify(post)}\n\n`;
activeConnections.forEach(conn => conn.response.write(data));
}
3. 文件上传进度跟踪
// 前端代码
function uploadFile(file) {
const formData = new FormData();
formData.append('file', file);
// 创建上传进度事件监听
const progressSource = new EventSource(`/api/upload-progress/${file.lastModified}`);
progressSource.addEventListener('progress', event => {
const progress = JSON.parse(event.data);
const progressBar = document.getElementById('progress-bar');
progressBar.style.width = `${progress.percent}%`;
progressBar.textContent = `${progress.percent}%`;
});
progressSource.addEventListener('complete', event => {
const result = JSON.parse(event.data);
alert(`文件上传成功: ${result.filename}`);
progressSource.close();
});
// 发起上传请求
fetch('/api/upload', {
method: 'POST',
body: formData
});
}
// 服务器端代码
const uploadProgress = {}; // 存储上传进度
app.post('/api/upload', upload.single('file'), (req, res) => {
// 模拟文件上传过程
const fileId = req.file.filename;
uploadProgress[fileId] = {percent: 0, status: 'uploading'};
// 模拟上传进度
let progress = 0;
const interval = setInterval(() => {
progress += Math.floor(Math.random() * 10) + 5; // 随机增加5-15%
if (progress >= 100) {
progress = 100;
clearInterval(interval);
// 更新进度记录
uploadProgress[fileId] = {percent: progress, status: 'complete', filename: req.file.filename};
} else {
uploadProgress[fileId] = {percent: progress, status: 'uploading'};
}
// 向所有监听此文件的客户端发送更新
if (activeFileProgressListeners[fileId]) {
activeFileProgressListeners[fileId].forEach(client => {
client.write(`event: progress\ndata: ${JSON.stringify({percent: progress})}\n\n`);
});
}
}, 500);
res.json({message: '上传开始', fileId});
});
app.get('/api/upload-progress/:fileId', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const fileId = req.params.fileId;
// 初始化文件进度监听器列表
if (!activeFileProgressListeners[fileId]) {
activeFileProgressListeners[fileId] = [];
}
activeFileProgressListeners[fileId].push(res);
// 定期发送当前进度
const sendProgress = () => {
if (uploadProgress[fileId]) {
res.write(`event: progress\ndata: ${JSON.stringify(uploadProgress[fileId])}\n\n`);
if (uploadProgress[fileId].status === 'complete') {
res.write(`event: complete\ndata: ${JSON.stringify(uploadProgress[fileId])}\n\n`);
clearInterval(progressInterval);
}
}
};
const progressInterval = setInterval(sendProgress, 1000);
req.on('close', () => {
clearInterval(progressInterval);
if (activeFileProgressListeners[fileId]) {
activeFileProgressListeners[fileId] =
activeFileProgressListeners[fileId].filter(client => client !== res);
}
});
});
完整示例:实时聊天室通知
以下是一个完整的实时聊天室系统示例,展示如何使用SSE实现聊天室中的实时通知。
前端实现 (chat.html)
<!DOCTYPE html>
<!-- HTML 结构 -->
<html>
<head>
<title>SSE 聊天室通知示例</title>
<style>
#notifications {
border: 1px solid #ccc;
height: 300px;
overflow-y: scroll;
padding: 10px;
margin: 10px 0;
}
.notification {
padding: 5px;
margin: 5px 0;
border-left: 3px solid #007bff;
background: #f8f9fa;
}
</style>
</head>
<body>
<h2>实时聊天室通知</h2>
<div id="notifications"></div>
<button id="startNotifications">开始接收通知</button>
<button id="stopNotifications">停止接收通知</button>
<script>
let eventSource = null;
document.getElementById('startNotifications').addEventListener('click', () => {
if (!eventSource) {
eventSource = new EventSource('/api/chat-updates');
eventSource.onmessage = event => {
const data = JSON.parse(event.data);
addNotification(data);
};
eventSource.addEventListener('new-message', event => {
const message = JSON.parse(event.data);
addNotification(`新消息: ${message.user} - ${message.text}`);
});
eventSource.addEventListener('user-joined', event => {
const user = JSON.parse(event.data);
addNotification(`用户 ${user.name} 加入了聊天室`);
});
eventSource.addEventListener('user-left', event => {
const user = JSON.parse(event.data);
addNotification(`用户 ${user.name} 离开了聊天室`);
});
eventSource.onerror = event => {
console.error('SSE 连接错误:', event);
};
}
});
document.getElementById('stopNotifications').addEventListener('click', () => {
if (eventSource) {
eventSource.close();
eventSource = null;
console.log('SSE 连接已关闭');
}
});
function addNotification(message) {
const notificationsDiv = document.getElementById('notifications');
const notificationDiv = document.createElement('div');
notificationDiv.className = 'notification';
notificationDiv.textContent = `[${new Date().toLocaleTimeString()}] ` + message;
notificationsDiv.appendChild(notificationDiv);
notificationsDiv.scrollTop = notificationsDiv.scrollHeight; // 滚动到底部
}
</script>
</body>
</html>
后端实现 (server.js - Node.js/Express)
const express = require('express');
const cors = require('cors');
const app = express();
app.use(cors());
app.use(express.static('public')); // 提供静态文件服务
// 存储活跃的 SSE 连接
const connections = new Set();
app.get('/api/chat-updates', (req, res) => {
// 设置 SSE 必需的响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
// 将当前连接添加到活跃连接集合
connections.add(res);
// 发送初始连接确认
res.write('data: {"status": "connected", "message": "SSE 连接已建立"}\n\n');
// 连接关闭时清理
req.on('close', () => {
connections.delete(res);
console.log('客户端断开连接');
});
});
// 模拟聊天室活动
const users = ['Alice', 'Bob', 'Charlie', 'Diana'];
// 模拟用户加入聊天室
setInterval(() => {
if (connections.size > 0) {
const randomUser = users[Math.floor(Math.random() * users.length)];
const event = `event: user-joined\ndata: ${JSON.stringify({name: randomUser})}\n\n`;
connections.forEach(conn => conn.write(event));
}
}, 10000); // 每10秒随机一个用户加入
// 模拟用户发送消息
setInterval(() => {
if (connections.size > 0) {
const randomUser = users[Math.floor(Math.random() * users.length)];
const messages = [
'你好大家!',
'今天天气真好',
'有人在吗?',
'这个 SSE 示例很棒!',
'谢谢分享'
];
const randomMessage = messages[Math.floor(Math.random() * messages.length)];
const event = `event: new-message\ndata: ${JSON.stringify({user: randomUser, text: randomMessage})}\n\n`;
connections.forEach(conn => conn.write(event));
}
}, 5000); // 每5秒随机发送一条消息
// 模拟用户离开聊天室
setInterval(() => {
if (connections.size > 0 && users.length > 1) {
const randomUser = users[Math.floor(Math.random() * users.length)];
const event = `event: user-left\ndata: ${JSON.stringify({name: randomUser})}\n\n`;
connections.forEach(conn => conn.write(event));
}
}, 15000); // 每15秒随机一个用户离开
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`服务器运行在端口 ${PORT}`);
});
运行说明
- 首先安装依赖:
npm install express cors - 将前端代码保存为 chat.html,后端代码保存为 server.js
- 运行后端:
node server.js - 在浏览器中访问 http://localhost:3000/chat.html
- 点击"开始接收通知"按钮,即可看到实时的聊天室通知
更多使用场景
实时通知
推送系统通知、消息提醒等
数据流更新
股票价格、实时统计、日志输出等
进度追踪
文件上传进度、任务执行状态等
监控数据
系统性能监控、服务器状态等
体育比分实时更新
在体育赛事直播页面中使用SSE实时更新比分和比赛状态。
// 前端代码
const matchEvents = new EventSource('/api/match/123/live-updates');
matchEvents.addEventListener('scoreUpdate', event => {
const data = JSON.parse(event.data);
document.getElementById('home-score').textContent = data.homeScore;
document.getElementById('away-score').textContent = data.awayScore;
});
matchEvents.addEventListener('matchStatus', event => {
const data = JSON.parse(event.data);
document.getElementById('match-status').textContent = data.status;
document.getElementById('match-time').textContent = data.time;
});
matchEvents.addEventListener('matchEvent', event => {
const data = JSON.parse(event.data);
const eventList = document.getElementById('match-events');
const eventItem = document.createElement('div');
eventItem.innerHTML = `<strong>${data.time}'</strong> ${data.type}: ${data.player}`;
eventList.prepend(eventItem);
});
系统监控面板
实时显示服务器性能指标,如CPU使用率、内存占用、网络流量等。
// 前端代码
const monitorEvents = new EventSource('/api/system/monitoring');
monitorEvents.addEventListener('cpuUpdate', event => {
const data = JSON.parse(event.data);
updateGauge('cpu-gauge', data.usage);
});
monitorEvents.addEventListener('memoryUpdate', event => {
const data = JSON.parse(event.data);
updateGauge('memory-gauge', data.percentUsed);
document.getElementById('memory-text').textContent =
`${data.used} / ${data.total} GB`;
});
// 服务器端模拟代码
app.get('/api/system/monitoring', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const sendMetrics = () => {
// 模拟CPU使用率
const cpuUsage = Math.floor(Math.random() * 100);
res.write(`event: cpuUpdate\ndata: ${JSON.stringify({usage: cpuUsage})}\n\n`);
// 模拟内存使用情况
const totalMemory = 16;
const usedMemory = (Math.random() * 0.8 + 0.1) * totalMemory;
const percentUsed = Math.round((usedMemory / totalMemory) * 100);
res.write(`event: memoryUpdate\ndata: ${JSON.stringify({used: usedMemory.toFixed(2), total: totalMemory, percentUsed})}\n\n`);
};
// 每2秒发送一次指标
const interval = setInterval(sendMetrics, 2000);
req.on('close', () => clearInterval(interval));
});
社交媒体实时更新
在社交媒体应用中实时显示新帖子、点赞和评论通知。
// 前端代码
const socialEvents = new EventSource('/api/social/updates');
socialEvents.addEventListener('newPost', event => {
const post = JSON.parse(event.data);
const postElement = document.createElement('div');
postElement.className = 'social-post';
postElement.innerHTML = `
<h4>${post.author}</h4>
<p>${post.content}</p>
<small>${new Date(post.timestamp).toLocaleString()}</small>
`;
document.getElementById('posts-container').prepend(postElement);
});
socialEvents.addEventListener('newLike', event => {
const like = JSON.parse(event.data);
document.getElementById(`post-${like.postId}-likes`).textContent =
`点赞数: ${like.count}`;
});
socialEvents.addEventListener('newComment', event => {
const comment = JSON.parse(event.data);
const commentElement = document.createElement('div');
commentElement.className = 'comment';
commentElement.innerHTML = `
<strong>${comment.author}:</strong>
<span>${comment.text}</span>
`;
document.getElementById(`post-${comment.postId}-comments`).appendChild(commentElement);
});
安全考虑
认证与授权
使用JWT令牌或会话验证确保只有授权用户能接收事件
跨站请求伪造(CSRF)
验证请求来源,使用CSRF令牌防止恶意请求
数据过滤
服务器端过滤敏感信息,避免向客户端泄露私有数据
连接限制
限制单个客户端的连接数,防止资源耗尽攻击
最佳实践
- 设置合理的重连时间间隔,避免连接过于频繁
- 使用事件ID实现断线重连时的数据恢复
- 服务器端设置合适的超时时间,避免长时间占用连接
- 客户端处理连接错误和异常情况
- 注意浏览器兼容性,必要时提供降级方案
- 使用HTTPS加密传输敏感数据
- 实现适当的错误处理和日志记录机制