From e46388b8a2f6d6f8e5e6957ba5bab94951e5ce44 Mon Sep 17 00:00:00 2001 From: Dylan Murray Date: Thu, 25 Jan 2024 09:13:40 -0500 Subject: [PATCH] Add timeout to sync process --- app/p2p/thread.go | 122 +++++++++++++++++++++++++++------------------- 1 file changed, 71 insertions(+), 51 deletions(-) diff --git a/app/p2p/thread.go b/app/p2p/thread.go index 6b3bd0b..7ffbda9 100644 --- a/app/p2p/thread.go +++ b/app/p2p/thread.go @@ -3,7 +3,9 @@ package p2p import ( "context" "encoding/hex" + "fmt" "math" + "time" "github.com/bitcoin-sv/alert-system/app/config" "github.com/bitcoin-sv/alert-system/app/models" @@ -70,64 +72,82 @@ func (s *StreamThread) Sync(ctx context.Context) error { // ProcessSyncMessage will process the sync message func (s *StreamThread) ProcessSyncMessage(ctx context.Context) error { - - for { - b, err := wire.ReadVarBytes(s.stream, 0, math.MaxUint64, config.ApplicationName) - if err != nil { - if s.stream.Conn().IsClosed() || s.stream.Stat().Transient { - return nil + done := make(chan error) + go func() { + for { + b, err := wire.ReadVarBytes(s.stream, 0, math.MaxUint64, config.ApplicationName) + if err != nil { + if s.stream.Conn().IsClosed() || s.stream.Stat().Transient { + done <- nil + return + } + s.config.Services.Log.Debugf("failed to read sync message: %s; closing stream", err.Error()) + done <- s.stream.Close() } - s.config.Services.Log.Debugf("failed to read sync message: %s; closing stream", err.Error()) - return s.stream.Close() - } - if len(b) == 0 { - _ = s.stream.Close() - return nil - } - var msg *SyncMessage - if msg, err = NewSyncMessageFromBytes(b); err != nil { - s.config.Services.Log.Errorf("failed to convert to sync message: %s", err.Error()) - return err - } - switch msg.Type { - case IGotLatest: - s.config.Services.Log.Debugf("received latest sequence %d from peer %s", msg.SequenceNumber, s.peer.String()) - if err = s.ProcessGotLatest(ctx, msg); err != nil { - return err - } - if s.myLatestSequence >= s.latestSequence { - _ = s.stream.Close() - return nil - } - s.config.Services.Log.Debugf("wrote msg requesting next sequence %d from peer %s", s.myLatestSequence+1, s.peer.String()) - case IGotSequenceNumber: - s.config.Services.Log.Debugf("received IGotSequenceNumber %d from peer %s", msg.SequenceNumber, s.peer.String()) - if err = s.ProcessGotSequenceNumber(msg); err != nil { - return err - } - if s.myLatestSequence == s.latestSequence { + if len(b) == 0 { _ = s.stream.Close() - return nil - } - s.config.Services.Log.Debugf("wrote msg requesting next sequence %d from peer %s", msg.SequenceNumber+1, s.peer.String()) - case IWantSequenceNumber: - s.config.Services.Log.Debugf("received IWantSequenceNumber %d from peer %s", msg.SequenceNumber, s.peer.String()) - if err = s.ProcessWantSequenceNumber(ctx, msg); err != nil { - return err + done <- nil + return } - s.config.Services.Log.Debugf("wrote sequence %d to peer %s", msg.SequenceNumber, s.peer.String()) - if msg.SequenceNumber == s.myLatestSequence { - _ = s.stream.Close() - return nil + var msg *SyncMessage + if msg, err = NewSyncMessageFromBytes(b); err != nil { + s.config.Services.Log.Errorf("failed to convert to sync message: %s", err.Error()) + done <- err + return } - case IWantLatest: - s.config.Services.Log.Debugf("received IWantLatest from peer %s", s.peer.String()) - if err = s.ProcessWantLatest(ctx); err != nil { - return err + switch msg.Type { + case IGotLatest: + s.config.Services.Log.Debugf("received latest sequence %d from peer %s", msg.SequenceNumber, s.peer.String()) + if err = s.ProcessGotLatest(ctx, msg); err != nil { + done <- err + return + } + if s.myLatestSequence >= s.latestSequence { + _ = s.stream.Close() + done <- nil + return + } + s.config.Services.Log.Debugf("wrote msg requesting next sequence %d from peer %s", s.myLatestSequence+1, s.peer.String()) + case IGotSequenceNumber: + s.config.Services.Log.Debugf("received IGotSequenceNumber %d from peer %s", msg.SequenceNumber, s.peer.String()) + if err = s.ProcessGotSequenceNumber(msg); err != nil { + done <- err + return + } + if s.myLatestSequence == s.latestSequence { + _ = s.stream.Close() + done <- nil + return + } + s.config.Services.Log.Debugf("wrote msg requesting next sequence %d from peer %s", msg.SequenceNumber+1, s.peer.String()) + case IWantSequenceNumber: + s.config.Services.Log.Debugf("received IWantSequenceNumber %d from peer %s", msg.SequenceNumber, s.peer.String()) + if err = s.ProcessWantSequenceNumber(ctx, msg); err != nil { + done <- err + return + } + s.config.Services.Log.Debugf("wrote sequence %d to peer %s", msg.SequenceNumber, s.peer.String()) + if msg.SequenceNumber == s.myLatestSequence { + err = s.stream.Close() + done <- err + return + } + case IWantLatest: + s.config.Services.Log.Debugf("received IWantLatest from peer %s", s.peer.String()) + if err = s.ProcessWantLatest(ctx); err != nil { + done <- err + return + } + s.config.Services.Log.Debugf("wrote latest sequence %d to peer %s", s.myLatestSequence, s.peer.String()) } - s.config.Services.Log.Debugf("wrote latest sequence %d to peer %s", s.myLatestSequence, s.peer.String()) } + }() + select { + case err := <-done: + return err + case <-time.After(time.Minute * 1): + return fmt.Errorf("sync from peer %s process timed out after 1 minute", s.peer.String()) } }