Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.5] Fix race condition (also a regression of the PR 19139) #19258

Merged
merged 1 commit into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 39 additions & 7 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,23 @@ type Etcd struct {

Server *etcdserver.EtcdServer

cfg Config
stopc chan struct{}
errc chan error
cfg Config

// closeOnce is to ensure `stopc` is closed only once, no matter
// how many times the Close() method is called.
closeOnce sync.Once
wg sync.WaitGroup
// stopc is used to notify the sub goroutines not to send
// any errors to `errc`.
stopc chan struct{}
// errc is used to receive error from sub goroutines (including
// client handler, peer handler and metrics handler). It's closed
// after all these sub goroutines exit (checked via `wg`). Writers
// should avoid writing after `stopc` is closed by selecting on
// reading from `stopc`.
errc chan error

// wg is used to track the lifecycle of all sub goroutines created by `StartEtcd`.
wg sync.WaitGroup
}

type peerListener struct {
Expand Down Expand Up @@ -368,6 +379,24 @@ func (e *Etcd) Config() Config {
// Close gracefully shuts down all servers/listeners.
// Client requests will be terminated with request timeout.
// After timeout, enforce remaning requests be closed immediately.
//
// The rough workflow to shut down etcd:
// 1. close the `stopc` channel, so that all error handlers (child
// goroutines) won't send back any errors anymore;
// 2. stop the http and grpc servers gracefully, within request timeout;
// 3. close all client and metrics listeners, so that etcd server
// stops receiving any new connection;
// 4. call the cancel function to close the gateway context, so that
// all gateway connections are closed.
// 5. stop etcd server gracefully, and ensure the main raft loop
// goroutine is stopped;
// 6. stop all peer listeners, so that it stops receiving peer connections
// and messages (wait up to 1-second);
// 7. wait for all child goroutines (i.e. client handlers, peer handlers
// and metrics handlers) to exit;
// 8. close the `errc` channel to release the resource. Note that it's only
// safe to close the `errc` after step 7 above is done, otherwise the
// child goroutines may send errors back to already closed `errc` channel.
func (e *Etcd) Close() {
fields := []zap.Field{
zap.String("name", e.cfg.Name),
Expand Down Expand Up @@ -597,7 +626,9 @@ func (e *Etcd) servePeers() (err error) {

// start peer servers in a goroutine
for _, pl := range e.Peers {
e.wg.Add(1)
go func(l *peerListener) {
defer e.wg.Done()
u := l.Addr().String()
e.cfg.logger.Info(
"serving peer traffic",
Expand Down Expand Up @@ -781,7 +812,9 @@ func (e *Etcd) serveClients() (err error) {

// start client servers in each goroutine
for _, sctx := range e.sctxs {
e.wg.Add(1)
go func(s *serveCtx) {
defer e.wg.Done()
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, e.grpcGatewayDial(splitHttp), splitHttp, gopts...))
}(sctx)
}
Expand Down Expand Up @@ -869,7 +902,9 @@ func (e *Etcd) serveMetrics() (err error) {
return err
}
e.metricsListeners = append(e.metricsListeners, ml)
e.wg.Add(1)
go func(u url.URL, ln net.Listener) {
defer e.wg.Done()
e.cfg.logger.Info(
"serving metrics",
zap.String("address", u.String()),
Expand All @@ -882,9 +917,6 @@ func (e *Etcd) serveMetrics() (err error) {
}

func (e *Etcd) errHandler(err error) {
e.wg.Add(1)
defer e.wg.Done()

select {
case <-e.stopc:
return
Expand Down
34 changes: 28 additions & 6 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,23 @@ type serveCtx struct {
insecure bool
httpOnly bool

// ctx is used to control the grpc gateway. Terminate the grpc gateway
// by calling `cancel` when shutting down the etcd.
ctx context.Context
cancel context.CancelFunc

userHandlers map[string]http.Handler
serviceRegister func(*grpc.Server)
serversC chan *servers
closeOnce sync.Once

// serversC is used to receive the http and grpc server objects (created
// in `serve`), both of which will be closed when shutting down the etcd.
// Close it when `serve` returns or when etcd fails to bootstrap.
serversC chan *servers
// closeOnce is to ensure `serversC` is closed only once.
closeOnce sync.Once

// wg is used to track the lifecycle of all sub goroutines created by `serve`.
wg sync.WaitGroup
}

type servers struct {
Expand Down Expand Up @@ -180,13 +190,17 @@ func (sctx *serveCtx) serve(
server = m.Serve

httpl := m.Match(cmux.HTTP1())
sctx.wg.Add(1)
go func(srvhttp *http.Server, tlsLis net.Listener) {
defer sctx.wg.Done()
errHandler(srvhttp.Serve(tlsLis))
}(srv, httpl)

if grpcEnabled {
grpcl := m.Match(cmux.HTTP2())
sctx.wg.Add(1)
go func(gs *grpc.Server, l net.Listener) {
defer sctx.wg.Done()
errHandler(gs.Serve(l))
}(gs, grpcl)
}
Expand Down Expand Up @@ -246,11 +260,13 @@ func (sctx *serveCtx) serve(
} else {
server = m.Serve

tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
if err != nil {
return err
tlsl, tlsErr := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
if tlsErr != nil {
return tlsErr
}
sctx.wg.Add(1)
go func(srvhttp *http.Server, tlsl net.Listener) {
defer sctx.wg.Done()
errHandler(srvhttp.Serve(tlsl))
}(srv, tlsl)
}
Expand All @@ -263,7 +279,11 @@ func (sctx *serveCtx) serve(
)
}

return server()
err = server()
sctx.close()
// ensure all goroutines, which are created by this method, to complete before this method returns.
sctx.wg.Wait()
return err
}

func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error {
Expand Down Expand Up @@ -316,7 +336,9 @@ func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.Clie
return nil, err
}
}
sctx.wg.Add(1)
go func() {
defer sctx.wg.Done()
<-ctx.Done()
if cerr := conn.Close(); cerr != nil {
sctx.lg.Warn(
Expand Down
Loading