Skip to content

Commit

Permalink
Add cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
IronGauntlets committed Jan 18, 2024
1 parent bef4369 commit 4f2d63a
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 30 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ node:
--p2p-boot-peers /ip4/127.0.0.1/tcp/7777/p2p/12D3KooWLdURCjbp1D7hkXWk6ZVfcMDPtsNnPHuxoTcWXFtvrxGG \
--db-path ./juno2

node2: juno
node2:
./build/juno --network=sepolia --log-level=debug --p2p \
--p2p-boot-peers /ip4/127.0.0.1/tcp/7777/p2p/12D3KooWLdURCjbp1D7hkXWk6ZVfcMDPtsNnPHuxoTcWXFtvrxGG \
--db-path ./juno3
74 changes: 45 additions & 29 deletions p2p/sync_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/NethermindEth/juno/utils/pipeline"
)

const maxBlocks = 100

func (s *syncService) startPipeline(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -24,17 +26,20 @@ func (s *syncService) startPipeline(ctx context.Context) {

var bootNodeHeight uint64
for i := 0; ; i++ {
if ctx.Err() != nil {
if err := ctx.Err(); err != nil {
break
}

fmt.Println("Continuous iteration", "i", i)

ctx, cancelIteration := context.WithCancel(ctx)

var err error
bootNodeHeight, err = s.bootNodeHeight(ctx)
if err != nil {
s.logError("Failed to get boot node height", err)
return
cancelIteration()
continue
}

var nextHeight uint64
Expand All @@ -44,85 +49,94 @@ func (s *syncService) startPipeline(ctx context.Context) {
s.log.Errorw("Failed to get current height", "err", err)
}

if bootNodeHeight-(nextHeight-1) == 0 {
blockBehind := bootNodeHeight - (nextHeight - 1)
if blockBehind <= 0 {
s.log.Infow("Bootnode height is the same as local height, retrying in 30s")
time.Sleep(30 * time.Second)
cancelIteration()
continue
}

s.log.Infow("Start Pipeline", "Bootnode height", bootNodeHeight, "Current height", nextHeight-1)
s.log.Infow("Fetching blocks", "Start", nextHeight, "End", nextHeight+min(blockBehind, maxBlocks))

commonIt := s.createIterator(BlockRange{nextHeight, bootNodeHeight})
commonIt := s.createIterator(BlockRange{nextHeight, nextHeight + min(blockBehind, maxBlocks)})
headersAndSigsCh, err := s.genHeadersAndSigs(ctx, commonIt)
if err != nil {
s.logError("Failed to get block headers parts", err)
return
cancelIteration()
continue
}

blockBodiesCh, err := s.genBlockBodies(ctx, commonIt)
if err != nil {
s.logError("Failed to get block bodies", err)
return
cancelIteration()
continue
}

txsCh, err := s.genTransactions(ctx, commonIt)
if err != nil {
s.logError("Failed to get transactions", err)
return
cancelIteration()
continue
}

receiptsCh, err := s.genReceipts(ctx, commonIt)
if err != nil {
s.logError("Failed to get receipts", err)
return
cancelIteration()
continue
}

eventsCh, err := s.genEvents(ctx, commonIt)
if err != nil {
s.logError("Failed to get events", err)
return
cancelIteration()
continue
}

// A channel of a specific type cannot be converted to a channel of another type. Therefore, we have to consume/read from the channel
// and change the input to the desired type. The following is not allowed:
// var ch1 chan any = make(chan any)
// var ch2 chan someOtherType = make(chan someOtherType)
// ch2 = (chan any)(ch2) <----- This line will give compilation error.

for b := range pipeline.Bridge(ctx,
s.processSpecBlockParts(ctx, nextHeight,
pipeline.FanIn(ctx,
pipeline.Stage(ctx, headersAndSigsCh, func(i specBlockHeaderAndSigs) specBlockParts { return i }),
pipeline.Stage(ctx, blockBodiesCh, func(i specBlockBody) specBlockParts { return i }),
pipeline.Stage(ctx, txsCh, func(i specTransactions) specBlockParts { return i }),
pipeline.Stage(ctx, receiptsCh, func(i specReceipts) specBlockParts { return i }),
pipeline.Stage(ctx, eventsCh, func(i specEvents) specBlockParts { return i }),
))) {
blocksCh := pipeline.Bridge(ctx, s.processSpecBlockParts(ctx, nextHeight, pipeline.FanIn(ctx,
pipeline.Stage(ctx, headersAndSigsCh, specBlockPartsFunc[specBlockHeaderAndSigs]),
pipeline.Stage(ctx, blockBodiesCh, specBlockPartsFunc[specBlockBody]),
pipeline.Stage(ctx, txsCh, specBlockPartsFunc[specTransactions]),
pipeline.Stage(ctx, receiptsCh, specBlockPartsFunc[specReceipts]),
pipeline.Stage(ctx, eventsCh, specBlockPartsFunc[specEvents]),
)))

for b := range blocksCh {
if b.err != nil {
// cannot process any more blocks
s.log.Errorw("Failed to process block", "err", b.err)
return
cancelIteration()
break
}

err = s.blockchain.Store(b.block, b.commitments, b.stateUpdate, b.newClasses)
if err != nil {
s.log.Errorw("Failed to Store Block", "number", b.block.Number, "err", err)
cancelIteration()
break
} else {
s.log.Infow("Stored Block", "number", b.block.Number, "hash", b.block.Hash.ShortString(), "root",
b.block.GlobalStateRoot.ShortString())
}
}
cancelIteration()
}
}

func specBlockPartsFunc[T specBlockHeaderAndSigs | specBlockBody | specTransactions | specReceipts | specEvents](i T) specBlockParts {
return specBlockParts(i)
}

func (s *syncService) logError(msg string, err error) {
if !errors.Is(err, context.Canceled) {
s.log.Errorw(msg, "err", err)
}
}

func (s *syncService) processSpecBlockParts(ctx context.Context, startingBlockNum uint64,
specBlockPartsCh <-chan specBlockParts,
) <-chan <-chan blockBody {
func (s *syncService) processSpecBlockParts(ctx context.Context, startingBlockNum uint64, specBlockPartsCh <-chan specBlockParts) <-chan <-chan blockBody {
orderedBlockBodiesCh := make(chan (<-chan blockBody))

go func() {
Expand Down Expand Up @@ -337,7 +351,9 @@ func (s *syncService) genHeadersAndSigs(ctx context.Context, it *spec.Iteration)

select {
case <-ctx.Done():
return
// Consume everything from the stream which will eventually break the for iteration loop.
// Todo: modify the usage of stream, so that we use libp2p network stream which gives the ability to close it.
continue
case headersAndSigCh <- headerAndSig:
}
}
Expand Down

0 comments on commit 4f2d63a

Please sign in to comment.