袋鼠云数栈产品中 AI+ 实现原理剖析

  • 袋鼠云数栈产品中 AI+ 实现原理剖析已关闭评论
  • 133 次浏览
  • A+
所属分类:Web前端
摘要

我们是袋鼠云数栈 UED 团队,致力于打造优秀的一站式数据中台产品。我们始终保持工匠精神,探索前端道路,为社区积累并传播经验价值。


我们是袋鼠云数栈 UED 团队,致力于打造优秀的一站式数据中台产品。我们始终保持工匠精神,探索前端道路,为社区积累并传播经验价值。

本文作者:修能

生产力工具 + AI 是不可逆转的趋势,慢慢的大模型能力通过 AI Agent 落地的工程化能力也开始趋于成熟。作为大数据产品的数栈也必然是需要借助 AI 能力提升产品竞争力。
去年 12 月,我们在产品中上线了 AI+ 的功能,借助已经开源的大模型的能力,帮助我们探索和落地更多地应用场景。在初版 AI+ 的功能中,我们实现了基础功能的通话。

SSE

在 ChatGPT 中,我们在等待大模型生成回答的时间通常不需要很久。这是因为 ChatGPT 通过 server-sent events(SSE)来实现将生成的部分回答通过事件流传递到前端。而这就让前端不必等回答全部生成后再获取,也就使得不需要请求等待很久。

SSE 是一种基于 HTTP 协议的单向通信机制,用于服务端向客户端推送数据。

SSE WebSocket
基于 HTTP 协议 基于 TCP 连接,本身是一种协议
单向通信 双向通信
简单易用 复杂

入门使用

// 创建 SSE 的实例 const evtSource = new EventSource("//api.example.com/ssedemo.php", {   withCredentials: true, });  // 添加监听事件 evtSource.onmessage = (event) => {   const newElement = document.createElement("li");   const eventList = document.getElementById("list");    newElement.textContent = `message: ${event.data}`;   eventList.appendChild(newElement); };  // 错误处理 evtSource.onerror = (err) => {   console.error("EventSource failed:", err); };  // 关闭事件流 evtSource.close(); 

需要注意的是,SSE 请求的服务端响应信息头的 MIME 类型必须是text/event-stream,否则会无法监听到事件。
另外,由于是基于 HTTP 协议的,所以在 HTTP/1.1 或更低的时候,会受浏览器最大连接数的限制。


Fields

收到的消息格式一定是具有以下字段的某种组合,其他字段名都将忽略,每行一个:

  • event
  • data
  • id
  • retry
: this is a test stream // 第一条消息,这会被解析会注释  data: some text // 第二条消息  data: another message // 第三条消息 data: with two lines  event: userconnect // 第四条消息 data: {"username": "bobby", "time": "02:33:48"} 

如上所示,默认浏览器的 EventSource API 虽然可用,但是限制比较多。

  1. 只支持 url 和 withCredentials 参数。不支持往 body 里传参数。而通常来说 URL 是有最大长度限制的。
  2. 无法自定义请求头。
  3. 只能发起 GET 请求。

其实,我们也可以通过 Fetch 来实现 SSE 的通信,只不过需要额外自行处理数据流的传递。

实现

首先,我们借助 Fetch 的能力来实现请求。

const response = await fetch(url, options); 

通过接受用户提供的 url 和 options 发起一个 fetch 的请求。
然后,我们需要排除掉非 SSE 的请求类型,我们可以直接拿响应的 header 中拿 content-type进行判断。

const contentType = response.headers.get('content-type'); if (!contentType?.startsWith('text/event-stream')) {     throw new Error('SSE 请求必须设置 content-type 为 text/event-stream'); } 

接着,我们业务场景中通常直接通过 response.json()获取 JSON 格式的数据了,但这里我们由于是事件流,所以我们通过 response.body 拿到的是一个 ReadableStream。我们需要借助相关的 API 进行流的读取。

