• 大文件分片上传-续传-秒传(详解)


    前言

    前面记录过使用库实现的大文件的分片上传
    基于WebUploader实现大文件分片上传
    基于vue-simple-uploader 实现大文件分片上传

    前面记录过基于库实现的大文件的分片上传,那如果不使用库,
    文件分片是怎么实现的,该怎么做到呢?
    一起看看吧
    
    • 1
    • 2
    • 3

    思路

    1、文件分片、
    2、每个文件标识、
    3、并发上传、
    4、合并组装
    5、上传前查询是否存在
    
    • 1
    • 2
    • 3
    • 4
    • 5

    实现

    读取文件

    通过监听 input 的 change 事件,当选取了本地文件后,可以在回调函数中拿到对应的文件:

    const handleUpload = (e: Event) => {
      const files = (e.target as HTMLInputElement).files
      if (!files) {
        return
      }
      // 读取选择的文件
      console.log(files[0]);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    文件分片

    核心是用Blob 对象的 slice 方法,用法如下:

    let blob = instanceOfBlob.slice([start [, end [, contentType]]]};
    start 和 end 代表 Blob 里的下标,表示被拷贝进新的 Blob 的字节的起始位置和结束位置。
    contentType 会给新的 Blob 赋予一个新的文档类型,在这里我们用不到。
    
    • 1
    • 2
    • 3

    使用slice方法来实现下对文件的分片,获取分片的文件列表

    const createFileChunks = (file: File) => {
      const fileChunkList = []
      let cur = 0
      while (cur < file.size) {
        fileChunkList.push({
          file: file.slice(cur, cur + CHUNK_SIZE),
        })
        cur += CHUNK_SIZE // CHUNK_SIZE为分片的大小
      }
      return fileChunkList
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    hash 计算

    怎么区分每一个文件呢?
    1、根据文件名去区分,不可以,因为文件名我们可以是随便修改的;
    2、我们见过用 webpack 打包出来的文件的文件名,会有一串不一样的字符串,这个字符串就是根据文件的内容生成的 hash 值,文件内容变化,hash 值就会跟着发生变化。
    3、而且妙传实现也是基于此:
    服务器在处理上传文件的请求的时候,要先判断下对应文件的 hash 值有没有记录,如果 A 和 B 先后上传一份内容相同的文件,
    所以这两份文件的 hash 值是一样的。当 A 上传的时候会根据文件内容生成一个对应的 hash 值,然后在服务器上就会有一个对应的文件,B 再上传的时候,服务器就会发现这个文件的 hash 值之前已经有记录了,说明之前
    已经上传过相同内容的文件了,所以就不用处理 B 的这个上传请求了,给用户的感觉就像是实现了秒传
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    spark-md5

    我们得先安装spark-md5。我们就可以用文件的所有切片来算该文件的hash 值,
    但是如果一个文件特别大,每个切片的所有内容都参与计算的话会很耗时间,所有我们可以采取以下策略:
    1、第一个和最后一个切片的内容全部参与计算;
    2、中间剩余的切片我们分别在前面、后面和中间取 2 个字节参与计算;
    3、既能保证所有的切片参与了计算,也能保证不耗费很长的时间
    
    • 1
    • 2
    • 3
    • 4
    • 5

    安装使用

    npm install spark-md5
    npm install @types/spark-md5 -D
    
    import SparkMD5 from 'spark-md5'
    
    • 1
    • 2
    • 3
    • 4
    /**
     * 计算文件的hash值,计算的时候并不是根据所用的切片的内容去计算的,那样会很耗时间,我们采取下面的策略去计算:
     * 1. 第一个和最后一个切片的内容全部参与计算
     * 2. 中间剩余的切片我们分别在前面、后面和中间取2个字节参与计算
     * 这样做会节省计算hash的时间
     */
    const calculateHash = async (fileChunks: Array<{file: Blob}>) => {
      return new Promise(resolve => {
        const spark = new sparkMD5.ArrayBuffer()
        const chunks: Blob[] = []
    
        fileChunks.forEach((chunk, index) => {
          if (index === 0 || index === fileChunks.length - 1) {
            // 1. 第一个和最后一个切片的内容全部参与计算
            chunks.push(chunk.file)
          } else {
            // 2. 中间剩余的切片我们分别在前面、后面和中间取2个字节参与计算
            // 前面的2字节
            chunks.push(chunk.file.slice(0, 2))
            // 中间的2字节
            chunks.push(chunk.file.slice(CHUNK_SIZE / 2, CHUNK_SIZE / 2 + 2))
            // 后面的2字节
            chunks.push(chunk.file.slice(CHUNK_SIZE - 2, CHUNK_SIZE))
          }
        })
    
        const reader = new FileReader()
        reader.readAsArrayBuffer(new Blob(chunks))
        reader.onload = (e: Event) => {
          spark.append(e?.target?.result as ArrayBuffer)
          resolve(spark.end())
        }
      })
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    文件上传前端实现

    const uploadChunks = async (fileChunks: Array<{ file: Blob }>) => {
      const data = fileChunks.map(({ file }, index) => ({
        fileHash: fileHash.value,
        index,
        chunkHash: `${fileHash.value}-${index}`,
        chunk: file,
        size: file.size,
      }))
    
      const formDatas = data.map(({ chunk, chunkHash }) => {
        const formData = new FormData()
        // 切片文件
        formData.append('chunk', chunk)
        // 切片文件hash
        formData.append('chunkHash', chunkHash)
        // 大文件的文件名
        formData.append('fileName', fileName.value)
        // 大文件hash
        formData.append('fileHash', fileHash.value)
        return formData
      })
    
      let index = 0
      const max = 6 // 并发请求数量
      const taskPool: any = [] // 请求队列
    
      while (index < formDatas.length) {
        const task = fetch('http://127.0.0.1:3000/upload', {
          method: 'POST',
          body: formDatas[index],
        })
    
        task.then(() => {
          taskPool.splice(taskPool.findIndex((item: any) => item === task))
        })
        taskPool.push(task)
        if (taskPool.length === max) {
          // 当请求队列中的请求数达到最大并行请求数的时候,得等之前的请求完成再循环下一个
          await Promise.race(taskPool)
        }
        index++
        percentage.value = ((index / formDatas.length) * 100).toFixed(0)
      }
    
      await Promise.all(taskPool)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    文件上传后端实现

    后端 express 框架,用到的工具包:multiparty、fs-extra、cors、body-parser、nodemon
    
    后端我们处理文件时需要用到 multiparty 这个工具,所以也是得先安装,然后再引入它。
    我们在处理每个上传的分片的时候,应该先将它们临时存放到服务器的一个地方,方便我们合并的时候再去读
    取。为了区分不同文件的分片,我们就用文件对应的那个 hash 为文件夹的名称,将这个文件的所有分片放到这
    个文件夹中。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    // 所有上传的文件存放到该目录下
    const UPLOAD_DIR = path.resolve(__dirname, 'uploads')
    
    // 处理上传的分片
    app.post('/upload', async (req, res) => {
      const form = new multiparty.Form()
    
      form.parse(req, async function (err, fields, files) {
        if (err) {
          res.status(401).json({
            ok: false,
            msg: '上传失败',
          })
        }
        const chunkHash = fields['chunkHash'][0]
        const fileName = fields['fileName'][0]
        const fileHash = fields['fileHash'][0]
    
        // 存储切片的临时文件夹
        const chunkDir = path.resolve(UPLOAD_DIR, fileHash)
    
        // 切片目录不存在,则创建切片目录
        if (!fse.existsSync(chunkDir)) {
          await fse.mkdirs(chunkDir)
        }
    
        const oldPath = files.chunk[0].path
        // 把文件切片移动到我们的切片文件夹中
        await fse.move(oldPath, path.resolve(chunkDir, chunkHash))
    
        res.status(200).json({
          ok: true,
          msg: 'received file chunk',
        })
      })
    })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    写完前后端代码后就可以来试下看看文件能不能实现切片的上传,如果没有错误的话,我们的 uploads 文件
    夹下应该就会多一个文件夹,这个文件夹里面就是存储的所有文件的分片了。
    
    • 1
    • 2

    文件合并前端实现

    核心:切片合并
    前端只需要向服务器发送一个合并的请求,并且为了区分要合并的文件,需要将文件的 hash 值给传过去
    
    • 1
    • 2
    /**
     * 发请求通知服务器,合并切片
     */
    const mergeRequest = () => {
      // 发送合并请求
      fetch('http://127.0.0.1:3000/merge', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({
          size: CHUNK_SIZE,
          fileHash: fileHash.value,
          fileName: fileName.value,
        }),
      })
        .then((response) => response.json())
        .then(() => {
          alert('上传成功')
        })
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    文件合并后端实现

    之前已经将所有的切片上传到服务器并存储到对应的目录里面去了,
    合并的时候需要从对应的文件夹中获取所有的切片,然后利用文件的读写操作,实现文件的合并了。
    合并完成之后,我们将生成的文件以 hash 值命名存放到对应的位置就可以了
    
    • 1
    • 2
    • 3
    // 提取文件后缀名
    const extractExt = (filename) => {
      return filename.slice(filename.lastIndexOf('.'), filename.length)
    }
    
    /**
     * 读的内容写到writeStream中
     */
    const pipeStream = (path, writeStream) => {
      return new Promise((resolve, reject) => {
        // 创建可读流
        const readStream = fse.createReadStream(path)
        readStream.on('end', async () => {
          fse.unlinkSync(path)
          resolve()
        })
        readStream.pipe(writeStream)
      })
    }
    
    /**
     * 合并文件夹中的切片,生成一个完整的文件
     */
    async function mergeFileChunk(filePath, fileHash, size) {
      const chunkDir = path.resolve(UPLOAD_DIR, fileHash)
      const chunkPaths = await fse.readdir(chunkDir)
      // 根据切片下标进行排序
      // 否则直接读取目录的获得的顺序可能会错乱
      chunkPaths.sort((a, b) => {
        return a.split('-')[1] - b.split('-')[1]
      })
    
      const list = chunkPaths.map((chunkPath, index) => {
        return pipeStream(
          path.resolve(chunkDir, chunkPath),
          fse.createWriteStream(filePath, {
            start: index * size,
            end: (index + 1) * size,
          }),
        )
      })
    
      await Promise.all(list)
      // 文件合并后删除保存切片的目录
      fse.rmdirSync(chunkDir)
    }
    
    // 合并文件
    app.post('/merge', async (req, res) => {
      const { fileHash, fileName, size } = req.body
      const filePath = path.resolve(UPLOAD_DIR, `${fileHash}${extractExt(fileName)}`)
      // 如果大文件已经存在,则直接返回
      if (fse.existsSync(filePath)) {
        res.status(200).json({
          ok: true,
          msg: '合并成功',
        })
        return
      }
      const chunkDir = path.resolve(UPLOAD_DIR, fileHash)
      // 切片目录不存在,则无法合并切片,报异常
      if (!fse.existsSync(chunkDir)) {
        res.status(200).json({
          ok: false,
          msg: '合并失败,请重新上传',
        })
        return
      }
      await mergeFileChunk(filePath, fileHash, size)
      res.status(200).json({
        ok: true,
        msg: '合并成功',
      })
    })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    文件秒传&断点续传

    服务器上给上传的文件命名的时候就是用对应的 hash 值命名的,
    所以在上传之前判断有对应的这个文件,就不用再重复上传了,
    直接告诉用户上传成功,给用户的感觉就像是实现了秒传。
    
    • 1
    • 2
    • 3
    文件秒传-前端
    前端在上传之前,需要将对应文件的 hash 值告诉服务器,看看服务器上有没有对应的这个文件,
    如果有,就直接返回,不执行上传分片的操作了
    
    • 1
    • 2
    /**
     * 验证该文件是否需要上传,文件通过hash生成唯一,改名后也是不需要再上传的,也就相当于秒传
     */
    const verifyUpload = async () => {
      return fetch('http://127.0.0.1:3000/verify', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({
          fileName: fileName.value,
          fileHash: fileHash.value,
        }),
      })
        .then((response) => response.json())
        .then((data) => {
          return data // data中包含对应的表示服务器上有没有该文件的查询结果
        })
    }
    
    // 点击上传事件
    const handleUpload = async (e: Event) => {
      // ...
    
      // uploadedList已上传的切片的切片文件名称
      const res = await verifyUpload()
    
      const { shouldUpload } = res.data
    
      if (!shouldUpload) {
        // 服务器上已经有该文件,不需要上传
        alert('秒传:上传成功')
        return
      }
    
      // 服务器上不存在该文件,继续上传
      uploadChunks(fileChunks)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    文件秒传-后端
    // 根据文件hash验证文件有没有上传过
    app.post('/verify', async (req, res) => {
      const { fileHash, fileName } = req.body
      const filePath = path.resolve(UPLOAD_DIR, `${fileHash}${extractExt(fileName)}`)
    
      if (fse.existsSync(filePath)) {
        // 文件存在服务器中,不需要再上传了
        res.status(200).json({
          ok: true,
          data: {
            shouldUpload: false,
          },
        })
      } else {
        // 文件不在服务器中,就需要上传
        res.status(200).json({
          ok: true,
          data: {
            shouldUpload: true,
          },
        })
      }
    })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    文件断点续传-前端
    如果我们之前已经上传了一部分分片了,我们只需要再上传之前拿到这部分分片,
    然后再过滤掉是不是就可以避免去重复上传这些分片了,也就是只需要上传那些上传失败的分片,
    所以,再上传之前还得加一个判断。
    我们还是在那个 verify 的接口中去获取已经上传成功的分片,然后在上传分片前进行一个过滤
    
    • 1
    • 2
    • 3
    • 4
    const uploadChunks = async (fileChunks: Array<{ file: Blob }>, uploadedList: Array<string>) => {
      const formDatas = fileChunks
        .filter((chunk, index) => {
          // 过滤服务器上已经有的切片
          return !uploadedList.includes(`${fileHash.value}-${index}`)
        })
        .map(({ file }, index) => {
          const formData = new FormData()
          // 切片文件
          formData.append('file', file)
          // 切片文件hash
          formData.append('chunkHash', `${fileHash.value}-${index}`)
          // 大文件的文件名
          formData.append('fileName', fileName.value)
          // 大文件hash
          formData.append('fileHash', fileHash.value)
          return formData
        })
    
      // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    文件断点续传-后端
    只需在 /verify 这个接口中加上已经上传成功的所有切片的名称就可以,
    因为所有的切片都存放在以文件的 hash 值命名的那个文件夹,
    所以需要读取这个文件夹中所有的切片的名称就可以。
    
    • 1
    • 2
    • 3
    /**
     * 返回已经上传切片名
     * @param {*} fileHash
     * @returns
     */
    const createUploadedList = async (fileHash) => {
      return fse.existsSync(path.resolve(UPLOAD_DIR, fileHash))
        ? await fse.readdir(path.resolve(UPLOAD_DIR, fileHash)) // 读取该文件夹下所有的文件的名称
        : []
    }
    
    // 根据文件hash验证文件有没有上传过
    app.post('/verify', async (req, res) => {
      const { fileHash, fileName } = req.body
      const filePath = path.resolve(UPLOAD_DIR, `${fileHash}${extractExt(fileName)}`)
    
      if (fse.existsSync(filePath)) {
        // 文件存在服务器中,不需要再上传了
        res.status(200).json({
          ok: true,
          data: {
            shouldUpload: false,
          },
        })
      } else {
        // 文件不在服务器中,就需要上传,并且返回服务器上已经存在的切片
        res.status(200).json({
          ok: true,
          data: {
            shouldUpload: true,
            uploadedList: await createUploadedList(fileHash),
          },
        })
      }
    })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
  • 相关阅读:
    dfs之字符串拼接
    uniapp项目实践总结(二十五)苹果 ios 平台 APP 打包教程
    SpringBoot整合Mybatis(使用c3p0数据源)
    谈谈HashTable, HashMap, ConcurrentHashMap 之间的区别(一道经典的面试题)
    Python爬虫实战-批量爬取豆瓣电影排行信息
    【李沐深度学习笔记】矩阵计算(4)
    毫米波雷达和视觉传感器融合的检测仿真代码
    springboot+英语在线学习系统 毕业设计-附源码211714
    数据库计算机三级等级考试--数据库技术相关知识点和笔记
    springboot + easyRules 搭建规则引擎服务
  • 原文地址:https://blog.csdn.net/weixin_43909743/article/details/134075496