Skip to content

Commit

Permalink
Gracefully shut down server on interrupt
Browse files Browse the repository at this point in the history
  • Loading branch information
galt-tr committed Feb 19, 2024
1 parent 5c76ff3 commit ee0889d
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 22 deletions.
1 change: 1 addition & 0 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type (
Datastore DatastoreConfig `json:"datastore" mapstructure:"datastore"` // Datastore's configuration
DisableRPCVerification bool `json:"disable_rpc_verification" mapstructure:"disable_rpc_verification"` // DisableRPCVerification will disable the rpc verification check on startup. Useful if bitcoind isn't running yet
LogOutputFile string `json:"log_output_file" mapstructure:"log_output_file"` // LogOutputFile will set an output file for the logger to write to as opposed to stdout
LogLevel string `json:"log_level" mapstructure:"log_level"` // LogLevel sets the logging level
BitcoinConfigPath string `json:"bitcoin_config_path" mapstructure:"bitcoin_config_path"` // BitcoinConfigPath is the path to the bitcoin.conf file
P2P P2PConfig `json:"p2p" mapstructure:"p2p"` // P2P is the configuration for the P2P server
RPCConnections []RPCConfig `json:"rpc_connections" mapstructure:"rpc_connections"` // RPCConnections is a list of RPC connections
Expand Down
1 change: 1 addition & 0 deletions app/config/envs/mainnet.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"03e45c9dd2b34829c1d27c8b5d16917dd0dc2c88fa0d7bad7bffb9b542229a9304"
],
"log_output_file": "",
"log_level": "info",
"disable_rpc_verification": false,
"request_logging": true,
"alert_processing_interval": "5m",
Expand Down
1 change: 1 addition & 0 deletions app/config/envs/stn.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"02dfb76a88100c2b6cd7ad9c051bc9ef9daf74c9fa13a99cb870865a046a9772f1"
],
"log_output_file": "",
"log_level": "info",
"disable_rpc_verification": false,
"request_logging": true,
"alert_processing_interval": "5m",
Expand Down
1 change: 1 addition & 0 deletions app/config/envs/testnet.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"02dfb76a88100c2b6cd7ad9c051bc9ef9daf74c9fa13a99cb870865a046a9772f1"
],
"log_output_file": "",
"log_level": "info",
"disable_rpc_verification": false,
"request_logging": true,
"alert_processing_interval": "5m",
Expand Down
5 changes: 3 additions & 2 deletions app/config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,9 @@ func LoadConfigFile() (_appConfig *Config, err error) {

logger := log.New(writer, "bitcoin-alert-system: ", log.LstdFlags)
_appConfig.Services.Log = &ExtendedLogger{
Logger: logger,
writer: writer,
Logger: logger,
writer: writer,
logLevel: _appConfig.LogLevel,
}

// Set default alert processing interval if it doesn't exist
Expand Down
12 changes: 9 additions & 3 deletions app/config/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type LoggerInterface interface {
Fatalf(msg string, args ...interface{})
Info(args ...interface{})
Infof(msg string, args ...interface{})
LogLevel() int
LogLevel() string
Panic(args ...interface{})
Panicf(msg string, args ...interface{})
Warn(args ...interface{})
Expand All @@ -32,7 +32,7 @@ type LoggerInterface interface {
// ExtendedLogger is the extended logger to satisfy the LoggerInterface
type ExtendedLogger struct {
*log.Logger
logLevel int
logLevel string
writer *os.File
}

Expand All @@ -48,11 +48,17 @@ func (es *ExtendedLogger) Printf(format string, v ...interface{}) {

// Debugf will print debug messages to the console
func (es *ExtendedLogger) Debugf(format string, v ...interface{}) {
if es.logLevel != "debug" {
return
}
es.Logger.Printf(fmt.Sprintf("\033[1;34m| DEBUG | %s\033[0m", format), v...)
}

// Debug will print debug messages to the console
func (es *ExtendedLogger) Debug(v ...interface{}) {
if es.logLevel != "debug" {
return
}
es.Logger.Printf("%v", v...)
}

Expand Down Expand Up @@ -82,7 +88,7 @@ func (es *ExtendedLogger) Infof(format string, v ...interface{}) {
}

// LogLevel returns the logging level
func (es *ExtendedLogger) LogLevel() int {
func (es *ExtendedLogger) LogLevel() string {
return es.logLevel
}

Expand Down
32 changes: 23 additions & 9 deletions app/p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func NewServer(o ServerOptions) (*Server, error) {

// Print out the peer ID and addresses
o.Config.Services.Log.Debugf("peer ID: %s", h.ID().String())
o.Config.Services.Log.Info("connect to me on:")
o.Config.Services.Log.Infof("connect to me on:")
for _, addr := range h.Addrs() {
o.Config.Services.Log.Infof(" %s/p2p/%s", addr, h.ID().String())
}
Expand All @@ -115,14 +115,13 @@ func NewServer(o ServerOptions) (*Server, error) {
topicNames: o.TopicNames,
privateKey: pk,
config: o.Config,
quitPeerInitializationChannel: make(chan bool),
quitPeerInitializationChannel: make(chan bool, 1),
}, nil
}

// Start the server and subscribe to all topics
func (s *Server) Start(ctx context.Context) error {
s.config.Services.Log.Info("p2p service initializing & starting")

s.config.Services.Log.Infof("p2p service initializing & starting")
// Initialize the DHT
kademliaDHT, err := s.initDHT(ctx)
if err != nil {
Expand All @@ -147,6 +146,7 @@ func (s *Server) Start(ctx context.Context) error {
subscriptions := map[string]*pubsub.Subscription{}

s.host.SetStreamHandler(protocol.ID(s.config.P2P.AlertSystemProtocolID), func(stream network.Stream) {
s.config.Services.Log.Infof("received stream %v", stream.ID())
t := StreamThread{
stream: stream,
config: s.config,
Expand All @@ -161,7 +161,7 @@ func (s *Server) Start(ctx context.Context) error {
s.config.Services.Log.Debugf("closing stream %v for peer %v", stream.ID(), t.peer.String())
//_ = stream.Close()
}
//_ = stream.Close()
_ = stream.Close()
})

s.config.Services.Log.Debugf("stream handler set")
Expand All @@ -186,10 +186,12 @@ func (s *Server) Start(ctx context.Context) error {
}
s.topics = topics
s.subscriptions = subscriptions
s.config.Services.Log.Infof("P2P successfully started")
s.config.Services.Log.Infof("P2P server successfully started")
go func() {
for { //nolint:gosimple // This is the only way to perform this loop at the moment

Check failure on line 191 in app/p2p/server.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

directive `//nolint:gosimple // This is the only way to perform this loop at the moment` is unused for linter "gosimple" (nolintlint)
select {
case <-s.quitPeerDiscoveryChannel:
s.config.Services.Log.Infof("p2p service force shut down")
case <-ctx.Done():
s.config.Services.Log.Info("p2p service shutting down")
return
Expand All @@ -207,11 +209,21 @@ func (s *Server) Connected() bool {
// Stop the server
func (s *Server) Stop(_ context.Context) error {
// todo there needs to be a way to stop the server
s.config.Services.Log.Info("stopping P2P service")
s.config.Services.Log.Infof("stopping the p2p server")
s.config.Services.Log.Debugf("sending signals to persistent processes...")
s.quitPeerDiscoveryChannel <- true
s.quitAlertProcessingChannel <- true
s.quitPeerInitializationChannel <- true
return nil

s.config.Services.Log.Debugf("removing stream handler to stop allowing connections")
s.host.RemoveStreamHandler(protocol.ID(s.config.P2P.AlertSystemProtocolID))
s.config.Services.Log.Debugf("shutting down libp2p host")
err := s.host.Close()
if err != nil {
return err
}
s.config.Services.Log.Debugf("shutting down dht") // this is maybe redundant
return s.dht.Close()
}

// RunAlertProcessingCron starts a cron job to attempt to retry unprocessed alerts
Expand All @@ -227,6 +239,7 @@ func (s *Server) RunAlertProcessingCron(ctx context.Context) chan bool {
s.config.Services.Log.Errorf("error processing alerts: %v", err.Error())
}
case <-quit:
s.config.Services.Log.Infof("stopping alert processing process")
ticker.Stop()
return
}
Expand Down Expand Up @@ -295,6 +308,7 @@ func (s *Server) RunPeerDiscovery(ctx context.Context, routingDiscovery *droutin
s.config.Services.Log.Errorf("error discovering peers: %v", err.Error())
}
case <-quit:
s.config.Services.Log.Infof("stopping peer discovery process")
ticker.Stop()
return
}
Expand Down Expand Up @@ -430,7 +444,7 @@ func (s *Server) discoverPeers(ctx context.Context, routingDiscovery *drouting.R

// Subscribe will subscribe to the alert system
func (s *Server) Subscribe(ctx context.Context, subscriber *pubsub.Subscription, hostID peer.ID) {
s.config.Services.Log.Infof("subscribing to %s topic", subscriber.Topic())
s.config.Services.Log.Infof("subscribed to %s topic", subscriber.Topic())
for {

msg, err := subscriber.Next(ctx)
Expand Down
1 change: 1 addition & 0 deletions app/p2p/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (s *StreamThread) ProcessSyncMessage(ctx context.Context) error {
}()
select {
case <-s.quitChannel:
s.config.Services.Log.Infof("quitting sync process")
return nil
case err := <-done:
return err
Expand Down
17 changes: 9 additions & 8 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,23 @@ func main() {
// Create a new (web) server
webServer := webserver.NewServer(_appConfig)

ctx, cancelFunc := context.WithCancel(context.Background())
// Start the p2p server
if err = p2pServer.Start(ctx); err != nil {
_appConfig.Services.Log.Fatalf("error starting p2p server: %s", err.Error())
}
// Sync a channel to listen for interrupts
idleConnectionsClosed := make(chan struct{})
go func(appConfig *config.Config) {
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt)

// Log when a signal is received
appConfig.Services.Log.Info("waiting for interrupt signal")
appConfig.Services.Log.Debugf("waiting for interrupt signal")
<-sigint

// Log that we are starting the shutdown process
appConfig.Services.Log.Info("interrupt signal received, starting shutdown process")
appConfig.Services.Log.Infof("interrupt signal received, starting shutdown process")

// We received an interrupt signal, shut down the server
ctxTimeout, cancel := context.WithTimeout(context.Background(), config.DefaultServerShutdown)
Expand All @@ -77,18 +82,14 @@ func main() {
if err = p2pServer.Stop(ctxTimeout); err != nil {
appConfig.Services.Log.Infof("error shutting down p2p server: %s", err.Error())
}

cancelFunc()
appConfig.Services.Log.Infof("successfully shut down server")
close(idleConnectionsClosed)
if err = appConfig.Services.Log.CloseWriter(); err != nil {
log.Printf("error closing logger: %s", err)
}
}(_appConfig)

// Start the p2p server
if err = p2pServer.Start(context.Background()); err != nil {
_appConfig.Services.Log.Fatalf("error starting p2p server: %s", err.Error())
}

// Serve the web server and then wait endlessly
webServer.Serve()

Expand Down

0 comments on commit ee0889d

Please sign in to comment.