Skip to content

Commit

Permalink
add parallel mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Francois Blanchette committed Feb 1, 2023
1 parent b84f19a commit a74e623
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 27 deletions.
252 changes: 228 additions & 24 deletions check_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strconv"

"github.com/golang/protobuf/proto"
"github.com/remeh/sizedwaitgroup"
"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/forkable"
"github.com/streamingfast/dstore"
Expand All @@ -26,16 +27,202 @@ const (
MaxUint64 = ^uint64(0)
)

func CheckMergedBlocks(
// jobInfo is one unit of work handed to a worker: the contiguous
// sub-range of blocks to check and the job's ordinal id, which
// messageLogger uses to print job output in dispatch order.
type jobInfo struct {
	jobId      uint64
	blockRange BlockRange
}

// logInfo is a single log event emitted by a job: a message fragment
// to print, or — when isDone is true — the job's completion marker
// (sent with an empty message from CheckMergedBlocks' deferred send).
type logInfo struct {
	jobId   uint64
	message string
	isDone  bool
}

// jobContext carries the configuration shared by every parallel check
// job plus the coordination state used by the workers and the logger.
type jobContext struct {
	ctx           context.Context
	logger        *zap.Logger
	storeURL      string     // dstore URL of the merged-blocks store
	fileBlockSize uint32     // number of blocks per merged bundle file
	blockRange    BlockRange // overall range requested by the caller
	blockPrinter  func(block *bstream.Block)
	printDetails  PrintDetails
	// isDone is set by the dispatcher after all jobs finish and is read
	// by worker and messageLogger goroutines without any synchronization.
	// NOTE(review): this is a data race under the Go memory model — TODO
	// confirm, or replace with channel close / atomic.Bool.
	isDone bool
	logs   chan logInfo // per-job log messages, serialized by messageLogger
}

// worker consumes jobs from the jobs channel, runs CheckMergedBlocks on
// each job's sub-range, reports the outcome on the results channel, and
// marks the job complete on the shared SizedWaitGroup (the matching
// Add() is done by the dispatcher before the job is queued).
func (jc *jobContext) worker(jobs <-chan jobInfo, results chan<- error, swg *sizedwaitgroup.SizedWaitGroup) {
	for j := range jobs {
		results <- CheckMergedBlocks(jc.ctx, jc.logger, jc.storeURL, jc.fileBlockSize, j.blockRange, jc.blockPrinter, jc.printDetails, j.jobId, jc.logs)
		swg.Done()
		// NOTE(review): jc.isDone is written by the dispatcher goroutine
		// with no synchronization, so this read is a data race. If it is
		// never observed as true, the worker blocks forever on the jobs
		// channel (the dispatcher never closes it) — goroutine leak.
		if jc.isDone {
			break
		}
	}
}

// findMinMaxBlock scans the block store and returns the lowest bundle
// base number seen and the exclusive upper bound of the highest bundle
// (highest base + fileBlockSize). It is used to resolve an unbounded
// BlockRange into concrete limits.
//
// Returns (MaxUint64, 0, err) if the store cannot be opened or walked.
func (jc *jobContext) findMinMaxBlock() (uint64, uint64, error) {
	min := MaxUint64
	max := uint64(0)

	blocksStore, err := dstore.NewDBinStore(jc.storeURL)
	if err != nil {
		return min, max, err
	}

	err = blocksStore.Walk(jc.ctx, "", func(filename string) error {
		match := numberRegex.FindStringSubmatch(filename)
		if match == nil {
			return nil
		}
		// Parse error ignored: the regex guarantees digits, though a value
		// overflowing 32 bits would silently yield 0 — TODO confirm range.
		baseNum, _ := strconv.ParseUint(match[1], 10, 32)
		if baseNum < min {
			min = baseNum
		}

		// Exclusive end of this bundle is base + fileBlockSize. The previous
		// code incremented baseNum by fileBlockSize and then added
		// fileBlockSize AGAIN when assigning max, overshooting the true
		// upper bound by one full bundle.
		end := baseNum + uint64(jc.fileBlockSize)
		if end > max {
			max = end
		}
		return nil
	})
	return min, max, err
}

// messageLogger serializes log output from concurrently running jobs.
// Output from the lowest-numbered unfinished job (currentJobId) prints
// immediately; output from later jobs is buffered and flushed in job
// order as each preceding job completes. The goroutine exits after the
// final job's completion marker is processed, or when jc.logs is
// closed, and then signals logDone.
//
// Fixes over the previous version: buffered output was deleted from the
// map BEFORE being printed (printing an empty string), and at most one
// completed job was advanced per received message, which could stall
// the flush of already-finished jobs.
func (jc *jobContext) messageLogger(logDone chan<- bool) {
	defer func() { logDone <- true }()

	buffered := make(map[uint64]string) // jobId -> accumulated, unprinted output
	done := make(map[uint64]bool)       // jobId -> completion marker received
	var currentJobId uint64
	var maxJobId uint64

	for log := range jc.logs {
		if log.jobId > maxJobId {
			maxJobId = log.jobId
		}

		if log.jobId == currentJobId {
			// Flush anything buffered before this job became current, then
			// stream its live message directly.
			if buf := buffered[currentJobId]; buf != "" {
				fmt.Println(buf)
				delete(buffered, currentJobId)
			}
			if log.isDone {
				done[log.jobId] = true
			} else {
				fmt.Println(log.message)
			}
		} else {
			buffered[log.jobId] += log.message
			if log.isDone {
				done[log.jobId] = true
			}
		}

		// Advance past every completed job, printing its buffered output in
		// order — print BEFORE deleting, unlike the previous code.
		for done[currentJobId] {
			if buf := buffered[currentJobId]; buf != "" {
				fmt.Println(buf)
			}
			delete(buffered, currentJobId)
			delete(done, currentJobId)
			currentJobId++
		}

		// All dispatched jobs processed: stop. jc.isDone is set by the
		// dispatcher after all workers finish (unsynchronized read; the
		// dispatcher also closes jc.logs as a reliable fallback).
		if currentJobId > maxJobId && jc.isDone {
			break
		}
	}
}

// CheckMergedBlocksBatch splits blockRange into batches of batchSize
// blocks and checks them with up to `workers` concurrent
// CheckMergedBlocks calls, serializing their log output in job order.
// An unbounded range is first resolved to concrete bounds by scanning
// the store.
//
// Returns the first batch error encountered (annotated with the number
// of failing batches when there are several), or nil when every batch
// passed. The previous version never drained the results channel, so
// batch errors were silently discarded; it also never closed the jobs
// and logs channels, leaking the worker goroutines and risking a
// deadlock on logDone if the logger missed the unsynchronized isDone
// flag.
func CheckMergedBlocksBatch(
	ctx context.Context,
	logger *zap.Logger,
	storeURL string,
	fileBlockSize uint32,
	blockRange BlockRange,
	blockPrinter func(block *bstream.Block),
	printDetails PrintDetails,
	batchSize int,
	workers int,
) error {
	jc := jobContext{
		ctx:           ctx,
		logger:        logger,
		storeURL:      storeURL,
		fileBlockSize: fileBlockSize,
		blockRange:    blockRange,
		blockPrinter:  blockPrinter,
		printDetails:  printDetails,
		isDone:        false,
	}

	// Resolve an unbounded range to concrete [start, stop) bounds.
	var err error
	start := blockRange.Start
	stop := blockRange.Stop
	if blockRange.Unbounded() {
		start, stop, err = jc.findMinMaxBlock()
		if err != nil {
			return err
		}
	}

	// Number of batches; the last one may be shorter by extraBatchSize.
	offset := start
	totalSize := stop - start
	batchCount := totalSize / uint64(batchSize)
	extraBatchSize := totalSize % uint64(batchSize)
	if extraBatchSize != 0 {
		batchCount++
	}

	// Bound worker concurrency.
	swg := sizedwaitgroup.New(workers)

	// Log channel plus the goroutine that serializes job output.
	jc.logs = make(chan logInfo, workers*2)
	logDone := make(chan bool, 1)
	go jc.messageLogger(logDone)

	// Buffered to batchCount so every job can report without blocking.
	results := make(chan error, batchCount)
	jobs := make(chan jobInfo, workers)

	for w := 0; w < workers; w++ {
		go jc.worker(jobs, results, &swg)
	}

	// Dispatch one job per batch; swg.Add blocks once `workers` jobs run.
	for j := uint64(0); j < batchCount; j++ {
		batchStart := j*uint64(batchSize) + offset
		batchStop := (j+1)*uint64(batchSize) - 1 + offset
		if j == batchCount-1 && extraBatchSize != 0 {
			// Shrink the final batch to the leftover size.
			batchStop += -uint64(batchSize) + extraBatchSize
		}
		swg.Add()
		jobs <- jobInfo{jobId: j, blockRange: BlockRange{batchStart, batchStop}}
	}
	// No more jobs: closing lets idle workers return instead of blocking
	// forever on the receive.
	close(jobs)

	swg.Wait()
	jc.isDone = true
	// All senders have finished; closing guarantees messageLogger
	// terminates even if it never observes the isDone flag.
	close(jc.logs)
	<-logDone

	// Surface batch errors instead of discarding them. Every job sent
	// exactly one result before swg.Done(), so the channel is complete.
	close(results)
	var firstErr error
	failures := 0
	for res := range results {
		if res != nil {
			failures++
			if firstErr == nil {
				firstErr = res
			}
		}
	}
	if firstErr != nil {
		if failures > 1 {
			return fmt.Errorf("%d batches failed, first error: %w", failures, firstErr)
		}
		return firstErr
	}
	return nil
}

func CheckMergedBlocks(
ctx context.Context,
logger *zap.Logger,
storeURL string,
fileBlockSize uint32,
blockRange BlockRange,
blockPrinter func(block *bstream.Block),
printDetails PrintDetails, jobId uint64,
logs chan<- logInfo,
) error {
fmt.Printf("Checking block holes on %s\n", storeURL)
var msg string

msg = fmt.Sprintf("Checking block holes on %s\n", storeURL)
logs <- logInfo{jobId, msg, false}

defer func() { logs <- logInfo{jobId, "", true} }()

var expected uint32
var count int
Expand Down Expand Up @@ -88,20 +275,22 @@ func CheckMergedBlocks(
if baseNum32 != expected {
// There is no previous valid block range if we are at the ever first seen file
if count > 1 {
fmt.Printf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(RoundToBundleEndBlock(expected-fileBlockSize, fileBlockSize))})
msg = fmt.Sprintf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(RoundToBundleEndBlock(expected-fileBlockSize, fileBlockSize))})
logs <- logInfo{jobId, msg, false}
}

