/**
 * SSE (Server-Sent Events) streaming response utilities.
 * Generic helpers for sending SSE-style requests through `uni.request` and
 * parsing the `data:` lines of the response body.
 */

// Payload that marks the end of the stream unless the caller overrides it.
const DEFAULT_DONE_MARKER = '[DONE]'

// Chunk fields that are copied into the result metadata before any other field.
const KNOWN_METADATA_KEYS = ['isSensitive', 'tokenCount', 'responseTime']

/**
 * Extract the payload of an SSE `data` line.
 * @param {string} line One line of the response body
 * @returns {string|null} Trimmed payload, or null when the line is not a data line
 */
function readDataLine(line) {
  // The space after the colon is optional in the SSE format, so accept both forms.
  return line.startsWith('data:') ? line.slice(5).trim() : null
}

/**
 * Fold one parsed chunk into the accumulator and notify the chunk callback.
 * @param {{content: string, metadata: Object}} accumulated Mutable accumulator
 * @param {Object} chunk Parsed JSON payload of a data line
 * @param {Function} [onChunk] Callback invoked when the chunk carries content
 */
function applyChunk(accumulated, chunk, onChunk) {
  if (chunk.content) {
    accumulated.content += chunk.content
    if (onChunk) {
      onChunk({
        content: chunk.content,
        totalContent: accumulated.content,
        chunk
      })
    }
  }

  for (const key of KNOWN_METADATA_KEYS) {
    if (chunk[key] !== undefined) {
      accumulated.metadata[key] = chunk[key]
    }
  }

  for (const key of Object.keys(chunk)) {
    // `__proto__` is skipped so server data cannot replace the accumulator's prototype.
    if (key === 'content' || key === '__proto__' || KNOWN_METADATA_KEYS.includes(key)) {
      continue
    }
    accumulated.metadata[key] = chunk[key]
  }
}

/**
 * Parse a complete SSE response body into the final result object.
 * @param {string} body Raw response text
 * @param {string} doneMarker Payload that ends the stream
 * @param {Function} [onChunk] Callback invoked for every chunk that carries content
 * @returns {{data: Object}} Result whose `data.message` is the concatenated content,
 *   merged with every metadata field seen in the chunks
 */
function consumeSSEBody(body, doneMarker, onChunk) {
  const accumulated = { content: '', metadata: {} }

  for (const line of body.split(/\r\n|\n|\r/)) {
    const payload = readDataLine(line)
    if (payload === null) {
      continue
    }
    // Everything after the end marker is ignored; a missing marker also counts as done.
    if (payload === doneMarker) {
      break
    }
    try {
      applyChunk(accumulated, JSON.parse(payload), onChunk)
    } catch (parseError) {
      // A malformed chunk is logged and skipped so the rest of the body is still processed.
      console.warn('解析SSE数据失败:', parseError, payload)
    }
  }

  return { data: { message: accumulated.content, ...accumulated.metadata } }
}

/**
 * Send an SSE request through `uni.request`.
 *
 * The body is parsed inside the `success` callback, so `onChunk` fires once per
 * content chunk while the delivered response text is being processed.
 *
 * @param {Object} options Configuration
 * @param {string} options.url Request URL
 * @param {string} [options.method='POST'] HTTP method
 * @param {Object} [options.data={}] Request payload
 * @param {Object} [options.headers={}] Extra headers; they override the SSE defaults
 * @param {Function} [options.onChunk] Called with `{content, totalContent, chunk}` per content chunk
 * @param {Function} [options.onComplete] Called with the final result
 * @param {Function} [options.onError] Called with the error when the request or parsing fails
 * @param {string} [options.doneMarker='[DONE]'] Payload that ends the stream
 * @returns {Promise<{data: Object}>} Resolves with the final result. Rejects on transport
 *   failure, on a non-2xx HTTP status, or when the response body is not text. The promise
 *   also exposes `abort()`, which aborts the underlying request task.
 */
