- A+
我们是袋鼠云数栈 UED 团队,致力于打造优秀的一站式数据中台产品。我们始终保持工匠精神,探索前端道路,为社区积累并传播经验价值。
本文作者:修能
生产力工具 + AI 是不可逆转的趋势,慢慢的大模型能力通过 AI Agent 落地的工程化能力也开始趋于成熟。作为大数据产品的数栈也必然是需要借助 AI 能力提升产品竞争力。
去年 12 月,我们在产品中上线了 AI+ 的功能,借助已经开源的大模型的能力,帮助我们探索和落地更多地应用场景。在初版 AI+ 的功能中,我们实现了基础功能的通话。
SSE
在 ChatGPT 中,我们在等待大模型生成回答的时间通常不需要很久。这是因为 ChatGPT 通过 server-sent events(SSE)
来实现将生成的部分回答通过事件流传递到前端。而这就让前端不必等回答全部生成后再获取,也就使得不需要请求等待很久。
SSE 是一种基于 HTTP 协议的单向通信机制,用于服务端向客户端推送数据。
SSE | WebSocket |
---|---|
基于 HTTP 协议 | 基于 TCP 连接,本身是一种协议 |
单向通信 | 双向通信 |
简单易用 | 复杂 |
入门使用
// 创建 SSE 的实例 const evtSource = new EventSource("//api.example.com/ssedemo.php", { withCredentials: true, }); // 添加监听事件 evtSource.onmessage = (event) => { const newElement = document.createElement("li"); const eventList = document.getElementById("list"); newElement.textContent = `message: ${event.data}`; eventList.appendChild(newElement); }; // 错误处理 evtSource.onerror = (err) => { console.error("EventSource failed:", err); }; // 关闭事件流 evtSource.close();
需要注意的是,SSE 请求的服务端响应信息头的 MIME 类型必须是text/event-stream
,否则会无法监听到事件。
另外,由于是基于 HTTP 协议的,所以在 HTTP/1.1 或更低的时候,会受浏览器最大连接数的限制。
Fields
收到的消息格式一定是具有以下字段的某种组合,其他字段名都将忽略,每行一个:
event
data
id
retry
: this is a test stream // 第一条消息,这会被解析会注释 data: some text // 第二条消息 data: another message // 第三条消息 data: with two lines event: userconnect // 第四条消息 data: {"username": "bobby", "time": "02:33:48"}
如上所示,默认浏览器的 EventSource API 虽然可用,但是限制比较多。
- 只支持 url 和 withCredentials 参数。不支持往 body 里传参数。而通常来说 URL 是有最大长度限制的。
- 无法自定义请求头。
- 只能发起 GET 请求。
其实,我们也可以通过 Fetch 来实现 SSE 的通信,只不过需要额外自行处理数据流的传递。
实现
首先,我们借助 Fetch 的能力来实现请求。
const response = await fetch(url, options);
通过接受用户提供的 url 和 options 发起一个 fetch 的请求。
然后,我们需要排除掉非 SSE 的请求类型,我们可以直接拿响应的 header 中拿 content-type
进行判断。
const contentType = response.headers.get('content-type'); if (!contentType?.startsWith('text/event-stream')) { throw new Error('SSE 请求必须设置 content-type 为 text/event-stream'); }
接着,我们业务场景中通常直接通过 response.json()
获取 JSON 格式的数据了,但这里我们由于是事件流,所以我们通过 response.body
拿到的是一个 ReadableStream
。我们需要借助相关的 API 进行流的读取。
const reader = response.body.getReader(); let result: ReadableStreamDefaultReadResult<Uint8Array>; while (!(result = await reader.read()).done) { // 假定每一次 read 的 value 都是完整的消息 onmessage(onChunk(result.value)); }
其中 onChunk 函数就是处理事件流中的每一份数据的。
// 伪代码 function onChunk(arr: Uint8Array){ const links = seekLinks(); // 待完善 }
在实现 seekLinks
方法之前,我们需要先知道到什么时候算每一行的结束。
从 Fields 可以知道,每一行是以n
作为区分的。
function seekLinks(arr: Uint8Array){ const lines = []; const buffer = arr; const bufLength = buffer.length; let position = 0; let lineStart = 0; while(position < bufLength){ // 'n'.charCodeAt() === 10; if(buffer[position] === 10){ lines.push(buffer.slice(lineStart, position)); lineStart = position; }; position += 1; } return lines; }
在获取到所有行后,针对每一行做处理。
// 伪代码 function onChunk(arr: Uint8Array){ const links = seekLinks(); const decoder = new TextDecoder(); let message = { data: '', event: '', id: '', retry: undefined, }: links.forEach((line) => { // ':'.charCodeAt() === 58; const colon = line.findIndex(l => l === 58); const fieldArr = line.slice(0, colon); const valueArr = line.slice(colon); if(colon === -1){ // 当冒号作为开头的时候,解析成注释 return; } const field = decoder.decode(fieldArr); const value = decoder.decode(valueArr); switch (field) { case 'data': message.data = message.data ? message.data + 'n' + value : value; break; case 'event': message.event = value; break; case 'id': message.id = value; break; case 'retry': const retry = parseInt(value, 10); message.retry = retry break; } }); return message; }
大致完成了最简单的基础功能的解析,而以上伪代码参考 fetch-event-source 的源码。
借助 fetch-event-source 的能力,在数栈产品中调用的方式和 HTTP 请求基本保持一致。
function sse(url: string, params: any, options: FetchEventSourceInit) { const headers = { 'Content-Type': 'application/json', accept: 'text/event-stream', }; fetchEventSource(url, { method: 'POST', body: JSON.stringify(params), headers, ...options, }); }
打字机效果
接着,我们实现具备科技感的打字机效果:
输出
这里我们不能直接将响应的消息直接打印到屏幕上,因为响应的消息通常是好多字,这样子会导致打字机效果显得非常卡顿,用户体验不佳。
在数栈产品中,我们通过将响应的消息收集到暂存区中,然后通过每秒从暂存区中取出若干个字符打印到屏幕上,优化打字机卡顿的效果。
function AIGC(){ const typing = useTyping({ // 暂存区启动后,每个 delay 的时间都会执行该方法将消息打印到屏幕上 onTyping(val) { // ... }, }); const handleChat = (message: string) => { // 标志暂存区需要开始存响应的消息了 typing.start(); requestChat(params, { onmessage(event: { data: string }) { const { data } = event; // 把响应的消息存入暂存区中 typing.push(data); }, onclose() { // 关闭或失败的话,释放暂存区的数据 typing.close(); }, onerror() { typing.close(); }, }); }; }
其中,相关暂存区的代码整理成 useTyping
实现。
export default function useTyping({ onTyping, onEnd, }: { onTyping: (val: string) => void; onEnd: () => void; }) { const interval = useRef<number>(); const queue = useRef<string>(''); const isStart = useRef<boolean>(false); function startTyping() { if (interval.current) return; let index = 0; interval.current = window.setInterval(() => { if (index < queue.current.length) { const str = queue.current; onTyping(str.slice(0, index + 1)); index++; } else if (!isStart.current) { // 如果发送了全部的消息且信号关闭,则清空队列 window.clearInterval(interval.current); interval.current = 0; onEnd(); } // 如果发送了全部的消息,但是信号没有关闭,则什么都不做继续轮训等待新的消息 }, 50); } useEffect(() => { return () => { window.clearInterval(interval.current); interval.current = 0; }; }, []); function start() { isStart.current = true; window.clearInterval(interval.current); interval.current = 0; queue.current = ''; } function push(str: string) { if (!isStart.current) return; queue.current += str.replace(/\n/g, 'n'); startTyping(); } // 关闭的时候不需要清空队列,因为可能还有一些消息没有发送完毕,统一等消息发送完毕后关闭 function close() { isStart.current = false; } return { start, push, close }; }
光标
在实现了打字机效果后,我们还需要添加一个闪烁的光标。
原理比较简单,就是在消息区域的最后一个元素的末尾添加元素即可。
.markdown { >*:last-child::after { content: " "; width: 2px; height: 13px; transform: translate(1px, 2px); font-family: Menlo, Monaco, "Courier New", monospace; font-weight: normal; font-size: 0; font-feature-settings: "liga" 0, "calt" 0; line-height: 13px; letter-spacing: 0; display: inline-block; visibility: hidden; animation: blinker 1s step-end infinite; background: #000; } @keyframes blinker { 0% { visibility: inherit; } 50% { visibility: hidden; } 100% { visibility: inherit; } } }
当然,这里有一些问题,在 markdown 解析出 Code Block 的时候会导致光标错位,这个问题 ChatGPT 同样也有。
那么到这里,我们就实现了一个具备基础功能的 AI+ 的需求。
最后
欢迎关注【袋鼠云数栈UED团队】~
袋鼠云数栈 UED 团队持续为广大开发者分享技术成果,相继参与开源了欢迎 star