- A+
所属分类:Web前端
01. 背景
最近项目需求里有个文件上传功能,而客户要上传的文件通常比较大,基本都在 1 GiB 以上。上传大文件,尤其是读取大文件时,可能会造成 UI 卡顿甚至页面无法响应的问题,而把上传放到后台的 Worker 中去实现是一个比较不错的解决办法。
02. 原理讲解
02.01. Shared Worker
Shared Worker 的好处是可以从多个浏览上下文中访问,例如多个窗口、iframe 或其他 worker。这样我们可以保证全局的页面上传任务都在我们的控制之下,甚至可以实现防止重复提交等功能。
02.02. 组合式函数
组合式函数的好处是在 Vue 3 中可以在任何 *.vue 文件中使用,并且是响应式的,可以侦听 pinia 内 token 等状态的变化,并把变化传递给 Worker。
02.03. 简单流程设计
flowchart TB
  id1[用户选择文件] --> id2[创建上传任务]
  id2 --> id3[任务推送到 Worker]
  id3 --> id4[上传到服务器]
  id4 --> id5[Worker 返回任务状态]
  id5 --> id6[组合式函数拦截状态放到 Map 里]
03. 代码
upload-worker.ts
代码
import { sha256 } from '@noble/hashes/sha256';
import { bytesToHex as toHex } from '@noble/hashes/utils';

/**
 * Minimal view of the shared-worker global scope: only the `onconnect`
 * hook used by this file.
 */
interface SharedWorkerGlobalScope {
  onconnect: (event: MessageEvent<unknown>) => void;
}

// `self` is typed by whichever lib the project compiles against, so it is
// re-typed here to the local minimal interface.
const _self = self as unknown as SharedWorkerGlobalScope;

/**
 * Size of one uploaded piece, in bytes.
 */
const pieceSize = 1024 * 1024;

/**
 * Time allowed for a single piece request before it is aborted.
 */
const requestTimeoutMs = 8000;

/**
 * Number of consecutive failed attempts tolerated for one piece; one more
 * failure puts the task into the `error` state.
 */
const maxRetries = 3;

/**
 * Maximum number of tasks in the `uploading` state at the same time.
 */
const maxConcurrentUploads = 4;

/**
 * Envelope of every message received from a page.
 */
interface MessageArg<T> {
  /**
   * Name of the handler to invoke (a key of `messageRoute`).
   */
  func: string;
  /**
   * Handler argument.
   */
  arg: T;
}

/**
 * Upload task information that is reported back to the pages.
 */
interface UploadTaskInfo {
  /**
   * File name.
   */
  fileName: string;
  /**
   * Upload path.
   */
  uploadPath: string;
  /**
   * Task id.
   */
  id: string;
  /**
   * File size in bytes.
   */
  size: number;
  /**
   * Upload progress, from 0 to 1.
   */
  progress: number;
  /**
   * Upload speed of the last piece, in bytes per second.
   */
  speed?: number;
  /**
   * Task status.
   */
  status: 'uploading' | 'paused' | 'canceled' | 'done' | 'error' | 'waiting';
  /**
   * Time the task first started uploading.
   */
  startTime?: Date;
  /**
   * Time the task finished or failed.
   */
  endTime?: Date;
  /**
   * Error message.
   */
  errorMessage?: string;
}

/**
 * Upload task as tracked inside the worker.
 */
interface UploadTask extends UploadTaskInfo {
  file: File;
  /**
   * One flag per piece; `true` once the piece has been accepted.
   */
  pieces: Array<boolean>;
  /**
   * Controller of the piece request currently in flight, if any.
   */
  abort?: AbortController;
}

/**
 * Response envelope this file expects from the upload API.
 */
interface ApiResponse {
  code: number;
  message: string;
}

type Sha256Hasher = ReturnType<typeof sha256.create>;

/**
 * Task id -> incremental hash of the pieces uploaded so far.
 */
const hashs = new Map<string, Sha256Hasher>();

/**
 * Upload task list.
 */
const uploadTasks: Array<UploadTask> = [];

/**
 * Status receivers, keyed by the id each page registered with.
 */
const statusReceivers = new Map<string, MessagePort>();

/**
 * Token store.
 */
const tokenStore = {
  /**
   * Value sent in the `Authorization` header.
   */
  BearerToken: '',
};

/**
 * Whether the task is currently in the `uploading` state.
 * @param task upload task
 */
const isUploading = (task: UploadTaskInfo): boolean =>
  task.status === 'uploading';

/**
 * Broadcast the state of a task to every registered receiver.
 * @param task upload task
 */
