服务器推送(SSE)

Server-Sent Events (SSE) 允许服务器向客户端推送实时更新,适用于单向数据流场景。

SSE特点

单向通信

仅服务器向客户端推送数据

自动重连

连接断开后自动尝试重连

简单易用

基于HTTP,实现相对简单

文本数据

传输纯文本数据,通常为JSON

SSE数据格式

Server-Sent Events遵循特定的数据格式规范,服务器发送的每个事件都包含特定的字段:

字段 说明 示例
data 事件的实际数据 data: Hello world\n\n
event 事件类型名称 event: message\ndata: Hello\n\n
id 事件ID,用于重连时恢复 id: 123\ndata: Hello\n\n
retry 重连间隔时间(毫秒) retry: 5000\ndata: Hello\n\n

协议对比

协议 方向 实时性 复杂度 适用场景
SSE 单向(服务器→客户端) 实时通知、股票价格、日志流
WebSocket 双向 极高 聊天应用、在线游戏、协作工具
Ajax轮询 双向 简单数据更新、兼容性要求高
长轮询 双向 中等 中等 实时应用、但需要兼容低版本浏览器

实现示例

// 客户端实现 if (window.EventSource) { const eventSource = new EventSource('/api/events'); eventSource.onmessage = event => { const data = JSON.parse(event.data); updateUI(data); }; eventSource.addEventListener('notification', event => { const data = JSON.parse(event.data); showNotification(data.message); }); eventSource.addEventListener('error', event => { console.error('SSE连接错误:', event); }); } else { console.log('浏览器不支持Server-Sent Events'); } // 服务器端示例 (Node.js/Express) app.get('/api/events', (req, res) => { // 设置SSE响应头 res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); res.setHeader('Access-Control-Allow-Origin', '*'); // 发送数据 const sendEvent = data => { res.write(`data: ${JSON.stringify(data)}\n\n`); }; // 模拟定期发送数据 const interval = setInterval(() => { sendEvent({timestamp: Date.now(), message: '实时数据'}); }, 1000); // 连接关闭时清理 req.on('close', () => { clearInterval(interval); }); }); // 客户端高级用法示例 const eventSource = new EventSource('/api/events', { // 携带认证信息 withCredentials: true }); // 设置最后事件ID,用于断线重连时恢复 eventSource.withCredentials = true; eventSource.lastEventId = 'last-known-event-id'; // 处理不同事件类型 eventSource.addEventListener('userUpdate', event => { const user = JSON.parse(event.data); updateUserInterface(user); }); eventSource.addEventListener('systemNotification', event => { const notification = JSON.parse(event.data); showSystemNotification(notification); }); // 自定义重连逻辑 eventSource.addEventListener('error', event => { if (eventSource.readyState === EventSource.CLOSED) { setTimeout(() => { // 重新连接 eventSource.close(); const newEventSource = new EventSource('/api/events'); }, 5000); } });

实用示例

以下是一些在实际项目中常见的SSE应用场景和实现方式:

1. 实时股票价格更新

// 前端代码 const stockEvents = new EventSource('/api/stocks/AAPL'); stockEvents.addEventListener('priceUpdate', event => { const data = JSON.parse(event.data); document.getElementById('current-price').textContent = data.price; document.getElementById('change-percent').textContent = data.changePercent; }); // 服务器端代码 (Node.js/Express) app.get('/api/stocks/:symbol', (req, res) => { res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); const symbol = req.params.symbol; let lastPrice = Math.random() * 100 + 100; // 初始价格 const sendPriceUpdate = () => { lastPrice += (Math.random() - 0.5) * 2; // 随机价格变动 const change = ((lastPrice - (lastPrice / (1 + (Math.random() - 0.5) * 0.01))) / (lastPrice / (1 + (Math.random() - 0.5) * 0.01))) * 100; res.write(`event: priceUpdate\ndata: ${JSON.stringify({symbol, price: lastPrice.toFixed(2), changePercent: change.toFixed(2)})}\n\n`); }; const interval = setInterval(sendPriceUpdate, 2000); // 每2秒更新一次 req.on('close', () => clearInterval(interval)); });

2. 实时博客更新

