From ee0889d176a22bdc7e3fa1e9ed56b793f0e570ef Mon Sep 17 00:00:00 2001 From: Dylan Murray Date: Mon, 19 Feb 2024 13:18:15 -0500 Subject: [PATCH] Gracefully shut down server on interrupt --- app/config/config.go | 1 + app/config/envs/mainnet.json | 1 + app/config/envs/stn.json | 1 + app/config/envs/testnet.json | 1 + app/config/load.go | 5 +++-- app/config/logger.go | 12 +++++++++--- app/p2p/server.go | 32 +++++++++++++++++++++++--------- app/p2p/thread.go | 1 + cmd/main.go | 17 +++++++++-------- 9 files changed, 49 insertions(+), 22 deletions(-) diff --git a/app/config/config.go b/app/config/config.go index a1c8ef0..d873d71 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -60,6 +60,7 @@ type ( Datastore DatastoreConfig `json:"datastore" mapstructure:"datastore"` // Datastore's configuration DisableRPCVerification bool `json:"disable_rpc_verification" mapstructure:"disable_rpc_verification"` // DisableRPCVerification will disable the rpc verification check on startup. Useful if bitcoind isn't running yet LogOutputFile string `json:"log_output_file" mapstructure:"log_output_file"` // LogOutputFile will set an output file for the logger to write to as opposed to stdout + LogLevel string `json:"log_level" mapstructure:"log_level"` // LogLevel sets the logging level BitcoinConfigPath string `json:"bitcoin_config_path" mapstructure:"bitcoin_config_path"` // BitcoinConfigPath is the path to the bitcoin.conf file P2P P2PConfig `json:"p2p" mapstructure:"p2p"` // P2P is the configuration for the P2P server RPCConnections []RPCConfig `json:"rpc_connections" mapstructure:"rpc_connections"` // RPCConnections is a list of RPC connections diff --git a/app/config/envs/mainnet.json b/app/config/envs/mainnet.json index c7b1f7f..26b56d6 100644 --- a/app/config/envs/mainnet.json +++ b/app/config/envs/mainnet.json @@ -9,6 +9,7 @@ "03e45c9dd2b34829c1d27c8b5d16917dd0dc2c88fa0d7bad7bffb9b542229a9304" ], "log_output_file": "", + "log_level": "info", "disable_rpc_verification": false, "request_logging": true, "alert_processing_interval": "5m", diff --git a/app/config/envs/stn.json b/app/config/envs/stn.json index b98c3ed..e8d504d 100644 --- a/app/config/envs/stn.json +++ b/app/config/envs/stn.json @@ -9,6 +9,7 @@ "02dfb76a88100c2b6cd7ad9c051bc9ef9daf74c9fa13a99cb870865a046a9772f1" ], "log_output_file": "", + "log_level": "info", "disable_rpc_verification": false, "request_logging": true, "alert_processing_interval": "5m", diff --git a/app/config/envs/testnet.json b/app/config/envs/testnet.json index a07c517..a3dfc73 100644 --- a/app/config/envs/testnet.json +++ b/app/config/envs/testnet.json @@ -9,6 +9,7 @@ "02dfb76a88100c2b6cd7ad9c051bc9ef9daf74c9fa13a99cb870865a046a9772f1" ], "log_output_file": "", + "log_level": "info", "disable_rpc_verification": false, "request_logging": true, "alert_processing_interval": "5m", diff --git a/app/config/load.go b/app/config/load.go index cc7b324..ec6f58a 100644 --- a/app/config/load.go +++ b/app/config/load.go @@ -230,8 +230,9 @@ func LoadConfigFile() (_appConfig *Config, err error) { logger := log.New(writer, "bitcoin-alert-system: ", log.LstdFlags) _appConfig.Services.Log = &ExtendedLogger{ - Logger: logger, - writer: writer, + Logger: logger, + writer: writer, + logLevel: _appConfig.LogLevel, } // Set default alert processing interval if it doesn't exist diff --git a/app/config/logger.go b/app/config/logger.go index 427d931..ccce2fb 100644 --- a/app/config/logger.go +++ b/app/config/logger.go @@ -19,7 +19,7 @@ type LoggerInterface interface { Fatalf(msg string, args ...interface{}) Info(args ...interface{}) Infof(msg string, args ...interface{}) - LogLevel() int + LogLevel() string Panic(args ...interface{}) Panicf(msg string, args ...interface{}) Warn(args ...interface{}) @@ -32,7 +32,7 @@ type LoggerInterface interface { // ExtendedLogger is the extended logger to satisfy the LoggerInterface type ExtendedLogger struct { *log.Logger - logLevel int + logLevel string writer *os.File } @@ -48,11 +48,17 @@ func (es *ExtendedLogger) Printf(format string, v ...interface{}) { // Debugf will print debug messages to the console func (es *ExtendedLogger) Debugf(format string, v ...interface{}) { + if es.logLevel != "debug" { + return + } es.Logger.Printf(fmt.Sprintf("\033[1;34m| DEBUG | %s\033[0m", format), v...) } // Debug will print debug messages to the console func (es *ExtendedLogger) Debug(v ...interface{}) { + if es.logLevel != "debug" { + return + } es.Logger.Printf("%v", v...) } @@ -82,7 +88,7 @@ func (es *ExtendedLogger) Infof(format string, v ...interface{}) { } // LogLevel returns the logging level -func (es *ExtendedLogger) LogLevel() int { +func (es *ExtendedLogger) LogLevel() string { return es.logLevel } diff --git a/app/p2p/server.go b/app/p2p/server.go index 1e45d4d..340d9ee 100644 --- a/app/p2p/server.go +++ b/app/p2p/server.go @@ -104,7 +104,7 @@ func NewServer(o ServerOptions) (*Server, error) { // Print out the peer ID and addresses o.Config.Services.Log.Debugf("peer ID: %s", h.ID().String()) - o.Config.Services.Log.Info("connect to me on:") + o.Config.Services.Log.Infof("connect to me on:") for _, addr := range h.Addrs() { o.Config.Services.Log.Infof(" %s/p2p/%s", addr, h.ID().String()) } @@ -115,14 +115,13 @@ func NewServer(o ServerOptions) (*Server, error) { topicNames: o.TopicNames, privateKey: pk, config: o.Config, - quitPeerInitializationChannel: make(chan bool), + quitPeerInitializationChannel: make(chan bool, 1), }, nil } // Start the server and subscribe to all topics func (s *Server) Start(ctx context.Context) error { - s.config.Services.Log.Info("p2p service initializing & starting") - + s.config.Services.Log.Infof("p2p service initializing & starting") // Initialize the DHT kademliaDHT, err := s.initDHT(ctx) if err != nil { @@ -147,6 +146,7 @@ func (s *Server) Start(ctx context.Context) error { subscriptions := map[string]*pubsub.Subscription{} s.host.SetStreamHandler(protocol.ID(s.config.P2P.AlertSystemProtocolID), func(stream network.Stream) { + s.config.Services.Log.Infof("received stream %v", stream.ID()) t := StreamThread{ stream: stream, config: s.config, @@ -161,7 +161,7 @@ func (s *Server) Start(ctx context.Context) error { s.config.Services.Log.Debugf("closing stream %v for peer %v", stream.ID(), t.peer.String()) //_ = stream.Close() } - //_ = stream.Close() + _ = stream.Close() }) s.config.Services.Log.Debugf("stream handler set") @@ -186,10 +186,12 @@ func (s *Server) Start(ctx context.Context) error { } s.topics = topics s.subscriptions = subscriptions - s.config.Services.Log.Infof("P2P successfully started") + s.config.Services.Log.Infof("P2P server successfully started") go func() { for { //nolint:gosimple // This is the only way to perform this loop at the moment select { + case <-s.quitPeerDiscoveryChannel: + s.config.Services.Log.Infof("p2p service force shut down") case <-ctx.Done(): s.config.Services.Log.Info("p2p service shutting down") return @@ -207,11 +209,21 @@ func (s *Server) Connected() bool { // Stop the server func (s *Server) Stop(_ context.Context) error { // todo there needs to be a way to stop the server - s.config.Services.Log.Info("stopping P2P service") + s.config.Services.Log.Infof("stopping the p2p server") + s.config.Services.Log.Debugf("sending signals to persistent processes...") s.quitPeerDiscoveryChannel <- true s.quitAlertProcessingChannel <- true s.quitPeerInitializationChannel <- true - return nil + + s.config.Services.Log.Debugf("removing stream handler to stop allowing connections") + s.host.RemoveStreamHandler(protocol.ID(s.config.P2P.AlertSystemProtocolID)) + s.config.Services.Log.Debugf("shutting down libp2p host") + err := s.host.Close() + if err != nil { + return err + } + s.config.Services.Log.Debugf("shutting down dht") // this is maybe redundant + return s.dht.Close() } // RunAlertProcessingCron starts a cron job to attempt to retry unprocessed alerts @@ -227,6 +239,7 @@ func (s *Server) RunAlertProcessingCron(ctx context.Context) chan bool { s.config.Services.Log.Errorf("error processing alerts: %v", err.Error()) } case <-quit: + s.config.Services.Log.Infof("stopping alert processing process") ticker.Stop() return } @@ -295,6 +308,7 @@ func (s *Server) RunPeerDiscovery(ctx context.Context, routingDiscovery *droutin s.config.Services.Log.Errorf("error discovering peers: %v", err.Error()) } case <-quit: + s.config.Services.Log.Infof("stopping peer discovery process") ticker.Stop() return } @@ -430,7 +444,7 @@ func (s *Server) discoverPeers(ctx context.Context, routingDiscovery *drouting.R // Subscribe will subscribe to the alert system func (s *Server) Subscribe(ctx context.Context, subscriber *pubsub.Subscription, hostID peer.ID) { - s.config.Services.Log.Infof("subscribing to %s topic", subscriber.Topic()) + s.config.Services.Log.Infof("subscribed to %s topic", subscriber.Topic()) for { msg, err := subscriber.Next(ctx) diff --git a/app/p2p/thread.go b/app/p2p/thread.go index bd91662..de88c6b 100644 --- a/app/p2p/thread.go +++ b/app/p2p/thread.go @@ -147,6 +147,7 @@ func (s *StreamThread) ProcessSyncMessage(ctx context.Context) error { }() select { case <-s.quitChannel: + s.config.Services.Log.Infof("quitting sync process") return nil case err := <-done: return err diff --git a/cmd/main.go b/cmd/main.go index 7be271e..38c84e4 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -53,6 +53,11 @@ func main() { // Create a new (web) server webServer := webserver.NewServer(_appConfig) + ctx, cancelFunc := context.WithCancel(context.Background()) + // Start the p2p server + if err = p2pServer.Start(ctx); err != nil { + _appConfig.Services.Log.Fatalf("error starting p2p server: %s", err.Error()) + } // Sync a channel to listen for interrupts idleConnectionsClosed := make(chan struct{}) go func(appConfig *config.Config) { @@ -60,11 +65,11 @@ func main() { signal.Notify(sigint, os.Interrupt) // Log when a signal is received - appConfig.Services.Log.Info("waiting for interrupt signal") + appConfig.Services.Log.Debugf("waiting for interrupt signal") <-sigint // Log that we are starting the shutdown process - appConfig.Services.Log.Info("interrupt signal received, starting shutdown process") + appConfig.Services.Log.Infof("interrupt signal received, starting shutdown process") // We received an interrupt signal, shut down the server ctxTimeout, cancel := context.WithTimeout(context.Background(), config.DefaultServerShutdown) @@ -77,18 +82,14 @@ func main() { if err = p2pServer.Stop(ctxTimeout); err != nil { appConfig.Services.Log.Infof("error shutting down p2p server: %s", err.Error()) } - + cancelFunc() + appConfig.Services.Log.Infof("successfully shut down server") close(idleConnectionsClosed) if err = appConfig.Services.Log.CloseWriter(); err != nil { log.Printf("error closing logger: %s", err) } }(_appConfig) - // Start the p2p server - if err = p2pServer.Start(context.Background()); err != nil { - _appConfig.Services.Log.Fatalf("error starting p2p server: %s", err.Error()) - } - // Serve the web server and then wait endlessly webServer.Serve()