export function createSSERequest(options) {
  const {
    url,
    method = 'POST',
    data = {},
    headers = {},
    onChunk,
    onComplete,
    onError,
    doneMarker = DEFAULT_DONE_MARKER
  } = options

  let requestTask = null

  const promise = new Promise((resolve, reject) => {
    const requestHeaders = {
      'Content-Type': 'application/json',
      'Accept': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Authorization': `Bearer ${uni.getStorageSync('token') || ''}`,
      ...headers
    }

    // Reject even when the error callback itself throws, so the promise can never hang.
    const rejectWith = (error) => {
      try {
        if (onError) {
          onError(error)
        }
      } finally {
        reject(error)
      }
    }

    requestTask = uni.request({
      url,
      method,
      data,
      header: requestHeaders,
      responseType: 'text',
      success: (res) => {
        let result
        try {
          const { statusCode, data: body } = res
          // An HTTP error must not be reported as an empty successful answer.
          if (typeof statusCode === 'number' && (statusCode < 200 || statusCode >= 300)) {
            const httpError = new Error(`SSE request failed with HTTP status ${statusCode}`)
            httpError.statusCode = statusCode
            httpError.response = res
            throw httpError
          }
          if (typeof body !== 'string') {
            throw new TypeError('SSE response body is not text')
          }
          result = consumeSSEBody(body, doneMarker, onChunk)
          if (onComplete) {
            onComplete(result)
          }
        } catch (error) {
          console.error('处理SSE响应失败:', error)
          rejectWith(error)
          return
        }
        resolve(result)
      },
      fail: (error) => {
        console.error('SSE请求失败:', error)
        rejectWith(error)
      }
    })
  })

  // A promise executor's return value is discarded, so the abort handle is attached here.
  promise.abort = () => {
    requestTask?.abort?.()
  }

  return promise
}

/**
 * Send an SSE POST request against the configured base URL.
 * @param {string} endpoint API path appended to `uni.$u.http.config.baseURL` (empty when unset)
 * @param {Object} data Request payload
 * @param {Object} [callbacks={}] Callbacks `{onChunk, onComplete, onError}`
 * @returns {Promise<{data: Object}>} Promise returned by `createSSERequest`
 */
export function createDefaultSSERequest(endpoint, data, callbacks = {}) {
  const baseURL = uni.$u?.http?.config?.baseURL || ''

  return createSSERequest({
    url: `${baseURL}${endpoint}`,
    method: 'POST',
    data,
    ...callbacks
  })
}

/**
 * SSE request dedicated to the pet assistant endpoint.
 * @param {Object} data Request payload
 * @param {Object} [callbacks={}] Callbacks `{onChunk, onComplete, onError}`
 * @returns {Promise<{data: Object}>} Promise returned by `createDefaultSSERequest`
 */
export function createPetAssistantSSE(data, callbacks = {}) {
  return createDefaultSSERequest('/pet/user/assistant/stream-ask', data, callbacks)
}

/**
 * Lifecycle states reported by `createCancellableSSE().getStatus()`.
 */
export const SSE_STATUS = Object.freeze({
  PENDING: 'pending',
  STREAMING: 'streaming',
  COMPLETED: 'completed',
  ERROR: 'error',
  CANCELLED: 'cancelled'
})

/**
 * Send an SSE request that can be cancelled and whose status can be inspected.
 *
 * @param {Object} options Same options as `createSSERequest`
 * @returns {{promise: Promise, cancel: Function, getStatus: Function}}
 *   `promise` settles like the `createSSERequest` promise. `cancel()` aborts a request that
 *   is still pending or streaming and is a no-op afterwards. After a cancel the status stays
 *   `cancelled` even though `onError` still receives whatever error the aborted request
 *   reports and `promise` rejects with it.
 */
export function createCancellableSSE(options) {
  const callbacks = options ?? {}
  let status = SSE_STATUS.PENDING

  // Once cancelled, late callbacks must not overwrite the status.
  const moveTo = (next) => {
    if (status !== SSE_STATUS.CANCELLED) {
      status = next
    }
  }

  const request = createSSERequest({
    ...callbacks,
    onChunk: (chunk) => {
      moveTo(SSE_STATUS.STREAMING)
      if (callbacks.onChunk) {
        callbacks.onChunk(chunk)
      }
    },
    onComplete: (result) => {
      moveTo(SSE_STATUS.COMPLETED)
      if (callbacks.onComplete) {
        callbacks.onComplete(result)
      }
    },
    onError: (error) => {
      moveTo(SSE_STATUS.ERROR)
      if (callbacks.onError) {
        callbacks.onError(error)
      }
    }
  })

  return {
    promise: request,
    cancel: () => {
      if (status !== SSE_STATUS.PENDING && status !== SSE_STATUS.STREAMING) {
        return
      }
      // Set the status first: aborting may invoke the failure callback synchronously.
      status = SSE_STATUS.CANCELLED
      request.abort()
    },
    getStatus: () => status
  }
}

export default {
  createSSERequest,
  createDefaultSSERequest,
  createPetAssistantSSE,
  createCancellableSSE,
  SSE_STATUS
}