Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make peer discovery a constant process #18

Merged
merged 3 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions app/config/envs/stn.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
{
"alert_webhook_url": "",
"request_logging": true,
"web_server": {
"idle_timeout": "60s",
"port": "3000",
"read_timeout": "15s",
"write_timeout": "15s"
},
"environment": "local",
"datastore": {
"auto_migrate": true,
"debug": true,
"engine": "sqlite",
"password": "",
"table_prefix": "alert_system",
"sqlite": {
"database_path": "alert_system_datastore.db",
"shared": false
},
"sql_read": {
"driver": "postgresql",
"host": "localhost",
"max_connection_idle_time": "20s",
"max_connection_time": "20s",
"max_idle_connections": 2,
"max_open_connections": 5,
"name": "alert_system_db",
"password": "postgres",
"port": "5432",
"replica": true,
"skip_initialize_with_version": true,
"time_zone": "UTC",
"tx_timeout": "20s",
"user": "postgres"
},
"sql_write": {
"driver": "postgresql",
"host": "localhost",
"max_connection_idle_time": "20s",
"max_connection_time": "20s",
"max_idle_connections": 2,
"max_open_connections": 5,
"name": "alert_system_db",
"password": "postgres",
"port": "5432",
"replica": false,
"skip_initialize_with_version": true,
"time_zone": "UTC",
"tx_timeout": "20s",
"user": "postgres"
}
},
"p2p": {
"ip": "0.0.0.0",
"port": "9906",
"alert_system_protocol_id": "/bitcoin-stn/alert-system/0.0.1",
"bootstrap_peer": "",
"private_key_path": "",
"topic_name": "alert_system_stn"
},
"rpc_connections": [
{
"user": "galt",
"password": "galt",
"host": "http://localhost:8333"
}
]
}
1 change: 1 addition & 0 deletions app/models/alert_message_invalidate_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (a *AlertMessageInvalidateBlock) Read(alert []byte) error {
a.ReasonLength = length
a.Reason = msg
a.BlockHash = blockHash
a.Config().Services.Log.Infof("InvalidateBlock alert; hash [%s]; reason [%s]", a.BlockHash, a.Reason)
return nil
}

Expand Down
54 changes: 43 additions & 11 deletions app/p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Server struct {
topicNames []string
topics map[string]*pubsub.Topic
dht *dht.IpfsDHT
//peers []peer.AddrInfo
}

// NewServer will create a new server
Expand Down Expand Up @@ -112,10 +113,10 @@ func (s *Server) Start(ctx context.Context) error {
dutil.Advertise(ctx, routingDiscovery, topicName)
}

go func() {
// todo handle errors
_ = s.discoverPeers(ctx, s.topicNames, routingDiscovery)
}()
_ = s.RunPeerDiscovery(ctx, routingDiscovery)
for !s.connected {
time.Sleep(5 * time.Second)
}

ps, err := pubsub.NewGossipSub(ctx, s.host, pubsub.WithDiscovery(routingDiscovery))
if err != nil {
Expand All @@ -142,6 +143,7 @@ func (s *Server) Start(ctx context.Context) error {
//_ = stream.Close()
})

