Add parallel processing mode #1

Open · wants to merge 1 commit into base: develop
252 changes: 228 additions & 24 deletions check_blocks.go
@@ -8,6 +8,7 @@ import (
"strconv"

"github.com/golang/protobuf/proto"
"github.com/remeh/sizedwaitgroup"
"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/forkable"
"github.com/streamingfast/dstore"
@@ -26,16 +27,202 @@ const (
MaxUint64 = ^uint64(0)
)

func CheckMergedBlocks(
type jobInfo struct {
jobId uint64
blockRange BlockRange
}

type logInfo struct {
jobId uint64
message string
isDone bool
}

type jobContext struct {
ctx context.Context
logger *zap.Logger
storeURL string
fileBlockSize uint32
blockRange BlockRange
blockPrinter func(block *bstream.Block)
printDetails PrintDetails
isDone bool
logs chan logInfo
}

func (jc *jobContext) worker(jobs <-chan jobInfo, results chan<- error, swg *sizedwaitgroup.SizedWaitGroup) {
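// Drain the jobs channel: run the merged-blocks check for each assigned
// sub-range and report its error on the buffered results channel before
// releasing this worker's sizedwaitgroup slot.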
for j := range jobs {
results <- CheckMergedBlocks(jc.ctx, jc.logger, jc.storeURL, jc.fileBlockSize, j.blockRange, jc.blockPrinter, jc.printDetails, j.jobId, jc.logs)
swg.Done()
if jc.isDone {
break
}
}
}

func (jc *jobContext) findMinMaxBlock() (uint64, uint64, error) {
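// Derive the checkable span from the merged-block file names themselves:
// min tracks the lowest bundle base number seen, and max ends up one bundle
// past the highest base, i.e. an exclusive upper bound.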
var min = MaxUint64
var max = uint64(0)

blocksStore, err := dstore.NewDBinStore(jc.storeURL)
if err != nil {
return min, max, err
}
err = blocksStore.Walk(jc.ctx, "", func(filename string) error {
match := numberRegex.FindStringSubmatch(filename)
if match == nil {
return nil
}
baseNum, _ := strconv.ParseUint(match[1], 10, 32)
if baseNum < min {
min = baseNum
}

baseNum += uint64(jc.fileBlockSize)
if baseNum > max {
max = baseNum // baseNum already includes one bundle of fileBlockSize here
}
return nil
})
return min, max, err
}

func (jc *jobContext) messageLogger(logDone chan<- bool) {
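// Jobs run concurrently but their output must read in order: logs for the
// current job are printed immediately, logs for later jobs are buffered
// per jobId and flushed once every lower-numbered job has signalled done.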

defer func() { logDone <- true }()

messages := make(map[uint64]string)
isDone := make(map[uint64]bool)
var currentJobId, maxJobId uint64

for log := range jc.logs {
isDone[log.jobId] = log.isDone
if log.jobId > maxJobId {
maxJobId = log.jobId
}

if log.jobId == currentJobId {
if messages[currentJobId] != "" {
fmt.Print(messages[currentJobId]) // buffered messages carry their own newlines
delete(messages, currentJobId)
}
if log.isDone {
delete(messages, currentJobId)
delete(isDone, currentJobId)
currentJobId++
if currentJobId > maxJobId && jc.isDone {
break
}
continue
}
fmt.Print(log.message)
} else {
messages[log.jobId] += log.message
}
if isDone[currentJobId] {
if messages[currentJobId] != "" {
fmt.Print(messages[currentJobId]) // flush anything buffered for this job before advancing
}
delete(messages, currentJobId)
delete(isDone, currentJobId)
currentJobId++
}
}
}

func CheckMergedBlocksBatch(
ctx context.Context,
logger *zap.Logger,
storeURL string,
fileBlockSize uint32,
blockRange BlockRange,
blockPrinter func(block *bstream.Block),
printDetails PrintDetails,
batchSize int,
workers int,
) error {

jc := jobContext{
ctx: ctx,
logger: logger,
storeURL: storeURL,
fileBlockSize: fileBlockSize,
blockRange: blockRange,
blockPrinter: blockPrinter,
printDetails: printDetails,
isDone: false,
}

// find unbounded ranges
var err error
start := blockRange.Start
stop := blockRange.Stop
if blockRange.Unbounded() {
start, stop, err = jc.findMinMaxBlock()
if err != nil {
return err
}
}

// calculate batchCount
offset := start
totalSize := stop - start
batchCount := totalSize / uint64(batchSize)
extraBatchSize := totalSize % uint64(batchSize)
if extraBatchSize != 0 {
batchCount++
}

// limit concurrency
swg := sizedwaitgroup.New(workers)
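// sizedwaitgroup works like sync.WaitGroup, except Add() blocks while
// `workers` jobs are already in flight, which is what caps concurrency here.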

// create log channel and message logger
jc.logs = make(chan logInfo, workers*2)
logDone := make(chan bool, 1)
go jc.messageLogger(logDone)

// create channels and workers
results := make(chan error, batchCount)
jobs := make(chan jobInfo, workers)

// create workers
for w := 0; w < workers; w++ {
go jc.worker(jobs, results, &swg)
}

// assign jobs to workers
for j := uint64(0); j < batchCount; j++ {
start := j*uint64(batchSize) + offset
stop := (j+1)*uint64(batchSize) - 1 + offset
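// the final batch may be shorter: pull stop back so it only covers the leftover blocks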
if j == batchCount-1 && extraBatchSize != 0 {
stop += -uint64(batchSize) + extraBatchSize
}
blockRange := BlockRange{start, stop}
swg.Add()
jobs <- jobInfo{jobId: j, blockRange: blockRange}
}

swg.Wait()
close(jobs)
jc.isDone = true
close(jc.logs)
<-logDone

// all jobs have finished; surface the first error any of them reported
close(results)
for err := range results {
if err != nil {
return err
}
}
return nil
}

func CheckMergedBlocks(
ctx context.Context,
logger *zap.Logger,
storeURL string,
fileBlockSize uint32,
blockRange BlockRange,
blockPrinter func(block *bstream.Block),
printDetails PrintDetails,
jobId uint64,
logs chan<- logInfo,
) error {
fmt.Printf("Checking block holes on %s\n", storeURL)
var msg string

msg = fmt.Sprintf("Checking block holes on %s\n", storeURL)
logs <- logInfo{jobId, msg, false}

defer func() { logs <- logInfo{jobId, "", true} }()

var expected uint32
var count int
@@ -88,20 +275,22 @@
if baseNum32 != expected {
// There is no previous valid block range if we are at the ever first seen file
if count > 1 {
fmt.Printf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(RoundToBundleEndBlock(expected-fileBlockSize, fileBlockSize))})
msg = fmt.Sprintf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(RoundToBundleEndBlock(expected-fileBlockSize, fileBlockSize))})
logs <- logInfo{jobId, msg, false}
}

// Otherwise, we do not follow last seen element (previous is `100 - 199` but we are `299 - 300`)
missingRange := BlockRange{uint64(expected), uint64(RoundToBundleEndBlock(baseNum32-fileBlockSize, fileBlockSize))}
fmt.Printf("❌ Range %s! (Missing, [%s])\n", missingRange, missingRange.ReprocRange())
msg = fmt.Sprintf("❌ Range %s! (Missing, [%s])\n", missingRange, missingRange.ReprocRange())
logs <- logInfo{jobId, msg, false}
currentStartBlk = baseNum32

holeFound = true
}
expected = baseNum32 + fileBlockSize

if printDetails != PrintNothing {
newSeenFilters, lowestBlockSegment, highestBlockSegment := validateBlockSegment(ctx, blocksStore, filename, fileBlockSize, blockRange, blockPrinter, printDetails, tfdb)
newSeenFilters, lowestBlockSegment, highestBlockSegment := validateBlockSegment(ctx, blocksStore, filename, fileBlockSize, blockRange, blockPrinter, printDetails, tfdb, jobId, logs)
for key, filters := range newSeenFilters {
seenFilters[key] = filters
}
@@ -121,7 +310,8 @@ }
}

if count%10000 == 0 {
fmt.Printf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(RoundToBundleEndBlock(baseNum32, fileBlockSize))})
msg = fmt.Sprintf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(RoundToBundleEndBlock(baseNum32, fileBlockSize))})
logs <- logInfo{jobId, msg, false}
currentStartBlk = baseNum32 + fileBlockSize
}

@@ -146,28 +336,33 @@ func CheckMergedBlocks(
if blockRange.Bounded() &&
(highestBlockSeen < (blockRange.Stop-1) ||
(lowestBlockSeen > blockRange.Start && lowestBlockSeen > bstream.GetProtocolFirstStreamableBlock)) {
fmt.Printf("🔶 Incomplete range %s, started at block %s and stopped at block: %s\n", blockRange, PrettyBlockNum(lowestBlockSeen), PrettyBlockNum(highestBlockSeen))
msg = fmt.Sprintf("🔶 Incomplete range %s, started at block %s and stopped at block: %s\n", blockRange, PrettyBlockNum(lowestBlockSeen), PrettyBlockNum(highestBlockSeen))
logs <- logInfo{jobId, msg, false}
}

if tfdb.lastLinkedBlock != nil && tfdb.lastLinkedBlock.Number < highestBlockSeen {
fmt.Printf("🔶 Range %s has issues with forks, last linkable block number: %d\n", BlockRange{uint64(currentStartBlk), highestBlockSeen}, tfdb.lastLinkedBlock.Number)
msg = fmt.Sprintf("🔶 Range %s has issues with forks, last linkable block number: %d\n", BlockRange{uint64(currentStartBlk), highestBlockSeen}, tfdb.lastLinkedBlock.Number)
logs <- logInfo{jobId, msg, false}
} else {
fmt.Printf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(highestBlockSeen)})
msg = fmt.Sprintf("✅ Range %s\n", BlockRange{uint64(currentStartBlk), uint64(highestBlockSeen)})
logs <- logInfo{jobId, msg, false}
}

if len(seenFilters) > 0 {
fmt.Println()
fmt.Println("Seen filters")
msg = "\nSeen filters"
for _, filters := range seenFilters {
fmt.Printf("- [Include %q, Exclude %q, System %q]\n", filters.Include, filters.Exclude, filters.System)
msg += fmt.Sprintf("- [Include %q, Exclude %q, System %q]\n", filters.Include, filters.Exclude, filters.System)
}
fmt.Println()
msg += "\n"
logs <- logInfo{jobId, msg, false}
}

if holeFound {
fmt.Printf("🆘 Holes found!\n")
msg = fmt.Sprintf("🆘 Holes found!\n")
logs <- logInfo{jobId, msg, false}
} else {
fmt.Printf("🆗 No hole found\n")
msg = fmt.Sprintf("🆗 No hole found\n")
logs <- logInfo{jobId, msg, false}
}

return nil
@@ -189,18 +384,23 @@ func validateBlockSegment(
blockPrinter func(block *bstream.Block),
printDetails PrintDetails,
tfdb *trackedForkDB,
jobId uint64,
logs chan<- logInfo,
) (seenFilters map[string]FilteringFilters, lowestBlockSeen, highestBlockSeen uint64) {
var msg string
lowestBlockSeen = MaxUint64
reader, err := store.OpenObject(ctx, segment)
if err != nil {
fmt.Printf("❌ Unable to read blocks segment %s: %s\n", segment, err)
msg = fmt.Sprintf("❌ Unable to read blocks segment %s: %s\n", segment, err)
logs <- logInfo{jobId, msg, false}
return
}
defer reader.Close()

readerFactory, err := bstream.GetBlockReaderFactory.New(reader)
if err != nil {
fmt.Printf("❌ Unable to read blocks segment %s: %s\n", segment, err)
msg = fmt.Sprintf("❌ Unable to read blocks segment %s: %s\n", segment, err)
logs <- logInfo{jobId, msg, false}
return
}

@@ -239,12 +439,14 @@

if printDetails != PrintNothing {
// TODO: this print should be under a 'check forkable' flag?
fmt.Printf("🔶 Block #%d is not linkable at this point\n", block.Num())
msg = fmt.Sprintf("🔶 Block #%d is not linkable at this point\n", block.Num())
logs <- logInfo{jobId, msg, false}
}

if tfdb.unlinkableSegmentCount > 99 && tfdb.unlinkableSegmentCount%100 == 0 {
// TODO: this print should be under a 'check forkable' flag?
fmt.Printf("❌ Large gap of %d unlinkable blocks found in chain. Last linked block: %d, first Unlinkable block: %d. \n", tfdb.unlinkableSegmentCount, tfdb.lastLinkedBlock.Num(), tfdb.firstUnlinkableBlock.Num())
msg = fmt.Sprintf("❌ Large gap of %d unlinkable blocks found in chain. Last linked block: %d, first Unlinkable block: %d. \n", tfdb.unlinkableSegmentCount, tfdb.lastLinkedBlock.Num(), tfdb.firstUnlinkableBlock.Num())
logs <- logInfo{jobId, msg, false}
}
} else {
tfdb.lastLinkedBlock = block
@@ -264,26 +466,28 @@ if printDetails == PrintFull {
if printDetails == PrintFull {
out, err := jsonpb.MarshalIndentToString(block.ToProtocol().(proto.Message), " ")
if err != nil {
fmt.Printf("❌ Unable to print full block %s: %s\n", block.AsRef(), err)
msg = fmt.Sprintf("❌ Unable to print full block %s: %s\n", block.AsRef(), err)
logs <- logInfo{jobId, msg, false}
continue
}

fmt.Println(out)
logs <- logInfo{jobId, out + "\n", false} // add the newline fmt.Println used to provide
}

continue
}

if block == nil && err == io.EOF {
if seenBlockCount < expectedBlockCount(segment, fileBlockSize) {
fmt.Printf("🔶 Segment %s contained only %d blocks (< 100), this can happen on some chains\n", segment, seenBlockCount)
msg = fmt.Sprintf("🔶 Segment %s contained only %d blocks (< 100), this can happen on some chains\n", segment, seenBlockCount)
logs <- logInfo{jobId, msg, false}
}

return
}

if err != nil {
fmt.Printf("❌ Unable to read all blocks from segment %s after reading %d blocks: %s\n", segment, seenBlockCount, err)
msg = fmt.Sprintf("❌ Unable to read all blocks from segment %s after reading %d blocks: %s\n", segment, seenBlockCount, err)
logs <- logInfo{jobId, msg, false}
return
}
}
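For reference, a minimal sketch of how the new batch entry point could be driven. The store URL, bundle size, range, and printer below are illustrative assumptions, not part of this diff, and the helper is assumed to live in the same package as CheckMergedBlocksBatch, BlockRange and PrintNothing (so the file's existing imports of context, fmt, bstream and zap apply):

// runBatchCheck is a hypothetical caller, not part of this change.
func runBatchCheck(ctx context.Context, logger *zap.Logger) error {
// Placeholder printer; real callers pass a chain-specific one.
printer := func(block *bstream.Block) { fmt.Println(block.AsRef()) }

// Check blocks 0 through 1,000,000 in 10,000-block jobs,
// with at most 8 jobs in flight at any time.
return CheckMergedBlocksBatch(
ctx,
logger,
"gs://example-bucket/merged-blocks", // hypothetical store URL
100, // bundle size: 100 blocks per merged file
BlockRange{0, 1_000_000},
printer,
PrintNothing, // range summary only, no per-block details
10_000, // batchSize: blocks handled per job
8, // workers: concurrency cap
)
}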
8 changes: 6 additions & 2 deletions go.mod
@@ -8,20 +8,24 @@ require (
github.com/lithammer/dedent v1.1.0
github.com/manifoldco/promptui v0.3.2
github.com/mostynb/go-grpc-compression v1.1.17
github.com/nicksnyder/go-i18n v1.10.1 // indirect
github.com/prometheus/client_golang v1.12.1
github.com/remeh/sizedwaitgroup v1.0.0
github.com/spf13/cobra v1.4.0
github.com/spf13/viper v1.8.1
github.com/streamingfast/bstream v0.0.2-0.20221017131819-2a7e38be1047
github.com/streamingfast/dstore v0.1.1-0.20220607202639-35118aeaf648
github.com/streamingfast/firehose v0.1.1-0.20221017171248-8fd3adbe7b4d
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0
github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424
github.com/streamingfast/logging v0.0.0-20221209193439-bff11742bf4c
github.com/streamingfast/pbgo v0.0.6-0.20221014191646-3a05d7bc30c8
github.com/stretchr/testify v1.8.0
go.uber.org/zap v1.21.0
golang.org/x/crypto v0.0.0-20220214200702-86341886e292
google.golang.org/grpc v1.49.0
google.golang.org/protobuf v1.28.0
)

require (
github.com/nicksnyder/go-i18n v1.10.1 // indirect
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20191105091915-95d230a53780 // indirect
)