Skip to content

Commit

Permalink
http: optimize response buffer usage
Browse files Browse the repository at this point in the history
  • Loading branch information
lesismal committed Jan 1, 2025
1 parent 03a5bbf commit 9dea439
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 55 deletions.
10 changes: 10 additions & 0 deletions nbhttp/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"strings"
"sync"
"time"

"github.com/lesismal/nbio/mempool"
)

var (
Expand Down Expand Up @@ -65,6 +67,12 @@ func releaseRequest(req *http.Request, retainHTTPBody bool) {
//go:norace
func releaseResponse(res *Response) {
if res != nil {
if res.buffer != nil {
mempool.Free(res.buffer)
}
if res.bodyBuffer != nil {
mempool.Free(res.bodyBuffer)
}
*res = emptyResponse
responsePool.Put(res)
}
Expand Down Expand Up @@ -284,6 +292,8 @@ func (p *ServerProcessor) flushResponse(parser *Parser, res *Response) {
if conn != nil {
req := res.request
if !res.hijacked {
res.WriteHeader(http.StatusOK)
res.checkChunked()
res.eoncodeHead()
if err := res.flush(conn); err != nil {
conn.Close()
Expand Down
192 changes: 137 additions & 55 deletions nbhttp/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,11 @@ func (res *Response) WriteHeader(statusCode int) {
res.header.Del(contentLengthHeader)
}
}

res.checkChunked()
}
}

const maxPacketSize = 65536
const halfPacketSize = 32768

// WriteString .
//
Expand All @@ -110,46 +109,12 @@ func (res *Response) Write(data []byte) (int, error) {
}

res.WriteHeader(http.StatusOK)
res.checkChunked()

res.hasBody = true

if res.chunked {
res.eoncodeHead()

pbuf := res.buffer
hl := len(*pbuf)
res.buffer = nil
lenStr := res.formatInt(l, 16)
size := hl + len(lenStr) + l + 4
if size < maxPacketSize {
pbuf = mempool.AppendString(pbuf, lenStr)
pbuf = mempool.AppendString(pbuf, "\r\n")
pbuf = mempool.Append(pbuf, data...)
pbuf = mempool.AppendString(pbuf, "\r\n")
res.buffer = pbuf
return l, nil
}
pbuf = mempool.AppendString(pbuf, lenStr)
pbuf = mempool.AppendString(pbuf, "\r\n")
_, err := conn.Write(*pbuf)
mempool.Free(pbuf)
if err != nil {
return 0, err
}
pbuf = mempool.Malloc(len(data) + 2)
*pbuf = (*pbuf)[0:0]
pbuf = mempool.Append(pbuf, data...)
pbuf = mempool.AppendString(pbuf, "\r\n")
if len(*pbuf) < maxPacketSize {
res.buffer = pbuf
return l, nil
}
_, err = conn.Write(*pbuf)
mempool.Free(pbuf)
if err != nil {
return 0, err
}
return l, nil
return res.writeChunk(conn, data, l)
}

cl, err := res.contentLength()
Expand All @@ -159,29 +124,137 @@ func (res *Response) Write(data []byte) (int, error) {
if cl > 0 && res.bodyWritten+l > cl {
return 0, http.ErrContentLength
}
if len(res.header[contentLengthHeader]) > 0 {

if cl > 0 {
res.eoncodeHead()

pbuf := res.buffer
res.buffer = nil

// Header has been sent, no cached head buffer,
// append the data to body buffer.
if pbuf == nil {
return conn.Write(data)
goto APPEND_BODY
}
pbuf = mempool.Append(pbuf, data...)
l, err := conn.Write(*pbuf)
mempool.Free(pbuf)
if l >= 0 {
res.bodyWritten += l

// If has header buffer and total size < maxPacketSize,
// set the header buffer as the body buffer and process
// the new body data.
// Else, send header buffer first, then process the data.
if len(*pbuf)+len(data) < maxPacketSize {
res.bodyBuffer = pbuf
goto APPEND_BODY
} else {
_, err = conn.Write(*pbuf)
mempool.Free(pbuf)
pbuf = nil
if err != nil {
return 0, err
}
}
return l, err
}

APPEND_BODY:
if res.bodyBuffer == nil {
// If "Content-Length" has been set,
// no cached buffer, and
// the data size >= maxPacketSize,
// send the data directly.
if cl > 0 && len(data) >= maxPacketSize {
res.bodyWritten += l
return conn.Write(data)
}

// Prepare a new buffer for caching the data.
res.bodyBuffer = mempool.Malloc(l)
*res.bodyBuffer = (*res.bodyBuffer)[0:0]
} else if cl > 0 && len(*res.bodyBuffer)+len(data) >= maxPacketSize {
// If "Content-Length" has been set,
// has cached buffer, and
// the data total size >= maxPacketSize,
// send the cached buffer first.
_, err = conn.Write(*res.bodyBuffer)
if err != nil {
mempool.Free(res.bodyBuffer)
res.bodyBuffer = nil
return 0, err
}

// If the new data size >= maxPacketSize,
// send the new data directly.
if len(data) >= maxPacketSize {
res.bodyWritten += l
return conn.Write(data)
}

// Reset the cached buffer.
*res.bodyBuffer = (*res.bodyBuffer)[0:0]
}
res.bodyBuffer = mempool.Append(res.bodyBuffer, data...)

// Append the data to the body buffer cache.
res.bodyWritten += l
// res.header[contentLengthHeader] = []string{res.formatInt(l, 10)}
res.bodyBuffer = mempool.Append(res.bodyBuffer, data...)

return l, nil
}

// writeChunk .
//
//go:norace
func (res *Response) writeChunk(conn net.Conn, data []byte, l int) (int, error) {
res.eoncodeHead()

pbuf := res.buffer
res.buffer = nil
lenStr := res.formatInt(l, 16)
totalSize := 0
if pbuf != nil {
totalSize = len(*pbuf) + len(lenStr) + l + 4
} else {
totalSize = len(lenStr) + l + 4
pbuf = mempool.Malloc(totalSize)
*pbuf = (*pbuf)[0:0]
}
if totalSize < maxPacketSize {
pbuf = mempool.AppendString(pbuf, lenStr)
pbuf = mempool.AppendString(pbuf, "\r\n")
pbuf = mempool.Append(pbuf, data...)
pbuf = mempool.AppendString(pbuf, "\r\n")
res.buffer = pbuf
return l, nil
}
lenStrWrote := false
firstLen := len(*pbuf) + len(lenStr) + 2
if cap(*pbuf) >= firstLen || firstLen <= halfPacketSize {
pbuf = mempool.AppendString(pbuf, lenStr)
pbuf = mempool.AppendString(pbuf, "\r\n")
lenStrWrote = true
}
_, err := conn.Write(*pbuf)
mempool.Free(pbuf)
if err != nil {
return 0, err
}
if lenStrWrote {
pbuf = mempool.Malloc(len(data) + 2)
*pbuf = (*pbuf)[0:0]
} else {
pbuf = mempool.Malloc(len(data) + len(lenStr) + 4)
*pbuf = (*pbuf)[0:0]
pbuf = mempool.AppendString(pbuf, lenStr)
pbuf = mempool.AppendString(pbuf, "\r\n")
}
pbuf = mempool.Append(pbuf, data...)
pbuf = mempool.AppendString(pbuf, "\r\n")
if len(*pbuf) < maxPacketSize {
res.buffer = pbuf
return l, nil
}
_, err = conn.Write(*pbuf)
mempool.Free(pbuf)
if err != nil {
return 0, err
}
return l, nil
}

Expand Down Expand Up @@ -265,8 +338,6 @@ func (res *Response) checkChunked() {

res.chunkChecked = true

// res.WriteHeader(http.StatusOK)

if res.request.ProtoAtLeast(1, 1) {
for _, v := range res.header[transferEncodingHeader] {
if v == "chunked" {
Expand Down Expand Up @@ -294,8 +365,6 @@ func (res *Response) eoncodeHead() {
return
}

res.WriteHeader(http.StatusOK)

res.headEncoded = true

status := res.status
Expand Down Expand Up @@ -390,9 +459,22 @@ func (res *Response) flush(conn io.Writer) error {
if !res.chunked {
if res.buffer != nil {
if res.bodyBuffer != nil {
res.buffer = mempool.Append(res.buffer, (*res.bodyBuffer)...)
mempool.Free(res.bodyBuffer)
res.bodyBuffer = nil
if len(*res.buffer)+len(*res.bodyBuffer) > maxPacketSize {
_, err = conn.Write(*res.buffer)
mempool.Free(res.buffer)
res.buffer = nil
if err != nil {
mempool.Free(res.bodyBuffer)
res.bodyBuffer = nil
return err
}
res.buffer = res.bodyBuffer
res.bodyBuffer = nil
} else {
res.buffer = mempool.Append(res.buffer, (*res.bodyBuffer)...)
mempool.Free(res.bodyBuffer)
res.bodyBuffer = nil
}
}
_, err = conn.Write(*res.buffer)
mempool.Free(res.buffer)
Expand Down

0 comments on commit 9dea439

Please sign in to comment.