// Otherwise, we do not follow last seen element (previous is `100 - 199` but we are `299 - 300`)
missingRange := BlockRange{uint64(expected), uint64(RoundToBundleEndBlock(baseNum32-fileBlockSize, fileBlockSize))}
fmt.Printf("❌ Range %s! (Missing, [%s])\n", missingRange, missingRange.ReprocRange())
msg = fmt.Sprintf("❌ Range %s! (Missing, [%s])\n", missingRange, missingRange.ReprocRange())
logs <- logInfo{jobId, msg, false}
currentStartBlk = baseNum32

holeFound = true
}
expected = baseNum32 + fileBlockSize

if printDetails != PrintNothing {
newSeenFilters, lowestBlockSegment, highestBlockSegment := validateBlockSegment(ctx, blocksStore, filename, fileBlockSize, blockRange, blockPrinter, printDetails, tfdb)
newSeenFilters, lowestBlockSegment, highestBlockSegment := validateBlockSegment(ctx, blocksStore, filename, fileBlockSize, blockRange, blockPrinter, printDetails, tfdb, jobId, logs)
for key, filters := range newSeenFilters {
seenFilters[key] = filters
}
Expand All @@ -121,7 +310,8 @@ func CheckMergedBlocks(
}

if count%10000 == 0 {
fmt.Printf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(RoundToBundleEndBlock(baseNum32, fileBlockSize))})
msg = fmt.Sprintf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(RoundToBundleEndBlock(baseNum32, fileBlockSize))})
logs <- logInfo{jobId, msg, false}
currentStartBlk = baseNum32 + fileBlockSize
}

