From 283efbea153abef072d487fd71a5b1dd60034e99 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Thu, 16 May 2024 16:56:53 +0300 Subject: [PATCH] cmd: make mempool OOM error handler more adaptive Once mempool OOm error happens, worker should wait for a while until block processing finishes. Make this awaiting interval depend on MSPerBlock protocol setting so that it's possible to perform precise calculations for low MSPerBlock setting. Signed-off-by: Anna Shaleva --- cmd/bench/main.go | 25 ++++++++++++++++--------- cmd/internal/worker.go | 35 ++++++++++++++++++++++------------- 2 files changed, 38 insertions(+), 22 deletions(-) diff --git a/cmd/bench/main.go b/cmd/bench/main.go index 3c327d0..9ec6e86 100644 --- a/cmd/bench/main.go +++ b/cmd/bench/main.go @@ -33,15 +33,16 @@ func main() { defer cancel() var ( - workers int - rate int - msPerBlock int - threshold time.Duration - dump *internal.Dump - desc = v.GetString("desc") - timeLimit = v.GetDuration("timeLimit") - mode = internal.BenchMode(v.GetString("mode")) - client *internal.RPCClient + workers int + rate int + msPerBlock int + mempoolOOMDelay time.Duration + threshold time.Duration + dump *internal.Dump + desc = v.GetString("desc") + timeLimit = v.GetDuration("timeLimit") + mode = internal.BenchMode(v.GetString("mode")) + client *internal.RPCClient ) switch mode { @@ -61,6 +62,11 @@ func main() { log.Fatalf("could not receive RPC Node version: %v", err) } msPerBlock = version.Protocol.MillisecondsPerBlock + if msPerBlock > 1000 { + mempoolOOMDelay = time.Duration(msPerBlock) * time.Millisecond / 50 + } else { + mempoolOOMDelay = time.Duration(msPerBlock) * time.Millisecond / 10 + } reg := regexp.MustCompile(`[^\w.-]+`) versionStr := strings.Trim(reg.ReplaceAllString(version.UserAgent, "_"), "_") @@ -133,6 +139,7 @@ func main() { internal.WorkerTimeLimit(timeLimit), internal.WorkerThreshold(threshold), internal.WorkerBlockchainClient(client), + internal.WorkerMempoolOOMDelay(mempoolOOMDelay), internal.WorkerRPSReporter(rep.UpdateRPS), internal.WorkerTPSReporter(rep.UpdateTPS), internal.WorkerErrReporter(rep.UpdateErr), diff --git a/cmd/internal/worker.go b/cmd/internal/worker.go index 1b09bd4..1136c1b 100644 --- a/cmd/internal/worker.go +++ b/cmd/internal/worker.go @@ -36,18 +36,19 @@ type ( } doerParams struct { - wrkCount int - cli *RPCClient - mode BenchMode - rate int - threshold time.Duration - timeLimit time.Duration - dump *Dump - cntReporter func(cnt int32) - errReporter func(cnt int32) - rpsReporter func(rps float64) - tpsReporter func(deltaTime uint64, txCount int, tps float64) - stop context.CancelFunc + wrkCount int + cli *RPCClient + mode BenchMode + rate int + threshold time.Duration + timeLimit time.Duration + mempoolOOMDelay time.Duration + dump *Dump + cntReporter func(cnt int32) + errReporter func(cnt int32) + rpsReporter func(rps float64) + tpsReporter func(deltaTime uint64, txCount int, tps float64) + stop context.CancelFunc } // WorkerOption is an option type to configure workers. @@ -75,6 +76,14 @@ func WorkerBlockchainClient(cli *RPCClient) WorkerOption { } } +// WorkerMempoolOOMDelay sets the time interval to pause sender's work after +// mempool OOM error occurred on tx submission. +func WorkerMempoolOOMDelay(delay time.Duration) WorkerOption { + return func(p *doerParams) { + p.mempoolOOMDelay = delay + } +} + // WorkerTimeLimit sets time limit to send requests. func WorkerTimeLimit(limit time.Duration) WorkerOption { return func(p *doerParams) { @@ -244,7 +253,7 @@ loop: log.Printf("failed to re-enqueue transaction: %s\n", err) d.countErr.Add(1) } - time.Sleep(100 * time.Millisecond) + time.Sleep(d.mempoolOOMDelay) } else { d.countErr.Add(1) }