From 28841a784a2a2804d81765cff4d0ccd2c0320ab4 Mon Sep 17 00:00:00 2001 From: lesismal Date: Tue, 21 Nov 2023 21:28:47 +0800 Subject: [PATCH] websocket: add BlockingModHandleRead and BlockingModTrasferConnToPoller configurations for Upgrader --- nbhttp/websocket/upgrader.go | 49 +++++++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/nbhttp/websocket/upgrader.go b/nbhttp/websocket/upgrader.go index ba9f6c36..40eda2fa 100644 --- a/nbhttp/websocket/upgrader.go +++ b/nbhttp/websocket/upgrader.go @@ -30,8 +30,16 @@ var ( // DefaultBlockingModAsyncWrite . DefaultBlockingModAsyncWrite = true + // DefaultBlockingModHandleRead . + DefaultBlockingModHandleRead = true + + // DefaultBlockingModTransferConnToPoller . + DefaultBlockingModTransferConnToPoller = false + + // DefaultBlockingModSendQueueInitSize . DefaultBlockingModSendQueueInitSize = 4 + // DefaultBlockingModSendQueueMaxSize . DefaultBlockingModSendQueueMaxSize = 0 DefaultBlockingModAsyncCloseDelay = time.Second / 10 @@ -82,6 +90,33 @@ type Upgrader struct { // false: write buffer to the conn directely. BlockingModAsyncWrite bool + // BlockingModHandleRead represents whether start a goroutine to handle reading automatically during `Upgrade``: + // true: use dynamic goroutine to handle writing. + // false: write buffer to the conn directely. + // + // + // Notice: + // If we start a goroutine to handle read during `Upgrade`, we may receive a new websocket message + // before we have left the http.Handler for the `Websocket Handshake`. + // Then if we have the logic of `websocket.Conn.SetSession` in the http.Handler, it's possible that when we receive + // and are handling a websocket message and call `websocket.Conn.Session()`, we get nil. + // + // To fix this nil session problem, can use `websocket.Conn.SessionWithLock()`. + // + // For other concurrent problems(including the nil session problem), we can: + // 1st: set this `BlockingModHandleRead = false` + // 2nd: `go wsConn.HandleRead(YourBufSize)` after `Upgrade` and finished initialization. + // Then the websocket message wouldn't come before the http.Handler for `Websocket Handshake` has done. + BlockingModHandleRead bool + + // BlockingModTrasferConnToPoller represents whether try to transfer a blocking connection to nonblocking and add to `Engine``. + // true: try to transfer. + // false: don't try to transfer. + // + // Notice: + // Only `net.TCPConn` and `llib's blocking tls.Conn` can be transfered to nonblocking. + BlockingModTrasferConnToPoller bool + // BlockingModSendQueueInitSize represents the init size of a Conn's send queue, // only takes effect when `BlockingModAsyncWrite` is true. BlockingModSendQueueInitSize int @@ -99,10 +134,12 @@ func NewUpgrader() *Upgrader { compressionLevel: defaultCompressionLevel, BlockingModAsyncCloseDelay: DefaultBlockingModAsyncCloseDelay, }, - BlockingModReadBufferSize: DefaultBlockingReadBufferSize, - BlockingModAsyncWrite: DefaultBlockingModAsyncWrite, - BlockingModSendQueueInitSize: DefaultBlockingModSendQueueInitSize, - BlockingModSendQueueMaxSize: DefaultBlockingModSendQueueMaxSize, + BlockingModReadBufferSize: DefaultBlockingReadBufferSize, + BlockingModAsyncWrite: DefaultBlockingModAsyncWrite, + BlockingModHandleRead: DefaultBlockingModHandleRead, + BlockingModTrasferConnToPoller: DefaultBlockingModTransferConnToPoller, + BlockingModSendQueueInitSize: DefaultBlockingModSendQueueInitSize, + BlockingModSendQueueMaxSize: DefaultBlockingModSendQueueMaxSize, } u.pingMessageHandler = func(c *Conn, data string) { err := c.WriteMessage(PongMessage, []byte(data)) @@ -219,7 +256,7 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade var nbc *nbio.Conn var engine = u.Engine var parser *nbhttp.Parser - var transferConn bool + var transferConn = u.BlockingModTrasferConnToPoller if len(args) > 0 { var b bool b, ok = args[0].(bool) @@ -402,7 +439,7 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade } wsc.isReadingByParser = isReadingByParser if wsc.isBlockingMod && (!wsc.isReadingByParser) { - var handleRead = true + var handleRead = u.BlockingModHandleRead if len(args) > 1 { var b bool b, ok = args[1].(bool)