Skip to content

Commit

Permalink
websocket: add taskpool.Call method and used for IOModBlocking conn's…
Browse files Browse the repository at this point in the history
… frame/message handler
  • Loading branch information
lesismal committed Jan 25, 2025
1 parent 3703270 commit fffb1ae
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 3 deletions.
4 changes: 4 additions & 0 deletions nbhttp/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ type Engine struct {
BaseCtx context.Context
Cancel func()

SyncCall func(f func())
ExecuteClient func(f func())

// isOneshot bool
Expand Down Expand Up @@ -1044,6 +1045,7 @@ func NewEngine(conf Config) *Engine {
}
conf.Handler = handler

var serverCall = func(f func()) { f() }
var serverExecutor = conf.ServerExecutor
var messageHandlerExecutePool *taskpool.TaskPool
if serverExecutor == nil {
Expand All @@ -1053,6 +1055,7 @@ func NewEngine(conf Config) *Engine {
nativeSize := conf.MessageHandlerPoolSize - 1
messageHandlerExecutePool = taskpool.New(nativeSize, 1024*64)
serverExecutor = messageHandlerExecutePool.Go
serverCall = messageHandlerExecutePool.Call
}

var clientExecutor = conf.ClientExecutor
Expand Down Expand Up @@ -1135,6 +1138,7 @@ func NewEngine(conf Config) *Engine {
emptyRequest: (&http.Request{}).WithContext(baseCtx),
BaseCtx: baseCtx,
Cancel: cancel,
SyncCall: serverCall,
}

// shouldSupportTLS := !conf.SupportServerOnly || len(conf.AddrsTLS) > 0
Expand Down
12 changes: 9 additions & 3 deletions nbhttp/websocket/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ func (c *Conn) handleDataFrame(opcode MessageType, fin bool, pbody *[]byte) {
if c.releasePayload {
defer c.Engine.BodyAllocator.Free(pbody)
}
h(c, opcode, fin, *pbody)
c.Engine.SyncCall(func() {
h(c, opcode, fin, *pbody)
})
} else {
if !c.Execute(func() {
if c.releasePayload {
Expand All @@ -188,7 +190,9 @@ func (c *Conn) handleMessage(opcode MessageType, pbody *[]byte) {
if c.releasePayload {
defer c.Engine.BodyAllocator.Free(pbody)
}
c.handleWsMessage(opcode, *pbody)
c.Engine.SyncCall(func() {
c.handleWsMessage(opcode, *pbody)
})
} else {
if !c.Execute(func() {
if c.releasePayload {
Expand All @@ -209,7 +213,9 @@ func (c *Conn) handleProtocolMessage(opcode MessageType, pbody *[]byte) {
if c.releasePayload {
defer c.Engine.BodyAllocator.Free(pbody)
}
c.handleWsMessage(opcode, *pbody)
c.Engine.SyncCall(func() {
c.handleWsMessage(opcode, *pbody)
})
} else {
if !c.Execute(func() {
if c.releasePayload {
Expand Down
11 changes: 11 additions & 0 deletions taskpool/iotaskpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ type IOTaskPool struct {
pool sync.Pool
}

// Call .
//
//go:norace
func (tp *IOTaskPool) Call(f func([]byte)) {
tp.task.Call(func() {
pbuf := tp.pool.Get().(*[]byte)
f(*pbuf)
tp.pool.Put(pbuf)
})
}

// Go .
//
//go:norace
Expand Down
7 changes: 7 additions & 0 deletions taskpool/taskpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ func (tp *TaskPool) fork(f func()) bool {
return false
}

// Call .
//
//go:norace
func (tp *TaskPool) Call(f func()) {
tp.caller(f)
}

// Go .
//
//go:norace
Expand Down

0 comments on commit fffb1ae

Please sign in to comment.