From 4e755a65ae88988780e85ba155068fe5792747bd Mon Sep 17 00:00:00 2001 From: lesismal Date: Thu, 12 Oct 2023 16:33:22 +0800 Subject: [PATCH 1/5] websocket: add UpgradeWithoutHandlingReadForConnFromSTDServer --- nbhttp/websocket/conn.go | 9 +++++++-- nbhttp/websocket/upgrader.go | 21 ++++++++++++++++++--- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/nbhttp/websocket/conn.go b/nbhttp/websocket/conn.go index 4a9ba72f..cc320651 100644 --- a/nbhttp/websocket/conn.go +++ b/nbhttp/websocket/conn.go @@ -69,6 +69,7 @@ type Conn struct { remoteCompressionEnabled bool enableWriteCompression bool isBlockingMod bool + isHandledBySTDServer bool expectingFragments bool compress bool opcode MessageType @@ -827,8 +828,12 @@ func NewConn(u *Upgrader, c net.Conn, subprotocol string, remoteCompressionEnabl return wsc } -// BlockingModReadLoop . -func (c *Conn) BlockingModReadLoop(bufSize int) { +// HandleRead . +func (c *Conn) HandleRead(bufSize int) { + if !c.isHandledBySTDServer { + return + } + var ( n int err error diff --git a/nbhttp/websocket/upgrader.go b/nbhttp/websocket/upgrader.go index 635255c4..eea3b03d 100644 --- a/nbhttp/websocket/upgrader.go +++ b/nbhttp/websocket/upgrader.go @@ -374,14 +374,22 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade return nil, err } + wsc.isHandledBySTDServer = (parser == nil) + if wsc.openHandler != nil { wsc.openHandler(wsc) } - if wsc.isBlockingMod { - if parser == nil { + if wsc.isBlockingMod && wsc.isHandledBySTDServer { + var handleRead = true + if len(args) > 1 { + var b bool + b, ok = args[1].(bool) + handleRead = ok && b + } + if handleRead { wsc.chSessionInited = make(chan struct{}) - go wsc.BlockingModReadLoop(u.BlockingModReadBufferSize) + go wsc.HandleRead(u.BlockingModReadBufferSize) } } @@ -393,6 +401,13 @@ func (u *Upgrader) UpgradeAndTransferConnToPoller(w http.ResponseWriter, r *http return u.Upgrade(w, r, responseHeader, trasferConn) } +func (u *Upgrader) UpgradeWithoutHandlingReadForConnFromSTDServer(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error) { + // handle std server's conn, no need transfer conn to nbio Engine + const trasferConn = false + const handleRead = false + return u.Upgrade(w, r, responseHeader, trasferConn, handleRead) +} + func (u *Upgrader) commCheck(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (string, string, bool, error) { if !headerContains(r.Header, "Connection", "upgrade") { return "", "", false, u.returnError(w, r, http.StatusBadRequest, ErrUpgradeTokenNotFound) From 515fb3dfd898e8a251c29e71be665b63c15f0efe Mon Sep 17 00:00:00 2001 From: lesismal Date: Thu, 12 Oct 2023 17:38:32 +0800 Subject: [PATCH 2/5] websocket: guarantee doing read loop for once only --- nbhttp/websocket/conn.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/nbhttp/websocket/conn.go b/nbhttp/websocket/conn.go index cc320651..2b180637 100644 --- a/nbhttp/websocket/conn.go +++ b/nbhttp/websocket/conn.go @@ -70,6 +70,7 @@ type Conn struct { enableWriteCompression bool isBlockingMod bool isHandledBySTDServer bool + isInReadingLoop bool expectingFragments bool compress bool opcode MessageType @@ -92,6 +93,11 @@ func (c *Conn) IsBlockingMod() bool { return c.isBlockingMod } +// IsHandledBySTDServer . +func (c *Conn) IsHandledBySTDServer() bool { + return c.isHandledBySTDServer +} + // IsAsyncWrite . func (c *Conn) IsAsyncWrite() bool { return c.sendQueue != nil @@ -833,6 +839,13 @@ func (c *Conn) HandleRead(bufSize int) { if !c.isHandledBySTDServer { return } + c.mux.Lock() + reading := c.isInReadingLoop + c.isInReadingLoop = true + c.mux.Unlock() + if reading { + return + } var ( n int From 9d07fda7615cab258417163517bddfc263fda064 Mon Sep 17 00:00:00 2001 From: lesismal Date: Wed, 18 Oct 2023 01:33:37 +0800 Subject: [PATCH 3/5] epoll: opt reset read-event --- conn_unix.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/conn_unix.go b/conn_unix.go index 01b4fabd..f164f618 100644 --- a/conn_unix.go +++ b/conn_unix.go @@ -374,8 +374,7 @@ func (c *Conn) resetRead() { if !c.closed && c.isWAdded { c.isWAdded = false p := c.p - p.deleteEvent(c.fd) - p.addRead(c.fd) + p.resetRead(c.fd) } } From eec98322147b6333c705d7b113c223d830ec5a8b Mon Sep 17 00:00:00 2001 From: lesismal Date: Wed, 18 Oct 2023 22:45:55 +0800 Subject: [PATCH 4/5] websocket: fix old reading loop's conflict with 3rd frameworks details: https://github.com/lesismal/nbio/issues/353#issuecomment-1768284064 --- nbhttp/engine.go | 6 ++++++ nbhttp/parser.go | 1 + nbhttp/processor.go | 10 ++++++---- nbhttp/websocket/conn.go | 9 ++------- nbhttp/websocket/upgrader.go | 4 ++-- 5 files changed, 17 insertions(+), 13 deletions(-) diff --git a/nbhttp/engine.go b/nbhttp/engine.go index d941dc11..2060b304 100644 --- a/nbhttp/engine.go +++ b/nbhttp/engine.go @@ -781,6 +781,9 @@ func (engine *Engine) readConnBlocking(conn net.Conn, parser *Parser, decrease f return } parser.Read(buf[:n]) + if parser.hijacked { + return + } } } @@ -830,6 +833,9 @@ func (engine *Engine) readTLSConnBlocking(conn net.Conn, tlsConn *tls.Conn, pars logging.Debug("parser.Read failed: %v", err) return } + // if parser.hijacked { + // return + // } } if nread == 0 { break diff --git a/nbhttp/parser.go b/nbhttp/parser.go index e85384ac..9050635c 100644 --- a/nbhttp/parser.go +++ b/nbhttp/parser.go @@ -35,6 +35,7 @@ type Parser struct { state int8 isClient bool + hijacked bool readLimit int diff --git a/nbhttp/processor.go b/nbhttp/processor.go index 1f271ecb..45d1b6e4 100644 --- a/nbhttp/processor.go +++ b/nbhttp/processor.go @@ -266,22 +266,23 @@ func (p *ServerProcessor) OnComplete(parser *Parser) { response := NewResponse(p.parser, request, p.enableSendfile) if !parser.Execute(func() { p.handler.ServeHTTP(response, request) - p.flushResponse(response) + parser.hijacked = p.flushResponse(response) }) { releaseRequest(request, p.parser.Engine.RetainHTTPBody) } } -func (p *ServerProcessor) flushResponse(res *Response) { +func (p *ServerProcessor) flushResponse(res *Response) bool { + hijacked := res.hijacked if p.conn != nil { req := res.request - if !res.hijacked { + if !hijacked { res.eoncodeHead() if err := res.flushTrailer(p.conn); err != nil { p.conn.Close() releaseRequest(req, p.parser.Engine.RetainHTTPBody) releaseResponse(res) - return + return hijacked } if req.Close { // the data may still in the send queue @@ -293,6 +294,7 @@ func (p *ServerProcessor) flushResponse(res *Response) { releaseRequest(req, p.parser.Engine.RetainHTTPBody) releaseResponse(res) } + return hijacked } // Close . diff --git a/nbhttp/websocket/conn.go b/nbhttp/websocket/conn.go index 2b180637..2ffbf6f0 100644 --- a/nbhttp/websocket/conn.go +++ b/nbhttp/websocket/conn.go @@ -69,7 +69,7 @@ type Conn struct { remoteCompressionEnabled bool enableWriteCompression bool isBlockingMod bool - isHandledBySTDServer bool + isReadingByParser bool isInReadingLoop bool expectingFragments bool compress bool @@ -93,11 +93,6 @@ func (c *Conn) IsBlockingMod() bool { return c.isBlockingMod } -// IsHandledBySTDServer . -func (c *Conn) IsHandledBySTDServer() bool { - return c.isHandledBySTDServer -} - // IsAsyncWrite . func (c *Conn) IsAsyncWrite() bool { return c.sendQueue != nil @@ -836,7 +831,7 @@ func NewConn(u *Upgrader, c net.Conn, subprotocol string, remoteCompressionEnabl // HandleRead . func (c *Conn) HandleRead(bufSize int) { - if !c.isHandledBySTDServer { + if !c.isReadingByParser { return } c.mux.Lock() diff --git a/nbhttp/websocket/upgrader.go b/nbhttp/websocket/upgrader.go index eea3b03d..0e7f01cc 100644 --- a/nbhttp/websocket/upgrader.go +++ b/nbhttp/websocket/upgrader.go @@ -374,13 +374,13 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade return nil, err } - wsc.isHandledBySTDServer = (parser == nil) + wsc.isReadingByParser = (parser == nil) if wsc.openHandler != nil { wsc.openHandler(wsc) } - if wsc.isBlockingMod && wsc.isHandledBySTDServer { + if wsc.isBlockingMod && wsc.isReadingByParser { var handleRead = true if len(args) > 1 { var b bool From 6a214973e2898cadf1f65bb864d903372086acf1 Mon Sep 17 00:00:00 2001 From: lesismal Date: Thu, 19 Oct 2023 10:35:44 +0800 Subject: [PATCH 5/5] kqueue: fix write event --- poller_kqueue.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/poller_kqueue.go b/poller_kqueue.go index 9b785c45..5a38d4a3 100644 --- a/poller_kqueue.go +++ b/poller_kqueue.go @@ -108,6 +108,13 @@ func (p *poller) addRead(fd int) { p.trigger() } +func (p *poller) resetRead(fd int) { + p.mux.Lock() + p.eventList = append(p.eventList, syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_DISABLE, Filter: syscall.EVFILT_WRITE}) + p.mux.Unlock() + p.trigger() +} + func (p *poller) modWrite(fd int) { p.mux.Lock() p.eventList = append(p.eventList, syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_WRITE})