Skip to content

Commit

Permalink
refactor(pool): introduce pool.Manager (#251)
Browse files Browse the repository at this point in the history
* refactor(pool): introduce pool.Manager

Change-Id: I72f76d24fe267ef9e2bfc8d3df376d22d56df205

* refactor(pool): introduce pool.Manager

Change-Id: I72f76d24fe267ef9e2bfc8d3df376d22d56df205

* pool manager set maxDuration default value 10 Minutes (#252)

* fix: comment about provide

* fix: manager's maxDuration default 10 Minutes

Co-authored-by: guxi.reasno <guxi.reasno@bytedance.com>
Co-authored-by: rock G <35254251+GGXXLL@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 2, 2022
1 parent e7c857f commit 8fffa99
Show file tree
Hide file tree
Showing 13 changed files with 297 additions and 193 deletions.
4 changes: 0 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
* **cron:** add MockStartTimeFunc helper ([#243](https://github.com/DoNewsCode/core/issues/243)) (@[谷溪](guxi99@gmail.com))


<a name="v0.13.1-beta1"></a>
## [v0.13.1-beta1](https://github.com/DoNewsCode/core/compare/v0.13.0-beta2...v0.13.1-beta1) (2022-07-19)


<a name="v0.13.0-beta2"></a>
## [v0.13.0-beta2](https://github.com/DoNewsCode/core/compare/v0.13.0-beta1...v0.13.0-beta2) (2022-07-19)

Expand Down
25 changes: 3 additions & 22 deletions control/pool/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,7 @@ import (
"github.com/DoNewsCode/core/di"
)

// Providers provide a *pool.Pool to the core.
func Providers(options ...ProviderOptionFunc) di.Deps {
return di.Deps{func() *Pool {
return NewPool(options...)
}}
}

// ProviderOptionFunc is the functional option to Providers.
type ProviderOptionFunc func(pool *Pool)

// WithConcurrency sets the maximum concurrency for the pool.
func WithConcurrency(concurrency int) ProviderOptionFunc {
return func(pool *Pool) {
pool.concurrency = concurrency
}
}

// WithCounter sets the counter for the pool.
func WithCounter(counter *Counter) ProviderOptionFunc {
return func(pool *Pool) {
pool.counter = counter
}
// Providers provide a *Manager to the core.
func Providers() di.Deps {
return di.Deps{NewManager}
}
5 changes: 3 additions & 2 deletions control/pool/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ func Example() {
core.WithInline("http.addr", ":9777"),
core.WithInline("log.level", "none"),
)
c.Provide(pool.Providers(pool.WithConcurrency(1)))
c.Provide(pool.Providers())

c.Invoke(func(p *pool.Pool, dispatcher lifecycle.HTTPServerStart) {
c.Invoke(func(m *pool.Manager, dispatcher lifecycle.HTTPServerStart) {
p := pool.NewPool(m, 10)
dispatcher.On(func(ctx context.Context, payload lifecycle.HTTPServerStartPayload) error {
go func() {
if _, err := http.Get("http://localhost:9777/"); err != nil {
Expand Down
97 changes: 97 additions & 0 deletions control/pool/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package pool

import (
"context"
"sync"
"time"

"github.com/DoNewsCode/core/ctxmeta"
)

// Manager manages a pool of workers.
type Manager struct {
workers chan *Worker
maxDuration time.Duration
startWorkerCh chan *Worker
managerStoppedCh chan struct{}
}

// NewManager returns a new manager.
func NewManager() *Manager {
return &Manager{
workers: make(chan *Worker, 1000),
startWorkerCh: make(chan *Worker),
managerStoppedCh: make(chan struct{}),
maxDuration: 10 * time.Minute,
}
}

// Get returns a worker from the free list. If the free list is empty, create a new one.
func (m *Manager) Get() *Worker {
var w *Worker
select {
case w = <-m.workers:
default:
w = NewWorker()
select {
case m.startWorkerCh <- w:
case <-m.managerStoppedCh:
w.Stop()
}
}
return w
}

// Release put the worker back into the free list. If the free list is full,
// discard the worker. If the worker has surpassed the max duration, discard and
// managerStoppedCh the worker.
func (m *Manager) Release(w *Worker) {
if time.Since(w.startTime) > m.maxDuration {
w.Stop()
return
}

select {
case m.workers <- w:
default:
}
}

// Go runs function with no concurrency limit.
func (m *Manager) Go(ctx context.Context, f func(context.Context)) {
w := m.Get()
ctx = ctxmeta.WithoutCancel(ctx)
fn := func() {
f(ctx)
m.Release(w)
}
select {
case w.jobCh <- fn:
case <-w.stopCh: // only executed if manager.Run is cancelled
fn()
}
}

// Run starts the manager. It should be called during the initialization of the program.
func (m *Manager) Run(ctx context.Context) error {
var wg sync.WaitGroup
for {
select {
case w := <-m.startWorkerCh:
wg.Add(1)
go func(w *Worker) {
w.Run(ctx)
wg.Done()
}(w)
case <-ctx.Done():
close(m.managerStoppedCh)
wg.Wait()
return nil
}
}
}

// Module implements the di.Modular interface.
func (m *Manager) Module() interface{} {
return m
}
21 changes: 21 additions & 0 deletions control/pool/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package pool

import (
"context"
"testing"
"time"
)

func TestManager_Go(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

m := NewManager()
go m.Run(ctx)

var executed = make(chan struct{})
m.Go(ctx, func(ctx context.Context) {
close(executed)
})
<-executed
}
48 changes: 0 additions & 48 deletions control/pool/metrics.go

This file was deleted.

82 changes: 17 additions & 65 deletions control/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,52 +44,23 @@ package pool

import (
"context"
"sync"

"github.com/DoNewsCode/core/ctxmeta"

"github.com/oklog/run"
)

// NewPool returned func(contract.Dispatcher) *Pool
func NewPool(options ...ProviderOptionFunc) *Pool {
// NewPool returns *Pool
func NewPool(manager *Manager, cap int) *Pool {
pool := Pool{
ch: make(chan job),
concurrency: 10,
}
for _, f := range options {
f(&pool)
manager: manager,
concurrency: make(chan struct{}, cap),
}
return &pool
}

type job struct {
fn func()
}

// Pool is an async worker pool. It can be used to dispatch the async jobs from
// web servers. See the package documentation about its advantage over creating a
// goroutine directly.
type Pool struct {
ch chan job
concurrency int
counter *Counter
}

// ProvideRunGroup implements core.RunProvider
func (p *Pool) ProvideRunGroup(group *run.Group) {
ctx, cancel := context.WithCancel(context.Background())

group.Add(func() error {
return p.Run(ctx)
}, func(err error) {
cancel()
})
}

// Module implements di.Modular
func (p *Pool) Module() interface{} {
return p
manager *Manager
concurrency chan struct{}
}

// Go dispatchers a job to the async worker pool. requestContext is the context
Expand All @@ -98,37 +69,18 @@ func (p *Pool) Module() interface{} {
// nothing to do with the request. If the pool has reached max concurrency, the job will
// be executed in the current goroutine. In other word, the job will be executed synchronously.
func (p *Pool) Go(requestContext context.Context, function func(asyncContext context.Context)) {
j := job{
fn: func() {
function(ctxmeta.WithoutCancel(requestContext))
},
}
select {
case p.ch <- j:
default:
p.counter.IncSyncJob()
j.fn()
}
p.concurrency <- struct{}{}
p.manager.Go(requestContext, func(ctx context.Context) {
defer func() {
<-p.concurrency
}()
function(ctx)
})
}

// Run starts the async worker pool and block until it finishes.
func (p *Pool) Run(ctx context.Context) error {
var wg sync.WaitGroup
for i := 0; i < p.concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case j := <-p.ch:
p.counter.IncAsyncJob()
j.fn()
case <-ctx.Done():
return
}
}
}()
// Wait waits for all the async jobs to finish.
func (p *Pool) Wait() {
for i := 0; i < cap(p.concurrency); i++ {
p.concurrency <- struct{}{}
}
wg.Wait()
return nil
}
Loading

0 comments on commit 8fffa99

Please sign in to comment.