Skip to content

Commit

Permalink
add comments, export some errors
Browse files Browse the repository at this point in the history
  • Loading branch information
lesismal committed Apr 10, 2024
1 parent 103e0ce commit 852d34c
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 83 deletions.
9 changes: 6 additions & 3 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
)

var (
errReadTimeout = errors.New("read timeout")
errWriteTimeout = errors.New("write timeout")
errOverflow = errors.New("write overflow")
ErrReadTimeout = errors.New("read timeout")
errReadTimeout = ErrReadTimeout
ErrWriteTimeout = errors.New("write timeout")
errWriteTimeout = ErrWriteTimeout
ErrOverflow = errors.New("write overflow")
errOverflow = ErrOverflow
)
20 changes: 13 additions & 7 deletions nbhttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (hcs *hostConns) getConn() (*hostConns, *ClientConn, error) {
timer := time.NewTimer(c.Timeout)
defer timer.Stop()

// 1. fast get an existed free connection
// 1. Get an existing free connection.
select {
case hc, ok := <-hcs.chConnss:
if !ok {
Expand All @@ -66,7 +66,8 @@ func (hcs *hostConns) getConn() (*hostConns, *ClientConn, error) {
default:
}

// 2. try to create a new connection if the num of existed connections is smaller than maxConnNum
// 2. Try to create a new connection if the num of existing
// connections is smaller than maxConnNum
if atomic.AddInt32(&hcs.connNum, 1) <= hcs.maxConnNum {
hc := &ClientConn{
Engine: c.Engine,
Expand All @@ -85,7 +86,7 @@ func (hcs *hostConns) getConn() (*hostConns, *ClientConn, error) {
}
atomic.AddInt32(&hcs.connNum, -1)

// 3. wait for an old connection
// 3. Wait for an existed working connection to be free.
select {
case hc, ok := <-hcs.chConnss:
if !ok {
Expand All @@ -104,7 +105,7 @@ func (hcs *hostConns) releaseConn(hc *ClientConn) {
hcs.chConnss <- hc
}

// Client .
// Client implements the similar functions with std http.Client.
type Client struct {
mux sync.Mutex
closed bool
Expand All @@ -130,12 +131,12 @@ type Client struct {
CheckRedirect func(req *http.Request, via []*http.Request) error
}

// Close .
// Close closes all underlayer connections with EOF.
func (c *Client) Close() {
c.CloseWithError(io.EOF)
}

// CloseWithError .
// CloseWithError closes all underlayer connections with error.
func (c *Client) CloseWithError(err error) {
c.mux.Lock()
if !c.closed {
Expand Down Expand Up @@ -167,7 +168,12 @@ func (c *Client) getConn(host string) (*hostConns, *ClientConn, error) {
return hcs.getConn()
}

// Do .
// Do sends an HTTP request and returns an HTTP response.
// Notice:
// 1. It's blocking when Dial to the server;
// 2. It's non-blocking for waiting for the response;
// 3. It calls the handler when the response is received
// or other errors occur, such as timeout.
func (c *Client) Do(req *http.Request, handler func(res *http.Response, conn net.Conn, err error)) {
c.Engine.ExecuteClient(func() {
host := req.URL.Host
Expand Down
27 changes: 11 additions & 16 deletions nbhttp/client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type ClientConn struct {
CheckRedirect func(req *http.Request, via []*http.Request) error
}

// Reset .
// Reset resets itself as new created.
func (c *ClientConn) Reset() {
c.mux.Lock()
if c.closed {
Expand All @@ -61,27 +61,17 @@ func (c *ClientConn) Reset() {
c.mux.Unlock()
}

// OnClose .
// OnClose registers a callback for closing.
func (c *ClientConn) OnClose(h func()) {
if h == nil {
return
}

pre := c.onClose
c.onClose = func() {
if pre != nil {
pre()
}
h()
}
c.onClose = h
}

// Close .
// Close closes underlayer connection with EOF.
func (c *ClientConn) Close() {
c.CloseWithError(io.EOF)
}

// CloseWithError .
// CloseWithError closes underlayer connection with error.
func (c *ClientConn) CloseWithError(err error) {
c.mux.Lock()
defer c.mux.Unlock()
Expand Down Expand Up @@ -153,7 +143,12 @@ func (c *ClientConn) onResponse(res *http.Response, err error) {
}
}

// Do .
// Do sends an HTTP request and returns an HTTP response.
// Notice:
// 1. It's blocking when Dial to the server;
// 2. It's non-blocking for waiting for the response;
// 3. It calls the handler when the response is received
// or other errors occur, such as timeout.
func (c *ClientConn) Do(req *http.Request, handler func(res *http.Response, conn net.Conn, err error)) {
c.mux.Lock()
defer func() {
Expand Down
48 changes: 25 additions & 23 deletions nbhttp/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type ParserCloser interface {
type Parser struct {
mux sync.Mutex

// cache for half packet.
cache []byte
// bytesCached for half packet.
bytesCached []byte

// errClose error

Expand Down Expand Up @@ -85,12 +85,12 @@ func (p *Parser) nextState(state int8) {
}
}

// OnClose .
// OnClose registers callback for closing.
func (p *Parser) OnClose(h func(p *Parser, err error)) {
p.onClose = h
}

// Close .
// CloseAndClean closes the underlayer connection and cleans up related.
func (p *Parser) CloseAndClean(err error) {
p.mux.Lock()
defer p.mux.Unlock()
Expand All @@ -109,8 +109,8 @@ func (p *Parser) CloseAndClean(err error) {
if p.Processor != nil {
p.Processor.Close(p, err)
}
if len(p.cache) > 0 {
mempool.Free(p.cache)
if len(p.bytesCached) > 0 {
mempool.Free(p.bytesCached)
}
if p.onClose != nil {
p.onClose(p, err)
Expand All @@ -131,7 +131,9 @@ func parseAndValidateChunkSize(originalStr string) (int, error) {
return int(chunkSize), nil
}

// Read .
// Parse parses data bytes and calls HTTP handler when full request received.
// If the connection is upgraded, it passes the data bytes to the ParserCloser
// and doesn't parse them itself any more.
func (p *Parser) Parse(data []byte) error {
p.mux.Lock()
defer p.mux.Unlock()
Expand All @@ -145,13 +147,13 @@ func (p *Parser) Parse(data []byte) error {
}

var start = 0
var offset = len(p.cache)
var offset = len(p.bytesCached)
if offset > 0 {
if p.Engine.ReadLimit > 0 && offset+len(data) > p.Engine.ReadLimit {
return ErrTooLong
}
p.cache = mempool.Append(p.cache, data...)
data = p.cache
p.bytesCached = mempool.Append(p.bytesCached, data...)
data = p.bytesCached
}

UPGRADER:
Expand All @@ -161,9 +163,9 @@ UPGRADER:
udata = data[start:]
}
err := p.ParserCloser.Parse(udata)
if p.cache != nil {
mempool.Free(p.cache)
p.cache = nil
if p.bytesCached != nil {
mempool.Free(p.bytesCached)
p.bytesCached = nil
}
return err
}
Expand Down Expand Up @@ -661,18 +663,18 @@ UPGRADER:
Exit:
left := len(data) - start
if left > 0 {
if p.cache == nil {
p.cache = mempool.Malloc(left)
copy(p.cache, data[start:])
if p.bytesCached == nil {
p.bytesCached = mempool.Malloc(left)
copy(p.bytesCached, data[start:])
} else if start > 0 {
oldCache := p.cache
p.cache = mempool.Malloc(left)
copy(p.cache, data[start:])
mempool.Free(oldCache)
oldbytesCached := p.bytesCached
p.bytesCached = mempool.Malloc(left)
copy(p.bytesCached, data[start:])
mempool.Free(oldbytesCached)
}
} else if len(p.cache) > 0 {
mempool.Free(p.cache)
p.cache = nil
} else if len(p.bytesCached) > 0 {
mempool.Free(p.bytesCached)
p.bytesCached = nil
}

return nil
Expand Down
35 changes: 3 additions & 32 deletions nbhttp/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,8 @@ func releaseClientResponse(res *http.Response) {
}
}

// func releaseStdResponse(res *http.Response) {
// if res != nil {
// *res = emptyStdResponse
// stdResponsePool.Put(res)
// }
// }

// Processor .
type Processor interface {
// Conn() net.Conn
OnMethod(parser *Parser, method string)
OnURL(parser *Parser, uri string) error
OnProto(parser *Parser, proto string) error
Expand All @@ -105,16 +97,11 @@ var (
emptyClientProcessor = ClientProcessor{}
)

// ServerProcessor .
// ServerProcessor is used for server side connection.
type ServerProcessor struct {
request *http.Request
}

// Conn .
// func (p *ServerProcessor) Conn() net.Conn {
// return nil
// }

// OnMethod .
func (p *ServerProcessor) OnMethod(parser *Parser, method string) {
if p.request == nil {
Expand Down Expand Up @@ -305,21 +292,13 @@ func NewServerProcessor() Processor {
return &ServerProcessor{}
}

// ClientProcessor .
// ClientProcessor is used for client side connection.
type ClientProcessor struct {
conn *ClientConn
response *http.Response
handler func(res *http.Response, err error)
}

// Conn .
// func (p *ClientProcessor) Conn() net.Conn {
// if p.conn != nil {
// return p.conn.conn
// }
// return nil
// }

// OnMethod .
func (p *ClientProcessor) OnMethod(parser *Parser, method string) {
}
Expand All @@ -336,10 +315,6 @@ func (p *ClientProcessor) OnProto(parser *Parser, proto string) error {
return fmt.Errorf("%s %q", "malformed HTTP version", proto)
}
if p.response == nil {
// p.response = &http.Response{
// Proto: proto,
// Header: http.Header{},
// }
p.response = clientResponsePool.Get().(*http.Response)
p.response.Proto = proto
p.response.Header = http.Header{}
Expand Down Expand Up @@ -408,6 +383,7 @@ func (p *ClientProcessor) OnComplete(parser *Parser) {
}
}

// Clean .
func (p *ClientProcessor) Clean(parser *Parser) {
if p.response != nil {
releaseClientResponse(p.response)
Expand All @@ -432,11 +408,6 @@ func NewClientProcessor(conn *ClientConn, handler func(res *http.Response, err e
// EmptyProcessor .
type EmptyProcessor struct{}

// Conn .
// func (p *EmptyProcessor) Conn() net.Conn {
// return nil
// }

// OnMethod .
func (p *EmptyProcessor) OnMethod(parser *Parser, method string) {

Expand Down
4 changes: 2 additions & 2 deletions nbhttp/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
type Response struct {
Parser *Parser

request *http.Request // request for this response
request *http.Request // request for this response.

status string
statusCode int // status code passed to WriteHeader
statusCode int // status code passed to WriteHeader.

header http.Header
trailer map[string]string
Expand Down

0 comments on commit 852d34c

Please sign in to comment.