Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gracefully shut down server on interrupt #59

Merged
merged 2 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type (
Datastore DatastoreConfig `json:"datastore" mapstructure:"datastore"` // Datastore's configuration
DisableRPCVerification bool `json:"disable_rpc_verification" mapstructure:"disable_rpc_verification"` // DisableRPCVerification will disable the rpc verification check on startup. Useful if bitcoind isn't running yet
LogOutputFile string `json:"log_output_file" mapstructure:"log_output_file"` // LogOutputFile will set an output file for the logger to write to as opposed to stdout
LogLevel string `json:"log_level" mapstructure:"log_level"` // LogLevel sets the logging level
BitcoinConfigPath string `json:"bitcoin_config_path" mapstructure:"bitcoin_config_path"` // BitcoinConfigPath is the path to the bitcoin.conf file
P2P P2PConfig `json:"p2p" mapstructure:"p2p"` // P2P is the configuration for the P2P server
RPCConnections []RPCConfig `json:"rpc_connections" mapstructure:"rpc_connections"` // RPCConnections is a list of RPC connections
Expand Down
1 change: 1 addition & 0 deletions app/config/envs/mainnet.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"03e45c9dd2b34829c1d27c8b5d16917dd0dc2c88fa0d7bad7bffb9b542229a9304"
],
"log_output_file": "",
"log_level": "info",
"disable_rpc_verification": false,
"request_logging": true,
"alert_processing_interval": "5m",
Expand Down
1 change: 1 addition & 0 deletions app/config/envs/stn.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"02dfb76a88100c2b6cd7ad9c051bc9ef9daf74c9fa13a99cb870865a046a9772f1"
],
"log_output_file": "",
"log_level": "info",
"disable_rpc_verification": false,
"request_logging": true,
"alert_processing_interval": "5m",
Expand Down
1 change: 1 addition & 0 deletions app/config/envs/testnet.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"02dfb76a88100c2b6cd7ad9c051bc9ef9daf74c9fa13a99cb870865a046a9772f1"
],
"log_output_file": "",
"log_level": "info",
"disable_rpc_verification": false,
"request_logging": true,
"alert_processing_interval": "5m",
Expand Down
5 changes: 3 additions & 2 deletions app/config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,9 @@ func LoadConfigFile() (_appConfig *Config, err error) {

logger := log.New(writer, "bitcoin-alert-system: ", log.LstdFlags)
_appConfig.Services.Log = &ExtendedLogger{
Logger: logger,
writer: writer,
Logger: logger,
writer: writer,
logLevel: _appConfig.LogLevel,
}

// Set default alert processing interval if it doesn't exist
Expand Down
12 changes: 9 additions & 3 deletions app/config/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type LoggerInterface interface {
Fatalf(msg string, args ...interface{})
Info(args ...interface{})
Infof(msg string, args ...interface{})
LogLevel() int
LogLevel() string
Panic(args ...interface{})
Panicf(msg string, args ...interface{})
Warn(args ...interface{})
Expand All @@ -32,7 +32,7 @@ type LoggerInterface interface {
// ExtendedLogger is the extended logger to satisfy the LoggerInterface
type ExtendedLogger struct {
*log.Logger
logLevel int
logLevel string
writer *os.File
}

Expand All @@ -48,11 +48,17 @@ func (es *ExtendedLogger) Printf(format string, v ...interface{}) {

// Debugf will print debug messages to the console
func (es *ExtendedLogger) Debugf(format string, v ...interface{}) {
if es.logLevel != "debug" {
return
}
es.Logger.Printf(fmt.Sprintf("\033[1;34m| DEBUG | %s\033[0m", format), v...)
}

// Debug will print debug messages to the console
func (es *ExtendedLogger) Debug(v ...interface{}) {
if es.logLevel != "debug" {
return
}
es.Logger.Printf("%v", v...)
}

Expand Down Expand Up @@ -82,7 +88,7 @@ func (es *ExtendedLogger) Infof(format string, v ...interface{}) {
}

// LogLevel returns the logging level
func (es *ExtendedLogger) LogLevel() int {
func (es *ExtendedLogger) LogLevel() string {
return es.logLevel
}

Expand Down
34 changes: 24 additions & 10 deletions app/p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func NewServer(o ServerOptions) (*Server, error) {

// Print out the peer ID and addresses
o.Config.Services.Log.Debugf("peer ID: %s", h.ID().String())
o.Config.Services.Log.Info("connect to me on:")
o.Config.Services.Log.Infof("connect to me on:")
for _, addr := range h.Addrs() {
o.Config.Services.Log.Infof(" %s/p2p/%s", addr, h.ID().String())
}
Expand All @@ -115,14 +115,13 @@ func NewServer(o ServerOptions) (*Server, error) {
topicNames: o.TopicNames,
privateKey: pk,
config: o.Config,
quitPeerInitializationChannel: make(chan bool),
quitPeerInitializationChannel: make(chan bool, 1),
}, nil
}

// Start the server and subscribe to all topics
func (s *Server) Start(ctx context.Context) error {
s.config.Services.Log.Info("p2p service initializing & starting")

s.config.Services.Log.Infof("p2p service initializing & starting")
// Initialize the DHT
kademliaDHT, err := s.initDHT(ctx)
if err != nil {
Expand All @@ -147,6 +146,7 @@ func (s *Server) Start(ctx context.Context) error {
subscriptions := map[string]*pubsub.Subscription{}

s.host.SetStreamHandler(protocol.ID(s.config.P2P.AlertSystemProtocolID), func(stream network.Stream) {
s.config.Services.Log.Infof("received stream %v", stream.ID())
t := StreamThread{
stream: stream,
config: s.config,
Expand All @@ -161,7 +161,7 @@ func (s *Server) Start(ctx context.Context) error {
s.config.Services.Log.Debugf("closing stream %v for peer %v", stream.ID(), t.peer.String())
//_ = stream.Close()
}
//_ = stream.Close()
_ = stream.Close()
})

s.config.Services.Log.Debugf("stream handler set")
Expand All @@ -186,10 +186,12 @@ func (s *Server) Start(ctx context.Context) error {
}
s.topics = topics
s.subscriptions = subscriptions
s.config.Services.Log.Infof("P2P successfully started")
s.config.Services.Log.Infof("P2P server successfully started")
go func() {
for { //nolint:gosimple // This is the only way to perform this loop at the moment
for {
select {
case <-s.quitPeerDiscoveryChannel:
s.config.Services.Log.Infof("p2p service force shut down")
case <-ctx.Done():
s.config.Services.Log.Info("p2p service shutting down")
return
Expand All @@ -207,11 +209,21 @@ func (s *Server) Connected() bool {
// Stop the server
func (s *Server) Stop(_ context.Context) error {
// todo there needs to be a way to stop the server
s.config.Services.Log.Info("stopping P2P service")
s.config.Services.Log.Infof("stopping the p2p server")
s.config.Services.Log.Debugf("sending signals to persistent processes...")
s.quitPeerDiscoveryChannel <- true
s.quitAlertProcessingChannel <- true
s.quitPeerInitializationChannel <- true
return nil

s.config.Services.Log.Debugf("removing stream handler to stop allowing connections")
s.host.RemoveStreamHandler(protocol.ID(s.config.P2P.AlertSystemProtocolID))
s.config.Services.Log.Debugf("shutting down libp2p host")
err := s.host.Close()
if err != nil {
return err
}
s.config.Services.Log.Debugf("shutting down dht") // this is maybe redundant
return s.dht.Close()
}

// RunAlertProcessingCron starts a cron job to attempt to retry unprocessed alerts
Expand All @@ -227,6 +239,7 @@ func (s *Server) RunAlertProcessingCron(ctx context.Context) chan bool {
s.config.Services.Log.Errorf("error processing alerts: %v", err.Error())
}
case <-quit:
s.config.Services.Log.Infof("stopping alert processing process")
ticker.Stop()
return
}
Expand Down Expand Up @@ -295,6 +308,7 @@ func (s *Server) RunPeerDiscovery(ctx context.Context, routingDiscovery *droutin
s.config.Services.Log.Errorf("error discovering peers: %v", err.Error())
}
case <-quit:
s.config.Services.Log.Infof("stopping peer discovery process")
ticker.Stop()
return
}
Expand Down Expand Up @@ -430,7 +444,7 @@ func (s *Server) discoverPeers(ctx context.Context, routingDiscovery *drouting.R

// Subscribe will subscribe to the alert system
func (s *Server) Subscribe(ctx context.Context, subscriber *pubsub.Subscription, hostID peer.ID) {
s.config.Services.Log.Infof("subscribing to %s topic", subscriber.Topic())
s.config.Services.Log.Infof("subscribed to %s topic", subscriber.Topic())
for {

msg, err := subscriber.Next(ctx)
Expand Down
1 change: 1 addition & 0 deletions app/p2p/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (s *StreamThread) ProcessSyncMessage(ctx context.Context) error {
}()
select {
case <-s.quitChannel:
s.config.Services.Log.Infof("quitting sync process")
return nil
case err := <-done:
return err
Expand Down
17 changes: 9 additions & 8 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,23 @@ func main() {
// Create a new (web) server
webServer := webserver.NewServer(_appConfig)

ctx, cancelFunc := context.WithCancel(context.Background())
// Start the p2p server
if err = p2pServer.Start(ctx); err != nil {
_appConfig.Services.Log.Fatalf("error starting p2p server: %s", err.Error())
}
// Sync a channel to listen for interrupts
idleConnectionsClosed := make(chan struct{})
go func(appConfig *config.Config) {
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt)

// Log when a signal is received
appConfig.Services.Log.Info("waiting for interrupt signal")
appConfig.Services.Log.Debugf("waiting for interrupt signal")
<-sigint

// Log that we are starting the shutdown process
appConfig.Services.Log.Info("interrupt signal received, starting shutdown process")
appConfig.Services.Log.Infof("interrupt signal received, starting shutdown process")

// We received an interrupt signal, shut down the server
ctxTimeout, cancel := context.WithTimeout(context.Background(), config.DefaultServerShutdown)
Expand All @@ -77,18 +82,14 @@ func main() {
if err = p2pServer.Stop(ctxTimeout); err != nil {
appConfig.Services.Log.Infof("error shutting down p2p server: %s", err.Error())
}

cancelFunc()
appConfig.Services.Log.Infof("successfully shut down server")
close(idleConnectionsClosed)
if err = appConfig.Services.Log.CloseWriter(); err != nil {
log.Printf("error closing logger: %s", err)
}
}(_appConfig)

// Start the p2p server
if err = p2pServer.Start(context.Background()); err != nil {
_appConfig.Services.Log.Fatalf("error starting p2p server: %s", err.Error())
}

// Serve the web server and then wait endlessly
webServer.Serve()

Expand Down
Loading