Skip to content

Commit

Permalink
Add timeout to sync process (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
galt-tr authored Jan 25, 2024
1 parent ff47577 commit 610323b
Showing 1 changed file with 71 additions and 51 deletions.
122 changes: 71 additions & 51 deletions app/p2p/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package p2p
import (
"context"
"encoding/hex"
"fmt"
"math"
"time"

"github.com/bitcoin-sv/alert-system/app/config"
"github.com/bitcoin-sv/alert-system/app/models"
Expand Down Expand Up @@ -70,64 +72,82 @@ func (s *StreamThread) Sync(ctx context.Context) error {

// ProcessSyncMessage will process the sync message
func (s *StreamThread) ProcessSyncMessage(ctx context.Context) error {

for {
b, err := wire.ReadVarBytes(s.stream, 0, math.MaxUint64, config.ApplicationName)
if err != nil {
if s.stream.Conn().IsClosed() || s.stream.Stat().Transient {
return nil
done := make(chan error)
go func() {
for {
b, err := wire.ReadVarBytes(s.stream, 0, math.MaxUint64, config.ApplicationName)
if err != nil {
if s.stream.Conn().IsClosed() || s.stream.Stat().Transient {
done <- nil
return
}
s.config.Services.Log.Debugf("failed to read sync message: %s; closing stream", err.Error())
done <- s.stream.Close()
}
s.config.Services.Log.Debugf("failed to read sync message: %s; closing stream", err.Error())
return s.stream.Close()
}

if len(b) == 0 {
_ = s.stream.Close()
return nil
}
var msg *SyncMessage
if msg, err = NewSyncMessageFromBytes(b); err != nil {
s.config.Services.Log.Errorf("failed to convert to sync message: %s", err.Error())
return err
}
switch msg.Type {
case IGotLatest:
s.config.Services.Log.Debugf("received latest sequence %d from peer %s", msg.SequenceNumber, s.peer.String())
if err = s.ProcessGotLatest(ctx, msg); err != nil {
return err
}
if s.myLatestSequence >= s.latestSequence {
_ = s.stream.Close()
return nil
}
s.config.Services.Log.Debugf("wrote msg requesting next sequence %d from peer %s", s.myLatestSequence+1, s.peer.String())
case IGotSequenceNumber:
s.config.Services.Log.Debugf("received IGotSequenceNumber %d from peer %s", msg.SequenceNumber, s.peer.String())
if err = s.ProcessGotSequenceNumber(msg); err != nil {
return err
}
if s.myLatestSequence == s.latestSequence {
if len(b) == 0 {
_ = s.stream.Close()
return nil
}
s.config.Services.Log.Debugf("wrote msg requesting next sequence %d from peer %s", msg.SequenceNumber+1, s.peer.String())
case IWantSequenceNumber:
s.config.Services.Log.Debugf("received IWantSequenceNumber %d from peer %s", msg.SequenceNumber, s.peer.String())
if err = s.ProcessWantSequenceNumber(ctx, msg); err != nil {
return err
done <- nil
return
}
s.config.Services.Log.Debugf("wrote sequence %d to peer %s", msg.SequenceNumber, s.peer.String())
if msg.SequenceNumber == s.myLatestSequence {
_ = s.stream.Close()
return nil
var msg *SyncMessage
if msg, err = NewSyncMessageFromBytes(b); err != nil {
s.config.Services.Log.Errorf("failed to convert to sync message: %s", err.Error())
done <- err
return
}
case IWantLatest:
s.config.Services.Log.Debugf("received IWantLatest from peer %s", s.peer.String())
if err = s.ProcessWantLatest(ctx); err != nil {
return err
switch msg.Type {
case IGotLatest:
s.config.Services.Log.Debugf("received latest sequence %d from peer %s", msg.SequenceNumber, s.peer.String())
if err = s.ProcessGotLatest(ctx, msg); err != nil {
done <- err
return
}
if s.myLatestSequence >= s.latestSequence {
_ = s.stream.Close()
done <- nil
return
}
s.config.Services.Log.Debugf("wrote msg requesting next sequence %d from peer %s", s.myLatestSequence+1, s.peer.String())
case IGotSequenceNumber:
s.config.Services.Log.Debugf("received IGotSequenceNumber %d from peer %s", msg.SequenceNumber, s.peer.String())
if err = s.ProcessGotSequenceNumber(msg); err != nil {
done <- err
return
}
if s.myLatestSequence == s.latestSequence {
_ = s.stream.Close()
done <- nil
return
}
s.config.Services.Log.Debugf("wrote msg requesting next sequence %d from peer %s", msg.SequenceNumber+1, s.peer.String())
case IWantSequenceNumber:
s.config.Services.Log.Debugf("received IWantSequenceNumber %d from peer %s", msg.SequenceNumber, s.peer.String())
if err = s.ProcessWantSequenceNumber(ctx, msg); err != nil {
done <- err
return
}
s.config.Services.Log.Debugf("wrote sequence %d to peer %s", msg.SequenceNumber, s.peer.String())
if msg.SequenceNumber == s.myLatestSequence {
err = s.stream.Close()
done <- err
return
}
case IWantLatest:
s.config.Services.Log.Debugf("received IWantLatest from peer %s", s.peer.String())
if err = s.ProcessWantLatest(ctx); err != nil {
done <- err
return
}
s.config.Services.Log.Debugf("wrote latest sequence %d to peer %s", s.myLatestSequence, s.peer.String())
}
s.config.Services.Log.Debugf("wrote latest sequence %d to peer %s", s.myLatestSequence, s.peer.String())
}
}()
select {
case err := <-done:
return err
case <-time.After(time.Minute * 1):
return fmt.Errorf("sync from peer %s process timed out after 1 minute", s.peer.String())
}
}

Expand Down

0 comments on commit 610323b

Please sign in to comment.