diff --git a/cmd/main.go b/cmd/main.go index c6cfce56..82f0d001 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -108,6 +108,7 @@ func main() { Logger: log, Config: cfg, }) + p2pServer, err := p2p.NewServer(hs, peers, cfg.P2P, log) if err != nil { log.Error().Msgf("failed to init a new p2p server: %v\n", err) diff --git a/transports/p2p/server.go b/transports/p2p/server.go index cb5bafb8..41b36a25 100644 --- a/transports/p2p/server.go +++ b/transports/p2p/server.go @@ -191,13 +191,8 @@ func (ps *peerState) forAllPeers(closure func(sp *serverPeer)) { // server provides a bitcoin server for handling communications to and from // bitcoin peers. type server struct { - // The following variables must only be used atomically. - // Putting the uint64s first makes them 64-bit aligned for 32-bit systems. - bytesReceived uint64 // Total bytes received from all peers since start. - bytesSent uint64 // Total bytes sent by all peers since start. started int32 shutdown int32 - shutdownSched int32 startupTime int64 chainParams *chaincfg.Params @@ -525,12 +520,6 @@ func (sp *serverPeer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) { sp.server.addrManager.AddAddresses(msg.AddrList, sp.NA()) } -// OnWrite is invoked when a peer sends a message and it is used to update -// the bytes sent by the server. -func (sp *serverPeer) OnWrite(_ *peer.Peer, bytesWritten int, _ wire.Message, _ error) { - sp.server.AddBytesSent(uint64(bytesWritten)) -} - // handleUpdatePeerHeight updates the heights of all peers who were known to // announce a block we recently accepted. func (s *server) handleUpdatePeerHeights(state *peerState, umsg updatePeerHeightsMsg) { @@ -887,7 +876,6 @@ func newPeerConfig(sp *serverPeer) *peer.Config { OnGetHeaders: sp.OnGetHeaders, OnGetAddr: sp.OnGetAddr, OnAddr: sp.OnAddr, - OnWrite: sp.OnWrite, OnProtoconf: sp.OnProtoconf, }, Log: sp.log, @@ -1101,25 +1089,6 @@ func (s *server) OutboundGroupCount(key string) int { return <-replyChan } -// AddBytesSent adds the passed number of bytes to the total bytes sent counter -// for the server. It is safe for concurrent access. -func (s *server) AddBytesSent(bytesSent uint64) { - atomic.AddUint64(&s.bytesSent, bytesSent) -} - -// AddBytesReceived adds the passed number of bytes to the total bytes received -// counter for the server. It is safe for concurrent access. -func (s *server) AddBytesReceived(bytesReceived uint64) { - atomic.AddUint64(&s.bytesReceived, bytesReceived) -} - -// NetTotals returns the sum of all bytes received and sent across the network -// for all peers. It is safe for concurrent access. -func (s *server) NetTotals() (uint64, uint64) { - return atomic.LoadUint64(&s.bytesReceived), - atomic.LoadUint64(&s.bytesSent) -} - // UpdatePeerHeights updates the heights of all peers who have have announced // the latest connected main chain block, or a recognized orphan. These height // updates allow us to dynamically refresh peer heights, ensuring sync peer @@ -1183,46 +1152,6 @@ func (s *server) WaitForShutdown() { s.wg.Wait() } -// ScheduleShutdown schedules a server shutdown after the specified duration. -// It also dynamically adjusts how often to warn the server is going down based -// on remaining duration. -func (s *server) ScheduleShutdown(duration time.Duration) { - // Don't schedule shutdown more than once. - if atomic.AddInt32(&s.shutdownSched, 1) != 1 { - return - } - s.log.Warn().Msgf("Server shutdown in %v", duration) - go func() { - remaining := duration - tickDuration := dynamicTickDuration(remaining) - done := time.After(remaining) - ticker := time.NewTicker(tickDuration) - out: - for { - select { - case <-done: - ticker.Stop() - s.Stop() - break out - case <-ticker.C: - remaining = remaining - tickDuration - if remaining < time.Second { - continue - } - - // Change tick duration dynamically based on remaining time. - newDuration := dynamicTickDuration(remaining) - if tickDuration != newDuration { - tickDuration = newDuration - ticker.Stop() - ticker = time.NewTicker(tickDuration) - } - s.log.Warn().Msgf("Server shutdown in %v", remaining) - } - } - }() -} - // parseListeners determines whether each listen address is IPv4 and IPv6 and // returns a slice of appropriate net.Addrs to listen on with TCP. It also // properly detects addresses which apply to "all interfaces" and adds the @@ -1333,7 +1262,7 @@ func newServer(chainParams *chaincfg.Params, services *service.Services, var listeners []net.Listener var err error - listeners, err = initListeners(amgr) + listeners, err = initListeners(log) if err != nil { return nil, err } @@ -1452,7 +1381,7 @@ func newServer(chainParams *chaincfg.Params, services *service.Services, // initListeners initializes the configured net listeners and adds any bound // addresses to the address manager. Returns the listeners and a NAT interface, // which is non-nil if UPnP is in use. -func initListeners(amgr *addrmgr.AddrManager) ([]net.Listener, error) { +func initListeners(log *zerolog.Logger) ([]net.Listener, error) { listenAddrs := prepareListeners() // Listen for TCP connections at the configured addresses @@ -1465,14 +1394,14 @@ func initListeners(amgr *addrmgr.AddrManager) ([]net.Listener, error) { for _, addr := range netAddrs { listener, err := net.Listen(addr.Network(), addr.String()) if err != nil { - amgr.Log.Warn().Msgf("Can't listen on %s: %v", addr, err) + log.Warn().Msgf("Can't listen on %s: %v", addr, err) continue } listeners = append(listeners, listener) } if err != nil { - amgr.Log.Error().Msgf("Can not parse default port %s for active chain: %v", + log.Error().Msgf("Can not parse default port %s for active chain: %v", config.ActiveNetParams.DefaultPort, err) return nil, err } @@ -1532,82 +1461,9 @@ func addrStringToNetAddr(addr string, p2pCfg *config.P2PConfig) (net.Addr, error }, nil } -// dynamicTickDuration is a convenience function used to dynamically choose a -// tick duration based on remaining time. It is primarily used during -// server shutdown to make shutdown warnings more frequent as the shutdown time -// approaches. -func dynamicTickDuration(remaining time.Duration) time.Duration { - switch { - case remaining <= time.Second*5: - return time.Second - case remaining <= time.Second*15: - return time.Second * 5 - case remaining <= time.Minute: - return time.Second * 15 - case remaining <= time.Minute*5: - return time.Minute - case remaining <= time.Minute*15: - return time.Minute * 5 - case remaining <= time.Hour: - return time.Minute * 15 - } - return time.Hour -} - -// RunServer starts the server and also contain all deferred functions -// because they are not called in main method when os.Exit() is called. -// The optional serverChan parameter is mainly used by the service code to be -// notified with the server once it is setup so it can gracefully stop it when -// requested from the service control manager. -func RunServer(serverChan chan<- *server, services *service.Services, - peers map[*peerpkg.Peer]*peerpkg.PeerSyncState, p2pCfg *config.P2PConfig, log *zerolog.Logger, -) error { - // Get a channel that will be closed when a shutdown signal has been - // triggered either from an OS signal such as SIGINT (Ctrl+C) or from - // another subsystem - serverLogger := log.With().Str("subservice", "server").Logger() - interrupt := interruptListener(&serverLogger) - defer serverLogger.Info().Msg("Shutdown complete") - - // Create server and start it. - server, err := createAndStartServer(serverChan, services, peers, p2pCfg, log) - if server == nil { - return err - } - defer func() { - server.log.Info().Msg("Gracefully shutting down the server...") - server.Stop() - server.WaitForShutdown() - server.log.Info().Msgf("Server shutdown complete") - }() - - // Wait until the interrupt signal is received from an OS signal or - // shutdown is requested through one of the subsystems - <-interrupt - return nil -} - -// Create and start server, return error if server was not created correctly. -func createAndStartServer(serverChan chan<- *server, services *service.Services, peers map[*peerpkg.Peer]*peerpkg.PeerSyncState, p2pCfg *config.P2PConfig, log *zerolog.Logger) (*server, error) { - server, err := newServer(config.ActiveNetParams.Params, services, peers, p2pCfg, log) - if err != nil { - log.Error().Msgf("Unable to start server: %v", err) - return nil, err - } - err = server.Start() - if err != nil { - server.log.Error().Msgf("Unable to start p2p server: %v", err) - return nil, err - } - if serverChan != nil { - serverChan <- server - } - return server, nil -} - // NewServer creates and return p2p server. func NewServer(services *service.Services, peers map[*peerpkg.Peer]*peerpkg.PeerSyncState, p2pCfg *config.P2PConfig, log *zerolog.Logger) (*server, error) { - serverLogger := log.With().Str("subservice", "server").Logger() + serverLogger := log.With().Str("service", "p2p").Logger() server, err := newServer(config.ActiveNetParams.Params, services, peers, p2pCfg, &serverLogger) if err != nil { serverLogger.Error().Msgf("Unable to start server: %v", err) diff --git a/transports/p2p/signal.go b/transports/p2p/signal.go deleted file mode 100644 index 6c9d2ae9..00000000 --- a/transports/p2p/signal.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright (c) 2013-2016 The btcsuite developers -// Use of this source code is governed by an ISC -// license that can be found in the LICENSE file. - -package p2p - -import ( - "github.com/rs/zerolog" - "os" - "os/signal" -) - -// shutdownRequestChannel is used to initiate shutdown from one of the -// subsystems using the same code paths as when an interrupt signal is received. -var shutdownRequestChannel = make(chan struct{}) - -// interruptSignals defines the default signals to catch in order to do a proper -// shutdown. This may be modified during init depending on the platform. -var interruptSignals = []os.Signal{os.Interrupt} - -// interruptListener listens for OS Signals such as SIGINT (Ctrl+C) and shutdown -// requests from shutdownRequestChannel. It returns a channel that is closed -// when either signal is received. -func interruptListener(log *zerolog.Logger) <-chan struct{} { - l := log.With().Str("subservice", "interruptListener").Logger() - c := make(chan struct{}) - go func() { - interruptChannel := make(chan os.Signal, 1) - signal.Notify(interruptChannel, interruptSignals...) - - // Listen for initial shutdown signal and close the returned - // channel to notify the caller. - select { - case sig := <-interruptChannel: - l.Info().Msgf("Received signal (%s). Shutting down...", sig) - - case <-shutdownRequestChannel: - l.Info().Msg("Shutdown requested. Shutting down...") - } - close(c) - - // Listen for repeated signals and display a message so the user - // knows the shutdown is in progress and the process is not - // hung. - for { - select { - case sig := <-interruptChannel: - l.Info().Msgf("Received signal (%s). Already shutting down...", sig) - - case <-shutdownRequestChannel: - l.Info().Msg("Shutdown requested. Already shutting down...") - } - } - }() - - return c -} diff --git a/transports/p2p/signalsigterm.go b/transports/p2p/signalsigterm.go deleted file mode 100644 index 9dc3ff6e..00000000 --- a/transports/p2p/signalsigterm.go +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright (c) 2016 The btcsuite developers -// Use of this source code is governed by an ISC -// license that can be found in the LICENSE file. - -// +build darwin dragonfly freebsd linux netbsd openbsd solaris - -package p2p - -import ( - "os" - "syscall" -) - -func init() { - interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} -}