Skip to content

Commit

Permalink
bench: add workers into rate mode
Browse files Browse the repository at this point in the history
We need to run multiple workers for the rate mode and share the load
between them.

Close #170

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed May 16, 2024
1 parent b0d351e commit 7b43f94
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 16 deletions.
14 changes: 4 additions & 10 deletions cmd/bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func main() {
defer cancel()

var (
workers int
workers = v.GetInt("workers")
rate int
msPerBlock int
threshold time.Duration
Expand All @@ -44,18 +44,12 @@ func main() {
client *internal.RPCClient
)

switch mode {
case internal.ModeWorker:
workers = v.GetInt("workers")
client = internal.NewRPCClient(v, workers)

case internal.ModeRate:
workers = 1
if mode == internal.ModeRate {
rate = v.GetInt("rateLimit")
threshold = time.Duration(time.Second.Nanoseconds() / int64(rate))
client = internal.NewRPCClient(v, 1)
threshold = time.Duration(time.Second.Nanoseconds() / int64(rate) * int64(workers))
}

client = internal.NewRPCClient(v, workers)
version, err := client.GetVersion(ctx)
if err != nil {
log.Fatalf("could not receive RPC Node version: %v", err)
Expand Down
12 changes: 7 additions & 5 deletions cmd/internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func NewWorkers(opts ...WorkerOption) (Worker, error) {

switch p.mode {
case ModeRate:
log.Printf("Init worker with %d QPS / %s time limit (%d txs will try to send)", p.rate, p.timeLimit, ln)
log.Printf("Init %d workers with %d QPS / %s time limit (%d txs will try to send)", p.wrkCount, p.rate, p.timeLimit, ln)
case ModeWorker:
log.Printf("Init %d workers / %s time limit (%d txs will try to send)", p.wrkCount, p.timeLimit, ln)
}
Expand All @@ -211,8 +211,9 @@ func NewWorkers(opts ...WorkerOption) (Worker, error) {
// idx defines the order of the transaction being sent and can be more than overall transactions count, because retransmission is supported.
func (d *doer) worker(ctx context.Context, idx *atomic.Int64, start time.Time) {
var (
done = ctx.Done()
timer = time.NewTimer(d.timeLimit)
done = ctx.Done()
timer = time.NewTimer(d.timeLimit)
localTxCounter int64
)

defer func() {
Expand All @@ -228,7 +229,7 @@ loop:
case <-timer.C:
return
default:
i := idx.Add(1)
idx.Add(1)
if d.dump.TransactionsQueue.Len() == 0 {
return
}
Expand All @@ -255,10 +256,11 @@ loop:

since := time.Since(start)
count := d.countTxs.Add(1)
localTxCounter++
d.rpsReporter(float64(count) / since.Seconds())

if d.threshold > 0 {
waitFor := time.Until(start.Add(time.Duration(d.threshold.Nanoseconds() * (i + 1))))
waitFor := time.Until(start.Add(time.Duration(d.threshold.Nanoseconds() * (localTxCounter + 1))))
if waitFor > 0 {
time.Sleep(waitFor)
}
Expand Down
7 changes: 6 additions & 1 deletion runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,12 @@ else
fatal "Invalid validator count: $NEOBENCH_VALIDATOR_COUNT"
fi

OUTPUT="/out/${OUTPUT}_${MODE}_${COUNT}.log"
if [ "rate" = "$MODE" ]; then
OUTPUT="/out/${OUTPUT}_${MODE}_${COUNT}_workers_${WORKERS_COUNT}.log"
else
OUTPUT="/out/${OUTPUT}_${MODE}_${COUNT}"
fi

if [ ${#RPC_ADDR[@]} -eq 0 ]; then
ARGS+=("${DEFAULT_RPC_ADDR[@]}")
else
Expand Down

0 comments on commit 7b43f94

Please sign in to comment.