Skip to content

Commit

Permalink
feat(BUX-313): removing unused methods, files and server fields (#210)
Browse files Browse the repository at this point in the history
  • Loading branch information
kuba-4chain authored Mar 11, 2024
1 parent ad0d157 commit b997f68
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 222 deletions.
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func main() {
Logger: log,
Config: cfg,
})

p2pServer, err := p2p.NewServer(hs, peers, cfg.P2P, log)
if err != nil {
log.Error().Msgf("failed to init a new p2p server: %v\n", err)
Expand Down
154 changes: 5 additions & 149 deletions transports/p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,8 @@ func (ps *peerState) forAllPeers(closure func(sp *serverPeer)) {
// server provides a bitcoin server for handling communications to and from
// bitcoin peers.
type server struct {
// The following variables must only be used atomically.
// Putting the uint64s first makes them 64-bit aligned for 32-bit systems.
bytesReceived uint64 // Total bytes received from all peers since start.
bytesSent uint64 // Total bytes sent by all peers since start.
started int32
shutdown int32
shutdownSched int32
startupTime int64

chainParams *chaincfg.Params
Expand Down Expand Up @@ -525,12 +520,6 @@ func (sp *serverPeer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) {
sp.server.addrManager.AddAddresses(msg.AddrList, sp.NA())
}

// OnWrite is invoked when a peer sends a message and it is used to update
// the bytes sent by the server.
func (sp *serverPeer) OnWrite(_ *peer.Peer, bytesWritten int, _ wire.Message, _ error) {
sp.server.AddBytesSent(uint64(bytesWritten))
}

// handleUpdatePeerHeight updates the heights of all peers who were known to
// announce a block we recently accepted.
func (s *server) handleUpdatePeerHeights(state *peerState, umsg updatePeerHeightsMsg) {
Expand Down Expand Up @@ -887,7 +876,6 @@ func newPeerConfig(sp *serverPeer) *peer.Config {
OnGetHeaders: sp.OnGetHeaders,
OnGetAddr: sp.OnGetAddr,
OnAddr: sp.OnAddr,
OnWrite: sp.OnWrite,
OnProtoconf: sp.OnProtoconf,
},
Log: sp.log,
Expand Down Expand Up @@ -1101,25 +1089,6 @@ func (s *server) OutboundGroupCount(key string) int {
return <-replyChan
}

// AddBytesSent adds the passed number of bytes to the total bytes sent counter
// for the server. It is safe for concurrent access.
func (s *server) AddBytesSent(bytesSent uint64) {
atomic.AddUint64(&s.bytesSent, bytesSent)
}

// AddBytesReceived adds the passed number of bytes to the total bytes received
// counter for the server. It is safe for concurrent access.
func (s *server) AddBytesReceived(bytesReceived uint64) {
atomic.AddUint64(&s.bytesReceived, bytesReceived)
}

// NetTotals returns the sum of all bytes received and sent across the network
// for all peers. It is safe for concurrent access.
func (s *server) NetTotals() (uint64, uint64) {
return atomic.LoadUint64(&s.bytesReceived),
atomic.LoadUint64(&s.bytesSent)
}

// UpdatePeerHeights updates the heights of all peers who have have announced
// the latest connected main chain block, or a recognized orphan. These height
// updates allow us to dynamically refresh peer heights, ensuring sync peer
Expand Down Expand Up @@ -1183,46 +1152,6 @@ func (s *server) WaitForShutdown() {
s.wg.Wait()
}

// ScheduleShutdown schedules a server shutdown after the specified duration.
// It also dynamically adjusts how often to warn the server is going down based
// on remaining duration.
func (s *server) ScheduleShutdown(duration time.Duration) {
// Don't schedule shutdown more than once.
if atomic.AddInt32(&s.shutdownSched, 1) != 1 {
return
}
s.log.Warn().Msgf("Server shutdown in %v", duration)
go func() {
remaining := duration
tickDuration := dynamicTickDuration(remaining)
done := time.After(remaining)
ticker := time.NewTicker(tickDuration)
out:
for {
select {
case <-done:
ticker.Stop()
s.Stop()
break out
case <-ticker.C:
remaining = remaining - tickDuration
if remaining < time.Second {
continue
}

// Change tick duration dynamically based on remaining time.
newDuration := dynamicTickDuration(remaining)
if tickDuration != newDuration {
tickDuration = newDuration
ticker.Stop()
ticker = time.NewTicker(tickDuration)
}
s.log.Warn().Msgf("Server shutdown in %v", remaining)
}
}
}()
}

// parseListeners determines whether each listen address is IPv4 and IPv6 and
// returns a slice of appropriate net.Addrs to listen on with TCP. It also
// properly detects addresses which apply to "all interfaces" and adds the
Expand Down Expand Up @@ -1333,7 +1262,7 @@ func newServer(chainParams *chaincfg.Params, services *service.Services,

var listeners []net.Listener
var err error
listeners, err = initListeners(amgr)
listeners, err = initListeners(log)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1452,7 +1381,7 @@ func newServer(chainParams *chaincfg.Params, services *service.Services,
// initListeners initializes the configured net listeners and adds any bound
// addresses to the address manager. Returns the listeners and a NAT interface,
// which is non-nil if UPnP is in use.
func initListeners(amgr *addrmgr.AddrManager) ([]net.Listener, error) {
func initListeners(log *zerolog.Logger) ([]net.Listener, error) {
listenAddrs := prepareListeners()

// Listen for TCP connections at the configured addresses
Expand All @@ -1465,14 +1394,14 @@ func initListeners(amgr *addrmgr.AddrManager) ([]net.Listener, error) {
for _, addr := range netAddrs {
listener, err := net.Listen(addr.Network(), addr.String())
if err != nil {
amgr.Log.Warn().Msgf("Can't listen on %s: %v", addr, err)
log.Warn().Msgf("Can't listen on %s: %v", addr, err)
continue
}
listeners = append(listeners, listener)
}

if err != nil {
amgr.Log.Error().Msgf("Can not parse default port %s for active chain: %v",
log.Error().Msgf("Can not parse default port %s for active chain: %v",
config.ActiveNetParams.DefaultPort, err)
return nil, err
}
Expand Down Expand Up @@ -1532,82 +1461,9 @@ func addrStringToNetAddr(addr string, p2pCfg *config.P2PConfig) (net.Addr, error
}, nil
}

// dynamicTickDuration is a convenience function used to dynamically choose a
// tick duration based on remaining time. It is primarily used during
// server shutdown to make shutdown warnings more frequent as the shutdown time
// approaches.
func dynamicTickDuration(remaining time.Duration) time.Duration {
switch {
case remaining <= time.Second*5:
return time.Second
case remaining <= time.Second*15:
return time.Second * 5
case remaining <= time.Minute:
return time.Second * 15
case remaining <= time.Minute*5:
return time.Minute
case remaining <= time.Minute*15:
return time.Minute * 5
case remaining <= time.Hour:
return time.Minute * 15
}
return time.Hour
}

// RunServer starts the server and also contain all deferred functions
// because they are not called in main method when os.Exit() is called.
// The optional serverChan parameter is mainly used by the service code to be
// notified with the server once it is setup so it can gracefully stop it when
// requested from the service control manager.
func RunServer(serverChan chan<- *server, services *service.Services,
peers map[*peerpkg.Peer]*peerpkg.PeerSyncState, p2pCfg *config.P2PConfig, log *zerolog.Logger,
) error {
// Get a channel that will be closed when a shutdown signal has been
// triggered either from an OS signal such as SIGINT (Ctrl+C) or from
// another subsystem
serverLogger := log.With().Str("subservice", "server").Logger()
interrupt := interruptListener(&serverLogger)
defer serverLogger.Info().Msg("Shutdown complete")

// Create server and start it.
server, err := createAndStartServer(serverChan, services, peers, p2pCfg, log)
if server == nil {
return err
}
defer func() {
server.log.Info().Msg("Gracefully shutting down the server...")
server.Stop()
server.WaitForShutdown()
server.log.Info().Msgf("Server shutdown complete")
}()

// Wait until the interrupt signal is received from an OS signal or
// shutdown is requested through one of the subsystems
<-interrupt
return nil
}

// Create and start server, return error if server was not created correctly.
func createAndStartServer(serverChan chan<- *server, services *service.Services, peers map[*peerpkg.Peer]*peerpkg.PeerSyncState, p2pCfg *config.P2PConfig, log *zerolog.Logger) (*server, error) {
server, err := newServer(config.ActiveNetParams.Params, services, peers, p2pCfg, log)
if err != nil {
log.Error().Msgf("Unable to start server: %v", err)
return nil, err
}
err = server.Start()
if err != nil {
server.log.Error().Msgf("Unable to start p2p server: %v", err)
return nil, err
}
if serverChan != nil {
serverChan <- server
}
return server, nil
}

// NewServer creates and return p2p server.
func NewServer(services *service.Services, peers map[*peerpkg.Peer]*peerpkg.PeerSyncState, p2pCfg *config.P2PConfig, log *zerolog.Logger) (*server, error) {
serverLogger := log.With().Str("subservice", "server").Logger()
serverLogger := log.With().Str("service", "p2p").Logger()
server, err := newServer(config.ActiveNetParams.Params, services, peers, p2pCfg, &serverLogger)
if err != nil {
serverLogger.Error().Msgf("Unable to start server: %v", err)
Expand Down
57 changes: 0 additions & 57 deletions transports/p2p/signal.go

This file was deleted.

16 changes: 0 additions & 16 deletions transports/p2p/signalsigterm.go

This file was deleted.

0 comments on commit b997f68

Please sign in to comment.