Skip to content

Commit

Permalink
uploader4: POST /upload/upload-link/:uid
Browse files Browse the repository at this point in the history
  • Loading branch information
lovehunter9 committed Aug 13, 2024
1 parent 93aa63a commit 98dcdae
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 7 deletions.
250 changes: 243 additions & 7 deletions cmd/upload/app/handler4.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,258 @@ func (a *appController) UploadedBytes(c *fiber.Ctx) error {
}

//Generate unique Upload-ID
uploadID := uid.MakeUid(fullPath)
exist, info := a.server.fileInfoMgr.ExistFileInfo(uploadID)
fileExist, fileLen := a.server.fileInfoMgr.CheckTempFile(uploadID)
//uploadID := uid.MakeUid(fullPath)
resumableIdentifier := uid.GenerateUniqueIdentifier(fileName)
exist, info := a.server.fileInfoMgr.ExistFileInfo(resumableIdentifier)
fileExist, fileLen := a.server.fileInfoMgr.CheckTempFile(resumableIdentifier)
if exist {
if fileExist {
if info.Offset != fileLen {
info.Offset = fileLen
a.server.fileInfoMgr.UpdateInfo(uploadID, info)
a.server.fileInfoMgr.UpdateInfo(resumableIdentifier, info)
}
klog.Infof("uploadID:%s, info.Offset:%d", uploadID, info.Offset)
klog.Infof("resumableIdentifier:%s, info.Offset:%d", resumableIdentifier, info.Offset)
responseData["uploadedBytes"] = info.Offset
} else if info.Offset == 0 {
klog.Warningf("uploadID:%s, info.Offset:%d", uploadID, info.Offset)
klog.Warningf("resumableIdentifier:%s, info.Offset:%d", resumableIdentifier, info.Offset)
} else {
a.server.fileInfoMgr.DelFileInfo(uploadID)
a.server.fileInfoMgr.DelFileInfo(resumableIdentifier)
}
}
return c.JSON(responseData)
}

func (a *appController) UploadChunks(c *fiber.Ctx) error {
responseData := make(map[string]interface{})
responseData["success"] = true

uploadID := c.Params("uid")

if !utils.PathExists(fileutils.UploadsDir) {
if err := os.MkdirAll(fileutils.UploadsDir, os.ModePerm); err != nil {
klog.Warningf("uploadID:%s, err:%v", uploadID, err)
return c.Status(fiber.StatusInternalServerError).JSON(
models.NewResponse(1, "failed to create folder", nil))
}
}

klog.Infof("uploadID:%s, c:%+v", uploadID, c)

var resumableInfo models.ResumableInfo
if err := c.BodyParser(&resumableInfo); err != nil {
klog.Warningf("uploadID:%s, err:%v", uploadID, err)
//todo check info valid
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "param invalid", nil))
}

if uploadID != uid.MakeUid(resumableInfo.ParentDir) {
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "invalid upload link", nil))
}

var err error
resumableInfo.File, err = c.FormFile("file")
if err != nil || resumableInfo.File == nil {
klog.Warningf("uploadID:%s, Failed to parse file: %v\n", uploadID, err)
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "param invalid", nil))
}

klog.Infof("uploadID:%s, patchInfo:%+v", uploadID, resumableInfo)

// Get file information based on upload ID
resumableIdentifier := resumableInfo.ResumableIdentifier
exist, info := a.server.fileInfoMgr.ExistFileInfo(resumableIdentifier)
if !exist {
klog.Warningf("resumableIdentifier %s not exist", resumableIdentifier)
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "Invalid resumableIdentifire", nil))
}
klog.Infof("resumableIdentifier:%s, info:%+v", resumableIdentifier, info)
if resumableIdentifier != info.ID {
klog.Warningf("resumableIdentifier:%s diff from info:%+v", resumableIdentifier, info)
}

if resumableInfo.ResumableChunkNumber == 1 {
//clear temp file and reset info
fileutils.RemoveTempFileAndInfoFile(resumableIdentifier)
if info.Offset != 0 {
info.Offset = 0
a.server.fileInfoMgr.UpdateInfo(resumableIdentifier, info)
}

//do creation when the first chunk
if !utils.CheckDirExist(resumableInfo.ParentDir) {
klog.Warningf("Parent dir %s is not exist or is not a dir", resumableInfo.ParentDir)
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "Parent dir is not exist or is not a dir", nil))
}

fullPath := filepath.Join(resumableInfo.ParentDir, resumableInfo.ResumableRelativePath)

dirPath := filepath.Dir(fullPath)