// 前端代码 const blogEvents = new EventSource('/api/blog/updates'); blogEvents.addEventListener('newPost', event => { const post = JSON.parse(event.data); const postElement = document.createElement('div'); postElement.classList.add('blog-post'); postElement.innerHTML = `

${post.title}

${post.excerpt}

发布于: ${new Date(post.publishTime).toLocaleString()} `; document.getElementById('blog-posts').prepend(postElement); }); blogEvents.addEventListener('postUpdate', event => { const update = JSON.parse(event.data); const existingPost = document.getElementById(`post-${update.id}`); if (existingPost) { existingPost.querySelector('.post-content').textContent = update.content; } }); // 服务器端代码 app.get('/api/blog/updates', (req, res) => { res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); // 将连接添加到活动连接列表 const clientId = Date.now(); const newConnection = { id: clientId, response: res }; activeConnections.push(newConnection); // 连接关闭时清理 req.on('close', () => { activeConnections = activeConnections.filter(conn => conn.id !== clientId); res.end(); }); }); // 模拟新博客发布 function publishNewPost(post) { const data = `event: newPost\ndata: ${JSON.stringify(post)}\n\n`; activeConnections.forEach(conn => conn.response.write(data)); }

3. 文件上传进度跟踪

// 前端代码 function uploadFile(file) { const formData = new FormData(); formData.append('file', file); // 创建上传进度事件监听 const progressSource = new EventSource(`/api/upload-progress/${file.lastModified}`); progressSource.addEventListener('progress', event => { const progress = JSON.parse(event.data); const progressBar = document.getElementById('progress-bar'); progressBar.style.width = `${progress.percent}%`; progressBar.textContent = `${progress.percent}%`; }); progressSource.addEventListener('complete', event => { const result = JSON.parse(event.data); alert(`文件上传成功: ${result.filename}`); progressSource.close(); }); // 发起上传请求 fetch('/api/upload', { method: 'POST', body: formData }); } // 服务器端代码 const uploadProgress = {}; // 存储上传进度 app.post('/api/upload', upload.single('file'), (req, res) => { // 模拟文件上传过程 const fileId = req.file.filename; uploadProgress[fileId] = {percent: 0, status: 'uploading'}; // 模拟上传进度 let progress = 0; const interval = setInterval(() => { progress += Math.floor(Math.random() * 10) + 5; // 随机增加5-15% if (progress >= 100) { progress = 100; clearInterval(interval); // 更新进度记录 uploadProgress[fileId] = {percent: progress, status: 'complete', filename: req.file.filename}; } else { uploadProgress[fileId] = {percent: progress, status: 'uploading'}; } // 向所有监听此文件的客户端发送更新 if (activeFileProgressListeners[fileId]) { activeFileProgressListeners[fileId].forEach(client => { client.write(`event: progress\ndata: ${JSON.stringify({percent: progress})}\n\n`); }); } }, 500); res.json({message: '上传开始', fileId}); }); app.get('/api/upload-progress/:fileId', (req, res) => { res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); const fileId = req.params.fileId; // 初始化文件进度监听器列表 if (!activeFileProgressListeners[fileId]) { activeFileProgressListeners[fileId] = []; } activeFileProgressListeners[fileId].push(res); // 定期发送当前进度 const sendProgress = () => { if (uploadProgress[fileId]) { res.write(`event: progress\ndata: ${JSON.stringify(uploadProgress[fileId])}\n\n`); if (uploadProgress[fileId].status === 'complete') { res.write(`event: complete\ndata: ${JSON.stringify(uploadProgress[fileId])}\n\n`); clearInterval(progressInterval); } } }; const progressInterval = setInterval(sendProgress, 1000); req.on('close', () => { clearInterval(progressInterval); if (activeFileProgressListeners[fileId]) { activeFileProgressListeners[fileId] = activeFileProgressListeners[fileId].filter(client => client !== res); } }); });

完整示例:实时聊天室通知

以下是一个完整的实时聊天室系统示例,展示如何使用SSE实现聊天室中的实时通知。

前端实现 (chat.html)

<!DOCTYPE html> <!-- HTML 结构 --> <html> <head> <title>SSE 聊天室通知示例</title> <style> #notifications { border: 1px solid #ccc; height: 300px; overflow-y: scroll; padding: 10px; margin: 10px 0; } .notification { padding: 5px; margin: 5px 0; border-left: 3px solid #007bff; background: #f8f9fa; } </style> </head> <body> <h2>实时聊天室通知</h2> <div id="notifications"></div> <button id="startNotifications">开始接收通知</button> <button id="stopNotifications">停止接收通知</button> <script> let eventSource = null; document.getElementById('startNotifications').addEventListener('click', () => { if (!eventSource) { eventSource = new EventSource('/api/chat-updates'); eventSource.onmessage = event => { const data = JSON.parse(event.data); addNotification(data); }; eventSource.addEventListener('new-message', event => { const message = JSON.parse(event.data); addNotification(`新消息: ${message.user} - ${message.text}`); }); eventSource.addEventListener('user-joined', event => { const user = JSON.parse(event.data); addNotification(`用户 ${user.name} 加入了聊天室`); }); eventSource.addEventListener('user-left', event => { const user = JSON.parse(event.data); addNotification(`用户 ${user.name} 离开了聊天室`); }); eventSource.onerror = event => { console.error('SSE 连接错误:', event); }; } }); document.getElementById('stopNotifications').addEventListener('click', () => { if (eventSource) { eventSource.close(); eventSource = null; console.log('SSE 连接已关闭'); } }); function addNotification(message) { const notificationsDiv = document.getElementById('notifications'); const notificationDiv = document.createElement('div'); notificationDiv.className = 'notification'; notificationDiv.textContent = `[${new Date().toLocaleTimeString()}] ` + message; notificationsDiv.appendChild(notificationDiv); notificationsDiv.scrollTop = notificationsDiv.scrollHeight; // 滚动到底部 } </script> </body> </html>

后端实现 (server.js - Node.js/Express)

const express = require('express'); const cors = require('cors'); const app = express(); app.use(cors()); app.use(express.static('public')); // 提供静态文件服务 // 存储活跃的 SSE 连接 const connections = new Set(); app.get('/api/chat-updates', (req, res) => { // 设置 SSE 必需的响应头 res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); res.setHeader('Access-Control-Allow-Origin', '*'); // 将当前连接添加到活跃连接集合 connections.add(res); // 发送初始连接确认 res.write('data: {"status": "connected", "message": "SSE 连接已建立"}\n\n'); // 连接关闭时清理 req.on('close', () => { connections.delete(res); console.log('客户端断开连接'); }); }); // 模拟聊天室活动 const users = ['Alice', 'Bob', 'Charlie', 'Diana']; // 模拟用户加入聊天室 setInterval(() => { if (connections.size > 0) { const randomUser = users[Math.floor(Math.random() * users.length)]; const event = `event: user-joined\ndata: ${JSON.stringify({name: randomUser})}\n\n`; connections.forEach(conn => conn.write(event)); } }, 10000); // 每10秒随机一个用户加入 // 模拟用户发送消息 setInterval(() => { if (connections.size > 0) { const randomUser = users[Math.floor(Math.random() * users.length)]; const messages = [ '你好大家!', '今天天气真好', '有人在吗?', '这个 SSE 示例很棒!', '谢谢分享' ]; const randomMessage = messages[Math.floor(Math.random() * messages.length)]; const event = `event: new-message\ndata: ${JSON.stringify({user: randomUser, text: randomMessage})}\n\n`; connections.forEach(conn => conn.write(event)); } }, 5000); // 每5秒随机发送一条消息 // 模拟用户离开聊天室 setInterval(() => { if (connections.size > 0 && users.length > 1) { const randomUser = users[Math.floor(Math.random() * users.length)]; const event = `event: user-left\ndata: ${JSON.stringify({name: randomUser})}\n\n`; connections.forEach(conn => conn.write(event)); } }, 15000); // 每15秒随机一个用户离开 const PORT = process.env.PORT || 3000; app.listen(PORT, () => { console.log(`服务器运行在端口 ${PORT}`); });

运行说明

  • 首先安装依赖: npm install express cors
  • 将前端代码保存为 chat.html,后端代码保存为 server.js
  • 运行后端: node server.js
  • 在浏览器中访问 http://localhost:3000/chat.html
  • 点击"开始接收通知"按钮,即可看到实时的聊天室通知

更多使用场景

实时通知

推送系统通知、消息提醒等

数据流更新

股票价格、实时统计、日志输出等

进度追踪

文件上传进度、任务执行状态等

监控数据

系统性能监控、服务器状态等

体育比分实时更新

在体育赛事直播页面中使用SSE实时更新比分和比赛状态。

// 前端代码 const matchEvents = new EventSource('/api/match/123/live-updates'); matchEvents.addEventListener('scoreUpdate', event => { const data = JSON.parse(event.data); document.getElementById('home-score').textContent = data.homeScore; document.getElementById('away-score').textContent = data.awayScore; }); matchEvents.addEventListener('matchStatus', event => { const data = JSON.parse(event.data); document.getElementById('match-status').textContent = data.status; document.getElementById('match-time').textContent = data.time; }); matchEvents.addEventListener('matchEvent', event => { const data = JSON.parse(event.data); const eventList = document.getElementById('match-events'); const eventItem = document.createElement('div'); eventItem.innerHTML = `<strong>${data.time}'</strong> ${data.type}: ${data.player}`; eventList.prepend(eventItem); });

