This repository has been archived by the owner on Oct 7, 2024. It is now read-only.

ordered msgs during batch operations
Francois Blanchette committed Feb 1, 2023
1 parent bdfb410 commit 8eba728
Showing 1 changed file with 106 additions and 72 deletions.
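In short, this commit makes the output of concurrent batch jobs print in order. The previous approach redirected os.Stdout through a pipe and filtered the captured lines (filteredOutput and the lessChatter flag are removed). Instead, jobContext now carries a logs channel of logInfo values: every message from CheckMergedBlocks and validateBlockSegment is tagged with a jobId, and a dedicated messageLogger goroutine prints the current job's messages as they arrive while buffering output from higher-numbered jobs, flushing each buffer when its turn comes.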
178 changes: 106 additions & 72 deletions check_blocks.go
@@ -1,14 +1,11 @@
package sftools

import (
"bufio"
"context"
"fmt"
"io"
"os"
"regexp"
"strconv"
"strings"

"github.com/golang/protobuf/proto"
"github.com/remeh/sizedwaitgroup"
@@ -31,10 +28,16 @@ const (
)

type jobInfo struct {
id uint64
jobId uint64
blockRange BlockRange
}

type logInfo struct {
jobId uint64
message string
isDone bool
}

type jobContext struct {
ctx context.Context
logger *zap.Logger
@@ -44,13 +47,15 @@ type jobContext struct {
blockPrinter func(block *bstream.Block)
printDetails PrintDetails
isDone bool
logs chan logInfo
}

func (jc *jobContext) worker(jobs <-chan jobInfo, results chan<- error, swg *sizedwaitgroup.SizedWaitGroup) {
for !jc.isDone {
for j := range jobs {
results <- CheckMergedBlocks(jc.ctx, jc.logger, jc.storeURL, jc.fileBlockSize, j.blockRange, jc.blockPrinter, jc.printDetails)
swg.Done()
for j := range jobs {
results <- CheckMergedBlocks(jc.ctx, jc.logger, jc.storeURL, jc.fileBlockSize, j.blockRange, jc.blockPrinter, jc.printDetails, j.jobId, jc.logs)
swg.Done()
if jc.isDone {
break
}
}
}
@@ -82,20 +87,46 @@ func (jc *jobContext) findMinMaxBlock() (uint64, uint64, error) {
return min, max, err
}

func filteredOutput(out string) string {
outLines := []string{}
lines := strings.Split(out, "\n")
for i := 0; i < len(lines); i++ {
line := lines[i]
if (strings.HasPrefix(line, "Block #") &&
!strings.Contains(line, "last linkable")) ||
strings.HasPrefix(line, "Checking block holes on") {
continue
func (jc *jobContext) messageLogger(logDone chan<- bool) {

defer func() { logDone <- true }()

messages := make(map[uint64]string)
isDone := make(map[uint64]bool)
var currentJobId, maxJobId uint64

for log := range jc.logs {
isDone[log.jobId] = log.isDone
if log.jobId > maxJobId {
maxJobId = log.jobId
}
outLines = append(outLines, line)

if log.jobId == currentJobId {
if messages[currentJobId] != "" {
fmt.Println(messages[currentJobId])
delete(messages, currentJobId)
}
if log.isDone {
delete(messages, currentJobId)
delete(isDone, currentJobId)
currentJobId++
if currentJobId > maxJobId && jc.isDone {
break
}
continue
}
fmt.Println(log.message)
} else {
messages[log.jobId] += log.message
}
// flush every job that has already signalled completion, in order
for isDone[currentJobId] {
if messages[currentJobId] != "" {
fmt.Println(messages[currentJobId])
}
delete(messages, currentJobId)
delete(isDone, currentJobId)
currentJobId++
}
if currentJobId > maxJobId && jc.isDone {
break
}
}
return strings.Join(outLines, "\n")
}

func CheckMergedBlocksBatch(
@@ -108,7 +139,6 @@ func CheckMergedBlocksBatch(
printDetails PrintDetails,
batchSize int,
workers int,
lessChatter bool,
) error {

jc := jobContext{
@@ -133,29 +163,6 @@ }
}
}

// keep backup of the real stdout
old := os.Stdout
r, w, _ := os.Pipe()
os.Stdout = w

// filter the output in a separate goroutine
// interactively, so printing can't block indefinitely
go func() {
reader := bufio.NewReader(r)
var line string
for !jc.isDone {
bytes, _, _ := reader.ReadLine()
if lessChatter {
line = filteredOutput(string(bytes))
} else {
line = string(bytes)
}
if len(line) > 0 {
fmt.Fprintln(old, line)
}
}
}()

// calculate batchCount
offset := start
totalSize := stop - start
@@ -168,6 +175,11 @@
// limit concurrency
swg := sizedwaitgroup.New(int(workers))

// create log channel and message logger
jc.logs = make(chan logInfo, workers*2)
logDone := make(chan bool, 1)
go jc.messageLogger(logDone)

// create channels and workers
results := make(chan error, batchCount)
jobs := make(chan jobInfo, workers)
@@ -186,15 +198,12 @@ }
}
blockRange := BlockRange{start, stop}
swg.Add()
jobs <- jobInfo{id: j, blockRange: blockRange}
jobs <- jobInfo{jobId: j, blockRange: blockRange}
}

swg.Wait()
jc.isDone = true

w.Close()
os.Stdout = old // restoring the real stdout

<-logDone
return nil
}

@@ -206,8 +215,15 @@ func CheckMergedBlocks(
blockRange BlockRange,
blockPrinter func(block *bstream.Block),
printDetails PrintDetails,
jobId uint64,
logs chan logInfo,
) error {
fmt.Printf("Checking block holes on %s\n", storeURL)
var msg string

msg = fmt.Sprintf("Checking block holes on %s\n", storeURL)
logs <- logInfo{jobId, msg, false}

defer func() { logs <- logInfo{jobId, "", true} }()

var expected uint32
var count int
@@ -254,7 +270,7 @@ func CheckMergedBlocks(
baseNum32 = uint32(baseNum)

if printDetails != PrintNothing {
newSeenFilters, lowestBlockSegment, highestBlockSegment := validateBlockSegment(ctx, blocksStore, filename, fileBlockSize, blockRange, blockPrinter, printDetails, tfdb)
newSeenFilters, lowestBlockSegment, highestBlockSegment := validateBlockSegment(ctx, blocksStore, filename, fileBlockSize, blockRange, blockPrinter, printDetails, tfdb, jobId, logs)
for key, filters := range newSeenFilters {
seenFilters[key] = filters
}
@@ -277,20 +293,24 @@ func CheckMergedBlocks(
if baseNum32 != expected {
// There is no previous valid block range if we are at the ever first seen file
if count > 1 {
fmt.Printf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(RoundToBundleEndBlock(expected-fileBlockSize, fileBlockSize))})
msg = fmt.Sprintf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(RoundToBundleEndBlock(expected-fileBlockSize, fileBlockSize))})
logs <- logInfo{jobId, msg, false}
}

// Otherwise, we do not follow last seen element (previous is `100 - 199` but we are `299 - 300`)
missingRange := BlockRange{uint64(expected), uint64(RoundToBundleEndBlock(baseNum32-fileBlockSize, fileBlockSize))}
fmt.Printf("❌ Range %s! (Missing, [%s])\n", missingRange, missingRange.ReprocRange())
msg = fmt.Sprintf("❌ Range %s! (Missing, [%s])\n", missingRange, missingRange.ReprocRange())
logs <- logInfo{jobId, msg, false}
currentStartBlk = baseNum32

holeFound = true
}
expected = baseNum32 + fileBlockSize

if count%10000 == 0 {
fmt.Printf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(RoundToBundleEndBlock(baseNum32, fileBlockSize))})
msg = fmt.Sprintf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(RoundToBundleEndBlock(baseNum32, fileBlockSize))})
logs <- logInfo{jobId, msg, false}

currentStartBlk = baseNum32 + fileBlockSize
}

@@ -305,28 +325,33 @@ func CheckMergedBlocks(
}

if !blockRange.Unbounded() && highestBlockSeen < blockRange.Stop || (lowestBlockSeen > blockRange.Start && lowestBlockSeen > bstream.GetProtocolFirstStreamableBlock) {
fmt.Printf("🔶 Incomplete range %s, started at block %s and stopped at block: %s\n", blockRange, PrettyBlockNum(lowestBlockSeen), PrettyBlockNum(highestBlockSeen))
msg = fmt.Sprintf("🔶 Incomplete range %s, started at block %s and stopped at block: %s\n", blockRange, PrettyBlockNum(lowestBlockSeen), PrettyBlockNum(highestBlockSeen))
logs <- logInfo{jobId, msg, false}
}

if tfdb.lastLinkedBlock != nil && tfdb.lastLinkedBlock.Number < highestBlockSeen {
fmt.Printf("🔶 Range %s has issues with forks, last linkable block number: %d\n", BlockRange{uint64(currentStartBlk), highestBlockSeen}, tfdb.lastLinkedBlock.Number)
msg = fmt.Sprintf("🔶 Range %s has issues with forks, last linkable block number: %d\n", BlockRange{uint64(currentStartBlk), highestBlockSeen}, tfdb.lastLinkedBlock.Number)
logs <- logInfo{jobId, msg, false}
} else {
fmt.Printf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(highestBlockSeen)})
msg = fmt.Sprintf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(highestBlockSeen)})
logs <- logInfo{jobId, msg, false}
}

if len(seenFilters) > 0 {
fmt.Println()
fmt.Println("Seen filters")
msg = "\nSeen filters\n"
for _, filters := range seenFilters {
fmt.Printf("- [Include %q, Exclude %q, System %q]\n", filters.Include, filters.Exclude, filters.System)
msg += fmt.Sprintf("- [Include %q, Exclude %q, System %q]\n", filters.Include, filters.Exclude, filters.System)
}
fmt.Println()
msg += "\n"
logs <- logInfo{jobId, msg, false}
}

if holeFound {
fmt.Printf("🆘 Holes found!\n")
msg = fmt.Sprintf("🆘 Holes found!\n")
logs <- logInfo{jobId, msg, false}
} else {
fmt.Printf("🆗 No hole found\n")
msg = fmt.Sprintf("🆗 No hole found\n")
logs <- logInfo{jobId, msg, false}
}

return nil
@@ -348,18 +373,23 @@ func validateBlockSegment(
blockPrinter func(block *bstream.Block),
printDetails PrintDetails,
tfdb *trackedForkDB,
jobId uint64,
logs chan logInfo,
) (seenFilters map[string]FilteringFilters, lowestBlockSeen, highestBlockSeen uint64) {
var msg string
lowestBlockSeen = MaxUint64
reader, err := store.OpenObject(ctx, segment)
if err != nil {
fmt.Printf("❌ Unable to read blocks segment %s: %s\n", segment, err)
msg = fmt.Sprintf("❌ Unable to read blocks segment %s: %s\n", segment, err)
logs <- logInfo{jobId, msg, false}
return
}
defer reader.Close()

readerFactory, err := bstream.GetBlockReaderFactory.New(reader)
if err != nil {
fmt.Printf("❌ Unable to read blocks segment %s: %s\n", segment, err)
msg = fmt.Sprintf("❌ Unable to read blocks segment %s: %s\n", segment, err)
logs <- logInfo{jobId, msg, false}
return
}

@@ -398,12 +428,14 @@

if printDetails != PrintNothing {
// TODO: this print should be under a 'check forkable' flag?
fmt.Printf("🔶 Block #%d is not linkable at this point\n", block.Num())
msg = fmt.Sprintf("🔶 Block #%d is not linkable at this point\n", block.Num())
logs <- logInfo{jobId, msg, false}
}

if tfdb.unlinkableSegmentCount > 99 && tfdb.unlinkableSegmentCount%100 == 0 {
// TODO: this print should be under a 'check forkable' flag?
fmt.Printf("❌ Large gap of %d unlinkable blocks found in chain. Last linked block: %d, first Unlinkable block: %d. \n", tfdb.unlinkableSegmentCount, tfdb.lastLinkedBlock.Num(), tfdb.firstUnlinkableBlock.Num())
msg = fmt.Sprintf("❌ Large gap of %d unlinkable blocks found in chain. Last linked block: %d, first Unlinkable block: %d. \n", tfdb.unlinkableSegmentCount, tfdb.lastLinkedBlock.Num(), tfdb.firstUnlinkableBlock.Num())
logs <- logInfo{jobId, msg, false}
}
} else {
tfdb.lastLinkedBlock = block
@@ -423,26 +455,28 @@ func validateBlockSegment(
if printDetails == PrintFull {
out, err := jsonpb.MarshalIndentToString(block.ToNative().(proto.Message), " ")
if err != nil {
fmt.Printf("❌ Unable to print full block %s: %s\n", block.AsRef(), err)
msg = fmt.Sprintf("❌ Unable to print full block %s: %s\n", block.AsRef(), err)
logs <- logInfo{jobId, msg, false}
continue
}

fmt.Println(out)
logs <- logInfo{jobId, out, false}
}

continue
}

if block == nil && err == io.EOF {
if seenBlockCount < expectedBlockCount(segment, fileBlockSize) {
fmt.Printf("🔶 Segment %s contained only %d blocks (< 100), this can happen on some chains\n", segment, seenBlockCount)
msg = fmt.Sprintf("🔶 Segment %s contained only %d blocks (< 100), this can happen on some chains\n", segment, seenBlockCount)
logs <- logInfo{jobId, msg, false}
}

return
}

if err != nil {
fmt.Printf("❌ Unable to read all blocks from segment %s after reading %d blocks: %s\n", segment, seenBlockCount, err)
msg = fmt.Sprintf("❌ Unable to read all blocks from segment %s after reading %d blocks: %s\n", segment, seenBlockCount, err)
logs <- logInfo{jobId, msg, false}
return
}
}

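For reference, here is a minimal, self-contained sketch of the ordered fan-in logging pattern this commit introduces. The names (logMsg, logger) are hypothetical, not the repository's types, and it differs from the commit in one design choice: it ends by closing the channel instead of polling a shared isDone flag, which avoids the unsynchronized reads of jc.isDone across goroutines seen in the diff.

package main

import (
	"fmt"
	"sync"
)

// logMsg mirrors the role of logInfo: output tagged with the job that produced it.
type logMsg struct {
	jobID  uint64
	text   string
	isDone bool
}

// logger prints the current job's output live, buffers output from
// higher-numbered jobs, and flushes each buffer in jobID order.
func logger(logs <-chan logMsg, done chan<- struct{}) {
	defer close(done)
	pending := make(map[uint64]string)
	finished := make(map[uint64]bool)
	var current uint64
	for m := range logs {
		switch {
		case m.isDone:
			finished[m.jobID] = true
		case m.jobID == current:
			fmt.Print(m.text) // current job prints immediately
		default:
			pending[m.jobID] += m.text // later jobs are buffered
		}
		// advance past every job that has already signalled completion
		for finished[current] {
			fmt.Print(pending[current])
			delete(pending, current)
			delete(finished, current)
			current++
		}
	}
}

func main() {
	logs := make(chan logMsg, 16)
	done := make(chan struct{})
	go logger(logs, done)

	var wg sync.WaitGroup
	for j := uint64(0); j < 3; j++ {
		wg.Add(1)
		go func(id uint64) {
			defer wg.Done()
			for i := 0; i < 2; i++ {
				logs <- logMsg{id, fmt.Sprintf("job %d: step %d\n", id, i), false}
			}
			logs <- logMsg{id, "", true} // completion marker, as in the commit
		}(j)
	}
	wg.Wait()
	close(logs) // no more producers: lets logger's range loop end
	<-done
}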