Feature: support operate the underlying incoming data for uploading large file

For file uploading, default behaviour will `ReadForm` and write data to new temporary file if oversize `maxMemory`.

If temporary directory size is too small, it will fail for uploading large file and even consume OS memory since copy to buffer.

Inspired by Flask, Flask `request.stream.read` will read underlying socket data directly if payload found. Refer to: https://werkzeug.palletsprojects.com/en/2.0.x/wsgi/#werkzeug.wsgi.LimitedStream

This method makes assumptions because the request payload exists for operating `POST/PATCH/PUT`.

A simple demo for uploading 5G large file in low memory based on docker:
```
[root@control-master tmp]# docker run -ti --rm --memory 1G --tmpfs /tmp:rw,size=1G,mode=1777 -v $(pwd)/go:/go golang:1.16.5 /bin/bash

root@b672a98e5314:/go/testmodules# cat main.go
package main

import (
	"fmt"
	"github.com/gin-gonic/gin"
	"io"
	"os"
	"path/filepath"
)

func main() {
	engine := gin.Default()
	engine.POST("/streamupload", func(c *gin.Context) {
		if c.GetHeader("Content-Type") != "application/octet-stream" {
			err := fmt.Errorf("required octet-stream")
			c.AbortWithStatusJSON(400, map[string]string{"message": err.Error()})
			return
		}
		info, err := os.Create(filepath.Join("/home", "foo"))
		if err != nil {
			c.AbortWithStatusJSON(400, gin.H{"message": "Create: "+err.Error()})
			return
		}
		defer info.Close()
		_, err = io.Copy(info, c.Request.Body)
		if err != nil {
			c.AbortWithStatusJSON(400, gin.H{"message": "Copy: "+err.Error()})
			return
		}
		c.JSON(200, map[string]string{"message": "ok stream"})
	})

	if err := engine.Run("0.0.0.0:19090"); err != nil {
		panic(err)
	}
}

root@b53810b3e294:/go/testmodules# GOTMPDIR=/opt/ go run main.go

[root@control-master ~]# dd if=/dev/zero of=5G bs=1M count=5170 status=progress
[root@control-master ~]# curl -vvv -H "Content-Type:application/octet-stream" -T 5G -X POST 172.17.0.2:19090/streamupload
```

Signed-off-by: Chenyang Yan <memory.yancy@gmail.com>
This commit is contained in:
Chenyang Yan 2022-01-26 20:10:00 +08:00
parent 580e7da6ee
commit df54454102
3 changed files with 54 additions and 0 deletions

View File

@ -22,6 +22,7 @@ const (
MIMEMSGPACK = "application/x-msgpack"
MIMEMSGPACK2 = "application/msgpack"
MIMEYAML = "application/x-yaml"
MIMEOctetStream = "application/octet-stream"
)
// Binding describes the interface which needs to be implemented for binding the

View File

@ -601,6 +601,27 @@ func (c *Context) SaveUploadedFile(file *multipart.FileHeader, dst string) error
return err
}
// SaveOctetStreamFile is useful for uploading large file since containing request data, it will operate underlying data stream to dst.
func (c *Context) SaveOctetStreamFile(dst string, flag int, perm os.FileMode) error {
if c.GetHeader("Content-Type") != binding.MIMEOctetStream {
return fmt.Errorf("octet stream required %s data format", binding.MIMEOctetStream)
}
method := c.Request.Method
// In particular, only support POST/PATCH/PUT for taking payload according to https://www.rfc-editor.org/rfc/rfc2616#section-9
if method != http.MethodPost && method != http.MethodPatch && method != http.MethodPut {
return fmt.Errorf("invalid http request method, only support POST/PATCH/PUT")
}
out, err := os.OpenFile(dst, flag, perm)
if err != nil {
return err
}
defer out.Close()
_, err = io.Copy(out, c.Request.Body)
return err
}
// Bind checks the Method and Content-Type to select a binding engine automatically,
// Depending on the "Content-Type" header different bindings are used, for example:
// "application/json" --> JSON binding

View File

@ -143,6 +143,38 @@ func TestSaveUploadedCreateFailed(t *testing.T) {
assert.Error(t, c.SaveUploadedFile(f, "/"))
}
func TestSaveOctetStreamFile(t *testing.T) {
buf := new(bytes.Buffer)
_, err := buf.WriteString("large file binary content")
assert.NoError(t, err, "write string error for buffer")
c, _ := CreateTestContext(httptest.NewRecorder())
c.Request, _ = http.NewRequest("POST", "/", buf)
c.Request.Header.Set("Content-Type", binding.MIMEOctetStream)
assert.NoError(t, c.SaveOctetStreamFile("test", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644))
}
func TestSaveOctetStreamFileFailed(t *testing.T) {
buf := new(bytes.Buffer)
_, err := buf.WriteString("large file binary content")
assert.NoError(t, err, "write string error for buffer")
c, _ := CreateTestContext(httptest.NewRecorder())
c.Request, _ = http.NewRequest("GET", "/", buf)
c.Request.Header.Set("Content-Type", binding.MIMEOctetStream)
err = c.SaveOctetStreamFile("test", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
assert.Error(t, err)
assert.Contains(t, err.Error(), "support POST/PATCH/PUT")
c.Request, _ = http.NewRequest("PATCH", "/", buf)
c.Request.Header.Set("Content-Type", binding.MIMEJSON)
err = c.SaveOctetStreamFile("test", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
assert.Error(t, err)
assert.Contains(t, err.Error(), binding.MIMEOctetStream)
}
func TestContextReset(t *testing.T) {
router := New()
c := router.allocateContext()