if !utils.CheckDirExist(dirPath) {
if err := os.MkdirAll(dirPath, os.ModePerm); err != nil {
klog.Warning("err:", err)
return c.Status(fiber.StatusInternalServerError).JSON(
models.NewResponse(1, "failed to create folder", nil))
}
}

if resumableInfo.ResumableRelativePath == "" {
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "file_relative_path invalid", nil))
}

if strings.HasSuffix(resumableInfo.ResumableRelativePath, "/") {
klog.Warningf("full path %s is a dir", fullPath)
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, fmt.Sprintf("full path %s is a dir", fullPath), nil))
}

// Make support judgment after parsing the file type
if !a.server.checkType(resumableInfo.ResumableType) {
klog.Warningf("unsupported filetype:%s", resumableInfo.ResumableType)
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "Unsupported file type", nil))
}

if !a.server.checkSize(resumableInfo.ResumableTotalSize) {
klog.Warningf("Unsupported file size uploadSize:%d", resumableInfo.ResumableTotalSize)
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "Unsupported file size", nil))
}

//Generate unique Upload-ID
//uploadID := uid.MakeUid(uploadInfo.FullPath)
oExist, oInfo := a.server.fileInfoMgr.ExistFileInfo(resumableIdentifier)
oFileExist, oFileLen := a.server.fileInfoMgr.CheckTempFile(resumableIdentifier)
if oExist {
if oFileExist {
if oInfo.Offset != oFileLen {
oInfo.Offset = oFileLen
a.server.fileInfoMgr.UpdateInfo(resumableIdentifier, oInfo)
}
klog.Infof("resumableIdentifier:%s, info.Offset:%d", resumableIdentifier, oInfo.Offset)
//return c.Status(fiber.StatusOK).JSON(
// models.NewResponse(0, "success", info))
return c.JSON(responseData)
} else if oInfo.Offset == 0 {
klog.Warningf("resumableIdentifier:%s, info.Offset:%d", resumableIdentifier, oInfo.Offset)
//return c.Status(fiber.StatusOK).JSON(
// models.NewResponse(0, "success", info))
return c.JSON(responseData)
} else {
a.server.fileInfoMgr.DelFileInfo(resumableIdentifier)
}
}

fileInfo := models.FileInfo{
ID: resumableIdentifier,
Offset: 0,
FileMetaData: models.FileMetaData{
FileRelativePath: resumableInfo.ResumableRelativePath,
FileType: resumableInfo.ResumableType,
FileSize: resumableInfo.ResumableTotalSize,
StoragePath: resumableInfo.ParentDir,
FullPath: fullPath,
},
}

if oFileExist {
fileInfo.Offset = oFileLen
}

err = a.server.fileInfoMgr.AddFileInfo(resumableIdentifier, fileInfo)
if err != nil {
klog.Warningf("resumableIdentifier:%s, err:%v", resumableIdentifier, err)
return c.Status(fiber.StatusInternalServerError).JSON(
models.NewResponse(1, "Error save file info", nil))
}

klog.Infof("resumableIdentifier:%s, fileInfo:%+v", resumableIdentifier, fileInfo)
//return c.Status(fiber.StatusOK).JSON(
// models.NewResponse(0, "success", fileInfo))
// can't return here
}

fileExist, fileLen := a.server.fileInfoMgr.CheckTempFile(resumableIdentifier)
if fileExist {
klog.Infof("resumableIdentifier %s temp file exist, info.Offset:%d, fileLen:%d", uploadID, info.Offset, fileLen)
if info.Offset != fileLen {
info.Offset = fileLen
a.server.fileInfoMgr.UpdateInfo(uploadID, info)
}
}

// Check if file size and offset match
//if patchInfo.UploadOffset != info.Offset {
// klog.Warningf("uploadID %s, patchInfo.UploadOffset:%d diff from info.Offset:%d, info:%v", uploadID, patchInfo.UploadOffset, info.Offset, info)
// return c.Status(fiber.StatusBadRequest).JSON(
// models.NewResponse(1, "Invalid offset", nil))
//}

fileHeader := resumableInfo.File
size := fileHeader.Size

klog.Infof("fileHeader.Size:%d, info.Offset:%d, info.FileSize:%d",
fileHeader.Size, info.Offset, info.FileSize)
if !a.server.checkSize(size) || size+info.Offset > info.FileSize {
klog.Warningf("Unsupported file size uploadSize:%d", size)
return c.Status(fiber.StatusBadRequest).JSON(
models.NewResponse(1, "Unsupported file size", nil))
}