const reader = response.body.getReader(); let result: ReadableStreamDefaultReadResult<Uint8Array>; while (!(result = await reader.read()).done) {   	// 假定每一次 read 的 value 都是完整的消息     onmessage(onChunk(result.value)); } 

其中 onChunk 函数就是处理事件流中的每一份数据的。

// 伪代码 function onChunk(arr: Uint8Array){   const links = seekLinks();   // 待完善 } 

在实现 seekLinks 方法之前,我们需要先知道到什么时候算每一行的结束。


从 Fields 可以知道,每一行是以n作为区分的。

function seekLinks(arr: Uint8Array){   const lines = [];   const buffer = arr;   const bufLength = buffer.length;   let position = 0;   let lineStart = 0;   while(position < bufLength){     // 'n'.charCodeAt() === 10;     if(buffer[position] === 10){       lines.push(buffer.slice(lineStart, position));       lineStart = position;     };     position += 1;   }   return lines; } 

在获取到所有行后,针对每一行做处理。

// 伪代码 function onChunk(arr: Uint8Array){   const links = seekLinks();   const decoder = new TextDecoder();   let message = {     data: '',     event: '',     id: '',     retry: undefined,   }:   links.forEach((line) => {     // ':'.charCodeAt() === 58;     const colon = line.findIndex(l => l === 58);     const fieldArr = line.slice(0, colon);     const valueArr = line.slice(colon);     if(colon === -1){       // 当冒号作为开头的时候,解析成注释       return;     }     const field = decoder.decode(fieldArr);     const value = decoder.decode(valueArr);     switch (field) {       case 'data':           message.data = message.data               ? message.data + 'n' + value               : value;           break;       case 'event':           message.event = value;           break;       case 'id':           message.id = value;           break;       case 'retry':           const retry = parseInt(value, 10);           message.retry = retry           break;   	}   });   return message; } 

大致完成了最简单的基础功能的解析,而以上伪代码参考 fetch-event-source 的源码。


借助 fetch-event-source 的能力,在数栈产品中调用的方式和 HTTP 请求基本保持一致。

function sse(url: string, params: any, options: FetchEventSourceInit) {   const headers = {     'Content-Type': 'application/json',     accept: 'text/event-stream',   };   fetchEventSource(url, {     method: 'POST',     body: JSON.stringify(params),     headers,     ...options,   }); } 

打字机效果

接着,我们实现具备科技感的打字机效果:
袋鼠云数栈产品中 AI+ 实现原理剖析

输出

这里我们不能直接将响应的消息直接打印到屏幕上,因为响应的消息通常是好多字,这样子会导致打字机效果显得非常卡顿,用户体验不佳。
在数栈产品中,我们通过将响应的消息收集到暂存区中,然后通过每秒从暂存区中取出若干个字符打印到屏幕上,优化打字机卡顿的效果。

function AIGC(){    const typing = useTyping({       // 暂存区启动后,每个 delay 的时间都会执行该方法将消息打印到屏幕上       onTyping(val) {         // ...       },   }); 	const handleChat = (message: string) => {       // 标志暂存区需要开始存响应的消息了       typing.start();       requestChat(params, {         onmessage(event: { data: string }) {            	const { data } = event;             // 把响应的消息存入暂存区中             typing.push(data);         },         onclose() {             // 关闭或失败的话,释放暂存区的数据             typing.close();         },         onerror() {             typing.close();         },     });   }; } 

其中,相关暂存区的代码整理成 useTyping 实现。

export default function useTyping({     onTyping,     onEnd, }: {     onTyping: (val: string) => void;     onEnd: () => void; }) {     const interval = useRef<number>();     const queue = useRef<string>('');     const isStart = useRef<boolean>(false);      function startTyping() {         if (interval.current) return;         let index = 0;         interval.current = window.setInterval(() => {             if (index < queue.current.length) {                 const str = queue.current;                 onTyping(str.slice(0, index + 1));                 index++;             } else if (!isStart.current) {                 // 如果发送了全部的消息且信号关闭,则清空队列                 window.clearInterval(interval.current);                 interval.current = 0;                 onEnd();             }             // 如果发送了全部的消息,但是信号没有关闭,则什么都不做继续轮训等待新的消息         }, 50);     }      useEffect(() => {         return () => {             window.clearInterval(interval.current);             interval.current = 0;         };     }, []);      function start() {         isStart.current = true;         window.clearInterval(interval.current);         interval.current = 0;         queue.current = '';     }      function push(str: string) {         if (!isStart.current) return;         queue.current += str.replace(/\n/g, 'n');         startTyping();     }      // 关闭的时候不需要清空队列,因为可能还有一些消息没有发送完毕,统一等消息发送完毕后关闭     function close() {         isStart.current = false;     }      return { start, push, close }; } 

光标

在实现了打字机效果后,我们还需要添加一个闪烁的光标。
原理比较简单,就是在消息区域的最后一个元素的末尾添加元素即可。

.markdown {   >*:last-child::after {     content: " ";     width: 2px;     height: 13px;     transform: translate(1px, 2px);     font-family: Menlo, Monaco, "Courier New", monospace;     font-weight: normal;     font-size: 0;     font-feature-settings: "liga" 0, "calt" 0;     line-height: 13px;     letter-spacing: 0;     display: inline-block;     visibility: hidden;     animation: blinker 1s step-end infinite;     background: #000;   }    @keyframes blinker {     0% {       visibility: inherit;     }     50% {       visibility: hidden;     }     100% {       visibility: inherit;     }   } } 

当然,这里有一些问题,在 markdown 解析出 Code Block 的时候会导致光标错位,这个问题 ChatGPT 同样也有。
袋鼠云数栈产品中 AI+ 实现原理剖析


那么到这里,我们就实现了一个具备基础功能的 AI+ 的需求。

最后

欢迎关注【袋鼠云数栈UED团队】~
袋鼠云数栈 UED 团队持续为广大开发者分享技术成果,相继参与开源了欢迎 star