diff --git a/app/p2p/dht.go b/app/p2p/dht.go index 9e9733d..d82e051 100644 --- a/app/p2p/dht.go +++ b/app/p2p/dht.go @@ -5,11 +5,12 @@ import ( "sync" "time" + "github.com/bitcoin-sv/alert-system/app/config" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" - "github.com/bitcoin-sv/alert-system/app/config" dht "github.com/libp2p/go-libp2p-kad-dht" - "github.com/libp2p/go-libp2p/core/peer" ) // initDHT will initialize the DHT @@ -45,25 +46,30 @@ func (s *Server) initDHT(ctx context.Context) (*dht.IpfsDHT, error) { // Connect to the chosen ipfs nodes var connected = false for !connected { - var wg sync.WaitGroup - for _, peerAddr := range peers { - var peerInfo *peer.AddrInfo - if peerInfo, err = peer.AddrInfoFromP2pAddr(peerAddr); err != nil { - return nil, err - } - wg.Add(1) - go func(logger config.LoggerInterface) { - defer wg.Done() - if err = s.host.Connect(ctx, *peerInfo); err != nil { - logger.Errorf("bootstrap warning: %s", err.Error()) - return + select { + case <-s.quitPeerInitializationChannel: + return nil, nil + default: + var wg sync.WaitGroup + for _, peerAddr := range peers { + var peerInfo *peer.AddrInfo + if peerInfo, err = peer.AddrInfoFromP2pAddr(peerAddr); err != nil { + return nil, err } - logger.Infof("connected to peer %v", peerInfo.ID) - connected = true - }(logger) + wg.Add(1) + go func(logger config.LoggerInterface) { + defer wg.Done() + if err = s.host.Connect(ctx, *peerInfo); err != nil { + logger.Errorf("bootstrap warning: %s", err.Error()) + return + } + logger.Infof("connected to peer %v", peerInfo.ID) + connected = true + }(logger) + } + time.Sleep(1 * time.Second) + wg.Wait() } - time.Sleep(1 * time.Second) - wg.Wait() } return kademliaDHT, nil diff --git a/app/p2p/server.go b/app/p2p/server.go index 36e463f..4f50bc1 100644 --- a/app/p2p/server.go +++ b/app/p2p/server.go @@ -42,16 +42,17 @@ type ServerOptions struct { // Server is the P2P server type Server struct { // alertKeyTopicName string - connected bool - config *config.Config - host host.Host - privateKey *crypto.PrivKey - subscriptions map[string]*pubsub.Subscription - topicNames []string - topics map[string]*pubsub.Topic - dht *dht.IpfsDHT - quitAlertProcessingChannel chan bool - quitPeerDiscoveryChannel chan bool + connected bool + config *config.Config + host host.Host + privateKey *crypto.PrivKey + subscriptions map[string]*pubsub.Subscription + topicNames []string + topics map[string]*pubsub.Topic + dht *dht.IpfsDHT + quitAlertProcessingChannel chan bool + quitPeerDiscoveryChannel chan bool + quitPeerInitializationChannel chan bool //peers []peer.AddrInfo } @@ -91,10 +92,11 @@ func NewServer(o ServerOptions) (*Server, error) { // Return the server return &Server{ - host: h, - topicNames: o.TopicNames, - privateKey: pk, - config: o.Config, + host: h, + topicNames: o.TopicNames, + privateKey: pk, + config: o.Config, + quitPeerInitializationChannel: make(chan bool), }, nil } @@ -189,6 +191,7 @@ func (s *Server) Stop(_ context.Context) error { s.config.Services.Log.Info("stopping P2P service") s.quitPeerDiscoveryChannel <- true s.quitAlertProcessingChannel <- true + s.quitPeerInitializationChannel <- true return nil }