// Write the file contents to the file at the specified path
fileSize, err := fileutils.SaveFile(fileHeader, fileutils.GetTempFilePathById(resumableIdentifier))
if err != nil {
klog.Warningf("resumableIdentifier:%s, info:%+v, err:%v", resumableIdentifier, info, err)
return c.Status(fiber.StatusInternalServerError).JSON(
models.NewResponse(1, err.Error(), info))
}

info.Offset = fileSize
a.server.fileInfoMgr.UpdateInfo(resumableIdentifier, info)

// Update file information for debug
err = fileutils.UpdateFileInfo(info)
if err != nil {
klog.Warningf("resumableIdentifier:%s, info:%+v, err:%v", resumableIdentifier, info, err)
return c.Status(fiber.StatusInternalServerError).JSON(
models.NewResponse(1, err.Error(), info))
}

// Check if the file has been written
if fileSize == info.FileSize {
// Move the file to the specified upload path
err = fileutils.MoveFileByInfo(info)
if err != nil {
klog.Warningf("resumableIdentifier:%s, info:%+v, err:%v", resumableIdentifier, info, err)
return c.Status(fiber.StatusInternalServerError).JSON(
models.NewResponse(1, err.Error(), info))
}
a.server.fileInfoMgr.DelFileInfo(resumableIdentifier)

klog.Infof("resumableIdentifier:%s File uploaded successfully info:%+v", resumableIdentifier, info)
// Return successful response

finishData := []map[string]interface{}{
{
"name": resumableInfo.ResumableFilename,
"id": uid.MakeUid(info.FullPath),
"size": info.FileSize,
},
}
return c.JSON(finishData)
//return c.Status(fiber.StatusOK).JSON(
// models.NewResponse(0, "File uploaded successfully", info))
}

klog.Infof("resumableIdentifier:%s File Continue uploading info:%+v", resumableIdentifier, info)

//return c.Status(fiber.StatusOK).JSON(
// models.NewResponse(0, "Continue uploading", info))
return c.JSON(responseData)
}
1 change: 1 addition & 0 deletions cmd/upload/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (server *Server) ServerRun() {

server.app.Get("/upload/upload-link", server.controller.UploadLink)
server.app.Get("/upload/file-uploaded-bytes", server.controller.UploadedBytes)
server.app.Post("/upload/upload-link/:uid", server.controller.UploadChunks)

klog.Info("upload server listening on 40030")
klog.Fatal(server.app.Listen(":40030"))
Expand Down
14 changes: 14 additions & 0 deletions pkg/upload/models/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,17 @@ type FilePatchInfo struct {
File *multipart.FileHeader `json:"file" form:"file" binding:"required"`
UploadOffset int64 `json:"upload_offset" form:"upload_offset" binding:"required"`
}

type ResumableInfo struct {
ResumableChunkNumber int `json:"resumableChunkNumber" form:"resumableChunkNumber"`
ResumableChunkSize int64 `json:"resumableChunkSize" form:"resumableChunkSize"`
ResumableCurrentChunkSize int64 `json:"resumableCurrentChunkSize" form:"resumableCurrentChunkSize"`
ResumableTotalSize int64 `json:"resumableTotalSize" form:"resumableTotalSize"`
ResumableType string `json:"resumableType" form:"resumableType"`
ResumableIdentifier string `json:"resumableIdentifier" form:"resumableIdentifier"`
ResumableFilename string `json:"resumableFilename" form:"resumableFilename"`
ResumableRelativePath string `json:"resumableRelativePath" form:"resumableRelativePath"`
ResumableTotalChunks int `json:"resumableTotalChunks" form:"resumableTotalChunks"`
ParentDir string `json:"parent_dir" form:"parent_dir"`
File *multipart.FileHeader `json:"file" form:"file" binding:"required"`
}
9 changes: 9 additions & 0 deletions pkg/upload/uid/uid.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"crypto/md5"
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"k8s.io/klog/v2"
"time"
)

// uid returns a unique id. These ids consist of 128 bits from a
Expand All @@ -30,3 +32,10 @@ func MakeUid(filePath string) string {
klog.Infof("filePath:%s, uid:%s", filePath, md5String)
return md5String
}

func GenerateUniqueIdentifier(relativePath string) string {
// 计算 MD5 哈希
h := md5.New()
io.WriteString(h, relativePath+time.Now().String())
return fmt.Sprintf("%x%s", h.Sum(nil), relativePath)
}

0 comments on commit 98dcdae

Please sign in to comment.