const updateStatus = (task: UploadTaskInfo): void => {
  // Copy only the reportable fields so the File, the piece flags and the
  // AbortController are never posted.
  const taskInfo: UploadTaskInfo = {
    fileName: task.fileName,
    uploadPath: task.uploadPath,
    id: task.id,
    size: task.size,
    progress: task.progress,
    speed: task.speed,
    status: task.status,
    startTime: task.startTime,
    endTime: task.endTime,
    errorMessage: task.errorMessage,
  };
  statusReceivers.forEach((port) => {
    port.postMessage(taskInfo);
  });
};

/**
 * Drop a task and its hash state from the worker's bookkeeping.
 * Does nothing to the task list when the id is unknown.
 * @param id task id
 */
const releaseTask = (id: string): void => {
  const index = uploadTasks.findIndex((item) => item.id === id);
  // Guard the index: splice(-1, 1) would remove the LAST task in the list.
  if (index !== -1) {
    uploadTasks.splice(index, 1);
  }
  hashs.delete(id);
};

/**
 * Call the upload API and throw unless the response envelope reports code 200.
 * @param url request url
 * @param init fetch options
 * @throws Error carrying the message returned by the server
 */
const callApi = async (url: string, init: RequestInit): Promise<void> => {
  const res = await fetch(url, init);
  const body = (await res.json()) as ApiResponse;
  if (body.code !== 200) {
    throw new Error(body.message);
  }
};

/**
 * Put a task into the `error` state, delete it and report it.
 * @param task upload task
 * @param message error message to report
 */
const failTask = (task: UploadTask, message: string): void => {
  task.status = 'error';
  task.errorMessage = message;
  task.endTime = new Date();
  void deleteUpload(task.id);
  updateStatus(task);
};

/**
 * Send the final request once every piece is uploaded: it carries the file
 * hash and the target path.
 * @param task upload task
 * @param hash hasher that has consumed every piece in order
 */
const finishUpload = async (
  task: UploadTask,
  hash: Sha256Hasher
): Promise<void> => {
  try {
    await callApi('/api/File/Upload', {
      method: 'PUT',
      headers: {
        Authorization: tokenStore.BearerToken,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        id: task.id,
        fileHash: toHex(hash.digest()),
        filePath: task.uploadPath,
      }),
    });
    // The task was canceled (and already deleted) while this request was in
    // flight: keep reporting it as canceled.
    if (task.status === 'canceled') {
      return;
    }
    task.status = 'done';
    task.progress = 1;
    task.endTime = new Date();
    // Release the File and the hasher; without this every finished task
    // stays referenced for the lifetime of the worker.
    releaseTask(task.id);
    updateStatus(task);
  } catch (e: unknown) {
    failTask(task, String(e));
  }
};

/**
 * Run an upload task: upload the missing pieces one by one, in order, then
 * send the final request.
 * @param task upload task
 */
const runUpload = async (task: UploadTask): Promise<void> => {
  task.status = 'uploading';
  if (!task.startTime) {
    task.startTime = new Date();
  }

  let hash = hashs.get(task.id);
  if (!hash) {
    hash = sha256.create();
    hashs.set(task.id, hash);
  }

  let retryCount = 0;

  while (isUploading(task)) {
    const index = task.pieces.findIndex((done) => !done);
    if (index === -1) {
      await finishUpload(task, hash);
      break;
    }

    const attemptStart = Date.now();
    const start = index * pieceSize;
    const end = Math.min(start + pieceSize, task.size);
    const chunk = task.file.slice(start, end);

    // A fresh controller per attempt: a signal stays aborted once aborted,
    // so reusing one would make every retry after a timeout fail at once.
    const abort = new AbortController();
    task.abort = abort;

    let isTimeout = false;
    const timer = setTimeout(() => {
      isTimeout = true;
      abort.abort();
    }, requestTimeoutMs);

    try {
      const bytes = new Uint8Array(await chunk.arrayBuffer());
      const form = new FormData();
      form.append('file', chunk);

      await callApi(`/api/File/Upload?id=${task.id}&offset=${start}`, {
        method: 'POST',
        body: form,
        headers: {
          Authorization: tokenStore.BearerToken,
        },
        signal: abort.signal,
      });

      // Hash only after the piece was accepted, so a retried piece is not
      // fed into the hash twice.
      hash.update(bytes);
      task.pieces[index] = true;
      retryCount = 0;
      task.progress =
        task.pieces.filter((done) => done).length / task.pieces.length;
      // Use the real piece length (the last piece may be shorter) and avoid
      // dividing by zero on a sub-millisecond request.
      const elapsedMs = Math.max(Date.now() - attemptStart, 1);
      task.speed = ((end - start) / elapsedMs) * 1000;
      updateStatus(task);
    } catch (e: unknown) {
      // The request was aborted by pauseUpload/cancelUpload: this is not a
      // failure, and the new status must not be overwritten.
      if (!isUploading(task)) {
        break;
      }
      retryCount++;
      if (retryCount > maxRetries) {
        failTask(task, isTimeout ? 'UploadTimeout' : String(e));
      }
    } finally {
      clearTimeout(timer);
    }
  }

  // This task no longer occupies a slot; let a waiting task start.
  void runNextUpload();
};

