diff --git a/conn_unix.go b/conn_unix.go index 01b4fabd..f164f618 100644 --- a/conn_unix.go +++ b/conn_unix.go @@ -374,8 +374,7 @@ func (c *Conn) resetRead() { if !c.closed && c.isWAdded { c.isWAdded = false p := c.p - p.deleteEvent(c.fd) - p.addRead(c.fd) + p.resetRead(c.fd) } } diff --git a/nbhttp/engine.go b/nbhttp/engine.go index d941dc11..2060b304 100644 --- a/nbhttp/engine.go +++ b/nbhttp/engine.go @@ -781,6 +781,9 @@ func (engine *Engine) readConnBlocking(conn net.Conn, parser *Parser, decrease f return } parser.Read(buf[:n]) + if parser.hijacked { + return + } } } @@ -830,6 +833,9 @@ func (engine *Engine) readTLSConnBlocking(conn net.Conn, tlsConn *tls.Conn, pars logging.Debug("parser.Read failed: %v", err) return } + // if parser.hijacked { + // return + // } } if nread == 0 { break diff --git a/nbhttp/parser.go b/nbhttp/parser.go index e85384ac..9050635c 100644 --- a/nbhttp/parser.go +++ b/nbhttp/parser.go @@ -35,6 +35,7 @@ type Parser struct { state int8 isClient bool + hijacked bool readLimit int diff --git a/nbhttp/processor.go b/nbhttp/processor.go index 1f271ecb..45d1b6e4 100644 --- a/nbhttp/processor.go +++ b/nbhttp/processor.go @@ -266,22 +266,23 @@ func (p *ServerProcessor) OnComplete(parser *Parser) { response := NewResponse(p.parser, request, p.enableSendfile) if !parser.Execute(func() { p.handler.ServeHTTP(response, request) - p.flushResponse(response) + parser.hijacked = p.flushResponse(response) }) { releaseRequest(request, p.parser.Engine.RetainHTTPBody) } } -func (p *ServerProcessor) flushResponse(res *Response) { +func (p *ServerProcessor) flushResponse(res *Response) bool { + hijacked := res.hijacked if p.conn != nil { req := res.request - if !res.hijacked { + if !hijacked { res.eoncodeHead() if err := res.flushTrailer(p.conn); err != nil { p.conn.Close() releaseRequest(req, p.parser.Engine.RetainHTTPBody) releaseResponse(res) - return + return hijacked } if req.Close { // the data may still in the send queue @@ -293,6 +294,7 @@ func (p *ServerProcessor) flushResponse(res *Response) { releaseRequest(req, p.parser.Engine.RetainHTTPBody) releaseResponse(res) } + return hijacked } // Close . diff --git a/nbhttp/websocket/conn.go b/nbhttp/websocket/conn.go index 4a9ba72f..2ffbf6f0 100644 --- a/nbhttp/websocket/conn.go +++ b/nbhttp/websocket/conn.go @@ -69,6 +69,8 @@ type Conn struct { remoteCompressionEnabled bool enableWriteCompression bool isBlockingMod bool + isReadingByParser bool + isInReadingLoop bool expectingFragments bool compress bool opcode MessageType @@ -827,8 +829,19 @@ func NewConn(u *Upgrader, c net.Conn, subprotocol string, remoteCompressionEnabl return wsc } -// BlockingModReadLoop . -func (c *Conn) BlockingModReadLoop(bufSize int) { +// HandleRead . +func (c *Conn) HandleRead(bufSize int) { + if !c.isReadingByParser { + return + } + c.mux.Lock() + reading := c.isInReadingLoop + c.isInReadingLoop = true + c.mux.Unlock() + if reading { + return + } + var ( n int err error diff --git a/nbhttp/websocket/upgrader.go b/nbhttp/websocket/upgrader.go index 635255c4..0e7f01cc 100644 --- a/nbhttp/websocket/upgrader.go +++ b/nbhttp/websocket/upgrader.go @@ -374,14 +374,22 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade return nil, err } + wsc.isReadingByParser = (parser == nil) + if wsc.openHandler != nil { wsc.openHandler(wsc) } - if wsc.isBlockingMod { - if parser == nil { + if wsc.isBlockingMod && wsc.isReadingByParser { + var handleRead = true + if len(args) > 1 { + var b bool + b, ok = args[1].(bool) + handleRead = ok && b + } + if handleRead { wsc.chSessionInited = make(chan struct{}) - go wsc.BlockingModReadLoop(u.BlockingModReadBufferSize) + go wsc.HandleRead(u.BlockingModReadBufferSize) } } @@ -393,6 +401,13 @@ func (u *Upgrader) UpgradeAndTransferConnToPoller(w http.ResponseWriter, r *http return u.Upgrade(w, r, responseHeader, trasferConn) } +func (u *Upgrader) UpgradeWithoutHandlingReadForConnFromSTDServer(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error) { + // handle std server's conn, no need transfer conn to nbio Engine + const trasferConn = false + const handleRead = false + return u.Upgrade(w, r, responseHeader, trasferConn, handleRead) +} + func (u *Upgrader) commCheck(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (string, string, bool, error) { if !headerContains(r.Header, "Connection", "upgrade") { return "", "", false, u.returnError(w, r, http.StatusBadRequest, ErrUpgradeTokenNotFound) diff --git a/poller_kqueue.go b/poller_kqueue.go index 9b785c45..5a38d4a3 100644 --- a/poller_kqueue.go +++ b/poller_kqueue.go @@ -108,6 +108,13 @@ func (p *poller) addRead(fd int) { p.trigger() } +func (p *poller) resetRead(fd int) { + p.mux.Lock() + p.eventList = append(p.eventList, syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_DISABLE, Filter: syscall.EVFILT_WRITE}) + p.mux.Unlock() + p.trigger() +} + func (p *poller) modWrite(fd int) { p.mux.Lock() p.eventList = append(p.eventList, syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_WRITE})