s.config.Services.Log.Debugf("stream handler set")
for _, topicName := range s.topicNames {
var topic *pubsub.Topic
if topic, err = ps.Join(topicName); err != nil {
Expand All @@ -160,9 +162,7 @@ func (s *Server) Start(ctx context.Context) error {
}
s.topics = topics
s.subscriptions = subscriptions

s.config.Services.Log.Info("p2p service start ending")

s.config.Services.Log.Infof("P2P successfully started")
go func() {
for { //nolint:gosimple // This is the only way to perform this loop at the moment
select {
Expand All @@ -187,6 +187,31 @@ func (s *Server) Stop(_ context.Context) error {
return nil
}

// RunPeerDiscovery starts a 5 min cron job to resync peers and update routable peers
func (s *Server) RunPeerDiscovery(ctx context.Context, routingDiscovery *drouting.RoutingDiscovery) chan bool {
ticker := time.NewTicker(5 * time.Minute)
quit := make(chan bool, 1)
go func() {
err := s.discoverPeers(ctx, s.topicNames, routingDiscovery)
if err != nil {
s.config.Services.Log.Errorf("error discovering peers: %v", err.Error())
}
for {
select {
case <-ticker.C:
err := s.discoverPeers(ctx, s.topicNames, routingDiscovery)
if err != nil {
s.config.Services.Log.Errorf("error discovering peers: %v", err.Error())
}
case <-quit:
ticker.Stop()
}

}
}()
return quit
}

// generatePrivateKey generates a private key and stores it in `private_key` file
func generatePrivateKey(filePath string) (*crypto.PrivKey, error) {
// Generate a new key pair
Expand Down Expand Up @@ -238,9 +263,11 @@ func (s *Server) Topics() map[string]*pubsub.Topic {

// discoverPeers will discover peers
func (s *Server) discoverPeers(ctx context.Context, tn []string, routingDiscovery *drouting.RoutingDiscovery) error {
s.config.Services.Log.Infof("Running peer discovery at %s", time.Now().String())

// Look for others who have announced and attempt to connect to them
anyConnected := false
for !anyConnected {
connected := 0
for connected < 7 {
for _, topicName := range tn {
s.config.Services.Log.Debugf("searching for peers for topic %s..\n", topicName)

Expand All @@ -259,6 +286,8 @@ func (s *Server) discoverPeers(ctx context.Context, tn []string, routingDiscover
}

// Failed to connect to peer
s.config.Services.Log.Debugf("attempting connection to %s", foundPeer.ID.String())

if err = s.host.Connect(ctx, foundPeer); err != nil {
// we fail to connect to a lot of peers. Just ignore it for now.
s.config.Services.Log.Debugf("failed connecting to %s, error: %s", foundPeer.ID.String(), err.Error())
Expand Down Expand Up @@ -289,10 +318,10 @@ func (s *Server) discoverPeers(ctx context.Context, tn []string, routingDiscover
continue
}

s.config.Services.Log.Debugf("successfully synced messages from peer %s", foundPeer.ID.String())
s.config.Services.Log.Infof("successfully synced up to %d from peer %s", t.LatestSequence(), foundPeer.ID.String())

// Set the flag
anyConnected = true
connected++
}
time.Sleep(1 * time.Second)
}
Expand All @@ -302,6 +331,7 @@ func (s *Server) discoverPeers(ctx context.Context, tn []string, routingDiscover
s.config.Services.Log.Debugf("peer discovery complete")
s.config.Services.Log.Debugf("connected to %d peers\n", len(s.host.Network().Peers()))
s.config.Services.Log.Debugf("peerstore has %d peers\n", len(s.host.Peerstore().Peers()))
s.config.Services.Log.Infof("Successfully discovered %d active peers at %s", connected, time.Now().String())
s.connected = true
return nil
}
Expand All @@ -310,7 +340,9 @@ func (s *Server) discoverPeers(ctx context.Context, tn []string, routingDiscover
func (s *Server) Subscribe(ctx context.Context, subscriber *pubsub.Subscription, hostID peer.ID) {
s.config.Services.Log.Infof("subscribing to %s topic", subscriber.Topic())
for {

msg, err := subscriber.Next(ctx)

if err != nil {
s.config.Services.Log.Infof("error subscribing via next: %s", err.Error())
continue
Expand Down
23 changes: 14 additions & 9 deletions app/p2p/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ type StreamThread struct {
stream network.Stream
}

// LatestSequence will return the threads latest sequence
func (s *StreamThread) LatestSequence() uint32 {
return s.latestSequence
}

// Sync will start the thread
func (s *StreamThread) Sync(ctx context.Context) error {

Expand Down Expand Up @@ -57,7 +62,7 @@ func (s *StreamThread) Sync(ctx context.Context) error {
return err
}

s.config.Services.Log.Infof("requested latest sequence in stream %s", s.stream.ID())
s.config.Services.Log.Debugf("requested latest sequence in stream %s", s.stream.ID())

return s.ProcessSyncMessage(ctx)
}
Expand Down Expand Up @@ -85,41 +90,41 @@ func (s *StreamThread) ProcessSyncMessage(ctx context.Context) error {
}
switch msg.Type {
case IGotLatest:
s.config.Services.Log.Infof("received latest sequence %d from peer %s", msg.SequenceNumber, s.peer.String())
s.config.Services.Log.Debugf("received latest sequence %d from peer %s", msg.SequenceNumber, s.peer.String())
if err = s.ProcessGotLatest(ctx, msg); err != nil {
return err
}
if s.myLatestSequence >= s.latestSequence {
_ = s.stream.Close()
return nil
}
s.config.Services.Log.Infof("wrote msg requesting next sequence %d from peer %s", s.myLatestSequence+1, s.peer.String())
s.config.Services.Log.Debugf("wrote msg requesting next sequence %d from peer %s", s.myLatestSequence+1, s.peer.String())
case IGotSequenceNumber:
s.config.Services.Log.Infof("received IGotSequenceNumber %d from peer %s", msg.SequenceNumber, s.peer.String())
s.config.Services.Log.Debugf("received IGotSequenceNumber %d from peer %s", msg.SequenceNumber, s.peer.String())
if err = s.ProcessGotSequenceNumber(msg); err != nil {
return err
}
if s.myLatestSequence == s.latestSequence {
_ = s.stream.Close()
return nil
}
s.config.Services.Log.Infof("wrote msg requesting next sequence %d from peer %s", msg.SequenceNumber+1, s.peer.String())
s.config.Services.Log.Debugf("wrote msg requesting next sequence %d from peer %s", msg.SequenceNumber+1, s.peer.String())
case IWantSequenceNumber:
s.config.Services.Log.Infof("received IWantSequenceNumber %d from peer %s", msg.SequenceNumber, s.peer.String())
s.config.Services.Log.Debugf("received IWantSequenceNumber %d from peer %s", msg.SequenceNumber, s.peer.String())
if err = s.ProcessWantSequenceNumber(ctx, msg); err != nil {
return err
}
s.config.Services.Log.Infof("wrote sequence %d to peer %s", msg.SequenceNumber, s.peer.String())
s.config.Services.Log.Debugf("wrote sequence %d to peer %s", msg.SequenceNumber, s.peer.String())
if msg.SequenceNumber == s.myLatestSequence {
_ = s.stream.Close()
return nil
}
case IWantLatest:
s.config.Services.Log.Infof("received IWantLatest from peer %s", s.peer.String())
s.config.Services.Log.Debugf("received IWantLatest from peer %s", s.peer.String())
if err = s.ProcessWantLatest(ctx); err != nil {
return err
}
s.config.Services.Log.Infof("wrote latest sequence %d to peer %s", s.myLatestSequence, s.peer.String())
s.config.Services.Log.Debugf("wrote latest sequence %d to peer %s", s.myLatestSequence, s.peer.String())
}
}
}
Expand Down
Loading
Loading