diff --git a/check_blocks.go b/check_blocks.go index 19ca98f..a7685c2 100644 --- a/check_blocks.go +++ b/check_blocks.go @@ -8,6 +8,7 @@ import ( "strconv" "github.com/golang/protobuf/proto" + "github.com/remeh/sizedwaitgroup" "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/forkable" "github.com/streamingfast/dstore" @@ -26,7 +27,109 @@ const ( MaxUint64 = ^uint64(0) ) -func CheckMergedBlocks( +type jobInfo struct { + jobId uint64 + blockRange BlockRange +} + +type logInfo struct { + jobId uint64 + message string + isDone bool +} + +type jobContext struct { + ctx context.Context + logger *zap.Logger + storeURL string + fileBlockSize uint32 + blockRange BlockRange + blockPrinter func(block *bstream.Block) + printDetails PrintDetails + isDone bool + logs chan logInfo +} + +func (jc *jobContext) worker(jobs <-chan jobInfo, results chan<- error, swg *sizedwaitgroup.SizedWaitGroup) { + for j := range jobs { + results <- CheckMergedBlocks(jc.ctx, jc.logger, jc.storeURL, jc.fileBlockSize, j.blockRange, jc.blockPrinter, jc.printDetails, j.jobId, jc.logs) + swg.Done() + if jc.isDone { + break + } + } +} + +func (jc *jobContext) findMinMaxBlock() (uint64, uint64, error) { + var min = MaxUint64 + var max = uint64(0) + + blocksStore, err := dstore.NewDBinStore(jc.storeURL) + if err != nil { + return min, max, err + } + err = blocksStore.Walk(jc.ctx, "", func(filename string) error { + match := numberRegex.FindStringSubmatch(filename) + if match == nil { + return nil + } + baseNum, _ := strconv.ParseUint(match[1], 10, 32) + if baseNum < min { + min = baseNum + } + + baseNum += uint64(jc.fileBlockSize) + if baseNum > max { + max = baseNum + } + return nil + }) + return min, max, err +} + +func (jc *jobContext) messageLogger(logDone chan<- bool) { + + defer func() { logDone <- true }() + + messages := make(map[uint64]string) + isDone := make(map[uint64]bool) + var currentJobId uint64 = 0 + var maxJobId uint64 = 0 + + for log := 
range jc.logs { + isDone[log.jobId] = log.isDone + if log.jobId > maxJobId { + maxJobId = log.jobId + } + + if log.jobId == currentJobId { + if messages[currentJobId] != "" { + fmt.Println(messages[currentJobId]) + delete(messages, currentJobId) + } + if log.isDone { + delete(messages, currentJobId) + delete(isDone, currentJobId) + currentJobId++ + if currentJobId > maxJobId && jc.isDone { + break + } + continue + } + fmt.Println(log.message) + } else { + messages[log.jobId] += log.message + } + if isDone[currentJobId] { + fmt.Println(messages[currentJobId]) + delete(messages, currentJobId) + delete(isDone, currentJobId) + currentJobId++ + } + } +} + +func CheckMergedBlocksBatch( ctx context.Context, logger *zap.Logger, storeURL string, @@ -34,8 +137,92 @@ func CheckMergedBlocks( blockRange BlockRange, blockPrinter func(block *bstream.Block), printDetails PrintDetails, + batchSize int, + workers int, +) error { + + jc := jobContext{ + ctx: ctx, + logger: logger, + storeURL: storeURL, + fileBlockSize: fileBlockSize, + blockRange: blockRange, + blockPrinter: blockPrinter, + printDetails: printDetails, + isDone: false, + } + + // find unbounded ranges + var err error + start := blockRange.Start + stop := blockRange.Stop + if blockRange.Unbounded() { + start, stop, err = jc.findMinMaxBlock() + if err != nil { + return err + } + } + + // calculate batchCount + offset := start + totalSize := stop - start + batchCount := totalSize / uint64(batchSize) + extraBatchSize := totalSize % uint64(batchSize) + if extraBatchSize != 0 { + batchCount++ + } + + // limit concurrency + swg := sizedwaitgroup.New(int(workers)) + + // create log channel and message logger + jc.logs = make(chan logInfo, workers*2) + logDone := make(chan bool, 1) + go jc.messageLogger(logDone) + + // create channels and workers + results := make(chan error, batchCount) + jobs := make(chan jobInfo, workers) + + // create workers + for w := 0; w < workers; w++ { + go jc.worker(jobs, results, &swg) + } + + // 
assign jobs to workers + for j := uint64(0); j < batchCount; j++ { + start := j*uint64(batchSize) + offset + stop := (j+1)*uint64(batchSize) - 1 + offset + if j == batchCount-1 && extraBatchSize != 0 { + stop += -uint64(batchSize) + extraBatchSize + } + blockRange := BlockRange{start, stop} + swg.Add() + jobs <- jobInfo{jobId: j, blockRange: blockRange} + } + + swg.Wait() + jc.isDone = true + <-logDone + return nil +} + +func CheckMergedBlocks( + ctx context.Context, + logger *zap.Logger, + storeURL string, + fileBlockSize uint32, + blockRange BlockRange, + blockPrinter func(block *bstream.Block), + printDetails PrintDetails, jobId uint64, + logs chan<- logInfo, ) error { - fmt.Printf("Checking block holes on %s\n", storeURL) + var msg string + + msg = fmt.Sprintf("Checking block holes on %s\n", storeURL) + logs <- logInfo{jobId, msg, false} + + defer func() { logs <- logInfo{jobId, "", true} }() var expected uint32 var count int @@ -88,12 +275,14 @@ func CheckMergedBlocks( if baseNum32 != expected { // There is no previous valid block range if we are at the ever first seen file if count > 1 { - fmt.Printf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(RoundToBundleEndBlock(expected-fileBlockSize, fileBlockSize))}) + msg = fmt.Sprintf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(RoundToBundleEndBlock(expected-fileBlockSize, fileBlockSize))}) + logs <- logInfo{jobId, msg, false} } // Otherwise, we do not follow last seen element (previous is `100 - 199` but we are `299 - 300`) missingRange := BlockRange{uint64(expected), uint64(RoundToBundleEndBlock(baseNum32-fileBlockSize, fileBlockSize))} - fmt.Printf("❌ Range %s! (Missing, [%s])\n", missingRange, missingRange.ReprocRange()) + msg = fmt.Sprintf("❌ Range %s! 
(Missing, [%s])\n", missingRange, missingRange.ReprocRange()) + logs <- logInfo{jobId, msg, false} currentStartBlk = baseNum32 holeFound = true @@ -101,7 +290,7 @@ func CheckMergedBlocks( expected = baseNum32 + fileBlockSize if printDetails != PrintNothing { - newSeenFilters, lowestBlockSegment, highestBlockSegment := validateBlockSegment(ctx, blocksStore, filename, fileBlockSize, blockRange, blockPrinter, printDetails, tfdb) + newSeenFilters, lowestBlockSegment, highestBlockSegment := validateBlockSegment(ctx, blocksStore, filename, fileBlockSize, blockRange, blockPrinter, printDetails, tfdb, jobId, logs) for key, filters := range newSeenFilters { seenFilters[key] = filters } @@ -121,7 +310,8 @@ func CheckMergedBlocks( } if count%10000 == 0 { - fmt.Printf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(RoundToBundleEndBlock(baseNum32, fileBlockSize))}) + msg = fmt.Sprintf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(RoundToBundleEndBlock(baseNum32, fileBlockSize))}) + logs <- logInfo{jobId, msg, false} currentStartBlk = baseNum32 + fileBlockSize } @@ -146,28 +336,33 @@ func CheckMergedBlocks( if blockRange.Bounded() && (highestBlockSeen < (blockRange.Stop-1) || (lowestBlockSeen > blockRange.Start && lowestBlockSeen > bstream.GetProtocolFirstStreamableBlock)) { - fmt.Printf("🔶 Incomplete range %s, started at block %s and stopped at block: %s\n", blockRange, PrettyBlockNum(lowestBlockSeen), PrettyBlockNum(highestBlockSeen)) + msg = fmt.Sprintf("🔶 Incomplete range %s, started at block %s and stopped at block: %s\n", blockRange, PrettyBlockNum(lowestBlockSeen), PrettyBlockNum(highestBlockSeen)) + logs <- logInfo{jobId, msg, false} } if tfdb.lastLinkedBlock != nil && tfdb.lastLinkedBlock.Number < highestBlockSeen { - fmt.Printf("🔶 Range %s has issues with forks, last linkable block number: %d\n", BlockRange{uint64(currentStartBlk), highestBlockSeen}, tfdb.lastLinkedBlock.Number) + msg = fmt.Sprintf("🔶 Range %s has issues with forks, last 
linkable block number: %d\n", BlockRange{uint64(currentStartBlk), highestBlockSeen}, tfdb.lastLinkedBlock.Number) + logs <- logInfo{jobId, msg, false} } else { - fmt.Printf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(highestBlockSeen)}) + msg = fmt.Sprintf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(highestBlockSeen)}) + logs <- logInfo{jobId, msg, false} } if len(seenFilters) > 0 { - fmt.Println() - fmt.Println("Seen filters") + msg = "\nSeen filters\n" for _, filters := range seenFilters { - fmt.Printf("- [Include %q, Exclude %q, System %q]\n", filters.Include, filters.Exclude, filters.System) + msg += fmt.Sprintf("- [Include %q, Exclude %q, System %q]\n", filters.Include, filters.Exclude, filters.System) } - fmt.Println() + msg += "\n" + logs <- logInfo{jobId, msg, false} } if holeFound { - fmt.Printf("🆘 Holes found!\n") + msg = fmt.Sprintf("🆘 Holes found!\n") + logs <- logInfo{jobId, msg, false} } else { - fmt.Printf("🆗 No hole found\n") + msg = fmt.Sprintf("🆗 No hole found\n") + logs <- logInfo{jobId, msg, false} } return nil @@ -189,18 +384,23 @@ func validateBlockSegment( blockPrinter func(block *bstream.Block), printDetails PrintDetails, tfdb *trackedForkDB, + jobId uint64, + logs chan<- logInfo, ) (seenFilters map[string]FilteringFilters, lowestBlockSeen, highestBlockSeen uint64) { + var msg string lowestBlockSeen = MaxUint64 reader, err := store.OpenObject(ctx, segment) if err != nil { - fmt.Printf("❌ Unable to read blocks segment %s: %s\n", segment, err) + msg = fmt.Sprintf("❌ Unable to read blocks segment %s: %s\n", segment, err) + logs <- logInfo{jobId, msg, false} return } defer reader.Close() readerFactory, err := bstream.GetBlockReaderFactory.New(reader) if err != nil { - fmt.Printf("❌ Unable to read blocks segment %s: %s\n", segment, err) + msg = fmt.Sprintf("❌ Unable to read blocks segment %s: %s\n", segment, err) + logs <- logInfo{jobId, msg, false} return } @@ -239,12 +439,14 @@ func validateBlockSegment( if 
printDetails != PrintNothing { // TODO: this print should be under a 'check forkable' flag? - fmt.Printf("🔶 Block #%d is not linkable at this point\n", block.Num()) + msg = fmt.Sprintf("🔶 Block #%d is not linkable at this point\n", block.Num()) + logs <- logInfo{jobId, msg, false} } if tfdb.unlinkableSegmentCount > 99 && tfdb.unlinkableSegmentCount%100 == 0 { // TODO: this print should be under a 'check forkable' flag? - fmt.Printf("❌ Large gap of %d unlinkable blocks found in chain. Last linked block: %d, first Unlinkable block: %d. \n", tfdb.unlinkableSegmentCount, tfdb.lastLinkedBlock.Num(), tfdb.firstUnlinkableBlock.Num()) + msg = fmt.Sprintf("❌ Large gap of %d unlinkable blocks found in chain. Last linked block: %d, first Unlinkable block: %d. \n", tfdb.unlinkableSegmentCount, tfdb.lastLinkedBlock.Num(), tfdb.firstUnlinkableBlock.Num()) + logs <- logInfo{jobId, msg, false} } } else { tfdb.lastLinkedBlock = block @@ -264,11 +466,11 @@ func validateBlockSegment( if printDetails == PrintFull { out, err := jsonpb.MarshalIndentToString(block.ToProtocol().(proto.Message), " ") if err != nil { - fmt.Printf("❌ Unable to print full block %s: %s\n", block.AsRef(), err) + msg = fmt.Sprintf("❌ Unable to print full block %s: %s\n", block.AsRef(), err) + logs <- logInfo{jobId, msg, false} continue } - - fmt.Println(out) + logs <- logInfo{jobId, out, false} } continue @@ -276,14 +478,16 @@ func validateBlockSegment( if block == nil && err == io.EOF { if seenBlockCount < expectedBlockCount(segment, fileBlockSize) { - fmt.Printf("🔶 Segment %s contained only %d blocks (< 100), this can happen on some chains\n", segment, seenBlockCount) + msg = fmt.Sprintf("🔶 Segment %s contained only %d blocks (< 100), this can happen on some chains\n", segment, seenBlockCount) + logs <- logInfo{jobId, msg, false} } return } if err != nil { - fmt.Printf("❌ Unable to read all blocks from segment %s after reading %d blocks: %s\n", segment, seenBlockCount, err) + msg = fmt.Sprintf("❌ Unable to 
read all blocks from segment %s after reading %d blocks: %s\n", segment, seenBlockCount, err) + logs <- logInfo{jobId, msg, false} return } } diff --git a/go.mod b/go.mod index f1a0ab1..b503b07 100644 --- a/go.mod +++ b/go.mod @@ -8,20 +8,24 @@ require ( github.com/lithammer/dedent v1.1.0 github.com/manifoldco/promptui v0.3.2 github.com/mostynb/go-grpc-compression v1.1.17 - github.com/nicksnyder/go-i18n v1.10.1 // indirect github.com/prometheus/client_golang v1.12.1 + github.com/remeh/sizedwaitgroup v1.0.0 github.com/spf13/cobra v1.4.0 github.com/spf13/viper v1.8.1 github.com/streamingfast/bstream v0.0.2-0.20221017131819-2a7e38be1047 github.com/streamingfast/dstore v0.1.1-0.20220607202639-35118aeaf648 github.com/streamingfast/firehose v0.1.1-0.20221017171248-8fd3adbe7b4d github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 - github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424 + github.com/streamingfast/logging v0.0.0-20221209193439-bff11742bf4c github.com/streamingfast/pbgo v0.0.6-0.20221014191646-3a05d7bc30c8 github.com/stretchr/testify v1.8.0 go.uber.org/zap v1.21.0 golang.org/x/crypto v0.0.0-20220214200702-86341886e292 google.golang.org/grpc v1.49.0 google.golang.org/protobuf v1.28.0 +) + +require ( + github.com/nicksnyder/go-i18n v1.10.1 // indirect gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20191105091915-95d230a53780 // indirect ) diff --git a/go.sum b/go.sum index fd094b6..8c1be68 100644 --- a/go.sum +++ b/go.sum @@ -484,6 +484,8 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/rabbitmq/amqp091-go v1.1.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/remeh/sizedwaitgroup v1.0.0 
h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7Kyl5E= +github.com/remeh/sizedwaitgroup v1.0.0/go.mod h1:3j2R4OIe/SeS6YDhICBy22RWjJC5eNCJ1V+9+NVNYlo= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= @@ -539,8 +541,9 @@ github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0/go.mod h1:cTN github.com/streamingfast/logging v0.0.0-20210811175431-f3b44b61606a/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo= github.com/streamingfast/logging v0.0.0-20210908162127-bdc5856d5341/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo= github.com/streamingfast/logging v0.0.0-20220222131651-12c3943aac2e/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo= -github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424 h1:qKt1W13L7GXL3xqvD6z2ufSkIy/KDm9oGrfurypC78E= github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424/go.mod h1:VlduQ80JcGJSargkRU4Sg9Xo63wZD/l8A5NC/Uo1/uU= +github.com/streamingfast/logging v0.0.0-20221209193439-bff11742bf4c h1:dV1ye/S2PiW9uIWvLtMrxWoTLcZS+yhjZDSKEV102Ho= +github.com/streamingfast/logging v0.0.0-20221209193439-bff11742bf4c/go.mod h1:VlduQ80JcGJSargkRU4Sg9Xo63wZD/l8A5NC/Uo1/uU= github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308 h1:xlWSfi1BoPfsHtPb0VEHGUcAdBF208LUiFCwfaVPfLA= github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308/go.mod h1:K1p8Bj/wG34KJvYzPUqtzpndffmpkrVY11u2hkyxCWQ= github.com/streamingfast/pbgo v0.0.6-0.20220629184423-cfd0608e0cf4/go.mod h1:huKwfgTGFIFZMKSVbD5TywClM7zAeBUG/zePZMqvXQQ=