系统监控面板

实时显示服务器性能指标,如CPU使用率、内存占用、网络流量等。

// 前端代码 const monitorEvents = new EventSource('/api/system/monitoring'); monitorEvents.addEventListener('cpuUpdate', event => { const data = JSON.parse(event.data); updateGauge('cpu-gauge', data.usage); }); monitorEvents.addEventListener('memoryUpdate', event => { const data = JSON.parse(event.data); updateGauge('memory-gauge', data.percentUsed); document.getElementById('memory-text').textContent = `${data.used} / ${data.total} GB`; }); // 服务器端模拟代码 app.get('/api/system/monitoring', (req, res) => { res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); const sendMetrics = () => { // 模拟CPU使用率 const cpuUsage = Math.floor(Math.random() * 100); res.write(`event: cpuUpdate\ndata: ${JSON.stringify({usage: cpuUsage})}\n\n`); // 模拟内存使用情况 const totalMemory = 16; const usedMemory = (Math.random() * 0.8 + 0.1) * totalMemory; const percentUsed = Math.round((usedMemory / totalMemory) * 100); res.write(`event: memoryUpdate\ndata: ${JSON.stringify({used: usedMemory.toFixed(2), total: totalMemory, percentUsed})}\n\n`); }; // 每2秒发送一次指标 const interval = setInterval(sendMetrics, 2000); req.on('close', () => clearInterval(interval)); });