/**
 * Start the next waiting task, unless the concurrency limit is reached.
 */
const runNextUpload = async (): Promise<void> => {
  const running = uploadTasks.filter(isUploading).length;
  if (running >= maxConcurrentUploads) {
    return;
  }
  const next = uploadTasks.find((item) => item.status === 'waiting');
  if (next) {
    await runUpload(next);
  }
};

/**
 * Queue an upload.
 * @param e message event
 */
const queueUpload = async (
  e: MessageEvent<
    MessageArg<{
      id: string;
      file: File;
      uploadPath: string;
    }>
  >
): Promise<void> => {
  const { id, file, uploadPath } = e.data.arg;
  const task: UploadTask = {
    file,
    fileName: file.name,
    id,
    uploadPath,
    size: file.size,
    progress: 0,
    speed: 0,
    status: 'waiting',
    pieces: new Array<boolean>(Math.ceil(file.size / pieceSize)).fill(false),
    errorMessage: undefined,
  };
  uploadTasks.push(task);
  updateStatus(task);
  await runNextUpload();
};

/**
 * Register a status receiver.
 * @param e message event; `arg` is the receiver id
 * @param sender port the message arrived on
 */
const registerStatusReceiver = (
  e: MessageEvent<MessageArg<string>>,
  sender?: MessagePort
): void => {
  if (sender) statusReceivers.set(e.data.arg, sender);
};

/**
 * Unregister a status receiver.
 * @param e message event; `arg` is the receiver id
 */
const unregisterStatusReceiver = (
  e: MessageEvent<MessageArg<string>>
): void => {
  statusReceivers.delete(e.data.arg);
};

/**
 * Update the token.
 * @param e message event; `arg` is the raw token
 */
const updateToken = (e: MessageEvent<MessageArg<string>>): void => {
  // An empty or missing token must not become the literal "Bearer undefined".
  tokenStore.BearerToken = e.data.arg ? 'Bearer ' + e.data.arg : '';
};

/**
 * Pause an upload.
 * @param e message event; `arg` is the task id
 */
const pauseUpload = (e: MessageEvent<MessageArg<string>>): void => {
  const task = uploadTasks.find((item) => item.id === e.data.arg);
  if (task) {
    task.status = 'paused';
    if (task.abort) {
      task.abort.abort();
    }
    updateStatus(task);
  }
};

/**
 * Cancel an upload.
 * @param e message event; `arg` is the task id
 */
const cancelUpload = (e: MessageEvent<MessageArg<string>>): void => {
  const task = uploadTasks.find((item) => item.id === e.data.arg);
  if (task) {
    task.status = 'canceled';
    if (task.abort) {
      task.abort.abort();
    }
    void deleteUpload(task.id);
    updateStatus(task);
  }
};

/**
 * Delete an upload: forget it locally, then send a DELETE request for it.
 * The request is best effort; a failure is logged, never thrown.
 * @param id task id
 */
const deleteUpload = async (id: string): Promise<void> => {
  releaseTask(id);
  try {
    await fetch(`/api/File/Upload?id=${id}`, {
      method: 'DELETE',
      headers: {
        Authorization: tokenStore.BearerToken,
      },
    });
  } catch (e: unknown) {
    // Callers do not await this function, so a rejection here would be
    // unhandled.
    console.warn('deleteUpload failed', id, e);
  }
};

/**
 * Message handler signature. The argument type differs per handler, hence
 * the `any` payload.
 */
type MessageHandler = (
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  e: MessageEvent<MessageArg<any>>,
  sender?: MessagePort
) => void | Promise<void>;

/**
 * Message routes: `func` name -> handler.
 */
const messageRoute = new Map<string, MessageHandler>([
  ['queueUpload', queueUpload],
  ['registerStatusReceiver', registerStatusReceiver],
  ['updateToken', updateToken],
  ['pauseUpload', pauseUpload],
  ['cancelUpload', cancelUpload],
  ['unregisterStatusReceiver', unregisterStatusReceiver],
]);