Expand All @@ -146,28 +336,33 @@ func CheckMergedBlocks(
if blockRange.Bounded() &&
(highestBlockSeen < (blockRange.Stop-1) ||
(lowestBlockSeen > blockRange.Start && lowestBlockSeen > bstream.GetProtocolFirstStreamableBlock)) {
fmt.Printf("🔶 Incomplete range %s, started at block %s and stopped at block: %s\n", blockRange, PrettyBlockNum(lowestBlockSeen), PrettyBlockNum(highestBlockSeen))
msg = fmt.Sprintf("🔶 Incomplete range %s, started at block %s and stopped at block: %s\n", blockRange, PrettyBlockNum(lowestBlockSeen), PrettyBlockNum(highestBlockSeen))
logs <- logInfo{jobId, msg, false}
}

if tfdb.lastLinkedBlock != nil && tfdb.lastLinkedBlock.Number < highestBlockSeen {
fmt.Printf("🔶 Range %s has issues with forks, last linkable block number: %d\n", BlockRange{uint64(currentStartBlk), highestBlockSeen}, tfdb.lastLinkedBlock.Number)
msg = fmt.Sprintf("🔶 Range %s has issues with forks, last linkable block number: %d\n", BlockRange{uint64(currentStartBlk), highestBlockSeen}, tfdb.lastLinkedBlock.Number)
logs <- logInfo{jobId, msg, false}
} else {
fmt.Printf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(highestBlockSeen)})
msg = fmt.Sprintf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(highestBlockSeen)})
logs <- logInfo{jobId, msg, false}
}

