pet/utils/websocket.js

781 lines
18 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* WebSocket连接管理工具类
* 提供WebSocket连接的建立、管理、自动重连等核心功能
* 基于uni-app的uni.connectSocket API实现
*/
import { HTTP_CONFIG } from '../http/config/config.js'
/**
* WebSocket连接状态枚举
*/
export const WS_STATUS = {
DISCONNECTED: 'disconnected', // 未连接
CONNECTING: 'connecting', // 连接中
CONNECTED: 'connected', // 已连接
RECONNECTING: 'reconnecting', // 重连中
FAILED: 'failed' // 连接失败
}
/**
* WebSocket消息类型枚举
*/
export const WS_MESSAGE_TYPE = {
CHAT: 'chat', // 聊天消息
CHUNK: 'chunk', // 流式数据块
COMPLETE: 'complete', // 完成标记
ERROR: 'error', // 错误消息
PING: 'ping', // 心跳检测
PONG: 'pong' // 心跳响应
}
/**
* WebSocket管理器类 - 单例模式
*/
class WebSocketManager {
constructor() {
// 连接实例
this.ws = null
// 连接状态
this.status = WS_STATUS.DISCONNECTED
// 重连配置
this.reconnectAttempts = 0
this.maxReconnectAttempts = 5
this.reconnectDelay = 1000 // 初始重连延迟1秒
this.maxReconnectDelay = 30000 // 最大重连延迟30秒
this.reconnectTimer = null
// 心跳配置
this.heartbeatInterval = 30000 // 30秒心跳间隔
this.heartbeatTimer = null
this.heartbeatTimeoutTimer = null
// 连接配置
this.wsUrl = ''
this.token = ''
// 事件回调
this.eventCallbacks = {
onOpen: [],
onMessage: [],
onClose: [],
onError: []
}
// 消息处理
this.messageCallbacks = new Map() // 存储消息回调
this.accumulatedContent = '' // 累积的消息内容
this.currentMessageId = null // 当前消息ID
console.log('WebSocketManager实例已创建')
}
/**
* 获取单例实例
*/
static getInstance() {
if (!WebSocketManager.instance) {
WebSocketManager.instance = new WebSocketManager()
}
return WebSocketManager.instance
}
/**
* 构建WebSocket连接URL
* @param {string} token 用户token
* @returns {string} WebSocket URL
*/
buildWebSocketUrl(token) {
// 将HTTP URL转换为WebSocket URL
const baseURL = HTTP_CONFIG.baseURL
let wsBaseURL
if (baseURL.startsWith('https://')) {
wsBaseURL = baseURL.replace('https://', 'wss://')
} else if (baseURL.startsWith('http://')) {
wsBaseURL = baseURL.replace('http://', 'ws://')
} else {
// 默认使用ws://
wsBaseURL = `ws://${baseURL}`
}
// 构建WebSocket端点URL携带token进行鉴权
return `${wsBaseURL}/user/ws?token=${encodeURIComponent(token)}`
}
/**
* 建立WebSocket连接
* @param {string} token 用户token
* @returns {Promise<boolean>} 连接是否成功
*/
async connect(token) {
if (!token) {
console.error('WebSocket连接失败缺少token')
return false
}
// 如果已经连接,直接返回
if (this.status === WS_STATUS.CONNECTED) {
console.log('WebSocket已连接无需重复连接')
return true
}
// 如果正在连接,等待连接完成
if (this.status === WS_STATUS.CONNECTING) {
console.log('WebSocket正在连接中等待连接完成')
return this.waitForConnection()
}
this.token = token
this.wsUrl = this.buildWebSocketUrl(token)
this.status = WS_STATUS.CONNECTING
console.log('开始建立WebSocket连接:', this.wsUrl)
return new Promise((resolve) => {
try {
// 创建WebSocket连接
this.ws = uni.connectSocket({
url: this.wsUrl,
protocols: ['chat'] // 指定协议
})
// 设置事件监听器
this.setupEventListeners(resolve)
} catch (error) {
console.error('WebSocket连接创建失败:', error)
this.status = WS_STATUS.FAILED
resolve(false)
}
})
}
/**
* 等待连接完成
* @returns {Promise<boolean>}
*/
waitForConnection() {
return new Promise((resolve) => {
const checkConnection = () => {
if (this.status === WS_STATUS.CONNECTED) {
resolve(true)
} else if (this.status === WS_STATUS.FAILED || this.status === WS_STATUS.DISCONNECTED) {
resolve(false)
} else {
setTimeout(checkConnection, 100)
}
}
checkConnection()
})
}
/**
* 设置WebSocket事件监听器
* @param {Function} connectResolve 连接Promise的resolve函数
*/
setupEventListeners(connectResolve) {
if (!this.ws) return
// 连接打开事件
this.ws.onOpen(() => {
console.log('WebSocket连接已建立')
this.status = WS_STATUS.CONNECTED
this.reconnectAttempts = 0 // 重置重连次数
// 启动心跳检测
this.startHeartbeat()
// 触发连接成功回调
this.triggerCallbacks('onOpen')
if (connectResolve) {
connectResolve(true)
}
})
// 消息接收事件
this.ws.onMessage((event) => {
console.log('WebSocket收到消息:', event.data)
this.handleMessage(event.data)
})
// 连接关闭事件
this.ws.onClose((event) => {
console.log('WebSocket连接已关闭:', event)
this.status = WS_STATUS.DISCONNECTED
// 停止心跳检测
this.stopHeartbeat()
// 触发关闭回调
this.triggerCallbacks('onClose', event)
// 如果不是主动关闭,尝试重连
if (event.code !== 1000) { // 1000表示正常关闭
this.attemptReconnect()
}
if (connectResolve) {
connectResolve(false)
}
})
// 连接错误事件
this.ws.onError((error) => {
console.error('WebSocket连接错误:', error)
this.status = WS_STATUS.FAILED
// 停止心跳检测
this.stopHeartbeat()
// 触发错误回调
this.triggerCallbacks('onError', error)
if (connectResolve) {
connectResolve(false)
}
})
}
/**
* 启动心跳检测
*/
startHeartbeat() {
this.stopHeartbeat() // 先停止之前的心跳
this.heartbeatTimer = setInterval(() => {
if (this.status === WS_STATUS.CONNECTED) {
// 发送心跳消息
this.sendRawMessage({
type: WS_MESSAGE_TYPE.PING,
timestamp: Date.now()
})
// 设置心跳超时检测
this.heartbeatTimeoutTimer = setTimeout(() => {
console.warn('心跳超时,连接可能已断开')
this.disconnect()
this.attemptReconnect()
}, 10000) // 10秒超时
}
}, this.heartbeatInterval)
}
/**
* 停止心跳检测
*/
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer)
this.heartbeatTimer = null
}
if (this.heartbeatTimeoutTimer) {
clearTimeout(this.heartbeatTimeoutTimer)
this.heartbeatTimeoutTimer = null
}
}
/**
* 尝试重连
*/
attemptReconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('WebSocket重连次数已达上限停止重连')
this.status = WS_STATUS.FAILED
return
}
if (this.status === WS_STATUS.RECONNECTING) {
console.log('WebSocket正在重连中跳过本次重连')
return
}
this.status = WS_STATUS.RECONNECTING
this.reconnectAttempts++
// 计算重连延迟(指数退避)
const delay = Math.min(
this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1),
this.maxReconnectDelay
)
console.log(`WebSocket第${this.reconnectAttempts}次重连,延迟${delay}ms`)
this.reconnectTimer = setTimeout(() => {
this.connect(this.token)
}, delay)
}
/**
* 断开WebSocket连接
*/
disconnect() {
console.log('主动断开WebSocket连接')
// 停止心跳检测
this.stopHeartbeat()
// 清除重连定时器
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}
// 关闭连接
if (this.ws) {
this.ws.close({
code: 1000,
reason: '主动断开连接'
})
this.ws = null
}
this.status = WS_STATUS.DISCONNECTED
this.reconnectAttempts = 0
}
/**
* 检查连接状态
* @returns {boolean} 是否已连接
*/
isConnected() {
return this.status === WS_STATUS.CONNECTED
}
/**
* 获取当前连接状态
* @returns {string} 连接状态
*/
getStatus() {
return this.status
}
/**
* 更新token并重新连接
* @param {string} newToken 新的token
*/
async updateToken(newToken) {
if (this.token === newToken) {
return
}
console.log('更新WebSocket token并重新连接')
this.disconnect()
await this.connect(newToken)
}
/**
* 发送原始消息
* @param {Object} message 消息对象
*/
sendRawMessage(message) {
if (!this.isConnected()) {
console.warn('WebSocket未连接无法发送消息')
return false
}
try {
const messageStr = JSON.stringify(message)
this.ws.send({
data: messageStr
})
console.log('WebSocket发送消息:', message)
return true
} catch (error) {
console.error('WebSocket发送消息失败:', error)
return false
}
}
/**
* 处理接收到的消息
* @param {string} rawData 原始消息数据
*/
handleMessage(rawData) {
try {
const message = JSON.parse(rawData)
console.log('WebSocket解析消息:', message)
// 处理不同类型的消息
switch (message.type) {
case WS_MESSAGE_TYPE.PONG:
// 收到心跳响应,清除超时定时器
if (this.heartbeatTimeoutTimer) {
clearTimeout(this.heartbeatTimeoutTimer)
this.heartbeatTimeoutTimer = null
}
break
case WS_MESSAGE_TYPE.CHUNK:
this.handleChunkMessage(message)
break
case WS_MESSAGE_TYPE.COMPLETE:
this.handleCompleteMessage(message)
break
case WS_MESSAGE_TYPE.ERROR:
this.handleErrorMessage(message)
break
default:
console.log('收到未知类型消息:', message)
break
}
// 触发通用消息回调
this.triggerCallbacks('onMessage', message)
} catch (error) {
console.error('WebSocket消息解析失败:', error, rawData)
}
}
/**
* 处理流式数据块消息
* @param {Object} message 消息对象
*/
handleChunkMessage(message) {
const { data } = message
if (!data || !data.delta) {
return
}
// 累积内容
this.accumulatedContent += data.delta
// 获取对应的回调函数
const messageId = data.sessionId || data.messageId || 'default'
const callbacks = this.messageCallbacks.get(messageId)
if (callbacks && callbacks.onChunk) {
callbacks.onChunk({
content: data.delta,
totalContent: this.accumulatedContent,
chunk: message
})
}
}
/**
* 处理完成消息
* @param {Object} message 消息对象
*/
handleCompleteMessage(message) {
const { data } = message
const messageId = data.sessionId || data.messageId || 'default'
const callbacks = this.messageCallbacks.get(messageId)
if (callbacks && callbacks.onComplete) {
const result = {
data: {
message: this.accumulatedContent,
isSensitive: data.isSensitive,
tokenCount: data.tokenCount,
responseTime: data.responseTime,
...data
}
}
callbacks.onComplete(result)
}
// 清理回调和累积内容
this.messageCallbacks.delete(messageId)
this.accumulatedContent = ''
this.currentMessageId = null
}
/**
* 处理错误消息
* @param {Object} message 消息对象
*/
handleErrorMessage(message) {
const { data } = message
const messageId = data.sessionId || data.messageId || 'default'
const callbacks = this.messageCallbacks.get(messageId)
if (callbacks && callbacks.onError) {
callbacks.onError(new Error(data.error || '服务器返回错误'))
}
// 清理回调和累积内容
this.messageCallbacks.delete(messageId)
this.accumulatedContent = ''
this.currentMessageId = null
}
/**
* 发送聊天消息
* @param {Object} messageData 消息数据
* @param {Object} callbacks 回调函数 {onChunk, onComplete, onError}
* @returns {Promise} 发送结果
*/
sendMessage(messageData, callbacks = {}) {
return new Promise((resolve, reject) => {
if (!this.isConnected()) {
const error = new Error('WebSocket未连接')
if (callbacks.onError) {
callbacks.onError(error)
}
reject(error)
return
}
// 生成消息ID
const messageId = messageData.sessionId || `msg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`
// 存储回调函数
this.messageCallbacks.set(messageId, {
...callbacks,
resolve,
reject
})
// 重置累积内容
this.accumulatedContent = ''
this.currentMessageId = messageId
// 构建消息
const message = {
type: WS_MESSAGE_TYPE.CHAT,
data: {
...messageData,
messageId
},
timestamp: Date.now()
}
// 发送消息
const success = this.sendRawMessage(message)
if (!success) {
const error = new Error('消息发送失败')
this.messageCallbacks.delete(messageId)
if (callbacks.onError) {
callbacks.onError(error)
}
reject(error)
}
})
}
/**
* 添加事件监听器
* @param {string} event 事件名称
* @param {Function} callback 回调函数
*/
addEventListener(event, callback) {
if (this.eventCallbacks[event]) {
this.eventCallbacks[event].push(callback)
}
}
/**
* 移除事件监听器
* @param {string} event 事件名称
* @param {Function} callback 回调函数
*/
removeEventListener(event, callback) {
if (this.eventCallbacks[event]) {
const index = this.eventCallbacks[event].indexOf(callback)
if (index > -1) {
this.eventCallbacks[event].splice(index, 1)
}
}
}
/**
* 触发事件回调
* @param {string} event 事件名称
* @param {*} data 事件数据
*/
triggerCallbacks(event, data) {
if (this.eventCallbacks[event]) {
this.eventCallbacks[event].forEach(callback => {
try {
callback(data)
} catch (error) {
console.error(`事件回调执行失败 [${event}]:`, error)
}
})
}
}
/**
* 判断是否应该使用降级方案
* @returns {boolean} 是否使用降级
*/
shouldUseFallback() {
// 如果重连次数过多,使用降级方案
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
return true
}
// 如果连接状态为失败,使用降级方案
if (this.status === WS_STATUS.FAILED) {
return true
}
return false
}
/**
* 重置连接状态
*/
reset() {
this.disconnect()
this.reconnectAttempts = 0
this.status = WS_STATUS.DISCONNECTED
this.messageCallbacks.clear()
this.accumulatedContent = ''
this.currentMessageId = null
}
/**
* 获取连接信息
* @returns {Object} 连接信息
*/
getConnectionInfo() {
return {
status: this.status,
url: this.wsUrl,
reconnectAttempts: this.reconnectAttempts,
maxReconnectAttempts: this.maxReconnectAttempts,
hasToken: !!this.token,
activeCallbacks: this.messageCallbacks.size
}
}
}
/**
* 创建宠物助手专用WebSocket接口
* @param {Object} messageData 消息数据
* @param {Object} callbacks 回调函数 {onChunk, onComplete, onError}
* @returns {Promise} WebSocket请求Promise
*/
export function createPetAssistantWebSocket(messageData, callbacks = {}) {
return new Promise(async (resolve, reject) => {
try {
const wsManager = WebSocketManager.getInstance()
// 检查是否应该使用降级方案
if (wsManager.shouldUseFallback()) {
throw new Error('WebSocket不可用使用降级方案')
}
// 检查连接状态,必要时建立连接
if (!wsManager.isConnected()) {
const token = uni.getStorageSync(HTTP_CONFIG.storageKeys.token)
if (!token) {
throw new Error('缺少认证token')
}
console.log('WebSocket未连接正在建立连接...')
const connected = await wsManager.connect(token)
if (!connected) {
throw new Error('WebSocket连接失败')
}
}
// 发送消息并处理回调
await wsManager.sendMessage(messageData, {
onChunk: callbacks.onChunk,
onComplete: (result) => {
if (callbacks.onComplete) {
callbacks.onComplete(result)
}
resolve(result)
},
onError: (error) => {
if (callbacks.onError) {
callbacks.onError(error)
}
reject(error)
}
})
} catch (error) {
console.warn('WebSocket请求失败尝试降级方案:', error)
// 降级到HTTP API
try {
const { askPetAssistant } = await import('../http/api/assistant.js')
const result = await askPetAssistant(messageData)
// 模拟流式效果
if (callbacks.onChunk && result.message) {
simulateStreamingEffect(result.message, callbacks.onChunk)
}
// 触发完成回调
const completeResult = {
data: {
message: result.message || result.content || '',
isSensitive: result.isSensitive || false,
tokenCount: result.tokenCount || 0,
responseTime: result.responseTime || 0
}
}
if (callbacks.onComplete) {
callbacks.onComplete(completeResult)
}
resolve(completeResult)
} catch (fallbackError) {
console.error('降级方案也失败:', fallbackError)
if (callbacks.onError) {
callbacks.onError(fallbackError)
}
reject(fallbackError)
}
}
})
}
/**
* 模拟流式效果
* @param {string} content 完整内容
* @param {Function} onChunk 数据块回调
*/
function simulateStreamingEffect(content, onChunk) {
if (!content || !onChunk) return
let index = 0
let accumulatedContent = ''
const interval = setInterval(() => {
if (index >= content.length) {
clearInterval(interval)
return
}
// 每次发送1-3个字符模拟真实的流式效果
const chunkSize = Math.min(Math.floor(Math.random() * 3) + 1, content.length - index)
const chunk = content.substr(index, chunkSize)
accumulatedContent += chunk
index += chunkSize
onChunk({
content: chunk,
totalContent: accumulatedContent,
chunk: { data: { delta: chunk } }
})
}, 50) // 50ms间隔模拟打字机效果
}
// 导出WebSocket管理器类和相关常量
export { WebSocketManager, WS_STATUS, WS_MESSAGE_TYPE }
// 默认导出
export default {
WebSocketManager,
createPetAssistantWebSocket,
WS_STATUS,
WS_MESSAGE_TYPE
}