// Listen for connections.
_self.onconnect = (e) => {
  const port = e.ports[0];
  if (!port) {
    return;
  }
  port.onmessage = async (message: MessageEvent<MessageArg<unknown>>) => {
    // Dispatch to the handler named by the message; unknown names are ignored.
    const handler = messageRoute.get(message.data.func);
    if (!handler) {
      return;
    }
    try {
      await handler(message, port);
    } catch (err: unknown) {
      // Keep a failing handler from becoming an unhandled rejection.
      console.error('upload-worker handler failed', message.data.func, err);
    }
  };
  port.start();
};
upload-service.ts
代码
import UploadWorker from './upload-worker?sharedworker';
import { onUnmounted, ref, watch } from 'vue';
import { storeToRefs } from 'pinia';
import { useAuthStore } from 'src/stores/auth';

/**
 * Upload task information, as posted by the upload worker.
 */
interface UploadTaskInfo {
  /**
   * File name.
   */
  fileName: string;
  /**
   * Upload path.
   */
  uploadPath: string;
  /**
   * Task id.
   */
  id: string;
  /**
   * File size.
   */
  size: number;
  /**
   * Upload progress.
   */
  progress: number;
  /**
   * Upload speed.
   */
  speed?: number;
  /**
   * Task status.
   */
  status: 'uploading' | 'paused' | 'canceled' | 'done' | 'error' | 'waiting';
  /**
   * Start time.
   */
  startTime?: Date;
  /**
   * End time.
   */
  endTime?: Date;
  /**
   * Error message.
   */
  errorMessage?: string;
}

/**
 * Upload service: a composable that talks to the shared upload worker.
 *
 * It calls `onUnmounted`, so it is meant to be used from a component's setup.
 *
 * @returns the reactive task map, the registration flag, and the functions
 * that queue / pause / cancel uploads and (un)register this instance as a
 * status receiver
 */
export const useUploadService = () => {
  const store = storeToRefs(useAuthStore());

  // Create (or connect to) the shared worker.
  const worker = new UploadWorker();

  /**
   * Upload tasks, keyed by task id.
   */
  const uploadTasks = ref<Map<string, UploadTaskInfo>>(
    new Map<string, UploadTaskInfo>()
  );

  // Whether this instance is currently registered as a status receiver.
  const isRegistered = ref(false);

  // Id under which this instance registers with the worker.
  const serviceId = crypto.randomUUID();

  /**
   * Post one routed message to the worker.
   * @param func name of the worker handler
   * @param arg handler argument
   */
  const send = (func: string, arg: unknown): void => {
    worker.port.postMessage({ func, arg });
  };

  // Store every status message; messages only arrive after
  // registerStatusReceiver() has been called.
  worker.port.onmessage = (e: MessageEvent<UploadTaskInfo>) => {
    uploadTasks.value.set(e.data.id, e.data);
  };

  // Push the current token right away, and again on every change.
  watch(
    store.token,
    (token) => {
      send('updateToken', token);
    },
    { immediate: true }
  );

  /**
   * Queue an upload.
   * @param file file to upload
   * @param uploadPath upload path
   * @returns id of the created task, usable with pauseUpload / cancelUpload
   */
  const queueUpload = (file: File, uploadPath: string): string => {
    const id = crypto.randomUUID();
    send('queueUpload', { id, file, uploadPath });
    return id;
  };

  /**
   * Pause an upload.
   * @param id task id
   */
  const pauseUpload = (id: string): void => {
    send('pauseUpload', id);
  };

  /**
   * Cancel an upload.
   * @param id task id
   */
  const cancelUpload = (id: string): void => {
    send('cancelUpload', id);
  };

  /**
   * Register this instance as a status receiver.
   */
  const registerStatusReceiver = (): void => {
    send('registerStatusReceiver', serviceId);
    isRegistered.value = true;
  };

  /**
   * Unregister this instance as a status receiver.
   */
  const unregisterStatusReceiver = (): void => {
    send('unregisterStatusReceiver', serviceId);
    isRegistered.value = false;
  };

  onUnmounted(() => {
    // Nothing to undo in the worker if this instance never registered.
    if (isRegistered.value) {
      unregisterStatusReceiver();
    }
    worker.port.close();
  });

  return {
    uploadTasks,
    isRegistered,
    queueUpload,
    pauseUpload,
    cancelUpload,
    registerStatusReceiver,
    unregisterStatusReceiver,
  };
};
04. 用法
// Create the upload service (composable).
const uploadService = useUploadService();

// Register as a status receiver so task updates are delivered.
uploadService.registerStatusReceiver();

// Upload handler to bind to the form.
const upload = (file: File, filePath: string) => {
  uploadService.queueUpload(file, filePath);
};

// Watch upload progress. The task list is a Ref holding a Map that is
// mutated in place, so the watcher must be deep or it never fires.
// The Ref can also be rendered directly in the template.
watch(uploadService.uploadTasks, console.log, { deep: true });