From e246bda6ec769fb98654545464657408b7044523 Mon Sep 17 00:00:00 2001 From: lesismal Date: Fri, 27 Oct 2023 14:48:05 +0800 Subject: [PATCH 1/9] kqueue: rm EV_CLEAR, opt write event --- poller_kqueue.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/poller_kqueue.go b/poller_kqueue.go index 3cee88d0..22a6c1b6 100644 --- a/poller_kqueue.go +++ b/poller_kqueue.go @@ -103,28 +103,30 @@ func (p *poller) trigger() { func (p *poller) addRead(fd int) { p.mux.Lock() - p.eventList = append(p.eventList, syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_ADD | syscall.EV_CLEAR, Filter: syscall.EVFILT_READ}) + p.eventList = append(p.eventList, syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_READ}) p.mux.Unlock() p.trigger() } func (p *poller) resetRead(fd int) { p.mux.Lock() - p.eventList = append(p.eventList, syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_DISABLE, Filter: syscall.EVFILT_WRITE}) + p.eventList = append(p.eventList, syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_WRITE}) p.mux.Unlock() p.trigger() } func (p *poller) modWrite(fd int) { p.mux.Lock() - p.eventList = append(p.eventList, syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_ADD | syscall.EV_CLEAR, Filter: syscall.EVFILT_WRITE}) + p.eventList = append(p.eventList, syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_WRITE}) p.mux.Unlock() p.trigger() } func (p *poller) deleteEvent(fd int) { p.mux.Lock() - p.eventList = append(p.eventList, syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_READ}) + p.eventList = append(p.eventList, + syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_READ}, + syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_WRITE}) p.mux.Unlock() p.trigger() } From acd2621ad4ecf915d4567608e1c1500ee6a99d05 Mon Sep 17 00:00:00 2001 From: lesismal Date: Thu, 9 Nov 2023 21:20:40 +0800 Subject: [PATCH 2/9] add nbhttp.Conn to help marking Conn from nbhttp --- nbhttp/engine.go | 40 +++++++++++++++++++++--------------- nbhttp/processor.go | 6 +++--- nbhttp/websocket/conn.go | 2 +- nbhttp/websocket/upgrader.go | 19 +++++++++++++---- 4 files changed, 42 insertions(+), 25 deletions(-) diff --git a/nbhttp/engine.go b/nbhttp/engine.go index 2060b304..510daed9 100644 --- a/nbhttp/engine.go +++ b/nbhttp/engine.go @@ -279,7 +279,12 @@ func (e *Engine) closeAllConns() { } } -func (e *Engine) listen(ln net.Listener, tlsConfig *tls.Config, addConn func(net.Conn, *tls.Config, func()), decrease func()) { +type Conn struct { + net.Conn + Parser *Parser +} + +func (e *Engine) listen(ln net.Listener, tlsConfig *tls.Config, addConn func(*Conn, *tls.Config, func()), decrease func()) { e.WaitGroup.Add(1) go func() { defer func() { @@ -289,7 +294,7 @@ func (e *Engine) listen(ln net.Listener, tlsConfig *tls.Config, addConn func(net for !e.shutdown { conn, err := ln.Accept() if err == nil && !e.shutdown { - addConn(conn, tlsConfig, decrease) + addConn(&Conn{Conn: conn}, tlsConfig, decrease) } else { var ne net.Error if ok := errors.As(err, &ne); ok && ne.Temporary() { @@ -572,10 +577,10 @@ func (engine *Engine) AddTransferredConn(nbc *nbio.Conn) error { } // AddConnNonTLSNonBlocking . -func (engine *Engine) AddConnNonTLSNonBlocking(c net.Conn, tlsConfig *tls.Config, decrease func()) { - nbc, err := nbio.NBConn(c) +func (engine *Engine) AddConnNonTLSNonBlocking(conn *Conn, tlsConfig *tls.Config, decrease func()) { + nbc, err := nbio.NBConn(conn.Conn) if err != nil { - c.Close() + conn.Close() logging.Error("AddConnNonTLSNonBlocking failed: %v", err) return } @@ -606,6 +611,7 @@ func (engine *Engine) AddConnNonTLSNonBlocking(c net.Conn, tlsConfig *tls.Config parser.Execute = SyncExecutor } parser.Engine = engine + conn.Parser = parser processor.(*ServerProcessor).parser = parser nbc.SetSession(parser) nbc.OnData(engine.DataHandler) @@ -614,7 +620,7 @@ func (engine *Engine) AddConnNonTLSNonBlocking(c net.Conn, tlsConfig *tls.Config } // AddConnNonTLSBlocking . -func (engine *Engine) AddConnNonTLSBlocking(conn net.Conn, tlsConfig *tls.Config, decrease func()) { +func (engine *Engine) AddConnNonTLSBlocking(conn *Conn, tlsConfig *tls.Config, decrease func()) { engine.mux.Lock() if len(engine.conns) >= engine.MaxLoad { engine.mux.Unlock() @@ -623,7 +629,7 @@ func (engine *Engine) AddConnNonTLSBlocking(conn net.Conn, tlsConfig *tls.Config decrease() return } - switch vt := conn.(type) { + switch vt := conn.Conn.(type) { case *net.TCPConn, *net.UnixConn: key, err := conn2Array(vt) if err != nil { @@ -646,14 +652,15 @@ func (engine *Engine) AddConnNonTLSBlocking(conn net.Conn, tlsConfig *tls.Config processor := NewServerProcessor(conn, engine.Handler, engine.KeepaliveTime, !engine.DisableSendfile) parser := NewParser(processor, false, engine.ReadLimit, SyncExecutor) parser.Engine = engine + conn.Parser = parser processor.(*ServerProcessor).parser = parser conn.SetReadDeadline(time.Now().Add(engine.KeepaliveTime)) go engine.readConnBlocking(conn, parser, decrease) } // AddConnTLSNonBlocking . -func (engine *Engine) AddConnTLSNonBlocking(conn net.Conn, tlsConfig *tls.Config, decrease func()) { - nbc, err := nbio.NBConn(conn) +func (engine *Engine) AddConnTLSNonBlocking(conn *Conn, tlsConfig *tls.Config, decrease func()) { + nbc, err := nbio.NBConn(conn.Conn) if err != nil { conn.Close() logging.Error("AddConnTLSNonBlocking failed: %v", err) @@ -693,6 +700,7 @@ func (engine *Engine) AddConnTLSNonBlocking(conn net.Conn, tlsConfig *tls.Config } parser.Conn = tlsConn parser.Engine = engine + conn.Parser = parser processor.(*ServerProcessor).parser = parser nbc.SetSession(parser) @@ -702,7 +710,7 @@ func (engine *Engine) AddConnTLSNonBlocking(conn net.Conn, tlsConfig *tls.Config } // AddConnTLSBlocking . -func (engine *Engine) AddConnTLSBlocking(conn net.Conn, tlsConfig *tls.Config, decrease func()) { +func (engine *Engine) AddConnTLSBlocking(conn *Conn, tlsConfig *tls.Config, decrease func()) { engine.mux.Lock() if len(engine.conns) >= engine.MaxLoad { engine.mux.Unlock() @@ -712,7 +720,7 @@ func (engine *Engine) AddConnTLSBlocking(conn net.Conn, tlsConfig *tls.Config, d return } - switch vt := conn.(type) { + switch vt := conn.Conn.(type) { case *net.TCPConn, *net.UnixConn: key, err := conn2Array(vt) if err != nil { @@ -740,6 +748,7 @@ func (engine *Engine) AddConnTLSBlocking(conn net.Conn, tlsConfig *tls.Config, d parser := NewParser(processor, false, engine.ReadLimit, SyncExecutor) parser.Conn = tlsConn parser.Engine = engine + conn.Parser = parser processor.(*ServerProcessor).parser = parser conn.SetReadDeadline(time.Now().Add(engine.KeepaliveTime)) tlsConn.SetSession(parser) @@ -781,9 +790,9 @@ func (engine *Engine) readConnBlocking(conn net.Conn, parser *Parser, decrease f return } parser.Read(buf[:n]) - if parser.hijacked { - return - } + // if parser.hijacked { + // return + // } } } @@ -833,9 +842,6 @@ func (engine *Engine) readTLSConnBlocking(conn net.Conn, tlsConn *tls.Conn, pars logging.Debug("parser.Read failed: %v", err) return } - // if parser.hijacked { - // return - // } } if nread == 0 { break diff --git a/nbhttp/processor.go b/nbhttp/processor.go index be0c2a85..ab9bf28b 100644 --- a/nbhttp/processor.go +++ b/nbhttp/processor.go @@ -273,11 +273,11 @@ func (p *ServerProcessor) OnComplete(parser *Parser) { } func (p *ServerProcessor) flushResponse(res *Response) { - hijacked := res.hijacked - p.parser.hijacked = hijacked + // hijacked := res.hijacked + // p.parser.hijacked = hijacked if p.conn != nil { req := res.request - if !hijacked { + if !res.hijacked { res.eoncodeHead() if err := res.flushTrailer(p.conn); err != nil { p.conn.Close() diff --git a/nbhttp/websocket/conn.go b/nbhttp/websocket/conn.go index 2ffbf6f0..d9c03fb3 100644 --- a/nbhttp/websocket/conn.go +++ b/nbhttp/websocket/conn.go @@ -831,7 +831,7 @@ func NewConn(u *Upgrader, c net.Conn, subprotocol string, remoteCompressionEnabl // HandleRead . func (c *Conn) HandleRead(bufSize int) { - if !c.isReadingByParser { + if c.isReadingByParser { return } c.mux.Lock() diff --git a/nbhttp/websocket/upgrader.go b/nbhttp/websocket/upgrader.go index 0e7f01cc..8d47ff50 100644 --- a/nbhttp/websocket/upgrader.go +++ b/nbhttp/websocket/upgrader.go @@ -235,7 +235,16 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade } } - switch vt := conn.(type) { + var underLayerConn net.Conn + tmpConn, isReadingByParser := conn.(*nbhttp.Conn) + if isReadingByParser { + underLayerConn = tmpConn.Conn + parser = tmpConn.Parser + } else { + underLayerConn = conn + } + + switch vt := underLayerConn.(type) { case *nbio.Conn: // Scenario 1: *nbio.Conn, handled by nbhttp.Engine. parser, ok = vt.Session().(*nbhttp.Parser) @@ -374,13 +383,15 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade return nil, err } - wsc.isReadingByParser = (parser == nil) - if wsc.openHandler != nil { wsc.openHandler(wsc) } - if wsc.isBlockingMod && wsc.isReadingByParser { + if parser != nil { + parser.Reader = wsc + } + wsc.isReadingByParser = isReadingByParser + if wsc.isBlockingMod && (!wsc.isReadingByParser) { var handleRead = true if len(args) > 1 { var b bool From 867adcf1547a42e94c56b83616c9c98305b59ba9 Mon Sep 17 00:00:00 2001 From: lesismal Date: Tue, 14 Nov 2023 06:15:14 +0000 Subject: [PATCH 3/9] nbhttp: fix processor type to nbhttp.Conn --- nbhttp/engine.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/nbhttp/engine.go b/nbhttp/engine.go index 510daed9..6a0d8099 100644 --- a/nbhttp/engine.go +++ b/nbhttp/engine.go @@ -584,6 +584,7 @@ func (engine *Engine) AddConnNonTLSNonBlocking(conn *Conn, tlsConfig *tls.Config logging.Error("AddConnNonTLSNonBlocking failed: %v", err) return } + conn.Conn = nbc if nbc.Session() != nil { nbc.Close() return @@ -604,8 +605,8 @@ func (engine *Engine) AddConnNonTLSNonBlocking(conn *Conn, tlsConfig *tls.Config } engine.conns[key] = struct{}{} engine.mux.Unlock() - engine._onOpen(nbc) - processor := NewServerProcessor(nbc, engine.Handler, engine.KeepaliveTime, !engine.DisableSendfile) + engine._onOpen(conn.Conn) + processor := NewServerProcessor(conn, engine.Handler, engine.KeepaliveTime, !engine.DisableSendfile) parser := NewParser(processor, false, engine.ReadLimit, nbc.Execute) if engine.isOneshot { parser.Execute = SyncExecutor @@ -666,6 +667,7 @@ func (engine *Engine) AddConnTLSNonBlocking(conn *Conn, tlsConfig *tls.Config, d logging.Error("AddConnTLSNonBlocking failed: %v", err) return } + conn.Conn = nbc if nbc.Session() != nil { nbc.Close() logging.Error("AddConnTLSNonBlocking failed: session should not be nil") @@ -688,11 +690,11 @@ func (engine *Engine) AddConnTLSNonBlocking(conn *Conn, tlsConfig *tls.Config, d engine.conns[key] = struct{}{} engine.mux.Unlock() - engine._onOpen(nbc) + engine._onOpen(conn.Conn) isClient := false isNonBlock := true - tlsConn := tls.NewConn(nbc, tlsConfig, isClient, isNonBlock, engine.TLSAllocator) + tlsConn := tls.NewConn(conn, tlsConfig, isClient, isNonBlock, engine.TLSAllocator) processor := NewServerProcessor(tlsConn, engine.Handler, engine.KeepaliveTime, !engine.DisableSendfile) parser := NewParser(processor, false, engine.ReadLimit, nbc.Execute) if engine.isOneshot { From 222ef60ed1a1941fc9580d70be27a894d7ea265d Mon Sep 17 00:00:00 2001 From: lesismal Date: Tue, 14 Nov 2023 17:59:40 +0800 Subject: [PATCH 4/9] nbhttp: fix conn type for tls reading --- nbhttp/engine.go | 76 ++++++++++++++++++++---------------- nbhttp/websocket/upgrader.go | 12 ++++-- 2 files changed, 52 insertions(+), 36 deletions(-) diff --git a/nbhttp/engine.go b/nbhttp/engine.go index 6a0d8099..57f766ca 100644 --- a/nbhttp/engine.go +++ b/nbhttp/engine.go @@ -281,7 +281,8 @@ func (e *Engine) closeAllConns() { type Conn struct { net.Conn - Parser *Parser + Parser *Parser + Trasfered bool } func (e *Engine) listen(ln net.Listener, tlsConfig *tls.Config, addConn func(*Conn, *tls.Config, func()), decrease func()) { @@ -525,31 +526,34 @@ func (e *Engine) TLSDataHandler(c *nbio.Conn, data []byte) { c.Close() return } - if tlsConn, ok := parser.Processor.Conn().(*tls.Conn); ok { - defer tlsConn.ResetOrFreeBuffer() - - readed := data - buffer := data - for { - _, nread, err := tlsConn.AppendAndRead(readed, buffer) - readed = nil - if err != nil { - c.CloseWithError(err) - return - } - if nread > 0 { - err := parser.Read(buffer[:nread]) + nbhttpConn, ok := parser.Processor.Conn().(*Conn) + if ok { + if tlsConn, ok := nbhttpConn.Conn.(*tls.Conn); ok { + defer tlsConn.ResetOrFreeBuffer() + + readed := data + buffer := data + for { + _, nread, err := tlsConn.AppendAndRead(readed, buffer) + readed = nil if err != nil { - logging.Debug("parser.Read failed: %v", err) c.CloseWithError(err) return } + if nread > 0 { + err := parser.Read(buffer[:nread]) + if err != nil { + logging.Debug("parser.Read failed: %v", err) + c.CloseWithError(err) + return + } + } + if nread == 0 { + return + } } - if nread == 0 { - return - } + // c.SetReadDeadline(time.Now().Add(conf.KeepaliveTime)) } - // c.SetReadDeadline(time.Now().Add(conf.KeepaliveTime)) } } @@ -694,13 +698,14 @@ func (engine *Engine) AddConnTLSNonBlocking(conn *Conn, tlsConfig *tls.Config, d isClient := false isNonBlock := true - tlsConn := tls.NewConn(conn, tlsConfig, isClient, isNonBlock, engine.TLSAllocator) - processor := NewServerProcessor(tlsConn, engine.Handler, engine.KeepaliveTime, !engine.DisableSendfile) + tlsConn := tls.NewConn(nbc, tlsConfig, isClient, isNonBlock, engine.TLSAllocator) + conn = &Conn{Conn: tlsConn} + processor := NewServerProcessor(conn, engine.Handler, engine.KeepaliveTime, !engine.DisableSendfile) parser := NewParser(processor, false, engine.ReadLimit, nbc.Execute) if engine.isOneshot { parser.Execute = SyncExecutor } - parser.Conn = tlsConn + parser.Conn = conn parser.Engine = engine conn.Parser = parser processor.(*ServerProcessor).parser = parser @@ -722,7 +727,8 @@ func (engine *Engine) AddConnTLSBlocking(conn *Conn, tlsConfig *tls.Config, decr return } - switch vt := conn.Conn.(type) { + underLayerConn := conn.Conn + switch vt := underLayerConn.(type) { case *net.TCPConn, *net.UnixConn: key, err := conn2Array(vt) if err != nil { @@ -745,16 +751,17 @@ func (engine *Engine) AddConnTLSBlocking(conn *Conn, tlsConfig *tls.Config, decr isClient := false isNonBlock := true - tlsConn := tls.NewConn(conn, tlsConfig, isClient, isNonBlock, engine.TLSAllocator) - processor := NewServerProcessor(tlsConn, engine.Handler, engine.KeepaliveTime, !engine.DisableSendfile) + tlsConn := tls.NewConn(underLayerConn, tlsConfig, isClient, isNonBlock, engine.TLSAllocator) + conn = &Conn{Conn: tlsConn} + processor := NewServerProcessor(conn, engine.Handler, engine.KeepaliveTime, !engine.DisableSendfile) parser := NewParser(processor, false, engine.ReadLimit, SyncExecutor) - parser.Conn = tlsConn + parser.Conn = conn parser.Engine = engine conn.Parser = parser processor.(*ServerProcessor).parser = parser conn.SetReadDeadline(time.Now().Add(engine.KeepaliveTime)) tlsConn.SetSession(parser) - go engine.readTLSConnBlocking(conn, tlsConn, parser, decrease) + go engine.readTLSConnBlocking(conn, underLayerConn, tlsConn, parser, decrease) } func (engine *Engine) readConnBlocking(conn net.Conn, parser *Parser, decrease func()) { @@ -798,7 +805,7 @@ func (engine *Engine) readConnBlocking(conn net.Conn, parser *Parser, decrease f } } -func (engine *Engine) readTLSConnBlocking(conn net.Conn, tlsConn *tls.Conn, parser *Parser, decrease func()) { +func (engine *Engine) readTLSConnBlocking(conn *Conn, rconn net.Conn, tlsConn *tls.Conn, parser *Parser, decrease func()) { var ( err error nread int @@ -812,10 +819,13 @@ func (engine *Engine) readTLSConnBlocking(conn net.Conn, tlsConn *tls.Conn, pars buffer := readBufferPool.Malloc(engine.BlockingReadBufferSize) defer func() { readBufferPool.Free(buffer) - parser.Close(err) - tlsConn.Close() + if !conn.Trasfered { + parser.Close(err) + tlsConn.Close() + } + engine.mux.Lock() - switch vt := conn.(type) { + switch vt := conn.Conn.(type) { case *net.TCPConn, *net.UnixConn: key, _ := conn2Array(vt) delete(engine.conns, key) @@ -826,7 +836,7 @@ func (engine *Engine) readTLSConnBlocking(conn net.Conn, tlsConn *tls.Conn, pars }() for { - nread, err = conn.Read(buffer) + nread, err = rconn.Read(buffer) if err != nil { return } diff --git a/nbhttp/websocket/upgrader.go b/nbhttp/websocket/upgrader.go index 8d47ff50..f30c7b6e 100644 --- a/nbhttp/websocket/upgrader.go +++ b/nbhttp/websocket/upgrader.go @@ -236,10 +236,10 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade } var underLayerConn net.Conn - tmpConn, isReadingByParser := conn.(*nbhttp.Conn) + nbhttpConn, isReadingByParser := conn.(*nbhttp.Conn) if isReadingByParser { - underLayerConn = tmpConn.Conn - parser = tmpConn.Parser + underLayerConn = nbhttpConn.Conn + parser = nbhttpConn.Parser } else { underLayerConn = conn } @@ -266,6 +266,9 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade if err != nil { return nil, u.returnError(w, r, http.StatusInternalServerError, err) } + if nbhttpConn != nil { + nbhttpConn.Trasfered = true + } vt.ResetRawInput() parser = &nbhttp.Parser{Execute: nbc.Execute} if engine.EpollMod == nbio.EPOLLET && engine.EPOLLONESHOT == nbio.EPOLLONESHOT { @@ -338,6 +341,9 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade if err != nil { return nil, u.returnError(w, r, http.StatusInternalServerError, err) } + if nbhttpConn != nil { + nbhttpConn.Trasfered = true + } parser = &nbhttp.Parser{Execute: nbc.Execute} if engine.EpollMod == nbio.EPOLLET && engine.EPOLLONESHOT == nbio.EPOLLONESHOT { parser.Execute = nbhttp.SyncExecutor From baf498e7b7f97d77aafea1ea911a40cc157d9de7 Mon Sep 17 00:00:00 2001 From: lesismal Date: Tue, 14 Nov 2023 21:36:16 +0800 Subject: [PATCH 5/9] nbhttp: fix conn type for disconnected cleanning --- nbhttp/engine.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nbhttp/engine.go b/nbhttp/engine.go index 57f766ca..128dc8ae 100644 --- a/nbhttp/engine.go +++ b/nbhttp/engine.go @@ -764,7 +764,7 @@ func (engine *Engine) AddConnTLSBlocking(conn *Conn, tlsConfig *tls.Config, decr go engine.readTLSConnBlocking(conn, underLayerConn, tlsConn, parser, decrease) } -func (engine *Engine) readConnBlocking(conn net.Conn, parser *Parser, decrease func()) { +func (engine *Engine) readConnBlocking(conn *Conn, parser *Parser, decrease func()) { var ( n int err error @@ -782,7 +782,7 @@ func (engine *Engine) readConnBlocking(conn net.Conn, parser *Parser, decrease f // go func() { parser.Close(err) engine.mux.Lock() - switch vt := conn.(type) { + switch vt := conn.Conn.(type) { case *net.TCPConn, *net.UnixConn: key, _ := conn2Array(vt) delete(engine.conns, key) From 5ac93ce96e15376dd45d9aa0414d0a238deee92c Mon Sep 17 00:00:00 2001 From: lesismal Date: Tue, 14 Nov 2023 21:40:10 +0800 Subject: [PATCH 6/9] nbhttp: fix tls conn type for disconnected cleanning --- nbhttp/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nbhttp/engine.go b/nbhttp/engine.go index 128dc8ae..af9835b6 100644 --- a/nbhttp/engine.go +++ b/nbhttp/engine.go @@ -825,7 +825,7 @@ func (engine *Engine) readTLSConnBlocking(conn *Conn, rconn net.Conn, tlsConn *t } engine.mux.Lock() - switch vt := conn.Conn.(type) { + switch vt := rconn.(type) { case *net.TCPConn, *net.UnixConn: key, _ := conn2Array(vt) delete(engine.conns, key) From a66785178e56149a6e122b9aab90aa2526c39a2b Mon Sep 17 00:00:00 2001 From: lesismal Date: Tue, 14 Nov 2023 21:56:50 +0800 Subject: [PATCH 7/9] lint --- nbhttp/engine.go | 3 --- nbhttp/parser.go | 11 +++++------ nbhttp/processor.go | 2 -- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/nbhttp/engine.go b/nbhttp/engine.go index af9835b6..b05e3361 100644 --- a/nbhttp/engine.go +++ b/nbhttp/engine.go @@ -799,9 +799,6 @@ func (engine *Engine) readConnBlocking(conn *Conn, parser *Parser, decrease func return } parser.Read(buf[:n]) - // if parser.hijacked { - // return - // } } } diff --git a/nbhttp/parser.go b/nbhttp/parser.go index 9050635c..92c45722 100644 --- a/nbhttp/parser.go +++ b/nbhttp/parser.go @@ -33,10 +33,6 @@ type Parser struct { cache []byte - state int8 - isClient bool - hijacked bool - readLimit int errClose error @@ -66,8 +62,11 @@ type Parser struct { trailer http.Header contentLength int chunkSize int - chunked bool - headerExists bool + + state int8 + chunked bool + isClient bool + headerExists bool } func (p *Parser) nextState(state int8) { diff --git a/nbhttp/processor.go b/nbhttp/processor.go index ab9bf28b..1f271ecb 100644 --- a/nbhttp/processor.go +++ b/nbhttp/processor.go @@ -273,8 +273,6 @@ func (p *ServerProcessor) OnComplete(parser *Parser) { } func (p *ServerProcessor) flushResponse(res *Response) { - // hijacked := res.hijacked - // p.parser.hijacked = hijacked if p.conn != nil { req := res.request if !res.hijacked { From 7e4627d9de297b976ecdfa33defbc9f9a041dfb9 Mon Sep 17 00:00:00 2001 From: lesismal Date: Wed, 15 Nov 2023 18:15:19 +0800 Subject: [PATCH 8/9] fix http/ws client conn type and close clean --- nbhttp/client_conn.go | 5 +++-- nbhttp/engine.go | 1 + nbhttp/websocket/dialer.go | 8 +++++++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/nbhttp/client_conn.go b/nbhttp/client_conn.go index 173b0190..8b552f68 100644 --- a/nbhttp/client_conn.go +++ b/nbhttp/client_conn.go @@ -318,10 +318,11 @@ func (c *ClientConn) Do(req *http.Request, handler func(res *http.Response, conn isNonblock := true tlsConn.ResetConn(nbc, isNonblock) - c.conn = tlsConn + nbhttpConn := &Conn{Conn: tlsConn} + c.conn = nbhttpConn processor := NewClientProcessor(c, c.onResponse) parser := NewParser(processor, true, engine.ReadLimit, nbc.Execute) - parser.Conn = tlsConn + parser.Conn = nbhttpConn parser.Engine = engine parser.OnClose(func(p *Parser, err error) { c.CloseWithError(err) diff --git a/nbhttp/engine.go b/nbhttp/engine.go index b05e3361..c0a2fb76 100644 --- a/nbhttp/engine.go +++ b/nbhttp/engine.go @@ -1026,6 +1026,7 @@ func NewEngine(conf Config) *Engine { engine.mux.Lock() key, _ := conn2Array(c) delete(engine.conns, key) + delete(engine.dialerConns, key) engine.mux.Unlock() }) }) diff --git a/nbhttp/websocket/dialer.go b/nbhttp/websocket/dialer.go index d1f9ab2f..ef65c5cc 100644 --- a/nbhttp/websocket/dialer.go +++ b/nbhttp/websocket/dialer.go @@ -185,7 +185,13 @@ func (d *Dialer) DialContext(ctx context.Context, urlStr string, requestHeader h nbc, ok := conn.(*nbio.Conn) if !ok { - tlsConn, tlsOk := conn.(*tls.Conn) + nbhttpConn, ok := conn.(*nbhttp.Conn) + if !ok { + err = ErrBadHandshake + notifyResult(err) + return + } + tlsConn, tlsOk := nbhttpConn.Conn.(*tls.Conn) if !tlsOk { err = ErrBadHandshake notifyResult(err) From 56b05596387d55a6bfe6a17fcfc41f23d2af2b70 Mon Sep 17 00:00:00 2001 From: lesismal Date: Wed, 15 Nov 2023 18:18:51 +0800 Subject: [PATCH 9/9] lint --- nbhttp/websocket/dialer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nbhttp/websocket/dialer.go b/nbhttp/websocket/dialer.go index ef65c5cc..7dbf0860 100644 --- a/nbhttp/websocket/dialer.go +++ b/nbhttp/websocket/dialer.go @@ -185,8 +185,8 @@ func (d *Dialer) DialContext(ctx context.Context, urlStr string, requestHeader h nbc, ok := conn.(*nbio.Conn) if !ok { - nbhttpConn, ok := conn.(*nbhttp.Conn) - if !ok { + nbhttpConn, ok2 := conn.(*nbhttp.Conn) + if !ok2 { err = ErrBadHandshake notifyResult(err) return