Skip to content

Commit

Permalink
Merge pull request #358 from lesismal/ws_std_session
Browse files Browse the repository at this point in the history
Ws std session
  • Loading branch information
lesismal authored Oct 23, 2023
2 parents 7a56b23 + 6a21497 commit 0d9546c
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 11 deletions.
3 changes: 1 addition & 2 deletions conn_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,7 @@ func (c *Conn) resetRead() {
if !c.closed && c.isWAdded {
c.isWAdded = false
p := c.p
p.deleteEvent(c.fd)
p.addRead(c.fd)
p.resetRead(c.fd)
}
}

Expand Down
6 changes: 6 additions & 0 deletions nbhttp/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,9 @@ func (engine *Engine) readConnBlocking(conn net.Conn, parser *Parser, decrease f
return
}
parser.Read(buf[:n])
if parser.hijacked {
return
}
}
}

Expand Down Expand Up @@ -830,6 +833,9 @@ func (engine *Engine) readTLSConnBlocking(conn net.Conn, tlsConn *tls.Conn, pars
logging.Debug("parser.Read failed: %v", err)
return
}
// if parser.hijacked {
// return
// }
}
if nread == 0 {
break
Expand Down
1 change: 1 addition & 0 deletions nbhttp/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Parser struct {

state int8
isClient bool
hijacked bool

readLimit int

Expand Down
10 changes: 6 additions & 4 deletions nbhttp/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,22 +266,23 @@ func (p *ServerProcessor) OnComplete(parser *Parser) {
response := NewResponse(p.parser, request, p.enableSendfile)
if !parser.Execute(func() {
p.handler.ServeHTTP(response, request)
p.flushResponse(response)
parser.hijacked = p.flushResponse(response)
}) {
releaseRequest(request, p.parser.Engine.RetainHTTPBody)
}
}

func (p *ServerProcessor) flushResponse(res *Response) {
func (p *ServerProcessor) flushResponse(res *Response) bool {
hijacked := res.hijacked
if p.conn != nil {
req := res.request
if !res.hijacked {
if !hijacked {
res.eoncodeHead()
if err := res.flushTrailer(p.conn); err != nil {
p.conn.Close()
releaseRequest(req, p.parser.Engine.RetainHTTPBody)
releaseResponse(res)
return
return hijacked
}
if req.Close {
// the data may still in the send queue
Expand All @@ -293,6 +294,7 @@ func (p *ServerProcessor) flushResponse(res *Response) {
releaseRequest(req, p.parser.Engine.RetainHTTPBody)
releaseResponse(res)
}
return hijacked
}

// Close .
Expand Down
17 changes: 15 additions & 2 deletions nbhttp/websocket/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type Conn struct {
remoteCompressionEnabled bool
enableWriteCompression bool
isBlockingMod bool
isReadingByParser bool
isInReadingLoop bool
expectingFragments bool
compress bool
opcode MessageType
Expand Down Expand Up @@ -827,8 +829,19 @@ func NewConn(u *Upgrader, c net.Conn, subprotocol string, remoteCompressionEnabl
return wsc
}

// BlockingModReadLoop .
func (c *Conn) BlockingModReadLoop(bufSize int) {
// HandleRead .
func (c *Conn) HandleRead(bufSize int) {
if !c.isReadingByParser {
return
}
c.mux.Lock()
reading := c.isInReadingLoop
c.isInReadingLoop = true
c.mux.Unlock()
if reading {
return
}

var (
n int
err error
Expand Down
21 changes: 18 additions & 3 deletions nbhttp/websocket/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,14 +374,22 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade
return nil, err
}

wsc.isReadingByParser = (parser == nil)

if wsc.openHandler != nil {
wsc.openHandler(wsc)
}

if wsc.isBlockingMod {
if parser == nil {
if wsc.isBlockingMod && wsc.isReadingByParser {
var handleRead = true
if len(args) > 1 {
var b bool
b, ok = args[1].(bool)
handleRead = ok && b
}
if handleRead {
wsc.chSessionInited = make(chan struct{})
go wsc.BlockingModReadLoop(u.BlockingModReadBufferSize)
go wsc.HandleRead(u.BlockingModReadBufferSize)
}
}

Expand All @@ -393,6 +401,13 @@ func (u *Upgrader) UpgradeAndTransferConnToPoller(w http.ResponseWriter, r *http
return u.Upgrade(w, r, responseHeader, trasferConn)
}

func (u *Upgrader) UpgradeWithoutHandlingReadForConnFromSTDServer(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error) {
// handle std server's conn, no need transfer conn to nbio Engine
const trasferConn = false
const handleRead = false
return u.Upgrade(w, r, responseHeader, trasferConn, handleRead)
}

func (u *Upgrader) commCheck(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (string, string, bool, error) {
if !headerContains(r.Header, "Connection", "upgrade") {
return "", "", false, u.returnError(w, r, http.StatusBadRequest, ErrUpgradeTokenNotFound)
Expand Down
7 changes: 7 additions & 0 deletions poller_kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ func (p *poller) addRead(fd int) {
p.trigger()
}

func (p *poller) resetRead(fd int) {
p.mux.Lock()
p.eventList = append(p.eventList, syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_DISABLE, Filter: syscall.EVFILT_WRITE})
p.mux.Unlock()
p.trigger()
}

func (p *poller) modWrite(fd int) {
p.mux.Lock()
p.eventList = append(p.eventList, syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_WRITE})
Expand Down

0 comments on commit 0d9546c

Please sign in to comment.