if len(seenFilters) > 0 {
fmt.Println()
fmt.Println("Seen filters")
msg = "\nSeen filters"
for _, filters := range seenFilters {
fmt.Printf("- [Include %q, Exclude %q, System %q]\n", filters.Include, filters.Exclude, filters.System)
msg += fmt.Sprintf("- [Include %q, Exclude %q, System %q]\n", filters.Include, filters.Exclude, filters.System)
}
fmt.Println()
msg += "\n"
logs <- logInfo{jobId, msg, false}
}

if holeFound {
fmt.Printf("🆘 Holes found!\n")
msg = fmt.Sprintf("🆘 Holes found!\n")
logs <- logInfo{jobId, msg, false}
} else {
fmt.Printf("🆗 No hole found\n")
msg = fmt.Sprintf("🆗 No hole found\n")
logs <- logInfo{jobId, msg, false}
}

return nil
Expand All @@ -189,18 +384,23 @@ func validateBlockSegment(
blockPrinter func(block *bstream.Block),
printDetails PrintDetails,
tfdb *trackedForkDB,
jobId uint64,
logs chan<- logInfo,
) (seenFilters map[string]FilteringFilters, lowestBlockSeen, highestBlockSeen uint64) {
var msg string
lowestBlockSeen = MaxUint64
reader, err := store.OpenObject(ctx, segment)
if err != nil {
fmt.Printf("❌ Unable to read blocks segment %s: %s\n", segment, err)
msg = fmt.Sprintf("❌ Unable to read blocks segment %s: %s\n", segment, err)
logs <- logInfo{jobId, msg, false}
return
}
defer reader.Close()

readerFactory, err := bstream.GetBlockReaderFactory.New(reader)
if err != nil {
fmt.Printf("❌ Unable to read blocks segment %s: %s\n", segment, err)
msg = fmt.Sprintf("❌ Unable to read blocks segment %s: %s\n", segment, err)
logs <- logInfo{jobId, msg, false}
return
}

Expand Down Expand Up @@ -239,12 +439,14 @@ func validateBlockSegment(

if printDetails != PrintNothing {
// TODO: this print should be under a 'check forkable' flag?
fmt.Printf("🔶 Block #%d is not linkable at this point\n", block.Num())
msg = fmt.Sprintf("🔶 Block #%d is not linkable at this point\n", block.Num())
logs <- logInfo{jobId, msg, false}
}

if tfdb.unlinkableSegmentCount > 99 && tfdb.unlinkableSegmentCount%100 == 0 {
// TODO: this print should be under a 'check forkable' flag?
fmt.Printf("❌ Large gap of %d unlinkable blocks found in chain. Last linked block: %d, first Unlinkable block: %d. \n", tfdb.unlinkableSegmentCount, tfdb.lastLinkedBlock.Num(), tfdb.firstUnlinkableBlock.Num())
msg = fmt.Sprintf("❌ Large gap of %d unlinkable blocks found in chain. Last linked block: %d, first Unlinkable block: %d. \n", tfdb.unlinkableSegmentCount, tfdb.lastLinkedBlock.Num(), tfdb.firstUnlinkableBlock.Num())
logs <- logInfo{jobId, msg, false}
}
} else {
tfdb.lastLinkedBlock = block
Expand All @@ -264,26 +466,28 @@ func validateBlockSegment(
if printDetails == PrintFull {
out, err := jsonpb.MarshalIndentToString(block.ToProtocol().(proto.Message), " ")
if err != nil {
fmt.Printf("❌ Unable to print full block %s: %s\n", block.AsRef(), err)
msg = fmt.Sprintf("❌ Unable to print full block %s: %s\n", block.AsRef(), err)
logs <- logInfo{jobId, msg, false}
continue
}

fmt.Println(out)
logs <- logInfo{jobId, out, false}
}

continue
}

if block == nil && err == io.EOF {
if seenBlockCount < expectedBlockCount(segment, fileBlockSize) {
fmt.Printf("🔶 Segment %s contained only %d blocks (< 100), this can happen on some chains\n", segment, seenBlockCount)
msg = fmt.Sprintf("🔶 Segment %s contained only %d blocks (< 100), this can happen on some chains\n", segment, seenBlockCount)
logs <- logInfo{jobId, msg, false}
}

return
}

if err != nil {
fmt.Printf("❌ Unable to read all blocks from segment %s after reading %d blocks: %s\n", segment, seenBlockCount, err)
msg = fmt.Sprintf("❌ Unable to read all blocks from segment %s after reading %d blocks: %s\n", segment, seenBlockCount, err)
logs <- logInfo{jobId, msg, false}
return
}
}
Expand Down
8 changes: 6 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,24 @@ require (
github.com/lithammer/dedent v1.1.0
github.com/manifoldco/promptui v0.3.2
github.com/mostynb/go-grpc-compression v1.1.17
github.com/nicksnyder/go-i18n v1.10.1 // indirect
github.com/prometheus/client_golang v1.12.1
github.com/remeh/sizedwaitgroup v1.0.0
github.com/spf13/cobra v1.4.0
github.com/spf13/viper v1.8.1
github.com/streamingfast/bstream v0.0.2-0.20221017131819-2a7e38be1047
github.com/streamingfast/dstore v0.1.1-0.20220607202639-35118aeaf648
github.com/streamingfast/firehose v0.1.1-0.20221017171248-8fd3adbe7b4d
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0
github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424
github.com/streamingfast/logging v0.0.0-20221209193439-bff11742bf4c
github.com/streamingfast/pbgo v0.0.6-0.20221014191646-3a05d7bc30c8
github.com/stretchr/testify v1.8.0
go.uber.org/zap v1.21.0
golang.org/x/crypto v0.0.0-20220214200702-86341886e292
google.golang.org/grpc v1.49.0
google.golang.org/protobuf v1.28.0
)

require (
github.com/nicksnyder/go-i18n v1.10.1 // indirect
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20191105091915-95d230a53780 // indirect
)
Loading

0 comments on commit a74e623

Please sign in to comment.