• Vue 3 中用组合式函数和 Shared Worker 实现后台分片上传(带哈希计算)


    01. 背景

    最近项目需求里有个文件上传功能,而客户需求里的文件基本上是比较大的,基本上得有 1 GiB 以上的大小,而上传大文件尤其是读大文件,可能会造成卡 UI 或者说点不动的问题。而用后台的 Worker 去实现是一个比较不错的解决办法。

    02. 原理讲解

    02.01. Shared Worker

    Shared Worker 的好处是可以从几个浏览上下文中访问,例如几个窗口、iframe 或其他 worker。这样我们可以保证全局的页面上传任务都在我们的控制之下,甚至可以防止重复提交等功能。

    02.02. 组合式函数

    组合式函数的好处是在 Vue 3 是可以在任何 *.vue 文件中使用,并且是响应式方法,可以侦听 pinia 内 token 等的变化,传递给 Worker

    02.03 简单流程设计

    用户选择文件
    创建上传任务
    任务推送到 Worker
    上传到服务器
    Worker 返回任务状态
    组合式函数拦截状态放到 Map 里

    03. 代码

    upload-worker.ts 代码

    import { sha256 } from '@noble/hashes/sha256';
    import { bytesToHex as toHex } from '@noble/hashes/utils';
    interface SharedWorkerGlobalScope {
      onconnect: (event: MessageEvent<any>) => void;
    }
    const _self: SharedWorkerGlobalScope = self as any;
    /**
     * 分片大小
     */
    const pieceSize = 1024 * 1024;
    /**
     * 消息参数
     */
    interface MessageArg {
      /**
       * 函数名
       */
      func: string;
      /**
       * 参数
       */
      arg: T;
    }
    /**
     * 上传任务信息
     */
    interface UploadTaskInfo {
      /**
       * 文件名
       */
      fileName: string;
      /**
       * 上传路径
       */
      uploadPath: string;
      /**
       * 任务 id
       */
      id: string;
      /**
       * 文件大小
       */
      size: number;
      /**
       * 上传进度
       */
      progress: number;
      /**
       * 上传速度
       */
      speed?: number;
      /**
       * 任务状态
       */
      status: 'uploading' | 'paused' | 'canceled' | 'done' | 'error' | 'waiting';
      /**
       * 开始时间
       */
      startTime?: Date;
      /**
       * 结束时间
       */
      endTime?: Date;
      /**
       * 错误信息
       */
      errorMessage?: string;
    }
    /**
     * 上传任务
     */
    interface UploadTask extends UploadTaskInfo {
      file: File;
      pieces: Array<boolean>;
      abort?: AbortController;
    }
    /**
     * 任务/哈希值映射
     */
    const hashs = new Map();
    /**
     * 上传任务列表
     */
    const uploadTasks: Array<UploadTask> = [];
    /**
     * 状态接收器
     */
    const statusReceivers = new Map<string, MessagePort>();
    /**
     * token 仓库
     */
    const tokenStore = {
      /**
       * token
       */
      BearerToken: '',
    };
    /**
     * 返回上传状态
     * @param task 上传任务
     */
    const updateStatus = (task: UploadTaskInfo) => {
      const taskInfo: UploadTaskInfo = {
        fileName: task.fileName,
        uploadPath: task.uploadPath,
        id: task.id,
        size: task.size,
        progress: task.progress,
        speed: task.speed,
        status: task.status,
        startTime: task.startTime,
        endTime: task.endTime,
        errorMessage: task.errorMessage,
      };
      statusReceivers.forEach((item) => {
        item.postMessage(taskInfo);
      });
    };
    /**
     * 运行上传任务
     * @param task 上传任务
     */
    const runUpload = async (task: UploadTask) => {
      task.status = 'uploading';
      const hash = hashs.get(task.id) || sha256.create();
      hashs.set(task.id, hash);
      let retryCount = 0;
      const abort = new AbortController();
      task.abort = abort;
      while (task.status === 'uploading') {
        const startTime = Date.now();
        const index = task.pieces.findIndex((item) => !item);
        if (index === -1) {
          try {
            const response: { code: number; message: string } = await fetch(
              '/api/File/Upload',
              {
                method: 'PUT',
                headers: {
                  Authorization: tokenStore.BearerToken,
                  'Content-Type': 'application/json',
                },
                body: JSON.stringify({
                  id: task.id,
                  fileHash: toHex(hash.digest()),
                  filePath: task.uploadPath,
                }),
              }
            ).then((res) => res.json());
            if (response.code !== 200) {
              throw new Error(response.message);
            }
            task.status = 'done';
            task.endTime = new Date();
            updateStatus(task);
          } catch (e: any) {
            task.status = 'error';
            task.errorMessage = e.toString();
            task.endTime = new Date();
            deleteUpload(task.id);
            updateStatus(task);
          }
          break;
        }
        const start = index * pieceSize;
        const end = start + pieceSize >= task.size ? task.size : start + pieceSize;
        const buffer = task.file.slice(index * pieceSize, end);
        hash.update(new Uint8Array(await buffer.arrayBuffer()));
        const form = new FormData();
        form.append('file', buffer);
        let isTimeout = false;
        try {
          const timer = setTimeout(() => {
            isTimeout = true;
            abort.abort();
          }, 8000);
          const response: { code: number; message: string } = await fetch(
            `/api/File/Upload?id=${task.id}&offset=${start}`,
            {
              method: 'POST',
              body: form,
              headers: {
                Authorization: tokenStore.BearerToken,
              },
              signal: abort.signal,
            }
          ).then((res) => res.json());
          clearTimeout(timer);
          if (response.code !== 200) {
            throw new Error(response.message);
          }
          task.pieces[index] = true;
          task.progress =
            task.pieces.filter((item) => item).length / task.pieces.length;
          task.speed = (pieceSize / (Date.now() - startTime)) * 1000;
          updateStatus(task);
        } catch (e: any) {
          retryCount++;
          if (retryCount > 3) {
            task.status = 'error';
            if (isTimeout) {
              task.errorMessage = 'UploadTimeout';
            } else {
              task.errorMessage = e.toString();
            }
            task.endTime = new Date();
            deleteUpload(task.id);
            updateStatus(task);
          }
        }
        runNextUpload();
      }
    };
    /**
     * 运行下一个上传任务
     */
    const runNextUpload = async () => {
      if (uploadTasks.filter((item) => item.status === 'uploading').length > 3) {
        return;
      }
      const task = uploadTasks.find((item) => item.status === 'waiting');
      if (task) {
        await runUpload(task);
      }
    };
    /**
     * 排队上传
     * @param e 消息事件
     */
    const queueUpload = async (
      e: MessageEvent<
        MessageArg<{
          id: string;
          file: File;
          uploadPath: string;
        }>
      >
    ) => {
      uploadTasks.push({
        file: e.data.arg.file,
        fileName: e.data.arg.file.name,
        id: e.data.arg.id,
        uploadPath: e.data.arg.uploadPath,
        size: e.data.arg.file.size,
        progress: 0,
        speed: 0,
        status: 'waiting',
        pieces: new Array(Math.ceil(e.data.arg.file.size / pieceSize)).fill(false),
        errorMessage: undefined,
      });
      updateStatus(uploadTasks[uploadTasks.length - 1]);
      await runNextUpload();
    };
    /**
     * 注册状态接收器
     * @param e 消息事件
     * @param sender 发送者
     */
    const registerStatusReceiver = (
      e: MessageEventstring>>,
      sender?: MessagePort
    ) => {
      if (sender) statusReceivers.set(e.data.arg, sender);
    };
    /**
     * 注销状态接收器
     * @param e 消息事件
     */
    const unregisterStatusReceiver = (e: MessageEventstring>>) => {
      statusReceivers.delete(e.data.arg);
    };
    /**
     * 更新 token
     * @param e 消息事件
     */
    const updateToken = (e: MessageEventstring>>) => {
      tokenStore.BearerToken = 'Bearer ' + e.data.arg;
    };
    /**
     * 暂停上传
     * @param e 消息事件
     */
    const pauseUpload = (e: MessageEventstring>>) => {
      const task = uploadTasks.find((item) => item.id === e.data.arg);
      if (task) {
        task.status = 'paused';
        if (task.abort) {
          task.abort.abort();
        }
        updateStatus(task);
      }
    };
    /**
     * 取消上传
     * @param e 消息事件
     */
    const cancelUpload = (e: MessageEventstring>>) => {
      const task = uploadTasks.find((item) => item.id === e.data.arg);
      if (task) {
        task.status = 'canceled';
        if (task.abort) {
          task.abort.abort();
        }
        deleteUpload(task.id);
        updateStatus(task);
      }
    };
    /**
     * 删除上传
     * @param id 任务 id
     */
    const deleteUpload = async (id: string) => {
      uploadTasks.splice(
        uploadTasks.findIndex((item) => item.id === id),
        1
      );
      hashs.delete(id);
      await fetch(`/api/File/Upload?id=${id}`, {
        method: 'DELETE',
        headers: {
          Authorization: tokenStore.BearerToken,
        },
      }).then((res) => res.json());
    };
    /**
     * 消息路由
     */
    const messageRoute = new Map<
      string,
      (e: MessageEventany>>, sender?: MessagePort) => void
    >([
      ['queueUpload', queueUpload],
      ['registerStatusReceiver', registerStatusReceiver],
      ['updateToken', updateToken],
      ['pauseUpload', pauseUpload],
      ['cancelUpload', cancelUpload],
      ['unregisterStatusReceiver', unregisterStatusReceiver],
    ]);
    // 监听连接
    _self.onconnect = (e) => {
      const port = e.ports[0];
      port.onmessage = async (e) => {
        // 调用函数
        const func = messageRoute.get(e.data.func);
        if (func) {
          func(e, port);
        }
      };
      port.start();
    };
    
    

    upload-service.ts 代码

    import UploadWorker from './upload-worker?sharedworker';
    import { onUnmounted, ref, watch } from 'vue';
    import { storeToRefs } from 'pinia';
    import { useAuthStore } from 'src/stores/auth';
    /**
     * 上传任务信息
     */
    interface UploadTaskInfo {
      /**
       * 文件名
       */
      fileName: string;
      /**
       * 上传路径
       */
      uploadPath: string;
      /**
       * 任务 id
       */
      id: string;
      /**
       * 文件大小
       */
      size: number;
      /**
       * 上传进度
       */
      progress: number;
      /**
       * 上传速度
       */
      speed?: number;
      /**
       * 任务状态
       */
      status: 'uploading' | 'paused' | 'canceled' | 'done' | 'error' | 'waiting';
      /**
       * 开始时间
       */
      startTime?: Date;
      /**
       * 结束时间
       */
      endTime?: Date;
      /**
       * 错误信息
       */
      errorMessage?: string;
    }
    /**
     * 上传服务
     */
    export const useUploadService = () => {
      const store = storeToRefs(useAuthStore());
      // 创建共享 worker
      const worker = new UploadWorker();
      /**
       * 上传任务列表
       */
      const uploadTasks = ref<Map<string, UploadTaskInfo>>(
        new Map<string, UploadTaskInfo>()
      );
      // 是否已注册状态接收器
      const isRegistered = ref(false);
      // 服务 id
      const serviceId = crypto.randomUUID();
      // 监听上传任务列表变化(只有在注册状态接收器后才会收到消息)
      worker.port.onmessage = (e: MessageEvent) => {
        uploadTasks.value.set(e.data.id, e.data);
      };
      // 更新 token
      worker.port.postMessage({
        func: 'updateToken',
        arg: store.token.value,
      });
      watch(store.token, (token) => {
        worker.port.postMessage({
          func: 'updateToken',
          arg: token,
        });
      });
      /**
       * 排队上传
       * @param file 文件
       * @param uploadPath 上传路径
       */
      const queueUpload = (file: File, uploadPath: string) => {
        worker.port.postMessage({
          func: 'queueUpload',
          arg: {
            id: crypto.randomUUID(),
            file: file,
            uploadPath: uploadPath,
          },
        });
      };
      /**
       * 暂停上传
       * @param id 任务 id
       */
      const pauseUpload = (id: string) => {
        worker.port.postMessage({
          func: 'pauseUpload',
          arg: id,
        });
      };
      /**
       * 取消上传
       * @param id 任务 id
       */
      const cancelUpload = (id: string) => {
        worker.port.postMessage({
          func: 'cancelUpload',
          arg: id,
        });
      };
      /**
       * 注册状态接收器
       */
      const registerStatusReceiver = () => {
        worker.port.postMessage({
          func: 'registerStatusReceiver',
          arg: serviceId,
        });
        isRegistered.value = true;
      };
      /**
       * 注销状态接收器
       */
      const unregisterStatusReceiver = () => {
        worker.port.postMessage({
          func: 'unregisterStatusReceiver',
          arg: serviceId,
        });
        isRegistered.value = false;
      };
      onUnmounted(() => {
        unregisterStatusReceiver();
        worker.port.close();
      });
      return {
        uploadTasks,
        queueUpload,
        pauseUpload,
        cancelUpload,
        registerStatusReceiver,
        unregisterStatusReceiver,
      };
    };
    
    

    04. 用法

    // 引入组合式函数
    const uploadService = useUploadService();
    // 注册状态接收器
    uploadService.registerStatusReceiver();
    // 表单绑定上传方法
    const upload = (file: File, filePath: string) => {
      uploadService.queueUpload(file, filePath);
    }
    // 监听上传进度,当然也可以直接展示在界面,毕竟是 Ref
    watch(uploadService.uploadTasks, console.log);
    
  • 相关阅读:
    ElasticSearch浅谈
    信息化发展49
    Tomcat安装与配置
    企业为什么要做等保?不做等保有什么后果?
    【随想录】-【8 回溯算法】【组合问题】40 组合总和Ⅱ
    【软件测试】(北京)字节跳动科技有限公司二面笔试题
    人工智能教程(一)
    168. Excel表列名称
    Golang区块链钱包
    【macOS付费软件推荐】第4期:Coherence X
  • 原文地址:https://www.cnblogs.com/aobaxu/p/17790793.html