社交媒体实时更新

在社交媒体应用中实时显示新帖子、点赞和评论通知。

// 前端代码 const socialEvents = new EventSource('/api/social/updates'); socialEvents.addEventListener('newPost', event => { const post = JSON.parse(event.data); const postElement = document.createElement('div'); postElement.className = 'social-post'; postElement.innerHTML = ` <h4>${post.author}</h4> <p>${post.content}</p> <small>${new Date(post.timestamp).toLocaleString()}</small> `; document.getElementById('posts-container').prepend(postElement); }); socialEvents.addEventListener('newLike', event => { const like = JSON.parse(event.data); document.getElementById(`post-${like.postId}-likes`).textContent = `点赞数: ${like.count}`; }); socialEvents.addEventListener('newComment', event => { const comment = JSON.parse(event.data); const commentElement = document.createElement('div'); commentElement.className = 'comment'; commentElement.innerHTML = ` <strong>${comment.author}:</strong> <span>${comment.text}</span> `; document.getElementById(`post-${comment.postId}-comments`).appendChild(commentElement); });

安全考虑

认证与授权

使用JWT令牌或会话验证确保只有授权用户能接收事件

跨站请求伪造(CSRF)

验证请求来源,使用CSRF令牌防止恶意请求

数据过滤

服务器端过滤敏感信息,避免向客户端泄露私有数据

连接限制

限制单个客户端的连接数,防止资源耗尽攻击

最佳实践

  • 设置合理的重连时间间隔,避免连接过于频繁
  • 使用事件ID实现断线重连时的数据恢复
  • 服务器端设置合适的超时时间,避免长时间占用连接
  • 客户端处理连接错误和异常情况
  • 注意浏览器兼容性,必要时提供降级方案
  • 使用HTTPS加密传输敏感数据
  • 实现适当的错误处理和日志记录机制
← Backend Websocket Backend Examples →