• 分布式文件存储——分块上传和断点续传


    分块上传和断点续传

    两个概念

    分块上传:文件切成多块,独立传输,上传完成后合并

    断点续传:传输暂停或异常中断后,可基于原来进度重传

    几点说明:

    1. 小文件不建议分块上传
    2. 可以并行上传,并且可以无序传输
    3. 分块上传可以极大提高传输效率,不过要注意分块上传文件的数量
    4. 减少传输失败后重试的流量及时间

    流程:

    1. 云端初始化上传文件的信息
    2. 客户端执行上传分块—>上传取消,查询上传信息
    3. 客户端通知云端上传完成

    服务架构

    redis缓存用于云端与客户端文件信息交互

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QJNbQrjS-1659840735242)(C:/Users/86158/AppData/Roaming/Typora/typora-user-images/image-20220807104825035.png)]

    分块上传通用接口

    1、初始化分块信息

    2、上传分块

    3、通知上传完成

    4、取消上传分块

    5、查看分块上传的整体状态

    断点续传

    基于分块上传机制实现,传输暂停或者异常中断后,可以基于原来的进度重传

    接口设计

       // 分块上传通用接口
       // 初始化分块信息
       router.POST("/file/mpupload/init",nil)
       // 上传分块
       router.POST("/file/mpupload/uppart",nil)
       // 通知分块上传完成
       router.POST("/file/mpupload/complete",nil)
       // 取消上传分块
       router.POST("/file/mpupload/cancel",nil)
       // 查看分块上传的整体状态
       router.POST("/file/mpupload/status",nil)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    初始化分块上传

    // 初始化分块上传
    func InitialMultipartUploadHandler(w http.ResponseWriter,r *http.Request,p httprouter.Params) {
       // 1.解析用户请求信息
       r.ParseForm()
       username := r.PostForm.Get("username")
       filehash := r.PostForm.Get("filehash")
       filesize, err := strconv.ParseInt(r.PostForm.Get("filesize"), 10, 64)
       if err != nil {
          response.RespMsg(w,defs.ErrorBadRequest)
          return
       }
    
       // 2.获得redis的连接
       redisConn := redis.RedisConn.Get()
       defer redisConn.Close()
    
       // 3.生成分块上传的初始化信息
       info := &MultipartUploadInfo{
          FileHash:filehash,
          FileSize:filesize,
          UploadID:username + utils.TimeGetNowTimeStr(),
          ChunkSize:5*1024*1024,// 5MD
          ChunkCount:int(math.Ceil(float64(filesize)/(5*1024*1024))),// 转float64除法在向上取整
       }
       // 4.将初始化信息写入到redis缓存
       redisConn.Do("set","name","age")
       redisConn.Do("HSET","MP_" + info.UploadID,"chunkcount",info.ChunkCount)
       redisConn.Do("HSET","MP_" + info.UploadID,"filehash",info.FileHash)
       redisConn.Do("HSET","MP_" + info.UploadID,"filesize",info.FileSize)
       redisConn.Do("HSET","MP_" + info.UploadID,"chunksize",info.ChunkSize)
       // 5.将相应信息初始化数据返回到客户端
       response.RespInputData(w,200,info)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    上传文件分块

    // 上传文件分块
    func UploadPartHandler(w http.ResponseWriter,r *http.Request,p httprouter.Params) {
       // 1.解析用户请求参数
       r.ParseForm()
       //username := r.Form.Get("username")
       uploadId := r.Form.Get("uploadid")
       chunkIndex := r.Form.Get("index")
    
       // 2.获得redis连接池中的一个连接
       redisConn := redis.RedisConn.Get()
       defer redisConn.Close()
    
       // 3.获得文件句柄,用于存储分块内容
       path := "data/" + uploadId
       err := utils.DirPing(path)
       if err != nil {
          response.RespMsg(w,defs.ErrorBadServer)
          return
       }
       file := path + "/" + chunkIndex
       openFile, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 00666)
       if err != nil {
          response.RespMsg(w,defs.ErrorBadServer)
          return
       }
       defer openFile.Close()
       data, _, err := r.FormFile("file")
       defer data.Close()
       if err != nil {
          response.RespMsg(w,defs.ErrorBadRequest)
          return
       }
       reader := bufio.NewReader(data)
       writer := bufio.NewWriter(openFile)
       buf := make([]byte, 1024*1024) // 1M buf
       for {
          _, err := reader.Read(buf)
          if err == io.EOF {
             break
          }else if err != nil {
             response.RespMsg(w,defs.ErrorBadRequest)
             return
          }else{
             writer.Write(buf)
          }
       }
       writer.Flush()
       // 4.更新redis缓存状态
       redisConn.Do("HSET","MP_"+uploadId,"chkidx_"+chunkIndex,1)
       // 5.返回处理结果到客户端
       response.RespInputMsg(w,200,"ok")
    
       // 不足之处,客户端上传需要携带当前分块的hash,服务端校验确保文件的完整性
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    通知上传合并接口

    // 通知上传合并接口
    func CompleteUploadHandler(w http.ResponseWriter,r *http.Request,p httprouter.Params) {
       // 1.解析请求参数
       r.ParseForm()
       upid := r.Form.Get("uploadid")
       username := r.Form.Get("username")
       filehash := r.Form.Get("filehash")
       filesize := r.Form.Get("filesize")
       filename := r.Form.Get("filename")
    
       // 2.获得redis连接池的一个连接
       redisConn := redis.RedisConn.Get()
       defer redisConn.Close()
    
       // 3.通过uploadid查询redis判断是否所有分块上传完成
       values, e := redis2.Values(redisConn.Do("HGETALL", "MP_"+upid))
       if e != nil {
          response.RespMsg(w,defs.ErrorBadRequest)
          return
       }
       totalCount := 0 // 上传完成数量
       chunkCount := 0 // 总数量
       for i:=0;i<len(values);i+=2{
          k := string(values[i].([]byte))
          v := string(values[i+1].([]byte))
          if k == "chunkcount" {
             totalCount,_=strconv.Atoi(v)
          }else if strings.HasPrefix(k,"chkidx_") && v == "1" {
             chunkCount += 1
          }
       }
       // 不等就是上传没有完成
       if totalCount != chunkCount {
          response.RespMsg(w,defs.ErrorBadRequest)
          return
       }
    
       // 4.合并分块
       // 5.更新唯一文件表,更新用户文件表
    
       // 6.相应处理结果
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
  • 相关阅读:
    【RuoYi-Vue-Plus】扩展笔记 07 - CentOS 7 集成 Prometheus + Grafana 监控初体验
    9.25 day 2
    SAP 公司间销售
    解析idea中的debug调试模式
    百看不厌的85M²现代极简装饰设计。福州中宅装饰,福州装修
    物流APP开发方案
    WebService: SpringBoot集成WebService实践二
    XGBoost实战2--数据预测保险赔偿
    工业物联网大数据解决方案:排水设备远程监控和大数据统计系统
    【图神经网络论文整理】(二)—— HOW ATTENTIVE ARE GRAPH ATTENTION NETWORKS?:GATv2
  • 原文地址:https://blog.csdn.net/qq_53267860/article/details/126207769